# BUILD DATA FOR PRODUCTION PREDICTIONS

## IMPORT LIBRARIES

In [None]:
import polars as pl
import numpy as np

## SETUP VARIABLES

In [None]:
# Forecast horizon in months: the final feature table is filtered to
# `max_month + LEADING_MONTHS` and its `month` column is set to this value.
LEADING_MONTHS: int = 1

# Number of future weekly rows generated per product-store pair; also the
# number of weekly quantity lags (`quantity_lag1` .. `quantity_lagN`).
LEADING_WEEKS: int = 5

## DATA READING

In [None]:
# Store, transaction and product tables are read from the raw parquet files.
df_store = pl.read_parquet(
    '../../data/raw/part-00000-tid-2779033056155408584-f6316110-4c9a-4061-ae48-69b77c7c8c36-4-1-c000.snappy.parquet'
)
df_transaction = pl.read_parquet(
    '../../data/raw/part-00000-tid-5196563791502273604-c90d3a24-52f2-4955-b4ec-fb143aae74d8-4-1-c000.snappy.parquet'
)
df_product = pl.read_parquet(
    '../../data/raw/part-00000-tid-7173294866425216458-eae53fbf-d19e-4130-ba74-78f96b9675f1-4-1-c000.snappy.parquet'
)

# Lookup tables: zipcode reference (semicolon-separated) and the already
# processed holiday calendar (comma-separated).
df_zipcode = pl.read_csv(
    '../../data/raw/georef-zipcode.csv',
    separator=';',
)
df_holiday = pl.read_csv(
    '../../data/processed/processed_usa_holiday.csv',
    separator=',',
)

## UNIFYING DATASETS

In [None]:
# Attach store attributes to every transaction (store key is `pdv` on the
# store table); transactions without a matching store are kept.
df = df_transaction.join(
    df_store, left_on="internal_store_id", right_on="pdv", how='left'
)

# Attach product attributes (product key is `produto` on the product table);
# transactions without a matching product are kept as well.
df = df.join(
    df_product, left_on="internal_product_id", right_on="produto", how='left'
)

## DATA WRANGLING

### Dropping negative quantities and rounding quantity to integers

In [None]:
# Keep only rows with a non-negative quantity, then round the quantity and
# store it as a 64-bit integer.
df = (
    df
    .filter(pl.col('quantity') >= 0)
    .with_columns(pl.col('quantity').round().cast(pl.Int64))
)

### Treating the overwhelming outliers of 2022-09-11

In [None]:
# Clean-up rule for 2022-09-11: on that day keep only rows whose quantity is
# at or below the day's 75th percentile AND whose product was already sold
# before that day. Rows from every other date are left untouched.

outlier_date = pl.lit('2022-09-11').str.to_date()
is_outlier_day = pl.col('transaction_date') == outlier_date

# Products that were sold on the outlier date.
outlier_products = (
    df.filter(is_outlier_day)
    .get_column('internal_product_id')
    .unique()
)

# Of those, the products that also have sales strictly before the outlier
# date (i.e. they did not appear for the first time on that day).
normal_products = (
    df.filter(
        (pl.col('transaction_date') < outlier_date)
        & pl.col('internal_product_id').is_in(outlier_products)
    )
    .get_column('internal_product_id')
    .unique()
)

# 75th percentile of quantity, computed over the outlier date only.
outlier_percentile_75 = (
    df.filter(is_outlier_day)
    .select(pl.col('quantity').quantile(0.75, interpolation='linear'))
    .item()
)

# Rows of the outlier date that survive the clean-up.
keep_on_outlier_day = (
    pl.col('internal_product_id').is_in(normal_products)
    & (pl.col('quantity') <= outlier_percentile_75)
)

df = df.filter(
    (pl.col('transaction_date') != outlier_date) | keep_on_outlier_day
)

### Transform temporal features

In [None]:
# Derive the calendar month and the week number from the transaction date.
# NOTE(review): `dt.week()` is the ISO week number per the polars docs, so
# early-January dates can belong to week 52/53 — confirm this is acceptable.
df = df.with_columns(
    month=pl.col('transaction_date').dt.month(),
    week_of_year=pl.col('transaction_date').dt.week(),
)

In [None]:
# Last observed week and month; the synthetic future rows are numbered
# relative to these values.
max_week, max_month = df.select(
    pl.col("week_of_year").max(),
    pl.col("month").max(),
).row(0)

## FEATURE ENGINEERING

### Setup variables for feature engineering

In [None]:
# Numeric measures that get aggregated per month.
cols = ['quantity', 'gross_value', 'net_value', 'gross_profit', 'discount']

# Window partitions: one series per product-city and per product-store.
product_city_partition = ['internal_product_id', 'city']
product_pdv_partition = ['internal_product_id', 'internal_store_id']
city_partition = list(product_city_partition)

# Grouping / join keys derived from the partitions above.
city_month_keys = product_city_partition + ['month']
city_week_keys = product_city_partition + ['week_of_year']
pdv_week_keys = product_pdv_partition + ['week_of_year']

# Grain of the final feature table.
keys = [
    'internal_product_id',
    'internal_store_id',
    'distributor_id',
    'premise',
    'categoria_pdv',
    'zipcode',
    'tipos',
    'label',
    'subcategoria',
    'marca',
    'fabricante',
    'month',
    'week_of_year',
    'city',
]

### Finding city by zipcode

In [None]:
# Normalise the lookup's column names so it can be joined on `zipcode`.
df_zipcode = df_zipcode.rename(
    {
        'Zip Code': 'zipcode',
        'Official USPS city name': 'city',
    }
)

# Add the city of each row's zipcode; rows with an unknown zipcode get a
# null city.
# NOTE(review): assumes one lookup row per zipcode — duplicates would
# multiply transaction rows; confirm against the CSV.
df = df.join(
    df_zipcode.select('zipcode', 'city'),
    on='zipcode',
    how='left',
)

### Getting USA holidays

In [None]:
# Parse the holiday dates and tag every calendar entry with holiday = 1.
df_holiday = df_holiday.with_columns(
    pl.col('Date').str.to_date(),
    holiday=pl.lit(1),
)

# Flag transactions that fall on a holiday; all other rows get holiday = 0.
# NOTE(review): assumes one calendar row per date — confirm, since repeated
# dates would duplicate transactions.
df = (
    df
    .join(
        df_holiday.select('Date', 'holiday'),
        left_on='transaction_date',
        right_on='Date',
        how='left',
    )
    .with_columns(pl.col('holiday').fill_null(0))
)

### Dropping some columns

In [None]:
# Remove columns that are not used by any later step. The raw dates go too:
# from here on only `month` and `week_of_year` carry the time information.
df = df.drop(
    [
        'taxes',
        'categoria',
        'descricao',
        'reference_date',
        'transaction_date',
    ]
)

### Creating new values for prediction

In [None]:
# Top 300,000 product-store pairs by total quantity sold in Oct-Dec.
most_sold = (
    df.filter(pl.col('month').is_in([10, 11, 12]))
    .group_by('internal_product_id', 'internal_store_id')
    .agg(pl.col('quantity').sum().alias('quantity'))
    .sort('quantity', descending=True)
    .head(300_000)
)

In [None]:
# Build the rows to predict: one row per future week for every top-selling
# product-store pair, carrying the pair's attributes and null measures.

# Attributes of the Oct-Dec rows that belong to a top-selling pair.
# A semi join on both id columns keeps exactly the rows whose
# (product, store) pair is present in `most_sold`; unlike concatenating the
# ids into one string, it cannot confuse pairs whose ids contain the
# separator. Rows with a null id never match.
# NOTE(review): `holiday` is part of the distinct set, so a pair seen on both
# holiday and non-holiday days yields two rows per future week. They collapse
# again in the final group_by (holiday = max), meaning the future rows inherit
# a historical holiday flag — confirm this is intended.
df_distinct = df.filter(
    pl.col('month').is_in([10, 11, 12])
).join(
    most_sold.select(['internal_product_id', 'internal_store_id']),
    on=['internal_product_id', 'internal_store_id'],
    how='semi'
).select([
    'internal_product_id',
    'internal_store_id',
    'distributor_id',
    'premise',
    'categoria_pdv',
    'zipcode',
    'tipos',
    'label',
    'subcategoria',
    'marca',
    'fabricante',
    'city',
    'holiday'
]).unique().with_columns(
    # Measures are unknown for the future, hence typed nulls.
    pl.lit(None).cast(pl.Int64).alias('quantity'),
    pl.lit(None).cast(pl.Float64).alias('gross_value'),
    pl.lit(None).cast(pl.Float64).alias('net_value'),
    pl.lit(None).cast(pl.Float64).alias('gross_profit'),
    pl.lit(None).cast(pl.Float64).alias('discount')
)

# One copy of the distinct rows per future week. Weeks continue counting past
# the last observed week (max_week + 1 .. max_week + LEADING_WEEKS) and the
# month is the target month. It uses LEADING_MONTHS so it always agrees with
# the `max_month + LEADING_MONTHS` filter applied to the final table.
df_new = pl.concat([
    df_distinct.with_columns(
        pl.lit(max_month + LEADING_MONTHS).cast(pl.Int8).alias('month'),
        pl.lit(max_week + i).cast(pl.Int8).alias('week_of_year')
    ) for i in range(1, LEADING_WEEKS + 1)
]).select([
    'internal_store_id',
    'internal_product_id',
    'distributor_id',
    'quantity',
    'gross_value',
    'net_value',
    'gross_profit',
    'discount',
    'premise',
    'categoria_pdv',
    'zipcode',
    'tipos',
    'label',
    'subcategoria',
    'marca',
    'fabricante',
    'month',
    'week_of_year',
    'city',
    'holiday'
])

# Stack the last observed month on top of the new prediction rows. Only the
# rows of `max_month` are kept from the history; they feed the previous-month
# and lag features. `df_new` was selected above in the column order that the
# vertical concat requires.
df = pl.concat([df.filter(pl.col('month') == max_month), df_new])

### Creating previous-month totals for SKUs purchased per city

In [None]:
# Sum every measure per product, city and month.
monthly_aggs = [
    pl.col(measure).sum().alias(f"monthly_{measure}_sum") for measure in cols
]
monthly_totals = df.group_by(city_month_keys).agg(monthly_aggs)

# For each product-city series, expose the preceding row's monthly sums as
# `previous_month_*` columns (rows are ordered by month before shifting).
monthly_shifts_names = [f"previous_month_{measure}_sum" for measure in cols]
monthly_shifts = [
    pl.col(f"monthly_{measure}_sum")
    .shift(n=1)
    .over(product_city_partition)
    .alias(shifted_name)
    for measure, shifted_name in zip(cols, monthly_shifts_names)
]

previous_month_values = monthly_totals.sort("month").with_columns(monthly_shifts)

### Creating lagged quantities (previous 5 weekly totals) for each product-PDV pair

In [None]:
# Weekly quantity per product-store pair. A weekly total of 0 is replaced by
# null (presumably so that the synthetic future rows, which carry null
# quantities, do not show up as real zero sales — confirm).
quantity_totals = (
    df.group_by(pdv_week_keys)
    .agg(pl.col('quantity').sum().alias('quantity'))
    .with_columns(
        pl.when(pl.col('quantity') == 0)
        .then(None)
        .otherwise(pl.col('quantity'))
        .alias('quantity')
    )
)

# Build, per lag step, the column name, the shift expression and the
# zero-to-null clean-up expression.
quantity_shifts_names = []
quantity_shifts = []
quantity_nulls = []
for step in range(1, LEADING_WEEKS + 1):
    lag_name = f"quantity_lag{step}"
    quantity_shifts_names.append(lag_name)
    quantity_shifts.append(
        pl.col("quantity").shift(n=step).over(product_pdv_partition).alias(lag_name)
    )
    quantity_nulls.append(
        pl.when(pl.col(lag_name) == 0)
        .then(None)
        .otherwise(pl.col(lag_name))
        .alias(lag_name)
    )

# Lags are positional within each product-store series, so the rows are
# ordered by product, store and week before shifting.
previous_quantity_values = (
    quantity_totals.sort(pdv_week_keys)
    .with_columns(quantity_shifts)
    .with_columns(quantity_nulls)
)

### Group values around keys for the final DataFrame

In [None]:
def _safe_ratio(numerator: str, denominator: str, alias: str) -> pl.Expr:
    """Return `numerator / denominator` with every undefined result set to 0.

    Missing inputs give null, x/0 gives +/-inf and 0/0 gives NaN on float
    columns; all three are replaced by 0 so the feature is always finite.
    """
    ratio = pl.col(numerator) / pl.col(denominator)
    return (
        ratio
        .fill_null(0)
        .fill_nan(0)
        .replace([np.inf, -np.inf], 0)
        .alias(alias)
    )


# Final feature table: one row per `keys` combination, restricted to the
# target month, enriched with previous-month totals and weekly quantity lags.
df = df.group_by(keys).agg([
    pl.col("quantity").sum().alias("quantity"),
    pl.col("holiday").max().alias("holiday")
]).join(
    # Previous-month sums of the product-city series.
    previous_month_values.select(city_month_keys + monthly_shifts_names),
    on=city_month_keys,
    how="left"
).join(
    # Weekly quantity lags of the product-store series.
    previous_quantity_values.select(pdv_week_keys + quantity_shifts_names),
    on=pdv_week_keys,
    how="left"
).filter(
    # Keep only the rows to predict.
    pl.col('month').eq(max_month + LEADING_MONTHS)
).with_columns([
    _safe_ratio('previous_month_discount_sum', 'previous_month_gross_value_sum', 'discount_rate_month'),
    _safe_ratio('previous_month_gross_profit_sum', 'previous_month_gross_value_sum', 'profit_margin_month'),
    # Express the time columns relative to the forecast origin: month becomes
    # the horizon in months, week_of_year a 0-based index of the future week.
    pl.lit(LEADING_MONTHS).cast(pl.Int8).alias('month'),
    pl.col('week_of_year') - (max_week+1),
]).drop(['quantity'])  # the target is unknown for the rows being predicted

## SAVING THE DATA

In [None]:
# Persist the final feature table as a parquet file.
df.write_parquet('../../data/processed/processed_production.parquet')