In [48]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

# Spark session & context
spark = SparkSession.builder.master('local').getOrCreate()
sc = spark.sparkContext

In [49]:
def transform_data (df_data, df_dates, attributes, metrics, date_column, suffix):
    """Joins data and creates transformed columns

    df_data = Dataframe with the data you want to transform.
    df_dates = Dataframe with auxiliary dates.
    attributes and metrics, list of columns in string format.
    date_column = String with the name of the column that will be used as reference to all the transformations, ONLY ONE COLUMN and must be Date type.
    sufix = sufix to add to the metrics columns.
"""
    #Joins data and dates dataframe by the 'suffix' column in the date Dataframe.
    #Drops the old date column and renames the new one
    #Aggregates the data by the attributes columns.
    df_return = df_data.join(df_dates, df_data[date_column] == df_dates[suffix], 'left').select(df_data['*'],df_dates.closing_date)
    df_return = df_return.drop(date_column).withColumnRenamed('closing_date', date_column)
    agg_metrics = [F.sum(col).alias(col+'_'+suffix) for col in metrics]
    df_return = df_return.groupBy(attributes).agg(*agg_metrics)
    return df_return

def union_agg_data(df_data, transformed_data_aux, metrics_aux):
    #Unions 2 dataframes by first adding the same columns to them
    #Checks if the dataframes contain the same metrics
    #Adds missing metrics to dataframes as 0 columns
    #Unions the dataframes by selecting in the transformed dataframe the same columns as the data one.
    for metric in metrics_aux:
        if metric not in df_data.columns:
            df_data = df_data.withColumn(metric,F.lit(0))
        if metric not in transformed_data_aux.columns:
            transformed_data_aux = transformed_data_aux.withColumn(metric,F.lit(0))
    df_data = df_data.union (transformed_data_aux.select(df_data.columns))
    return df_data
   
def mtd_table_generator (min_date, max_date):
        df = spark.sql("SELECT sequence(to_date('"+ min_date.strftime("%Y-%m-%d") + "'), to_date('"+ max_date.strftime("%Y-%m-%d") + "'), interval 1 day) as closing_date").withColumn('closing_date', F.explode(F.col('closing_date')))
        df2 = spark.sql("SELECT sequence(to_date('"+ min_date.strftime("%Y-%m-%d") + "'), to_date('"+ max_date.strftime("%Y-%m-%d") + "'), interval 1 day) as mtd").withColumn('mtd', F.explode(F.col('mtd')))
        df = df.crossJoin(df2)
        # Filter dataframe keep only dates older than closing date from the beginning of the month.        
        df = df.where((df.closing_date>=df.mtd) & (F.date_trunc('month','closing_date')==F.date_trunc('month','mtd')) )
        df = df.withColumn('mtd_py', F.date_sub('mtd',365)) #MTD Previous year
        return df
def ytd_table_generator (min_date, max_date):
        df = spark.sql("SELECT sequence(to_date('"+ min_date.strftime("%Y-%m-%d") + "'), to_date('"+ max_date.strftime("%Y-%m-%d") + "'), interval 1 day) as closing_date").withColumn('closing_date', F.explode(F.col("closing_date")))
        df2 = spark.sql("SELECT sequence(to_date('"+ min_date.strftime("%Y-%m-%d") + "'), to_date('"+ max_date.strftime("%Y-%m-%d") + "'), interval 1 day) as ytd").withColumn('ytd', F.explode(F.col('ytd')))
        df = df.crossJoin(df2)
        # Filter dataframe keep only dates older than closing date from the beginning of the year.
        df = df.where((df.closing_date>=df.ytd) & (F.date_trunc('year','closing_date')==F.date_trunc('year','ytd')) )
        df = df.withColumn('ytd_py', F.date_sub('ytd',365)) #YTD Previous year
        return df
def one_to_one_table_generator (min_date, max_date):
        df = spark.sql("SELECT sequence(to_date('"+ min_date.strftime("%Y-%m-%d") + "'), to_date('"+ max_date.strftime("%Y-%m-%d") + "'), interval 1 day) as closing_date").withColumn('closing_date', F.explode(F.col("closing_date")))
        df = df.withColumn('pd', F.date_sub('closing_date',1)) #Previous day
        df = df.withColumn('pm', F.add_months(F.col('closing_date'),-1)) #Previous month same day
        df = df.withColumn('ld', F.to_date(F.concat((F.extract(F.lit('YEAR'),'closing_date')-1),F.lit('-12-31')),'yyyy-MM-dd')) #Last december
        df = df.withColumn('pm_ld', F.last_day(F.add_months(F.col('closing_date'),-1))) #Last day of previous month 
        return df


In [50]:
def applyDateTransformations (df_data,attributes:list[str], metrics:list[str], date_column: str, transformations:list[str]):
    """Apply Date Transformations
   
    This function receives a Spark Dataframes and returns a new Spark Dataframe with added numerical columns adding date transformations
    such as previous month, MTD calculations, YTD calculations.

    df_data = Dataframe with the data you want to transform.

    attributes = list of columns with descriptive data in string format. Please include the date column too. 

    metrics = list of columns with numerical data that you want to transform.

    date_column = String with the name of the column that will be used as reference to all the transformations, ONLY ONE COLUMN and 
    the content must be Date type.

    transformations = List of transformations to apply, "MTD", "MTD_PY", "YTD", "YTD_PY", "LD", "PD".
"""
    max_date = df_data.select(date_column).groupBy().agg({date_column:'max'}).collect()[0][0]
    min_date = df_data.select(date_column).groupBy().agg({date_column:'min'}).collect()[0][0]
    metrics_aux = metrics.copy()
    schema = df_data.schema
    transformed_data_aux = spark.createDataFrame([],schema)

    for transformation in transformations:
        if transformation == 'MTD' or transformation == 'MTD_PY':
            mtd_table = mtd_table_generator(min_date,max_date)
            if transformation == 'MTD':
                transformed_data_aux = transform_data(df_data,mtd_table,attributes,metrics,date_column,'MTD')
                metrics_aux.extend([metric+'_MTD' for metric in metrics])
            else:
                transformed_data_aux = transform_data(df_data,mtd_table,attributes,metrics,date_column,'MTD_PY')
                metrics_aux.extend([metric+'_MTD_PY' for metric in metrics])
        if transformation == 'YTD' or transformation == 'YTD_PY':
            ytd_table = ytd_table_generator(min_date,max_date)
            if transformation == 'YTD':
                transformed_data_aux = transform_data(df_data,ytd_table,attributes,metrics,date_column,'YTD')
                metrics_aux.extend([metric+'_YTD' for metric in metrics])
            else:
                transformed_data_aux = transform_data(df_data,ytd_table,attributes,metrics,date_column,'YTD_PY')
                metrics_aux.extend([metric+'_YTD_PY' for metric in metrics])
        if transformation == 'PD':
            pd_table = one_to_one_table_generator(min_date,max_date)
            transformed_data_aux = transform_data(df_data,pd_table,attributes,metrics,date_column,'PD')
            metrics_aux.extend([metric+'_PD' for metric in metrics])
        if transformation == 'PM':
            pm_table = one_to_one_table_generator(min_date,max_date)
            transformed_data_aux = transform_data(df_data,pm_table,attributes,metrics,date_column,'PM')
            metrics_aux.extend([metric+'_PD' for metric in metrics])
        if transformation == 'LD':
            ld_table = one_to_one_table_generator(min_date,max_date)
            transformed_data_aux = transform_data(df_data,ld_table,attributes,metrics,date_column,'LD')
            metrics_aux.extend([metric+'_LD' for metric in metrics])
        if transformation == 'PM_LD':
            ld_table = one_to_one_table_generator(min_date,max_date)
            transformed_data_aux = transform_data(df_data,ld_table,attributes,metrics,date_column,'PM_LD')
            metrics_aux.extend([metric+'_PM_LD' for metric in metrics])

      
        df_data = union_agg_data(df_data, transformed_data_aux, metrics_aux)
    
    agg_metrics = [F.sum(col).alias(col) for col in metrics_aux]
    df_data = df_data.groupBy(df_data_attributes).agg(*agg_metrics)
    return df_data

In [51]:
if __name__=='__main__':
    path = "/home/jovyan/work/Table.csv"
    
    from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType, DateType, DoubleType
    customSchema = StructType([
        StructField('Region', StringType(), True),        
        StructField('Product', StringType(), True),
        StructField('Date', DateType(), True),
        StructField('Quantity', IntegerType(), True),
        StructField('Price', StringType(), True) #String because csv uses comma as decimal separator and not supported directly in Spark
    ])
    
    
    df_data = spark.read.schema(customSchema).options(delimiter=";", header=True).csv(path)
    df_data = df_data.withColumn('Price_number', F.regexp_replace('Price', ',', '.').cast(DoubleType())).drop('Price')
    df_data = df_data.withColumnRenamed('Price_number','Price')
    
    
    df_data_metrics = ['Price', 'Quantity']
    df_data_attributes = ['Region','Product','Date']
    df_data_date = 'Date'
    
    df_transformations = applyDateTransformations (df_data,df_data_attributes, df_data_metrics, df_data_date, ['MTD','YTD','MTD_PY','PD','LD','PM','PM_LD'])
    
    df_transformations.show()
    

+------+--------+----------+-----------+--------+------------------+------------+------------------+------------+------------+---------------+-----------+-----------+--------+-----------+-----------+-----------+-----------+--------------+
|Region| Product|      Date|      Price|Quantity|         Price_MTD|Quantity_MTD|         Price_YTD|Quantity_YTD|Price_MTD_PY|Quantity_MTD_PY|   Price_PD|Quantity_PD|Price_LD|Quantity_LD|   Price_PD|Quantity_PD|Price_PM_LD|Quantity_PM_LD|
+------+--------+----------+-----------+--------+------------------+------------+------------------+------------+------------+---------------+-----------+-----------+--------+-----------+-----------+-----------+-----------+--------------+
| North|   Music|2024-03-03| 786.013758|      43|      1146.6766336|         192|      1146.6766336|         192|         0.0|              0|134.4174636|         74|     0.0|          0|134.4174636|         74|        0.0|             0|
| South|Computer|2024-03-01|647.1594121|    