# 01 - Import STATS19 Dataset, Merge & Save

## Overview

The UK Department for Transport (DfT) road casualty statistics consist of [three primary datasets](https://www.data.gov.uk/dataset/cb7ae6f0-4be6-4935-9277-47e5ce24a11f/road-accidents-safety-data):
- Collision data: Information about each accident event
- Vehicle data: Details about vehicles involved in accidents
- Casualty data: Information about people injured in accidents

These datasets are linked by shared identifiers (`accident_index`, plus `vehicle_reference` between casualties and vehicles) and are large: approximately 4.5GB combined.

## Memory Challenges

When processing large datasets, loading everything into memory at once can raise a `MemoryError`. One solution is to process the data in manageable chunks, focusing on:

1. Reading data incrementally
2. Applying early filtering to reduce data volume
3. Only loading relevant portions of secondary datasets
4. Writing results incrementally to disk
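
The four steps above can be sketched on a single file as follows. This is a minimal illustration rather than the notebook's pipeline: the helper name `filter_csv_to_disk`, the tiny `chunksize` and the toy rows are assumptions made for the example. Note that `process_in_chunks`, defined further down, writes its result in one call instead of appending.

```python
import tempfile
from pathlib import Path

import pandas as pd


def filter_csv_to_disk(src, dst, keep_rows, chunksize=2):
    """Stream `src` in chunks, keep rows where `keep_rows(chunk)` is True,
    and append them to `dst`. Hypothetical helper for illustration only."""
    rows_written = 0
    for chunk in pd.read_csv(src, chunksize=chunksize):
        kept = chunk[keep_rows(chunk)]
        if kept.empty:
            continue
        first_batch = rows_written == 0
        # Create the file (with header) on the first batch, append afterwards
        kept.to_csv(dst, mode="w" if first_batch else "a",
                    header=first_batch, index=False)
        rows_written += len(kept)
    return rows_written


# Toy data standing in for a collision file (values are made up)
tmp_dir = Path(tempfile.mkdtemp())
src = tmp_dir / "collisions.csv"
dst = tmp_dir / "filtered.csv"
pd.DataFrame({
    "accident_index": ["a1", "a2", "a3", "a4", "a5"],
    "police_force": [14, 1, 14, 20, 14],
}).to_csv(src, index=False)

n_rows = filter_csv_to_disk(src, dst, lambda df: df["police_force"] == 14)
filtered = pd.read_csv(dst)
```

Only one chunk plus the rows kept from it are in memory at any time. If no row passes the filter, this sketch never creates `dst`.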

In [4]:
# Import necessary libraries
import pandas as pd
from pathlib import Path
import os

### Chunked Processing Functions
We define specialized functions to handle large data processing:
- `load_filtered_csv`: loads and filters `.csv` files in chunks.

In [5]:
def load_filtered_csv(path, filter_func=None, dtype=None, chunksize=100_000):
    """
    Load and filter a CSV file in chunks
    
    Parameters:
    path : Path to CSV file
    filter_func : Function to filter rows (optional)
    dtype : Dictionary of column data types
    chunksize : Number of rows to process at once
    """
    chunks = []
    for chunk in pd.read_csv(path, dtype=dtype, chunksize=chunksize, low_memory=False):
        if filter_func is not None:
            chunk = chunk[filter_func(chunk)]
        chunks.append(chunk)
        
    return pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame()

- `filter_south_yorkshire`: filter function that isolates South Yorkshire records (`police_force` code 14).

In [6]:
def filter_south_yorkshire(df):
    """Return a boolean mask for rows whose police_force code is 14 (South Yorkshire)."""
    return df['police_force'] == 14
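
Hard-coding the force code works for a single region. If other regions are needed, a small factory could build the same kind of mask function for any code. `make_police_force_filter` is a hypothetical helper, not part of the notebook, and the code `99` in the toy frame is an arbitrary placeholder.

```python
import pandas as pd


def make_police_force_filter(force_code):
    """Return a function that builds a boolean mask for one police force code."""
    def _filter(df):
        return df["police_force"] == force_code
    return _filter


# Toy frame; only code 14 (South Yorkshire) is taken from the notebook
toy = pd.DataFrame({
    "accident_index": ["a1", "a2", "a3"],
    "police_force": [14, 99, 14],
})

south_yorkshire_only = toy[make_police_force_filter(14)(toy)]
```

The returned function has the same shape as `filter_south_yorkshire`, so it could be passed wherever a `filter_func` is expected.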

- `clean_and_organize_data`: removes redundant columns and moves the identifier columns to the front.

In [7]:
def clean_and_organize_data(merged_data):
    """
    Clean up and organize columns in the merged dataset
    
    Parameters:
    merged_data : DataFrame containing the merged data
    
    Returns:
    DataFrame with cleaned and reorganized columns
    """
    print("  Cleaning and organizing columns...")
    
    # Suffixed duplicates that pandas creates when merged frames share column names
    columns_to_drop = [
        'accident_year_x', 'accident_year_y',
        'accident_reference_x', 'accident_reference_y'
    ]
    
    # Drop redundant columns
    cleaned_data = merged_data.drop(columns=columns_to_drop, errors='ignore')
    
    # Rename 'vehicle_reference_x' if it exists
    if 'vehicle_reference_x' in cleaned_data.columns:
        cleaned_data = cleaned_data.rename(columns={'vehicle_reference_x': 'vehicle_reference'})
    
    # Reorder columns to bring reference columns to the front
    reference_columns = [col for col in [
        'accident_index', 'accident_year', 'accident_reference', 
        'vehicle_reference', 'casualty_reference'
    ] if col in cleaned_data.columns]
    
    # Identify remaining columns that aren't in reference_columns
    remaining_columns = [col for col in cleaned_data.columns if col not in reference_columns]
    
    # Combine the lists to reorder DataFrame columns
    ordered_columns = reference_columns + remaining_columns
    
    return cleaned_data[ordered_columns]
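
The `_x`/`_y` columns that this function drops come from pandas itself: when both sides of a merge share a non-key column name, `merge` appends suffixes to tell the two copies apart. A toy example with made-up values shows the effect.

```python
import pandas as pd

casualty = pd.DataFrame({
    "accident_index": ["a1", "a1"],
    "accident_year": ["2020", "2020"],
    "casualty_reference": ["1", "2"],
})
collision = pd.DataFrame({
    "accident_index": ["a1"],
    "accident_year": ["2020"],
    "police_force": [14],
})

# accident_year exists on both sides but is not a join key,
# so pandas keeps both copies and renames them with _x / _y
merged = casualty.merge(collision, on="accident_index", how="inner")
merged_columns = list(merged.columns)
```

After this merge there is no plain `accident_year` column left, only `accident_year_x` and `accident_year_y`. In the notebook's pipeline a plain copy would come back from the later vehicle merge, assuming the vehicle file also carries that column.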

- `process_in_chunks`: main processing function that handles the entire workflow. It keeps casualty, vehicle and collision records linked by filtering all three files on the same set of accident indices.

In [8]:
def process_in_chunks(casualty_path, collision_path, vehicle_path, output_path, filter_func=None, chunksize=50_000, dtype_dict=None):
    """
    Process large datasets while preserving relationships between records
    
    Parameters:
    casualty_path : Path to casualty CSV
    collision_path : Path to collision CSV
    vehicle_path : Path to vehicle CSV
    output_path : Where to save the final filtered data
    filter_func : Function to filter the dataset (optional)
    chunksize : Number of rows to process at once
    dtype_dict : Dictionary of column data types
    """
    # Ensure output directory exists (also works when output_path has no directory part)
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    
    # Step 1: First filter collision data to get relevant accident indices
    print("Step 1: Filtering collision data to get relevant accident indices...")
    relevant_accident_indices = set()
    
    for chunk in pd.read_csv(collision_path, dtype=dtype_dict, chunksize=chunksize, low_memory=False):
        # With no filter, every collision in the chunk is relevant
        filtered_chunk = chunk if filter_func is None else chunk[filter_func(chunk)]
        relevant_accident_indices.update(filtered_chunk['accident_index'])
    
    print(f"Found {len(relevant_accident_indices)} relevant accidents")
    
    if not relevant_accident_indices:
        print("No relevant data found after filtering. Process complete.")
        return
    
    # Step 2: Process each dataset in chunks, but filter by the complete set of accident indices
    print("\nStep 2: Processing casualty data...")
    processed_casualties = []
    
    for chunk in pd.read_csv(casualty_path, dtype=dtype_dict, chunksize=chunksize, low_memory=False):
        filtered_chunk = chunk[chunk['accident_index'].isin(relevant_accident_indices)]
        if not filtered_chunk.empty:
            processed_casualties.append(filtered_chunk)
    
    if not processed_casualties:
        print("No casualty data found for the filtered accidents. Process complete.")
        return
    
    casualty_data = pd.concat(processed_casualties, ignore_index=True)
    print(f"Processed {len(casualty_data)} casualty records")
    
    # Step 3: Process vehicle data
    print("\nStep 3: Processing vehicle data...")
    processed_vehicles = []
    
    for chunk in pd.read_csv(vehicle_path, dtype=dtype_dict, chunksize=chunksize, low_memory=False):
        filtered_chunk = chunk[chunk['accident_index'].isin(relevant_accident_indices)]
        if not filtered_chunk.empty:
            processed_vehicles.append(filtered_chunk)
    
    if not processed_vehicles:
        print("No vehicle data found for the filtered accidents. Process complete.")
        return
    
    vehicle_data = pd.concat(processed_vehicles, ignore_index=True)
    print(f"Processed {len(vehicle_data)} vehicle records")
    
    # Step 4: Load the filtered collision data
    print("\nStep 4: Loading filtered collision data...")
    processed_collisions = []
    
    for chunk in pd.read_csv(collision_path, dtype=dtype_dict, chunksize=chunksize, low_memory=False):
        filtered_chunk = chunk[chunk['accident_index'].isin(relevant_accident_indices)]
        if not filtered_chunk.empty:
            processed_collisions.append(filtered_chunk)
    
    collision_data = pd.concat(processed_collisions, ignore_index=True)
    print(f"Loaded {len(collision_data)} collision records")
    
    # Step 5: Merge datasets
    print("\nStep 5: Merging datasets...")
    print("  Merging casualty data with collision data...")
    merged_casualty_collision = casualty_data.merge(
        collision_data, on="accident_index", how="inner")
    
    print("  Merging with vehicle data...")
    final_data = merged_casualty_collision.merge(
        vehicle_data, on=["accident_index", "vehicle_reference"], how="inner")
    
    # Step 6: Clean and organize the merged data
    print("\nStep 6: Cleaning and organizing data...")
    final_data = clean_and_organize_data(final_data)
    
    # Step 7: Write the complete dataset
    print("\nStep 7: Writing processed data to file...")
    final_data.to_csv(output_path, index=False)
    
    print(f"\nProcessing complete. {len(final_data)} records saved to {output_path}")
    
    # Clean up memory
    del casualty_data, collision_data, vehicle_data, final_data, merged_casualty_collision

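This function:

1. Reads the collision data in chunks and applies the filter, if one is given, to each chunk
2. Collects the `accident_index` values of every collision that passes the filter
3. Reads the casualty and vehicle data in chunks, keeping only rows whose `accident_index` is in that set
4. Re-reads the collision data in chunks, keeping the same accidents
5. Merges casualties with collisions on `accident_index`, then with vehicles on `accident_index` and `vehicle_reference`
6. Cleans and organizes the columns
7. Writes the merged dataset to the output file in a single call
8. Deletes the intermediate DataFrames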
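
One way to check that merges like these preserve the expected relationships is the `validate` argument of `DataFrame.merge`, which raises `pandas.errors.MergeError` when the key cardinality differs from what was declared. The notebook does not use it; the sketch below runs on made-up rows and assumes each casualty belongs to exactly one collision and one vehicle.

```python
import pandas as pd

casualty = pd.DataFrame({
    "accident_index": ["a1", "a1", "a2"],
    "vehicle_reference": ["1", "2", "1"],
    "casualty_reference": ["1", "2", "1"],
})
collision = pd.DataFrame({
    "accident_index": ["a1", "a2"],
    "police_force": [14, 14],
})
vehicle = pd.DataFrame({
    "accident_index": ["a1", "a1", "a2"],
    "vehicle_reference": ["1", "2", "1"],
    "vehicle_type": [9, 1, 9],
})

# Many casualties map to one collision, and to one vehicle
merged = (
    casualty
    .merge(collision, on="accident_index", how="inner", validate="many_to_one")
    .merge(vehicle, on=["accident_index", "vehicle_reference"],
           how="inner", validate="many_to_one")
)

# A duplicated collision row breaks the many-to-one assumption
duplicated = pd.concat([collision, collision.iloc[[0]]], ignore_index=True)
try:
    casualty.merge(duplicated, on="accident_index", validate="many_to_one")
    duplicate_detected = False
except pd.errors.MergeError:
    duplicate_detected = True
```

With valid keys the merged frame has one row per casualty, which matches the run shown at the end of this notebook, where the number of saved records equals the number of casualty records.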

#### Memory Optimization Strategy
Our approach follows these key principles:

1. Early filtering: Apply geographic filtering (South Yorkshire) to the collision data first to minimize data volume
2. Selective loading: Keep only the casualty and vehicle rows that belong to the filtered accidents
3. Chunked reading: Read every CSV in chunks so that only one chunk of unfiltered rows is in memory at a time; the filtered rows are then merged in memory and written to disk once
4. Memory cleanup: Explicitly delete intermediate DataFrames after they're no longer needed
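
To get a rough sense of how much early filtering saves, `DataFrame.memory_usage(deep=True)` reports the bytes held by each column. The frame below is synthetic (1,000 made-up rows, one in ten with code 14), so the numbers say nothing about the real STATS19 files; it only illustrates the measurement.

```python
import pandas as pd

# Synthetic stand-in for a collision table
full = pd.DataFrame({
    "accident_index": [f"a{i}" for i in range(1000)],
    "police_force": [14 if i % 10 == 0 else 1 for i in range(1000)],
})
filtered = full[full["police_force"] == 14]

# deep=True also counts the Python string objects, not just the pointers
full_bytes = int(full.memory_usage(deep=True).sum())
filtered_bytes = int(filtered.memory_usage(deep=True).sum())
```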

#### Data Type Specification
We read the identifier columns as strings so that the join keys have the same type in all three files and are not altered by pandas' type inference:

In [9]:
# Specify data types for critical columns
dtype_dict = {
    'accident_index': str,
    'accident_year': str, 
    'accident_reference': str,
    'vehicle_reference': str,
    'casualty_reference': str
}
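
The effect of reading identifiers as strings can be seen on a small in-memory CSV. The values are made up, and whether the real files contain leading zeros is not shown here. The point is that pandas infers integers by default, which drops leading zeros and can leave a key with a different dtype from the same column read as text elsewhere.

```python
from io import StringIO

import pandas as pd

# Made-up identifiers, used only to show the parsing difference
csv_text = "accident_index,accident_reference\n2020140001,007\n2020140002,012\n"

inferred = pd.read_csv(StringIO(csv_text))
as_text = pd.read_csv(
    StringIO(csv_text),
    dtype={"accident_index": str, "accident_reference": str},
)

inferred_refs = inferred["accident_reference"].tolist()  # parsed as integers
text_refs = as_text["accident_reference"].tolist()       # kept as written
```

Here `inferred_refs` is `[7, 12]` while `text_refs` is `['007', '012']`, so a merge between the two versions of the column would not line up.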

Now that we have the functions defined, we can run the code and process our datasets:

In [10]:
# File paths
my_dir_path = Path('F:/downloads')
save_path = Path('../data/datasets')
output_file = '../data/STATS19/dft_STATS19_1979_23_SY.csv'

# Process the data in chunks
process_in_chunks(
    casualty_path=my_dir_path/'dft-road-casualty-statistics-casualty-1979-latest-published-year.csv',
    collision_path=my_dir_path/'dft-road-casualty-statistics-collision-1979-latest-published-year.csv',
    vehicle_path=my_dir_path/'dft-road-casualty-statistics-vehicle-1979-latest-published-year.csv',
    output_path=output_file,
    filter_func=filter_south_yorkshire,
    chunksize=50_000,
    dtype_dict=dtype_dict
)

Step 1: Filtering collision data to get relevant accident indices...
Found 180391 relevant accidents

Step 2: Processing casualty data...
Processed 243191 casualty records

Step 3: Processing vehicle data...
Processed 315043 vehicle records

Step 4: Loading filtered collision data...
Loaded 180391 collision records

Step 5: Merging datasets...
  Merging casualty data with collision data...
  Merging with vehicle data...

Step 6: Cleaning and organizing data...
  Cleaning and organizing columns...

Step 7: Writing processed data to file...

Processing complete. 243191 records saved to ../data/STATS19/dft_STATS19_1979_23_SY.csv
