<a href="https://colab.research.google.com/github/PedroCampos93/DataScience/blob/main/ABInBev.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark

In [None]:
"""
PROCESSED

"""
import re

from pyspark.shell import spark
from pyspark.sql.functions import (
    col,
    regexp_replace,
    to_date
    )

# ---------------
# AUX FUNCTIONS
# ---------------
def clean_normalize_cols(cols_names: list) -> list[str]:
    """Return the given column names upper-cased and stripped of odd characters.

    Each name is converted to upper case, then every character other than
    ``A-Z``, ``0-9`` and ``_`` is removed. The input list is not modified.
    """
    return [re.sub(r"[^A-Z0-9_]", "", name.upper()) for name in cols_names]

# ---------------
# CONSTS
# ---------------
# Locations of the raw CSV inputs read by the READ section.
IN_BEV_SAL = "/content/ABInBevRaw/abi_bus_case1_beverage_sales_20210726.csv"
IN_BEV_CHANNEL = "/content/ABInBevRaw/abi_bus_case1_beverage_channel_group_20210726.csv"
IN_DIM_DATE = "/content/ABInBevRaw/AmbevDimensionDate.csv"

# Columns (and their order) selected for the refined sales DataFrame.
COLS_ORDER = [
    # date / product
    "DATE_COMPLETE", "CE_BRAND_FLVR", "BRAND_NM",
    # region / channel
    "BTLR_ORG_LVL_C_DESC", "TRADE_CHNL_DESC", "CHNL_GROUP",
    "TRADE_GROUP_DESC", "TRADE_TYPE_DESC",
    # package
    "PKG_CAT", "PKG_CAT_DESC", "TSR_PCKG_NM",
    # measure
    "VOLUME_DOLS",
]

# ---------------
# READ
# ---------------
# Sales file: tab-separated with a header row, read with the default encoding.
df_bev_sal = spark.read.csv(IN_BEV_SAL, header=True, sep="\t")

# Channel file: header row, default separator.
df_bev_channel = spark.read.csv(IN_BEV_CHANNEL, header=True)

# Date dimension; DATE_COMPLETE is parsed from yyyy-MM-dd text into a date.
df_dim_date = spark.read.csv(IN_DIM_DATE, header=True).withColumn(
    "DATE_COMPLETE", to_date("DATE_COMPLETE", "yyyy-MM-dd")
)

# Column names of the sales file, taken from a second read that decodes it as
# UTF-16LE. Only the header of this read is used.
# NOTE(review): presumably the file is UTF-16 encoded, which is why the header
# is decoded separately from the data - confirm.
l_bev_sal_header = spark.read.csv(
    IN_BEV_SAL, header=True, encoding="utf-16le", sep="\t"
).columns

# ---------------
# ETL
# ---------------
# Normalize the header names from the UTF-16LE read: upper case, keeping only
# A-Z, 0-9 and "_".
l_bev_sal_header = clean_normalize_cols(l_bev_sal_header)

# ---------------
# DEALING WITH SPECIAL CHARACTERS
# ---------------
# Strip every character outside [a-zA-Z0-9/. -] from all columns in ONE
# projection. Calling withColumn() once per column in a loop adds a projection
# to the plan on every call; a single select() yields the same columns with a
# flat plan.
df_bev_sal = df_bev_sal.select(
    [
        regexp_replace(col(name), "[^a-zA-Z0-9/. -]", "").alias(name)
        for name in df_bev_sal.columns
    ]
)

# Apply the normalized header positionally. toDF() renames by position (not by
# the current, possibly garbled, names) and raises if the number of names does
# not match the number of columns.
df_bev_sal = df_bev_sal.toDF(*l_bev_sal_header)

# Match each sales row to the date dimension on year, month and period.
r_join = (
    (df_bev_sal.YEAR == df_dim_date.YEAR)
    & (df_bev_sal.MONTH == df_dim_date.MONTH)
    & (df_bev_sal.PERIOD == df_dim_date.PERIOD)
)

df_bev_sal = (
    df_bev_sal
    # Left join: sales rows without a matching date row are kept.
    .join(df_dim_date, r_join, how="left")
    # Drop the join-key columns plus DATE and DAY_OF_PERIOD.
    .drop("DATE", "YEAR", "MONTH", "PERIOD", "DAY_OF_PERIOD")
    .withColumnRenamed("VOLUME", "VOLUME_DOLS")
)


# Preview the processed rows, then persist them as Parquet partitioned by
# DATE_COMPLETE, replacing any existing output at that path.
df_bev_sal.show()

(
    df_bev_sal
    .coalesce(1)
    .write
    .mode("overwrite")
    .partitionBy("DATE_COMPLETE")
    .parquet("/content/Processed/processed_beverage_sales")
)

In [None]:
"""
REFINED

"""
from pyspark.shell import spark
from pyspark.sql.window import Window
from pyspark.sql.functions import (
    col,
    month,
    rank,
    round,
    sum,
    year
    )

# ---------------
# READ
# ---------------
# ---------------
# ETL
# ---------------
# Load the processed sales, attach the channel attributes via a left join on
# TRADE_CHNL_DESC, and keep only the columns listed in COLS_ORDER (in that
# order).
df_bev_sal = (
    spark.read.parquet("/content/Processed/processed_beverage_sales")
    .join(df_bev_channel, on="TRADE_CHNL_DESC", how="left")
    .select(*COLS_ORDER)
)

# Total sales per region (BTLR_ORG_LVL_C_DESC) and trade group.
df_sales_region_g = (
    df_bev_sal
    .groupBy("BTLR_ORG_LVL_C_DESC", "TRADE_GROUP_DESC")
    .agg(sum("VOLUME_DOLS").alias("TOTAL_SALES"))
)

# Per-region ordering by total sales, highest first.
window_spec = Window.partitionBy("BTLR_ORG_LVL_C_DESC").orderBy(col("TOTAL_SALES").desc())

# Per-region ordering by individual sale amount, lowest first.
# VOLUME_DOLS reaches this step as text (the CSV is read without a schema and
# only regexp_replace is applied to it), so ordering on the raw column would
# compare strings ("100.5" sorts before "22.5"). Cast to double for a numeric
# ordering, and push values that do not parse (NULL after the cast) to the end
# so they cannot take rank 1.
window_spec_lowest = (
    Window.partitionBy("BTLR_ORG_LVL_C_DESC")
    .orderBy(col("VOLUME_DOLS").cast("double").asc_nulls_last())
)

# Top trade groups per region by total sales. rank() gives tied totals the
# same rank, so a region can return more than three rows.
df_top_3_trade_groups = (
    df_sales_region_g
    .withColumn("rank", rank().over(window_spec))
    .filter("rank <= 3")
    .select("BTLR_ORG_LVL_C_DESC", "TRADE_GROUP_DESC", "TOTAL_SALES")
    .withColumn("TOTAL_SALES", round(col("TOTAL_SALES"), 2))
)

# Total sales per brand for each calendar year/month of DATE_COMPLETE.
df_sales_per_brand_month = (
    df_bev_sal
    .withColumn("YEAR", year("DATE_COMPLETE"))
    .withColumn("MONTH", month("DATE_COMPLETE"))
    .groupBy("BRAND_NM", "YEAR", "MONTH")
    .agg(round(sum("VOLUME_DOLS"), 2).alias("TOTAL_SALES"))
    .orderBy("BRAND_NM", "YEAR", "MONTH")
)

# Row(s) with the lowest VOLUME_DOLS in each region (ties are all kept).
df_lowest_sales_per_region = (
    df_bev_sal
    .withColumn("rank", rank().over(window_spec_lowest))
    .filter("rank == 1")
    .select("BTLR_ORG_LVL_C_DESC", "BRAND_NM", "VOLUME_DOLS")
)

# ---------------
# PRINT
# ---------------
df_top_3_trade_groups.show(21)
df_sales_per_brand_month.show()
df_lowest_sales_per_region.show()

# Persist each refined result as Parquet, replacing any existing output.
(
    df_sales_per_brand_month
    .coalesce(1)
    .write
    .mode("overwrite")
    .partitionBy("YEAR", "MONTH")
    .parquet("/content/Refined/refined_sales_per_brand_month")
)
(
    df_top_3_trade_groups
    .coalesce(1)
    .write
    .mode("overwrite")
    .parquet("/content/Refined/refined_top_3_trade_groups")
)
(
    df_lowest_sales_per_region
    .coalesce(1)
    .write
    .mode("overwrite")
    .parquet("/content/Refined/refined_lowest_sales_per_region")
)
(
    df_bev_sal
    .coalesce(1)
    .write
    .mode("overwrite")
    .partitionBy("DATE_COMPLETE")
    .parquet("/content/Refined/refined_bev_sal")
)

In [None]:
# Print the first rows of df_bev_sal as a text table (show() defaults to 20 rows).
df_bev_sal.show()

+-------------+-------------+-----------+-------------------+--------------------+------------------+----------------+---------------+-------+------------+-------------+-----------+
|DATE_COMPLETE|CE_BRAND_FLVR|   BRAND_NM|BTLR_ORG_LVL_C_DESC|     TRADE_CHNL_DESC|        CHNL_GROUP|TRADE_GROUP_DESC|TRADE_TYPE_DESC|PKG_CAT|PKG_CAT_DESC|  TSR_PCKG_NM|VOLUME_DOLS|
+-------------+-------------+-----------+-------------------+--------------------+------------------+----------------+---------------+-------+------------+-------------+-----------+
|   2006-03-08|         3554| STRAWBERRY|        GREAT LAKES|   MASS MERCHANDISER| MASS MERCHANDISER|         GROCERY|            MIX|   N20O|   20Z/600ML|20z NRP 24L S|       22.5|
|   2006-03-08|         3440|      LEMON|               WEST|           SUPERETTE|            SUPERS|        SERVICES|            MIX|   N20O|   20Z/600ML|  20Z NRP 24L|      162.5|
|   2006-03-08|         3441|  RASPBERRY|          SOUTHEAST|QUICK SERVICE RES...|      FO