In [3]:
########## Version 1.0 ################# till 20/07
import datetime as dt
from datetime import datetime, timedelta
import time
from time import perf_counter
import numpy as np
import pandas as pd
import pyarrow
import pyarrow.parquet as pq
from functools import partial
import os, sys
import json
import s3fs
import io
import boto3
import tarfile 
import datetime as dt
from io import BytesIO
import joblib
from time import (strftime, 
                  perf_counter, 
                  gmtime)
# from sklearn.metrics import mean_squared_error
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

filesystem = s3fs.S3FileSystem()

# ============================================= Helper Functions ============================================= #
def save_data(
    df: pd.DataFrame,
    file_name: str,
    path: str,
    date_partition: str,
    partition_cols=None, partition_filename_cb=None, filesystem=filesystem
):
    """Write pandas DataFrame in parquet format to S3 bucket
    Args:
        df (pd.DataFrame): DataFrame to save
        file_name (str): Table name
        path (str): local or AWS S3 path to store the parquet files
        partition_cols (list, optional): Columns used to partition the parquet files. Defaults to None.
    """
    
    date = dt.datetime.strptime(date_partition, "%Y-%m-%d")
    folder = f'/year={date.year}/month={date.month}/day={date.day}'

    pq.write_to_dataset(
        pyarrow.Table.from_pandas(df),
        path + folder,
        filesystem=filesystem,
        partition_cols=partition_cols,
#         basename_template = f"{file_name}.parquet"
        partition_filename_cb=lambda x: f"{file_name}.parquet",
    )

def generate_dates(start_date: str, end_date: str):
    """Generates list of dates
    Args:
        start_date (str): Start date
        end_date (str): End date
    Returns:
        List: List of dates
    """
    sdate = dt.datetime.strptime(start_date, "%Y-%m-%d")
    edate = dt.datetime.strptime(end_date, "%Y-%m-%d") + dt.timedelta(days=1)

    return [
        (sdate + dt.timedelta(days=x)).strftime("%Y-%m-%d")
        for x in range((edate - sdate).days)
    ]
        
def read_parquet_tables(
    file_name: str,
    start_date: str,
    end_date: str,
    path: str,
) -> pd.DataFrame:
    """Read parquet file partitions
    Args:
        file_name (str): Table name
        start_date (str): starting date to clean the data (%Y-%m-%d)
        end_date (str): ending date to clean the data (%Y-%m-%d)
        path (str): local or AWS S3 path to read the parquet files
    Returns:
        pd.DataFrame: Datframe from parquet file
    """
    # convert date range to list of dates
    date_list = generate_dates(start_date, end_date)
    df = pd.DataFrame()
    for read_date in date_list:
        # convert date to integers for filters
        r_year = dt.datetime.strptime(read_date, "%Y-%m-%d").year
        r_month = dt.datetime.strptime(read_date, "%Y-%m-%d").month
        r_day = dt.datetime.strptime(read_date, "%Y-%m-%d").day

        try:
            data = (
                pq.ParquetDataset(
                    path
                    + f"/year={r_year}/month={r_month}/day={r_day}/{file_name}.parquet",
                    filesystem=filesystem,
                )
                .read_pandas()
                .to_pandas()
            )

        except:
            continue  
        df = pd.concat([df, data], ignore_index=True)

    return df

def get_niv(dframe):
    niv_df = dframe[["local_datetime", "ImbalanceQuantity(MAW)(B1780)"]].dropna(how='any').iloc[-1:]  
    niv_val = niv_df.iloc[0]['ImbalanceQuantity(MAW)(B1780)']
    niv_time = niv_df.iloc[0]['local_datetime']
    return niv_val, niv_time


# ============================================= End of Helper Functions ============================================= #

# ============================================= PreProcess Functions ============================================= #

def parse_datetime_to_str(dt_obj):
    """Parses a datetime object to string for model callbacks"""
    return (dt_obj
            .strftime("%Y-%m-%d %H-%M-%S")
            .replace("-", "")
            .replace(" ", "_"))


def read_to_dataframe(file_path, file_format="csv", **kwargs):
    """Read a file to DataFrame in memory"""
    assert file_format in ["csv", "parquet"], \
        "Only .csv and .parquet files are supported"
    # read data into memory, assumes no column is in datetime format requiring parsing
    if file_format == "csv":
        data = pd.read_csv(file_path, **kwargs)  # TODO deal with __index__
    elif file_format == "parquet":
        data = pd.read_parquet(file_path, **kwargs)
    return data


def select_best_features(dataframe):
    """Remove redundant/not useful columns from dataframe"""
    interconnector_cols = [c for c in dataframe.columns if "int" in c and "FUEL" in c]
    dataframe["Total_Int(FUELHH)"] = dataframe[interconnector_cols].apply(sum, axis=1)
    dataframe["Total_Fossil(FUELHH)"] = (dataframe["coal(FUELHH)"] +
                                         dataframe["ocgt(FUELHH)"] +
                                         dataframe["ccgt(FUELHH)"] +
                                         dataframe["oil(FUELHH)"])
    dataframe["Total_Other(FUELHH)"] = (dataframe["biomass(FUELHH)"] +
                                        dataframe["other(FUELHH)"] +
                                        dataframe["nuclear(FUELHH)"])
    dataframe["Total_Hydro(FUELHH)"] = dataframe["npshyd(FUELHH)"] + dataframe["ps(FUELHH)"]
    fuelhh_drop_cols = [c for c in dataframe.columns if "(FUELHH" in c and "total" not in c.lower()]
    # elexon generation cols
    ele_gen_drop_cols = (["Wind_Offshore_fcst(B1440)", "Wind_Onshore_fcst(B1440)"] +
                         [c for c in dataframe.columns if "windforfuelhh" in c.lower()] +  # WINDFORFUELHH
                         [c for c in dataframe.columns if "(B16" in c] +  # B16xx columns
                         ["Total_Load_fcst(B0620)", "Total_Load(B0610)"])  # + act_ele_gen_drop_cols
    # catalyst wind cols
    wind_pc_cols = [c for c in dataframe.columns if "pc" in c.lower()]  # the actual is very corr. with other winds
    sn_wind_cols = [c for c in dataframe.columns if "sn" in c.lower()]
    cat_wind_drop_cols = [c for c in dataframe.columns if "(Wind_" in c and "wind_act(Wind_unrestricted)" != c]
    cat_wind_drop_cols += wind_pc_cols + sn_wind_cols
    # drop columns with redundant information
    cols_to_remove = [
        "niv_act(Balancing_NIV_fcst_3hr)",  # can be used in post process
        "hist_fcst(Balancing_NIV_fcst_3hr)",  # bad feature, not to be used even in post process
        "niv(Balancing_NIV)",  # can be used in post process
        "indicativeNetImbalanceVolume(DERSYSDATA)",
        "systemSellPrice(DERSYSDATA)",
        "totalSystemAdjustmentSellVolume(DERSYSDATA)",
        "totalSystemAdjustmentBuyVolume(DERSYSDATA)",
        "non_bm_stor(Balancing_detailed)",
        "DAI(MELIMBALNGC)",
        "DAM(MELIMBALNGC)",
        "TSDF(SYSDEM)",
        "ITSDO(SYSDEM)",
        "temperature(TEMP)",
        "ImbalancePriceAmount(B1770)",
        "marketIndexPrice(MID)",
        "marketIndexVolume(MID)",
        "DATF(FORDAYDEM)",
        "DAID(FORDAYDEM)",
        "DAIG(FORDAYDEM)",
        "DANF(FORDAYDEM)"
    ]
    # day ahead auction cols
    daa_gwstep_cols = [c for c in dataframe.columns if "daa_gwstep" in c.lower()]
    daa_windrisk_cols = [c for c in dataframe.columns if "windrisk" in c.lower()]  # daauction/range/wind_risk @catalyst
    daa_xgas_cols = [c for c in dataframe.columns if "daa_xgas" in c.lower()]  # xgas @Catalyst too high corr with gas
    daa_gas_cols = [c for c in dataframe.columns if "daa_gas" in c.lower()]  # @Catalyst
    daa_drop_cols = [
        "price_act(DAA)",
        "price_fcst(DAA)",
        "price_weighted(DAA)",
        "hist_fcst(DAA_1D_8AM)",
        "hist_fcst(DAA_1D_2PM)",
    ]
    daa_drop_cols += daa_gwstep_cols + daa_windrisk_cols + daa_xgas_cols + daa_gas_cols
    # drop the columns listed and return
    cols_to_remove += (  # act_ele_gen_drop_cols +
            fuelhh_drop_cols +
            ele_gen_drop_cols +
            cat_wind_drop_cols +
            daa_drop_cols
    )
    return dataframe.drop(cols_to_remove, axis=1)


def prepare_date_features(dataframe):
    """transform date features into tabular format"""
    # parse weekday information
    dataframe["weekday"] = dataframe["SettlementDate"].apply(lambda x: x.isoweekday())
    # cyclic encode the datetime information; sine because the cycle should start from 0
    dataframe["sp_sin"] = np.sin(dataframe["SettlementPeriod"] * (2 * np.pi / 48))
    dataframe["month_sin"] = np.sin(dataframe["month"] * (2 * np.pi / 12))
    dataframe["week_sin"] = np.sin((dataframe["local_datetime"].dt.weekday + 1) * (2 * np.pi / 7))
    # drop unparsed date column; note that SettlementDate is kept for later groupbys
    dataframe.drop(["local_datetime",
        "month",  # information is already encoded
        "SettlementPeriod",  # information is present in hour already, so remove
        "year",
        "weekday"], axis=1, inplace=True)
    return dataframe


def interpolate_and_clip_outliers(dataframe, cutoff=3):
    """Replaces the outlier value with the previous and next value's average using the column's z statistic"""
    float_cols = [c for c in dataframe.select_dtypes("float").columns if "sin" not in c]
    for col_name in float_cols:
        col = dataframe[col_name].to_numpy()
        col_mean, col_std = col.mean(), col.std()  # save the mean and std of the dataframe column
        for idx in range(len(col)):
            row = col[idx]  # save to variable to avoid re-accessing
            if np.abs(row - col_mean) / col_std > cutoff:
                dataframe.loc[idx, col_name] = np.mean([col[idx - 1], col[idx + 1]])
                z_cutoff = cutoff * col_std
                dataframe.loc[idx, col_name] = np.clip(row, col_mean - z_cutoff, col_mean + z_cutoff)
    return dataframe


def compute_ewm_features(dataframe, window=8, alpha=1 - np.log(2) / 4):
    """Computes the exponentially moving weighted average features"""
    weights = list(reversed([(1 - alpha) ** n for n in range(window)]))
    ewma = partial(np.average, weights=weights)
    ewm_cols = [c for c in dataframe.columns if "Imbalance" not in c and  # exclude target variable
                "(" in c and  # this is to exclude time features
                "FUELHH" not in c and  # exclude FUELHH features
                "cash_out" not in c]  # exclude cash_out(Balancing_detailed) feature
    for c in ewm_cols:
        # compute daily ewm, parametrized by alpha
        dataframe[f"ewm_mean_{c}"] = dataframe[c].rolling(window).apply(ewma)
    return dataframe


def compute_shifted_features(dataframe):
    """Computes the features that can be shifted"""
    # compute  backshifted features
    bshift_2sp_cols = [
        "totalSystemAcceptedOfferVolume(DERSYSDATA)",
        "totalSystemAcceptedBidVolume(DERSYSDATA)"
    ]
    for c in bshift_2sp_cols:
        dataframe[f"bshift_2sp_{c}"] = dataframe[c].shift(-2)
    dataframe["bshift_4sp_boas(Balancing_detailed)"] = dataframe["boas(Balancing_detailed)"].shift(-4)
    # compute back-differenced features
    bdiff_cols = [
        "Generation_fcst(B1430)",
        "boas(Balancing_detailed)",
        "totalSystemAcceptedOfferVolume(DERSYSDATA)",
        "totalSystemAcceptedBidVolume(DERSYSDATA)"
    ]
    for c in bdiff_cols:
        dataframe[f"bdiff_1sp_{c}"] = dataframe[c].diff(-1)
    # compute forward shifted feature; other fshifted features based on other cols did not perform well
    fshift_cols = [
        "boas(Balancing_detailed)",
    ]
    for c in fshift_cols:
        dataframe[f"fshift_1hr_{c}"] = dataframe[c].shift(2)
    return dataframe


def preprocess_dataframe(df : pd.DataFrame):
    t0 = time.perf_counter()
    df["year"] = df['local_datetime'].dt.year
    df["month"] = df['local_datetime'].dt.month
    df = select_best_features(df)  # remove columns based on previous results
    df = prepare_date_features(df)  # parse date into cyclic features
    df = interpolate_and_clip_outliers(df, cutoff=2)  # deal outliers using z statistics
    df = compute_ewm_features(df)  # compute exponentially weighted moving average features
    df = compute_shifted_features(df)  # compute features based on shifting and differencing
    # drop some columns that are not performant or are no longer needed
    df = df.drop(["SettlementDate",
                  "wind_act(Wind_unrestricted)",
                  "totalSystemAcceptedOfferVolume(DERSYSDATA)",
                  "totalSystemAcceptedBidVolume(DERSYSDATA)",
                  "Generation_fcst(B1430)",
                  "intraday(Balancing_detailed)",
                  "Solar_fcst(B1440)",
                  "__index_level_0__"], axis=1)
    print("Time taken to preprocess features in seconds:", time.perf_counter() - t0)
    print(f"Shape of the dataframe: {df.shape}")
    df = df.dropna().iloc[-(8+48):]   # Shifted 48
    df_numy= df.drop("ImbalanceQuantity(MAW)(B1780)", axis=1).to_numpy()
    
    return df_numy

# ============================================= End of PreProcess Functions ============================================= #

# ============================================= Support Main Functions ============================================= #

list_cols = ['SettlementTime', 'SP' , 'niv_predicted_1sp', 'niv_predicted_2sp',
             'niv_predicted_3sp', 'niv_predicted_4sp', 'niv_predicted_5sp', 'niv_predicted_6sp',
             'niv_predicted_7sp', 'niv_predicted_8sp', 'ImbalanceQuantity(MAW)(B1780)']

def check_file_is_exists(bucket_name, write_path, path, day2write, cols=list_cols):
    
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucket_name)
    start_date = dt.datetime.strptime(day2write, "%Y-%m-%d")
    end_date = dt.datetime.strptime(day2write, "%Y-%m-%d") + dt.timedelta(days=1)
    prefix_path = path + f"/year={start_date.year}/month={start_date.month}/day={start_date.day}/"
    print(prefix_path)
    file_list = list(bucket.objects.filter(Prefix=prefix_path))
    if not len(file_list):
        
        print("No file here")
        # Create new Template
        df = pd.DataFrame(index=pd.date_range(start=start_date, end=end_date, freq="30T"), columns=cols[1:])
        df = df.head(48)
        df.index.name = cols[0]
        df['SP'] = range(0, len(df))
        df['SP'] = df['SP'].apply(lambda x: x % 48 + 1)
        df = df.reset_index()
        print(f"Creating empty file at ", day2write)
        
        save_data(df = df,
                  file_name = 'intraday_pred',
                  path = os.path.join(write_path, path),
                  date_partition = day2write
        )
        print(f"Created empty file at ", day2write)
    else: 
        print('File Exits')

def override_result(df: pd.DataFrame, response_arr):
    time_now = datetime.now()
    time_down = time_now - timedelta(minutes=30)
    df = df[list_cols]
    index = df.index[(df['SettlementTime'] > time_down) & (df['SettlementTime'] < time_now)].to_list()[0]
    print ('index = ', index)
    # Insert Value
    for i in range(len(response_arr)):
        df.at[index, 'niv_predicted_{}sp'.format(i + 1)] = response_arr[i]
        print(index, response_arr[i], '\n')
        index += 1
    return df

def get_scaler(resource, bucket, file_key="", model_type="intraday"):
    """Gets the scaler used during training from S3"""
    prefix = "{}/{}/lgbm-{}".format(
        model_type.split("-")[0], 
        strftime('%Y/%m', gmtime()), model_type
    )
    if not file_key:
        # get the most recent trained model outputs
        list_files = []
        for object_summary in bucket.objects.filter(Prefix=prefix):
            list_files.append(object_summary.key)
        list_files = [x for x in list_files if "output/output" in x]
        sorted_files = sorted([x.split(prefix)[1] for x in list_files])
        file_name = prefix + sorted_files[-1]
        print("Output files available:", sorted_files)
        print("File chosen:", file_name)
        # read tarfile from S3 location defined before
        fileobj = BytesIO(resource.Object(bucket.name, file_name).get()["Body"].read())
        with tarfile.open(fileobj=fileobj) as tarf:
            # only 1 scaler is outputted per training session
            scaler_file_name = [f.name for f in tarf if "scaler" in f.name.lower()][0]
            scaler = joblib.load(tarf.extractfile(scaler_file_name))
    else:
        # read tarfile from S3 location defined in arguments passed during call
        fileobj = BytesIO(resource.Object(bucket.name, file_key).get()["Body"].read())
        with tarfile.open(fileobj=fileobj) as tarf:
            # only 1 scaler is outputted per training session
            scaler_file_name = [f.name for f in tarf if "scaler" in f.name.lower()][0]
            scaler = joblib.load(tarf.extractfile(scaler_file_name))
    return scaler
        
def lambda_handler(event, context): #event, context
#     try:
    
    ### Initialize
    s3 = boto3.resource('s3')
    my_bucket = 'fpt-results-storage'
    pred_folder = 'harry'
    bucket = s3.Bucket(my_bucket)
    bucket_model = s3.Bucket('lgbm-model-storage')
    resource = boto3.resource('s3')
    read_path = "s3://scgc/data/merged"
    write_path = "s3://" + my_bucket
    today    = dt.datetime.now().strftime("%Y-%m-%d")
    next_day = (dt.datetime.now() + dt.timedelta(days=1)).strftime("%Y-%m-%d")
    prev_day = (dt.datetime.now() - dt.timedelta(days=1)).strftime("%Y-%m-%d")

    ### Get predict Data
    merged_df = pd.DataFrame()
    merged_df = read_parquet_tables(
        file_name="merged",
        start_date = prev_day,
        end_date = next_day,
        path = read_path,
    )

    ### Pre-process Data

    print("Pre-Processing...............")
    payload = preprocess_dataframe(merged_df)
    print("Ending reading Payload!")

    scaler = get_scaler(resource, bucket_model, model_type="intraday")
    payload = scaler.transform(payload)
    ### Invoke Endpoint & Get Result
    runtime_sm_client = boto3.client(service_name="sagemaker-runtime")
    print ("Invoking endpoint with payload data")
    response = runtime_sm_client.invoke_endpoint(
        EndpointName="lgbm-regressor-inference",
        ContentType="application/JSON",
        TargetModel="lgbm-intraday.tar.gz",
        Body=json.dumps(payload.tolist()), 
    )
    arr = json.loads(response["Body"].read())
    arr = arr[-8:]

    ### Get old result Data, check exist: if not, create a new template
    check_file_is_exists(my_bucket, write_path, pred_folder, today, cols=list_cols)
    check_file_is_exists(my_bucket, write_path, pred_folder, next_day, cols=list_cols)

    ### Read result file
    result_combine = read_parquet_tables(
        file_name= "intraday_pred",
        start_date = prev_day,
        end_date = next_day,
        path = os.path.join(write_path, pred_folder),
    )

    ### Override the result & NIV insert
    result_file = override_result(result_combine, arr)
    niv_value, niv_time = get_niv(merged_df)
    print (niv_value, niv_time, ' \n')
    niv_inx = result_file.index[(result_file['SettlementTime'] > niv_time - timedelta(minutes=10))
                                & (result_file['SettlementTime'] < niv_time + timedelta(minutes=10))].to_list()[0]  # index.name, index.values
    result_file.at[niv_inx, 'ImbalanceQuantity(MAW)(B1780)'] = niv_value
    print ("Lasted NIV Index in Result file : ", niv_inx, ', at ', niv_time)

    #### Divide and deliver to S3 Result
    result_file['SettlementTime'] = pd.to_datetime(result_file['SettlementTime'], format='%Y-%m-%d')

    prev_data = result_file[result_file['SettlementTime'] < dt.datetime.strptime(today, "%Y-%m-%d")]
    save_data(
        df = prev_data,
        file_name = 'intraday_pred',
        path = os.path.join(write_path, pred_folder),
        date_partition = prev_day
    )

    now_data = result_file[(result_file['SettlementTime'] >= dt.datetime.strptime(today, "%Y-%m-%d"))
        & (result_file['SettlementTime'] < dt.datetime.strptime(next_day, "%Y-%m-%d"))]
    save_data(
        df = now_data,
        file_name = 'intraday_pred',
        path = os.path.join(write_path, pred_folder),
        date_partition = today
    )

    next_data = result_file[result_file['SettlementTime'] >= dt.datetime.strptime(next_day, "%Y-%m-%d")]
    save_data(
        df = next_data,
        file_name = 'intraday_pred',
        path = os.path.join(write_path, pred_folder),
        date_partition = next_day
    )
    print("Successfull")

lambda_handler("", "")

Pre-Processing...............
Time taken to preprocess features in seconds: 0.08747246209532022
Shape of the dataframe: (144, 29)
Ending reading Payload!
Output files available: ['-2022-07-18-08-30-05/output/output.tar.gz', '-2022-07-18-14-21-07/output/output.tar.gz', '-2022-07-19-04-57-41/output/output.tar.gz', '-2022-07-19-05-51-46/output/output.tar.gz', '-2022-07-25-09-01-53/output/output.tar.gz', '-2022-07-25-09-10-03/output/output.tar.gz', '-shift-242022-07-22-03-38-01/output/output.tar.gz']
File chosen: intraday/2022/07/lgbm-intraday-shift-242022-07-22-03-38-01/output/output.tar.gz


https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


Invoking endpoint with payload data
harry/year=2022/month=7/day=25/
File Exits
harry/year=2022/month=7/day=26/
File Exits
index =  66
66 -106.19780157339841 

67 -191.9337508347731 

68 -22.493092258727998 

69 5.185060368769268 

70 65.51090507823932 

71 61.371475400900636 

72 -52.72628621725866 

73 19.91677108200124 

-120.2 2022-07-25 09:00:00  

Lasted NIV Index in Result file :  66 , at  2022-07-25 09:00:00
Successfull


In [None]:
################################################ Version 1.2 26/7 ########################################################
import datetime as dt
from datetime import datetime, timedelta
import time
from time import perf_counter
import numpy as np
import pandas as pd
import pyarrow
import pyarrow.parquet as pq
from functools import partial
import os, sys
import json
import s3fs
import io
import boto3
import tarfile 
import datetime as dt
from io import BytesIO
import joblib
from time import (strftime, 
                  perf_counter, 
                  gmtime)
# from sklearn.metrics import mean_squared_error
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

filesystem = s3fs.S3FileSystem()

# ============================================= Helper Functions ============================================= #
def save_data(
    df: pd.DataFrame,
    file_name: str,
    path: str,
    date_partition: str,
    partition_cols=None, partition_filename_cb=None, filesystem=filesystem
):
    """Write pandas DataFrame in parquet format to S3 bucket
    Args:
        df (pd.DataFrame): DataFrame to save
        file_name (str): Table name
        path (str): local or AWS S3 path to store the parquet files
        partition_cols (list, optional): Columns used to partition the parquet files. Defaults to None.
    """
    
    date = dt.datetime.strptime(date_partition, "%Y-%m-%d")
    folder = f'/year={date.year}/month={date.month}/day={date.day}'

    pq.write_to_dataset(
        pyarrow.Table.from_pandas(df),
        path + folder,
        filesystem=filesystem,
        partition_cols=partition_cols,
#         basename_template = f"{file_name}.parquet"
        partition_filename_cb=lambda x: f"{file_name}.parquet",
    )

def generate_dates(start_date: str, end_date: str):
    """Generates list of dates
    Args:
        start_date (str): Start date
        end_date (str): End date
    Returns:
        List: List of dates
    """
    sdate = dt.datetime.strptime(start_date, "%Y-%m-%d")
    edate = dt.datetime.strptime(end_date, "%Y-%m-%d") + dt.timedelta(days=1)

    return [
        (sdate + dt.timedelta(days=x)).strftime("%Y-%m-%d")
        for x in range((edate - sdate).days)
    ]
        
def read_parquet_tables(
    file_name: str,
    start_date: str,
    end_date: str,
    path: str,
) -> pd.DataFrame:
    """Read parquet file partitions
    Args:
        file_name (str): Table name
        start_date (str): starting date to clean the data (%Y-%m-%d)
        end_date (str): ending date to clean the data (%Y-%m-%d)
        path (str): local or AWS S3 path to read the parquet files
    Returns:
        pd.DataFrame: Datframe from parquet file
    """
    # convert date range to list of dates
    date_list = generate_dates(start_date, end_date)
    df = pd.DataFrame()
    for read_date in date_list:
        # convert date to integers for filters
        r_year = dt.datetime.strptime(read_date, "%Y-%m-%d").year
        r_month = dt.datetime.strptime(read_date, "%Y-%m-%d").month
        r_day = dt.datetime.strptime(read_date, "%Y-%m-%d").day

        try:
            data = (
                pq.ParquetDataset(
                    path
                    + f"/year={r_year}/month={r_month}/day={r_day}/{file_name}.parquet",
                    filesystem=filesystem,
                )
                .read_pandas()
                .to_pandas()
            )

        except:
            continue  
        df = pd.concat([df, data], ignore_index=True)

    return df

def get_lastest_value(dframe, column_name):
    tmp = dframe[["local_datetime", column_name]].dropna(how='any').iloc[-1:]  
    val = tmp.iloc[0][column_name]
    stime = tmp.iloc[0]['local_datetime']
    return val, stime


# ============================================= End of Helper Functions ============================================= #

# ============================================= PreProcess Functions ============================================= #


def select_best_features(dataframe):
    """Remove redundant/not useful columns from dataframe"""
    interconnector_cols = [c for c in dataframe.columns if "int" in c and "FUEL" in c]
    dataframe["Total_Int(FUELHH)"] = dataframe[interconnector_cols].apply(sum, axis=1)
    dataframe["Total_Fossil(FUELHH)"] = (dataframe["coal(FUELHH)"] +
                                         dataframe["ocgt(FUELHH)"] +
                                         dataframe["ccgt(FUELHH)"] +
                                         dataframe["oil(FUELHH)"])
    dataframe["Total_Other(FUELHH)"] = (dataframe["biomass(FUELHH)"] +
                                        dataframe["other(FUELHH)"] +
                                        dataframe["nuclear(FUELHH)"])
    dataframe["Total_Hydro(FUELHH)"] = dataframe["npshyd(FUELHH)"] + dataframe["ps(FUELHH)"]
    fuelhh_drop_cols = [c for c in dataframe.columns if "(FUELHH" in c and "total" not in c.lower()]
    # elexon generation cols
    ele_gen_drop_cols = (["Wind_Offshore_fcst(B1440)", "Wind_Onshore_fcst(B1440)"] +
                         [c for c in dataframe.columns if "windforfuelhh" in c.lower()] +  # WINDFORFUELHH
                         [c for c in dataframe.columns if "(B16" in c] +  # B16xx columns
                         ["Total_Load_fcst(B0620)", "Total_Load(B0610)"])  # + act_ele_gen_drop_cols
    # catalyst wind cols
    wind_pc_cols = [c for c in dataframe.columns if "pc" in c.lower()]  # the actual is very corr. with other winds
    sn_wind_cols = [c for c in dataframe.columns if "sn" in c.lower()]
    cat_wind_drop_cols = [c for c in dataframe.columns if "(Wind_" in c and "wind_act(Wind_unrestricted)" != c]
    cat_wind_drop_cols += wind_pc_cols + sn_wind_cols
    # drop columns with redundant information
    cols_to_remove = [
        "niv_act(Balancing_NIV_fcst_3hr)",  # can be used in post process
        "hist_fcst(Balancing_NIV_fcst_3hr)",  # bad feature, not to be used even in post process
        "niv(Balancing_NIV)",  # can be used in post process
        "indicativeNetImbalanceVolume(DERSYSDATA)",
        "systemSellPrice(DERSYSDATA)",
        "totalSystemAdjustmentSellVolume(DERSYSDATA)",
        "totalSystemAdjustmentBuyVolume(DERSYSDATA)",
        "non_bm_stor(Balancing_detailed)",
        "DAI(MELIMBALNGC)",
        "DAM(MELIMBALNGC)",
        "TSDF(SYSDEM)",
        "ITSDO(SYSDEM)",
        "temperature(TEMP)",
        "ImbalancePriceAmount(B1770)",
        "marketIndexPrice(MID)",
        "marketIndexVolume(MID)",
        "DATF(FORDAYDEM)",
        "DAID(FORDAYDEM)",
        "DAIG(FORDAYDEM)",
        "DANF(FORDAYDEM)"
    ]
    # day ahead auction cols
    daa_gwstep_cols = [c for c in dataframe.columns if "daa_gwstep" in c.lower()]
    daa_windrisk_cols = [c for c in dataframe.columns if "windrisk" in c.lower()]  # daauction/range/wind_risk @catalyst
    daa_xgas_cols = [c for c in dataframe.columns if "daa_xgas" in c.lower()]  # xgas @Catalyst too high corr with gas
    daa_gas_cols = [c for c in dataframe.columns if "daa_gas" in c.lower()]  # @Catalyst
    daa_drop_cols = [
        "price_act(DAA)",
        "price_fcst(DAA)",
        "price_weighted(DAA)",
        "hist_fcst(DAA_1D_8AM)",
        "hist_fcst(DAA_1D_2PM)",
    ]
    daa_drop_cols += daa_gwstep_cols + daa_windrisk_cols + daa_xgas_cols + daa_gas_cols
    # drop the columns listed and return
    cols_to_remove += (  # act_ele_gen_drop_cols +
            fuelhh_drop_cols +
            ele_gen_drop_cols +
            cat_wind_drop_cols +
            daa_drop_cols
    )
    return dataframe.drop(cols_to_remove, axis=1)


def prepare_date_features(dataframe):
    """transform date features into tabular format"""
    # parse weekday information
    dataframe["weekday"] = dataframe["SettlementDate"].apply(lambda x: x.isoweekday())
    # cyclic encode the datetime information; sine because the cycle should start from 0
    dataframe["sp_sin"] = np.sin(dataframe["SettlementPeriod"] * (2 * np.pi / 48))
    dataframe["month_sin"] = np.sin(dataframe["month"] * (2 * np.pi / 12))
    dataframe["week_sin"] = np.sin((dataframe["local_datetime"].dt.weekday + 1) * (2 * np.pi / 7))
    # drop unparsed date column; note that SettlementDate is kept for later groupbys
    dataframe.drop(["local_datetime",
        "month",  # information is already encoded
        "SettlementPeriod",  # information is present in hour already, so remove
        "year",
        "weekday"], axis=1, inplace=True)
    return dataframe


def interpolate_and_clip_outliers(dataframe, cutoff=3):
    """Replaces the outlier value with the previous and next value's average using the column's z statistic"""
    float_cols = [c for c in dataframe.select_dtypes("float").columns if "sin" not in c]
    for col_name in float_cols:
        col = dataframe[col_name].to_numpy()
        col_mean, col_std = col.mean(), col.std()  # save the mean and std of the dataframe column
        for idx in range(len(col)):
            row = col[idx]  # save to variable to avoid re-accessing
            if np.abs(row - col_mean) / col_std > cutoff:
                dataframe.loc[idx, col_name] = np.mean([col[idx - 1], col[idx + 1]])
                z_cutoff = cutoff * col_std
                dataframe.loc[idx, col_name] = np.clip(row, col_mean - z_cutoff, col_mean + z_cutoff)
    return dataframe


def compute_ewm_features(dataframe, window=8, alpha=1 - np.log(2) / 4):
    """Computes the exponentially moving weighted average features"""
    weights = list(reversed([(1 - alpha) ** n for n in range(window)]))
    ewma = partial(np.average, weights=weights)
    ewm_cols = [c for c in dataframe.columns if "Imbalance" not in c and  # exclude target variable
                "(" in c and  # this is to exclude time features
                "FUELHH" not in c and  # exclude FUELHH features
                "cash_out" not in c]  # exclude cash_out(Balancing_detailed) feature
    for c in ewm_cols:
        # compute daily ewm, parametrized by alpha
        dataframe[f"ewm_mean_{c}"] = dataframe[c].rolling(window).apply(ewma)
    return dataframe


def compute_shifted_features(dataframe):
    """Computes the features that can be shifted"""
    # compute  backshifted features
    bshift_2sp_cols = [
        "totalSystemAcceptedOfferVolume(DERSYSDATA)",
        "totalSystemAcceptedBidVolume(DERSYSDATA)"
    ]
    for c in bshift_2sp_cols:
        dataframe[f"bshift_2sp_{c}"] = dataframe[c].shift(-2)
    dataframe["bshift_4sp_boas(Balancing_detailed)"] = dataframe["boas(Balancing_detailed)"].shift(-4)
    # compute back-differenced features
    bdiff_cols = [
        "Generation_fcst(B1430)",
        "boas(Balancing_detailed)",
        "totalSystemAcceptedOfferVolume(DERSYSDATA)",
        "totalSystemAcceptedBidVolume(DERSYSDATA)"
    ]
    for c in bdiff_cols:
        dataframe[f"bdiff_1sp_{c}"] = dataframe[c].diff(-1)
    # compute forward shifted feature; other fshifted features based on other cols did not perform well
    fshift_cols = [
        "boas(Balancing_detailed)",
    ]
    for c in fshift_cols:
        dataframe[f"fshift_1hr_{c}"] = dataframe[c].shift(2)
    return dataframe


def preprocess_dataframe(df : pd.DataFrame):
    t0 = time.perf_counter()
    df["year"] = df['local_datetime'].dt.year
    df["month"] = df['local_datetime'].dt.month
    df = select_best_features(df)  # remove columns based on previous results
    df = prepare_date_features(df)  # parse date into cyclic features
    df = interpolate_and_clip_outliers(df, cutoff=2)  # deal outliers using z statistics
    df = compute_ewm_features(df)  # compute exponentially weighted moving average features
    df = compute_shifted_features(df)  # compute features based on shifting and differencing
    # drop some columns that are not performant or are no longer needed
    df = df.drop(["SettlementDate",
                  "wind_act(Wind_unrestricted)",
                  "totalSystemAcceptedOfferVolume(DERSYSDATA)",
                  "totalSystemAcceptedBidVolume(DERSYSDATA)",
                  "Generation_fcst(B1430)",
                  "intraday(Balancing_detailed)",
                  "Solar_fcst(B1440)",
                  "__index_level_0__"], axis=1)
    print("Time taken to preprocess features in seconds:", time.perf_counter() - t0)
    print(f"Shape of the dataframe: {df.shape}")
    df = df.dropna().iloc[-(8+48):]   # Shifted 48
    df_numy= df.drop("ImbalanceQuantity(MAW)(B1780)", axis=1).to_numpy()
    
    return df_numy

# ============================================= End of PreProcess Functions ============================================= #

# ============================================= Support Main Functions ============================================= #

list_cols = ['SettlementTime', 'SP' , 'niv_predicted_1sp', 'niv_predicted_2sp',
             'niv_predicted_3sp', 'niv_predicted_4sp', 'niv_predicted_5sp', 'niv_predicted_6sp',
             'niv_predicted_7sp', 'niv_predicted_8sp', 'dayahead_morning', 'dayahead_afternoon', 
             'ImbalanceQuantity(MAW)(B1780)', 'ImbalancePriceAmount(B1770)', 'marketIndexPrice(MID)', 'price_act(DAA)']

def check_file_is_exists(bucket_name, write_path, path, day2write, cols=list_cols):
    
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucket_name)
    start_date = dt.datetime.strptime(day2write, "%Y-%m-%d")
    end_date = dt.datetime.strptime(day2write, "%Y-%m-%d") + dt.timedelta(days=1)
    prefix_path = path + f"/year={start_date.year}/month={start_date.month}/day={start_date.day}/"
    print(prefix_path)
    file_list = list(bucket.objects.filter(Prefix=prefix_path))
    if not len(file_list):
        
        print("No file here")
        # Create new Template
        df = pd.DataFrame(index=pd.date_range(start=start_date, end=end_date, freq="30T"), columns=cols[1:])
        df = df.head(48)
        df.index.name = cols[0]
        df['SP'] = range(0, len(df))
        df['SP'] = df['SP'].apply(lambda x: x % 48 + 1)
        df = df.reset_index()
        print(f"Creating empty file at ", day2write)
        
        save_data(df = df,
                  file_name = 'intraday_pred',
                  path = os.path.join(write_path, path),
                  date_partition = day2write
        )
        print(f"Created empty file at ", day2write)
    else: 
        print('File Exits')


def override_result(df: pd.DataFrame, niv_time, response_arr):
    df = df[list_cols]
    index = df.index[(df['SettlementTime'] > niv_time + timedelta(minutes=20)) & (df['SettlementTime'] < niv_time + timedelta(minutes=40))].to_list()[0]
    print ('index = ', index)
    # Insert Value
    for i in range(len(response_arr)):
        df.at[index, 'niv_predicted_{}sp'.format(i + 1)] = response_arr[i]
        print(index, response_arr[i], '\n')
        index += 1
    return df

def override(df: pd.DataFrame, column_name, value, stime):
    inx = df.index[(df['SettlementTime'] > stime - timedelta(minutes=10))
                                & (df['SettlementTime'] < stime + timedelta(minutes=10))].values[0]  # index.name, index.values
    df.at[inx, column_name] = value
    print (f"Lasted {column_name} Index in Result file : ", inx, ', at ', stime)
    return df

def get_scaler(resource, bucket, file_key="", model_type="intraday"):
    """Gets the scaler used during training from S3"""
    prefix = "{}/{}/lgbm-{}".format(
        model_type.split("-")[0], 
        strftime('%Y/%m', gmtime()), model_type
    )
    if not file_key:
        # get the most recent trained model outputs
        list_files = []
        for object_summary in bucket.objects.filter(Prefix=prefix):
            list_files.append(object_summary.key)
        list_files = [x for x in list_files if "output/output" in x]
        sorted_files = sorted([x.split(prefix)[1] for x in list_files])
        file_name = prefix + sorted_files[-1]
        print("Output files available:", sorted_files)
        print("File chosen:", file_name)
        # read tarfile from S3 location defined before
        fileobj = BytesIO(resource.Object(bucket.name, file_name).get()["Body"].read())
        with tarfile.open(fileobj=fileobj) as tarf:
            # only 1 scaler is outputted per training session
            scaler_file_name = [f.name for f in tarf if "scaler" in f.name.lower()][0]
            scaler = joblib.load(tarf.extractfile(scaler_file_name))
    else:
        # read tarfile from S3 location defined in arguments passed during call
        fileobj = BytesIO(resource.Object(bucket.name, file_key).get()["Body"].read())
        with tarfile.open(fileobj=fileobj) as tarf:
            # only 1 scaler is outputted per training session
            scaler_file_name = [f.name for f in tarf if "scaler" in f.name.lower()][0]
            scaler = joblib.load(tarf.extractfile(scaler_file_name))
    return scaler
        
def lambda_handler(event, context):
    
    ### Initialize
    s3 = boto3.resource('s3')
    my_bucket = 'fpt-results-storage'
    pred_folder = 'harry'
    bucket = s3.Bucket(my_bucket)
    bucket_model = s3.Bucket('lgbm-model-storage')
    resource = boto3.resource('s3')
    read_path = "s3://scgc/data/merged"
    write_path = "s3://" + my_bucket
    today    = dt.datetime.now().strftime("%Y-%m-%d")
    next_day = (dt.datetime.now() + dt.timedelta(days=1)).strftime("%Y-%m-%d")
    prev_day = (dt.datetime.now() - dt.timedelta(days=1)).strftime("%Y-%m-%d")

    ### Get predict Data
    merged_df = pd.DataFrame()
    merged_df = read_parquet_tables(
        file_name="merged",
        start_date = prev_day,
        end_date = next_day,
        path = read_path,
    )

    ### Pre-process Data

    print("Pre-Processing...............")
    payload = preprocess_dataframe(merged_df)
    print("Ending reading Payload!")

    scaler = get_scaler(resource, bucket_model, model_type="intraday")
    payload = scaler.transform(payload)
    ### Invoke Endpoint & Get Result
    runtime_sm_client = boto3.client(service_name="sagemaker-runtime")
    print ("Invoking endpoint with payload data")
    response = runtime_sm_client.invoke_endpoint(
        EndpointName="lgbm-regressor-inference",
        ContentType="application/JSON",
        TargetModel="lgbm-intraday.tar.gz",
        Body=json.dumps(payload.tolist()), 
    )
    arr = json.loads(response["Body"].read())
    arr = arr[-8:]

    ### Get old
    result Data, check exist: if not, create a new template
    check_file_is_exists(my_bucket, write_path, pred_folder, prev_day, cols=list_cols)
    check_file_is_exists(my_bucket, write_path, pred_folder, today, cols=list_cols)
    check_file_is_exists(my_bucket, write_path, pred_folder, next_day, cols=list_cols)

    ### Read result file
    result_file = read_parquet_tables(
        file_name= "intraday_pred",
        start_date = prev_day,
        end_date = next_day,
        path = os.path.join(write_path, pred_folder),
    )
    
    ### Override the result & NIV insert & MID insert
    niv_value, niv_time = get_lastest_value(merged_df, 'ImbalanceQuantity(MAW)(B1780)')
    result_file = override(result_file, 'ImbalanceQuantity(MAW)(B1780)', niv_value, niv_time)
    
    mid_value, mid_time = get_lastest_value(merged_df, 'marketIndexPrice(MID)')
    result_file = override(result_file, 'marketIndexPrice(MID)', mid_value, mid_time)
    
    ipa_value, ipa_time = get_lastest_value(merged_df, 'ImbalancePriceAmount(B1770)')
    result_file = override(result_file, 'ImbalancePriceAmount(B1770)', ipa_value, ipa_time)
    
    result_file = override_result(result_file, niv_time, arr)
    
    #### Divide and deliver to S3 Result
    result_file['SettlementTime'] = pd.to_datetime(result_file['SettlementTime'], format='%Y-%m-%d')
    
    if '__index_level_0__' in result_file.columns:
        result_file.drop('__index_level_0__', axis = 1)
    
    prev_data = result_file[result_file['SettlementTime'] < dt.datetime.strptime(today, "%Y-%m-%d")]
    save_data(
        df = prev_data,
        file_name = 'intraday_pred',
        path = os.path.join(write_path, pred_folder),
        date_partition = prev_day
    )

    now_data = result_file[(result_file['SettlementTime'] >= dt.datetime.strptime(today, "%Y-%m-%d"))
        & (result_file['SettlementTime'] < dt.datetime.strptime(next_day, "%Y-%m-%d"))]
    save_data(
        df = now_data,
        file_name = 'intraday_pred',
        path = os.path.join(write_path, pred_folder),
        date_partition = today
    )

    next_data = result_file[result_file['SettlementTime'] >= dt.datetime.strptime(next_day, "%Y-%m-%d")]
    save_data(
        df = next_data,
        file_name = 'intraday_pred',
        path = os.path.join(write_path, pred_folder),
        date_partition = next_day
    )
    print("Successfull")
# lambda_handler("", "")

In [None]:
################################################ Version 2.0 29/7 ########################################################
import datetime as dt
from datetime import datetime, timedelta
import time
from time import perf_counter
import numpy as np
import pandas as pd
import pyarrow
import pyarrow.parquet as pq
from functools import partial
import os, sys
import json
import s3fs
import io
import boto3
import tarfile 
import datetime as dt
from io import BytesIO
from time import (strftime, 
                  perf_counter, 
                  gmtime)
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

filesystem = s3fs.S3FileSystem()

# ============================================= Helper Functions ============================================= #
def save_data(
    df: pd.DataFrame,
    file_name: str,
    path: str,
    date_partition: str,
    partition_cols=None, partition_filename_cb=None, filesystem=filesystem
):
    """Write pandas DataFrame in parquet format to S3 bucket
    Args:
        df (pd.DataFrame): DataFrame to save
        file_name (str): Table name
        path (str): local or AWS S3 path to store the parquet files
        partition_cols (list, optional): Columns used to partition the parquet files. Defaults to None.
    """
    
    date = dt.datetime.strptime(date_partition, "%Y-%m-%d")
    folder = f'/year={date.year}/month={date.month}/day={date.day}'

    pq.write_to_dataset(
        pyarrow.Table.from_pandas(df),
        path + folder,
        filesystem=filesystem,
        partition_cols=partition_cols,
#         basename_template = f"{file_name}.parquet"
        partition_filename_cb=lambda x: f"{file_name}.parquet",
    )

def generate_dates(start_date: str, end_date: str):
    """Generates list of dates
    Args:
        start_date (str): Start date
        end_date (str): End date
    Returns:
        List: List of dates
    """
    sdate = dt.datetime.strptime(start_date, "%Y-%m-%d")
    edate = dt.datetime.strptime(end_date, "%Y-%m-%d") + dt.timedelta(days=1)

    return [
        (sdate + dt.timedelta(days=x)).strftime("%Y-%m-%d")
        for x in range((edate - sdate).days)
    ]
        
def read_parquet_tables(
    file_name: str,
    start_date: str,
    end_date: str,
    path: str,
) -> pd.DataFrame:
    """Read parquet file partitions
    Args:
        file_name (str): Table name
        start_date (str): starting date to clean the data (%Y-%m-%d)
        end_date (str): ending date to clean the data (%Y-%m-%d)
        path (str): local or AWS S3 path to read the parquet files
    Returns:
        pd.DataFrame: Datframe from parquet file
    """
    # convert date range to list of dates
    date_list = generate_dates(start_date, end_date)
    df = pd.DataFrame()
    for read_date in date_list:
        # convert date to integers for filters
        r_year = dt.datetime.strptime(read_date, "%Y-%m-%d").year
        r_month = dt.datetime.strptime(read_date, "%Y-%m-%d").month
        r_day = dt.datetime.strptime(read_date, "%Y-%m-%d").day

        try:
            data = (
                pq.ParquetDataset(
                    path
                    + f"/year={r_year}/month={r_month}/day={r_day}/{file_name}.parquet",
                    filesystem=filesystem,
                )
                .read_pandas()
                .to_pandas()
            )

        except:
            continue  
        df = pd.concat([df, data], ignore_index=True)

    return df

def get_lastest_value(dframe, column_name):
    tmp = dframe[["local_datetime", column_name]].dropna(how='any').iloc[-1:]  
    val = tmp.iloc[0][column_name]
    stime = tmp.iloc[0]['local_datetime']
    return val, stime


# ============================================= End of Helper Functions ============================================= #

# ============================================= PreProcess Functions ============================================= #


def select_best_features(dataframe):
    """Remove redundant/not useful columns from dataframe"""
    interconnector_cols = [c for c in dataframe.columns if "int" in c and "FUEL" in c]
    dataframe["Total_Int(FUELHH)"] = dataframe[interconnector_cols].apply(sum, axis=1)
    dataframe["Total_Fossil(FUELHH)"] = (dataframe["coal(FUELHH)"] +
                                         dataframe["ocgt(FUELHH)"] +
                                         dataframe["ccgt(FUELHH)"] +
                                         dataframe["oil(FUELHH)"])
    dataframe["Total_Other(FUELHH)"] = (dataframe["biomass(FUELHH)"] +
                                        dataframe["other(FUELHH)"] +
                                        dataframe["nuclear(FUELHH)"])
    dataframe["Total_Hydro(FUELHH)"] = dataframe["npshyd(FUELHH)"] + dataframe["ps(FUELHH)"]
    fuelhh_drop_cols = [c for c in dataframe.columns if "(FUELHH" in c and "total" not in c.lower()]
    # elexon generation cols
    ele_gen_drop_cols = (["Wind_Offshore_fcst(B1440)", "Wind_Onshore_fcst(B1440)"] +
                         [c for c in dataframe.columns if "windforfuelhh" in c.lower()] +  # WINDFORFUELHH
                         [c for c in dataframe.columns if "(B16" in c] +  # B16xx columns
                         ["Total_Load_fcst(B0620)", "Total_Load(B0610)"])  # + act_ele_gen_drop_cols
    # catalyst wind cols
    wind_pc_cols = [c for c in dataframe.columns if "pc" in c.lower()]  # the actual is very corr. with other winds
    sn_wind_cols = [c for c in dataframe.columns if "sn" in c.lower()]
    cat_wind_drop_cols = [c for c in dataframe.columns if "(Wind_" in c and "wind_act(Wind_unrestricted)" != c]
    cat_wind_drop_cols += wind_pc_cols + sn_wind_cols
    # drop columns with redundant information
    cols_to_remove = [
        "niv_act(Balancing_NIV_fcst_3hr)",  # can be used in post process
        "hist_fcst(Balancing_NIV_fcst_3hr)",  # bad feature, not to be used even in post process
        "niv(Balancing_NIV)",  # can be used in post process
        "indicativeNetImbalanceVolume(DERSYSDATA)",
        "systemSellPrice(DERSYSDATA)",
        "totalSystemAdjustmentSellVolume(DERSYSDATA)",
        "totalSystemAdjustmentBuyVolume(DERSYSDATA)",
        "non_bm_stor(Balancing_detailed)",
        "DAI(MELIMBALNGC)",
        "DAM(MELIMBALNGC)",
        "TSDF(SYSDEM)",
        "ITSDO(SYSDEM)",
        "temperature(TEMP)",
        "ImbalancePriceAmount(B1770)",
        "marketIndexPrice(MID)",
        "marketIndexVolume(MID)",
        "DATF(FORDAYDEM)",
        "DAID(FORDAYDEM)",
        "DAIG(FORDAYDEM)",
        "DANF(FORDAYDEM)"
    ]
    # day ahead auction cols
    daa_gwstep_cols = [c for c in dataframe.columns if "daa_gwstep" in c.lower()]
    daa_windrisk_cols = [c for c in dataframe.columns if "windrisk" in c.lower()]  # daauction/range/wind_risk @catalyst
    daa_xgas_cols = [c for c in dataframe.columns if "daa_xgas" in c.lower()]  # xgas @Catalyst too high corr with gas
    daa_gas_cols = [c for c in dataframe.columns if "daa_gas" in c.lower()]  # @Catalyst
    daa_drop_cols = [
        "price_act(DAA)",
        "price_fcst(DAA)",
        "price_weighted(DAA)",
        "hist_fcst(DAA_1D_8AM)",
        "hist_fcst(DAA_1D_2PM)",
    ]
    daa_drop_cols += daa_gwstep_cols + daa_windrisk_cols + daa_xgas_cols + daa_gas_cols
    # drop the columns listed and return
    cols_to_remove += (  # act_ele_gen_drop_cols +
            fuelhh_drop_cols +
            ele_gen_drop_cols +
            cat_wind_drop_cols +
            daa_drop_cols
    )
    return dataframe.drop(cols_to_remove, axis=1)


def prepare_date_features(dataframe):
    """transform date features into tabular format"""
    # cyclic encode the datetime information; sine because the cycle should start from 0
    dataframe["sp_sin"] = np.sin(dataframe["SettlementPeriod"] * (2 * np.pi / 48))
    dataframe["month_sin"] = np.sin(dataframe["local_datetime"].dt.month * (2 * np.pi / 12))
    dataframe["week_sin"] = np.sin((dataframe["local_datetime"].dt.weekday + 1) * (2 * np.pi / 7))
    # drop unparsed date column; note that SettlementDate is kept for later groupbys
    dataframe.drop(["local_datetime",  # information is already encoded
        "SettlementPeriod"], axis=1, inplace=True)
    return dataframe


def interpolate_outliers(dataframe, cutoff=2.5):
    """Replaces the outlier value with the previous and next value's average using the column's z statistic"""
    float_cols = [c for c in dataframe.select_dtypes("float").columns if "sin" not in c]
    for col_name in float_cols:
        col = dataframe[col_name].to_numpy()
        col_mean, col_std = col.mean(), col.std()  # save the mean and std of the dataframe column
        z_cutoff = cutoff * col_std
        for idx in range(len(col)):
            if np.abs(col[idx] - col_mean) > z_cutoff:
                try:
                    dataframe.loc[idx, col_name] = np.mean([col[idx - 1], col[idx + 1]])
                except IndexError:
                    # this only happens at either end of the input data, so we leave the value as is
                    # it will be processed in the clip_outliers function if the cutoff there is less than or
                    # equal to the cutoff here
                    pass
    return dataframe

def clip_outliers(dataframe, cutoff=2):
    """Clip outlier using z-statistic"""
    float_cols = [c for c in dataframe.select_dtypes("float").columns if "sin" not in c]
    for col_name in float_cols:
        col = dataframe[col_name].to_numpy()
        col_mean, col_std = col.mean(), col.std()  # save the mean and std of the dataframe column
        z_cutoff = cutoff * col_std
        lower_bound = col_mean - z_cutoff
        upper_bound = col_mean + z_cutoff
        for idx in range(len(col)):
            row = col[idx]  # save to variable to avoid re-accessing
            if np.abs(row - col_mean) > z_cutoff:
                dataframe.loc[idx, col_name] = np.clip(row, lower_bound, upper_bound)
    return dataframe


def compute_ewm_features(dataframe, window=8, alpha=1 - np.log(2) / 4):
    """Computes the exponentially moving weighted average features"""
    weights = list(reversed([(1 - alpha) ** n for n in range(window)]))
    ewma = partial(np.average, weights=weights)
    ewm_cols = [c for c in dataframe.columns if "Imbalance" not in c and  # exclude target variable
                "(" in c and  # this is to exclude time features
                "FUELHH" not in c and  # exclude FUELHH features
                "cash_out" not in c]  # exclude cash_out(Balancing_detailed) feature
    for c in ewm_cols:
        # compute daily ewm, parametrized by alpha
        dataframe[f"ewm_mean_{c}"] = dataframe[c].rolling(window).apply(ewma)
    return dataframe


def compute_shifted_features(dataframe):
    """Computes the features that can be shifted"""
    # compute  backshifted features
    bshift_2sp_cols = [
        "totalSystemAcceptedOfferVolume(DERSYSDATA)",
        "totalSystemAcceptedBidVolume(DERSYSDATA)"
    ]
    for c in bshift_2sp_cols:
        dataframe[f"bshift_2sp_{c}"] = dataframe[c].shift(-2)
    dataframe["bshift_4sp_boas(Balancing_detailed)"] = dataframe["boas(Balancing_detailed)"].shift(-4)
    # compute back-differenced features
    bdiff_cols = [
        "Generation_fcst(B1430)",
        "boas(Balancing_detailed)",
        "totalSystemAcceptedOfferVolume(DERSYSDATA)",
        "totalSystemAcceptedBidVolume(DERSYSDATA)"
    ]
    for c in bdiff_cols:
        dataframe[f"bdiff_1sp_{c}"] = dataframe[c].diff(-1)
    # compute forward shifted feature; other fshifted features based on other cols did not perform well
    fshift_cols = [
        "boas(Balancing_detailed)",
    ]
    for c in fshift_cols:
        dataframe[f"fshift_1hr_{c}"] = dataframe[c].shift(2)
    return dataframe


def preprocess_dataframe(df : pd.DataFrame):
    t0 = time.perf_counter()
    df = select_best_features(df)  # remove columns based on previous results
    df = prepare_date_features(df)  # parse date into cyclic features 
    df = interpolate_outliers(df, cutoff=2.5)  # interpolate using z statistics (avg next/before obs)
    df = clip_outliers(df, cutoff=2)  # clip outliers using z statistics
    df = compute_ewm_features(df)  # compute exponentially weighted moving average features
    df = compute_shifted_features(df)  # compute features based on shifting and differencing
    # drop some columns that are not performant or are no longer needed
    df = df.drop(["SettlementDate",
                  "wind_act(Wind_unrestricted)",
                  "totalSystemAcceptedOfferVolume(DERSYSDATA)",
                  "totalSystemAcceptedBidVolume(DERSYSDATA)",
                  "Generation_fcst(B1430)",
                  "intraday(Balancing_detailed)",
                  "Solar_fcst(B1440)",
                  "__index_level_0__"], axis=1)
    print("Time taken to preprocess features in seconds:", time.perf_counter() - t0)
    # print(f"Shape of the dataframe: {df.shape}")
    df = df.dropna()
    df = df.drop("ImbalanceQuantity(MAW)(B1780)", axis=1)
    
    return df

# ============================================= End of PreProcess Functions ============================================= #

# ============================================= Support Main Functions ============================================= #

list_cols = ['SettlementTime', 'SP' , 'niv_predicted_1sp', 'niv_predicted_2sp',
             'niv_predicted_3sp', 'niv_predicted_4sp', 'niv_predicted_5sp', 'niv_predicted_6sp',
             'niv_predicted_7sp', 'niv_predicted_8sp', 'dayahead_morning', 'dayahead_afternoon', 
             'ImbalanceQuantity(MAW)(B1780)', 'ImbalancePriceAmount(B1770)', 'marketIndexPrice(MID)', 'price_act(DAA)']

def check_file_is_exists(bucket_name, write_path, path, day2write, cols=list_cols):
    
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucket_name)
    start_date = dt.datetime.strptime(day2write, "%Y-%m-%d")
    end_date = dt.datetime.strptime(day2write, "%Y-%m-%d") + dt.timedelta(days=1)
    
    prefix_path = path + f"/year={start_date.year}/month={start_date.month}/day={start_date.day}/"
    file_list = list(bucket.objects.filter(Prefix=prefix_path))
    if not len(file_list):
        
        print("No file in ", prefix_path)
        # Create new Template
        df = pd.DataFrame(index=pd.date_range(start=start_date, end=end_date, freq="30T"), columns=cols[1:])
        df = df.head(48)
        df.index.name = cols[0]
        df['SP'] = range(0, len(df))
        df['SP'] = df['SP'].apply(lambda x: x % 48 + 1)
        df = df.reset_index()
        print(f"Creating empty file at ", day2write)
        
        save_data(df = df,
                  file_name = 'intraday_pred',
                  path = os.path.join(write_path, path),
                  date_partition = day2write
        )
        print(f"Created empty file at ", day2write)
    else: 
        print('File Exits')


def override_intraday_result(df: pd.DataFrame, niv_time, response_arr):
    """
    df: concatenated result files from yesterday, today & tomorrow
    niv_time: lastest NIV updates
    response_arr: 8 prediction values
    """
    df = df[list_cols]
    index = df.index[(df['SettlementTime'] > niv_time + timedelta(minutes=20)) 
                     &(df['SettlementTime'] < niv_time + timedelta(minutes=40))].values[0]
    print ('Intraday index = ', index)
    # Insert Value
    for i in range(len(response_arr)):
        index += 1
        df.at[index, 'niv_predicted_{}sp'.format(i + 1)] = response_arr[i]
        print(index, response_arr[i], '\n')
    return df

def override_dayahead_result(df: pd.DataFrame, 
                             dayahead_payload: np.array,
                             niv_time: dt.datetime,
                            ):
    """
    df: concatenated result files from yesterday, today & tomorrow
    dayahead_payload: All Payload from yesterday, today & tomorrow
    """
    if (datetime.now().hour == 8 or datetime.now().hour == 14) & (datetime.now().minute > 30):
        df = df[list_cols]
        print ("Length of dayahead_payload: ", len(dayahead_payload))
        ### Invoke DAYAHEAD Endpoint & Get Result
        half_day = 'morning' if datetime.now().hour < 10 else 'afternoon'
        sp_offset = (niv_time.hour * 2) + (niv_time.minute and 1) # Get number 
        print ("Number of SP offset : ", sp_offset)
        dayahead_payload[-(48 + 96 + sp_offset):]
        
        
        ### Get the newest model file aka "TargetModel"
        list_files = []
        for object_summary in model_bucket.objects.filter(Prefix="endpoint"):
            if f"lgbm-regressor-dayahead-{half_day}" in object_summary.key:
                list_files.append(object_summary.key)
        assert len(list_files) == 1, "check s3 should have one file"
        target_model = list_files[0].split("endpoint/")[-1]
        print("Lastes Day-Ahead target Model :", target_model)
        
        runtime_sm_client = boto3.client(service_name="sagemaker-runtime")
        print ("Invoking endpoint with Day-Ahead data in the", half_day)
        response = runtime_sm_client.invoke_endpoint(
            EndpointName="lgbm-regressor-endpoint",
            ContentType ="application/JSON",
            TargetModel =target_model,
            Body=json.dumps(dayahead_payload.tolist()), 
        )
        dayhead_arr = json.loads(response["Body"].read())
        dayhead_arr = dayhead_arr[-(48 + sp_offset + 2):]
        dayhead_arr = dayhead_arr[:48]
    
        # Insert Value
        index = 48 + 46 ### SP = 47 so index = 46 & add 48 from whole yesterday
        print('Length of dayahead array: ', len(dayhead_arr))
        for i in range(len(dayhead_arr)):
            df.at[index, 'dayahead_{}'.format(half_day)] = dayhead_arr[i]
            index += 1
        # df.loc[index:, 'dayahead_{}'.format(half_day)] = dayhead_arr
        return df
    else:
        return df

def override(df: pd.DataFrame, column_name, value, stime):
    inx = df.index[(df['SettlementTime'] > stime - timedelta(minutes=10))
                   &(df['SettlementTime'] < stime + timedelta(minutes=10))].values[0] 
    df.at[inx, column_name] = value
    # print (f"Lasted {column_name} Index in Result file : ", inx, ', at ', stime)
    return df


################################# Initialize ########################################################
s3 = boto3.resource('s3')
my_bucket = 'niv-predictions'
pred_folder = 'lgbm-prediction'
bucket = s3.Bucket(my_bucket)
model_bucket = s3.Bucket('lgbm-model-storage')
resource = boto3.resource('s3')
read_path = "s3://scgc/data/merged"
write_path = "s3://" + my_bucket
prev_day = (dt.datetime.now() - dt.timedelta(days=4)).strftime("%Y-%m-%d")
yesterday= (dt.datetime.now() - dt.timedelta(days=1)).strftime("%Y-%m-%d")
today    = dt.datetime.now().strftime("%Y-%m-%d")
tomorrow = (dt.datetime.now() + dt.timedelta(days=1)).strftime("%Y-%m-%d")


def lambda_handler(event, context):
    ################################# Get & Pre-process Data #############################################
    ### Get Data 
    merged_df = pd.DataFrame()
    merged_df = read_parquet_tables(
        file_name="merged",
        start_date = prev_day,
        end_date = tomorrow,
        path = read_path,
    )
    print ("Shape of merged_df: ", merged_df.shape)
    ### Pre-Process
    print("Pre-Processing...............")
    preprocessed_df  = preprocess_dataframe(merged_df)
    print ("Shape of preprocess_dataframe: ", preprocessed_df.shape)
    intraday_payload = preprocessed_df.iloc[-(8+48):].to_numpy()   # Shifted 48
    dayahead_payload = preprocessed_df.to_numpy()
    print("Ended Pre-Process & got Payload")
    
    ################################# Intraday #######################################################
    ### Get the newest model file aka "TargetModel"
    list_files = []
    for object_summary in model_bucket.objects.filter(Prefix="endpoint"):
        if "lgbm-regressor-intraday" in object_summary.key:
            list_files.append(object_summary.key)
    assert len(list_files) == 1, "check s3 should have one file"
    target_model = list_files[0].split("endpoint/")[-1]
    print("Lastest Intraday target Model :", target_model)
    
    ### Invoke INTRADAY Endpoint & Get Result
    runtime_sm_client = boto3.client(service_name="sagemaker-runtime")
    print ("Invoking endpoint with Payload data")
    response = runtime_sm_client.invoke_endpoint(
        EndpointName="lgbm-regressor-endpoint",
        ContentType ="application/JSON",
        TargetModel = target_model,
        Body=json.dumps(intraday_payload.tolist()),
    )
    intraday_arr = json.loads(response["Body"].read())
    intraday_arr = intraday_arr[-8:]

    ### Get old result Data, check exist: if not, create a new template
    check_file_is_exists(my_bucket, write_path, pred_folder, today, cols=list_cols)
    check_file_is_exists(my_bucket, write_path, pred_folder, tomorrow, cols=list_cols)

    ### Read result file
    result_file = read_parquet_tables(
        file_name= "intraday_pred",
        start_date = yesterday,
        end_date = tomorrow,
        path = os.path.join(write_path, pred_folder),
    )
    print("Shape of Result_file ", result_file.shape)
    
    ### Override the result & insert lastest NIV, MID, price_act(DAA), ImbalancePriceAmount(B1770), 
    niv_value, niv_time = get_lastest_value(merged_df, 'ImbalanceQuantity(MAW)(B1780)')
    result_file = override(result_file, 'ImbalanceQuantity(MAW)(B1780)', niv_value, niv_time)
    
    mid_value, mid_time = get_lastest_value(merged_df, 'marketIndexPrice(MID)')
    result_file = override(result_file, 'marketIndexPrice(MID)', mid_value, mid_time)
    
    ipa_value, ipa_time = get_lastest_value(merged_df, 'ImbalancePriceAmount(B1770)')
    result_file = override(result_file, 'ImbalancePriceAmount(B1770)', ipa_value, ipa_time)
    
    price_value, price_time = get_lastest_value(merged_df, 'price_act(DAA)')
    result_file = override(result_file, 'price_act(DAA)', price_value, price_time)
    
    
    result_file = override_intraday_result(result_file, niv_time, intraday_arr)
    
    if '__index_level_0__' in result_file.columns:
        result_file.drop('__index_level_0__', axis = 1)
    
    ################################# Day-Ahead #######################################################
    result_file = override_dayahead_result(result_file, dayahead_payload, niv_time)
    
    #### Divide and deliver to S3 Result
    result_file['SettlementTime'] = pd.to_datetime(result_file['SettlementTime'], format='%Y-%m-%d')
    
    if '__index_level_0__' in result_file.columns:
        result_file.drop('__index_level_0__', axis = 1)
        
    ################################# Output to file #######################################################
    
    prev_data = result_file[result_file['SettlementTime'] < dt.datetime.strptime(today, "%Y-%m-%d")]
    save_data(
        df = prev_data,
        file_name = 'intraday_pred',
        path = os.path.join(write_path, pred_folder),
        date_partition = yesterday
    )

    now_data = result_file[(result_file['SettlementTime'] >= dt.datetime.strptime(today, "%Y-%m-%d"))
        & (result_file['SettlementTime'] < dt.datetime.strptime(tomorrow, "%Y-%m-%d"))]
    save_data(
        df = now_data,
        file_name = 'intraday_pred',
        path = os.path.join(write_path, pred_folder),
        date_partition = today
    )

    next_data = result_file[result_file['SettlementTime'] >= dt.datetime.strptime(tomorrow, "%Y-%m-%d")]
    save_data(
        df = next_data,
        file_name = 'intraday_pred',
        path = os.path.join(write_path, pred_folder),
        date_partition = tomorrow
    )
    print("Successfull")
# lambda_handler("", "")

In [None]:
################################################ Version 3.0 03/8 ########################################################

import datetime 
from datetime import (datetime,
                      timedelta)
import time
from time import (strftime, 
                  perf_counter, 
                  gmtime)
import numpy as np
import pandas as pd
import pyarrow
import pyarrow.parquet as pq
from functools import partial
import gc
import os, sys
import json
import s3fs
import io
import boto3
import tarfile 
import datetime as dt
from io import BytesIO

import warnings
warnings.filterwarnings("ignore", category=FutureWarning)
import traceback

filesystem = s3fs.S3FileSystem()


# ============================================= Helper Functions ============================================= #
def save_data(
    df: pd.DataFrame,
    file_name: str,
    path: str,
    date_partition: str,
    partition_cols=None, partition_filename_cb=None, filesystem=filesystem
):
    """Write pandas DataFrame in parquet format to S3 bucket
    Args:
        df (pd.DataFrame): DataFrame to save
        file_name (str): Table name
        path (str): local or AWS S3 path to store the parquet files
        partition_cols (list, optional): Columns used to partition the parquet files. Defaults to None.
    """
    
    date = dt.datetime.strptime(date_partition, "%Y-%m-%d")
    folder = f'/year={date.year}/month={date.month}/day={date.day}'

    pq.write_to_dataset(
        pyarrow.Table.from_pandas(df),
        path + folder,
        filesystem=filesystem,
        partition_cols=partition_cols,
        partition_filename_cb=lambda x: f"{file_name}.parquet",
    )

def generate_dates(start_date: str, end_date: str):
    """Generates list of dates
    Args:
        start_date (str): Start date
        end_date (str): End date
    Returns:
        List: List of dates
    """
    sdate = dt.datetime.strptime(start_date, "%Y-%m-%d")
    edate = dt.datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)

    return [
        (sdate + timedelta(days=x)).strftime("%Y-%m-%d")
        for x in range((edate - sdate).days)
    ]
        
def read_parquet_tables(
    file_name: str,
    start_date: str,
    end_date: str,
    path: str,
) -> pd.DataFrame:
    """Read parquet file partitions
    Args:
        file_name (str): Table name
        start_date (str): starting date to clean the data (%Y-%m-%d)
        end_date (str): ending date to clean the data (%Y-%m-%d)
        path (str): local or AWS S3 path to read the parquet files
    Returns:
        pd.DataFrame: Datframe from parquet file
    """
    # convert date range to list of dates
    date_list = generate_dates(start_date, end_date)
    df = pd.DataFrame()
    for read_date in date_list:
        # convert date to integers for filters
        r_year = dt.datetime.strptime(read_date, "%Y-%m-%d").year
        r_month = dt.datetime.strptime(read_date, "%Y-%m-%d").month
        r_day = dt.datetime.strptime(read_date, "%Y-%m-%d").day

        try:
            data = (
                pq.ParquetDataset(
                    path
                    + f"/year={r_year}/month={r_month}/day={r_day}/{file_name}.parquet",
                    filesystem=filesystem,
                )
                .read_pandas()
                .to_pandas()
            )

        except:
            continue  
        df = pd.concat([df, data], ignore_index=True)

    return df

def truncate_to_dayahead_format(dataframe, date_time):
    """
    Function for post-processing prediction returned. Returns a dataframe of
    prediction that contains the predictions for end of today up to tomorrow:
        ie. from SP_47 TODAY -> SP_46 of TOMORROW inclusive
    Args:
        dataframe: pd.DataFrame containing local_datetime and NIVPredictions
        date_time: datetime chosen

    Returns:
        truncated pd.DataFrame
    """
    now = date_time
    tmr = date_time + timedelta(days=1)
    start_pred = dataframe[dataframe["local_datetime"].dt.day == now.day].iloc[-2].name
    end_pred = dataframe[dataframe["local_datetime"].dt.day == tmr.day].iloc[-3].name
    dataframe_ = dataframe.loc[start_pred:end_pred]  # use loc because we're using index name
    if dataframe_["NIVPredictions"].isna().sum():
        raise Exception("NIVPredictions has NaN after post-processing. "
                        "Consider raising the backshift_niv_by hyperparameter"
                        "during training")

    return dataframe_


def split_file_ands_save_to_s3(result: pd.DataFrame, datetime_val: dt.datetime):
    """
    Split dt to 3 file base on date then save them to s3 
    
    """
    t_today     = datetime_val
    t_yesterday = t_today - timedelta(days=1)
    t_tomorrow  = t_today + timedelta(days=1)
    
    prev_data = result[result['local_datetime'].dt.day == t_yesterday.day]
    save_data(
        df = prev_data,
        file_name = 'prediction',
        path = os.path.join(write_path, pred_folder),
        date_partition = yesterday
    )

    now_data = result[result['local_datetime'].dt.day == t_today.day]
    save_data(
        df = now_data,
        file_name = 'prediction',
        path = os.path.join(write_path, pred_folder),
        date_partition = today
    )

    next_data = result[result['local_datetime'].dt.day == t_tomorrow.day]
    save_data(
        df = next_data,
        file_name = 'prediction',
        path = os.path.join(write_path, pred_folder),
        date_partition = tomorrow
    )



def get_lastest_value(dframe, column_name):
    tmp = dframe[["local_datetime", column_name]].dropna(how='any').iloc[-1:]  
    val = tmp.iloc[0][column_name]
    stime = tmp.iloc[0]['local_datetime']
    return val, stime


# ============================================= End of Helper Functions ============================================= #

# ============================================= PreProcess Functions ============================================= #

def select_best_features(dataframe):
    """Remove redundant/not useful columns from dataframe"""
    interconnector_cols = [c for c in dataframe.columns if "int" in c and "FUEL" in c]
    dataframe["Total_Int(FUELHH)"] = dataframe[interconnector_cols].apply(sum, axis=1)
    dataframe["Total_Fossil(FUELHH)"] = (dataframe["coal(FUELHH)"] +
                                         dataframe["ocgt(FUELHH)"] +
                                         dataframe["ccgt(FUELHH)"] +
                                         dataframe["oil(FUELHH)"])
    dataframe["Total_Other(FUELHH)"] = (dataframe["biomass(FUELHH)"] +
                                        dataframe["other(FUELHH)"] +
                                        dataframe["nuclear(FUELHH)"])
    dataframe["Total_Hydro(FUELHH)"] = dataframe["npshyd(FUELHH)"] + dataframe["ps(FUELHH)"]
    fuelhh_drop_cols = [c for c in dataframe.columns if "(FUELHH" in c and "total" not in c.lower()]
    # elexon generation cols
    ele_gen_drop_cols = (["Wind_Offshore_fcst(B1440)", "Wind_Onshore_fcst(B1440)"] +
                         [c for c in dataframe.columns if "windforfuelhh" in c.lower()] +  # WINDFORFUELHH
                         [c for c in dataframe.columns if "(B16" in c] +  # B16xx columns
                         ["Total_Load_fcst(B0620)", "Total_Load(B0610)"])  # + act_ele_gen_drop_cols
    # catalyst wind cols
    wind_pc_cols = [c for c in dataframe.columns if "pc" in c.lower()]  # the actual is very corr. with other winds
    sn_wind_cols = [c for c in dataframe.columns if "sn" in c.lower()]
    cat_wind_drop_cols = [c for c in dataframe.columns if "(Wind_" in c and "wind_act(Wind_unrestricted)" != c]
    cat_wind_drop_cols += wind_pc_cols + sn_wind_cols
    # drop columns with redundant information
    cols_to_remove = [
        "niv_act(Balancing_NIV_fcst_3hr)",  # can be used in post process
        "hist_fcst(Balancing_NIV_fcst_3hr)",  # bad feature, not to be used even in post process
        "niv(Balancing_NIV)",  # can be used in post process
        "indicativeNetImbalanceVolume(DERSYSDATA)",
        "systemSellPrice(DERSYSDATA)",
        "totalSystemAdjustmentSellVolume(DERSYSDATA)",
        "totalSystemAdjustmentBuyVolume(DERSYSDATA)",
        "non_bm_stor(Balancing_detailed)",
        "DAI(MELIMBALNGC)",
        "DAM(MELIMBALNGC)",
        "TSDF(SYSDEM)",
        "ITSDO(SYSDEM)",
        "temperature(TEMP)",
        "ImbalancePriceAmount(B1770)",
        "marketIndexPrice(MID)",
        "marketIndexVolume(MID)",
        "DATF(FORDAYDEM)",
        "DAID(FORDAYDEM)",
        "DAIG(FORDAYDEM)",
        "DANF(FORDAYDEM)"
    ]
    # day ahead auction cols
    daa_gwstep_cols = [c for c in dataframe.columns if "daa_gwstep" in c.lower()]
    daa_windrisk_cols = [c for c in dataframe.columns if "windrisk" in c.lower()]  # daauction/range/wind_risk @catalyst
    daa_xgas_cols = [c for c in dataframe.columns if "daa_xgas" in c.lower()]  # xgas @Catalyst too high corr with gas
    daa_gas_cols = [c for c in dataframe.columns if "daa_gas" in c.lower()]  # @Catalyst
    daa_drop_cols = [
        "price_act(DAA)",
        "price_fcst(DAA)",
        "price_weighted(DAA)",
        "hist_fcst(DAA_1D_8AM)",
        "hist_fcst(DAA_1D_2PM)",
    ]
    daa_drop_cols += daa_gwstep_cols + daa_windrisk_cols + daa_xgas_cols + daa_gas_cols
    # drop the columns listed and return
    cols_to_remove += (  # act_ele_gen_drop_cols +
            fuelhh_drop_cols +
            ele_gen_drop_cols +
            cat_wind_drop_cols +
            daa_drop_cols
    )
    return dataframe.drop(cols_to_remove, axis=1)


def prepare_date_features(dataframe):
    """transform date features into tabular format"""
    # cyclic encode the datetime information; sine because the cycle should start from 0
    dataframe["sp_sin"] = np.sin(dataframe["SettlementPeriod"] * (2 * np.pi / 48))
    dataframe["month_sin"] = np.sin(dataframe["local_datetime"].dt.month * (2 * np.pi / 12))
    dataframe["week_sin"] = np.sin((dataframe["local_datetime"].dt.weekday + 1) * (2 * np.pi / 7))
    # drop unparsed date column; note that SettlementDate is kept for later groupbys
#     dataframe.drop(["local_datetime",  # information is already encoded
#         "SettlementPeriod"], axis=1, inplace=True)
    return dataframe


def interpolate_outliers(dataframe, cutoff=2.5):
    """Replaces the outlier value with the previous and next value's average using the column's z statistic"""
    float_cols = [c for c in dataframe.select_dtypes("float").columns if "sin" not in c]
    for col_name in float_cols:
        col = dataframe[col_name].to_numpy()
        col_mean, col_std = col.mean(), col.std()  # save the mean and std of the dataframe column
        z_cutoff = cutoff * col_std
        for idx in range(len(col)):
            if np.abs(col[idx] - col_mean) > z_cutoff:
                try:
                    dataframe.loc[idx, col_name] = np.mean([col[idx - 1], col[idx + 1]])
                except IndexError:
                    # this only happens at either end of the input data, so we leave the value as is
                    # it will be processed in the clip_outliers function if the cutoff there is less than or
                    # equal to the cutoff here
                    pass
    return dataframe

def clip_outliers(dataframe, cutoff=2):
    """Clip outlier using z-statistic"""
    float_cols = [c for c in dataframe.select_dtypes("float").columns if "sin" not in c]
    for col_name in float_cols:
        col = dataframe[col_name].to_numpy()
        col_mean, col_std = col.mean(), col.std()  # save the mean and std of the dataframe column
        z_cutoff = cutoff * col_std
        lower_bound = col_mean - z_cutoff
        upper_bound = col_mean + z_cutoff
        for idx in range(len(col)):
            row = col[idx]  # save to variable to avoid re-accessing
            if np.abs(row - col_mean) > z_cutoff:
                dataframe.loc[idx, col_name] = np.clip(row, lower_bound, upper_bound)
    return dataframe


def compute_ewm_features(dataframe, window=8, alpha=1 - np.log(2) / 4):
    """Computes the exponentially moving weighted average features"""
    weights = list(reversed([(1 - alpha) ** n for n in range(window)]))
    ewma = partial(np.average, weights=weights)
    ewm_cols = [c for c in dataframe.columns if "Imbalance" not in c and  # exclude target variable
                "(" in c and  # this is to exclude time features
                "FUELHH" not in c and  # exclude FUELHH features
                "cash_out" not in c]  # exclude cash_out(Balancing_detailed) feature
    for c in ewm_cols:
        # compute daily ewm, parametrized by alpha
        dataframe[f"ewm_mean_{c}"] = dataframe[c].rolling(window).apply(ewma)
    return dataframe


def compute_shifted_features(dataframe):
    """Computes the features that can be shifted"""
    # compute  backshifted features
    bshift_2sp_cols = [
        "totalSystemAcceptedOfferVolume(DERSYSDATA)",
        "totalSystemAcceptedBidVolume(DERSYSDATA)"
    ]
    for c in bshift_2sp_cols:
        dataframe[f"bshift_2sp_{c}"] = dataframe[c].shift(-2)
    dataframe["bshift_4sp_boas(Balancing_detailed)"] = dataframe["boas(Balancing_detailed)"].shift(-4)
    # compute back-differenced features
    bdiff_cols = [
        "Generation_fcst(B1430)",
        "boas(Balancing_detailed)",
        "totalSystemAcceptedOfferVolume(DERSYSDATA)",
        "totalSystemAcceptedBidVolume(DERSYSDATA)"
    ]
    for c in bdiff_cols:
        dataframe[f"bdiff_1sp_{c}"] = dataframe[c].diff(-1)
    # compute forward shifted feature; other fshifted features based on other cols did not perform well
    fshift_cols = [
        "boas(Balancing_detailed)",
    ]
    for c in fshift_cols:
        dataframe[f"fshift_1hr_{c}"] = dataframe[c].shift(2)
    return dataframe


def preprocess_dataframe(df : pd.DataFrame):
    t0 = time.perf_counter()
    df = select_best_features(df)  # remove columns based on previous results
    df = prepare_date_features(df)  # parse date into cyclic features 
    df = interpolate_outliers(df, cutoff=2.5)  # interpolate using z statistics (avg next/before obs)
    df = clip_outliers(df, cutoff=2)  # clip outliers using z statistics
    df = compute_ewm_features(df)  # compute exponentially weighted moving average features
    df = compute_shifted_features(df)  # compute features based on shifting and differencing
    # drop some columns that are not performant or are no longer needed
    df = df.drop(["SettlementDate",
                  "wind_act(Wind_unrestricted)",
                  "totalSystemAcceptedOfferVolume(DERSYSDATA)",
                  "totalSystemAcceptedBidVolume(DERSYSDATA)",
                  "Generation_fcst(B1430)",
                  "intraday(Balancing_detailed)",
                  "Solar_fcst(B1440)",
                  "__index_level_0__"], axis=1)
    print("Time taken to preprocess features in seconds:", time.perf_counter() - t0)
    # print(f"Shape of the dataframe: {df.shape}")
    df = df.dropna()
    # df = df.drop("ImbalanceQuantity(MAW)(B1780)", axis=1)
    
    return df


# ============================================= End of PreProcess Functions ============================================= #

# ============================================= Support Main Functions ============================================= #

list_cols = ['local_datetime', 'SettlementPeriod' , 'niv_predicted_1sp', 'niv_predicted_2sp',
             'niv_predicted_3sp', 'niv_predicted_4sp', 'niv_predicted_5sp', 'niv_predicted_6sp',
             'niv_predicted_7sp', 'niv_predicted_8sp', 'dayahead_morning', 'dayahead_afternoon', 
             'ImbalanceQuantity(MAW)(B1780)', 'ImbalancePriceAmount(B1770)', 'marketIndexPrice(MID)', 'price_act(DAA)']

def check_file_is_exists(bucket_name, write_path, path, day2write, cols=list_cols):
    
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucket_name)
    start_date = dt.datetime.strptime(day2write, "%Y-%m-%d")
    end_date = dt.datetime.strptime(day2write, "%Y-%m-%d") + timedelta(days=1)
    
    prefix_path = path + f"/year={start_date.year}/month={start_date.month}/day={start_date.day}/"
    file_list = list(bucket.objects.filter(Prefix=prefix_path))
    if not len(file_list):
        
        print("No file in ", prefix_path)
        # Create new Template
        df = pd.DataFrame(index=pd.date_range(start=start_date, end=end_date, freq="30T"), columns=cols[1:])
        df = df.head(48)
        df.index.name = cols[0]
        df['SettlementPeriod'] = range(0, len(df))
        df['SettlementPeriod'] = df['SettlementPeriod'].apply(lambda x: x % 48 + 1)
        df = df.reset_index()
        print(f"Creating empty file at ", day2write)
        
        save_data(df = df,
                  file_name = 'prediction',
                  path = os.path.join(write_path, path),
                  date_partition = day2write
        )
        print(f"Created empty file at ", day2write)
    else: 
        print('File Exits')


def override(df: pd.DataFrame, column_name, value, stime):
    inx = df.index[(df['local_datetime'] > stime - timedelta(minutes=10))
                   &(df['local_datetime'] < stime + timedelta(minutes=10))].values[0] 
    df.at[inx, column_name] = value
    # print (f"Lasted {column_name} Index in Result file : ", inx, ', at ', stime)
    return df


def day_ahead(
    result_files : pd.DataFrame,
    processed_df : pd.DataFrame,
    
):
    if (datetime.now().hour == 8 or datetime.now().hour == 14) & (datetime.now().minute > 30):
        # split target out of payload input
        dayahead_payload = processed_df.drop([
            "ImbalanceQuantity(MAW)(B1780)",
            "local_datetime",
            "SettlementPeriod",
        ], axis=1).to_numpy()

        half_day = 'morning' if datetime.now().hour < 12 else 'afternoon'
        print ("length of preprocess_dataframe: ", processed_df.shape)

        ##### invoke model endpoint for getting predictions #####
        ### Get the newest model file aka "TargetModel"
        list_files = []
        for object_summary in model_bucket.objects.filter(Prefix="endpoint"):
            if f"lgbm-regressor-dayahead-{half_day}" in object_summary.key:
                list_files.append(object_summary.key)
        assert len(list_files) == 1, "check s3 should have one file"
        target_model = list_files[0].split("endpoint/")[-1]
        print("Lastest Day-Ahead target Model :", target_model)

        ### Invoke DAY-AHEAD Endpoint & Get Result
        runtime_sm_client = boto3.client(service_name="sagemaker-runtime")
        print ("Invoking day-ahead endpoint with Payload data")
        response = runtime_sm_client.invoke_endpoint(
            EndpointName="lgbm-regressor-endpoint",
            ContentType ="application/JSON",
            TargetModel = target_model,
            Body=json.dumps(dayahead_payload.tolist()),
        )
        dayahead_arr = json.loads(response["Body"].read())
        print("Success Invoking Endpoint!")


        # store the prediction in a new df so that we can do some post processing
        # this dataframe only extends up to the last row NOT containing NaN, which is
        tmp_result_df = processed_df.copy()[["local_datetime", "SettlementPeriod"]]
        tmp_result_df["NIVPredictions"] = dayahead_arr
        tmp_result_df["local_datetime"] = pd.to_datetime(tmp_result_df["local_datetime"])

        # make a dummy df so that we can shift the predictions
        dummy_df = pd.DataFrame()  # .reset_index(drop=True)
        dummy_df["local_datetime"] = pd.date_range(
            processed_df["local_datetime"].iloc[-1] + timedelta(minutes=30),
            processed_df["local_datetime"].iloc[-1] + timedelta(days=2),  # timedelta 2 days to make sure it works
            freq="30T"
        )
        dummy_df["NIVPredictions"] = np.nan
        dummy_df["SettlementPeriod"] = range(processed_df["SettlementPeriod"].iloc[-1],
                                             processed_df["SettlementPeriod"].iloc[-1] + len(dummy_df))
        dummy_df["SettlementPeriod"] = dummy_df["SettlementPeriod"] % 48 + 1

        # concatenate to results dataframe and forward shift NIV predictions the same amount that was backshifted
        tmp_result_df = pd.concat([tmp_result_df, dummy_df], axis=0).reset_index(drop=True)
        tmp_result_df["NIVPredictions"] = tmp_result_df["NIVPredictions"].shift(96)
        tmp_result_df = tmp_result_df.dropna(subset=["NIVPredictions"])
        print(tmp_result_df.shape)
        # truncate the predictions to the datetime range that we want, which is

        tmp_result_df = truncate_to_dayahead_format(tmp_result_df, datetime.now())
        dayahead_arr = tmp_result_df["NIVPredictions"].tolist()                             

        index = 48 + 46 ### SP = 47 so index = 46 & add 48 from whole yesterday
        print('Length of dayahead array: ', len(dayahead_arr))
        for i in range(len(dayahead_arr)):
            result_files.at[index, 'dayahead_{}'.format(half_day)] = dayahead_arr[i]
            index += 1

        return result_files
    else: 
        return result_files


def intraday(
    result_files : pd.DataFrame,
    processed_df : pd.DataFrame,
    
):
    # split target out of payload input
    intraday_payload = processed_df.drop([
        "ImbalanceQuantity(MAW)(B1780)",
        "local_datetime",
        "SettlementPeriod",
    ], axis=1).to_numpy()
    
    print ("length of preprocess_dataframe: ", processed_df.shape)
    
    ##### Invoke model endpoint for getting predictions #####
    ### Get the newest model file aka "TargetModel"
    list_files = []
    for object_summary in model_bucket.objects.filter(Prefix="endpoint"):
        if f"lgbm-regressor-intraday" in object_summary.key:
            list_files.append(object_summary.key)
    assert len(list_files) == 1, "check s3 should have one file"
    target_model = list_files[0].split("endpoint/")[-1]
    print("Lastest Intraday target Model :", target_model)
    
    ### Invoke INTRADAY Endpoint & Get Result
    runtime_sm_client = boto3.client(service_name="sagemaker-runtime")
    print ("Invoking endpoint with Intraday Payload")
    response = runtime_sm_client.invoke_endpoint(
        EndpointName="lgbm-regressor-endpoint",
        ContentType ="application/JSON",
        TargetModel = target_model,
        Body=json.dumps(intraday_payload.tolist()),
    )
    intraday_arr = json.loads(response["Body"].read())
    print("Success Invoking Intraday Endpoint!")
    
    
    # store the prediction in a new df so that we can do some post processing
    # this dataframe only extends up to the last row NOT containing NaN, which is
    tmp_result_df = processed_df.copy()[["local_datetime", "SettlementPeriod"]]
    tmp_result_df["NIVPredictions"] = intraday_arr
    tmp_result_df["local_datetime"] = pd.to_datetime(tmp_result_df["local_datetime"])

    # make a dummy df so that we can shift the predictions
    dummy_df = pd.DataFrame()  # .reset_index(drop=True)
    dummy_df["local_datetime"] = pd.date_range(
        processed_df["local_datetime"].iloc[-1] + timedelta(minutes=30),
        processed_df["local_datetime"].iloc[-1] + timedelta(days=2),  # timedelta 2 days to make sure it works
        freq="30T"
    )
    dummy_df["NIVPredictions"] = np.nan
    dummy_df["SettlementPeriod"] = range(processed_df["SettlementPeriod"].iloc[-1],
                                         processed_df["SettlementPeriod"].iloc[-1] + len(dummy_df))
    dummy_df["SettlementPeriod"] = dummy_df["SettlementPeriod"] % 48 + 1

    # concatenate to results dataframe and forward shift NIV predictions the same amount that was backshifted
    tmp_result_df = pd.concat([tmp_result_df, dummy_df], axis=0).reset_index(drop=True)
    tmp_result_df["NIVPredictions"] = tmp_result_df["NIVPredictions"].shift(48)
    tmp_result_df = tmp_result_df.dropna(subset=["NIVPredictions"])
    print("Shape of Result: ", tmp_result_df.shape)
    
    #### Get Lastest NIV Time to identify where to fill the result
    niv_val, niv_time = get_lastest_value(processed_df, "ImbalanceQuantity(MAW)(B1780)")
    print("Lastest NIV : ", niv_val, niv_time)
    
    tmp_result_df = tmp_result_df[(tmp_result_df['local_datetime'] > niv_time) & 
                                  (tmp_result_df['local_datetime'] < niv_time + timedelta(hours = 4.5))]
    print(tmp_result_df)
    intraday_arr = tmp_result_df["NIVPredictions"].tolist()   
    print('Length of intraday array: ', len(intraday_arr))
    
    # index = result_files.index[result_files['SettlementTime'].dt.datetime == niv_time].values[0]
    index = result_files.index[(result_files['local_datetime'] > niv_time + timedelta(minutes=20))
                               &(result_files['local_datetime'] < niv_time + timedelta(minutes=40))].values[0]
    
    print ('Intraday index = ', index)
    # Insert Value
    for i in range(len(intraday_arr)):
        index += 1
        result_files.at[index, 'niv_predicted_{}sp'.format(i + 1)] = intraday_arr[i]
        print(index, result_files.at[index, 'local_datetime'], intraday_arr[i], '\n')

    return result_files


def lambda_handler(event, context):
    
    merged_df = read_parquet_tables(
        file_name="merged",
        start_date = prev_day,
        end_date = tomorrow,
        path = read_path,
    )
    print ("length of Merged dataframe: ", merged_df.shape)
    merged_df["local_datetime"] = pd.to_datetime(merged_df["local_datetime"])
    df = preprocess_dataframe(merged_df)
    
    ### Check result exist: if not, create a new template
    check_file_is_exists(my_bucket, write_path, pred_folder, today, cols=list_cols)
    check_file_is_exists(my_bucket, write_path, pred_folder, tomorrow, cols=list_cols)
    
    result_files = read_parquet_tables(
        file_name= "prediction", 
        start_date = yesterday,
        end_date = tomorrow,
        path = os.path.join(write_path, pred_folder),
    )
    print("Shape of Result_file ", result_files.shape)
    if 'SettlementTime' in result_files.columns:
        result_files.rename(columns = {'SettlementTime':'local_datetime', 'SP':'SettlementPeriod'}, inplace = True)
    
    niv_value, niv_time = get_lastest_value(merged_df, 'ImbalanceQuantity(MAW)(B1780)')
    result_files = override(result_files, 'ImbalanceQuantity(MAW)(B1780)', niv_value, niv_time)    
    
    mid_value, mid_time = get_lastest_value(merged_df, 'marketIndexPrice(MID)')
    result_files = override(result_files, 'marketIndexPrice(MID)', mid_value, mid_time)
    
    ipa_value, ipa_time = get_lastest_value(merged_df, 'ImbalancePriceAmount(B1770)')
    result_files = override(result_files, 'ImbalancePriceAmount(B1770)', ipa_value, ipa_time)
    
    price_value, price_time = get_lastest_value(merged_df, 'price_act(DAA)')
    result_files = override(result_files, 'price_act(DAA)', price_value, price_time)
    
    
    result_files['local_datetime'] = pd.to_datetime(result_files["local_datetime"])
    
    result_files = day_ahead(result_files, df)
    result_files = intraday (result_files, df)
    
    ##### Delivery files to S3 
    split_file_ands_save_to_s3(result_files, dt.datetime.strptime(today, "%Y-%m-%d"))

    print('Testing successfully!')

################################# Initialize ########################################################
s3 = boto3.resource('s3')
my_bucket = 'niv-predictions'   
pred_folder = 'lgbm-prediction' 
bucket = s3.Bucket(my_bucket)
model_bucket = s3.Bucket('lgbm-model-storage')
resource = boto3.resource('s3')
read_path = "s3://scgc/data/merged"
write_path = "s3://" + my_bucket
prev_day = (dt.datetime.now() - timedelta(days=4)).strftime("%Y-%m-%d")
yesterday= (dt.datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
today    = dt.datetime.now().strftime("%Y-%m-%d")
tomorrow = (dt.datetime.now() + timedelta(days=1)).strftime("%Y-%m-%d")

runtime_sm_client = boto3.client("sagemaker-runtime")

# lambda_handler("", "")

In [None]:
############################################### Version 3.1 Aug6 ########################################################

import datetime 
from datetime import (datetime,
                      timedelta)
import time
from time import (strftime, 
                  perf_counter, 
                  gmtime)
import numpy as np
import pandas as pd
import pyarrow
import pyarrow.parquet as pq
from functools import partial
import gc
import os, sys
import json
import s3fs
import io
import boto3
import tarfile 
import datetime as dt
from io import BytesIO

import warnings
warnings.filterwarnings("ignore", category=FutureWarning)
import traceback

filesystem = s3fs.S3FileSystem()


# ============================================= Helper Functions ============================================= #
def save_data(
    df: pd.DataFrame,
    file_name: str,
    path: str,
    date_partition: str,
    partition_cols=None, partition_filename_cb=None, filesystem=filesystem
):
    """Write pandas DataFrame in parquet format to S3 bucket
    Args:
        df (pd.DataFrame): DataFrame to save
        file_name (str): Table name
        path (str): local or AWS S3 path to store the parquet files
        partition_cols (list, optional): Columns used to partition the parquet files. Defaults to None.
    """
    
    date = dt.datetime.strptime(date_partition, "%Y-%m-%d")
    folder = f'/year={date.year}/month={date.month}/day={date.day}'

    pq.write_to_dataset(
        pyarrow.Table.from_pandas(df),
        path + folder,
        filesystem=filesystem,
        partition_cols=partition_cols,
        partition_filename_cb=lambda x: f"{file_name}.parquet",
    )

def generate_dates(start_date: str, end_date: str):
    """Generates list of dates
    Args:
        start_date (str): Start date
        end_date (str): End date
    Returns:
        List: List of dates
    """
    sdate = dt.datetime.strptime(start_date, "%Y-%m-%d")
    edate = dt.datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)

    return [
        (sdate + timedelta(days=x)).strftime("%Y-%m-%d")
        for x in range((edate - sdate).days)
    ]
        
def read_parquet_tables(
    file_name: str,
    start_date: str,
    end_date: str,
    path: str,
) -> pd.DataFrame:
    """Read parquet file partitions
    Args:
        file_name (str): Table name
        start_date (str): starting date to clean the data (%Y-%m-%d)
        end_date (str): ending date to clean the data (%Y-%m-%d)
        path (str): local or AWS S3 path to read the parquet files
    Returns:
        pd.DataFrame: Datframe from parquet file
    """
    # convert date range to list of dates
    date_list = generate_dates(start_date, end_date)
    df = pd.DataFrame()
    for read_date in date_list:
        # convert date to integers for filters
        r_year = dt.datetime.strptime(read_date, "%Y-%m-%d").year
        r_month = dt.datetime.strptime(read_date, "%Y-%m-%d").month
        r_day = dt.datetime.strptime(read_date, "%Y-%m-%d").day

        try:
            data = (
                pq.ParquetDataset(
                    path
                    + f"/year={r_year}/month={r_month}/day={r_day}/{file_name}.parquet",
                    filesystem=filesystem,
                )
                .read_pandas()
                .to_pandas()
            )

        except:
            continue  
        df = pd.concat([df, data], ignore_index=True)

    return df

def truncate_to_dayahead_format(dataframe, date_time):
    """
    Function for post-processing prediction returned. Returns a dataframe of
    prediction that contains the predictions for end of today up to tomorrow:
        ie. from SP_47 TODAY -> SP_46 of TOMORROW inclusive
    Args:
        dataframe: pd.DataFrame containing local_datetime and NIVPredictions
        date_time: datetime chosen

    Returns:
        truncated pd.DataFrame
    """
    now = date_time
    tmr = date_time + timedelta(days=1)
    start_pred = dataframe[dataframe["local_datetime"].dt.day == now.day].iloc[-2].name
    end_pred = dataframe[dataframe["local_datetime"].dt.day == tmr.day].iloc[-3].name
    dataframe_ = dataframe.loc[start_pred:end_pred]  # use loc because we're using index name
    if dataframe_["NIVPredictions"].isna().sum():
        raise Exception("NIVPredictions has NaN after post-processing. "
                        "Consider raising the backshift_niv_by hyperparameter"
                        "during training")

    return dataframe_


def split_file_ands_save_to_s3(result: pd.DataFrame, datetime_val: dt.datetime):
    """
    Split dt to 3 file base on date then save them to s3 
    
    """
    t_today     = datetime_val
    t_yesterday = t_today - timedelta(days=1)
    t_tomorrow  = t_today + timedelta(days=1)
    
    prev_data = result[result['local_datetime'].dt.day == t_yesterday.day]
    save_data(
        df = prev_data,
        file_name = 'prediction',
        path = os.path.join(write_path, pred_folder),
        date_partition = yesterday
    )

    now_data = result[result['local_datetime'].dt.day == t_today.day]
    save_data(
        df = now_data,
        file_name = 'prediction',
        path = os.path.join(write_path, pred_folder),
        date_partition = today
    )

    next_data = result[result['local_datetime'].dt.day == t_tomorrow.day]
    save_data(
        df = next_data,
        file_name = 'prediction',
        path = os.path.join(write_path, pred_folder),
        date_partition = tomorrow
    )


def update_actual_columns(
    result_file : pd.DataFrame,
    merged_file : pd.DataFrame
):
    columns_name = ['ImbalanceQuantity(MAW)(B1780)', 'ImbalancePriceAmount(B1770)', 
                    'marketIndexPrice(MID)', 'price_act(DAA)']
    # print (merged_file['marketIndexPrice(MID)'].to_list())
    result_file[columns_name] = merged_file[columns_name]
    return result_file

    
def get_lastest_value(dframe, column_name):
    tmp = dframe[["local_datetime", column_name]].dropna(how='any').iloc[-1:]  
    val = tmp.iloc[0][column_name]
    stime = tmp.iloc[0]['local_datetime']
    return val, stime


# ============================================= End of Helper Functions ============================================= #

# ============================================= PreProcess Functions ============================================= #

def select_best_features(dataframe):
    """Remove redundant/not useful columns from dataframe"""
    interconnector_cols = [c for c in dataframe.columns if "int" in c and "FUEL" in c]
    dataframe["Total_Int(FUELHH)"] = dataframe[interconnector_cols].apply(sum, axis=1)
    dataframe["Total_Fossil(FUELHH)"] = (dataframe["coal(FUELHH)"] +
                                         dataframe["ocgt(FUELHH)"] +
                                         dataframe["ccgt(FUELHH)"] +
                                         dataframe["oil(FUELHH)"])
    dataframe["Total_Other(FUELHH)"] = (dataframe["biomass(FUELHH)"] +
                                        dataframe["other(FUELHH)"] +
                                        dataframe["nuclear(FUELHH)"])
    dataframe["Total_Hydro(FUELHH)"] = dataframe["npshyd(FUELHH)"] + dataframe["ps(FUELHH)"]
    fuelhh_drop_cols = [c for c in dataframe.columns if "(FUELHH" in c and "total" not in c.lower()]
    # elexon generation cols
    ele_gen_drop_cols = (["Wind_Offshore_fcst(B1440)", "Wind_Onshore_fcst(B1440)"] +
                         [c for c in dataframe.columns if "windforfuelhh" in c.lower()] +  # WINDFORFUELHH
                         [c for c in dataframe.columns if "(B16" in c] +  # B16xx columns
                         ["Total_Load_fcst(B0620)", "Total_Load(B0610)"])  # + act_ele_gen_drop_cols
    # catalyst wind cols
    wind_pc_cols = [c for c in dataframe.columns if "pc" in c.lower()]  # the actual is very corr. with other winds
    sn_wind_cols = [c for c in dataframe.columns if "sn" in c.lower()]
    cat_wind_drop_cols = [c for c in dataframe.columns if "(Wind_" in c and "wind_act(Wind_unrestricted)" != c]
    cat_wind_drop_cols += wind_pc_cols + sn_wind_cols
    # drop columns with redundant information
    cols_to_remove = [
        "niv_act(Balancing_NIV_fcst_3hr)",  # can be used in post process
        "hist_fcst(Balancing_NIV_fcst_3hr)",  # bad feature, not to be used even in post process
        "niv(Balancing_NIV)",  # can be used in post process
        "indicativeNetImbalanceVolume(DERSYSDATA)",
        "systemSellPrice(DERSYSDATA)",
        "totalSystemAdjustmentSellVolume(DERSYSDATA)",
        "totalSystemAdjustmentBuyVolume(DERSYSDATA)",
        "non_bm_stor(Balancing_detailed)",
        "DAI(MELIMBALNGC)",
        "DAM(MELIMBALNGC)",
        "TSDF(SYSDEM)",
        "ITSDO(SYSDEM)",
        "temperature(TEMP)",
        "ImbalancePriceAmount(B1770)",
        "marketIndexPrice(MID)",
        "marketIndexVolume(MID)",
        "DATF(FORDAYDEM)",
        "DAID(FORDAYDEM)",
        "DAIG(FORDAYDEM)",
        "DANF(FORDAYDEM)"
    ]
    # day ahead auction cols
    daa_gwstep_cols = [c for c in dataframe.columns if "daa_gwstep" in c.lower()]
    daa_windrisk_cols = [c for c in dataframe.columns if "windrisk" in c.lower()]  # daauction/range/wind_risk @catalyst
    daa_xgas_cols = [c for c in dataframe.columns if "daa_xgas" in c.lower()]  # xgas @Catalyst too high corr with gas
    daa_gas_cols = [c for c in dataframe.columns if "daa_gas" in c.lower()]  # @Catalyst
    daa_drop_cols = [
        "price_act(DAA)",
        "price_fcst(DAA)",
        "price_weighted(DAA)",
        "hist_fcst(DAA_1D_8AM)",
        "hist_fcst(DAA_1D_2PM)",
    ]
    daa_drop_cols += daa_gwstep_cols + daa_windrisk_cols + daa_xgas_cols + daa_gas_cols
    # drop the columns listed and return
    cols_to_remove += (  # act_ele_gen_drop_cols +
            fuelhh_drop_cols +
            ele_gen_drop_cols +
            cat_wind_drop_cols +
            daa_drop_cols
    )
    return dataframe.drop(cols_to_remove, axis=1)


def prepare_date_features(dataframe):
    """transform date features into tabular format"""
    # cyclic encode the datetime information; sine because the cycle should start from 0
    dataframe["sp_sin"] = np.sin(dataframe["SettlementPeriod"] * (2 * np.pi / 48))
    dataframe["month_sin"] = np.sin(dataframe["local_datetime"].dt.month * (2 * np.pi / 12))
    dataframe["week_sin"] = np.sin((dataframe["local_datetime"].dt.weekday + 1) * (2 * np.pi / 7))
    # drop unparsed date column; note that SettlementDate is kept for later groupbys
#     dataframe.drop(["local_datetime",  # information is already encoded
#         "SettlementPeriod"], axis=1, inplace=True)
    return dataframe


def interpolate_outliers(dataframe, cutoff=2.5):
    """Replaces the outlier value with the previous and next value's average using the column's z statistic"""
    float_cols = [c for c in dataframe.select_dtypes("float").columns if "sin" not in c]
    for col_name in float_cols:
        col = dataframe[col_name].to_numpy()
        col_mean, col_std = col.mean(), col.std()  # save the mean and std of the dataframe column
        z_cutoff = cutoff * col_std
        for idx in range(len(col)):
            if np.abs(col[idx] - col_mean) > z_cutoff:
                try:
                    dataframe.loc[idx, col_name] = np.mean([col[idx - 1], col[idx + 1]])
                except IndexError:
                    # this only happens at either end of the input data, so we leave the value as is
                    # it will be processed in the clip_outliers function if the cutoff there is less than or
                    # equal to the cutoff here
                    pass
    return dataframe

def clip_outliers(dataframe, cutoff=2):
    """Clip outlier using z-statistic"""
    float_cols = [c for c in dataframe.select_dtypes("float").columns if "sin" not in c]
    for col_name in float_cols:
        col = dataframe[col_name].to_numpy()
        col_mean, col_std = col.mean(), col.std()  # save the mean and std of the dataframe column
        z_cutoff = cutoff * col_std
        lower_bound = col_mean - z_cutoff
        upper_bound = col_mean + z_cutoff
        for idx in range(len(col)):
            row = col[idx]  # save to variable to avoid re-accessing
            if np.abs(row - col_mean) > z_cutoff:
                dataframe.loc[idx, col_name] = np.clip(row, lower_bound, upper_bound)
    return dataframe


def compute_ewm_features(dataframe, window=8, alpha=1 - np.log(2) / 4):
    """Computes the exponentially moving weighted average features"""
    weights = list(reversed([(1 - alpha) ** n for n in range(window)]))
    ewma = partial(np.average, weights=weights)
    ewm_cols = [c for c in dataframe.columns if "Imbalance" not in c and  # exclude target variable
                "(" in c and  # this is to exclude time features
                "FUELHH" not in c and  # exclude FUELHH features
                "cash_out" not in c]  # exclude cash_out(Balancing_detailed) feature
    for c in ewm_cols:
        # compute daily ewm, parametrized by alpha
        dataframe[f"ewm_mean_{c}"] = dataframe[c].rolling(window).apply(ewma)
    return dataframe


def compute_shifted_features(dataframe):
    """Computes the features that can be shifted"""
    # compute  backshifted features
    bshift_2sp_cols = [
        "totalSystemAcceptedOfferVolume(DERSYSDATA)",
        "totalSystemAcceptedBidVolume(DERSYSDATA)"
    ]
    for c in bshift_2sp_cols:
        dataframe[f"bshift_2sp_{c}"] = dataframe[c].shift(-2)
    dataframe["bshift_4sp_boas(Balancing_detailed)"] = dataframe["boas(Balancing_detailed)"].shift(-4)
    # compute back-differenced features
    bdiff_cols = [
        "Generation_fcst(B1430)",
        "boas(Balancing_detailed)",
        "totalSystemAcceptedOfferVolume(DERSYSDATA)",
        "totalSystemAcceptedBidVolume(DERSYSDATA)"
    ]
    for c in bdiff_cols:
        dataframe[f"bdiff_1sp_{c}"] = dataframe[c].diff(-1)
    # compute forward shifted feature; other fshifted features based on other cols did not perform well
    fshift_cols = [
        "boas(Balancing_detailed)",
    ]
    for c in fshift_cols:
        dataframe[f"fshift_1hr_{c}"] = dataframe[c].shift(2)
    return dataframe


def preprocess_dataframe(df : pd.DataFrame):
    t0 = time.perf_counter()
    df = select_best_features(df)  # remove columns based on previous results
    df = prepare_date_features(df)  # parse date into cyclic features 
    df = interpolate_outliers(df, cutoff=2.5)  # interpolate using z statistics (avg next/before obs)
    df = clip_outliers(df, cutoff=2)  # clip outliers using z statistics
    df = compute_ewm_features(df)  # compute exponentially weighted moving average features
    df = compute_shifted_features(df)  # compute features based on shifting and differencing
    # drop some columns that are not performant or are no longer needed
    df = df.drop(["SettlementDate",
                  "wind_act(Wind_unrestricted)",
                  "totalSystemAcceptedOfferVolume(DERSYSDATA)",
                  "totalSystemAcceptedBidVolume(DERSYSDATA)",
                  "Generation_fcst(B1430)",
                  "intraday(Balancing_detailed)",
                  "Solar_fcst(B1440)",
                  "__index_level_0__"], axis=1)
    print("Time taken to preprocess features in seconds:", time.perf_counter() - t0)
    # print(f"Shape of the dataframe: {df.shape}")
    df = df.dropna()
    # df = df.drop("ImbalanceQuantity(MAW)(B1780)", axis=1)
    
    return df


# ============================================= End of PreProcess Functions ============================================= #

# ============================================= Support Main Functions ============================================= #

list_cols = ['local_datetime', 'SettlementPeriod' , 'niv_predicted_1sp', 'niv_predicted_2sp',
             'niv_predicted_3sp', 'niv_predicted_4sp', 'niv_predicted_5sp', 'niv_predicted_6sp',
             'niv_predicted_7sp', 'niv_predicted_8sp', 'dayahead_morning', 'dayahead_afternoon', 
             'ImbalanceQuantity(MAW)(B1780)', 'ImbalancePriceAmount(B1770)', 'marketIndexPrice(MID)', 'price_act(DAA)']

def check_file_is_exists(bucket_name, write_path, path, day2write, cols=list_cols):
    
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucket_name)
    start_date = dt.datetime.strptime(day2write, "%Y-%m-%d")
    end_date = dt.datetime.strptime(day2write, "%Y-%m-%d") + timedelta(days=1)
    
    prefix_path = path + f"/year={start_date.year}/month={start_date.month}/day={start_date.day}/"
    file_list = list(bucket.objects.filter(Prefix=prefix_path))
    if not len(file_list):
        
        print("No file in ", prefix_path)
        # Create new Template
        df = pd.DataFrame(index=pd.date_range(start=start_date, end=end_date, freq="30T"), columns=cols[1:])
        df = df.head(48)
        df.index.name = cols[0]
        df['SettlementPeriod'] = range(0, len(df))
        df['SettlementPeriod'] = df['SettlementPeriod'].apply(lambda x: x % 48 + 1)
        df = df.reset_index()
        print(f"Creating empty file at ", day2write)
        
        save_data(df = df,
                  file_name = 'prediction',
                  path = os.path.join(write_path, path),
                  date_partition = day2write
        )
        print(f"Created empty file at ", day2write)
    else: 
        print('File Exits')


def override(df: pd.DataFrame, column_name, value, stime):
    inx = df.index[(df['local_datetime'] > stime - timedelta(minutes=10))
                   &(df['local_datetime'] < stime + timedelta(minutes=10))].values[0] 
    df.at[inx, column_name] = value
    # print (f"Lasted {column_name} Index in Result file : ", inx, ', at ', stime)
    return df


def day_ahead(
    result_files : pd.DataFrame,
    processed_df : pd.DataFrame,
    
):
    """ 
    Morning prediction deadline: 8:45    you can run at 8:30 (and 7:30)
    Afternoon prediction deadline: 14:30 you can run at 14:15 (and 13:15)
    So I set it run at 8:10 (or 7:10), 14:10 (or 13:10) should meet the deadline
    """
    if (datetime.now().hour in [7, 8, 13, 14]) & (datetime.now().minute < 15):
        # split target out of payload input
        dayahead_payload = processed_df.drop([
            "ImbalanceQuantity(MAW)(B1780)",
            "local_datetime",
            "SettlementPeriod",
        ], axis=1).to_numpy()

        half_day = 'morning' if datetime.now().hour < 12 else 'afternoon'
        print ("length of preprocess_dataframe: ", processed_df.shape)

        ##### invoke model endpoint for getting predictions #####
        ### Get the newest model file aka "TargetModel"
        list_files = []
        for object_summary in model_bucket.objects.filter(Prefix="endpoint"):
            if f"lgbm-regressor-dayahead-{half_day}" in object_summary.key:
                list_files.append(object_summary.key)
        assert len(list_files) == 1, "check s3 should have one file"
        target_model = list_files[0].split("endpoint/")[-1]
        print("Lastest Day-Ahead target Model :", target_model)

        ### Invoke DAY-AHEAD Endpoint & Get Result
        runtime_sm_client = boto3.client(service_name="sagemaker-runtime")
        print ("Invoking day-ahead endpoint with Payload data")
        response = runtime_sm_client.invoke_endpoint(
            EndpointName="lgbm-regressor-endpoint",
            ContentType ="application/JSON",
            TargetModel = target_model,
            Body=json.dumps(dayahead_payload.tolist()),
        )
        dayahead_arr = json.loads(response["Body"].read())
        print("Success Invoking Endpoint!")


        # store the prediction in a new df so that we can do some post processing
        # this dataframe only extends up to the last row NOT containing NaN, which is
        tmp_result_df = processed_df.copy()[["local_datetime", "SettlementPeriod"]]
        tmp_result_df["NIVPredictions"] = dayahead_arr
        tmp_result_df["local_datetime"] = pd.to_datetime(tmp_result_df["local_datetime"])

        # make a dummy df so that we can shift the predictions
        dummy_df = pd.DataFrame()  # .reset_index(drop=True)
        dummy_df["local_datetime"] = pd.date_range(
            processed_df["local_datetime"].iloc[-1] + timedelta(minutes=30),
            processed_df["local_datetime"].iloc[-1] + timedelta(days=2),  # timedelta 2 days to make sure it works
            freq="30T"
        )
        dummy_df["NIVPredictions"] = np.nan
        dummy_df["SettlementPeriod"] = range(processed_df["SettlementPeriod"].iloc[-1],
                                             processed_df["SettlementPeriod"].iloc[-1] + len(dummy_df))
        dummy_df["SettlementPeriod"] = dummy_df["SettlementPeriod"] % 48 + 1

        # concatenate to results dataframe and forward shift NIV predictions the same amount that was backshifted
        tmp_result_df = pd.concat([tmp_result_df, dummy_df], axis=0).reset_index(drop=True)
        tmp_result_df["NIVPredictions"] = tmp_result_df["NIVPredictions"].shift(96)
        tmp_result_df = tmp_result_df.dropna(subset=["NIVPredictions"])
        print(tmp_result_df.shape)
        # truncate the predictions to the datetime range that we want, which is

        tmp_result_df = truncate_to_dayahead_format(tmp_result_df, datetime.now())
        dayahead_arr = tmp_result_df["NIVPredictions"].tolist()                             

        index = 48 + 46 ### SP = 47 so index = 46 & add 48 from whole yesterday
        print('Length of dayahead array: ', len(dayahead_arr))
        for i in range(len(dayahead_arr)):
            result_files.at[index, 'dayahead_{}'.format(half_day)] = dayahead_arr[i]
            index += 1

        return result_files
    else: 
        return result_files


def intraday(
    result_files : pd.DataFrame,
    processed_df : pd.DataFrame,
    
):
    # split target out of payload input
    intraday_payload = processed_df.drop([
        "ImbalanceQuantity(MAW)(B1780)",
        "local_datetime",
        "SettlementPeriod",
    ], axis=1).to_numpy()
    
    print ("length of preprocess_dataframe: ", processed_df.shape)
    
    ##### Invoke model endpoint for getting predictions #####
    ### Get the newest model file aka "TargetModel"
    list_files = []
    for object_summary in model_bucket.objects.filter(Prefix="endpoint"):
        if f"lgbm-regressor-intraday" in object_summary.key:
            list_files.append(object_summary.key)
    assert len(list_files) == 1, "check s3 should have one file"
    target_model = list_files[0].split("endpoint/")[-1]
    print("Lastest Intraday target Model :", target_model)
    
    ### Invoke INTRADAY Endpoint & Get Result
    runtime_sm_client = boto3.client(service_name="sagemaker-runtime")
    print ("Invoking endpoint with Intraday Payload")
    response = runtime_sm_client.invoke_endpoint(
        EndpointName="lgbm-regressor-endpoint",
        ContentType ="application/JSON",
        TargetModel = target_model,
        Body=json.dumps(intraday_payload.tolist()),
    )
    intraday_arr = json.loads(response["Body"].read())
    print("Success Invoking Intraday Endpoint!")
    
    
    # store the prediction in a new df so that we can do some post processing
    # this dataframe only extends up to the last row NOT containing NaN, which is
    tmp_result_df = processed_df.copy()[["local_datetime", "SettlementPeriod"]]
    tmp_result_df["NIVPredictions"] = intraday_arr
    tmp_result_df["local_datetime"] = pd.to_datetime(tmp_result_df["local_datetime"])

    # make a dummy df so that we can shift the predictions
    dummy_df = pd.DataFrame()  # .reset_index(drop=True)
    dummy_df["local_datetime"] = pd.date_range(
        processed_df["local_datetime"].iloc[-1] + timedelta(minutes=30),
        processed_df["local_datetime"].iloc[-1] + timedelta(days=2),  # timedelta 2 days to make sure it works
        freq="30T"
    )
    dummy_df["NIVPredictions"] = np.nan
    dummy_df["SettlementPeriod"] = range(processed_df["SettlementPeriod"].iloc[-1],
                                         processed_df["SettlementPeriod"].iloc[-1] + len(dummy_df))
    dummy_df["SettlementPeriod"] = dummy_df["SettlementPeriod"] % 48 + 1

    # concatenate to results dataframe and forward shift NIV predictions the same amount that was backshifted
    tmp_result_df = pd.concat([tmp_result_df, dummy_df], axis=0).reset_index(drop=True)
    tmp_result_df["NIVPredictions"] = tmp_result_df["NIVPredictions"].shift(48)
    tmp_result_df = tmp_result_df.dropna(subset=["NIVPredictions"])
    print("Shape of Result: ", tmp_result_df.shape)
    
    #### Get Lastest NIV Time to identify where to fill the result
    niv_val, niv_time = get_lastest_value(processed_df, "ImbalanceQuantity(MAW)(B1780)")
    print("Lastest NIV : ", niv_val, niv_time)
    
    tmp_result_df = tmp_result_df[(tmp_result_df['local_datetime'] > niv_time) & 
                                  (tmp_result_df['local_datetime'] < niv_time + timedelta(hours = 4.5))]
    print(tmp_result_df)
    intraday_arr = tmp_result_df["NIVPredictions"].tolist()   
    print('Length of intraday array: ', len(intraday_arr))
    
    # index = result_files.index[result_files['SettlementTime'].dt.datetime == niv_time].values[0]
    index = result_files.index[(result_files['local_datetime'] > niv_time + timedelta(minutes=20))
                               &(result_files['local_datetime'] < niv_time + timedelta(minutes=40))].values[0]
    
    print ('Intraday index = ', index)
    # Insert Value
    for i in range(len(intraday_arr)):
        index += 1
        result_files.at[index, 'niv_predicted_{}sp'.format(i + 1)] = intraday_arr[i]
        print(index, result_files.at[index, 'local_datetime'], intraday_arr[i], '\n')

    return result_files


def lambda_handler(event, context):
    
    merged_df = read_parquet_tables(
        file_name="merged",
        start_date = prev_day,
        end_date = tomorrow,
        path = read_path,
    )
    print ("length of Merged dataframe: ", merged_df.shape)
    merged_df["local_datetime"] = pd.to_datetime(merged_df["local_datetime"])
    df = preprocess_dataframe(merged_df)
    
    ### Check result exist: if not, create a new template
    check_file_is_exists(my_bucket, write_path, pred_folder, today, cols=list_cols)
    check_file_is_exists(my_bucket, write_path, pred_folder, tomorrow, cols=list_cols)
    
    result_files = read_parquet_tables(
        file_name= "prediction", 
        start_date = yesterday,
        end_date = tomorrow,
        path = os.path.join(write_path, pred_folder),
    )
    print("Shape of Result_file ", result_files.shape)
    if 'SettlementTime' in result_files.columns:
        result_files.rename(columns = {'SettlementTime':'local_datetime', 'SP':'SettlementPeriod'}, inplace = True) 
    
    result_files['local_datetime'] = pd.to_datetime(result_files["local_datetime"])
    
    result_files = day_ahead(result_files, df)
    result_files = intraday (result_files, df)
    
    ### Update actual values in order to calculate PnL in the future
    merged_file = merged_df[merged_df['local_datetime'] >= dt.datetime.strptime(yesterday, "%Y-%m-%d")]
    print ("# Row of actual data before joining to result files", merged_file.shape[0])
    result_files = update_actual_columns(result_files, merged_file.reset_index(drop = True))
    
    ##### Delivery files to S3 
    split_file_ands_save_to_s3(result_files, dt.datetime.strptime(today, "%Y-%m-%d"))

    print('Testing successfully!')

################################# Initialize ########################################################
s3 = boto3.resource('s3')
my_bucket = 'niv-predictions'   
pred_folder = 'lgbm-prediction' 
bucket = s3.Bucket(my_bucket)
model_bucket = s3.Bucket('lgbm-model-storage')
resource = boto3.resource('s3')
read_path = "s3://scgc/data/merged"
write_path = "s3://" + my_bucket
prev_day = (dt.datetime.now() - timedelta(days=4)).strftime("%Y-%m-%d")
yesterday= (dt.datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
today    = dt.datetime.now().strftime("%Y-%m-%d")
tomorrow = (dt.datetime.now() + timedelta(days=1)).strftime("%Y-%m-%d")

runtime_sm_client = boto3.client("sagemaker-runtime")

lambda_handler("", "")

In [2]:
############################################### Version 3.2 Aug19 ########################################################
### Edit window size of compute_ewm_features function

import datetime 
from datetime import (datetime,
                      timedelta)
import time
from time import (strftime, 
                  perf_counter, 
                  gmtime)
import numpy as np
import pandas as pd
import pyarrow
import pyarrow.parquet as pq
from functools import partial
import gc
import os, sys
import json
import s3fs
import io
import boto3
import tarfile 
import datetime as dt
from io import BytesIO

import warnings
warnings.filterwarnings("ignore", category=FutureWarning)
import traceback

filesystem = s3fs.S3FileSystem()


# ============================================= Helper Functions ============================================= #
def save_data(
    df: pd.DataFrame,
    file_name: str,
    path: str,
    date_partition: str,
    partition_cols=None, partition_filename_cb=None, filesystem=filesystem
):
    """Write pandas DataFrame in parquet format to S3 bucket
    Args:
        df (pd.DataFrame): DataFrame to save
        file_name (str): Table name
        path (str): local or AWS S3 path to store the parquet files
        partition_cols (list, optional): Columns used to partition the parquet files. Defaults to None.
    """
    
    date = dt.datetime.strptime(date_partition, "%Y-%m-%d")
    folder = f'/year={date.year}/month={date.month}/day={date.day}'

    pq.write_to_dataset(
        pyarrow.Table.from_pandas(df),
        path + folder,
        filesystem=filesystem,
        partition_cols=partition_cols,
        partition_filename_cb=lambda x: f"{file_name}.parquet",
    )

def generate_dates(start_date: str, end_date: str):
    """Generates list of dates
    Args:
        start_date (str): Start date
        end_date (str): End date
    Returns:
        List: List of dates
    """
    sdate = dt.datetime.strptime(start_date, "%Y-%m-%d")
    edate = dt.datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)

    return [
        (sdate + timedelta(days=x)).strftime("%Y-%m-%d")
        for x in range((edate - sdate).days)
    ]
        
def read_parquet_tables(
    file_name: str,
    start_date: str,
    end_date: str,
    path: str,
) -> pd.DataFrame:
    """Read parquet file partitions
    Args:
        file_name (str): Table name
        start_date (str): starting date to clean the data (%Y-%m-%d)
        end_date (str): ending date to clean the data (%Y-%m-%d)
        path (str): local or AWS S3 path to read the parquet files
    Returns:
        pd.DataFrame: Datframe from parquet file
    """
    # convert date range to list of dates
    date_list = generate_dates(start_date, end_date)
    df = pd.DataFrame()
    for read_date in date_list:
        # convert date to integers for filters
        r_year = dt.datetime.strptime(read_date, "%Y-%m-%d").year
        r_month = dt.datetime.strptime(read_date, "%Y-%m-%d").month
        r_day = dt.datetime.strptime(read_date, "%Y-%m-%d").day

        try:
            data = (
                pq.ParquetDataset(
                    path
                    + f"/year={r_year}/month={r_month}/day={r_day}/{file_name}.parquet",
                    filesystem=filesystem,
                )
                .read_pandas()
                .to_pandas()
            )

        except:
            continue  
        df = pd.concat([df, data], ignore_index=True)

    return df

def truncate_to_dayahead_format(dataframe, date_time):
    """
    Function for post-processing prediction returned. Returns a dataframe of
    prediction that contains the predictions for end of today up to tomorrow:
        ie. from SP_47 TODAY -> SP_46 of TOMORROW inclusive
    Args:
        dataframe: pd.DataFrame containing local_datetime and NIVPredictions
        date_time: datetime chosen

    Returns:
        truncated pd.DataFrame
    """
    now = date_time
    tmr = date_time + timedelta(days=1)
    start_pred = dataframe[dataframe["local_datetime"].dt.day == now.day].iloc[-2].name
    end_pred = dataframe[dataframe["local_datetime"].dt.day == tmr.day].iloc[-3].name
    dataframe_ = dataframe.loc[start_pred:end_pred]  # use loc because we're using index name
    if dataframe_["NIVPredictions"].isna().sum():
        raise Exception("NIVPredictions has NaN after post-processing. "
                        "Consider raising the backshift_niv_by hyperparameter"
                        "during training")

    return dataframe_


def split_file_ands_save_to_s3(result: pd.DataFrame, datetime_val: dt.datetime):
    """
    Split dt to 3 file base on date then save them to s3 
    
    """
    t_today     = datetime_val
    t_yesterday = t_today - timedelta(days=1)
    t_tomorrow  = t_today + timedelta(days=1)
    
    prev_data = result[result['local_datetime'].dt.day == t_yesterday.day]
    save_data(
        df = prev_data,
        file_name = 'prediction',
        path = os.path.join(write_path, pred_folder),
        date_partition = yesterday
    )

    now_data = result[result['local_datetime'].dt.day == t_today.day]
    save_data(
        df = now_data,
        file_name = 'prediction',
        path = os.path.join(write_path, pred_folder),
        date_partition = today
    )

    next_data = result[result['local_datetime'].dt.day == t_tomorrow.day]
    save_data(
        df = next_data,
        file_name = 'prediction',
        path = os.path.join(write_path, pred_folder),
        date_partition = tomorrow
    )


def update_actual_columns(
    result_file : pd.DataFrame,
    merged_file : pd.DataFrame
):
    columns_name = ['ImbalanceQuantity(MAW)(B1780)', 'ImbalancePriceAmount(B1770)', 
                    'marketIndexPrice(MID)', 'price_act(DAA)']
    # print (merged_file['marketIndexPrice(MID)'].to_list())
    result_file[columns_name] = merged_file[columns_name]
    return result_file

    
def get_lastest_value(dframe, column_name):
    tmp = dframe[["local_datetime", column_name]].dropna(how='any').iloc[-1:]  
    val = tmp.iloc[0][column_name]
    stime = tmp.iloc[0]['local_datetime']
    return val, stime


# ============================================= End of Helper Functions ============================================= #

# ============================================= PreProcess Functions ============================================= #

def select_best_features(dataframe):
    """Remove redundant/not useful columns from dataframe"""
    interconnector_cols = [c for c in dataframe.columns if "int" in c and "FUEL" in c]
    dataframe["Total_Int(FUELHH)"] = dataframe[interconnector_cols].apply(sum, axis=1)
    dataframe["Total_Fossil(FUELHH)"] = (dataframe["coal(FUELHH)"] +
                                         dataframe["ocgt(FUELHH)"] +
                                         dataframe["ccgt(FUELHH)"] +
                                         dataframe["oil(FUELHH)"])
    dataframe["Total_Other(FUELHH)"] = (dataframe["biomass(FUELHH)"] +
                                        dataframe["other(FUELHH)"] +
                                        dataframe["nuclear(FUELHH)"])
    dataframe["Total_Hydro(FUELHH)"] = dataframe["npshyd(FUELHH)"] + dataframe["ps(FUELHH)"]
    fuelhh_drop_cols = [c for c in dataframe.columns if "(FUELHH" in c and "total" not in c.lower()]
    # elexon generation cols
    ele_gen_drop_cols = (["Wind_Offshore_fcst(B1440)", "Wind_Onshore_fcst(B1440)"] +
                         [c for c in dataframe.columns if "windforfuelhh" in c.lower()] +  # WINDFORFUELHH
                         [c for c in dataframe.columns if "(B16" in c] +  # B16xx columns
                         ["Total_Load_fcst(B0620)", "Total_Load(B0610)"])  # + act_ele_gen_drop_cols
    # catalyst wind cols
    wind_pc_cols = [c for c in dataframe.columns if "pc" in c.lower()]  # the actual is very corr. with other winds
    sn_wind_cols = [c for c in dataframe.columns if "sn" in c.lower()]
    cat_wind_drop_cols = [c for c in dataframe.columns if "(Wind_" in c and "wind_act(Wind_unrestricted)" != c]
    cat_wind_drop_cols += wind_pc_cols + sn_wind_cols
    # drop columns with redundant information
    cols_to_remove = [
        "niv_act(Balancing_NIV_fcst_3hr)",  # can be used in post process
        "hist_fcst(Balancing_NIV_fcst_3hr)",  # bad feature, not to be used even in post process
        "niv(Balancing_NIV)",  # can be used in post process
        "indicativeNetImbalanceVolume(DERSYSDATA)",
        "systemSellPrice(DERSYSDATA)",
        "totalSystemAdjustmentSellVolume(DERSYSDATA)",
        "totalSystemAdjustmentBuyVolume(DERSYSDATA)",
        "non_bm_stor(Balancing_detailed)",
        "DAI(MELIMBALNGC)",
        "DAM(MELIMBALNGC)",
        "TSDF(SYSDEM)",
        "ITSDO(SYSDEM)",
        "temperature(TEMP)",
        "ImbalancePriceAmount(B1770)",
        "marketIndexPrice(MID)",
        "marketIndexVolume(MID)",
        "DATF(FORDAYDEM)",
        "DAID(FORDAYDEM)",
        "DAIG(FORDAYDEM)",
        "DANF(FORDAYDEM)"
    ]
    # day ahead auction cols
    daa_gwstep_cols = [c for c in dataframe.columns if "daa_gwstep" in c.lower()]
    daa_windrisk_cols = [c for c in dataframe.columns if "windrisk" in c.lower()]  # daauction/range/wind_risk @catalyst
    daa_xgas_cols = [c for c in dataframe.columns if "daa_xgas" in c.lower()]  # xgas @Catalyst too high corr with gas
    daa_gas_cols = [c for c in dataframe.columns if "daa_gas" in c.lower()]  # @Catalyst
    daa_drop_cols = [
        "price_act(DAA)",
        "price_fcst(DAA)",
        "price_weighted(DAA)",
        "hist_fcst(DAA_1D_8AM)",
        "hist_fcst(DAA_1D_2PM)",
    ]
    daa_drop_cols += daa_gwstep_cols + daa_windrisk_cols + daa_xgas_cols + daa_gas_cols
    # drop the columns listed and return
    cols_to_remove += (  # act_ele_gen_drop_cols +
            fuelhh_drop_cols +
            ele_gen_drop_cols +
            cat_wind_drop_cols +
            daa_drop_cols
    )
    return dataframe.drop(cols_to_remove, axis=1)


def prepare_date_features(dataframe):
    """transform date features into tabular format"""
    # cyclic encode the datetime information; sine because the cycle should start from 0
    dataframe["sp_sin"] = np.sin(dataframe["SettlementPeriod"] * (2 * np.pi / 48))
    dataframe["month_sin"] = np.sin(dataframe["local_datetime"].dt.month * (2 * np.pi / 12))
    dataframe["week_sin"] = np.sin((dataframe["local_datetime"].dt.weekday + 1) * (2 * np.pi / 7))
    # drop unparsed date column; note that SettlementDate is kept for later groupbys
#     dataframe.drop(["local_datetime",  # information is already encoded
#         "SettlementPeriod"], axis=1, inplace=True)
    return dataframe


def interpolate_outliers(dataframe, cutoff=2.5):
    """Replaces the outlier value with the previous and next value's average using the column's z statistic"""
    float_cols = [c for c in dataframe.select_dtypes("float").columns if "sin" not in c]
    for col_name in float_cols:
        col = dataframe[col_name].to_numpy()
        col_mean, col_std = col.mean(), col.std()  # save the mean and std of the dataframe column
        z_cutoff = cutoff * col_std
        for idx in range(len(col)):
            if np.abs(col[idx] - col_mean) > z_cutoff:
                try:
                    dataframe.loc[idx, col_name] = np.mean([col[idx - 1], col[idx + 1]])
                except IndexError:
                    # this only happens at either end of the input data, so we leave the value as is
                    # it will be processed in the clip_outliers function if the cutoff there is less than or
                    # equal to the cutoff here
                    pass
    return dataframe

def clip_outliers(dataframe, cutoff=2):
    """Clip outlier using z-statistic"""
    float_cols = [c for c in dataframe.select_dtypes("float").columns if "sin" not in c]
    for col_name in float_cols:
        col = dataframe[col_name].to_numpy()
        col_mean, col_std = col.mean(), col.std()  # save the mean and std of the dataframe column
        z_cutoff = cutoff * col_std
        lower_bound = col_mean - z_cutoff
        upper_bound = col_mean + z_cutoff
        for idx in range(len(col)):
            row = col[idx]  # save to variable to avoid re-accessing
            if np.abs(row - col_mean) > z_cutoff:
                dataframe.loc[idx, col_name] = np.clip(row, lower_bound, upper_bound)
    return dataframe


def compute_ewm_features(dataframe, window=8, alpha=1 - np.log(2) / 4):
    """Computes the exponentially moving weighted average features"""
    weights = list(reversed([(1 - alpha) ** n for n in range(window)]))
    ewma = partial(np.average, weights=weights)
    ewm_cols = [c for c in dataframe.columns if "Imbalance" not in c and  # exclude target variable
                "(" in c and  # this is to exclude time features
                "FUELHH" not in c and  # exclude FUELHH features
                "cash_out" not in c]  # exclude cash_out(Balancing_detailed) feature
    for c in ewm_cols:
        # compute daily ewm, parametrized by alpha
        dataframe[f"ewm_mean_{c}"] = dataframe[c].rolling(window).apply(ewma)
    return dataframe


def compute_shifted_features(dataframe):
    """Computes the features that can be shifted"""
    # compute  backshifted features
    bshift_2sp_cols = [
        "totalSystemAcceptedOfferVolume(DERSYSDATA)",
        "totalSystemAcceptedBidVolume(DERSYSDATA)"
    ]
    for c in bshift_2sp_cols:
        dataframe[f"bshift_2sp_{c}"] = dataframe[c].shift(-2)
    dataframe["bshift_4sp_boas(Balancing_detailed)"] = dataframe["boas(Balancing_detailed)"].shift(-4)
    # compute back-differenced features
    bdiff_cols = [
        "Generation_fcst(B1430)",
        "boas(Balancing_detailed)",
        "totalSystemAcceptedOfferVolume(DERSYSDATA)",
        "totalSystemAcceptedBidVolume(DERSYSDATA)"
    ]
    for c in bdiff_cols:
        dataframe[f"bdiff_1sp_{c}"] = dataframe[c].diff(-1)
    # compute forward shifted feature; other fshifted features based on other cols did not perform well
    fshift_cols = [
        "boas(Balancing_detailed)",
    ]
    for c in fshift_cols:
        dataframe[f"fshift_1hr_{c}"] = dataframe[c].shift(2)
    return dataframe


def preprocess_dataframe(df : pd.DataFrame):

    t0 = time.perf_counter()
    df = select_best_features(df)  # remove columns based on previous experiment results
    df = prepare_date_features(df)  # parse date into cyclic features
    df = interpolate_outliers(df, cutoff=2.5)  # interpolate using z statistics (avg next/before obs)
    df = clip_outliers(df, cutoff=2)  # clip outliers using z statistics
    df = compute_ewm_features(df, window=4)  # compute exponentially weighted moving average features
    df = compute_shifted_features(df)  # compute features based on shifting and differencing
    
    # drop some columns that are not performant or are no longer needed
    df = df.drop(["SettlementDate",
                  "wind_act(Wind_unrestricted)",
                  "totalSystemAcceptedOfferVolume(DERSYSDATA)",
                  "totalSystemAcceptedBidVolume(DERSYSDATA)",
                  "Generation_fcst(B1430)",
                  "intraday(Balancing_detailed)",
                  "Solar_fcst(B1440)",
                  "__index_level_0__"], axis=1)
    print("Time taken to preprocess features in seconds:", time.perf_counter() - t0)
    df = df.dropna().reset_index(drop=True)
    # df = df.drop("ImbalanceQuantity(MAW)(B1780)", axis=1)
    
    return df


# ============================================= End of PreProcess Functions ============================================= #

# ============================================= Support Main Functions ============================================= #

list_cols = ['local_datetime', 'SettlementPeriod' , 'niv_predicted_1sp', 'niv_predicted_2sp',
             'niv_predicted_3sp', 'niv_predicted_4sp', 'niv_predicted_5sp', 'niv_predicted_6sp',
             'niv_predicted_7sp', 'niv_predicted_8sp', 'dayahead_morning', 'dayahead_afternoon', 
             'ImbalanceQuantity(MAW)(B1780)', 'ImbalancePriceAmount(B1770)', 'marketIndexPrice(MID)', 'price_act(DAA)']

def check_file_is_exists(bucket_name, write_path, path, day2write, cols=list_cols):
    
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucket_name)
    start_date = dt.datetime.strptime(day2write, "%Y-%m-%d")
    end_date = dt.datetime.strptime(day2write, "%Y-%m-%d") + timedelta(days=1)
    
    prefix_path = path + f"/year={start_date.year}/month={start_date.month}/day={start_date.day}/"
    file_list = list(bucket.objects.filter(Prefix=prefix_path))
    if not len(file_list):
        
        print("No file in ", prefix_path)
        # Create new Template
        df = pd.DataFrame(index=pd.date_range(start=start_date, end=end_date, freq="30T"), columns=cols[1:])
        df = df.head(48)
        df.index.name = cols[0]
        df['SettlementPeriod'] = range(0, len(df))
        df['SettlementPeriod'] = df['SettlementPeriod'].apply(lambda x: x % 48 + 1)
        df = df.reset_index()
        print(f"Creating empty file at ", day2write)
        
        save_data(df = df,
                  file_name = 'prediction',
                  path = os.path.join(write_path, path),
                  date_partition = day2write
        )
        print(f"Created empty file at ", day2write)
    else: 
        print('File Exits')


def override(df: pd.DataFrame, column_name, value, stime):
    inx = df.index[(df['local_datetime'] > stime - timedelta(minutes=10))
                   &(df['local_datetime'] < stime + timedelta(minutes=10))].values[0] 
    df.at[inx, column_name] = value
    # print (f"Lasted {column_name} Index in Result file : ", inx, ', at ', stime)
    return df


def day_ahead(
    result_files : pd.DataFrame,
    processed_df : pd.DataFrame,
    
):
    """ 
    Morning prediction deadline: 8:45    you can run at 8:30 (and 7:30)
    Afternoon prediction deadline: 14:30 you can run at 14:15 (and 13:15)
    So I set it run at 8:10 (or 7:10), 14:10 (or 13:10) should meet the deadline
    """
    if (datetime.now().hour in [7, 8, 13, 14]) & (datetime.now().minute < 15):
        # split target out of payload input
        dayahead_payload = processed_df.drop([
            "ImbalanceQuantity(MAW)(B1780)",
            "local_datetime",
            "SettlementPeriod",
        ], axis=1).to_numpy()

        half_day = 'morning' if datetime.now().hour < 12 else 'afternoon'
        print ("length of preprocess_dataframe: ", processed_df.shape)

        ##### invoke model endpoint for getting predictions #####
        ### Get the newest model file aka "TargetModel"
        list_files = []
        for object_summary in model_bucket.objects.filter(Prefix="endpoint"):
            if f"lgbm-regressor-dayahead-{half_day}" in object_summary.key:
                list_files.append(object_summary.key)
        assert len(list_files) == 1, "check s3 should have one file"
        target_model = list_files[0].split("endpoint/")[-1]
        print("Lastest Day-Ahead target Model :", target_model)

        ### Invoke DAY-AHEAD Endpoint & Get Result
        runtime_sm_client = boto3.client(service_name="sagemaker-runtime")
        print ("Invoking day-ahead endpoint with Payload data")
        response = runtime_sm_client.invoke_endpoint(
            EndpointName="lgbm-regressor-endpoint",
            ContentType ="application/JSON",
            TargetModel = target_model,
            Body=json.dumps(dayahead_payload.tolist()),
        )
        dayahead_arr = json.loads(response["Body"].read())
        print("Success Invoking Endpoint!")


        # store the prediction in a new df so that we can do some post processing
        # this dataframe only extends up to the last row NOT containing NaN, which is
        tmp_result_df = processed_df.copy()[["local_datetime", "SettlementPeriod"]]
        tmp_result_df["NIVPredictions"] = dayahead_arr
        tmp_result_df["local_datetime"] = pd.to_datetime(tmp_result_df["local_datetime"])

        # make a dummy df so that we can shift the predictions
        dummy_df = pd.DataFrame()  # .reset_index(drop=True)
        dummy_df["local_datetime"] = pd.date_range(
            processed_df["local_datetime"].iloc[-1] + timedelta(minutes=30),
            processed_df["local_datetime"].iloc[-1] + timedelta(days=2),  # timedelta 2 days to make sure it works
            freq="30T"
        )
        dummy_df["NIVPredictions"] = np.nan
        dummy_df["SettlementPeriod"] = range(processed_df["SettlementPeriod"].iloc[-1],
                                             processed_df["SettlementPeriod"].iloc[-1] + len(dummy_df))
        dummy_df["SettlementPeriod"] = dummy_df["SettlementPeriod"] % 48 + 1

        # concatenate to results dataframe and forward shift NIV predictions the same amount that was backshifted
        tmp_result_df = pd.concat([tmp_result_df, dummy_df], axis=0).reset_index(drop=True)
        tmp_result_df["NIVPredictions"] = tmp_result_df["NIVPredictions"].shift(96)
        tmp_result_df = tmp_result_df.dropna(subset=["NIVPredictions"])
        print(tmp_result_df.shape)
        # truncate the predictions to the datetime range that we want, which is

        tmp_result_df = truncate_to_dayahead_format(tmp_result_df, datetime.now())
        dayahead_arr = tmp_result_df["NIVPredictions"].tolist()                             

        index = 48 + 46 ### SP = 47 so index = 46 & add 48 from whole yesterday
        print('Length of dayahead array: ', len(dayahead_arr))
        for i in range(len(dayahead_arr)):
            result_files.at[index, 'dayahead_{}'.format(half_day)] = dayahead_arr[i]
            index += 1

        return result_files
    else: 
        return result_files


def intraday(
    result_files : pd.DataFrame,
    processed_df : pd.DataFrame,
    
):
    # split target out of payload input
    intraday_payload = processed_df.drop([
        "ImbalanceQuantity(MAW)(B1780)",
        "local_datetime",
        "SettlementPeriod",
    ], axis=1).to_numpy()
    
    print ("length of preprocess_dataframe: ", processed_df.shape)
    
    ##### Invoke model endpoint for getting predictions #####
    ### Get the newest model file aka "TargetModel"
    list_files = []
    for object_summary in model_bucket.objects.filter(Prefix="endpoint"):
        if f"lgbm-regressor-intraday" in object_summary.key:
            list_files.append(object_summary.key)
    assert len(list_files) == 1, "check s3 should have one file"
    target_model = list_files[0].split("endpoint/")[-1]
    print("Lastest Intraday target Model :", target_model)
    
    ### Invoke INTRADAY Endpoint & Get Result
    runtime_sm_client = boto3.client(service_name="sagemaker-runtime")
    print ("Invoking endpoint with Intraday Payload")
    response = runtime_sm_client.invoke_endpoint(
        EndpointName="lgbm-regressor-endpoint",
        ContentType ="application/JSON",
        TargetModel = target_model,
        Body=json.dumps(intraday_payload.tolist()),
    )
    intraday_arr = json.loads(response["Body"].read())
    print("Success Invoking Intraday Endpoint!")
    
    
    # store the prediction in a new df so that we can do some post processing
    # this dataframe only extends up to the last row NOT containing NaN, which is
    tmp_result_df = processed_df.copy()[["local_datetime", "SettlementPeriod"]]
    tmp_result_df["NIVPredictions"] = intraday_arr
    tmp_result_df["local_datetime"] = pd.to_datetime(tmp_result_df["local_datetime"])

    # make a dummy df so that we can shift the predictions
    dummy_df = pd.DataFrame()  # .reset_index(drop=True)
    dummy_df["local_datetime"] = pd.date_range(
        processed_df["local_datetime"].iloc[-1] + timedelta(minutes=30),
        processed_df["local_datetime"].iloc[-1] + timedelta(days=2),  # timedelta 2 days to make sure it works
        freq="30T"
    )
    dummy_df["NIVPredictions"] = np.nan
    dummy_df["SettlementPeriod"] = range(processed_df["SettlementPeriod"].iloc[-1],
                                         processed_df["SettlementPeriod"].iloc[-1] + len(dummy_df))
    dummy_df["SettlementPeriod"] = dummy_df["SettlementPeriod"] % 48 + 1

    # concatenate to results dataframe and forward shift NIV predictions the same amount that was backshifted
    tmp_result_df = pd.concat([tmp_result_df, dummy_df], axis=0).reset_index(drop=True)
    tmp_result_df["NIVPredictions"] = tmp_result_df["NIVPredictions"].shift(48)
    tmp_result_df = tmp_result_df.dropna(subset=["NIVPredictions"])
    print("Shape of Result: ", tmp_result_df.shape)
    
    #### Get Lastest NIV Time to identify where to fill the result
    niv_val, niv_time = get_lastest_value(processed_df, "ImbalanceQuantity(MAW)(B1780)")
    print("Lastest NIV : ", niv_val, niv_time)
    
    tmp_result_df = tmp_result_df[(tmp_result_df['local_datetime'] > niv_time) & 
                                  (tmp_result_df['local_datetime'] < niv_time + timedelta(hours = 4.5))]
    print(tmp_result_df)
    intraday_arr = tmp_result_df["NIVPredictions"].tolist()   
    print('Length of intraday array: ', len(intraday_arr))
    
    # index = result_files.index[result_files['SettlementTime'].dt.datetime == niv_time].values[0]
    index = result_files.index[(result_files['local_datetime'] > niv_time + timedelta(minutes=20))
                               &(result_files['local_datetime'] < niv_time + timedelta(minutes=40))].values[0]
    
    print ('Intraday index = ', index)
    # Insert Value
    for i in range(len(intraday_arr)):
        index += 1
        result_files.at[index, 'niv_predicted_{}sp'.format(i + 1)] = intraday_arr[i]
        print(index, result_files.at[index, 'local_datetime'], intraday_arr[i], '\n')

    return result_files


def lambda_handler(event, context):
    
    merged_df = read_parquet_tables(
        file_name="merged",
        start_date = prev_day,
        end_date = tomorrow,
        path = read_path,
    )
    print ("length of Merged dataframe: ", merged_df.shape)
    merged_df["local_datetime"] = pd.to_datetime(merged_df["local_datetime"])
    df = preprocess_dataframe(merged_df)
    
    ### Check result exist: if not, create a new template
    check_file_is_exists(my_bucket, write_path, pred_folder, today, cols=list_cols)
    check_file_is_exists(my_bucket, write_path, pred_folder, tomorrow, cols=list_cols)
    
    result_files = read_parquet_tables(
        file_name= "prediction", 
        start_date = yesterday,
        end_date = tomorrow,
        path = os.path.join(write_path, pred_folder),
    )
    print("Shape of Result_file ", result_files.shape)
    if 'SettlementTime' in result_files.columns:
        result_files.rename(columns = {'SettlementTime':'local_datetime', 'SP':'SettlementPeriod'}, inplace = True) 
    
    result_files['local_datetime'] = pd.to_datetime(result_files["local_datetime"])
    
    result_files = day_ahead(result_files, df)
    result_files = intraday (result_files, df)
    
    ### Update actual values in order to calculate PnL in the future
    merged_file = merged_df[merged_df['local_datetime'] >= dt.datetime.strptime(yesterday, "%Y-%m-%d")]
    print ("# Row of actual data before joining to result files", merged_file.shape[0])
    result_files = update_actual_columns(result_files, merged_file.reset_index(drop = True))
    
    ##### Delivery files to S3 
    split_file_ands_save_to_s3(result_files, dt.datetime.strptime(today, "%Y-%m-%d"))

    print('Testing successfully!')

################################# Initialize ########################################################
s3 = boto3.resource('s3')
my_bucket = 'niv-predictions'   
pred_folder = 'lgbm-prediction' 
bucket = s3.Bucket(my_bucket)
model_bucket = s3.Bucket('lgbm-model-storage')
resource = boto3.resource('s3')
read_path = "s3://scgc/data/merged"
write_path = "s3://" + my_bucket
prev_day = (dt.datetime.now() - timedelta(days=4)).strftime("%Y-%m-%d")
yesterday= (dt.datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
today    = dt.datetime.now().strftime("%Y-%m-%d")
tomorrow = (dt.datetime.now() + timedelta(days=1)).strftime("%Y-%m-%d")

runtime_sm_client = boto3.client("sagemaker-runtime")

lambda_handler("", "")

length of Merged dataframe:  (288, 179)
Time taken to preprocess features in seconds: 0.15955582999959006
File Exits
File Exits
Shape of Result_file  (144, 16)
length of preprocess_dataframe:  (203, 31)
Lastest Day-Ahead target Model : lgbm-regressor-dayahead-morning-2022-08-04.tar.gz
Invoking day-ahead endpoint with Payload data
Success Invoking Endpoint!
(203, 3)
Length of dayahead array:  48
length of preprocess_dataframe:  (203, 31)
Lastest Intraday target Model : lgbm-regressor-intraday-2022-08-04.tar.gz
Invoking endpoint with Intraday Payload
Success Invoking Intraday Endpoint!
Shape of Result:  (203, 3)
Lastest NIV :  196.6122 2022-08-19 06:30:00
         local_datetime  SettlementPeriod  NIVPredictions
203 2022-08-19 07:00:00                15     -121.561811
204 2022-08-19 07:30:00                16     -214.691102
205 2022-08-19 08:00:00                17      -91.925530
206 2022-08-19 08:30:00                18      -38.511855
207 2022-08-19 09:00:00                19     -2

In [None]:
############################################### Version 3.3 Aug 23 ########################################################
### Edit method of data ingestion: assign DataFrame of selected features (line 359-367) in case new columns be added

import datetime 
from datetime import (datetime,
                      timedelta)
import time
from time import (strftime, 
                  perf_counter, 
                  gmtime)
import numpy as np
import pandas as pd
import pyarrow
import pyarrow.parquet as pq
from functools import partial
import gc
import os, sys
import json
import s3fs
import io
import boto3
import tarfile 
import datetime as dt
from io import BytesIO

import warnings
warnings.filterwarnings("ignore", category=FutureWarning)
import traceback

filesystem = s3fs.S3FileSystem()


# ============================================= Helper Functions ============================================= #
def save_data(
    df: pd.DataFrame,
    file_name: str,
    path: str,
    date_partition: str,
    partition_cols=None, partition_filename_cb=None, filesystem=filesystem
):
    """Write pandas DataFrame in parquet format to S3 bucket
    Args:
        df (pd.DataFrame): DataFrame to save
        file_name (str): Table name
        path (str): local or AWS S3 path to store the parquet files
        partition_cols (list, optional): Columns used to partition the parquet files. Defaults to None.
    """
    
    date = dt.datetime.strptime(date_partition, "%Y-%m-%d")
    folder = f'/year={date.year}/month={date.month}/day={date.day}'

    pq.write_to_dataset(
        pyarrow.Table.from_pandas(df),
        path + folder,
        filesystem=filesystem,
        partition_cols=partition_cols,
        partition_filename_cb=lambda x: f"{file_name}.parquet",
    )

def generate_dates(start_date: str, end_date: str):
    """Generates list of dates
    Args:
        start_date (str): Start date
        end_date (str): End date
    Returns:
        List: List of dates
    """
    sdate = dt.datetime.strptime(start_date, "%Y-%m-%d")
    edate = dt.datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)

    return [
        (sdate + timedelta(days=x)).strftime("%Y-%m-%d")
        for x in range((edate - sdate).days)
    ]
        
def read_parquet_tables(
    file_name: str,
    start_date: str,
    end_date: str,
    path: str,
) -> pd.DataFrame:
    """Read parquet file partitions
    Args:
        file_name (str): Table name
        start_date (str): starting date to clean the data (%Y-%m-%d)
        end_date (str): ending date to clean the data (%Y-%m-%d)
        path (str): local or AWS S3 path to read the parquet files
    Returns:
        pd.DataFrame: Datframe from parquet file
    """
    # convert date range to list of dates
    date_list = generate_dates(start_date, end_date)
    df = pd.DataFrame()
    for read_date in date_list:
        # convert date to integers for filters
        r_year = dt.datetime.strptime(read_date, "%Y-%m-%d").year
        r_month = dt.datetime.strptime(read_date, "%Y-%m-%d").month
        r_day = dt.datetime.strptime(read_date, "%Y-%m-%d").day

        try:
            data = (
                pq.ParquetDataset(
                    path
                    + f"/year={r_year}/month={r_month}/day={r_day}/{file_name}.parquet",
                    filesystem=filesystem,
                )
                .read_pandas()
                .to_pandas()
            )

        except:
            continue  
        df = pd.concat([df, data], ignore_index=True)

    return df

def truncate_to_dayahead_format(dataframe, date_time):
    """
    Function for post-processing prediction returned. Returns a dataframe of
    prediction that contains the predictions for end of today up to tomorrow:
        ie. from SP_47 TODAY -> SP_46 of TOMORROW inclusive
    Args:
        dataframe: pd.DataFrame containing local_datetime and NIVPredictions
        date_time: datetime chosen

    Returns:
        truncated pd.DataFrame
    """
    now = date_time
    tmr = date_time + timedelta(days=1)
    start_pred = dataframe[dataframe["local_datetime"].dt.day == now.day].iloc[-2].name
    end_pred = dataframe[dataframe["local_datetime"].dt.day == tmr.day].iloc[-3].name
    dataframe_ = dataframe.loc[start_pred:end_pred]  # use loc because we're using index name
    if dataframe_["NIVPredictions"].isna().sum():
        raise Exception("NIVPredictions has NaN after post-processing. "
                        "Consider raising the backshift_niv_by hyperparameter"
                        "during training")

    return dataframe_


def split_file_ands_save_to_s3(result: pd.DataFrame, datetime_val: dt.datetime):
    """
    Split dt to 3 file base on date then save them to s3 
    
    """
    t_today     = datetime_val
    t_yesterday = t_today - timedelta(days=1)
    t_tomorrow  = t_today + timedelta(days=1)
    
    prev_data = result[result['local_datetime'].dt.day == t_yesterday.day]
    save_data(
        df = prev_data,
        file_name = 'prediction',
        path = os.path.join(write_path, pred_folder),
        date_partition = yesterday
    )

    now_data = result[result['local_datetime'].dt.day == t_today.day]
    save_data(
        df = now_data,
        file_name = 'prediction',
        path = os.path.join(write_path, pred_folder),
        date_partition = today
    )

    next_data = result[result['local_datetime'].dt.day == t_tomorrow.day]
    save_data(
        df = next_data,
        file_name = 'prediction',
        path = os.path.join(write_path, pred_folder),
        date_partition = tomorrow
    )


def update_actual_columns(
    result_file : pd.DataFrame,
    merged_file : pd.DataFrame
):
    columns_name = ['ImbalanceQuantity(MAW)(B1780)', 'ImbalancePriceAmount(B1770)', 
                    'marketIndexPrice(MID)', 'price_act(DAA)']
    # print (merged_file['marketIndexPrice(MID)'].to_list())
    result_file[columns_name] = merged_file[columns_name]
    return result_file

    
def get_lastest_value(dframe, column_name):
    tmp = dframe[["local_datetime", column_name]].dropna(how='any').iloc[-1:]  
    val = tmp.iloc[0][column_name]
    stime = tmp.iloc[0]['local_datetime']
    return val, stime


# ============================================= End of Helper Functions ============================================= #

# ============================================= PreProcess Functions ============================================= #

def select_best_features(dataframe):
    """Remove redundant/not useful columns from dataframe"""
    interconnector_cols = [c for c in dataframe.columns if "int" in c and "FUEL" in c]
    dataframe["Total_Int(FUELHH)"] = dataframe[interconnector_cols].apply(sum, axis=1)
    dataframe["Total_Fossil(FUELHH)"] = (dataframe["coal(FUELHH)"] +
                                         dataframe["ocgt(FUELHH)"] +
                                         dataframe["ccgt(FUELHH)"] +
                                         dataframe["oil(FUELHH)"])
    dataframe["Total_Other(FUELHH)"] = (dataframe["biomass(FUELHH)"] +
                                        dataframe["other(FUELHH)"] +
                                        dataframe["nuclear(FUELHH)"])
    dataframe["Total_Hydro(FUELHH)"] = dataframe["npshyd(FUELHH)"] + dataframe["ps(FUELHH)"]
    fuelhh_drop_cols = [c for c in dataframe.columns if "(FUELHH" in c and "total" not in c.lower()]
    # elexon generation cols
    ele_gen_drop_cols = (["Wind_Offshore_fcst(B1440)", "Wind_Onshore_fcst(B1440)"] +
                         [c for c in dataframe.columns if "windforfuelhh" in c.lower()] +  # WINDFORFUELHH
                         [c for c in dataframe.columns if "(B16" in c] +  # B16xx columns
                         ["Total_Load_fcst(B0620)", "Total_Load(B0610)"])  # + act_ele_gen_drop_cols
    # catalyst wind cols
    wind_pc_cols = [c for c in dataframe.columns if "pc" in c.lower()]  # the actual is very corr. with other winds
    sn_wind_cols = [c for c in dataframe.columns if "sn" in c.lower()]
    cat_wind_drop_cols = [c for c in dataframe.columns if "(Wind_" in c and "wind_act(Wind_unrestricted)" != c]
    cat_wind_drop_cols += wind_pc_cols + sn_wind_cols
    # drop columns with redundant information
    cols_to_remove = [
        "niv_act(Balancing_NIV_fcst_3hr)",  # can be used in post process
        "hist_fcst(Balancing_NIV_fcst_3hr)",  # bad feature, not to be used even in post process
        "niv(Balancing_NIV)",  # can be used in post process
        "indicativeNetImbalanceVolume(DERSYSDATA)",
        "systemSellPrice(DERSYSDATA)",
        "totalSystemAdjustmentSellVolume(DERSYSDATA)",
        "totalSystemAdjustmentBuyVolume(DERSYSDATA)",
        "non_bm_stor(Balancing_detailed)",
        "DAI(MELIMBALNGC)",
        "DAM(MELIMBALNGC)",
        "TSDF(SYSDEM)",
        "ITSDO(SYSDEM)",
        "temperature(TEMP)",
        "ImbalancePriceAmount(B1770)",
        "marketIndexPrice(MID)",
        "marketIndexVolume(MID)",
        "DATF(FORDAYDEM)",
        "DAID(FORDAYDEM)",
        "DAIG(FORDAYDEM)",
        "DANF(FORDAYDEM)"
    ]
    # day ahead auction cols
    daa_gwstep_cols = [c for c in dataframe.columns if "daa_gwstep" in c.lower()]
    daa_windrisk_cols = [c for c in dataframe.columns if "windrisk" in c.lower()]  # daauction/range/wind_risk @catalyst
    daa_xgas_cols = [c for c in dataframe.columns if "daa_xgas" in c.lower()]  # xgas @Catalyst too high corr with gas
    daa_gas_cols = [c for c in dataframe.columns if "daa_gas" in c.lower()]  # @Catalyst
    daa_drop_cols = [
        "price_act(DAA)",
        "price_fcst(DAA)",
        "price_weighted(DAA)",
        "hist_fcst(DAA_1D_8AM)",
        "hist_fcst(DAA_1D_2PM)",
    ]
    daa_drop_cols += daa_gwstep_cols + daa_windrisk_cols + daa_xgas_cols + daa_gas_cols
    # drop the columns listed and return
    cols_to_remove += (  # act_ele_gen_drop_cols +
            fuelhh_drop_cols +
            ele_gen_drop_cols +
            cat_wind_drop_cols +
            daa_drop_cols
    )
    return dataframe.drop(cols_to_remove, axis=1)


def prepare_date_features(dataframe):
    """transform date features into tabular format"""
    # cyclic encode the datetime information; sine because the cycle should start from 0
    dataframe["sp_sin"] = np.sin(dataframe["SettlementPeriod"] * (2 * np.pi / 48))
    dataframe["month_sin"] = np.sin(dataframe["local_datetime"].dt.month * (2 * np.pi / 12))
    dataframe["week_sin"] = np.sin((dataframe["local_datetime"].dt.weekday + 1) * (2 * np.pi / 7))
    # drop unparsed date column; note that SettlementDate is kept for later groupbys
#     dataframe.drop(["local_datetime",  # information is already encoded
#         "SettlementPeriod"], axis=1, inplace=True)
    return dataframe


def interpolate_outliers(dataframe, cutoff=2.5):
    """Replaces the outlier value with the previous and next value's average using the column's z statistic"""
    float_cols = [c for c in dataframe.select_dtypes("float").columns if "sin" not in c]
    for col_name in float_cols:
        col = dataframe[col_name].to_numpy()
        col_mean, col_std = col.mean(), col.std()  # save the mean and std of the dataframe column
        z_cutoff = cutoff * col_std
        for idx in range(len(col)):
            if np.abs(col[idx] - col_mean) > z_cutoff:
                try:
                    dataframe.loc[idx, col_name] = np.mean([col[idx - 1], col[idx + 1]])
                except IndexError:
                    # this only happens at either end of the input data, so we leave the value as is
                    # it will be processed in the clip_outliers function if the cutoff there is less than or
                    # equal to the cutoff here
                    pass
    return dataframe

def clip_outliers(dataframe, cutoff=2):
    """Clip outlier using z-statistic"""
    float_cols = [c for c in dataframe.select_dtypes("float").columns if "sin" not in c]
    for col_name in float_cols:
        col = dataframe[col_name].to_numpy()
        col_mean, col_std = col.mean(), col.std()  # save the mean and std of the dataframe column
        z_cutoff = cutoff * col_std
        lower_bound = col_mean - z_cutoff
        upper_bound = col_mean + z_cutoff
        for idx in range(len(col)):
            row = col[idx]  # save to variable to avoid re-accessing
            if np.abs(row - col_mean) > z_cutoff:
                dataframe.loc[idx, col_name] = np.clip(row, lower_bound, upper_bound)
    return dataframe


def compute_ewm_features(dataframe, window=8, alpha=1 - np.log(2) / 4):
    """Computes the exponentially moving weighted average features"""
    weights = list(reversed([(1 - alpha) ** n for n in range(window)]))
    ewma = partial(np.average, weights=weights)
    ewm_cols = [c for c in dataframe.columns if "Imbalance" not in c and  # exclude target variable
                "(" in c and  # this is to exclude time features
                "FUELHH" not in c and  # exclude FUELHH features
                "cash_out" not in c]  # exclude cash_out(Balancing_detailed) feature
    for c in ewm_cols:
        # compute daily ewm, parametrized by alpha
        dataframe[f"ewm_mean_{c}"] = dataframe[c].rolling(window).apply(ewma)
    return dataframe


def compute_shifted_features(dataframe):
    """Computes the features that can be shifted"""
    # compute  backshifted features
    bshift_2sp_cols = [
        "totalSystemAcceptedOfferVolume(DERSYSDATA)",
        "totalSystemAcceptedBidVolume(DERSYSDATA)"
    ]
    for c in bshift_2sp_cols:
        dataframe[f"bshift_2sp_{c}"] = dataframe[c].shift(-2)
    dataframe["bshift_4sp_boas(Balancing_detailed)"] = dataframe["boas(Balancing_detailed)"].shift(-4)
    # compute back-differenced features
    bdiff_cols = [
        "Generation_fcst(B1430)",
        "boas(Balancing_detailed)",
        "totalSystemAcceptedOfferVolume(DERSYSDATA)",
        "totalSystemAcceptedBidVolume(DERSYSDATA)"
    ]
    for c in bdiff_cols:
        dataframe[f"bdiff_1sp_{c}"] = dataframe[c].diff(-1)
    # compute forward shifted feature; other fshifted features based on other cols did not perform well
    fshift_cols = [
        "boas(Balancing_detailed)",
    ]
    for c in fshift_cols:
        dataframe[f"fshift_1hr_{c}"] = dataframe[c].shift(2)
    return dataframe


def preprocess_dataframe(df : pd.DataFrame):
    
    t0 = time.perf_counter()
    df = select_best_features(df)  # remove columns based on previous experiment results
    df = df[['local_datetime', 'SettlementDate', 'SettlementPeriod',
       'Generation_fcst(B1430)', 'Solar_fcst(B1440)',
       'ImbalanceQuantity(MAW)(B1780)',
       'totalSystemAcceptedOfferVolume(DERSYSDATA)',
       'totalSystemAcceptedBidVolume(DERSYSDATA)', 'boas(Balancing_detailed)',
       'fwrd_trades(Balancing_detailed)', 'imbalngc(Balancing_detailed)',
       'cash_out(Balancing_detailed)', 'intraday(Balancing_detailed)',
       'wind_act(Wind_unrestricted)', 'Total_Int(FUELHH)', 'Total_Fossil(FUELHH)',
       'Total_Other(FUELHH)', 'Total_Hydro(FUELHH)']]  # Select these columns only
    df = prepare_date_features(df)  # parse date into cyclic features
    df = interpolate_outliers(df, cutoff=2.5)  # interpolate using z statistics (avg next/before obs)
    df = clip_outliers(df, cutoff=2)  # clip outliers using z statistics
    df = compute_ewm_features(df, window=4)  # compute exponentially weighted moving average features
    df = compute_shifted_features(df)  # compute features based on shifting and differencing
    
    # drop some columns that are not performant or are no longer needed
    df = df.drop(["SettlementDate",
                  "wind_act(Wind_unrestricted)",
                  "totalSystemAcceptedOfferVolume(DERSYSDATA)",
                  "totalSystemAcceptedBidVolume(DERSYSDATA)",
                  "Generation_fcst(B1430)",
                  "intraday(Balancing_detailed)",
                  "Solar_fcst(B1440)"], axis=1)
    print("Time taken to preprocess features in seconds:", time.perf_counter() - t0)
    df = df.dropna().reset_index(drop=True)
    # df = df.drop("ImbalanceQuantity(MAW)(B1780)", axis=1)
    return df


# ============================================= End of PreProcess Functions ============================================= #

# ============================================= Support Main Functions ============================================= #
list_cols = ['local_datetime', 'SettlementPeriod' , 'niv_predicted_1sp', 'niv_predicted_2sp',
             'niv_predicted_3sp', 'niv_predicted_4sp', 'niv_predicted_5sp', 'niv_predicted_6sp',
             'niv_predicted_7sp', 'niv_predicted_8sp', 'dayahead_morning', 'dayahead_afternoon', 
             'ImbalanceQuantity(MAW)(B1780)', 'ImbalancePriceAmount(B1770)', 'marketIndexPrice(MID)', 'price_act(DAA)']


def check_file_is_exists(bucket_name, write_path, path, day2write, cols=list_cols):
    
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucket_name)
    start_date = dt.datetime.strptime(day2write, "%Y-%m-%d")
    end_date = dt.datetime.strptime(day2write, "%Y-%m-%d") + timedelta(days=1)
    
    prefix_path = path + f"/year={start_date.year}/month={start_date.month}/day={start_date.day}/"
    file_list = list(bucket.objects.filter(Prefix=prefix_path))
    if not len(file_list):
        
        print("No file in ", prefix_path)
        # Create new Template
        df = pd.DataFrame(index=pd.date_range(start=start_date, end=end_date, freq="30T"), columns=cols[1:])
        df = df.head(48)
        df.index.name = cols[0]
        df['SettlementPeriod'] = range(0, len(df))
        df['SettlementPeriod'] = df['SettlementPeriod'].apply(lambda x: x % 48 + 1)
        df = df.reset_index()
        print(f"Creating empty file at ", day2write)
        
        save_data(df = df,
                  file_name = 'prediction',
                  path = os.path.join(write_path, path),
                  date_partition = day2write
        )
        print(f"Created empty file at ", day2write)
    else: 
        print('File Exits')


def override(df: pd.DataFrame, column_name, value, stime):
    inx = df.index[(df['local_datetime'] > stime - timedelta(minutes=10))
                   &(df['local_datetime'] < stime + timedelta(minutes=10))].values[0] 
    df.at[inx, column_name] = value
    # print (f"Lasted {column_name} Index in Result file : ", inx, ', at ', stime)
    return df


def day_ahead(
    result_files : pd.DataFrame,
    processed_df : pd.DataFrame,
    
):
    """ 
    Morning prediction deadline: 8:45    you can run at 8:30 (and 7:30)
    Afternoon prediction deadline: 14:30 you can run at 14:15 (and 13:15)
    So I set it run at 8:10 (or 7:10), 14:10 (or 13:10) should meet the deadline
    """
    if (datetime.now().hour in [7, 8, 13, 14]) & (datetime.now().minute < 15):
        # split target out of payload input
        dayahead_payload = processed_df.drop([
            "ImbalanceQuantity(MAW)(B1780)",
            "local_datetime",
            "SettlementPeriod",
        ], axis=1).to_numpy()

        half_day = 'morning' if datetime.now().hour < 12 else 'afternoon'
        print ("length of preprocess_dataframe: ", processed_df.shape)

        ##### invoke model endpoint for getting predictions #####
        ### Get the newest model file aka "TargetModel"
        list_files = []
        for object_summary in model_bucket.objects.filter(Prefix="endpoint"):
            if f"lgbm-regressor-dayahead-{half_day}" in object_summary.key:
                list_files.append(object_summary.key)
        assert len(list_files) == 1, "check s3 should have one file"
        target_model = list_files[0].split("endpoint/")[-1]
        print("Lastest Day-Ahead target Model :", target_model)

        ### Invoke DAY-AHEAD Endpoint & Get Result
        runtime_sm_client = boto3.client(service_name="sagemaker-runtime")
        print ("Invoking day-ahead endpoint with Payload data")
        response = runtime_sm_client.invoke_endpoint(
            EndpointName="lgbm-regressor-endpoint",
            ContentType ="application/JSON",
            TargetModel = target_model,
            Body=json.dumps(dayahead_payload.tolist()),
        )
        dayahead_arr = json.loads(response["Body"].read())
        print("Success Invoking Endpoint!")


        # store the prediction in a new df so that we can do some post processing
        # this dataframe only extends up to the last row NOT containing NaN, which is
        tmp_result_df = processed_df.copy()[["local_datetime", "SettlementPeriod"]]
        tmp_result_df["NIVPredictions"] = dayahead_arr
        tmp_result_df["local_datetime"] = pd.to_datetime(tmp_result_df["local_datetime"])

        # make a dummy df so that we can shift the predictions
        dummy_df = pd.DataFrame()  # .reset_index(drop=True)
        dummy_df["local_datetime"] = pd.date_range(
            processed_df["local_datetime"].iloc[-1] + timedelta(minutes=30),
            processed_df["local_datetime"].iloc[-1] + timedelta(days=2),  # timedelta 2 days to make sure it works
            freq="30T"
        )
        dummy_df["NIVPredictions"] = np.nan
        dummy_df["SettlementPeriod"] = range(processed_df["SettlementPeriod"].iloc[-1],
                                             processed_df["SettlementPeriod"].iloc[-1] + len(dummy_df))
        dummy_df["SettlementPeriod"] = dummy_df["SettlementPeriod"] % 48 + 1

        # concatenate to results dataframe and forward shift NIV predictions the same amount that was backshifted
        tmp_result_df = pd.concat([tmp_result_df, dummy_df], axis=0).reset_index(drop=True)
        tmp_result_df["NIVPredictions"] = tmp_result_df["NIVPredictions"].shift(96)
        tmp_result_df = tmp_result_df.dropna(subset=["NIVPredictions"])
        print(tmp_result_df.shape)
        # truncate the predictions to the datetime range that we want, which is

        tmp_result_df = truncate_to_dayahead_format(tmp_result_df, datetime.now())
        dayahead_arr = tmp_result_df["NIVPredictions"].tolist()                             

        index = 48 + 46 ### SP = 47 so index = 46 & add 48 from whole yesterday
        print('Length of dayahead array: ', len(dayahead_arr))
        for i in range(len(dayahead_arr)):
            result_files.at[index, 'dayahead_{}'.format(half_day)] = dayahead_arr[i]
            index += 1

        return result_files
    else: 
        return result_files


def intraday(
    result_files : pd.DataFrame,
    processed_df : pd.DataFrame,
    
):
    # split target out of payload input
    intraday_payload = processed_df.drop([
        "ImbalanceQuantity(MAW)(B1780)",
        "local_datetime",
        "SettlementPeriod",
    ], axis=1).to_numpy()
    
    print ("length of preprocess_dataframe: ", processed_df.shape)
    
    ##### Invoke model endpoint for getting predictions #####
    ### Get the newest model file aka "TargetModel"
    list_files = []
    for object_summary in model_bucket.objects.filter(Prefix="endpoint"):
        if f"lgbm-regressor-intraday" in object_summary.key:
            list_files.append(object_summary.key)
    assert len(list_files) == 1, "check s3 should have one file"
    target_model = list_files[0].split("endpoint/")[-1]
    print("Lastest Intraday target Model :", target_model)
    
    ### Invoke INTRADAY Endpoint & Get Result
    runtime_sm_client = boto3.client(service_name="sagemaker-runtime")
    print ("Invoking endpoint with Intraday Payload")
    response = runtime_sm_client.invoke_endpoint(
        EndpointName="lgbm-regressor-endpoint",
        ContentType ="application/JSON",
        TargetModel = target_model,
        Body=json.dumps(intraday_payload.tolist()),
    )
    intraday_arr = json.loads(response["Body"].read())
    print("Success Invoking Intraday Endpoint!")
    
    
    # store the prediction in a new df so that we can do some post processing
    # this dataframe only extends up to the last row NOT containing NaN, which is
    tmp_result_df = processed_df.copy()[["local_datetime", "SettlementPeriod"]]
    tmp_result_df["NIVPredictions"] = intraday_arr
    tmp_result_df["local_datetime"] = pd.to_datetime(tmp_result_df["local_datetime"])

    # make a dummy df so that we can shift the predictions
    dummy_df = pd.DataFrame()  # .reset_index(drop=True)
    dummy_df["local_datetime"] = pd.date_range(
        processed_df["local_datetime"].iloc[-1] + timedelta(minutes=30),
        processed_df["local_datetime"].iloc[-1] + timedelta(days=2),  # timedelta 2 days to make sure it works
        freq="30T"
    )
    dummy_df["NIVPredictions"] = np.nan
    dummy_df["SettlementPeriod"] = range(processed_df["SettlementPeriod"].iloc[-1],
                                         processed_df["SettlementPeriod"].iloc[-1] + len(dummy_df))
    dummy_df["SettlementPeriod"] = dummy_df["SettlementPeriod"] % 48 + 1

    # concatenate to results dataframe and forward shift NIV predictions the same amount that was backshifted
    tmp_result_df = pd.concat([tmp_result_df, dummy_df], axis=0).reset_index(drop=True)
    tmp_result_df["NIVPredictions"] = tmp_result_df["NIVPredictions"].shift(48)
    tmp_result_df = tmp_result_df.dropna(subset=["NIVPredictions"])
    print("Shape of Result: ", tmp_result_df.shape)
    
    #### Get Lastest NIV Time to identify where to fill the result
    niv_val, niv_time = get_lastest_value(processed_df, "ImbalanceQuantity(MAW)(B1780)")
    print("Lastest NIV : ", niv_val, niv_time)
    
    tmp_result_df = tmp_result_df[(tmp_result_df['local_datetime'] > niv_time) & 
                                  (tmp_result_df['local_datetime'] < niv_time + timedelta(hours = 4.5))]
    print(tmp_result_df)
    intraday_arr = tmp_result_df["NIVPredictions"].tolist()   
    print('Length of intraday array: ', len(intraday_arr))
    
    # index = result_files.index[result_files['SettlementTime'].dt.datetime == niv_time].values[0]
    index = result_files.index[(result_files['local_datetime'] > niv_time + timedelta(minutes=20))
                               &(result_files['local_datetime'] < niv_time + timedelta(minutes=40))].values[0]
    
    print ('Intraday index = ', index)
    # Insert Value
    for i in range(len(intraday_arr)):
        index += 1
        result_files.at[index, 'niv_predicted_{}sp'.format(i + 1)] = intraday_arr[i]
        print(index, result_files.at[index, 'local_datetime'], intraday_arr[i], '\n')

    return result_files


def lambda_handler(event, context):
    
    merged_df = read_parquet_tables(
        file_name="merged",
        start_date = prev_day,
        end_date = tomorrow,
        path = read_path,
    )
    print ("length of Merged dataframe: ", merged_df.shape)
    merged_df["local_datetime"] = pd.to_datetime(merged_df["local_datetime"])
    df = preprocess_dataframe(merged_df)
    
    ### Check result exist: if not, create a new template
    check_file_is_exists(my_bucket, write_path, pred_folder, today, cols=list_cols)
    check_file_is_exists(my_bucket, write_path, pred_folder, tomorrow, cols=list_cols)
    
    result_files = read_parquet_tables(
        file_name= "prediction", 
        start_date = yesterday,
        end_date = tomorrow,
        path = os.path.join(write_path, pred_folder),
    )
    print("Shape of Result_file ", result_files.shape)
    if 'SettlementTime' in result_files.columns:
        result_files.rename(columns = {'SettlementTime':'local_datetime', 'SP':'SettlementPeriod'}, inplace = True) 
    
    result_files['local_datetime'] = pd.to_datetime(result_files["local_datetime"])
    
    result_files = day_ahead(result_files, df)
    result_files = intraday (result_files, df)
    
    ### Update actual values in order to calculate PnL in the future
    merged_file = merged_df[merged_df['local_datetime'] >= dt.datetime.strptime(yesterday, "%Y-%m-%d")]
    print ("# Row of actual data before joining to result files", merged_file.shape[0])
    result_files = update_actual_columns(result_files, merged_file.reset_index(drop = True))
    
    ##### Delivery files to S3 
    split_file_ands_save_to_s3(result_files, dt.datetime.strptime(today, "%Y-%m-%d"))

    print('Testing successfully!')

################################# Initialize ########################################################

s3 = boto3.resource('s3')
my_bucket = 'niv-predictions'   
pred_folder = 'lgbm-prediction' 
bucket = s3.Bucket(my_bucket)
model_bucket = s3.Bucket('lgbm-model-storage')
resource = boto3.resource('s3')
read_path = "s3://scgc/data/merged"
write_path = "s3://" + my_bucket
prev_day = (dt.datetime.now() - timedelta(days=4)).strftime("%Y-%m-%d")
yesterday= (dt.datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
today    = dt.datetime.now().strftime("%Y-%m-%d")
tomorrow = (dt.datetime.now() + timedelta(days=1)).strftime("%Y-%m-%d")

runtime_sm_client = boto3.client("sagemaker-runtime")

lambda_handler("", "")