### Clean and Combine Weather and Energy Datasets

In [1]:
# Import Dependencies
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, udf, to_date, when, split
from pyspark.sql.types import StructField, StringType, IntegerType, StructType
import us

In [2]:
# Import findspark and initialize.
# findspark.init() makes a local Spark installation importable as `pyspark`
# (per the findspark documentation).
# NOTE(review): the imports cell above already imports from `pyspark`, so this call
# runs too late to help those imports. On a fresh kernel where pyspark is only
# reachable through findspark, the imports cell fails before this cell is reached —
# consider moving `import findspark; findspark.init()` to the top of the imports
# cell, ahead of the pyspark imports.
import findspark
findspark.init()

In [3]:
# Build the Spark session (getOrCreate reuses an active one) used to read the raw CSV files
spark = (
    SparkSession.builder
    .appName("DataProcessing")
    .getOrCreate()
)

### Weather CSV

In [4]:
# Path to the monthly, state-level weather file
weather_data = "Resources/usweather_month_1981-2023.csv"

# Spark reads the file with a header row and inferred column types;
# the result is handed to pandas for the cleaning steps that follow
weatherspark_df = spark.read.csv(
    weather_data,
    header=True,
    inferSchema=True,
)
weather_df = weatherspark_df.toPandas()
weather_df.head()

Unnamed: 0,st_abb,st_code,ym,tmin,tmax,tavg,ppt
0,AL,1,198101,-3.205,11.428,4.112,34.136
1,AR,5,198101,-3.541,10.039,3.249,27.502
2,AZ,4,198101,0.343,14.894,7.619,17.308
3,CA,6,198101,2.264,13.607,7.936,113.911
4,CO,8,198101,-9.283,7.325,-0.979,7.307


In [5]:
# Clean the weather data: add full state names, parse the yyyymm date, convert
# temperatures from Celsius to Fahrenheit, and drop unused/incomplete rows.
# NOTE: this cell reassigns `weather_df` from itself, so re-run the load cell above
# before re-running it (otherwise `st_code` is already gone and temperatures would
# be converted twice).

def state_name_from_abbr(abb):
    """Return the full state name for a postal abbreviation, or None if unresolved.

    Non-string values (e.g. NaN from a blank cell) return None instead of raising,
    so those rows are removed by the ``dropna`` below.
    """
    if not isinstance(abb, str):
        return None
    match = us.states.lookup(abb)  # single lookup per value
    return match.name if match else None


def celsius_to_fahrenheit(celsius):
    """Convert a Celsius value (scalar or Series) to Fahrenheit."""
    return (celsius * 9/5) + 32


# Resolve each distinct abbreviation once instead of twice per row
state_names = {abb: state_name_from_abbr(abb) for abb in weather_df['st_abb'].unique()}

weather_df = (weather_df
              .rename(columns={'ym': 'date'})  # rename date for merge
              .assign(state=lambda x: x['st_abb'].map(state_names),  # add state names
                      date=lambda x: pd.to_datetime(x['date'], format='%Y%m', errors='coerce'),
                      tmin=lambda x: celsius_to_fahrenheit(x['tmin']),
                      tmax=lambda x: celsius_to_fahrenheit(x['tmax']),
                      tavg=lambda x: celsius_to_fahrenheit(x['tavg']))
              .drop(columns=['st_code'])
              # Drops rows with an unresolved state, an unparseable date, or any other
              # missing value; nothing is left to fill afterwards, so no fillna is needed.
              .dropna())
weather_df.head()

Unnamed: 0,st_abb,date,tmin,tmax,tavg,ppt,state
0,AL,1981-01-01,26.231,52.5704,39.4016,34.136,Alabama
1,AR,1981-01-01,25.6262,50.0702,37.8482,27.502,Arkansas
2,AZ,1981-01-01,32.6174,58.8092,45.7142,17.308,Arizona
3,CA,1981-01-01,36.0752,56.4926,46.2848,113.911,California
4,CO,1981-01-01,15.2906,45.185,30.2378,7.307,Colorado


### Energy CSV

In [6]:
# Path to the monthly fuel-consumption file ("State : fuel" rows, one column per month)
energy_data = "Resources/Total_consumption_for_all_sectors.csv"

# Spark reads the file with a header row and inferred column types;
# the result is handed to pandas for reshaping below
energyspark_df = spark.read.csv(
    energy_data,
    header=True,
    inferSchema=True,
)
energy_df = energyspark_df.toPandas()
energy_df.head()

Unnamed: 0,description,units,source key,1-Jan,1-Feb,1-Mar,1-Apr,1-May,1-Jun,1-Jul,...,23-Dec,24-Jan,24-Feb,24-Mar,24-Apr,24-May,24-Jun,24-Jul,24-Aug,24-Sep
0,Total consumption for all sectors,,,,,,,,,,...,,,,,,,,,,
1,United States,,ELEC.CONS_TOT..M,,,,,,,,...,,,,,,,,,,
2,United States : coal,thousand tons,ELEC.CONS_TOT.COW-US-99.M,90951.0,77545.0,80268.0,72530.0,78810.0,84486.0,93653.0,...,32833.0,43324.0,26700.0,23151.0,21978.0,26929.0,35182.0,41276.0,40239.0,32298.0
3,United States : petroleum liquids,thousand barrels,ELEC.CONS_TOT.PEL-US-99.M,32350.0,17915.0,20239.0,19059.0,17661.0,20112.0,18089.0,...,2049.0,3510.0,1729.0,1795.0,2275.0,2023.0,2030.0,2060.0,2114.0,1781.0
4,United States : petroleum coke,thousand tons,ELEC.CONS_TOT.PC-US-99.M,393.0,357.0,354.0,297.0,346.0,359.0,425.0,...,200.0,197.0,152.0,99.0,147.0,165.0,218.0,235.0,233.0,150.0


In [7]:
# Reshape the wide energy table (one row per "State : fuel" series, one column per month)
# into one row per (state, month) with one column per fuel.
# Intermediate results use new names so `energy_df` keeps the raw frame loaded above
# and this cell can be re-run without reloading the file.

def state_abbr_from_name(state_name):
    """Return the postal abbreviation for a full state name, or None if `us` can't resolve it."""
    if not isinstance(state_name, str):
        return None
    match = us.states.lookup(state_name)
    return match.abbr if match else None


def convert_date(date_str):
    """Convert a month column label such as '1-Jan' or '24-Sep' into 'YYYY-Mon-01'.

    The numeric prefix is read as a year offset from 2000, so '1-Jan' -> '2001-Jan-01'.
    Raises ValueError if the label is not of the form '<int>-<text>'.
    """
    prefix, month_abbr = date_str.split('-')
    year = 2000 + int(prefix)
    return f"{year}-{month_abbr}-01"


# Split "State : fuel" descriptions into separate columns. dropna() removes every row
# with a missing value in any column (e.g. the title row and the bare "United States" row).
energy_split = (energy_df
                .dropna()
                .assign(state=lambda x: x['description'].str.split(' : ').str[0],
                        energy=lambda x: x['description'].str.split(' : ').str[1])
                .drop(['description', 'source key', 'units'], axis=1))

# Long format: one row per (state, fuel, month label). '--' marks an unavailable value;
# map it to 0 and coerce the rest to numbers *before* pivoting, so the pivot's sum adds
# numbers rather than operating on strings.
energy_long = (energy_split
               .melt(id_vars=['state', 'energy'], var_name='date', value_name='value')
               .assign(value=lambda x: pd.to_numeric(x['value'].replace('--', 0), errors='coerce')))

# Wide by fuel: one row per (state, month label), one column per fuel; missing combos -> 0
pivot_df = (energy_long
            .pivot_table(index=['state', 'date'], columns='energy', values='value', aggfunc='sum', fill_value=0)
            .reset_index()
            # Look each distinct state name up once instead of twice per row
            .assign(st_abb=lambda x: x['state'].map({name: state_abbr_from_name(name)
                                                     for name in x['state'].unique()})))

# Convert month labels ('1-Jan') to datetimes (2001-01-01)
pivot_df['date'] = pd.to_datetime(pivot_df['date'].apply(convert_date), format='%Y-%b-%d', errors='coerce')

# Fuel columns expected downstream; fail loudly if one is missing from the source file
numeric = ['coal', 'natural gas', 'petroleum coke', 'petroleum liquids']
missing_fuels = set(numeric) - set(pivot_df.columns)
assert not missing_fuels, f"fuel columns missing from energy data: {sorted(missing_fuels)}"

# Convert natural gas and petroleum liquids to kilotons. Coal and petroleum coke are
# already reported in thousand tons and are left unchanged (nothing is combined here).
# NOTE(review): neither factor is documented. Petroleum liquids arrive in thousand
# barrels (0.136 looks like a tonnes-per-barrel factor); natural gas units are not
# visible in the preview above — TODO confirm 0.027 and cite its source.
NATURAL_GAS_TO_KT = 0.027
PETROLEUM_LIQUIDS_TO_KT = 0.136
pivot_df['natural gas'] *= NATURAL_GAS_TO_KT
pivot_df['petroleum liquids'] *= PETROLEUM_LIQUIDS_TO_KT

# Display the resulting DataFrame
pivot_df.head()

energy,state,date,coal,natural gas,petroleum coke,petroleum liquids,st_abb
0,Alabama,2001-04-01,2236.0,209.898,0.0,28.152,AL
1,Alabama,2001-08-01,3486.0,367.227,0.0,8.84,AL
2,Alabama,2001-12-01,2549.0,293.868,0.0,11.832,AL
3,Alabama,2001-02-01,2729.0,166.158,0.0,16.592,AL
4,Alabama,2001-01-01,3076.0,210.654,0.0,46.24,AL


### Merge the DataFrames and Export to CSV

In [8]:
# Inner-join weather and energy on state, abbreviation and month: only (state, month)
# pairs present in both frames are kept. validate='one_to_one' raises if either side
# has duplicate keys, which would otherwise silently multiply rows.
df_merged = (pd.merge(weather_df, pivot_df, on=['state', 'st_abb', 'date'], validate='one_to_one')
             # Rearrange the columns, then label the temperature columns with their unit
             .reindex(columns=['state', 'st_abb', 'date', 'tmin', 'tmax', 'tavg', 'ppt',
                               'coal', 'natural gas', 'petroleum coke', 'petroleum liquids'])
             .rename(columns={'tmin': 'tmin (f)', 'tmax': 'tmax (f)', 'tavg': 'tavg (f)'}))
df_merged.head()

Unnamed: 0,state,st_abb,date,tmin (f),tmax (f),tavg (f),ppt,coal,natural gas,petroleum coke,petroleum liquids
0,Alabama,AL,2001-01-01,29.1524,51.8234,40.487,119.501,3076.0,210.654,0.0,46.24
1,Arkansas,AR,2001-01-01,26.6612,45.7412,36.2012,88.598,1320.0,77.895,0.0,30.192
2,Arizona,AZ,2001-01-01,28.2722,51.9206,40.0964,41.955,1773.0,194.994,0.0,36.448
3,California,CA,2001-01-01,31.7372,52.7198,42.2276,87.828,154.0,2879.145,94.0,85.0
4,Colorado,CO,2001-01-01,11.2748,37.4252,24.35,25.485,1777.0,176.526,0.0,7.072


In [9]:
# Write the merged table to disk; the pandas row index carries no data, so omit it
df_merged.to_csv(path_or_buf='Resources/weather_energy_merged.csv', index=False)