In [0]:
import chardet
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, to_date, regexp_replace, trim, upper, row_number, expr, abs
from pyspark.sql.window import Window

# Detect file encoding
def detect_encoding_spark(spark, path, sample_size=1024):
    """Guess the text encoding of the file at ``path`` from its leading bytes.

    Args:
        spark: Session whose ``sparkContext.binaryFiles`` is used to read raw bytes.
        path: Location handed to ``binaryFiles``.
        sample_size: Number of leading bytes passed to ``chardet.detect``.

    Returns:
        The encoding name reported by chardet, or ``"UTF-8"`` when chardet
        reports no encoding or when any step of the detection raises.
    """
    try:
        # Slice inside the map so that only the sample (not the full file
        # content) is what take() brings back to the driver.
        sample_rdd = spark.sparkContext.binaryFiles(path).map(
            lambda name_and_content: name_and_content[1][:sample_size]
        )
        sample_bytes = sample_rdd.take(1)[0]
        result = chardet.detect(sample_bytes)
        # chardet may report an encoding of None; never hand that to callers,
        # use the same default as the error path instead.
        return result['encoding'] or "UTF-8"
    except Exception as e:
        # Best effort: detection problems are reported and the default is used.
        print(f"Erro ao detectar encoding: {str(e)}")
        return "UTF-8"

# Schema used when importing the sales CSV: every column is read as a
# nullable string, with the column name upper-cased.
simple_schema = StructType(
    list(
        map(
            lambda column_name: StructField(column_name.upper(), StringType(), True),
            (
                "DATE", "CE_BRAND_FLVR", "BRAND_NM", "BTLR_ORG_LVL_C_DESC",
                "CHNL_GROUP", "TRADE_CHNL_DESC", "PKG_CAT", "PKG_CAT_DESC",
                "TSR_PCKG_NM", "$ VOLUME", "YEAR", "MONTH", "PERIOD",
            ),
        )
    )
)

# Read from CSV with previous detected encoding
def read_csv_simple(spark, path, delimiter="\t"):
    """Load a delimited text file as an all-string DataFrame with upper-case column names.

    The encoding is taken from ``detect_encoding_spark``; the columns come from
    ``simple_schema`` and the first line of the file is treated as a header.

    Args:
        spark: Session used for detection and for reading.
        path: Location of the file to load.
        delimiter: Field separator passed to the CSV reader (tab by default).

    Returns:
        The loaded DataFrame with every column name upper-cased.
    """
    # Guard against a detector that yields no encoding at all.
    encoding = detect_encoding_spark(spark, path) or "UTF-8"
    # "UTF-8-SIG" is read as plain "UTF-8"; compare case-insensitively so a
    # lower-case spelling of the same name is mapped too.
    if encoding.upper() == 'UTF-8-SIG':
        encoding = 'UTF-8'

    df = spark.read.format("csv") \
        .option("header", "true") \
        .option("encoding", encoding) \
        .option("delimiter", delimiter) \
        .schema(simple_schema) \
        .load(path)
    df_upper = df.toDF(*[col_name.upper() for col_name in df.columns])
    return df_upper

# Obtain the Spark session used by every cell below
# (getOrCreate returns an existing session or creates a new one).
spark = (
    SparkSession.builder
    .getOrCreate()
)

# Read beverage_sales, parse the date, standardize TSR_PCKG_NM and cast the numeric columns
beverage_sales_df = read_csv_simple(
    spark,
    "/mnt/bronze/beverage_sales/abi_bus_case1_beverage_sales_20210726.csv",
    "\t"
).select(
    # Single-letter month/day fields accept one- or two-digit values, so
    # "7/4/2021", "07/4/2021" and "07/04/2021" all parse. The previous approach
    # zero-padded only the month and then required two-digit month AND day.
    to_date(col("DATE"), "M/d/yyyy").alias("DATE"),

    col("CE_BRAND_FLVR").cast("integer"),
    col("BRAND_NM"),
    col("BTLR_ORG_LVL_C_DESC"),
    col("CHNL_GROUP"),
    col("TRADE_CHNL_DESC"),
    col("PKG_CAT"),
    col("PKG_CAT_DESC"),
    # Clean TSR_PCKG_NM: strip a trailing " *", " S", "*" or "S", then trim and upper-case.
    # NOTE(review): the bare "S" alternative also removes the final letter of any
    # name that simply ends in "S" - confirm this is intended.
    upper(trim(regexp_replace(col("TSR_PCKG_NM"), r"( \*| S|\*|S)$", ""))).alias("TSR_PCKG_NM"),
    # Absolute value: negative volumes are stored as positive numbers.
    abs(col("$ VOLUME").cast("double")).alias("$ VOLUME"),
    col("YEAR").cast("integer"),
    col("MONTH").cast("integer"),
    col("PERIOD").cast("integer")
)

# Load the channel features CSV: first line is the header, column types are inferred by Spark
channel_features_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/mnt/bronze/beverage_channel_features/xxx_bus_case1_beverage_channel_group_20210726.csv")
)


In [0]:
# Merge: keep every sales row and attach the channel features that share its TRADE_CHNL_DESC
merged_df = beverage_sales_df.join(channel_features_df, on="TRADE_CHNL_DESC", how="left")

# Show Schema
# merged_df.printSchema()

In [0]:
# dim_flavor: distinct (flavor id, brand name) pairs, written to the silver layer as Delta
dim_flavor = (
    merged_df
    .select(
        col("CE_BRAND_FLVR").alias("flavor_id"),
        col("BRAND_NM").alias("flavor_description"),
    )
    .distinct()
)
(
    dim_flavor.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/silver/dim_flavor")
)

In [0]:
# dim_package: distinct combinations of package category code, category
# description and cleaned package name, written to the silver layer as Delta.
dim_package = merged_df.select(
    col("PKG_CAT").alias("package_category_code"),
    # Use the column name exactly as declared in the schema; the mixed-case
    # spelling only resolves while Spark's case-insensitive resolution is on.
    col("PKG_CAT_DESC").alias("package_category_description"),
    col("TSR_PCKG_NM").alias("package_name")
).distinct()
dim_package.write.format("delta").mode("overwrite").save("/mnt/silver/dim_package")

In [0]:
# dim_channel_group: distinct channel rows (description, group, trade group, trade type),
# written to the silver layer as Delta
dim_channel_group = (
    merged_df
    .select(
        col("TRADE_CHNL_DESC").alias("channel_id"),
        col("CHNL_GROUP").alias("channel_group"),
        col("TRADE_GROUP_DESC").alias("trade_group_desc"),
        col("TRADE_TYPE_DESC").alias("trade_type_desc"),
    )
    .distinct()
)
(
    dim_channel_group.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/silver/dim_channel_group")
)

In [0]:
# dim_month: one row per distinct month number in the merged data, labelled with
# its English name (the CASE has no ELSE, so other values get a NULL name)
dim_month = (
    merged_df
    .select(col("MONTH").alias("month_id"))
    .distinct()
    .withColumn("month_name", expr("""
        CASE month_id
            WHEN 1 THEN 'January'
            WHEN 2 THEN 'February'
            WHEN 3 THEN 'March'
            WHEN 4 THEN 'April'
            WHEN 5 THEN 'May'
            WHEN 6 THEN 'June'
            WHEN 7 THEN 'July'
            WHEN 8 THEN 'August'
            WHEN 9 THEN 'September'
            WHEN 10 THEN 'October'
            WHEN 11 THEN 'November'
            WHEN 12 THEN 'December'
        END
    """))
)
(
    dim_month.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/silver/dim_month")
)

In [0]:
# Define window used to number the fact rows.
# The first four columns are the original ordering; the remaining fact columns
# are appended as tie-breakers so that rows sharing date/flavor/region/channel
# receive the same sale_id on every run instead of an arbitrary one.
# NOTE: a window without partitionBy makes Spark move all rows to a single
# partition to compute row_number.
windowSpec = Window.orderBy(
    col("selling_date"),
    col("flavor_id"),
    col("region_desc"),
    col("channel_id"),
    col("package_category_code"),
    col("year"),
    col("month_id"),
    col("week"),
    col("sales_volume"))
# fact_table: rename the merged columns to the fact naming and add a
# sequential sale_id (1..N) following the window ordering above.
fact_sale = merged_df.select(
    col("DATE").alias("selling_date"),
    col("YEAR").alias("year"),
    col("MONTH").alias("month_id"),
    col("PERIOD").alias("week"),
    col("CE_BRAND_FLVR").alias("flavor_id"),
    col("BTLR_ORG_LVL_C_DESC").alias("region_desc"),
    col("PKG_CAT").alias("package_category_code"),
    col("$ VOLUME").alias("sales_volume"),
    col("TRADE_CHNL_DESC").alias("channel_id")
).withColumn(
    "sale_id", row_number().over(windowSpec)
)
# Reorganize columns so that sale_id comes first
fact_sale = fact_sale.select(
    "sale_id", "selling_date", "year", "month_id", "week",
    "flavor_id", "region_desc", "package_category_code",
    "sales_volume", "channel_id"
)
fact_sale.write.format("delta").mode("overwrite").save("/mnt/silver/fact_sale")


In [0]:
#del beverage_sales_df
#del channel_features_df