In [203]:
from usagrid import s3
import pandas as pd
import numpy as np
from pathlib import Path


pd.options.display.float_format = '{:20,.2f}'.format


def summary_stats(df:pd.DataFrame,file_path) -> pd.DataFrame:
    """Build a per-column summary table for ``df``.

    Starts from ``df.describe()`` and appends extra statistics, then
    transposes so that every input column becomes one row of the result.

    Args:
        df: Frame to summarise.
        file_path: Value stored in the ``file_path`` column of every row.

    Returns:
        A frame indexed by the described columns of ``df`` holding the
        ``describe()`` statistics plus ``sum``, ``na_count``,
        ``expected_count``, ``null_pct`` and ``file_path``.
    """
    stats = df.describe()

    # Extra rows are appended in this order; each Series is aligned on the
    # columns that describe() kept.
    extra_rows = {
        "sum": df.sum(axis=0),
        "na_count": df.isna().sum(axis=0).astype(int),
        "expected_count": df.shape[0],
    }
    for row_label, row_values in extra_rows.items():
        stats.loc[row_label] = row_values

    # Fraction (not percentage) of missing rows per column.
    stats.loc["null_pct"] = stats.loc["na_count"] / stats.loc["expected_count"]

    return stats.T.assign(file_path=file_path)

def to_columnar(df:pd.DataFrame) -> pd.DataFrame:
    """Pivot a long-format frame into a wide frame with one column per series.

    A ``year`` column is derived from ``df.index.year`` (so the index must
    expose ``.year``), the index is reset, and the ``value`` column is
    pivoted against ``period``.

    Args:
        df: Long-format frame containing the descriptor columns listed in
            ``column_levels`` (except ``year``), a ``value`` column, and a
            ``period`` field available after ``reset_index()``.

    Returns:
        A frame indexed by ``period`` whose columns form a MultiIndex over
        the descriptor levels. Duplicate cells are combined with
        ``pivot_table``'s default aggregation.
    """
    column_levels = [
        "year",
        "respondent",
        "respondent_name",
        "type",
        "type_name",
        "timezone",
        "timezone_description",
        "value_units",
    ]

    long_frame = df.assign(year=df.index.year).reset_index()

    return long_frame.pivot_table(
        index="period",
        columns=column_levels,
        values="value",
    )


def freq_ns_str(freq:int) -> str:
    """Translate an interval length in nanoseconds into a frequency alias.

    Args:
        freq: Interval length in nanoseconds, given as an int or an integral
            float. A ``numpy.timedelta64`` of any unit is also accepted and
            is converted to nanoseconds first.

    Returns:
        The alias mapped to the interval: ``"15T"``, ``"30T"``, ``"60T"``
        or ``"1D"``.

    Raises:
        ValueError: If the interval has no entry in the lookup table.
    """
    if isinstance(freq, np.timedelta64):
        # int() on a timedelta64 returns the count in the value's own unit,
        # so normalise to nanoseconds before the lookup.
        freq_ns = int(freq.astype("timedelta64[ns]").astype(np.int64))
    else:
        freq_ns = int(freq)

    nanoseconds_to_datetime = {
        900_000_000_000: "15T",
        1_800_000_000_000: "30T",
        3_600_000_000_000: "60T",
        86_400_000_000_000: "1D",
    }

    try:
        return nanoseconds_to_datetime[freq_ns]
    except KeyError:
        # The original raised a bare string, which itself fails with
        # "exceptions must derive from BaseException".
        raise ValueError(f"Freq Not Found - Need to implement {freq_ns}") from None


def calculate_predominant_freqs_str(df:pd.DataFrame) -> list:
    """Return the most common index step of ``df`` as an alias, once per column.

    Args:
        df: Frame whose index supports ``.diff()``.

    Returns:
        A list with one frequency alias (see ``freq_ns_str``) per column of
        ``df``; empty when ``df`` has no columns.

    Raises:
        Whatever ``freq_ns_str`` raises when the predominant step has no
        alias.
    """
    n_columns = len(df.columns)

    if n_columns == 0:
        return []

    # Selecting a column does not change the index, so the step histogram is
    # identical for every column and only needs to be computed once.
    # NOTE(review): if a per-column frequency (e.g. ignoring NaN rows) was
    # intended, this never did that - confirm the intent.
    datetime_difference,counts = np.unique(df.index.diff(),return_counts=True)

    predominant_freq = datetime_difference[np.argmax(counts)]

    predominant_freq_str = freq_ns_str(predominant_freq)

    return [predominant_freq_str] * n_columns



def generate_date_time_freq_report(df:pd.DataFrame) -> pd.DataFrame:
    """Build a report of index step sizes and their counts for each column.

    For every column of ``df`` the report contains one row per distinct
    index step, with the columns ``date_time`` (the step), ``freq`` (how
    often it occurs), ``respondent_name`` (taken from the column label),
    ``predominant_freq`` (the most common step) and
    ``predominant_freq_str`` (its alias from ``freq_ns_str``).

    Args:
        df: Frame whose index supports ``.diff()`` and whose columns have a
            level named ``respondent_name``.

    Returns:
        The concatenated per-column reports, or an empty ``DataFrame`` when
        ``df`` has no columns.

    Raises:
        ValueError: If ``df.columns.names`` has no ``respondent_name`` entry.
        Whatever ``freq_ns_str`` raises when the predominant step has no
        alias.
    """
    if len(df.columns) == 0:
        return pd.DataFrame()

    respondent_name_index = df.columns.names.index("respondent_name")

    # Selecting a column does not change the index, so the step histogram is
    # the same for every column; compute it once instead of twice per column.
    datetime_difference,counts = np.unique(df.index.diff(),return_counts=True)

    predominant_freq = datetime_difference[np.argmax(counts)]

    predominant_freq_str = freq_ns_str(predominant_freq)

    step_counts = pd.DataFrame((datetime_difference,counts)).T\
            .rename(columns={0:"date_time",1:"freq"})

    # Collect the per-column frames and concatenate once: avoids quadratic
    # re-copying and the deprecated concat-with-an-empty-frame seed.
    per_column_reports = [
        step_counts.assign(respondent_name=col[respondent_name_index],
                           predominant_freq=predominant_freq,
                           predominant_freq_str=predominant_freq_str
                           )
        for col in df.columns
    ]

    return pd.concat(per_column_reports)





def columnar_add_level(df:pd.DataFrame,level_name:str,level_values:list) -> pd.DataFrame:
    """Append one level to the column index of ``df``.

    The column index of ``df`` is replaced in place, and the same object is
    returned so the function can be used with ``DataFrame.pipe``.

    Args:
        df: Frame whose columns are extended; flat and MultiIndex columns
            are both supported.
        level_name: Name of the new innermost level.
        level_values: One value per column of ``df``, in column order.

    Returns:
        ``df`` itself, now carrying a MultiIndex with the extra level.

    Raises:
        ValueError: If ``level_values`` does not have exactly one entry per
            column.
    """
    level_values = list(level_values)

    if len(level_values) != len(df.columns):
        # zip() would otherwise silently drop surplus values.
        raise ValueError(
            f"Expected {len(df.columns)} level values, got {len(level_values)}"
        )

    # Labels of a flat Index are scalars; wrapping them avoids list("abc")
    # splitting a string label into characters.
    has_flat_columns = df.columns.nlevels == 1

    new_level_values = [
        ((col,) if has_flat_columns else tuple(col)) + (level_value,)
        for col,level_value in zip(df.columns,level_values)
    ]

    new_level_names = list(df.columns.names) + [level_name]

    df.columns = pd.MultiIndex.from_tuples(new_level_values,names=new_level_names)

    return df


    
def write_report_files(data:pd.DataFrame,medallion:str,folder_name:str,file_name:str):
    """Write a report frame to the ``usagrid`` bucket.

    The object key is
    ``{medallion}/balancing_authority/reports/{folder_name}/{folder_name}_{file_name}``.

    Args:
        data: Report to write.
        medallion: Leading key segment (the caller in this file passes
            ``"silver"``).
        folder_name: Report folder; also used as the file-name prefix.
        file_name: Name of the source file the report was derived from.
    """
    report_dir = Path(f"{medallion}/balancing_authority/reports/").joinpath(folder_name)

    report_name = f"{folder_name}_{file_name}"

    # Object keys always use forward slashes; str(Path) would emit
    # backslashes on Windows, so render the path in POSIX form.
    report_path = report_dir.joinpath(report_name).as_posix()

    s3.write_data_to_s3_pyarrow(bucket_name="usagrid",object_key=report_path,data=data)


In [206]:
from tqdm import tqdm

# NOTE(review): the trailing listing entry is dropped here; the reason is not
# visible in this file - confirm it is still needed.
paths = s3.list_files_in_folder("usagrid","bronze/balancing_authority/")[:-1]

# Keep only entries that have a file extension.
files = [p for p in paths if Path(p).suffix]

pbar = tqdm(files,leave=True)

for f in pbar:

    pbar.set_description(f"Processing: {f}")

    read_path = Path(f)

    df_temp = s3.read_pyarrow_df_from_s3("usagrid",f).to_pandas()

    # Long format -> one column per series.
    df_columnar = to_columnar(df_temp)

    predominant_datetime_freqs_string = calculate_predominant_freqs_str(df_columnar)

    # The frequency report is built before the extra "freq" level is added.
    freq_report = generate_date_time_freq_report(df_columnar)

    df_columnar = columnar_add_level(
        df_columnar,
        level_name="freq",
        level_values=predominant_datetime_freqs_string,
    )

    df_summary = summary_stats(df_columnar,file_path=f)

    write_report_files(
        freq_report,
        medallion="silver",
        folder_name="datetime_freq",
        file_name=read_path.name,
    )

    write_report_files(
        df_summary,
        medallion="silver",
        folder_name="summary",
        file_name=read_path.name,
    )

    # The columnar frame is written under the same key with "bronze"
    # replaced by "silver".
    s3.write_data_to_s3_pyarrow("usagrid",f.replace("bronze","silver"),df_columnar)



Processing: bronze/balancing_authority/Pacific/Total_interchange/Total_interchange_2024.arrow: 100%|██████████| 60/60 [02:46<00:00,  2.78s/it]                 
