In [112]:
# Import
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F# import year, month, dayofmonth, last_day, date_format, col, trunc
from pyspark.sql.types import StructType, StructField, StringType, DateType, IntegerType
from pyspark.sql.window import Window
import logging
import utils
# from datetime import datetime, timedelta
# import calendar
# import pyspark.pandas as ps
# from pathlib import Path
from dateutil.relativedelta import relativedelta
import pathlib

from datetime import datetime
import calendar

# Run-level configuration constants.
# NOTE(review): meanings marked "presumably" are inferred from names only — confirm with owners.
valActualsReportDay = 16  # presumably the day of month on which actuals are reported
valOpeningVision = valClosingVision = valReinvestmentVision = 12  # presumably horizons in months
valLongRunInflation = 1.025  # used as end_inflation_rate in funBuildIncrementInflation
valAnnualizedInflation = 1.0522  # annualized factor passed to funBuildIncrementInflation below
valAnnualizedInflationBLNLive = 1.01  # presumably the factor used when the live-forecast flag is set

In [None]:
## Main entry logic (mirrors what already lives in convertme.py)
##------------------------------------------
# Reuse the active Spark session if there is one, otherwise start a new one.
sparksesh = SparkSession.builder.getOrCreate()

# Loader rooted at <project root>/tests/data (presumably where the CSV fixtures live).
pathDataLocation = pathlib.Path(utils.funProjRoot()) / "tests" / "data"
classDataLoader = utils.FileLoader(sparksesh, pathDataLocation)


## Replace Monthly Sales aggregation with enrichment, flags and inequalities instead of hard-coded rules
# NOTE The stored reporting table/data from previous runs can be used to remove already-aggregated data in previous years
def funCreateSalesMonthly(dfSalesMonthly: DataFrame) -> "tuple[DataFrame, bool]":
    """Enrich monthly sales with month-, year- and leap-year date attributes.

    Args:
        dfSalesMonthly: Monthly sales data with a ``month`` date column, which
            drives the date enrichment.

    Returns:
        A ``(dfSalesMonthly, blnLiveForecast)`` tuple: the enriched DataFrame and
        a live-forecast flag that is currently always ``True``. Callers unpack
        both values.
    """
    # NOTE(review): DateUtils is assumed to accumulate columns across calls; each
    # method's return value is taken as the latest frame. Confirm in utils.
    classSalesEnrich = utils.DateUtils(dfSalesMonthly, "month")
    # [Potential improvement] Remove previous years of aggregations already stored if CONDITIONS
    dfSalesMonthly = classSalesEnrich.add_month_info_columns()
    dfSalesMonthly = classSalesEnrich.add_year_info_columns()
    dfSalesMonthly = classSalesEnrich.update_leap_year_info()
    # NOTE(review): the live-forecast flag is hard-coded here.
    return dfSalesMonthly, True


def funCreateSalesFuture(dfForecast: DataFrame) -> DataFrame:
    """Build the future-sales frame: one row per location per prediction month.

    Args:
        dfForecast: Raw forecast input (e.g. the frame loaded from
            ``dfSalesDaysFuture.csv``).

    Returns:
        The exploded prediction DataFrame produced by
        ``utils.funSalesFutureBuild``.
    """
    # Build the prediction base first, then expand it across the forecast
    # horizon (typically 120 months per location).
    dfBase = utils.funForecastUtils(dfForecast)
    dfExploded = utils.funSalesFutureBuild(dfBase)

    # TODO: yearly/monthly date enrichment is currently disabled:
    # classFutureEnrich = utils.DateUtils(dfExploded, 'pred_month')
    # dfExploded = classFutureEnrich.add_year_info_columns()
    # dfExploded = classFutureEnrich.add_month_info_columns()
    # dfExploded = classFutureEnrich.update_leap_year_info()

    return dfExploded


#--- Call functions
# dfSalesMonthly, blnLiveForecast = funCreateSalesMonthly(
#     classDataLoader.load_file("dfMonthlySales.csv")
# )
# dfSalesMonthly.show(truncate=True)
# root
#  |-- loc_num: integer (nullable = true)
#  |-- month: date (nullable = true)
#  |-- actuals_reported: date (nullable = true)
#  |-- open_date: date (nullable = true)
#  |-- close_date: string (nullable = true)
#  |-- days: double (nullable = true)
#  |-- inflation_factor: double (nullable = true)
#  |-- inflation_factor_ending: double (nullable = true)
#  |-- age: double (nullable = true)
#  |-- location_type_code: string (nullable = true)
#  |-- concept_code: string (nullable = true)
#  |-- sales: double (nullable = true)
#  |-- month_integer: integer (nullable = true)
#  |-- total_days_in_month: integer (nullable = true)
#  |-- year_integer: integer (nullable = true)
#  |-- is_leap_year: boolean (nullable = true)
#  |-- first_day_of_year: string (nullable = true)


# Load in Prediction data and enrich
# dfSalesDaysFuture = funCreateSalesFuture(
#     classDataLoader.load_file("dfSalesDaysFuture.csv")
# )
# root
#  |-- loc_num: integer (nullable = true)
#  |-- location_type_code: string (nullable = true)
#  |-- open_date: date (nullable = true)
#  |-- close_date: string (nullable = true)
#  |-- price_group: integer (nullable = true)
#  |-- date_forecast: date (nullable = true)
#  |-- months_predict: integer (nullable = true)
#  |-- nMonthStart: integer (nullable = false)
#  |-- strOpenMonth: string (nullable = true)
#  |-- strCloseMonth: string (nullable = true)
#  |-- intMonthsBetween: integer (nullable = true)
#  |-- month_row: integer (nullable = false)
#  |-- pred_month: date (nullable = true)



In [75]:
# Preview the future-sales frame (first 20 rows, long values truncated).
# NOTE(review): dfSalesDaysFuture is only assigned by the commented-out
# funCreateSalesFuture call earlier, so that call must have run first.
# dfSalesDaysFuture.printSchema()
dfSalesDaysFuture.show(n=20, truncate=True)

+-------+------------------+----------+----------+-----------+-------------+--------------+-----------+------------+-------------+----------------+---------+----------+
|loc_num|location_type_code| open_date|close_date|price_group|date_forecast|months_predict|nMonthStart|strOpenMonth|strCloseMonth|intMonthsBetween|month_row|pred_month|
+-------+------------------+----------+----------+-----------+-------------+--------------+-----------+------------+-------------+----------------+---------+----------+
|   1922|        RESTAURANT|2029-02-01|      null|       9139|   2025-03-25|           120|          0|  2029-02-01|   2200-01-01|             120|        0|2025-03-01|
|   1922|        RESTAURANT|2029-02-01|      null|       9139|   2025-03-25|           120|          0|  2029-02-01|   2200-01-01|             120|        1|2025-04-01|
|   1922|        RESTAURANT|2029-02-01|      null|       9139|   2025-03-25|           120|          0|  2029-02-01|   2200-01-01|             120|        

In [None]:
def funBuildIncrementInflation(
    spark: SparkSession,
    prediction_forecast_months: int,
    annualizedInflation: float,
    begin_inflation_cutoff_date: str = "2025-05-01",
    intermediate_inflation_cutoff_date: str = "2026-01-01",
    as_of_date: "str | None" = None,
) -> DataFrame:
    """Build a month-by-month inflation table with a running cumulative factor.

    One row is produced for every month offset ``seq_row`` in
    ``0..prediction_forecast_months`` (inclusive, so N + 1 rows). Offsets are
    counted from the first day of the anchor month.

    Args:
        spark: Spark session used to create the single-row seed DataFrame.
        prediction_forecast_months: Last month offset to generate (inclusive).
        annualizedInflation: Annual inflation factor used as the begin rate for
            prediction months on or after ``begin_inflation_cutoff_date``.
        begin_inflation_cutoff_date: ISO date; prediction months strictly before
            it get a begin rate of 1.0.
        intermediate_inflation_cutoff_date: ISO date used in the
            ``months_between`` term of the intermediate rate.
        as_of_date: Optional ISO date used as ``current_date`` instead of
            Spark's ``current_date()``; makes output reproducible. When None
            (default), today's date is used.

    Returns:
        A DataFrame with columns ``prediction_forecast_months``, ``current_date``,
        ``seq_row``, ``prediction_month``, the begin/end/intermediate rate
        columns, ``slope``, and ``incremental_time_inflation``. The last is the
        running product of ``intermediate_inflation_rate_monthly`` ordered by
        ``prediction_month``.
    """
    # Seed frame: a single row holding the horizon length.
    schema = StructType(
        [StructField("prediction_forecast_months", IntegerType(), nullable=False)]
    )
    df = spark.createDataFrame([[prediction_forecast_months]], schema)

    # Anchor date: today by default, or a caller-supplied date for reproducible runs.
    anchor_date = (
        F.current_date() if as_of_date is None else F.to_date(F.lit(as_of_date))
    )
    df = df.withColumn("current_date", anchor_date)

    # One row per month offset 0..N (inclusive). This inlines the former
    # funExplodeSeq helper, which otherwise exists only as commented-out code
    # and would raise NameError on a fresh run.
    df = df.withColumn(
        "seq_row",
        F.explode(F.sequence(F.lit(0), F.col("prediction_forecast_months"))),
    )

    df = (
        df.withColumn(
            "prediction_month",
            F.add_months(F.trunc(F.col("current_date"), "month"), F.col("seq_row")),
        )
        # Flat 1.0 before the begin cutoff, the annualized factor from then on.
        .withColumn(
            "begin_inflation_rate",
            F.when(
                F.col("prediction_month") < begin_inflation_cutoff_date, 1.0
            ).otherwise(annualizedInflation),
        )
        # Monthly equivalent (12th root) of the begin rate; offset 0 carries no inflation.
        .withColumn(
            "begin_inflation_rate_monthly",
            F.when(
                F.col("seq_row") > 0, F.col("begin_inflation_rate") ** (1 / 12)
            ).otherwise(1.0),
        )
        .withColumn("end_inflation_rate", F.lit(valLongRunInflation))
        # Linear step size: (begin - end) spread over 12 months.
        .withColumn(
            "slope", (F.col("begin_inflation_rate") - F.col("end_inflation_rate")) / 12
        )
        # months_between(current_date, cutoff) is negative while today is before
        # the cutoff; .cast("integer") truncates toward zero.
        # NOTE(review): by operator precedence this evaluates as
        #   begin + slope * int(months_between(...)) - seq_row
        # so the raw month offset (0, 1, 2, ...) is subtracted from a rate near
        # 1.0. With rates near 1.0, any seq_row >= 1 drops the value below
        # begin_inflation_rate, and the greatest() clamp below replaces it.
        # Possibly intended: slope * (months_between(...) - seq_row).
        # Confirm before changing; kept as-is to preserve current output.
        .withColumn(
            "intermediate_inflation_rate_init",
            (
                F.col("begin_inflation_rate")
                + F.col("slope")
                * F.months_between(
                    F.col("current_date"), F.lit(intermediate_inflation_cutoff_date)
                ).cast("integer")
                - F.col("seq_row")
            ),
        )
        # Clamp to [begin, end].
        # NOTE(review): if begin > end (e.g. 1.0522 vs 1.025 once past the begin
        # cutoff), greatest(x, begin) >= begin > end, so this always yields
        # end_inflation_rate.
        .withColumn(
            "intermediate_inflation_rate",
            F.least(
                F.greatest(
                    F.col("intermediate_inflation_rate_init"),
                    F.col("begin_inflation_rate"),
                ),
                F.col("end_inflation_rate"),
            ),
        )
        # Monthly equivalent of the intermediate rate; offset 0 contributes 1.0.
        .withColumn(
            "intermediate_inflation_rate_monthly",
            F.when(
                F.col("seq_row") >= 1, F.col("intermediate_inflation_rate") ** (1 / 12)
            ).otherwise(1.0),
        )
    )

    # Running product of the monthly rates in month order. There is no
    # partitionBy, so all rows go to one partition; acceptable for this small
    # (N + 1 row) frame.
    window_cumprod = Window.orderBy("prediction_month").rowsBetween(
        Window.unboundedPreceding, Window.currentRow
    )
    return df.withColumn(
        "incremental_time_inflation",
        F.product("intermediate_inflation_rate_monthly").over(window_cumprod),
    )


# Sample run: prediction_forecast_months=5 at the configured annualized inflation.
dfInc = funBuildIncrementInflation(
    spark=sparksesh,
    prediction_forecast_months=5,
    annualizedInflation=valAnnualizedInflation,
)
dfInc.show(truncate=True)

+--------------------------+------------+-------+----------------+--------------------+----------------------------+------------------+--------------------+--------------------------------+---------------------------+-----------------------------------+--------------------------+
|prediction_forecast_months|current_date|seq_row|prediction_month|begin_inflation_rate|begin_inflation_rate_monthly|end_inflation_rate|               slope|intermediate_inflation_rate_init|intermediate_inflation_rate|intermediate_inflation_rate_monthly|incremental_time_inflation|
+--------------------------+------------+-------+----------------+--------------------+----------------------------+------------------+--------------------+--------------------------------+---------------------------+-----------------------------------+--------------------------+
|                         5|  2025-03-31|      0|      2025-03-01|                 1.0|                         1.0|             1.025|-0.00208333333333...| 

In [None]:
# def funReinvestmentProjUtils(dfReinvestment: DataFrame) -> DataFrame:
#     dfReinvest = (dfReinvestment
#     .withColumn(
#         'month_shutdown', F.trunc(F.col('shutdown'), 'month')
#     ).withColumn(
#         'month_reopen', F.trunc(F.col('reopen'), 'month')
#     ).withColumn(
#         'month_between_openshut', F.months_between(F.col('month_reopen'), F.col('month_shutdown'))
#         .cast('integer'))
#     )
#     return dfReinvest

# def funExplodeSeq(df: DataFrame, length_col: str) -> DataFrame:
#     dfExplode = (df
#         .withColumn("startval", F.lit(0))
#         .withColumn("seq_list", F.sequence(start="startval", stop=length_col))
#         .withColumn("seq_row", F.explode("seq_list"))
#         .drop("seq_list", "startval")
#     )

#     return dfExplode

# dfinvest = classDataLoader.load_file("dfReinvestmentProjects.csv")

# dfinvest = funReinvestmentProjUtils(dfinvest)

# dfinvest = funExplodeSeq(dfinvest, 'month_between_openshut')
# dfinvest = dfinvest.withColumn(
#     'openshut_month_between', F.add_months(F.col('month_shutdown'), F.col('seq_row'))
# ).drop('seq_row')
# dfinvest.filter(F.col('month_between_openshut') > 1).show(truncate=True)
# # dfinvest.show(truncate=True)

