Extracting Data - No transformations happen in this step; we only select the attributes we need and load everything into staging tables.

In [1]:
import os

import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.sql import text

In [9]:
# Creating DataFrames from the CSV source files.
# Nothing is transformed here: every file is read as-is and given a fresh 0..n-1 row index.

# Crime Data
crime_df = pd.read_csv('crime_data.csv', sep=',', low_memory=False).reset_index(drop=True)

# Moon Data - one file per year
moon_2015 = pd.read_csv('Moon_Fases/moon-phases-2015-America_New_York.csv', sep=',', low_memory=False).reset_index(drop=True)
moon_2016 = pd.read_csv('Moon_Fases/moon-phases-2016-America_New_York.csv', sep=',', low_memory=False).reset_index(drop=True)
moon_2017 = pd.read_csv('Moon_Fases/moon-phases-2017-America_New_York.csv', sep=',', low_memory=False).reset_index(drop=True)

# Stack the yearly files into one table. ignore_index=True already renumbers the
# rows 0..n-1, so no additional reset_index is needed afterwards.
moon_df = pd.concat([moon_2015, moon_2016, moon_2017], axis=0, ignore_index=True)

# Weather Data - Same recording, this is one dataset that has been separated into multiple files
humidity_df = pd.read_csv('weather_data/humidity.csv', sep=',', low_memory=False).reset_index(drop=True)
pressure_df = pd.read_csv('weather_data/pressure.csv', sep=',', low_memory=False).reset_index(drop=True)
temperature_df = pd.read_csv('weather_data/temperature.csv', sep=',', low_memory=False).reset_index(drop=True)
weather_description_df = pd.read_csv('weather_data/weather_description.csv', sep=',', low_memory=False).reset_index(drop=True)
# Each wind frame must be read from the file that matches its name: these two
# variables feed the "wind_speed" and "wind_direction" columns built later on.
wind_speed_df = pd.read_csv('weather_data/wind_speed.csv', sep=',', low_memory=False).reset_index(drop=True)
wind_direction_df = pd.read_csv('weather_data/wind_direction.csv', sep=',', low_memory=False).reset_index(drop=True)

# source: https://pandas.pydata.org/docs/reference/api/pandas.concat.html. April 9 2023
#https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.reset_index.html, March 30 2023


In [35]:
# Checking if the length and datetime of Weather Dataset are the same per subfile.
# The weather table is later assembled by taking one column from each of these
# frames, so they need to describe the same rows.

weather_parts = {
    "humidity": humidity_df,
    "pressure": pressure_df,
    "temperature": temperature_df,
    "weather description": weather_description_df,
    "wind speed": wind_speed_df,
    "wind direction": wind_direction_df,
}

# Report the row count of every subfile under its own label.
record_counts = {label: len(frame) for label, frame in weather_parts.items()}
for label, count in record_counts.items():
    print(f"Number of records for {label}:{count}")
print("Are all Records the same: ", len(set(record_counts.values())) == 1)

# Series.equals returns False when lengths or values differ, whereas an
# element-wise == between Series of different lengths raises ValueError.
# Comparing every frame against one reference is equivalent to comparing neighbours.
reference_datetimes = humidity_df['datetime']
print("Are all datetime records the same?",
      all(frame['datetime'].equals(reference_datetimes) for frame in weather_parts.values()))




Number of records for humidity:45253
 Number of records for pressure:45253
 Number of records for humidity:45253
 Number of records for weather description:45253
 Number of records for wind speed:45253
 Number of records for wind direction:45253
 Are all Records the same:  True
Are all datetime records the same? True


In [45]:
# Build the combined weather table: the timestamp column comes from humidity_df,
# and every measurement contributes its 'Boston' column under its own name.
weather_dict = {"datetime": humidity_df['datetime']}
weather_dict.update(
    (measurement, frame['Boston'])
    for measurement, frame in (
        ("humidity", humidity_df),
        ("pressure", pressure_df),
        ("temperature", temperature_df),
        ("weather_description", weather_description_df),
        ("wind_speed", wind_speed_df),
        ("wind_direction", wind_direction_df),
    )
)
weather_df = pd.DataFrame(data=weather_dict)

In [47]:
# Loading the data into SQL:

# Primary keys will be added in SQL

# The connection URL is read from the TERM_PROJECT_DB_URL environment variable so
# real credentials do not have to live in the notebook. The fallback is the
# original local-development URL, so existing setups keep working unchanged.
engine = create_engine(
    os.environ.get(
        "TERM_PROJECT_DB_URL",
        "postgresql+psycopg2://postgres:postgres@localhost:5432/Term_Project_Database",
    )
)

# Target table name -> DataFrame to load.
stage_tables = {
    'crime_stage': crime_df,
    'moon_stage': moon_df,
    'weather_stage': weather_df,
}

# engine.begin() opens one connection and one transaction for all three loads:
# it commits when the block exits normally and rolls back if any load raises.
# The connection itself is passed to to_sql so the loads actually run on it.
# NOTE(review): assumes the 'stage' schema already exists in the database - confirm.
with engine.begin() as conn:
    for table_name, frame in stage_tables.items():
        # if_exists='replace' drops and recreates the table; index=False keeps
        # the DataFrame row index out of the table.
        frame.to_sql(table_name, con=conn, schema='stage', if_exists='replace', index=False)

# Source: https://docs.sqlalchemy.org/en/20/core/engines.html#postgresql, April 1 2023
# https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html#pandas.DataFrame.to_sql, April 1 2023
# https://docs.sqlalchemy.org/en/20/core/connections.html, April 1 2023
# https://stackoverflow.com/questions/35486721/how-to-prevent-use-of-the-first-row-pandas-dataframe-as-column-names-when-using, April 5 2023
# https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.rename.html, April 5 2023
