In [None]:
import pyspark as ps

from pyspark.sql.functions import *
from pyspark.sql import *
from pyspark.sql.types import *

# Load the raw BMS asset data from the catalog
df = spark.read.table("main.flexgen_qa.assets_bms_unmanaged")

# Build a second-resolution 'timestamp' column from the raw 'time' column. The raw value is
# divided by 1,000,000 (i.e. treated as microseconds since the Unix epoch) and floored to
# whole seconds. We use this derived column rather than 'time_record' because 'time_record'
# keeps millisecond precision and we want values truncated to the second.
# timestamp_seconds converts epoch seconds directly into a timestamp. This replaces the
# earlier from_unixtime -> formatted string -> to_timestamp round trip, which parsed a string
# per row and, in a session time zone with DST, could map instants in the repeated
# "fall back" hour onto the wrong instant.
df = df.withColumn("timestamp", timestamp_seconds(floor(col('time').cast(DoubleType()) / 1000000)))

# Drop columns we do not want to aggregate (take the median/mode of) later on:
# _metadata: the file path of the source data in AWS S3
# name: not used in the aggregation (NOTE(review): reason for dropping it is not recorded - confirm)
# ingestion_time: the time the data was stored
# time: the raw epoch value; it has already been converted into the 'timestamp' column above
# _rescued_data: observed to contain only nulls (presumably the loader's rescued-data column - confirm)
# time_record: the same instant as 'time' but at millisecond precision; we only need seconds
df = df.drop('_metadata', 'name', 'ingestion_time', 'time', '_rescued_data', 'time_record')

def aggregate_data(df: DataFrame, groupby_cols: list, numeric_cols: list, non_numeric_cols: list) -> DataFrame:
    '''
    Description:
    Aggregate ``df`` down to one row per distinct combination of ``groupby_cols``.
    Numeric columns are summarised with their median and non-numeric columns with
    their mode (most frequent value). When ``groupby_cols`` contains a 'timestamp'
    column that is already at second resolution, this yields one row per second
    (per combination of the remaining grouping columns).

    Inputs:
    df (PySpark DataFrame): Input DataFrame.
    groupby_cols (List[str]): Column names to group by. Must include 'timestamp',
        which is the primary sort key of the result.
    numeric_cols (List[str]): Column names aggregated with ``median``.
    non_numeric_cols (List[str]): Column names aggregated with ``mode``.
    A name in either aggregation list that is also in ``groupby_cols`` is skipped:
    it is already in the output as a grouping key, and aggregating it too would
    produce two columns with the same name.

    Returns:
    A PySpark DataFrame with one row per group: the grouping columns followed by the
    numeric and then the non-numeric aggregates. It is sorted ascending by 'timestamp',
    then by the other grouping columns so that the row order is deterministic.
    '''
    group_keys = set(groupby_cols)

    # Build the aggregation expressions, each aliased back to its source column name.
    # The loop variable is deliberately not called `col` so that it does not shadow
    # pyspark.sql.functions.col.
    agg_exprs = [
        median(name).alias(name) for name in numeric_cols if name not in group_keys
    ] + [
        mode(name).alias(name) for name in non_numeric_cols if name not in group_keys
    ]

    if agg_exprs:
        aggregated_df = df.groupBy(*groupby_cols).agg(*agg_exprs)
    else:
        # GroupedData.agg() rejects an empty expression list. With nothing to aggregate,
        # the result is just the distinct grouping keys.
        aggregated_df = df.select(*groupby_cols).distinct()

    # No dropDuplicates() is needed: groupBy already produces exactly one row per key
    # combination. Calling it would only add an extra shuffle.

    # 'timestamp' first for chronological order; the remaining keys break ties deterministically.
    tie_breakers = [name for name in groupby_cols if name != 'timestamp']
    return aggregated_df.sort('timestamp', *tie_breakers)