In [3]:
from rfo_core.aws.iam import ensure_glue_service_role_exists, resolve_role_arn, get_aws_session
from rfo_core.aws.s3 import ensure_s3_bucket_exists
from rfo_core.aws.s3 import get_bucket_name, create_s3_subfolders
from rfo_core.configuration import (
    aws_key, aws_secret, aws_service_role_name,
    aws_region_default, aws_default_sync_mode, aws_versioning_on
)

In [4]:
import awswrangler as wr
import boto3 as bt

In [5]:
def aws_get_session(region: str = aws_region_default) -> bt.Session:
    return get_aws_session(aws_key=aws_key, aws_secret=aws_secret, aws_region=region)

In [6]:
session = aws_get_session(region='us-east-1')

In [7]:
table="rfo_weather_enriched"

In [8]:
# checking all unique datatypes in the table
sql = "SELECT DISTINCT datatype FROM rfo_weather_enriched"
unique_datatypes = wr.athena.read_sql_query(
    sql=sql,
    database="rfo_analytics",
    boto3_session=session
)
unique_datatypes['datatype'].tolist()

            datatype
0          windSpeed
1   relativeHumidity
2    temperature_max
3           dewpoint
4       weighted_cdd
5            gas_hdd
6        temperature
7       electric_cdd
8      windDirection
9     population_hdd
10           gas_cdd
11      electric_hdd
12        cloudCover
13      weighted_hdd
14   temperature_avg
15    population_cdd
16   temperature_min
17         windChill
18         heatIndex


In [43]:
#To query all columns
# table_info = wr.catalog.table(
#     database="rfo_analytics", 
#     table=table, 
#     boto3_session=session
# )
# columns = table_info['Column Name'].values

#If you know which columns to query
columns = ["datetime","objectid","datatype","avgvalue","siteid","name","station_name","state","region","location","timezone"]

# Build SELECT clause with automatic datetime casting
select_parts = []
for col_name in columns:
    if col_name.lower() == 'datetime':
        select_parts.append(f"CAST({col_name} AS timestamp) as {col_name}")
    else:
        select_parts.append(col_name)

select_clause = ",\n    ".join(select_parts)

# Build and execute query
sql = f"""SELECT 
    {select_clause}
FROM {table}
WHERE region IN ('CAISO', 'NWPP', 'SOUTHWEST')
  AND CAST(datetime AS DATE) BETWEEN DATE '2016-01-01' AND DATE '2025-12-31'
LIMIT 100000
"""

df = wr.athena.read_sql_query(
    sql=sql,
    database="rfo_analytics",
    boto3_session=session
)

In [11]:
#save df a csv file
df.to_csv("ALL_CAINWPPSW_10yrs_100000.csv", index=False)

In [49]:
# Get other columns (excluding those used in pivot)
other_cols = [col for col in df.columns if col not in ['datatype', 'avgvalue']]
other_info = df.reset_index()[other_cols].drop_duplicates(subset=['objectid', 'datetime'])

# Merge with pivot_df on objectid and datetime
pivot_df = pivot_df.merge(other_info, on=['objectid', 'datetime'], how='left')

In [50]:
pivoit_df

NameError: name 'pivoit_df' is not defined

In [46]:
print(pivot_df.head(30).to_string(max_rows=30, max_cols=10))

datatype objectid            datetime  cloudCover  dewpoint  heatIndex  ...  weighted_cdd  weighted_hdd  windChill  windDirection  windSpeed
0         W000180 2025-08-20 00:00:00         NaN       NaN        NaN  ...           NaN           0.0        NaN            NaN        NaN
1         W000180 2025-08-21 00:00:00         NaN       NaN        NaN  ...           NaN           0.3        NaN            NaN        NaN
2         W000180 2025-08-21 09:00:00         NaN       NaN       64.0  ...           NaN           NaN        NaN            NaN        NaN
3         W000180 2025-08-21 10:00:00         NaN       NaN       63.0  ...           NaN           NaN        NaN            NaN        NaN
4         W000180 2025-08-21 11:00:00         NaN       NaN       61.0  ...           NaN           NaN        NaN            NaN        NaN
5         W000180 2025-08-21 12:00:00         NaN       NaN       59.0  ...           NaN           NaN        NaN            NaN        NaN
6         W00

In [48]:
#print unique columns of pivot_df
print(pivot_df.nunique())

datatype
objectid             46
datetime            341
cloudCover           88
dewpoint             56
heatIndex            74
relativeHumidity     94
temperature          71
temperature_avg      78
temperature_max      47
temperature_min      46
weighted_cdd         89
weighted_hdd         11
windChill            73
windDirection       297
windSpeed            32
dtype: int64


In [30]:
# how many rows does a unique objectid, datetime pair have?
# Count the number of rows for each unique (objectid, datetime) pair and show only those with more than one row
counts = df.groupby(['objectid', 'datetime']).size().reset_index(name='row_count')
counts["row_count"].value_counts()

row_count
8     6029
7     3820
6     1854
1      827
2      624
5      552
4      501
3      440
12     164
13     153
11      99
10      42
9       31
Name: count, dtype: int64

In [34]:
#index df by objectid and datetime
df.set_index(['objectid', 'datetime'], inplace=True)

In [36]:
df.head(30)

Unnamed: 0_level_0,Unnamed: 1_level_0,datatype,avgvalue,siteid,name,station_name,state,region,location,timezone
objectid,datetime,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
W000205,2025-08-30 00:00:00,weighted_hdd,0.0,CAISO,,CAISO,,CAISO,,
W000229,2025-09-01 12:00:00,windChill,83.0,KLAS,NV - Las Vegas/McCarran Intl,Las Vegas,NV,SOUTHWEST,MCCARRAN INTERNATIONAL AIRPORT,America/Los_Angeles
W000229,2025-09-01 13:00:00,windChill,83.0,KLAS,NV - Las Vegas/McCarran Intl,Las Vegas,NV,SOUTHWEST,MCCARRAN INTERNATIONAL AIRPORT,America/Los_Angeles
W000229,2025-09-01 14:00:00,windChill,82.0,KLAS,NV - Las Vegas/McCarran Intl,Las Vegas,NV,SOUTHWEST,MCCARRAN INTERNATIONAL AIRPORT,America/Los_Angeles
W000229,2025-09-01 15:00:00,windChill,87.0,KLAS,NV - Las Vegas/McCarran Intl,Las Vegas,NV,SOUTHWEST,MCCARRAN INTERNATIONAL AIRPORT,America/Los_Angeles
W000229,2025-09-01 16:00:00,windChill,91.0,KLAS,NV - Las Vegas/McCarran Intl,Las Vegas,NV,SOUTHWEST,MCCARRAN INTERNATIONAL AIRPORT,America/Los_Angeles
W000229,2025-09-01 17:00:00,windChill,94.0,KLAS,NV - Las Vegas/McCarran Intl,Las Vegas,NV,SOUTHWEST,MCCARRAN INTERNATIONAL AIRPORT,America/Los_Angeles
W000229,2025-09-01 18:00:00,windChill,97.0,KLAS,NV - Las Vegas/McCarran Intl,Las Vegas,NV,SOUTHWEST,MCCARRAN INTERNATIONAL AIRPORT,America/Los_Angeles
W000229,2025-09-01 19:00:00,windChill,99.0,KLAS,NV - Las Vegas/McCarran Intl,Las Vegas,NV,SOUTHWEST,MCCARRAN INTERNATIONAL AIRPORT,America/Los_Angeles
W000229,2025-09-01 20:00:00,windChill,100.0,KLAS,NV - Las Vegas/McCarran Intl,Las Vegas,NV,SOUTHWEST,MCCARRAN INTERNATIONAL AIRPORT,America/Los_Angeles


In [42]:
duplicates = counts[counts['row_count'] > 1][['objectid', 'datetime']]
df.loc[df.index.isin([tuple(x) for x in duplicates.values])].sort_index().head(30)
# i see that for a specific objectid and datetime, there are multiple rows, each with a different value for 'avgvalue' and 'datatype'. i want to check if other columns could also vary
other_cols = [col for col in df.columns if col not in ['datatype', 'avgvalue']]
varying_cols = {}

for col in other_cols:
    # For each duplicate (objectid, datetime) pair, check if the column has more than one unique value
    n_unique = df.loc[df.index.isin([tuple(x) for x in duplicates.values])].groupby(level=[0,1])[col].nunique()
    if (n_unique > 1).any():
        varying_cols[col] = n_unique[n_unique > 1]

varying_cols


{}