In [2]:
import requests
import pandas as pd
import sqlite3
from datetime import datetime
import time

#etl class
class ETLProcessor:
    """Enrich accident records with Open-Meteo weather and store them in SQLite.

    The processor owns one SQLite connection for its whole lifetime. Close it
    with ``close_connection()`` or use the instance as a context manager
    (``with ETLProcessor() as etl: ...``).

    Every public method is best-effort: failures are reported with ``print``
    and turned into an empty/placeholder result instead of being raised, so a
    single bad record or request does not abort the pipeline.
    """

    # Keys of the dict returned by fetch_historical_weather, in column order.
    _WEATHER_FIELDS = ('temp_max', 'temp_min', 'precipitation', 'precipitation_type')

    # WMO weather-interpretation codes as published by Open-Meteo, mapped to
    # short labels. Built once here instead of on every lookup.
    _WEATHER_CODE_LABELS = {
        0: 'Clear', 1: 'Mainly Clear', 2: 'Partly Cloudy', 3: 'Cloudy',
        45: 'Fog', 48: 'Depositing Rime Fog',
        51: 'Drizzle', 53: 'Drizzle', 55: 'Drizzle',
        56: 'Freezing Drizzle', 57: 'Freezing Drizzle',
        61: 'Rain', 63: 'Rain', 65: 'Rain',
        66: 'Freezing Rain', 67: 'Freezing Rain',
        71: 'Snow', 73: 'Snow', 75: 'Snow', 77: 'Snow Grains',
        80: 'Rain Showers', 81: 'Rain Showers', 82: 'Rain Showers',
        85: 'Snow Showers', 86: 'Snow Showers',
        95: 'Thunderstorm', 96: 'Thunderstorm with Hail', 99: 'Thunderstorm with Hail',
    }

    def __init__(self, db_name='etl_data.db'):
        """Open (or create) the SQLite database at *db_name*."""
        self.conn = sqlite3.connect(db_name)

    def __enter__(self):
        """Return the processor itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the database connection; exceptions are never suppressed."""
        self.close_connection()
        return False

    @staticmethod
    def _quote_identifier(name):
        """Return *name* as a double-quoted SQL identifier.

        Table names come from user input, so they may contain spaces, dashes
        or quotes. ``DataFrame.to_sql`` already quotes them when writing; the
        hand-written queries must do the same or they fail on (or are altered
        by) such names.
        """
        return '"' + str(name).replace('"', '""') + '"'

    def fetch_historical_weather(self, lat, lon, start_date, timeout=30):
        """Fetch one day of archived weather for a location from Open-Meteo.

        Args:
            lat: Latitude of the location.
            lon: Longitude of the location.
            start_date: The day to query; it is used as both start and end
                date, so anything that formats as ``YYYY-MM-DD`` works.
            timeout: Seconds to wait for the HTTP request before giving up.

        Returns:
            A dict with keys ``temp_max``, ``temp_min``, ``precipitation`` and
            ``precipitation_type``. The request sets no unit parameters, so
            values are in Open-Meteo's default units (degrees Celsius and
            millimetres). If anything goes wrong, the error is printed and
            every value is ``None``.
        """
        try:
            weather_url = (
                'https://archive-api.open-meteo.com/v1/archive'
                f'?latitude={lat}&longitude={lon}'
                f'&start_date={start_date}&end_date={start_date}'
                '&daily=temperature_2m_max,temperature_2m_min,precipitation_sum,weathercode'
                '&timezone=auto'
            )
            # Without a timeout a single stalled request would hang the whole run.
            response = requests.get(weather_url, timeout=timeout)
            response.raise_for_status()
            daily = response.json().get('daily', {})
            # Each daily series holds exactly one entry because start == end.
            weather_code = daily.get('weathercode', [None])[0]
            return {
                'temp_max': daily.get('temperature_2m_max', [None])[0],
                'temp_min': daily.get('temperature_2m_min', [None])[0],
                'precipitation': daily.get('precipitation_sum', [None])[0],
                'precipitation_type': self.get_precipitation_type(weather_code),
            }
        except Exception as e:
            print(f"Error fetching weather data: {e}")
            return dict.fromkeys(self._WEATHER_FIELDS)

    def get_precipitation_type(self, weather_code):
        """Translate an Open-Meteo WMO weather code into a short label.

        Returns ``'Unknown'`` for codes that are not in the table (including
        ``None``).
        """
        return self._WEATHER_CODE_LABELS.get(weather_code, 'Unknown')

    def load_accident_data(self, file_path, sample_size=100):
        """Load accident records from a CSV or JSON file and sample them.

        Args:
            file_path: Path to a ``.csv`` or ``.json`` file (extension is
                matched case-insensitively).
            sample_size: Maximum number of rows to keep. Files with fewer
                rows are returned in full instead of failing.

        Returns:
            A reproducible random sample (fixed seed) of the file's rows, or
            an empty DataFrame if the format is unsupported or loading fails.
        """
        try:
            lowered_path = str(file_path).lower()
            if lowered_path.endswith('.csv'):
                accident_data = pd.read_csv(file_path)
            elif lowered_path.endswith('.json'):
                accident_data = pd.read_json(file_path)
            else:
                print(f"Unsupported file format: {file_path}")
                return pd.DataFrame()
            # DataFrame.sample raises when n exceeds the row count, so cap it.
            rows_to_take = min(sample_size, len(accident_data))
            return accident_data.sample(n=rows_to_take, random_state=42)
        except Exception as e:
            print(f"Error loading accident data: {e}")
            return pd.DataFrame()

    def summarize_data_raw(self, data, label="Raw Data"):
        """Print the number of records and columns of the DataFrame *data*."""
        try:
            num_records = len(data)
            num_columns = len(data.columns)
            print(f"{label} Summary: {num_records} records and {num_columns} columns.")
        except Exception as e:
            print(f"Error summarizing {label}: {e}")

    def transform_merge_data(self, accident_data):
        """Attach the weather on the accident date to every accident record.

        ``Start_Time`` is reduced to a plain date, rows whose date cannot be
        parsed are dropped, and one weather lookup is made per remaining row
        using ``Start_Lat``, ``Start_Lng`` and ``Start_Time``. The results are
        added as ``weather_temp_max``, ``weather_temp_min``,
        ``weather_precipitation`` and ``weather_precipitation_type``.

        The input frame is not modified.

        Returns:
            The enriched copy, or an empty DataFrame if the transformation
            fails (the error is printed).
        """
        try:
            # Work on a copy so the caller's frame keeps its original Start_Time.
            data = accident_data.copy()
            data['Start_Time'] = pd.to_datetime(data['Start_Time'], errors='coerce').dt.date
            data = data.dropna(subset=['Start_Time']).copy()

            lookups = []
            for lat, lon, day in zip(data['Start_Lat'], data['Start_Lng'], data['Start_Time']):
                lookups.append(self.fetch_historical_weather(lat, lon, day))
                # Short pause between requests to stay gentle on the public API.
                time.sleep(0.2)

            weather_columns = {
                field: [lookup[field] for lookup in lookups]
                for field in self._WEATHER_FIELDS
            }

            # The raw weather values are summarized on their own before merging.
            self.summarize_data_raw(pd.DataFrame(weather_columns), label="Raw Weather Data")
            print()

            for field, values in weather_columns.items():
                data[f'weather_{field}'] = values

            print(f'Data Transformed: {len(data)} records and {len(data.columns)} columns')
            print()
            return data
        except Exception as e:
            print(f"Error transforming data: {e}")
            return pd.DataFrame()

    def load_data(self, transformed_data, table_name='accident_weather'):
        """Write *transformed_data* to the SQLite table *table_name*.

        An existing table with the same name is replaced.
        """
        try:
            transformed_data.to_sql(table_name, self.conn, if_exists='replace', index=False)
            print(f'Data Loaded: {len(transformed_data)} records and {len(transformed_data.columns)} columns')
            print()
        except Exception as e:
            print(f"Error loading data: {e}")

    def desired_output_format(self, transformed_data, format, output_file):
        """Save *transformed_data* in the format the user asked for.

        Args:
            transformed_data: The DataFrame to save.
            format: ``'json'``, ``'csv'`` or ``'sql'``. (The name shadows the
                builtin but is kept because it is part of the public
                signature.)
            output_file: File name without extension for json/csv, or the
                table name for sql.

        Returns:
            None. A confirmation line is printed for every supported format;
            an unsupported format only prints a notice.
        """
        try:
            if format == 'json':
                transformed_data.to_json(f'{output_file}.json', orient='records')
            elif format == 'csv':
                transformed_data.to_csv(f'{output_file}.csv', index=False)
            elif format == 'sql':
                self.load_data(transformed_data, table_name=output_file)
            else:
                print(f"Unsupported output format: {format}")
                return
            print(f'Data converted and saved as {output_file}.{format}')
            print()
        except Exception as e:
            print(f"Error generating desired output: {e}")

    def _accident_counts_by(self, column, table_name):
        """Return accident counts grouped by *column*, most frequent first.

        *column* is always one of this class's own column names; only the
        table name is user-controlled and therefore quoted.
        """
        query = f'''
            SELECT {column}, COUNT(*) AS accident_count
            FROM {self._quote_identifier(table_name)}
            GROUP BY {column}
            ORDER BY accident_count DESC
            '''
        return pd.read_sql(query, self.conn)

    def analyze_data(self, table_name='accident_weather'):
        """Print accident frequencies by precipitation type and by max temperature."""
        try:
            precipitation_analysis = self._accident_counts_by('weather_precipitation_type', table_name)
            print('Accident Frequency by Precipitation Type:')
            print(precipitation_analysis)
            print()
            print()
            temp_analysis = self._accident_counts_by('weather_temp_max', table_name)
            print('Accident Frequency by Temperature:')
            print(temp_analysis)
            print()
            print("*" * 40)
            print()
        except Exception as e:
            print(f"Error analyzing data: {e}")

    def summarize_data_transformed(self, table_name='accident_weather'):
        """Print the number of rows and columns of the SQLite table *table_name*.

        The default matches the default table written by ``load_data``.
        """
        try:
            quoted_table = self._quote_identifier(table_name)
            count_frame = pd.read_sql(f'SELECT COUNT(*) as num_records FROM {quoted_table}', self.conn)
            records = count_frame['num_records'].iloc[0]

            # PRAGMA table_info yields one row per column of the table.
            columns = len(pd.read_sql(f'PRAGMA table_info({quoted_table})', self.conn))

            print(f'Table "{table_name}" summary: {records} records and {columns} columns.')
        except Exception as e:
            print(f'Error summarizing table: {e}')

    def close_connection(self):
        """Close the SQLite connection held by this processor."""
        self.conn.close()

def main():
    """Run the accident/weather ETL pipeline interactively.

    Prompts for an output format and an output name, loads a sample of
    'USA_ACCIDENTS.csv', enriches it with weather data, writes it in the
    requested format, and then summarizes and analyzes the SQLite table named
    after the output name. For json/csv output the data is additionally
    loaded into that table so the analysis can run on it.
    """
    output_format = input("Enter output file format (json, csv, or sql): ").strip().lower()
    # Stripped like the format above so stray spaces do not end up in the name.
    output_file = input("Enter desired output file name or SQL table name: ").strip()
    accident_file_path = 'USA_ACCIDENTS.csv'

    print()
    print("*" * 40)
    print()

    etl = ETLProcessor()
    # try/finally guarantees the connection is closed on the early exits below
    # and on unexpected errors, not only at the end of a successful run.
    try:
        accident_data = etl.load_accident_data(accident_file_path)
        if accident_data.empty:
            print("No data loaded. Exiting...")
            return

        etl.summarize_data_raw(accident_data, label="Raw Accident Data")
        print()

        transformed_data = etl.transform_merge_data(accident_data)
        if transformed_data.empty:
            print("Error during data transformation. Exiting.")
            return

        print()
        etl.desired_output_format(transformed_data, output_format, output_file)
        print("*" * 40)
        print()
        # The 'sql' format has already written the table; every other format
        # still needs the table for the summary and analysis that follow.
        if output_format != "sql":
            etl.load_data(transformed_data, output_file)
        etl.summarize_data_transformed(table_name=output_file)
        print()
        etl.analyze_data(table_name=output_file)
    finally:
        etl.close_connection()

# Run the pipeline only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
  main()









Enter output file format (json, csv, or sql): json
Enter desired output file name or SQL table name: accident_weather

****************************************

Raw Accident Data Summary: 100 records and 8 columns.

Raw Weather Data Summary: 92 records and 4 columns.

Data Transformed: 92 records and 12 columns


****************************************

Data Loaded: 92 records and 12 columns

Table "accident_weather" summary: 92 records and 12 columns.

Accident Frequency by Precipitation Type:
  weather_precipitation_type  accident_count
0                     Cloudy              29
1                    Drizzle              27
2                       Rain              16
3                       Snow               7
4               Mainly Clear               7
5                      Clear               6


Accident Frequency by Temperature:
    weather_temp_max  accident_count
0               29.2               2
1               28.7               2
2               27.6               2