In [25]:
from utils.read_write import read_csv_v2, to_csv

In [26]:
# File-name suffix appended to each table name below.
processedFileString = '_Processed.csv'

# Read the people, vehicles and crashes tables from ../data.
# NOTE(review): read_csv_v2 is a project helper; later cells iterate its result
# as a sequence of dict-like rows -- confirm against utils.read_write.
people_df = read_csv_v2(f'../data/People{processedFileString}')
vehicles_df = read_csv_v2(f'../data/Vehicles{processedFileString}')
crashes_df = read_csv_v2(f'../data/Crashes{processedFileString}')

Reading csv People_Processed.csv
Reading csv Vehicles_Processed.csv
Reading csv Crashes_Processed.csv


In [34]:
# Row counts of the three loaded tables, in the order people, vehicles, crashes.
tuple(len(table) for table in (people_df, vehicles_df, crashes_df))

(564241, 460437, 257925)

In [27]:
from typing import List, Dict, Any
from collections import OrderedDict
import copy

def create_dict_for_table(
    dfs: List[List[Dict[str, Any]]],
    columns_to_keep: List[List[str]],
    new_column_names: List[List[str]],
    use_index: List[bool],
    index_names: List[str]
) -> List[List[Dict[str, Any]]]:
    """
    Project, rename and optionally index several tables of dict rows.

    For every table, each row is rebuilt from scratch: only the requested
    columns are kept, each is stored under its new name, and every value is
    deep-copied so the returned rows share no mutable state with the inputs.
    The input tables themselves are never modified.

    Args:
        dfs: List of tables (each table is a list of dictionaries).
        columns_to_keep: For each table, the source column names to retain.
            A source column may appear more than once in order to emit it
            under several new names.
        new_column_names: For each table, the output name of every retained
            column, positionally aligned with ``columns_to_keep``.
        use_index: For each table, whether to add a 0-based row-position column.
        index_names: For each table, the name of that index column. An entry is
            required for every table, but is only used where ``use_index`` is
            true. If it equals one of the new column names, the index value
            overwrites that column.

    Returns:
        One processed table per input table, in the same order.

    Raises:
        ValueError: If the five argument lists differ in length, or if a
            table's ``columns_to_keep`` and ``new_column_names`` differ in length.
        KeyError: If a row lacks one of the columns listed in ``columns_to_keep``.
    """
    # All five arguments are parallel lists: one entry per table.
    input_lengths = [len(x) for x in (dfs, columns_to_keep, new_column_names, use_index, index_names)]
    if len(set(input_lengths)) > 1:
        raise ValueError(
            f"Inconsistent input lengths:\n"
            f"dfs: {input_lengths[0]}, columns_to_keep: {input_lengths[1]}, "
            f"new_column_names: {input_lengths[2]}, use_index: {input_lengths[3]}, "
            f"index_names: {input_lengths[4]}"
        )

    processed_dfs = []

    for df_idx, (df, keep_cols, new_cols, add_index, idx_name) in enumerate(
        zip(dfs, columns_to_keep, new_column_names, use_index, index_names)
    ):
        # Old and new names are matched by position, so the counts must agree.
        if len(keep_cols) != len(new_cols):
            raise ValueError(
                f"Mismatched column counts in dataframe at index {df_idx}: "
                f"columns_to_keep: {len(keep_cols)}, new_column_names: {len(new_cols)}"
            )

        # Keep the (old, new) pairs as a list rather than a dict: a dict keyed
        # by the old name would silently drop all but the last rename when the
        # same source column is requested more than once.
        col_pairs = list(zip(keep_cols, new_cols))

        processed_df = []
        for row_idx, row in enumerate(df):
            # Deep-copy each value so nested lists/dicts are not shared with the input row.
            new_row = {
                new_name: copy.deepcopy(row[old_name])
                for old_name, new_name in col_pairs
            }

            if add_index:
                new_row[idx_name] = row_idx

            processed_df.append(new_row)

        processed_dfs.append(processed_df)

    return processed_dfs

In [40]:
from collections import defaultdict

def outer_merge(list1, list2, on):
    """
    Perform an outer merge on two lists of dictionaries based on the specified key(s).

    Each distinct key value found in either list produces exactly one output
    row. Columns that a row's source(s) do not provide are filled with None,
    so every output row has the same set of columns.

    Caveats:
    - If several rows within one list share the same key, only the last of
      them is used (this is not a one-to-many join).
    - If both lists provide a non-key column with the same name, the value
      from ``list2`` wins.

    Parameters:
    - list1 (list): First list of dictionaries.
    - list2 (list): Second list of dictionaries.
    - on (str or list): Key(s) to merge on. A single column name may be passed
      as a plain string.

    Returns:
    - list: Resulting list of merged dictionaries. Keys from ``list1`` come
      first, in order of first appearance, followed by keys that occur only
      in ``list2``.

    Raises:
    - KeyError: If a row is missing one of the ``on`` columns.
    """
    # A bare string is ONE column name; iterating it would yield single characters.
    key_columns = [on] if isinstance(on, str) else list(on)

    def create_key(row, keys):
        """Create a tuple key from dictionary `row` based on `keys`."""
        return tuple(row[key] for key in keys)

    # Index each list by its key tuple; a repeated key keeps the last row seen.
    dict1 = {create_key(row, key_columns): row for row in list1}
    dict2 = {create_key(row, key_columns): row for row in list2}

    # Union of all column names across the retained rows, in first-seen order.
    # A dict is used as an ordered set; its values (None) double as the defaults.
    all_columns = {}
    for row in dict1.values():
        for column in row:
            all_columns.setdefault(column)
    for row in dict2.values():
        for column in row:
            all_columns.setdefault(column)

    # Deterministic key order (dicts preserve insertion order), unlike a set union.
    ordered_keys = list(dict1)
    ordered_keys.extend(key for key in dict2 if key not in dict1)

    merged = []
    for key in ordered_keys:
        # Start with every column set to None, then overlay whichever side(s) have the key.
        merged_row = dict.fromkeys(all_columns)
        merged_row.update(dict1.get(key, {}))
        merged_row.update(dict2.get(key, {}))
        merged.append(merged_row)

    return merged


In [41]:
# Column used as the join key for all three tables.
mergeKey = "RD_NO"

# outer_merge builds each row's key by iterating over `on`, so the column name
# is wrapped in a list; a bare string would be walked character by character
# ('R', 'D', '_', ...) and fail on the first lookup.
# Crashes are merged with vehicles first, then that result with people.
result = outer_merge(outer_merge(crashes_df, vehicles_df, [mergeKey]), people_df, [mergeKey])

In [43]:
# Number of rows in the merged table.
len(result)

257927

In [44]:
# Preview only the first few merged rows; displaying the whole list would
# write hundreds of thousands of rows into the notebook output.
result[:3]

[{'RD_NO': 'JB470803',
  'CRASH_DATE': '10/10/2018 09:17:00 PM',
  'POSTED_SPEED_LIMIT': 30,
  'TRAFFIC_CONTROL_DEVICE': 'NO CONTROLS',
  'DEVICE_CONDITION': 'NO CONTROLS',
  'WEATHER_CONDITION': 'CLEAR',
  'LIGHTING_CONDITION': 'DARKNESS, LIGHTED ROAD',
  'FIRST_CRASH_TYPE': 'SIDESWIPE SAME DIRECTION',
  'TRAFFICWAY_TYPE': 'NOT DIVIDED',
  'ALIGNMENT': 'STRAIGHT AND LEVEL',
  'ROADWAY_SURFACE_COND': 'DRY',
  'ROAD_DEFECT': 'NO DEFECTS',
  'REPORT_TYPE': 'NOT ON SCENE (DESK REPORT)',
  'CRASH_TYPE': 'NO INJURY / DRIVE AWAY',
  'DATE_POLICE_NOTIFIED': '10/10/2018 09:50:00 PM',
  'PRIM_CONTRIBUTORY_CAUSE': 'DISTRACTION - FROM INSIDE VEHICLE',
  'SEC_CONTRIBUTORY_CAUSE': 'UNABLE TO DETERMINE',
  'STREET_NO': 5867,
  'STREET_DIRECTION': 'W',
  'STREET_NAME': 'LAKE ST',
  'BEAT_OF_OCCURRENCE': 1512,
  'NUM_UNITS': 2,
  'MOST_SEVERE_INJURY': 'NO INDICATION OF INJURY',
  'INJURIES_TOTAL': 0,
  'INJURIES_FATAL': 0,
  'INJURIES_INCAPACITATING': 0,
  'INJURIES_NON_INCAPACITATING': 0,
  'INJURIES

In [31]:
# Collect the RD_NO value of every row in `x`.
# NOTE(review): `x` is not defined in any cell visible here -- presumably an
# earlier merge result left in the kernel; confirm, or replace with `result`,
# so this cell survives Restart & Run All.
keys = [elem['RD_NO'] for elem in x]

In [32]:
# Number of distinct RD_NO values collected in `keys`.
len(set(keys))

257927

In [33]:
# Row count of the crashes table, for comparison with the distinct-key count.
len(crashes_df)

257925