# In [None]:
import snowflake.snowpark as snowpark
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import col, call_builtin, cast
from utils import *


# Snowpark session used by this script, taken from the currently active context.
session = get_active_session()

# CONSTANTS
# Source (bronze) table. The silver table name is derived from it by replacing
# only the first occurrence of "BRONZE" with "SILVER".
BRONZE_TABLE_NAME = "BRONZE_RESULTING.INPLAY_RESULTING_MARKETS"
SILVER_TABLE_NAME = BRONZE_TABLE_NAME.replace("BRONZE", "SILVER", 1)


def restructure_silver(df):
    """
    Project the bronze inplay_resulting_markets DataFrame into its silver layout.

    Selected sub-fields of the ``FIXTUREMARKET`` column are lifted to top-level
    columns, a fixed set of root columns is carried over unchanged, ``Bets`` is
    cast to ``StructType()`` and ``CURRENT_TIMESTAMP`` is renamed to
    ``KAFKA_TIMESTAMP``. No rows are filtered here.

    Args:
        df: DataFrame read from BRONZE_TABLE_NAME. It must expose a
            ``FIXTUREMARKET`` column that supports item access by field name,
            plus every column listed in ``root_list``.

    Returns:
        The restructured DataFrame.

    Raises:
        Exception: any error raised while building the projection is logged
            with ``print`` and then re-raised unchanged.
    """

    # Sub-fields pulled out of the nested FIXTUREMARKET column.
    fixturemarket_list = [
        "Bets",
        "CreationDate",
        "RobotId",
        "ProviderFixtureId",
        "ProviderId",
        "MessageGuid",
        "ProviderMarketId",
        "MarketId",
        "FixtureId",
        "LastUpdate",
        "ProcessingGuid",
        # NOTE(review): "Market" is intentionally not extracted here; the
        # original author left this as an open question - confirm whether it
        # should be added.
    ]
    # Top-level columns copied across as-is.
    root_list = [
        "SETTLEMENTSINPLAY",
        "SPORTID",
        "SETTLEMENTSPOSTMATCH",
        "LEAGUEID",
        "LOCATIONID",
        "LIVESCOREGUID",
        '"TOPIC-NAME"',  # pre-quoted because the identifier contains a hyphen
        "ISOUTRIGHT",
        "FIXTURESTATUS",
        "CURRENT_TIMESTAMP",
    ]

    try:
        print(f"Starting {BRONZE_TABLE_NAME} DataFrame processing")

        # Create a list of column selections: nested fields first, then roots.
        select_columns = [
            df["FIXTUREMARKET"][field].alias(field) for field in fixturemarket_list
        ] + [df[field2].alias(field2) for field2 in root_list]

        print(f"select_columns: {select_columns}")

        delta_df = df.select(*select_columns)
        # NOTE(review): StructType is not imported explicitly in this file;
        # presumably it is supplied by `from utils import *` - confirm.
        delta_df = delta_df.withColumn("Bets", cast(delta_df.Bets, StructType()))
        delta_df = delta_df.withColumnRenamed("CURRENT_TIMESTAMP", "KAFKA_TIMESTAMP")

        # Report success only after every transformation has been built, so a
        # failure above is never preceded by a misleading success message.
        print(f"{BRONZE_TABLE_NAME} DataFrame processed successfully")

        return delta_df

    except Exception as e:
        print(
            f"An error occurred during {BRONZE_TABLE_NAME} DataFrame processing: {str(e)}"
        )
        raise


def main(session: snowpark.Session):
    """
    Run one incremental load from the bronze table into the silver table.

    Steps, in order:
      1. Look up the last update date recorded for BRONZE_TABLE_NAME.
      2. Fetch the delta DataFrame and its row count for that date.
      3. If the count is zero, log and return without writing anything.
      4. Otherwise compute the latest timestamp of the delta, restructure it,
         drop null rows and append the result to SILVER_TABLE_NAME.
      5. Store the latest timestamp through ``update_config``.

    The helper functions used here are not defined in this file (presumably
    they come from ``utils``).

    Args:
        session: Snowpark session passed through to the helper functions.

    Raises:
        Exception: any failure is logged with ``print`` and re-raised.
    """
    try:
        print("Starting the ETL process")
        previous_update = find_last_update_date(session, BRONZE_TABLE_NAME)
        bronze_delta, row_count = find_delta(
            session, BRONZE_TABLE_NAME, previous_update
        )

        # Guard clause: nothing to do when the delta is empty.
        if row_count == 0:
            print("No new data to process. ETL process completed successfully.")
            return

        latest_timestamp = find_latest_timestamp(bronze_delta)
        silver_delta = restructure_silver(bronze_delta)
        # The null count is only needed by the disabled metrics call below.
        cleaned_delta, _null_count = remove_null_rows(silver_delta)

        # NOTE: de-duplicating upsert is currently disabled:
        # count_duplicates_removed = update_silver(
        #     session, cleaned_delta, SILVER_TABLE_NAME
        # )

        # Append the new rows to table {SILVER_TABLE_NAME}.
        appender = cleaned_delta.write.mode("append")
        appender.save_as_table(SILVER_TABLE_NAME)

        update_config(session, BRONZE_TABLE_NAME, latest_timestamp)

        # NOTE: metrics collection is currently disabled:
        # save_metrics(
        #     session,
        #     BRONZE_TABLE_NAME,
        #     row_count,
        #     count_duplicates_removed,
        #     _null_count,
        #     latest_timestamp,
        # )
        print(
            f"ETL process completed successfully. Latest timestamp processed: {latest_timestamp}"
        )
    except Exception as e:
        print(f"An error occurred during the ETL process: {str(e)}")
        raise


# Run the ETL with the module-level session when this file is executed
# directly rather than imported.
if __name__ == "__main__":
    main(session)
