In [7]:
# pip install reverse_geocode

In [8]:
# import csv
# import hashlib
# import reverse_geocode as rg
# from pycountry_convert import country_alpha2_to_continent_code, convert_continent_code_to_continent_name



In [9]:
import csv
from datetime import datetime

def load_holiday_data(holiday_file):
    """Load holiday data from a CSV file."""
    holidays = {}
    with open(holiday_file, 'r') as hfile:
        reader = csv.DictReader(hfile)
        for row in reader:
            holiday_date = row['Date']
            try:
                # Parse date in "YYYY-MM-DD" format (adjust if input format differs)
                parsed_date = datetime.strptime(holiday_date, '%Y-%m-%d').date()
                holidays[parsed_date.strftime('%Y-%m-%d')] = row['Holiday']
            except ValueError as e:
                print(f"Error parsing holiday date '{holiday_date}': {e}")
    return holidays

def create_date_dimension(crashes_file, output_file, holiday_file):
    """Enhanced Date Dimension with additional attributes."""
    holiday_data = load_holiday_data(holiday_file)
    
    unique_dates = {}
    with open(crashes_file, 'r') as infile:
        reader = csv.DictReader(infile)
        for row in reader:
            crash_date = row['CRASH_DATE']
            try:
                # Parse the date in the "MM/DD/YYYY HH:MM:SS AM/PM" format
                parsed_date = datetime.strptime(crash_date, '%m/%d/%Y %I:%M:%S %p')
                date_str = parsed_date.strftime('%Y-%m-%d')  # Convert to "YYYY-MM-DD" for uniqueness
                if date_str not in unique_dates:
                    unique_dates[date_str] = {
                        'date_id': str(parsed_date.strftime('%Y%m%d')),  # YYYYMMDD format
                        'month': int(parsed_date.month),
                        'year': int(parsed_date.year),
                        'quarter': int((parsed_date.month - 1) // 3 + 1),
                        'day_of_week': str(parsed_date.strftime('%A')),
                        'week_number': int(parsed_date.isocalendar()[1]),  # Week of the year
                        'is_weekend': 1 if parsed_date.weekday() >= 5 else 0,  # Saturday and Sunday
                        'is_holiday': 1 if date_str in holiday_data else 0  # Check holidays
                       # 'holiday_name': holiday_data.get(date_str, '')  # Holiday name if present
                    }
            except ValueError as e:
                print(f"Error parsing date '{crash_date}': {e}")

    with open(output_file, 'w', newline='') as outfile:
        fieldnames = [
            'date_id', 'month', 'year', 'quarter',
            'day_of_week', 'week_number', 'is_weekend', 'is_holiday' ]
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(unique_dates.values())



# def create_date_dimension(crashes_file, output_file, holiday_list=[]):
#     """Enhanced Date Dimension with additional attributes."""    
#     unique_dates = {}
#     with open(crashes_file, 'r') as infile:
#         reader = csv.DictReader(infile)
#         for row in reader:
#             crash_date = row['CRASH_DATE']
#             try:
#                 # Parse the date in the "MM/DD/YYYY HH:MM:SS AM/PM" format
#                 parsed_date = datetime.strptime(crash_date, '%m/%d/%Y %I:%M:%S %p')
#                 date_str = parsed_date.strftime('%Y-%m-%d')  # Convert to "YYYY-MM-DD" for uniqueness
#                 if date_str not in unique_dates:
#                     unique_dates[date_str] = {
#                         'date_id': str(parsed_date.strftime('%Y%m%d')),  # YYYYMMDD format
#                         #'day': parsed_date.day,
#                         'month': int(parsed_date.month),
#                         'year': int(parsed_date.year),
#                         'quarter': int((parsed_date.month - 1) // 3 + 1),
#                         'day_of_week': str(parsed_date.strftime('%A')),
#                         'week_number': int(parsed_date.isocalendar()[1]),  # Week of the year
#                         'is_weekend': 1 if parsed_date.weekday() >= 5 else 0,  # Saturday and Sunday
#                         'is_holiday': 1 if date_str in holiday_list else 0,  # Check holidays
#                     }
#             except ValueError as e:
#                 print(f"Error parsing date '{crash_date}': {e}")

#     with open(output_file, 'w', newline='') as outfile:
#         fieldnames = [
#             'date_id', 'month', 'year', 'quarter',
#             'day_of_week', 'week_number', 'is_weekend', 'is_holiday'
#         ]
#         writer = csv.DictWriter(outfile, fieldnames=fieldnames)
#         writer.writeheader()
#         writer.writerows(unique_dates.values())


In [10]:
def create_vehicle_dimension(vehicles_file, output_file):
    """
    Create the Vehicle Dimension based on unique vehicle details.
    Each row represents a unique vehicle.
    """
    vehicle_dimension = {}

    # Process vehicle data
    with open(vehicles_file, 'r') as infile:
        reader = csv.DictReader(infile)
        for row in reader:
            vehicle_id = row['VEHICLE_ID']
            if vehicle_id not in vehicle_dimension:
                vehicle_dimension[vehicle_id] = {
                    'vehicle_id': vehicle_id,
                    'crash_id': row['RD_NO'],  # Links to crash_id
                    'vehicle_type': row.get('VEHICLE_TYPE', 'Unknown'),
                    'manufacturer': row.get('MAKE', 'Unknown'),
                    'model': row.get('MODEL', 'Unknown'),
                    'registration_state': row.get('LIC_PLATE_STATE', 'Unknown'),
                }

    # Write Vehicle Dimension to output file
    with open(output_file, 'w', newline='') as outfile:
        fieldnames = ['vehicle_id', 'crash_id', 'vehicle_type', 'manufacturer', 'model', 'registration_state']
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(vehicle_dimension.values())


In [11]:
import csv

def create_geography_dimension(crashes_file, output_file):
    """
    Create the Geography Dimension with area_at_risk based on injury severity.
    Assigns unique location IDs and calculates risk levels.
    """
    injury_scores = {}

    # Process crashes to calculate weighted injury scores
    with open(crashes_file, 'r') as infile:
        reader = csv.DictReader(infile)
        for row in reader:
            beat_id = row['BEAT_OF_OCCURRENCE']
            try:
                # Safely convert injury fields to integers
                fatal_injuries = int(float(row['INJURIES_FATAL']))
                incapacitating_injuries = int(float(row['INJURIES_INCAPACITATING']))
                non_incapacitating_injuries = int(float(row['INJURIES_NON_INCAPACITATING']))
            except (ValueError, KeyError):
                # Default to 0 if any value is invalid or missing
                fatal_injuries = incapacitating_injuries = non_incapacitating_injuries = 0

            # Calculate weighted injury score
            score = (
                5 * fatal_injuries +
                3 * incapacitating_injuries +
                1 * non_incapacitating_injuries
            )
            if beat_id not in injury_scores:
                injury_scores[beat_id] = 0
            injury_scores[beat_id] += score

    # Create geography dimension with risk levels and unique location IDs
    unique_geographies = {}
    location_id_map = {}  # Map unique combination to location_id
    current_location_id = 1

    with open(crashes_file, 'r') as infile:
        reader = csv.DictReader(infile)
        for row in reader:
            beat_id = row['BEAT_OF_OCCURRENCE']
            street_name = row['STREET_NAME']
            street_number = row.get('STREET_NO', '')

            # Create a unique key for combination
            location_key = (beat_id, street_number, street_name)
            if location_key not in location_id_map:
                total_score = injury_scores.get(beat_id, 0)
                # Assign risk levels based on thresholds
                area_at_risk = (
                    "High" if total_score > 150 else
                    "Medium" if total_score > 75 else
                    "Low"
                )
                location_id_map[location_key] = current_location_id
                unique_geographies[current_location_id] = {
                    'location_id': current_location_id,
                    'beat_id': beat_id,
                    'street_name': street_name,
                    'street_number': street_number,
                    'area_risk_level': area_at_risk,
                }
                current_location_id += 1

    # Write Geography Dimension to output file
    with open(output_file, 'w', newline='') as outfile:
        fieldnames = ['location_id', 'beat_id', 'street_name', 'street_number', 'area_risk_level']
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(unique_geographies.values())

    return location_id_map  # Return the mapping for use in other dimensions


In [12]:
def create_person_dimension(people_file, output_file):
    """Extract unique people data and add a boolean is_under_21 attribute."""
    unique_people = {}
    with open(people_file, 'r') as infile:
        reader = csv.DictReader(infile)
        for row in reader:
            person_id = row['PERSON_ID']
            if person_id not in unique_people:
                try:
                    # Check and handle invalid or missing AGE
                    age = int(row['AGE']) if row['AGE'].isdigit() else -1  # Default to -1 if invalid
                except ValueError:
                    age = -1  # Default age if parsing fails

                unique_people[person_id] = {
                    'person_id': person_id,
                    'age': age,
                    'gender': row['SEX'],
                    'role_in_crash': row['PERSON_TYPE'],
                    'injury_severity': row['INJURY_CLASSIFICATION'],
                    'is_under_21': age < 21 if age >= 0 else False,  # False if age is invalid
                }

    with open(output_file, 'w', newline='') as outfile:
        fieldnames = ['person_id', 'age', 'gender', 'role_in_crash', 'injury_severity', 'is_under_21']
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(unique_people.values())




In [13]:
import csv
import hashlib

def create_cause_dimension(crashes_file, output_file):
    with open(crashes_file, 'r') as infile:
        all_crashcauses = {}
        reader = csv.DictReader(infile)
        for row in reader:
            # Generate a unique key based on cause-related fields
            unique_key = f"{row['PRIM_CONTRIBUTORY_CAUSE']}|{row['SEC_CONTRIBUTORY_CAUSE']}|{row['ROADWAY_SURFACE_COND']}|{row['LIGHTING_CONDITION']}|{row['WEATHER_CONDITION']}|{row['POSTED_SPEED_LIMIT']}|{row['TRAFFIC_CONTROL_DEVICE']}|{row['DEVICE_CONDITION']}|{row['ALIGNMENT']}|{row['ROAD_DEFECT']}"
            
            # Create a hashed cause_id for uniqueness
            cause_id = hashlib.md5(unique_key.encode()).hexdigest()[:8]  # Short hash for readability
            
            if cause_id not in all_crashcauses:
                all_crashcauses[cause_id] = {
                    'cause_id': cause_id,
                    'primary_cause': row['PRIM_CONTRIBUTORY_CAUSE'],
                    'secondary_cause': row['SEC_CONTRIBUTORY_CAUSE'],
                    'road_condition': row['ROADWAY_SURFACE_COND'],
                    'lighting_condition': row['LIGHTING_CONDITION'],
                    'weather_condition': row['WEATHER_CONDITION'],
                    'speed_limit': row['POSTED_SPEED_LIMIT'],
                    'traffic_control_device': row['TRAFFIC_CONTROL_DEVICE'],
                    'device_condition': row['DEVICE_CONDITION'],
                    'alignment': row['ALIGNMENT'],
                    'road_defect': row['ROAD_DEFECT'],
                }
    
    # Write the Cause Dimension to a CSV file
    with open(output_file, 'w', newline='') as outfile:
        fieldnames = ['cause_id', 'primary_cause', 'secondary_cause', 'road_condition',
                      'lighting_condition', 'weather_condition', 'speed_limit',
                      'traffic_control_device', 'device_condition', 'alignment', 'road_defect']
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(all_crashcauses.values())


In [14]:
def create_crash_dimension(crashes_file, output_file):

    crash_dimension = {}

    # Process crashes data
    with open(crashes_file, 'r') as infile:
        reader = csv.DictReader(infile)
        for row in reader:
            crash_id = row['RD_NO']
            crash_date = row['CRASH_DATE']
            parsed_date = datetime.strptime(crash_date, '%m/%d/%Y %I:%M:%S %p')
            date_str = parsed_date.strftime('%Y-%m-%d')  # Convert to "YYYY-MM-DD" for uniqueness
            time_str = parsed_date.strftime('%H:%M:%S') # this format is good for SQL to automatically get hour without making new column
            if crash_id not in crash_dimension:
                crash_dimension[crash_id] = {
                    'crash_id': crash_id,
                    'crash_date': date_str,
                    'crash_time': time_str,
                    'num_units': row.get('NUM_UNITS', -1),
                    'crash_severity_category': row.get('MOST_SEVERE_INJURY', 'Unknown')
                }

    # Write Vehicle Dimension to output file
    with open(output_file, 'w', newline='') as outfile:
        fieldnames = ['crash_id', 'crash_date', 'crash_time', 'num_units', 'crash_severity_category']
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(crash_dimension.values())



In [15]:
def create_fact_table(crashes_file, people_file, cause_file, geography_dimension_map, output_file):
    """Create the Fact Table for crashes, linking dimensions using foreign keys."""
    fact_table = []

    # Load Cause Dimension into a dictionary for quick lookup
    cause_mapping = {}
    with open(cause_file, 'r') as cause_infile:
        reader = csv.DictReader(cause_infile)
        for row in reader:
            # Use the same key structure as in create_cause_dimension
            unique_key = f"{row['primary_cause']}|{row['secondary_cause']}|{row['road_condition']}|{row['lighting_condition']}|{row['weather_condition']}|{row['speed_limit']}|{row['traffic_control_device']}|{row['device_condition']}|{row['alignment']}|{row['road_defect']}"
            cause_mapping[unique_key] = row['cause_id']

    # Process crash data and find cause_id
    crashes = {}
    with open(crashes_file, 'r') as infile:
        reader = csv.DictReader(infile)
        for row in reader:
            crash_date = row['CRASH_DATE']
            # Correct date parsing format
            parsed_date = datetime.strptime(crash_date, '%m/%d/%Y %I:%M:%S %p')
            date_id = parsed_date.strftime('%Y%m%d')

            # Lookup cause_id from the Cause Dimension
            unique_key = f"{row['PRIM_CONTRIBUTORY_CAUSE']}|{row['SEC_CONTRIBUTORY_CAUSE']}|{row['ROADWAY_SURFACE_COND']}|{row['LIGHTING_CONDITION']}|{row['WEATHER_CONDITION']}|{row['POSTED_SPEED_LIMIT']}|{row['TRAFFIC_CONTROL_DEVICE']}|{row['DEVICE_CONDITION']}|{row['ALIGNMENT']}|{row['ROAD_DEFECT']}"
            cause_id = cause_mapping.get(unique_key, 'Unknown')  # Default to 'Unknown' if no match

            # Get location_id from geography_dimension_map
            location_key = (row['BEAT_OF_OCCURRENCE'], row.get('STREET_NO', ''), row['STREET_NAME'])
            location_id = geography_dimension_map.get(location_key, 'Unknown')

            crashes[row['RD_NO']] = {
                'location_id': location_id,
                'date_id': date_id,
                'cause_id': cause_id
            }

    # Process people data and link to crashes
    with open(people_file, 'r') as infile:
        reader = csv.DictReader(infile)
        for row in reader:
            crash_id = row['RD_NO']
            person_id = row['PERSON_ID']
            vehicle_id = row.get('VEHICLE_ID', 'Unknown')  # Default if missing
            damage_category = row.get('DAMAGE_CATEGORY', 'Unknown')  # Default if missing
            damage = row['DAMAGE']  # Assuming this is the correct field name for damage

            # Ensure the crash exists in crashes data
            if crash_id in crashes:
                fact = {
                    'crash_id': crash_id,
                    'cause_id': crashes[crash_id]['cause_id'],  # Use cause_id from lookup
                    'person_id': person_id,
                    'vehicle_id': vehicle_id,
                    'location_id': crashes[crash_id]['location_id'],
                    'date_id': crashes[crash_id]['date_id'],
                    'damage': damage,
                    'damage_category': f'{float(damage_category):.2f}' if damage_category.replace('.', '', 1).isdigit() else damage_category
                }
                fact_table.append(fact)

    # Write the Fact Table to the output file
    with open(output_file, 'w', newline='') as outfile:
        fieldnames = ['crash_id', 'cause_id', 'person_id', 'vehicle_id', 'location_id', 'date_id', 'damage', 'damage_category']
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(fact_table)


In [None]:

create_date_dimension('Crashes_deduped.csv', 'DateDimension_withholiday.csv', 'holiday_in_usa.csv')
create_geography_dimension('Crashes_deduped.csv', 'GeographyDimension_2.csv')
create_person_dimension('People_damage_filled_CSV.csv', 'PersonDimension.csv')
create_cause_dimension('Crashes_deduped.csv', 'CauseDimension.csv')
create_vehicle_dimension('Vehicles_IDfilled_nosplit.csv', 'VehicleDimension.csv' )
create_crash_dimension('Crashes_deduped.csv', 'CrashDimension.csv')


location_id_map = create_geography_dimension('Crashes_deduped.csv', 'GeographyDimension_2.csv')


create_fact_table('Crashes_deduped.csv', 'People_damage_filled_CSV.csv', 'CauseDimension.csv', location_id_map, 'FactTable_2.csv')