# Module 9: File Handling and I/O

This module covers file operations, different file formats, streams, and input/output operations in Python.

## 1. File Basics

### 1.1 Opening and Closing Files

In [None]:
import os
import tempfile

# Create a temporary directory for our examples
temp_dir = tempfile.mkdtemp()
print(f"Working in temporary directory: {temp_dir}")

# Basic file opening
file_path = os.path.join(temp_dir, "example.txt")

# Writing to a file
file = open(file_path, 'w')
file.write("Hello, World!\n")
file.write("This is a test file.\n")
file.close()

# Reading from a file
file = open(file_path, 'r')
content = file.read()
print(f"File content:\n{content}")
file.close()

# Using with statement (context manager) - recommended
with open(file_path, 'r') as file:
    content = file.read()
    print(f"\nUsing 'with' statement:\n{content}")
# File is automatically closed after the with block

# File modes
modes = {
    'r': 'Read (default)',
    'w': 'Write (overwrites existing)',
    'a': 'Append',
    'x': 'Exclusive creation (fails if exists)',
    'b': 'Binary mode',
    't': 'Text mode (default)',
    '+': 'Update (read and write)'
}

print("\nFile modes:")
for mode, description in modes.items():
    print(f"  {mode}: {description}")

### 1.2 Reading Files

In [None]:
# Create a sample file with multiple lines
sample_file = os.path.join(temp_dir, "sample.txt")
with open(sample_file, 'w') as f:
    f.write("Line 1\n")
    f.write("Line 2\n")
    f.write("Line 3\n")
    f.write("Line 4\n")
    f.write("Line 5\n")

# Different ways to read files

# 1. Read entire file
with open(sample_file, 'r') as f:
    content = f.read()
    print("Read entire file:")
    print(content)

# 2. Read specific number of characters
with open(sample_file, 'r') as f:
    chunk = f.read(10)
    print(f"First 10 characters: {repr(chunk)}")

# 3. Read line by line
print("\nReading line by line:")
with open(sample_file, 'r') as f:
    line = f.readline()
    while line:
        print(f"  {repr(line)}")
        line = f.readline()

# 4. Read all lines into a list
with open(sample_file, 'r') as f:
    lines = f.readlines()
    print(f"\nAll lines as list: {lines}")

# 5. Iterate over file object
print("\nIterating over file:")
with open(sample_file, 'r') as f:
    for line_num, line in enumerate(f, 1):
        print(f"  Line {line_num}: {line.strip()}")

# 6. Read file in chunks
def read_in_chunks(file_path, chunk_size=1024):
    with open(file_path, 'r') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

print("\nReading in chunks:")
for i, chunk in enumerate(read_in_chunks(sample_file, 10)):
    print(f"  Chunk {i}: {repr(chunk)}")

### 1.3 Writing Files

In [None]:
# Different ways to write to files
output_file = os.path.join(temp_dir, "output.txt")

# 1. Write mode (overwrites)
with open(output_file, 'w') as f:
    f.write("First line\n")
    f.write("Second line\n")

# 2. Append mode
with open(output_file, 'a') as f:
    f.write("Appended line\n")

# 3. Write multiple lines
lines = ["Line A\n", "Line B\n", "Line C\n"]
with open(output_file, 'a') as f:
    f.writelines(lines)

# 4. Using print function
with open(output_file, 'a') as f:
    print("Printed line", file=f)
    print("Another printed line", file=f)

# Read and display the result
with open(output_file, 'r') as f:
    print("Final file content:")
    print(f.read())

# 5. Writing with formatting
data_file = os.path.join(temp_dir, "data.txt")
data = [
    {"name": "Alice", "age": 30, "city": "New York"},
    {"name": "Bob", "age": 25, "city": "London"},
    {"name": "Charlie", "age": 35, "city": "Paris"}
]

with open(data_file, 'w') as f:
    # Write header
    f.write("Name,Age,City\n")
    # Write data
    for person in data:
        f.write(f"{person['name']},{person['age']},{person['city']}\n")

print("\nFormatted data file:")
with open(data_file, 'r') as f:
    print(f.read())

## 2. File Positioning and Seeking

### 2.1 File Pointer Operations

In [None]:
# Create a file for seeking examples
seek_file = os.path.join(temp_dir, "seek_example.txt")
with open(seek_file, 'w') as f:
    f.write("0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ")

# File seeking operations
with open(seek_file, 'r') as f:
    # tell() - get current position
    print(f"Initial position: {f.tell()}")
    
    # Read 5 characters
    data = f.read(5)
    print(f"Read: {data}")
    print(f"Position after read: {f.tell()}")
    
    # seek() - move to specific position
    f.seek(10)
    print(f"\nAfter seek(10): {f.tell()}")
    data = f.read(5)
    print(f"Read: {data}")
    
    # Seek from current position (whence=1)
    f.seek(5, 1)  # Move 5 positions forward from current
    print(f"\nAfter seek(5, 1): {f.tell()}")
    data = f.read(5)
    print(f"Read: {data}")
    
    # Seek from end (whence=2)
    f.seek(-10, 2)  # Move 10 positions from end
    print(f"\nAfter seek(-10, 2): {f.tell()}")
    data = f.read(5)
    print(f"Read: {data}")
    
    # Reset to beginning
    f.seek(0)
    print(f"\nReset to beginning: {f.tell()}")

# Random access example
def read_at_position(file_path, position, length):
    with open(file_path, 'r') as f:
        f.seek(position)
        return f.read(length)

print(f"\nRandom access:")
print(f"Characters 10-15: {read_at_position(seek_file, 10, 5)}")
print(f"Characters 20-25: {read_at_position(seek_file, 20, 5)}")

## 3. Binary Files

### 3.1 Reading and Writing Binary Data

In [None]:
import struct
import pickle

# Writing binary data
binary_file = os.path.join(temp_dir, "binary_data.bin")

# Write raw bytes
with open(binary_file, 'wb') as f:
    # Write bytes directly
    f.write(b'Hello, Binary!')
    f.write(bytes([65, 66, 67, 68]))  # ABCD
    
    # Write integers using struct
    f.write(struct.pack('i', 42))  # 4-byte integer
    f.write(struct.pack('f', 3.14159))  # 4-byte float

# Read binary data
with open(binary_file, 'rb') as f:
    # Read first 14 bytes (Hello, Binary!)
    text = f.read(14)
    print(f"Text: {text.decode()}")
    
    # Read next 4 bytes
    letters = f.read(4)
    print(f"Letters: {letters.decode()}")
    
    # Read integer and float
    int_bytes = f.read(4)
    integer = struct.unpack('i', int_bytes)[0]
    print(f"Integer: {integer}")
    
    float_bytes = f.read(4)
    float_val = struct.unpack('f', float_bytes)[0]
    print(f"Float: {float_val}")

# Working with bytearray
data = bytearray(b'Hello')
print(f"\nBytearray: {data}")
data[0] = ord('J')  # Change H to J
print(f"Modified: {data}")
print(f"As string: {data.decode()}")

# Binary file copy
def copy_binary_file(source, destination):
    with open(source, 'rb') as src:
        with open(destination, 'wb') as dst:
            while True:
                chunk = src.read(1024)
                if not chunk:
                    break
                dst.write(chunk)

copy_dest = os.path.join(temp_dir, "binary_copy.bin")
copy_binary_file(binary_file, copy_dest)
print(f"\nBinary file copied to {copy_dest}")

### 3.2 Pickle - Python Object Serialization

In [None]:
import pickle

# Pickling Python objects
pickle_file = os.path.join(temp_dir, "data.pickle")

# Data to pickle
data = {
    'name': 'Alice',
    'age': 30,
    'hobbies': ['reading', 'swimming', 'coding'],
    'scores': (95, 87, 92),
    'address': {
        'street': '123 Main St',
        'city': 'New York'
    }
}

# Save to pickle file
with open(pickle_file, 'wb') as f:
    pickle.dump(data, f)
print("Data pickled successfully")

# Load from pickle file
with open(pickle_file, 'rb') as f:
    loaded_data = pickle.load(f)
print(f"\nLoaded data: {loaded_data}")
print(f"Data matches original: {data == loaded_data}")

# Pickling custom objects
class Person:
    def __init__(self, name, age):
        self.name = name
        self.age = age
        self.id = id(self)
    
    def __repr__(self):
        return f"Person(name='{self.name}', age={self.age})"

# Create and pickle custom objects
people = [
    Person("Alice", 30),
    Person("Bob", 25),
    Person("Charlie", 35)
]

people_file = os.path.join(temp_dir, "people.pickle")
with open(people_file, 'wb') as f:
    pickle.dump(people, f)

with open(people_file, 'rb') as f:
    loaded_people = pickle.load(f)

print("\nOriginal people:")
for p in people:
    print(f"  {p}")

print("\nLoaded people:")
for p in loaded_people:
    print(f"  {p}")

# Pickle to string
pickled_string = pickle.dumps(data)
print(f"\nPickled to bytes: {pickled_string[:50]}...")
unpickled = pickle.loads(pickled_string)
print(f"Unpickled from bytes: {unpickled['name']}")

## 4. Working with Different File Formats

### 4.1 CSV Files

In [None]:
import csv

# Writing CSV files
csv_file = os.path.join(temp_dir, "data.csv")

# Write with csv.writer
with open(csv_file, 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['Name', 'Age', 'City'])  # Header
    writer.writerow(['Alice', 30, 'New York'])
    writer.writerow(['Bob', 25, 'London'])
    writer.writerows([  # Multiple rows at once
        ['Charlie', 35, 'Paris'],
        ['Diana', 28, 'Tokyo']
    ])

# Read CSV file
print("Reading CSV with csv.reader:")
with open(csv_file, 'r') as f:
    reader = csv.reader(f)
    for row in reader:
        print(f"  {row}")

# Using DictWriter and DictReader
dict_csv_file = os.path.join(temp_dir, "dict_data.csv")

# Write with DictWriter
fieldnames = ['id', 'name', 'email', 'department']
employees = [
    {'id': 1, 'name': 'Alice', 'email': 'alice@example.com', 'department': 'IT'},
    {'id': 2, 'name': 'Bob', 'email': 'bob@example.com', 'department': 'HR'},
    {'id': 3, 'name': 'Charlie', 'email': 'charlie@example.com', 'department': 'Sales'}
]

with open(dict_csv_file, 'w', newline='') as f:
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(employees)

# Read with DictReader
print("\nReading CSV with DictReader:")
with open(dict_csv_file, 'r') as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(f"  {row['name']}: {row['email']} ({row['department']})")

# Custom CSV dialect
custom_csv_file = os.path.join(temp_dir, "custom.csv")

csv.register_dialect('pipes', delimiter='|', quoting=csv.QUOTE_MINIMAL)

with open(custom_csv_file, 'w', newline='') as f:
    writer = csv.writer(f, dialect='pipes')
    writer.writerow(['Name', 'Description', 'Price'])
    writer.writerow(['Product A', 'Description with, comma', 19.99])
    writer.writerow(['Product B', 'Another item', 29.99])

print("\nCustom dialect CSV:")
with open(custom_csv_file, 'r') as f:
    print(f.read())

### 4.2 JSON Files

In [None]:
import json

# JSON data
data = {
    "name": "John Doe",
    "age": 30,
    "active": True,
    "balance": 1234.56,
    "tags": ["python", "developer", "json"],
    "address": {
        "street": "123 Main St",
        "city": "New York",
        "zip": "10001"
    },
    "phone": None
}

# Write JSON to file
json_file = os.path.join(temp_dir, "data.json")

with open(json_file, 'w') as f:
    json.dump(data, f, indent=2)

print("JSON file content:")
with open(json_file, 'r') as f:
    print(f.read())

# Read JSON from file
with open(json_file, 'r') as f:
    loaded_data = json.load(f)
    print(f"\nLoaded data type: {type(loaded_data)}")
    print(f"Name: {loaded_data['name']}")
    print(f"City: {loaded_data['address']['city']}")

# JSON with custom encoder
from datetime import datetime, date

class DateTimeEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, (datetime, date)):
            return obj.isoformat()
        return super().default(obj)

data_with_date = {
    "event": "Meeting",
    "date": datetime.now(),
    "participants": ["Alice", "Bob"]
}

json_date_file = os.path.join(temp_dir, "event.json")
with open(json_date_file, 'w') as f:
    json.dump(data_with_date, f, cls=DateTimeEncoder, indent=2)

print("\nJSON with datetime:")
with open(json_date_file, 'r') as f:
    print(f.read())

# Pretty printing JSON
complex_data = {
    "users": [
        {"id": 1, "name": "Alice", "roles": ["admin", "user"]},
        {"id": 2, "name": "Bob", "roles": ["user"]}
    ],
    "settings": {
        "theme": "dark",
        "notifications": True
    }
}

print("\nPretty printed JSON:")
print(json.dumps(complex_data, indent=2, sort_keys=True))

### 4.3 XML Files

In [None]:
import xml.etree.ElementTree as ET
import xml.dom.minidom as minidom

# Creating XML
root = ET.Element("catalog")

# Add books
book1 = ET.SubElement(root, "book", id="1")
ET.SubElement(book1, "title").text = "Python Programming"
ET.SubElement(book1, "author").text = "John Doe"
ET.SubElement(book1, "year").text = "2023"
ET.SubElement(book1, "price").text = "29.99"

book2 = ET.SubElement(root, "book", id="2")
ET.SubElement(book2, "title").text = "Data Science"
ET.SubElement(book2, "author").text = "Jane Smith"
ET.SubElement(book2, "year").text = "2024"
ET.SubElement(book2, "price").text = "39.99"

# Create tree and save to file
tree = ET.ElementTree(root)
xml_file = os.path.join(temp_dir, "catalog.xml")
tree.write(xml_file, encoding="utf-8", xml_declaration=True)

# Pretty print XML
def prettify_xml(elem):
    rough_string = ET.tostring(elem, encoding='unicode')
    reparsed = minidom.parseString(rough_string)
    return reparsed.toprettyxml(indent="  ")

print("XML content:")
print(prettify_xml(root))

# Parse XML file
tree = ET.parse(xml_file)
root = tree.getroot()

print("Parsed XML data:")
for book in root.findall('book'):
    book_id = book.get('id')
    title = book.find('title').text
    author = book.find('author').text
    price = book.find('price').text
    print(f"  Book {book_id}: {title} by {author} (${price})")

# Modify XML
for book in root.findall('book'):
    price_elem = book.find('price')
    current_price = float(price_elem.text)
    # Apply 10% discount
    new_price = current_price * 0.9
    price_elem.text = f"{new_price:.2f}"
    # Add discount attribute
    price_elem.set('discounted', 'true')

# Save modified XML
tree.write(xml_file, encoding="utf-8", xml_declaration=True)

print("\nModified XML with discounts:")
tree = ET.parse(xml_file)
print(prettify_xml(tree.getroot()))

## 5. File and Directory Operations

### 5.1 Working with Paths

In [None]:
import os
from pathlib import Path

# Using os.path
print("Using os.path:")
current_dir = os.getcwd()
print(f"Current directory: {current_dir}")

# Path operations
file_path = os.path.join(temp_dir, "test", "file.txt")
print(f"\nJoined path: {file_path}")
print(f"Directory: {os.path.dirname(file_path)}")
print(f"Filename: {os.path.basename(file_path)}")
print(f"Split: {os.path.split(file_path)}")
print(f"Extension: {os.path.splitext(file_path)}")

# Check path properties
test_path = temp_dir
print(f"\nPath properties for {test_path}:")
print(f"Exists: {os.path.exists(test_path)}")
print(f"Is file: {os.path.isfile(test_path)}")
print(f"Is directory: {os.path.isdir(test_path)}")
print(f"Absolute: {os.path.isabs(test_path)}")

# Using pathlib (modern approach)
print("\nUsing pathlib:")
path = Path(temp_dir)
print(f"Path: {path}")
print(f"Name: {path.name}")
print(f"Parent: {path.parent}")
print(f"Suffix: {path.suffix}")
print(f"Parts: {path.parts}")

# Create new path
new_file = path / "subfolder" / "newfile.txt"
print(f"\nNew path: {new_file}")
print(f"Parent dirs: {list(new_file.parents)}")

# Path methods
test_file = Path(temp_dir) / "test.txt"
test_file.write_text("Hello from pathlib!")
print(f"\nFile content: {test_file.read_text()}")
print(f"File size: {test_file.stat().st_size} bytes")

# Iterating directory
print(f"\nFiles in {temp_dir}:")
for item in Path(temp_dir).iterdir():
    if item.is_file():
        print(f"  File: {item.name} ({item.stat().st_size} bytes)")
    elif item.is_dir():
        print(f"  Dir: {item.name}/")

### 5.2 Directory Operations

In [None]:
import shutil
import glob

# Create directories
test_dir = os.path.join(temp_dir, "test_directory")
os.makedirs(test_dir, exist_ok=True)

nested_dir = os.path.join(temp_dir, "parent", "child", "grandchild")
os.makedirs(nested_dir, exist_ok=True)
print(f"Created nested directory: {nested_dir}")

# List directory contents
print(f"\nContents of {temp_dir}:")
for item in os.listdir(temp_dir):
    item_path = os.path.join(temp_dir, item)
    if os.path.isdir(item_path):
        print(f"  [DIR] {item}")
    else:
        print(f"  [FILE] {item}")

# Walk directory tree
print("\nWalking directory tree:")
for root, dirs, files in os.walk(temp_dir):
    level = root.replace(temp_dir, '').count(os.sep)
    indent = ' ' * 2 * level
    print(f"{indent}{os.path.basename(root)}/")
    subindent = ' ' * 2 * (level + 1)
    for file in files[:3]:  # Limit output
        print(f"{subindent}{file}")

# Copy files and directories
source_file = os.path.join(temp_dir, "test.txt")
dest_file = os.path.join(test_dir, "test_copy.txt")
shutil.copy2(source_file, dest_file)  # Preserves metadata
print(f"\nCopied file to {dest_file}")

# Copy entire directory
source_dir = test_dir
backup_dir = os.path.join(temp_dir, "backup")
shutil.copytree(source_dir, backup_dir, dirs_exist_ok=True)
print(f"Backed up directory to {backup_dir}")

# Move/rename
old_path = os.path.join(temp_dir, "test.txt")
new_path = os.path.join(temp_dir, "renamed.txt")
if os.path.exists(old_path):
    shutil.move(old_path, new_path)
    print(f"Renamed {old_path} to {new_path}")

# Using glob for pattern matching
print("\nFiles matching pattern '*.txt':")
for file in glob.glob(os.path.join(temp_dir, "*.txt")):
    print(f"  {os.path.basename(file)}")

print("\nFiles matching pattern '**/*.csv' (recursive):")
for file in glob.glob(os.path.join(temp_dir, "**", "*.csv"), recursive=True):
    print(f"  {file}")

## 6. Temporary Files and Directories

In [None]:
import tempfile

# Create temporary file
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.tmp') as tmp:
    tmp.write("Temporary data")
    tmp_name = tmp.name
    print(f"Temporary file: {tmp_name}")

# Read temporary file
with open(tmp_name, 'r') as f:
    print(f"Content: {f.read()}")

# Clean up
os.unlink(tmp_name)

# Temporary file with context manager (auto-delete)
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt') as tmp:
    tmp.write("This will be deleted automatically")
    tmp.flush()
    print(f"Temp file (auto-delete): {tmp.name}")
    # File is automatically deleted when leaving context

# Temporary directory
with tempfile.TemporaryDirectory() as tmpdir:
    print(f"\nTemporary directory: {tmpdir}")
    
    # Create files in temp directory
    temp_file = os.path.join(tmpdir, "temp.txt")
    with open(temp_file, 'w') as f:
        f.write("Temporary file in temporary directory")
    
    print(f"Files in temp dir: {os.listdir(tmpdir)}")
    # Directory and contents are deleted when leaving context

# Get temp directory path
print(f"\nSystem temp directory: {tempfile.gettempdir()}")

# Create temporary file with specific prefix
with tempfile.NamedTemporaryFile(
    prefix='myapp_',
    suffix='.dat',
    dir=temp_dir
) as tmp:
    print(f"Custom temp file: {tmp.name}")

# SpooledTemporaryFile (memory then disk)
with tempfile.SpooledTemporaryFile(max_size=1024) as tmp:
    tmp.write(b"Small data in memory")
    print(f"In memory: {tmp._rolled}")
    
    # Write more data to exceed max_size
    tmp.write(b"x" * 2000)
    print(f"Rolled to disk: {tmp._rolled}")

## 7. File Compression

In [None]:
import zipfile
import gzip
import tarfile

# Working with ZIP files
zip_path = os.path.join(temp_dir, "archive.zip")

# Create ZIP archive
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
    # Add files
    for file in glob.glob(os.path.join(temp_dir, "*.txt")):
        arcname = os.path.basename(file)
        zipf.write(file, arcname)
        print(f"Added {arcname} to ZIP")

# List ZIP contents
print("\nZIP archive contents:")
with zipfile.ZipFile(zip_path, 'r') as zipf:
    for info in zipf.infolist():
        print(f"  {info.filename}: {info.file_size} bytes -> {info.compress_size} compressed")

# Extract from ZIP
extract_dir = os.path.join(temp_dir, "extracted")
with zipfile.ZipFile(zip_path, 'r') as zipf:
    zipf.extractall(extract_dir)
    print(f"\nExtracted to {extract_dir}")

# Working with GZIP
text_file = os.path.join(temp_dir, "large_text.txt")
with open(text_file, 'w') as f:
    f.write("This is a line of text.\n" * 1000)

gz_file = os.path.join(temp_dir, "compressed.gz")

# Compress with gzip
with open(text_file, 'rb') as f_in:
    with gzip.open(gz_file, 'wb') as f_out:
        f_out.write(f_in.read())

original_size = os.path.getsize(text_file)
compressed_size = os.path.getsize(gz_file)
print(f"\nGZIP compression:")
print(f"Original: {original_size} bytes")
print(f"Compressed: {compressed_size} bytes")
print(f"Compression ratio: {original_size/compressed_size:.2f}x")

# Read from gzip
with gzip.open(gz_file, 'rt') as f:
    first_line = f.readline()
    print(f"\nFirst line from gzip: {first_line.strip()}")

# Working with TAR archives
tar_path = os.path.join(temp_dir, "archive.tar.gz")

# Create TAR archive
with tarfile.open(tar_path, 'w:gz') as tar:
    for file in glob.glob(os.path.join(temp_dir, "*.csv")):
        tar.add(file, arcname=os.path.basename(file))
        print(f"Added {os.path.basename(file)} to TAR")

# List TAR contents
print("\nTAR archive contents:")
with tarfile.open(tar_path, 'r:gz') as tar:
    for member in tar.getmembers():
        print(f"  {member.name}: {member.size} bytes")

## 8. Advanced I/O Operations

### 8.1 String I/O

In [None]:
from io import StringIO, BytesIO

# StringIO - in-memory text stream
string_buffer = StringIO()
string_buffer.write("Hello, ")
string_buffer.write("StringIO!\n")
string_buffer.write("Line 2\n")

# Get value
content = string_buffer.getvalue()
print(f"StringIO content:\n{content}")

# Reset and read
string_buffer.seek(0)
print(f"Read line: {string_buffer.readline()}")

# Use StringIO as file
def process_file(file_obj):
    """Function that expects a file-like object"""
    lines = file_obj.readlines()
    return len(lines)

string_buffer.seek(0)
line_count = process_file(string_buffer)
print(f"Line count: {line_count}")

# BytesIO - in-memory bytes stream
bytes_buffer = BytesIO()
bytes_buffer.write(b"Binary data: ")
bytes_buffer.write(bytes([65, 66, 67, 68]))  # ABCD

bytes_content = bytes_buffer.getvalue()
print(f"\nBytesIO content: {bytes_content}")
print(f"Decoded: {bytes_content.decode()}")

# CSV with StringIO
csv_buffer = StringIO()
writer = csv.writer(csv_buffer)
writer.writerow(['Name', 'Score'])
writer.writerow(['Alice', 95])
writer.writerow(['Bob', 87])

csv_data = csv_buffer.getvalue()
print(f"\nCSV in memory:\n{csv_data}")

# Read CSV from StringIO
csv_buffer.seek(0)
reader = csv.DictReader(csv_buffer)
for row in reader:
    print(f"  {row}")

### 8.2 Memory-Mapped Files

In [None]:
import mmap

# Create a file for memory mapping
mmap_file = os.path.join(temp_dir, "mmap_test.dat")
size = 1024  # 1KB

# Create file with initial data
with open(mmap_file, 'wb') as f:
    f.write(b'\x00' * size)  # Fill with zeros

# Memory-map the file
with open(mmap_file, 'r+b') as f:
    with mmap.mmap(f.fileno(), 0) as mmapped:
        # Write to memory-mapped file
        mmapped[0:5] = b'Hello'
        mmapped[10:15] = b'World'
        
        # Read from memory-mapped file
        print(f"Data at 0-5: {mmapped[0:5]}")
        print(f"Data at 10-15: {mmapped[10:15]}")
        
        # Find in memory-mapped file
        index = mmapped.find(b'World')
        print(f"Found 'World' at index: {index}")
        
        # Slice operations
        mmapped[100:110] = b'0123456789'
        print(f"Data at 100-110: {mmapped[100:110]}")

# Verify changes were written
with open(mmap_file, 'rb') as f:
    f.seek(0)
    print(f"\nFirst 20 bytes: {f.read(20)}")

print(f"\nMemory-mapped file operations completed")

## 9. File Locking and Concurrent Access

In [None]:
import fcntl  # Unix/Linux only
import time
import threading

# Platform-independent file locking example
class FileLock:
    def __init__(self, filename):
        self.filename = filename
        self.lock_file = filename + '.lock'
    
    def acquire(self, timeout=10):
        start_time = time.time()
        while os.path.exists(self.lock_file):
            if time.time() - start_time > timeout:
                raise TimeoutError(f"Could not acquire lock for {self.filename}")
            time.sleep(0.1)
        
        # Create lock file
        with open(self.lock_file, 'w') as f:
            f.write(str(os.getpid()))
    
    def release(self):
        if os.path.exists(self.lock_file):
            os.remove(self.lock_file)
    
    def __enter__(self):
        self.acquire()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

# Using file lock
shared_file = os.path.join(temp_dir, "shared.txt")

def write_with_lock(thread_id, message):
    with FileLock(shared_file):
        print(f"Thread {thread_id} acquired lock")
        with open(shared_file, 'a') as f:
            f.write(f"Thread {thread_id}: {message}\n")
            time.sleep(0.5)  # Simulate work
        print(f"Thread {thread_id} released lock")

# Create threads
threads = []
for i in range(3):
    t = threading.Thread(target=write_with_lock, args=(i, f"Message {i}"))
    threads.append(t)
    t.start()

# Wait for all threads
for t in threads:
    t.join()

# Read result
print("\nShared file content:")
with open(shared_file, 'r') as f:
    print(f.read())

# Atomic file operations
def atomic_write(filename, data):
    """Write to file atomically using temp file and rename"""
    temp_name = filename + '.tmp'
    
    # Write to temp file
    with open(temp_name, 'w') as f:
        f.write(data)
        f.flush()
        os.fsync(f.fileno())  # Force write to disk
    
    # Atomic rename
    os.replace(temp_name, filename)

atomic_file = os.path.join(temp_dir, "atomic.txt")
atomic_write(atomic_file, "This was written atomically")
print(f"\nAtomic write completed")

## 10. Cleanup

In [None]:
# Clean up temporary directory
import shutil

if os.path.exists(temp_dir):
    shutil.rmtree(temp_dir)
    print(f"Cleaned up temporary directory: {temp_dir}")

## Module Summary

This module covered comprehensive file handling and I/O operations in Python:

1. **File Basics**: Opening, reading, writing, file modes
2. **File Positioning**: seek(), tell(), random access
3. **Binary Files**: Binary I/O, struct module, pickle serialization
4. **File Formats**: CSV, JSON, XML handling
5. **Path Operations**: os.path and pathlib for path manipulation
6. **Directory Operations**: Creating, listing, walking directories
7. **Temporary Files**: Safe temporary file and directory creation
8. **Compression**: ZIP, GZIP, TAR archives
9. **Advanced I/O**: StringIO, BytesIO, memory-mapped files
10. **Concurrent Access**: File locking, atomic operations

Key takeaways:
- Always use context managers (with statement) for file operations
- Choose appropriate file mode for your use case
- Use pathlib for modern path manipulation
- Consider memory-mapped files for large file operations
- Implement proper locking for concurrent file access
- Use appropriate serialization format (JSON, pickle, CSV) for your data
- Handle encoding properly when working with text files