In [1]:
import pandas as pd
from pyathena import connect
from sklearn.model_selection import train_test_split
import boto3

# Set up Athena connection
conn = connect(
    s3_staging_dir='s3://ecogridadata/athena/',
    region_name='us-east-1'
)
cursor = conn.cursor()

# Set up S3 client
s3 = boto3.client("s3")
bucket = "ecogridadata"

In [2]:
# --- Subregion Dataset ---
query_subregion = "SELECT * FROM ecodataaidatabase.subregion"
df_subregion = pd.read_sql(query_subregion, conn)

# Clean: drop unnecessary columns
df_subregion.drop(columns=['subba-name', 'parent-name', 'value-units'], inplace=True)

# Scrub: drop duplicates and rows missing critical fields
df_subregion.drop_duplicates(inplace=True)
df_subregion.dropna(subset=['period', 'value'], inplace=True)

# Feature engineering: convert period to datetime and create new features
df_subregion['period'] = pd.to_datetime(df_subregion['period'])
df_subregion['month'] = df_subregion['period'].dt.month
df_subregion['weekday'] = df_subregion['period'].dt.weekday

# Preview
print("Subregion DataFrame after cleaning:")
display(df_subregion.head())

  df_subregion = pd.read_sql(query_subregion, conn)


Subregion DataFrame after cleaning:


Unnamed: 0,period,subba,parent,timezone,value,month,weekday
0,2024-12-31,PGAE,CISO,Arizona,247213,12,1
1,2024-12-31,PGAE,CISO,Central,247876,12,1
2,2024-12-31,PGAE,CISO,Eastern,248481,12,1
3,2024-12-31,PGAE,CISO,Mountain,247213,12,1
4,2024-12-31,PGAE,CISO,Pacific,246697,12,1


In [3]:
# --- Energysource Dataset ---
query_energysource = "SELECT * FROM ecodataaidatabase.energysource"
df_energysource = pd.read_sql(query_energysource, conn)

# Clean: drop unnecessary columns
df_energysource.drop(columns=['respondent-name', 'type-name', 'timezone-description', 'value-units'], inplace=True)

# Scrub: drop duplicates and rows missing critical fields
df_energysource.drop_duplicates(inplace=True)
df_energysource.dropna(subset=['period', 'value', 'fueltype'], inplace=True)

# Feature engineering: convert period to datetime and add time features
df_energysource['period'] = pd.to_datetime(df_energysource['period'])
df_energysource['month'] = df_energysource['period'].dt.month
df_energysource['weekday'] = df_energysource['period'].dt.weekday

# Preview
print("Energysource DataFrame after cleaning:")
display(df_energysource.head())

  df_energysource = pd.read_sql(query_energysource, conn)


Energysource DataFrame after cleaning:


Unnamed: 0,period,respondent,fueltype,timezone,value,month,weekday
0,2024-12-31,AECI,COL,Arizona,23782,12,1
1,2024-12-31,AECI,COL,Central,23309,12,1
2,2024-12-31,AECI,COL,Eastern,22893,12,1
3,2024-12-31,AECI,COL,Mountain,23782,12,1
4,2024-12-31,AECI,COL,Pacific,24422,12,1


In [6]:
# --- Neighboring Balancing Dataset ---
query_neighboring = "SELECT * FROM ecodataaidatabase.neighboring_bal"
df_neighboring_bal = pd.read_sql(query_neighboring, conn)

# Clean: drop unnecessary columns
df_neighboring_bal.drop(columns=['fromba-name', 'toba-name', 'value-units'], inplace=True)

# Scrub: drop duplicates and rows missing critical fields
df_neighboring_bal.drop_duplicates(inplace=True)
df_neighboring_bal.dropna(subset=['period', 'value', 'fromba', 'toba'], inplace=True)

# Feature engineering: convert period to datetime and add new features
df_neighboring_bal['period'] = pd.to_datetime(df_neighboring_bal['period'])
df_neighboring_bal['month'] = df_neighboring_bal['period'].dt.month
df_neighboring_bal['weekday'] = df_neighboring_bal['period'].dt.weekday

# Preview
print("Neighboring Balancing DataFrame after cleaning:")
display(df_neighboring_bal.head())

  df_neighboring_bal = pd.read_sql(query_neighboring, conn)


Neighboring Balancing DataFrame after cleaning:


Unnamed: 0,period,fromba,toba,timezone,value,month,weekday
0,2023-04-03,WALC,SRP,Eastern,4533,4,0
1,2023-04-03,WALC,SRP,Mountain,3820,4,0
2,2023-04-03,WALC,SRP,Pacific,3490,4,0
3,2023-04-03,WALC,TEPC,Arizona,-262,4,0
4,2023-04-03,WALC,TEPC,Central,-396,4,0


In [7]:
# --- Demand Dataset ---
query_demand = "SELECT * FROM ecodataaidatabase.demand"
df_demand = pd.read_sql(query_demand, conn)

# Scrub: drop duplicates and rows missing critical fields
df_demand.drop_duplicates(inplace=True)
df_demand.dropna(subset=['period', 'value'], inplace=True)

# Feature engineering: convert period to datetime and add new features
df_demand['period'] = pd.to_datetime(df_demand['period'])
df_demand['month'] = df_demand['period'].dt.month
df_demand['weekday'] = df_demand['period'].dt.weekday

# Preview
print("Demand DataFrame after cleaning:")
display(df_demand.head())

  df_demand = pd.read_sql(query_demand, conn)


Demand DataFrame after cleaning:


Unnamed: 0,period,respondent,respondent-name,type,type-name,timezone,timezone-description,value,value-units,month,weekday


In [18]:
# Test Demand Query
df = pd.read_sql("SELECT * FROM ecodataaidatabase.demand LIMIT 10;", conn)
df.head()

  df = pd.read_sql("SELECT * FROM ecodataaidatabase.demand LIMIT 10;", conn)


Unnamed: 0,period,respondent,respondent-name,type,type-name,timezone,timezone-description,value,value-units
0,2025-03-24,AECI,"Associated Electric Cooperative, Inc.",DF,Day-ahead demand forecast,Central,Central,,
1,2025-03-24,AECI,"Associated Electric Cooperative, Inc.",DF,Day-ahead demand forecast,Eastern,Eastern,,
2,2025-03-24,AVA,Avista Corporation,DF,Day-ahead demand forecast,Arizona,Arizona,,
3,2025-03-24,AVA,Avista Corporation,DF,Day-ahead demand forecast,Central,Central,,
4,2025-03-24,AVA,Avista Corporation,DF,Day-ahead demand forecast,Eastern,Eastern,,


In [None]:
# Save cleaned DataFrames to local CSV files in folder "data_csv"
df_subregion.to_csv("data_csv/subregion_cleaned.csv", index=False)
df_energysource.to_csv("data_csv/energysource_cleaned.csv", index=False)
df_neighboring_bal.to_csv("data_csv/neighboring_bal_cleaned.csv", index=False)
df_demand.to_csv("data_csv/demand_cleaned.csv", index=False)

print("✅ Cleaned CSVs saved locally in 'data_csv/' folder.")

### Saving Cleaned CSV to S3

In [None]:
# Upload each cleaned CSV to the "cleaned" folder in your S3 bucket
s3.upload_file("data_csv/subregion_cleaned.csv", bucket, "cleaned/subregion.csv")
s3.upload_file("data_csv/energysource_cleaned.csv", bucket, "cleaned/energysource.csv")
s3.upload_file("data_csv/neighboring_bal_cleaned.csv", bucket, "cleaned/neighboring_bal.csv")
s3.upload_file("data_csv/demand_cleaned.csv", bucket, "cleaned/demand.csv")

print("✅ Cleaned CSVs uploaded to s3://{}/cleaned/".format(bucket))

# Create External Athena Tables for Cleaned Data

In [None]:
# --- Create Athena Table for Subregion ---
query = """
DROP TABLE IF EXISTS ecodataaidatabase.subregion_cleaned;
CREATE EXTERNAL TABLE IF NOT EXISTS ecodataaidatabase.subregion_cleaned (
    period string,
    subba string,
    parent string,
    timezone string,
    value int,
    month int,
    weekday int
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    "separatorChar" = ",",
    "quoteChar" = "\""
)
LOCATION 's3://ecogridadata/cleaned/subregion.csv'
TBLPROPERTIES ('skip.header.line.count'='1')
"""
cursor.execute(query)
print("✅ Athena table 'subregion_cleaned' created.")

# --- Create Athena Table for Energysource ---
query = """
DROP TABLE IF EXISTS ecodataaidatabase.energysource_cleaned;
CREATE EXTERNAL TABLE IF NOT EXISTS ecodataaidatabase.energysource_cleaned (
    period string,
    respondent string,
    fueltype string,
    timezone string,
    value int,
    month int,
    weekday int
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    "separatorChar" = ",",
    "quoteChar" = "\""
)
LOCATION 's3://ecogridadata/cleaned/energysource.csv'
TBLPROPERTIES ('skip.header.line.count'='1')
"""
cursor.execute(query)
print("✅ Athena table 'energysource_cleaned' created.")

# --- Create Athena Table for Neighboring Bal ---
query = """
DROP TABLE IF EXISTS ecodataaidatabase.neighboring_bal_cleaned;
CREATE EXTERNAL TABLE IF NOT EXISTS ecodataaidatabase.neighboring_bal_cleaned (
    period string,
    fromba string,
    toba string,
    timezone string,
    value int,
    month int,
    weekday int
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    "separatorChar" = ",",
    "quoteChar" = "\""
)
LOCATION 's3://ecogridadata/cleaned/neighboring_bal.csv'
TBLPROPERTIES ('skip.header.line.count'='1')
"""
cursor.execute(query)
print("✅ Athena table 'neighboring_bal_cleaned' created.")

# --- Create Athena Table for Demand ---
query = """
DROP TABLE IF EXISTS ecodataaidatabase.demand_cleaned;
CREATE EXTERNAL TABLE IF NOT EXISTS ecodataaidatabase.demand_cleaned (
    period string,
    respondent string,
    type string,
    timezone string,
    value int,
    month int,
    weekday int
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    "separatorChar" = ",",
    "quoteChar" = "\""
)
LOCATION 's3://ecogridadata/cleaned/demand.csv'
TBLPROPERTIES ('skip.header.line.count'='1')
"""
cursor.execute(query)
print("✅ Athena table 'demand_cleaned' created.")

In [None]:
# Verify Subregion table
print("Subregion Cleaned:")
df_check = pd.read_sql("SELECT * FROM ecodataaidatabase.subregion_cleaned LIMIT 5", conn)
display(df_check)

# Verify Energysource table
print("Energysource Cleaned:")
df_check = pd.read_sql("SELECT * FROM ecodataaidatabase.energysource_cleaned LIMIT 5", conn)
display(df_check)

# Verify Neighboring Bal table
print("Neighboring Bal Cleaned:")
df_check = pd.read_sql("SELECT * FROM ecodataaidatabase.neighboring_bal_cleaned LIMIT 5", conn)
display(df_check)

# Verify Demand table
print("Demand Cleaned:")
df_check = pd.read_sql("SELECT * FROM ecodataaidatabase.demand_cleaned LIMIT 5", conn)
display(df_check)