In [149]:
# Import required libraries and custom utility functions
from utils import postgresImportFile, postgresRunSqlScript, postgresGetCredentials
import pandas as pd
from sqlalchemy import create_engine

In [None]:
# Name of the PostgreSQL database that every later cell targets
db_name = 'weather_wizards'

# Run the schema scripts in order: the main schema first, then the hurricane
# and tornado staging schemas (the staging tables receive the raw file loads).
# Whatever the helper returns for each script is printed.
for sql_file in (
    'sql/weather_wizards_schema.sql',
    'sql/hurricane_staging_schema.sql',
    'sql/tornado_staging_schema.sql',
):
    result = postgresRunSqlScript(db_name, sql_file)
    print(result)

In [None]:
# File imports: this cell and the ones that follow each load a CSV file
# into a database table through postgresImportFile.

# 1. Tornado details -> tornado_details_staging
table_name = 'tornado_details_staging'
file_location = 'data/tornado_details.csv'
results = postgresImportFile(
    db_name,
    table_name,
    file_location,
    delimiter=',',
    headerline=True,
)
print(f"{results}")

In [None]:
# Steps 2-4: hurricane staging loads, as (target table, CSV file) pairs in load order.
# hurricane_details_staging is listed twice because two files feed it:
# the current hurricane details and the retired hurricane details.
for table_name, file_location in (
    ('hurricane_summary_staging', 'data/hurricane_summary.csv'),
    ('hurricane_details_staging', 'data/hurricane_details.csv'),
    ('hurricane_details_staging', 'data/hurricane_details_retired.csv'),
    ('hurricane_archive_summary_staging', 'data/hurricane_archive_summary.csv'),
):
    results = postgresImportFile(db_name, table_name, file_location, delimiter=',', headerline=True)
    print(f"{results}")

In [None]:
# 5. Radar station locations -> radar_station
# Unlike the loads above, this file goes straight into a non-staging table.
table_name = 'radar_station'
file_location = 'data/radar_locations.csv'
results = postgresImportFile(
    db_name,
    table_name,
    file_location,
    delimiter=',',
    headerline=True,
)
print(f"{results}")

In [None]:
# ZIP code source data: https://simplemaps.com/data/us-zips

# Build a SQLAlchemy engine for the target database. The base connection URL
# comes from the postgresGetCredentials helper; the database name is appended.
engine = create_engine(postgresGetCredentials(engine_type='sqlalchemy') + '/' + db_name)

# Read the ZIP column as text rather than letting pandas infer a numeric type.
# With numeric inference a single missing ZIP upcasts the whole column to
# float, and str() then produces values such as '501.0' that zfill(5) cannot
# turn back into a valid 5-digit code.
zip_data = pd.read_csv('data/uszips.csv', dtype={'zip': str})

zip_data_region = (
    # Keep only the columns that are loaded into the region table
    zip_data[['city', 'state_id', 'zip', 'population', 'lat', 'lng']]
    .assign(
        # Missing population becomes 0 so the column can be stored as an integer
        population=lambda frame: frame['population'].fillna(0).astype(int),
        # Missing ZIP becomes '0'; surrounding whitespace is stripped and the
        # value is left-padded with zeros to 5 characters
        zip=lambda frame: frame['zip'].fillna('0').astype(str).str.strip().str.zfill(5),
    )
    # Rename source columns to the names used when writing to the database.
    # NOTE(review): the previous mapping also contained 'zip_code': 'zip'.
    # The frame has no 'zip_code' column, so that entry never matched and did
    # nothing; it is removed here and the column is still written as 'zip'.
    # If the region table's column is actually named zip_code, add
    # 'zip': 'zip_code' below -- confirm against the schema script.
    .rename(columns={'state_id': 'state',
                     'population': 'population_size',
                     'lat': 'latitude',
                     'lng': 'longitude'})
)

# Write the transformed rows to the 'region' table:
# - if_exists='append' adds rows to an existing table, so re-running this cell
#   sends the same rows again
# - index=False keeps the DataFrame index out of the table
zip_data_region.to_sql('region', con=engine, if_exists='append', index=False)

In [None]:
# Run the SQL script that moves data from the staging tables into the main schema.
# Per the notes below, the script is expected to cover the following
# (the SQL itself is not shown in this notebook):

#  Load storm tables from staging:
#  - Transform and standardize tornado data from tornado_details_staging
#  - Transform and combine hurricane data from:
#    * hurricane_summary_staging
#    * hurricane_details_staging
#    * hurricane_archive_summary_staging
#  - Apply data quality rules and standardization

# The value returned by the helper is printed so the outcome is visible in the cell output.
sql_file = 'sql/load_weather_tables.sql'
result = postgresRunSqlScript(db_name, sql_file)
print(result)