# CSV File Operations

## Learning Objectives
By the end of this notebook, you will be able to:
- Read and write CSV files using Python's csv module
- Handle different CSV formats and delimiters
- Parse CSV data with proper error handling
- Deal with common CSV parsing exceptions
- Process large CSV files efficiently

## 1. Basic CSV Reading

In [None]:
import csv
import math
import os
from datetime import datetime
from typing import List, Dict, Any, Optional

# Sample employee records shared by the reading examples that follow
sample_csv_content = """employee_id,name,department,salary,hire_date
EMP001,Alice Johnson,Engineering,75000,2022-01-15
EMP002,Bob Smith,Marketing,65000,2021-06-01
EMP003,Charlie Brown,Sales,70000,2023-03-10
EMP004,Diana Davis,Engineering,80000,2020-11-20
EMP005,Eve Wilson,HR,60000,2022-08-05"""

# Save the sample to disk; newline='' writes the line endings exactly as
# they appear in the string above
with open('employees.csv', mode='w', encoding='utf-8', newline='') as file:
    file.write(sample_csv_content)

# Report what was written
print(
    "Sample CSV file created: employees.csv",
    "Content:",
    sample_csv_content,
    sep="\n",
)

In [None]:
# Method 1: Reading CSV as list of lists
def read_csv_as_lists(filename: str) -> List[List[str]]:
    """
    Read CSV file and return as list of lists.
    
    Args:
        filename: Path to CSV file
    
    Returns:
        List of lists containing CSV data; the header row, if the file
        has one, is returned as the first element
    """
    # newline='' passes line endings to the csv module untouched, so a line
    # break inside a quoted field comes back exactly as stored in the file.
    with open(filename, 'r', newline='', encoding='utf-8') as file:
        return list(csv.reader(file))

# Load the whole file, header row included
csv_data = read_csv_as_lists('employees.csv')

print("CSV data as lists:")
for i, row in enumerate(csv_data):
    print(f"Row {i}: {row}")

# The first row holds the column names; everything after it is data
rows = csv_data[1:]
header = csv_data[0]

print(f"\nHeader: {header}", f"Data rows: {len(rows)}", sep="\n")

In [None]:
# Method 2: Reading CSV as list of dictionaries using DictReader
def read_csv_as_dicts(filename: str) -> List[Dict[str, str]]:
    """
    Read CSV file and return as list of dictionaries.
    
    The first row of the file is used as the column names.
    
    Args:
        filename: Path to CSV file
    
    Returns:
        List of dictionaries with column names as keys
    """
    # newline='' passes line endings to the csv module untouched, so a line
    # break inside a quoted field comes back exactly as stored in the file.
    with open(filename, 'r', newline='', encoding='utf-8') as file:
        # dict(row) yields a plain dict on every Python version (DictReader
        # rows were OrderedDicts before Python 3.8)
        return [dict(row) for row in csv.DictReader(file)]

# Load each data row as a dict keyed by column name
employee_records = read_csv_as_dicts('employees.csv')

print("CSV data as dictionaries:")
for i, record in enumerate(employee_records):
    print(f"Employee {i+1}: {record}")

# Pull individual columns out of each record by name
print("\nEmployee names and salaries:")
for record in employee_records:
    name, salary, department = (
        record['name'],
        record['salary'],
        record['department'],
    )
    print(f"{name} ({department}): ${salary}")

## 2. CSV Writing Operations

In [None]:
# Method 1: Writing CSV from list of lists
def write_csv_from_lists(filename: str, header: List[str], data: List[List[Any]]) -> None:
    """
    Write a header row followed by data rows to a CSV file.
    
    Args:
        filename: Output CSV filename (overwritten if it exists)
        header: List of column names, written as the first row
        data: List of data rows, written in order after the header
    """
    with open(filename, mode='w', encoding='utf-8', newline='') as out:
        writer = csv.writer(out)
        
        # Column names first
        writer.writerow(header)
        
        # Then one output line per data row
        for record in data:
            writer.writerow(record)

# Column names and rows for three additional employees
new_header = ['employee_id', 'name', 'department', 'salary', 'bonus']
new_data = [
    ['EMP006', 'Frank Miller', 'IT', 72000, 5000],
    ['EMP007', 'Grace Lee', 'Finance', 68000, 4000],
    ['EMP008', 'Henry Clark', 'Operations', 65000, 3500],
]

# Save them to their own file
write_csv_from_lists('new_employees.csv', new_header, new_data)
print("Created new_employees.csv")

# Read the file back as plain text to show what was written
with open('new_employees.csv', mode='r', encoding='utf-8') as file:
    content = file.read()
    print("\nContent of new_employees.csv:", content, sep="\n")

In [None]:
# Method 2: Writing CSV from list of dictionaries using DictWriter
def write_csv_from_dicts(filename: str, data: List[Dict[str, Any]], fieldnames: Optional[List[str]] = None) -> None:
    """
    Write CSV file from list of dictionaries.
    
    When data is empty but fieldnames is given, a header-only file is
    written so that an existing file never keeps rows from an earlier run.
    When data is empty and fieldnames is omitted there is no header to
    derive, so nothing is written.
    
    Args:
        filename: Output CSV filename
        data: List of dictionaries to write
        fieldnames: Optional list of field names (uses keys from first dict if not provided)
    
    Raises:
        ValueError: If a dictionary contains a key that is not in fieldnames
    """
    if fieldnames is None:
        if not data:
            # No rows and no explicit columns: nothing to build a header from
            return
        # Column order follows the key order of the first dictionary
        fieldnames = list(data[0].keys())
    
    with open(filename, 'w', newline='', encoding='utf-8') as file:
        csv_writer = csv.DictWriter(file, fieldnames=fieldnames)
        
        # Write header
        csv_writer.writeheader()
        
        # Write data rows
        csv_writer.writerows(data)

# Extend every employee record with a bonus and a total compensation figure
employees_with_bonuses = []
for record in employee_records:
    # The bonus is 10% of the salary; both amounts are stored as
    # two-decimal strings
    salary = float(record['salary'])
    bonus = salary * 0.10
    
    new_record = {
        **record,
        'bonus': f"{bonus:.2f}",
        'total_compensation': f"{salary + bonus:.2f}",
    }
    
    employees_with_bonuses.append(new_record)

# Save the extended records; the columns come from the first record's keys
write_csv_from_dicts('employees_enhanced.csv', employees_with_bonuses)
print("Created employees_enhanced.csv")

# Show the total for each employee
print("\nEnhanced employee data:")
for record in employees_with_bonuses:
    name, total_comp = record['name'], record['total_compensation']
    print(f"{name}: ${total_comp} total compensation")

## 3. Handling Different CSV Formats

In [None]:
# Create CSV files with different delimiters and formats.
# Both files are opened with newline='' so the line endings in the strings
# below are written unchanged on every platform.

# Tab-separated values (TSV)
tsv_content = """product_id\tname\tprice\tcategory
PRD001\tLaptop\t999.99\tElectronics
PRD002\tMouse\t29.99\tElectronics
PRD003\tDesk\t299.99\tFurniture"""

with open('products.tsv', 'w', newline='', encoding='utf-8') as file:
    file.write(tsv_content)

# Semicolon-separated values
semicolon_content = """customer_id;name;email;country
CUST001;John Doe;john@email.com;USA
CUST002;Jane Smith;jane@email.com;Canada
CUST003;Bob Johnson;bob@email.com;UK"""

with open('customers.csv', 'w', newline='', encoding='utf-8') as file:
    file.write(semicolon_content)

print("Created sample files with different delimiters")

In [None]:
# Function to read CSV with custom delimiter
def read_csv_with_delimiter(filename: str, delimiter: str = ',') -> List[Dict[str, str]]:
    """
    Read CSV file with custom delimiter.
    
    The first row of the file is used as the column names.
    
    Args:
        filename: Path to CSV file
        delimiter: Character used to separate fields
    
    Returns:
        List of dictionaries containing the data
    """
    # newline='' passes line endings to the csv module untouched, so a line
    # break inside a quoted field comes back exactly as stored in the file.
    with open(filename, 'r', newline='', encoding='utf-8') as file:
        csv_reader = csv.DictReader(file, delimiter=delimiter)
        return [dict(row) for row in csv_reader]

# Tab-delimited products file
products = read_csv_with_delimiter('products.tsv', '\t')
print("Products from TSV file:")
for product in products:
    print(f"  {product['name']}: ${product['price']} ({product['category']})")

# Semicolon-delimited customers file
customers = read_csv_with_delimiter('customers.csv', ';')
print("\nCustomers from semicolon-separated file:")
for customer in customers:
    print(f"  {customer['name']} ({customer['country']}): {customer['email']}")

In [None]:
# Handling CSV files with quotes and special characters.
# Fields that contain the delimiter are wrapped in double quotes, and a
# literal double quote inside such a field is escaped by doubling it ("").
complex_csv_content = '''product_id,name,description,price
PRD001,"Laptop, Gaming","High-performance laptop with ""RGB"" lighting",1299.99
PRD002,"Mouse, Wireless","Ergonomic mouse with 2.4GHz connection",49.99
PRD003,"Keyboard, Mechanical","Cherry MX switches, includes ""numpad""",129.99'''

# newline='' writes the line endings exactly as they appear above
with open('complex_products.csv', 'w', newline='', encoding='utf-8') as file:
    file.write(complex_csv_content)

# Read complex CSV with quotes
complex_products = read_csv_with_delimiter('complex_products.csv', ',')
print("\nComplex products with quotes and special characters:")
for product in complex_products:
    print(f"ID: {product['product_id']}")
    print(f"Name: {product['name']}")
    print(f"Description: {product['description']}")
    print(f"Price: ${product['price']}")
    print()

## 4. Error Handling and Data Validation

In [None]:
# Create CSV with problematic data for testing.
# The rows below contain a non-numeric salary, an empty name, a malformed
# date, a row with too few fields and a row with too many.
problematic_csv = """employee_id,name,department,salary,hire_date
EMP001,Alice Johnson,Engineering,75000,2022-01-15
EMP002,Bob Smith,Marketing,invalid_salary,2021-06-01
EMP003,,Sales,70000,2023-03-10
EMP004,Diana Davis,Engineering,80000,invalid_date
EMP005,Eve Wilson,HR,60000,2022-08-05
EMP006,Frank Miller,IT,72000
EMP007,Grace Lee,Finance,68000,2021-12-01,extra_field"""

# newline='' writes the line endings exactly as they appear above
with open('problematic_employees.csv', 'w', newline='', encoding='utf-8') as file:
    file.write(problematic_csv)

print("Created problematic CSV file for testing error handling")

In [None]:
def _validate_employee_row(row: Dict[Any, Any]) -> tuple[Optional[Dict[str, Any]], Optional[str]]:
    """
    Validate one employee row produced by csv.DictReader.
    
    Args:
        row: Mapping of column name to raw field text
    
    Returns:
        (record, None) for a valid row, otherwise (None, problem description)
    """
    # DictReader files surplus values under the None key and fills columns a
    # short row lacks with None, so either sign means the row has the wrong
    # number of fields.
    surplus = row.get(None)
    expected = len(row) - (1 if None in row else 0)
    if surplus:
        return None, f"Expected {expected} fields, got {expected + len(surplus)}"
    present = sum(value is not None for value in row.values())
    if present < expected:
        return None, f"Expected {expected} fields, got {present}"
    
    # From here on every value is a string; .get() defaults only apply when
    # the header itself lacks the column.
    employee_id = row.get('employee_id', '').strip()
    if not employee_id:
        return None, "Missing employee_id"
    
    name = row.get('name', '').strip()
    if not name:
        return None, "Missing or empty name"
    
    try:
        salary = float(row.get('salary', 0))
    except ValueError:
        return None, "Salary is not a valid number"
    # float() also parses "nan" and "inf", which a plain <= 0 test lets through
    if not math.isfinite(salary) or salary <= 0:
        return None, "Invalid salary value"
    
    # hire_date is optional, but when present it must be a real calendar date
    hire_date = row.get('hire_date', '').strip()
    if hire_date:
        try:
            datetime.strptime(hire_date, '%Y-%m-%d')
        except ValueError:
            return None, "Invalid date format (expected YYYY-MM-DD)"
    
    record = {
        'employee_id': employee_id,
        'name': name,
        'department': row.get('department', '').strip(),
        'salary': salary,
        'hire_date': hire_date
    }
    return record, None


def read_csv_with_validation(filename: str) -> tuple[List[Dict[str, Any]], List[str]]:
    """
    Read CSV file with data validation and error reporting.
    
    A row is rejected when it has the wrong number of fields, a blank
    employee_id or name, a salary that is not a finite positive number, or a
    non-empty hire_date that is not a valid YYYY-MM-DD date. Rejected rows
    are reported in the error list and never raise.
    
    Args:
        filename: Path to CSV file
    
    Returns:
        Tuple of (valid_records, error_messages). Each valid record has the
        keys employee_id, name, department, salary (float) and hire_date.
        Row numbers in the messages count the header as row 1.
    """
    valid_records: List[Dict[str, Any]] = []
    error_messages: List[str] = []
    
    try:
        # newline='' lets the csv module handle line endings itself
        with open(filename, 'r', newline='', encoding='utf-8') as file:
            csv_reader = csv.DictReader(file)
            
            for row_num, row in enumerate(csv_reader, start=2):  # Start at 2 (header is row 1)
                record, problem = _validate_employee_row(row)
                if problem is None:
                    valid_records.append(record)
                else:
                    error_messages.append(f"Row {row_num}: {problem}")
                    
    except FileNotFoundError:
        error_messages.append(f"File not found: {filename}")
    except (OSError, UnicodeError, csv.Error) as e:
        # Unreadable, undecodable or unparsable file: keep any rows already
        # accepted and report the failure instead of raising
        error_messages.append(f"Error reading file: {str(e)}")
    
    return valid_records, error_messages

# Run the validator against the deliberately broken sample file
valid_employees, errors = read_csv_with_validation('problematic_employees.csv')

# Summary counts, then one line per rejected row
print(
    f"Successfully processed {len(valid_employees)} valid records",
    f"Found {len(errors)} errors:",
    sep="\n",
)

for error in errors:
    print(f"  - {error}")

# Rows that passed every check
print("\nValid employee records:")
for emp in valid_employees:
    print(f"  {emp['employee_id']}: {emp['name']} - ${emp['salary']:,.2f}")