<a href="https://colab.research.google.com/github/ShrikantKGIT/general/blob/main/Simple_Contiguous_File_System.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Simple file system in Python that interacts with a basic, low-level disk drive API.**

This file system will use a contiguous allocation strategy, which is one of the simplest to implement. It will feature:

**A Mock Disk Drive:** A class that simulates a low-level disk with read_block and write_block functions.

**A Master File Table (MFT):** The first few blocks of the disk will be reserved for a table that keeps track of all files, their names, where they start, and how long they are.

**The File System Logic:** A class that can format the disk, write new files, and read existing ones.

In [1]:
import json
import math

# --- Part 1: The Low-Level Disk Drive API ---
class DiskDrive:
    """
    A simulated low-level disk drive.
    It operates on a fixed number of blocks of a fixed size.
    """
    def __init__(self, num_blocks=256, block_size=512):
        """Initializes the disk with empty blocks."""
        self.num_blocks = num_blocks
        self.block_size = block_size
        # The actual storage: a list of bytearrays
        self.storage = [bytearray(block_size) for _ in range(num_blocks)]
        print(f"Disk initialized: {num_blocks} blocks, {block_size} bytes/block. Total: {num_blocks * block_size / 1024} KB")

    def read_block(self, block_id):
        """Reads a single block of data from the disk."""
        if not 0 <= block_id < self.num_blocks:
            raise IndexError(f"Disk Error: Block ID {block_id} is out of bounds.")
        # print(f"DEBUG: Reading block {block_id}")
        return self.storage[block_id]

    def write_block(self, block_id, data):
        """Writes data to a single block on the disk."""
        if not 0 <= block_id < self.num_blocks:
            raise IndexError(f"Disk Error: Block ID {block_id} is out of bounds.")
        if len(data) > self.block_size:
            raise ValueError("Data size exceeds block size.")

        # Pad data to fill the entire block
        padded_data = data.ljust(self.block_size, b'\0')
        self.storage[block_id] = padded_data
        # print(f"DEBUG: Wrote {len(data)} bytes to block {block_id}")

# --- Part 2: The File System Implementation ---
class SimpleFS:
    """
    A simple, contiguous-allocation file system.
    It uses a Master File Table (MFT) stored at the start of the disk.
    """
    MFT_START_BLOCK = 0
    MFT_MAX_BLOCKS = 4 # Reserve 4 blocks for the MFT
    DATA_START_BLOCK = MFT_START_BLOCK + MFT_MAX_BLOCKS

    def __init__(self, disk_drive):
        self.disk = disk_drive
        self.mft = {} # In-memory cache of the Master File Table

    def format(self):
        """
        Formats the disk. This erases the MFT and prepares the disk
        for use by the file system.
        """
        print("Formatting disk...")
        self.mft = {}
        self._sync_mft_to_disk()
        print("Disk formatted successfully.")

    def _load_mft_from_disk(self):
        """Reads the MFT from the disk into the in-memory cache."""
        mft_bytes = b''
        for i in range(self.MFT_MAX_BLOCKS):
            mft_bytes += self.disk.read_block(self.MFT_START_BLOCK + i)

        # Find the null terminator to remove padding
        mft_bytes = mft_bytes.split(b'\0', 1)[0]

        if mft_bytes:
            self.mft = json.loads(mft_bytes.decode('utf-8'))
        else:
            self.mft = {}

    def _sync_mft_to_disk(self):
        """Writes the in-memory MFT cache to the disk."""
        mft_string = json.dumps(self.mft)
        mft_bytes = mft_string.encode('utf-8')

        if len(mft_bytes) > self.disk.block_size * self.MFT_MAX_BLOCKS:
            raise OverflowError("MFT has grown too large for its reserved space.")

        # Write the MFT bytes block by block
        for i in range(self.MFT_MAX_BLOCKS):
            start = i * self.disk.block_size
            end = start + self.disk.block_size
            chunk = mft_bytes[start:end]
            self.disk.write_block(self.MFT_START_BLOCK + i, chunk)

    def _find_free_space(self, num_blocks_needed):
        """
        Finds a contiguous run of free blocks.
        Returns the starting block ID if found, otherwise None.
        """
        occupied_blocks = set()
        for file_info in self.mft.values():
            start = file_info['start_block']
            length = file_info['num_blocks']
            for i in range(length):
                occupied_blocks.add(start + i)

        # Search for a contiguous free slot
        for start_block in range(self.DATA_START_BLOCK, self.disk.num_blocks - num_blocks_needed + 1):
            is_free = True
            for i in range(num_blocks_needed):
                if (start_block + i) in occupied_blocks:
                    is_free = False
                    break
            if is_free:
                return start_block
        return None

    def write(self, filename, content_bytes):
        """
        Writes a file to the disk.
        """
        print(f"Attempting to write file '{filename}' ({len(content_bytes)} bytes)...")
        self._load_mft_from_disk()

        if filename in self.mft:
            print(f"Error: File '{filename}' already exists. Please delete it first.")
            return

        num_blocks_needed = math.ceil(len(content_bytes) / self.disk.block_size)
        start_block = self._find_free_space(num_blocks_needed)

        if start_block is None:
            print("Error: Not enough contiguous free space on the disk.")
            return

        # Write the file content block by block
        for i in range(num_blocks_needed):
            start = i * self.disk.block_size
            end = start + self.disk.block_size
            chunk = content_bytes[start:end]
            self.disk.write_block(start_block + i, chunk)

        # Update the MFT
        self.mft[filename] = {
            'start_block': start_block,
            'num_blocks': num_blocks_needed,
            'size_bytes': len(content_bytes)
        }
        self._sync_mft_to_disk()
        print(f"File '{filename}' written successfully at blocks {start_block}-{start_block + num_blocks_needed - 1}.")

    def read(self, filename):
        """
        Reads a file from the disk.
        Returns the file content as bytes, or None if not found.
        """
        print(f"Attempting to read file '{filename}'...")
        self._load_mft_from_disk()

        if filename not in self.mft:
            print(f"Error: File '{filename}' not found.")
            return None

        file_info = self.mft[filename]
        file_bytes = b''

        # Read the file content block by block
        for i in range(file_info['num_blocks']):
            block_id = file_info['start_block'] + i
            file_bytes += self.disk.read_block(block_id)

        # Trim padding to the original file size
        file_bytes = file_bytes[:file_info['size_bytes']]

        print(f"File '{filename}' read successfully.")
        return file_bytes

# --- Part 3: Example Usage ---
if __name__ == "__main__":
    # 1. "Install" the hardware
    my_disk = DiskDrive()

    # 2. Create a file system instance on the drive
    fs = SimpleFS(my_disk)

    # 3. Format the disk to prepare it for use
    fs.format()

    # 4. Write some files
    print("\n--- Writing Files ---")
    fs.write("hello.txt", b"Hello World! This is our first file on the new file system.")
    fs.write("data.json", b'{"name": "test", "value": 123, "active": true}')
    fs.write("poem.txt", b"The woods are lovely, dark and deep,\nBut I have promises to keep,\nAnd miles to go before I sleep.")

    # Try to write a file that already exists
    fs.write("hello.txt", b"This should fail.")

    # 5. Read a file back
    print("\n--- Reading Files ---")
    content = fs.read("poem.txt")
    if content:
        print("\nContent of 'poem.txt':")
        print(content.decode('utf-8'))

    print("\n--- Verifying MFT ---")
    # Load the MFT to see the file system's state
    fs._load_mft_from_disk()
    print("Final Master File Table on disk:")
    print(json.dumps(fs.mft, indent=2))


Disk initialized: 256 blocks, 512 bytes/block. Total: 128.0 KB
Formatting disk...
Disk formatted successfully.

--- Writing Files ---
Attempting to write file 'hello.txt' (59 bytes)...
File 'hello.txt' written successfully at blocks 4-4.
Attempting to write file 'data.json' (46 bytes)...
File 'data.json' written successfully at blocks 5-5.
Attempting to write file 'poem.txt' (97 bytes)...
File 'poem.txt' written successfully at blocks 6-6.
Attempting to write file 'hello.txt' (17 bytes)...
Error: File 'hello.txt' already exists. Please delete it first.

--- Reading Files ---
Attempting to read file 'poem.txt'...
File 'poem.txt' read successfully.

Content of 'poem.txt':
The woods are lovely, dark and deep,
But I have promises to keep,
And miles to go before I sleep.

--- Verifying MFT ---
Final Master File Table on disk:
{
  "hello.txt": {
    "start_block": 4,
    "num_blocks": 1,
    "size_bytes": 59
  },
  "data.json": {
    "start_block": 5,
    "num_blocks": 1,
    "size_bytes": 4