1. Run cell 2, 3, 4, and 5 (Imports and helper functions)
2. Update cell 6 according to your Test (retailer, year, period).
3. Run your selected Test

In [0]:
!pip install datacompy

Looking in indexes: https://pypi.org/simple, https://****@pkgs.dev.azure.com/marsanalytics/32a89b5b-1d0c-4888-98a9-defbad0d731d/_packaging/sds-shared-utils/pypi/simple/

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.2.2[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [0]:
# imports
import os
from datetime import datetime
from typing import Any, Optional, Union

import pandas as pd
from datacompy import SparkSQLCompare
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    coalesce,
    col,
    concat,
    date_format,
    lag,
    last_day,
    lit,
    lpad,
    regexp_replace,
    round,
    row_number,
    split,
    substring,
    sum,
    to_date,
    upper,
    when,
)
from pyspark.sql.types import StringType, StructField, StructType
from pyspark.sql.window import Window


In [0]:
# constant values

RSV_VARIANCES_THRESHOLD_LOWER = -0.15
RSV_VARIANCES_THRESHOLD_UPPER = 0.15
GSV_PER_OF_RSV_MIN_BOUND = 0.95
GSV_PER_OF_RSV_MAX_BOUND = 1.05
NSV_PER_OF_GSV_MIN_BOUND = 0.95
NSV_PER_OF_GSV_MAX_BOUND = 1.05
YTD_THRESHOLD_RSV_LOWER = -0.15
YTD_THRESHOLD_RSV_UPPER = 0.15
YTD_THRESHOLD_GSV_LOWER = -0.15
YTD_THRESHOLD_GSV_UPPER = 0.15
YTD_THRESHOLD_NSV_LOWER = -0.15
YTD_THRESHOLD_NSV_UPPER = 0.15
YOY_METRICS_LOWER = -10
YOY_METRICS_UPPER = 10
RSV_COMPARE_VALUE = 0.10

# paths
DIM_CALENDAR = "cdm_common.calendar_period"
DIM_DATE = "cdm_common.calendar_date"
OFFICEDEPOT_PATH = "dcom_gsc_silver.us_officedepot_sales"
HARRISTEETER_PATH = "dcom_gsc_silver.us_harristeeter_sales"
STAPLES_PATH = "dcom_gsc_silver.us_staples_sales"
SAMSCLUB_PATH = "dcom_gsc_silver.us_samsclub_sales"
WALMART_PATH = "dcom_gsc_silver.us_walmart_sales"
MCDONALDS_PATH = "dcom_gsc_silver.us_mcd_sales_dq_check"
HEB_PATH = "dcom_gsc_silver.us_heb_sales"
KROGER_PATH = "dcom_gsc_silver.us_kroger_sales"
NIELSEN_PATH = "dcom_gsc_silver.us_nielsen_sales"
DDAAS_PATH = "dcom_gsc_silver.us_gti_sales"
AMAZON_PATH = "trusted_raw.avc_sales_manufacturing_retail_day_us"
ODD_PATH = "dcom_gsc_silver.us_odd_sales_refactored"
SONIC_PATH = "dcom_gsc_bronze.sharepoint_us_sonic_manual"


RSV_VARIANCES_GROUPING_COLS = ["Market", "year", "period", "customer_name"]
RSV_VARIANCES_PERIOD_COL = "period"
RSV_VARIANCES_DISTINCT_COL = ["customer_name", "Market", "year"]
RSV_VARIANCES_PARTITIONBY = ["customer_name", "Market"]
RSV_VARIANCES_ORDERBY = ["year", "period"]

GROUPING_COLS = [
    "Market",
    "year",
    "period",
    "customer_name",
    "brand_name",
    "channel_format",
]
PERIOD_COL = "period"
DISTINCT_COL = ["customer_name", "Market", "year", "brand_name", "channel_format"]
PARTITIONBY = ["Market", "customer_name", "brand_name", "channel_format"]
ORDERBY = ["year", "period"]

CUSTOMER_ID = [
    "US_USMWCONFTOTAL_PEAPODTOTALECOMTA",
    "US_USMWCONFTOTAL_ALBSCOTOTALECOMTA",
    "US_USMWCONFTOTAL_GIANTEAGLETOTALECOMTA",
    "US_USMWCONFTOTAL_HYVEETOTALECOMTA",
    "US_USMWCONFTOTAL_LOWESFOODTOTALECOMTA",
    "US_USMWCONFTOTAL_MEIJERTOTALECOMTA",
    "US_USMWCONFTOTAL_RALEYSTOTALCORPECOMTA",
    "US_USMWCONFTOTAL_RITEAIDTOTALECOMTA",
    "US_USMWCONFTOTAL_SHOPRITETOTALECOMTA",
    "US_USMWCONFTOTAL_SMARTFINALTOTALECOMTA",
    "US_USMWCONFTOTAL_TARGETTOTALECOMTA",
]

DATA = {
    "Customer_ID": [
        "US_USMWCONFTOTAL_PEAPODTOTALECOMTA",
        "US_USMWCONFTOTAL_ALBSCOTOTALECOMTA",
        "US_USMWCONFTOTAL_GIANTEAGLETOTALECOMTA",
        "US_USMWCONFTOTAL_HYVEETOTALECOMTA",
        "US_USMWCONFTOTAL_LOWESFOODTOTALECOMTA",
        "US_USMWCONFTOTAL_MEIJERTOTALECOMTA",
        "US_USMWCONFTOTAL_RALEYSTOTALCORPECOMTA",
        "US_USMWCONFTOTAL_RITEAIDTOTALECOMTA",
        "US_USMWCONFTOTAL_SHOPRITETOTALECOMTA",
        "US_USMWCONFTOTAL_SMARTFINALTOTALECOMTA",
        "US_USMWCONFTOTAL_TARGETTOTALECOMTA",
    ],
    "CustomerName": [
        "AHOLD",
        "ALBSCO",
        "GIANTEAGLE",
        "HYVEE",
        "LOWESFOOD",
        "MEIJER",
        "RALEYS",
        "RITEAID",
        "SHOPRITE",
        "SMARTFINAL",
        "TARGET",
    ],
}

In [0]:
spark = SparkSession.builder.appName("DQ").getOrCreate()

dim_calendar_period = (
    spark.table(DIM_CALENDAR)
    .filter("row_latest_ind='Y'")
    .filter("calendar_period_type='Financial'")
).select("calendar_period_id", "period_name")

dim_calendar_date = (
    (
        spark.table(DIM_DATE)
        .filter("row_latest_ind='Y'")
        .filter("calendar_type='Financial'")
    )
    .select("calendar_date_id", "period_id", "calendar_date")
    .withColumn("calendar_date2", regexp_replace(col("calendar_date"), "-", ""))
)


def deduplicate_dataframe(df: DataFrame, unique_cols, order_by_col):
    if df:
        window_spec = Window.partitionBy(*unique_cols).orderBy(col(order_by_col).desc())

        ranked_df = df.withColumn("rank", row_number().over(window_spec))

        deduplicated_df = ranked_df.filter(col("rank") == 1).drop("rank")

        return deduplicated_df
    else:
        return None



In [0]:
# gold table, year, period, market, and reatiler change based on your need
data = spark.table("dcom_gsc_gold.pl_features_global")
year = "2025"
retailer = "MEIJER"
period = "03"
market = "United States"
retailer_list, name_of_retailer = [retailer], retailer

In [0]:
# to check the gold data
display(data.filter((F.col("Market") == market) & (F.col("year") == "2024") & (F.col("period") == period) & (F.col("customer_name") == retailer)))


advertising_constribution,controllable_contribution,third_party_volume_tonnes,Market,Region,occasion,product_description,technology,ZREP,product_id,EAN,year,period,units_sold,rsv,gsv,te,nsv,mac,cogs,customer_name,channel_format,currency_code,sub_category_name,super_category_name,brand_name,sub_brand_name,manufacturer_name,iso2_letter_country_code,is_competitor_flag,economic_cell_group_code,Casefill_Quantity,Agreed_Order_Quantity,CSL,product_code,ssa
63.699401650000006,286.9167926668105,0.0662855,United States,,Not Applicable,DOVE ASSORTED CHOCOLATE WRAPPED PIECES/TRUFFLE FC NS PKG SUP LARGE 15.8 OUNCE,Moulded,365093.0,926494004,4000052900,2024,3,148.0,1352.52,1073.0,108.45388029643608,964.546119703564,412.2005095547621,552.345610148802,MEIJER,Click & Mortar,USD,CHOCOLATE,CONFECTIONERY,DOVE,NOT APPLICABLE,MARS,US,0,OTHER,,,,926494004,UNMAPPED
,,,United States,,UNMAPPED,HUBBA BUBBA BUBBLE GUM TAPE IC SEAS PKG SINGLE CANISTER 1 COUNT,NOT APPLICABLE,289334.0,1666100979,2200004115,2024,3,1367.0,1593.23,1267.7734636276273,238.4901711987594,1029.2832924288678,440.9456057075082,588.3376867213594,MEIJER,Click & Mortar,USD,FRUITY CONFECTIONERY,CONFECTIONERY,HUBBA BUBBA,NOT APPLICABLE,MARS,US,0,OTHER,,,,1666100979,UNMAPPED
50.87476718981481,137.59778172129637,0.042287,United States,,Not Applicable,DOVE DARK ALMOND ALMOND CHOCOLATE WRAPPED PIECES/TRUFFLE FC NS PKG SUP MEDIUM 7.61 OUNCE,Moulded,361570.0,2179807619,4000052540,2024,3,196.0,1117.44,864.3600000000001,149.36508477197663,714.9949152280234,230.44709292506877,484.5478223029546,MEIJER,Click & Mortar,USD,CHOCOLATE,CONFECTIONERY,DOVE,NOT APPLICABLE,MARS,US,0,OTHER,,,,2179807619,UNMAPPED
355.40940526666657,-473.8494695683335,0.0749069999999999,United States,,Not Applicable,SKT ORIGINAL MULTIPLE FLAVOR FRUIT FRTY CONF CHEWY IC NS PKG SINGLE 2.17 OUNCE,Panned,161582.0,2387672169,4000000160,2024,3,1218.0,1692.95,1108.38,376.4197621999696,731.9602378000302,468.3726910992269,263.5875467008034,MEIJER,Click & Mortar,USD,FRUITY CONFECTIONERY,CONFECTIONERY,SKITTLES,NOT APPLICABLE,MARS,US,0,OTHER,,,,2387672169,UNMAPPED
,,,United States,,Not Applicable,SNICKERS ORIGINAL CARAMEL PEANUT CHOCOLATE FILLED BAR/CUPS IC NS PKG SINGLE 1.86 OUNCE,Filled Bar,256477.0,1033783015,4000042431,2024,3,4133.0,4764.13,4168.433004300039,988.450066676514,3179.982937623526,2178.930980307825,1001.0519573157007,MEIJER,Click & Mortar,USD,CHOCOLATE,CONFECTIONERY,SNICKERS,NOT APPLICABLE,MARS,US,0,OTHER,,,,1033783015,UNMAPPED
,,,United States,,UNMAPPED,DOVE ASSORTED CHOCOLATE WRAPPED PIECES/TRUFFLE FC SEAS PKG SUP LARGE 22.7 OUNCE,NOT APPLICABLE,414626.0,1683436969,4000056672,2024,3,284.0,3213.37,2740.771336863142,427.9588519191561,2312.812484943986,1066.2946407062282,1246.5178442377571,MEIJER,Click & Mortar,USD,CHOCOLATE,CONFECTIONERY,DOVE,NOT APPLICABLE,MARS,US,0,OTHER,,,,1683436969,UNMAPPED
,,,United States,,UNMAPPED,M&M WHITE CHOC STRAWBERRY CHOCOLATE UNWRAPPED PIECES FC SEAS PKG LAYDOWN BAG MEDIUM 7.44 OUNCE,NOT APPLICABLE,436365.0,1711508407,4000058416,2024,3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,MEIJER,Click & Mortar,USD,CHOCOLATE,CONFECTIONERY,M&M'S,NOT APPLICABLE,MARS,US,0,OTHER,,,,1711508407,UNMAPPED
,,,United States,,UNMAPPED,SNICKERS ORIGINAL CARAMEL PEANUT CHOCOLATE EGG FC SEAS PKG MULTI PACK SINGLE 1.1 OUNCE 6 COUNT,NOT APPLICABLE,316886.0,1938515281,4000049942,2024,3,381.0,2259.9300000000003,1869.111129392168,359.40524370300056,1509.7058856891676,974.389656148948,535.3162295402196,MEIJER,Click & Mortar,USD,CHOCOLATE,CONFECTIONERY,SNICKERS,NOT APPLICABLE,MARS,US,0,OTHER,,,,1938515281,UNMAPPED
54.16670346279206,374.1623312306659,0.053898,United States,,Not Applicable,LFSM SUGAR FREE WINTERMINT MINT TASTE MINTS FC NS PKG PEG BAG X MULTI PACK 2.75 OUNCE,Pressed,384718.0,4185664432,1900000305,2024,3,691.0,1967.69,1243.8000000000002,87.75025452307854,1156.0497454769215,485.1225656435233,670.9271798333982,MEIJER,Click & Mortar,USD,FRUITY CONFECTIONERY,CONFECTIONERY,LIFE SAVERS,NOT APPLICABLE,MARS,US,0,OTHER,,,,4185664432,UNMAPPED
,,,United States,,UNMAPPED,DOVE DARK CARAMEL SEA SALT CARAMEL CHOCOLATE WRAPPED PIECES/TRUFFLE FC SEAS PKG LAYDOWN BAG MEDIUM 7.94 OUNCE,NOT APPLICABLE,389977.0,1969946633,4000054541,2024,3,484.0,2087.19,1738.1596155731986,267.4279072700337,1470.731708303165,715.3881383068517,755.3435699963131,MEIJER,Click & Mortar,USD,CHOCOLATE,CONFECTIONERY,DOVE,NOT APPLICABLE,MARS,US,0,OTHER,,,,1969946633,UNMAPPED


# NULL_CHECKS

In [0]:
# Checks for null values in specific columns for each retailer in the given year, period, and market.

# Iterates over a predefined list of important columns and checks if any null values are present
# for each retailer in the provided `retailer_list`. Filters the data by year, period, market, and
# customer name before performing the null check.
# Prints the null count for each column per retailer to the console.

# update the columns_name based on your need (it will be same for all retailers in gold)
columns_name = ["Market","customer_name","period","year","rsv","gsv","te","nsv","mac","cogs","ZREP","product_id","EAN","currency_code","brand_name", "product_code"]

for cust in retailer_list:
    for column in columns_name:
        if column in data.columns:
            null_count = data.filter(
                F.col(column).isNull()
                & (F.col("year") == year)
                & (F.col("period") == period)
                & (F.col("Market") == market)
                & (F.col("customer_name") == cust)
            ).count()
            if null_count > 0:
                print("There are null values in the data ",column,": " , null_count)
            else:
                print("There are no null values in the data",column,": " , null_count)

There are no null values in the data Market :  0
There are no null values in the data customer_name :  0
There are no null values in the data period :  0
There are no null values in the data year :  0
There are no null values in the data rsv :  0
There are no null values in the data gsv :  0
There are no null values in the data te :  0
There are no null values in the data nsv :  0
There are no null values in the data mac :  0
There are no null values in the data cogs :  0
There are no null values in the data ZREP :  0
There are no null values in the data product_id :  0
There are no null values in the data EAN :  0
There are no null values in the data currency_code :  0
There are no null values in the data brand_name :  0
There are no null values in the data product_code :  0


# YOY_Trends

In [0]:
def get_yoy_metrics(source_df, metric_cols, year, period, cust) -> DataFrame:
    """
    Calculates Year-over-Year (YoY) growth metrics for a given customer, year, and period.

    This function:
    - Generates a full set of customer-period combinations.
    - Joins them with the source data to ensure complete coverage of all periods.
    - Aggregates metrics over time and computes YoY growth using a lag of 13 periods.
    - Filters out results within the defined acceptable YoY growth thresholds.
    - Returns only outlier records (i.e., outside threshold range) for the given year, period, and customer.

    Parameters:
    source_df (DataFrame): Input Spark DataFrame containing historical metrics.
    metric_cols (list): List of metric column names (e.g., ["nsv", "rsv"]) to calculate YoY growth for.
    year (int or str): The year to filter the final results.
    period (int or str): The period (e.g., month or fiscal period) to filter the final results.
    cust (str): Customer identifier (unused directly in logic, but assumed to be in `grouping_cols` or source data).

    Returns:
    tuple:
        - result (DataFrame): DataFrame containing only rows where YoY growth is outside the allowed range.
        - success (bool): True if no outliers are found, False otherwise.

    Notes:
    - Uses constants: `GROUPING_COLS`, `PERIOD_COL`, `DISTINCT_COL`, `PARTITIONBY`, `ORDERBY`, 
      `YOY_METRICS_LOWER`, `YOY_METRICS_UPPER` (assumed to be defined globally).
    - Missing values in metrics are treated as 0 via `coalesce`.
    - Assumes `market` is available in `grouping_cols` or in the input DataFrame.
    - Function currently captures the first `YoY_Growth_` column found when filtering outliers.
    """
    try:
        grouping_cols = GROUPING_COLS
        period_col = PERIOD_COL
        distinct_cols = DISTINCT_COL
        partitionBy = PARTITIONBY
        orderBy = ORDERBY
        join_type = "left"

        periods = [(i,) for i in range(1, 14)]
        full_periods_df = spark.createDataFrame(periods, [f"{period_col}"])

        customers = source_df.select(*distinct_cols).distinct()
        full_combinations_df = customers.crossJoin(full_periods_df)

        df_full = full_combinations_df.join(source_df, on=grouping_cols, how=join_type)

        for metric in metric_cols:
            df_with_zeros = df_full.withColumn(
                f"{metric}", coalesce(f"{metric}", lit(0))
            )

        df_yoy_growth = df_with_zeros.groupBy(grouping_cols).agg(
            *[round(sum(col), 4).alias(col) for col in metric_cols]
        )

        window_spec = Window.partitionBy(partitionBy).orderBy(*orderBy)

        for metric in metric_cols:
            df_yoy_growth = (
                df_yoy_growth.withColumn(
                    f"previous_{metric}", lag(f"{metric}", 13).over(window_spec)
                )
                .withColumn(
                    f"YoY_Growth_{metric}",
                    round(
                        (
                            (
                                (col(f"{metric}") - col(f"previous_{metric}"))
                                / col(f"previous_{metric}")
                            )
                            * 100
                        ),
                        4,
                    ),
                )
                .na.fill(0)
            )
        col_name = [
            col_ for col_ in df_yoy_growth.columns if col_.startswith("YoY_Growth_")
        ][0]

        result_df = df_yoy_growth.filter(
            ~(
                (F.col(f"{col_name}") >= YOY_METRICS_LOWER)
                & (F.col(f"{col_name}") <= YOY_METRICS_UPPER)
            )
        )
        result_df = result_df.filter(
            (F.col("year") == year)
            & (F.col("period") == period)
            & (F.col("Market") == market)
        )

        if result_df.count() > 0:
            success = False
            result = result_df
        else:
            success = True
            result = result_df

    except Exception:
        success = False
        result = result_df
    return result, success


In [0]:
# Runs Year-over-Year (YoY) KPI validation for each retailer and KPI.
# For each combination of retailer and KPI metric, this function filters the input data,
# computes the YoY metric via `get_yoy_metrics`, and displays the results.
# Displays the result DataFrames and prints success flags.

kpi_list = ["rsv","gsv","nsv"]
for cust in retailer_list:
    for kpi in kpi_list:
        data_ = data.filter(F.col("customer_name") == f"{cust}")
        result, success = get_yoy_metrics(data_, [f"{kpi}"], year, period, cust)
        print(kpi, success)
        display(result)

rsv False


Market,year,period,customer_name,brand_name,channel_format,rsv,previous_rsv,YoY_Growth_rsv
United States,2025,3,MEIJER,5 GUM,Click & Mortar,2126.17,3196.24,-33.479
United States,2025,3,MEIJER,ALTOIDS,Click & Mortar,1946.26,2775.99,-29.8895
United States,2025,3,MEIJER,DOUBLE MINT,Click & Mortar,975.38,298.1,227.1989
United States,2025,3,MEIJER,DOVE,Click & Mortar,24830.57,42761.97,-41.9331
United States,2025,3,MEIJER,ECLIPSE,Click & Mortar,1547.02,3081.91,-49.8032
United States,2025,3,MEIJER,HUBBA BUBBA,Click & Mortar,183.45,1596.13,-88.5066
United States,2025,3,MEIJER,LIFE SAVERS,Click & Mortar,14850.7,29795.27,-50.1575
United States,2025,3,MEIJER,M&M'S,Click & Mortar,74351.57,144092.99,-48.4003
United States,2025,3,MEIJER,MILKY WAY US,Click & Mortar,4336.93,6839.16,-36.5868
United States,2025,3,MEIJER,Mixed,Click & Mortar,20016.25,46214.24,-56.6881


gsv False


Market,year,period,customer_name,brand_name,channel_format,gsv,previous_gsv,YoY_Growth_gsv
United States,2025,3,MEIJER,5 GUM,Click & Mortar,1613.2633,2327.2,-30.6779
United States,2025,3,MEIJER,ALTOIDS,Click & Mortar,1488.3674,1952.16,-23.7579
United States,2025,3,MEIJER,DOUBLE MINT,Click & Mortar,673.4309,235.3916,186.0896
United States,2025,3,MEIJER,DOVE,Click & Mortar,19683.1578,35233.3993,-44.1349
United States,2025,3,MEIJER,ECLIPSE,Click & Mortar,1198.0333,2146.6243,-44.1899
United States,2025,3,MEIJER,HUBBA BUBBA,Click & Mortar,145.8012,1270.0987,-88.5205
United States,2025,3,MEIJER,LIFE SAVERS,Click & Mortar,11590.2489,21426.1354,-45.906
United States,2025,3,MEIJER,M&M'S,Click & Mortar,60683.0628,121099.5206,-49.8899
United States,2025,3,MEIJER,MILKY WAY US,Click & Mortar,3346.6431,5069.0647,-33.9791
United States,2025,3,MEIJER,Mixed,Click & Mortar,15834.9718,38772.9812,-59.1598


nsv False


Market,year,period,customer_name,brand_name,channel_format,nsv,previous_nsv,YoY_Growth_nsv
United States,2025,3,MEIJER,5 GUM,Click & Mortar,1346.686,1818.4288,-25.9423
United States,2025,3,MEIJER,ALTOIDS,Click & Mortar,748.307,1599.3343,-53.2113
United States,2025,3,MEIJER,DOUBLE MINT,Click & Mortar,594.5553,186.4721,218.8441
United States,2025,3,MEIJER,DOVE,Click & Mortar,17443.4412,29962.9338,-41.7833
United States,2025,3,MEIJER,ECLIPSE,Click & Mortar,1142.2187,1985.944,-42.4848
United States,2025,3,MEIJER,HUBBA BUBBA,Click & Mortar,113.2429,1031.3851,-89.0203
United States,2025,3,MEIJER,LIFE SAVERS,Click & Mortar,10263.4767,19113.4138,-46.3022
United States,2025,3,MEIJER,M&M'S,Click & Mortar,50619.417,101171.4237,-49.9667
United States,2025,3,MEIJER,MILKY WAY US,Click & Mortar,2897.4796,4143.0926,-30.0648
United States,2025,3,MEIJER,Mixed,Click & Mortar,13756.0714,32784.2507,-58.0406


# KPI_Compare

In [0]:

def kpi_compare_fun(data, status, year, period):
    """
    Compares specific KPIs (GSV or NSV) against RSV to determine pass/fail status.

    Depending on the status type provided, this function filters the dataset by year,
    period, and market, and creates a new column (`GSV_status` or `NSV_status`) indicating:
      - "True" if the value is less than RSV
      - "Fail" otherwise

    Parameters:
    data (DataFrame): Input Spark DataFrame with 'gsv', 'nsv', and 'rsv' columns.
    status (str): Status to compute, either "GSV_status" or "NSV_status".
    year (int or str): Year to filter data.
    period (int or str): Period to filter data.

    Returns:
    DataFrame: A DataFrame with the added status column showing "True" or "Fail".
    """
    try:
        if status == "GSV_status":
            df = (
                data.filter(
                    (F.col("year") == year)
                    & (F.col("period") == period)
                    & (F.col("Market") == market)
                )
            ).withColumn(
                "GSV_status",
                F.when(F.col("gsv") < F.col("rsv"), "True").otherwise("Fail"),
            )
        else:
            df = (
                data.filter(
                    (F.col("year") == year)
                    & (F.col("period") == period)
                    & (F.col("Market") == market)
                )
            ).withColumn(
                "NSV_status",
                F.when(F.col("nsv") < F.col("rsv"), "True").otherwise("Fail"),
            )
    except Exception:
        df = spark.createDataFrame()
    return df


In [0]:
# Validates GSV and NSV status against RSV for each customer in the given year and period.

# For each retailer in the list, this function:
# - Filters the data specific to the customer.
# - Checks each of the status columns: 'GSV_status' and 'NSV_status'.
# - Uses the `kpi_compare_fun` to get validation results.
# - Flags failures where GSV < RSV or NSV < RSV, and displays only failing rows.
# - Prints the validation result (True = Pass, False = Fail) for each check.
# Displays failed rows and prints validation results.

for cust in retailer_list:
    data_ = data.filter(F.col("customer_name") == f"{cust}")
    for i in ["GSV_status", "NSV_status"]:
        df = kpi_compare_fun(data_, i, year, period)
        if df.count() != 0:
            df1 = df.filter(col(f"{i}") == "Fail")
            msg = "GSV < RSV" if i == "GSV_status" else "NSV < RSV"
            if df1.count() > 0:
                success = False
                result = df1
            else:
                success = True
                result = df1
        print(msg, success)
        display(result)

GSV < RSV False


advertising_constribution,controllable_contribution,third_party_volume_tonnes,Market,Region,occasion,product_description,technology,ZREP,product_id,EAN,year,period,units_sold,rsv,gsv,te,nsv,mac,cogs,customer_name,channel_format,currency_code,sub_category_name,super_category_name,brand_name,sub_brand_name,manufacturer_name,iso2_letter_country_code,is_competitor_flag,economic_cell_group_code,Casefill_Quantity,Agreed_Order_Quantity,CSL,product_code,ssa,GSV_status
,,,United States,,UNMAPPED,SKT & STB VRTY PK FRTY CONF FUN SIZE FC SEAS PKG LAYDOWN BAG MEDIUM 14.24 OUNCE,NOT APPLICABLE,469182.0,3266473793,2200030025,2025,3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,MEIJER,Click & Mortar,USD,UNMAPPED,CONFECTIONERY,Mixed,NOT APPLICABLE,MARS,US,0,OTHER,,,,3266473793,UNMAPPED,Fail
,,,United States,,UNMAPPED,DOVE MILK CHOCOLATE WRAPPED PIECES/TRUFFLE FC SEAS PKG LAYDOWN BAG MEDIUM 8.87 OUNCE,NOT APPLICABLE,332181.0,3920434312,4000051195,2025,3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,MEIJER,Click & Mortar,USD,CHOCOLATE,CONFECTIONERY,DOVE,NOT APPLICABLE,MARS,US,0,OTHER,,,,3920434312,UNMAPPED,Fail
,,,United States,,UNMAPPED,LFSG BERRIES WILD BERRIES FRTY CONF GUMMI FC SEAS PKG GIFT BOX 8 OUNCE,NOT APPLICABLE,427716.0,1300770295,2200028868,2025,3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,MEIJER,Click & Mortar,USD,FRUITY CONFECTIONERY,CONFECTIONERY,LIFE SAVERS,NOT APPLICABLE,MARS,US,0,OTHER,,,,1300770295,UNMAPPED,Fail
,,,United States,,UNMAPPED,DOVE DARK TRUFFLE CHOCOLATE HEART FC SEAS PKG GIFT BOX 5.82 OUNCE,NOT APPLICABLE,456885.0,3521794764,4000059468,2025,3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,MEIJER,Click & Mortar,USD,CHOCOLATE,CONFECTIONERY,DOVE,NOT APPLICABLE,MARS,US,0,OTHER,,,,3521794764,UNMAPPED,Fail
,,,United States,,UNMAPPED,DOVE MILK TRUFFLE CHOCOLATE HEART FC SEAS PKG GIFT BOX 5.82 OUNCE,NOT APPLICABLE,456803.0,2228544087,4000059456,2025,3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,MEIJER,Click & Mortar,USD,CHOCOLATE,CONFECTIONERY,DOVE,NOT APPLICABLE,MARS,US,0,OTHER,,,,2228544087,UNMAPPED,Fail
,,,United States,,UNMAPPED,STB ORIGINAL ORIGINAL FRUIT FRTY CONF CHEWY IC NS PKG SINGLE 2.07 OUNCE,NOT APPLICABLE,161583.0,4231544421,4000000051,2025,3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,MEIJER,Click & Mortar,USD,FRUITY CONFECTIONERY,CONFECTIONERY,STARBURST,NOT APPLICABLE,MARS,US,0,OTHER,,,,4231544421,UNMAPPED,Fail
,,,United States,,UNMAPPED,MARS VRTY PK CHOCOLATE MINIATURES FC SEAS PKG GIFT BOX 6.84 OUNCE,NOT APPLICABLE,415694.0,2815051392,4000056824,2025,3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,MEIJER,Click & Mortar,USD,CHOCOLATE,CONFECTIONERY,Mixed,MIXED,MARS,US,0,OTHER,,,,2815051392,UNMAPPED,Fail
,,,United States,,UNMAPPED,DOVE MILK CHOCOLATE HEART FC SEAS PKG LAYDOWN BAG MEDIUM 8.87 OUNCE,NOT APPLICABLE,253012.0,4056696727,4000041372,2025,3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,MEIJER,Click & Mortar,USD,CHOCOLATE,CONFECTIONERY,DOVE,NOT APPLICABLE,MARS,US,0,OTHER,,,,4056696727,UNMAPPED,Fail
,,,United States,,UNMAPPED,M&M WHITE CHOC STRAWBERRY & CREME STRAWBERRY CHOCOLATE UNWRAPPED PIECES FC SEAS PKG LAYDOWN BAG MEDIUM 7.44 OUNCE,NOT APPLICABLE,469198.0,3762668482,4000060574,2025,3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,MEIJER,Click & Mortar,USD,CHOCOLATE,CONFECTIONERY,M&M'S,NOT APPLICABLE,MARS,US,0,OTHER,,,,3762668482,UNMAPPED,Fail
,,,United States,,UNMAPPED,SNICKERS ORIGINAL CARAMEL PEANUT CHOCOLATE MINIATURES FC SEAS PKG LAYDOWN BAG MEDIUM 10.48 OUNCE,NOT APPLICABLE,401354.0,3460734392,4000055207,2025,3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,MEIJER,Click & Mortar,USD,CHOCOLATE,CONFECTIONERY,SNICKERS,NOT APPLICABLE,MARS,US,0,OTHER,,,,3460734392,UNMAPPED,Fail


NSV < RSV False


advertising_constribution,controllable_contribution,third_party_volume_tonnes,Market,Region,occasion,product_description,technology,ZREP,product_id,EAN,year,period,units_sold,rsv,gsv,te,nsv,mac,cogs,customer_name,channel_format,currency_code,sub_category_name,super_category_name,brand_name,sub_brand_name,manufacturer_name,iso2_letter_country_code,is_competitor_flag,economic_cell_group_code,Casefill_Quantity,Agreed_Order_Quantity,CSL,product_code,ssa,NSV_status
,,,United States,,UNMAPPED,SKT & STB VRTY PK FRTY CONF FUN SIZE FC SEAS PKG LAYDOWN BAG MEDIUM 14.24 OUNCE,NOT APPLICABLE,469182.0,3266473793,2200030025,2025,3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,MEIJER,Click & Mortar,USD,UNMAPPED,CONFECTIONERY,Mixed,NOT APPLICABLE,MARS,US,0,OTHER,,,,3266473793,UNMAPPED,Fail
,,,United States,,UNMAPPED,DOVE MILK CHOCOLATE WRAPPED PIECES/TRUFFLE FC SEAS PKG LAYDOWN BAG MEDIUM 8.87 OUNCE,NOT APPLICABLE,332181.0,3920434312,4000051195,2025,3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,MEIJER,Click & Mortar,USD,CHOCOLATE,CONFECTIONERY,DOVE,NOT APPLICABLE,MARS,US,0,OTHER,,,,3920434312,UNMAPPED,Fail
,,,United States,,UNMAPPED,LFSG BERRIES WILD BERRIES FRTY CONF GUMMI FC SEAS PKG GIFT BOX 8 OUNCE,NOT APPLICABLE,427716.0,1300770295,2200028868,2025,3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,MEIJER,Click & Mortar,USD,FRUITY CONFECTIONERY,CONFECTIONERY,LIFE SAVERS,NOT APPLICABLE,MARS,US,0,OTHER,,,,1300770295,UNMAPPED,Fail
,,,United States,,UNMAPPED,DOVE DARK TRUFFLE CHOCOLATE HEART FC SEAS PKG GIFT BOX 5.82 OUNCE,NOT APPLICABLE,456885.0,3521794764,4000059468,2025,3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,MEIJER,Click & Mortar,USD,CHOCOLATE,CONFECTIONERY,DOVE,NOT APPLICABLE,MARS,US,0,OTHER,,,,3521794764,UNMAPPED,Fail
,,,United States,,UNMAPPED,DOVE MILK TRUFFLE CHOCOLATE HEART FC SEAS PKG GIFT BOX 5.82 OUNCE,NOT APPLICABLE,456803.0,2228544087,4000059456,2025,3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,MEIJER,Click & Mortar,USD,CHOCOLATE,CONFECTIONERY,DOVE,NOT APPLICABLE,MARS,US,0,OTHER,,,,2228544087,UNMAPPED,Fail
,,,United States,,UNMAPPED,STB ORIGINAL ORIGINAL FRUIT FRTY CONF CHEWY IC NS PKG SINGLE 2.07 OUNCE,NOT APPLICABLE,161583.0,4231544421,4000000051,2025,3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,MEIJER,Click & Mortar,USD,FRUITY CONFECTIONERY,CONFECTIONERY,STARBURST,NOT APPLICABLE,MARS,US,0,OTHER,,,,4231544421,UNMAPPED,Fail
,,,United States,,UNMAPPED,MARS VRTY PK CHOCOLATE MINIATURES FC SEAS PKG GIFT BOX 6.84 OUNCE,NOT APPLICABLE,415694.0,2815051392,4000056824,2025,3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,MEIJER,Click & Mortar,USD,CHOCOLATE,CONFECTIONERY,Mixed,MIXED,MARS,US,0,OTHER,,,,2815051392,UNMAPPED,Fail
,,,United States,,UNMAPPED,DOVE MILK CHOCOLATE HEART FC SEAS PKG LAYDOWN BAG MEDIUM 8.87 OUNCE,NOT APPLICABLE,253012.0,4056696727,4000041372,2025,3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,MEIJER,Click & Mortar,USD,CHOCOLATE,CONFECTIONERY,DOVE,NOT APPLICABLE,MARS,US,0,OTHER,,,,4056696727,UNMAPPED,Fail
,,,United States,,UNMAPPED,M&M WHITE CHOC STRAWBERRY & CREME STRAWBERRY CHOCOLATE UNWRAPPED PIECES FC SEAS PKG LAYDOWN BAG MEDIUM 7.44 OUNCE,NOT APPLICABLE,469198.0,3762668482,4000060574,2025,3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,MEIJER,Click & Mortar,USD,CHOCOLATE,CONFECTIONERY,M&M'S,NOT APPLICABLE,MARS,US,0,OTHER,,,,3762668482,UNMAPPED,Fail
,,,United States,,UNMAPPED,SNICKERS ORIGINAL CARAMEL PEANUT CHOCOLATE MINIATURES FC SEAS PKG LAYDOWN BAG MEDIUM 10.48 OUNCE,NOT APPLICABLE,401354.0,3460734392,4000055207,2025,3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,MEIJER,Click & Mortar,USD,CHOCOLATE,CONFECTIONERY,SNICKERS,NOT APPLICABLE,MARS,US,0,OTHER,,,,3460734392,UNMAPPED,Fail


# Period_Check

In [0]:
def latest_period(data, cust, year_value, period_value):
    """
    Checks if data exists for the given customer, year, and period.

    Filters the input DataFrame based on customer name, year, period, and market,
    and returns the count of matching records.

    Parameters:
    data (DataFrame): Spark DataFrame containing retail sales or metrics data.
    cust (str): Name of the customer/retailer to filter.
    year_value (str or int): Year value to check.
    period_value (str or int): Period value to check.

    Returns:
    int: Count of rows matching the specified customer, year, and period in the given market.
    """
    period_count = (
        data.filter(F.col("period") == f"{period_value}")
        .filter(F.col("year") == f"{year_value}")
        .filter(F.col("customer_name") == f"{cust}")
        .filter(F.col("Market") == market)
    ).count()
    return period_count


In [0]:
"""
Validates whether the latest data period exists for each retailer.

Calls the `latest_period` function for each customer in the retailer list and checks
whether data is available for the specified year and period.

Prints whether the latest period exists for each customer and the period count.
"""
for cust in retailer_list:
    period_count = latest_period(data, cust, year, period)
    if period_count:
        success = True
        result = period_count
        print("latest period exists", period_count)
    else:
        success = False
        result = period_count
        print("latest period does not exist", period_count)

latest period exists 283


# RSV_Variances_between_same_periods

    Steps performed in the function:
    1. Create Period Combinations: A list of periods from 1 to 13 is generated, representing all periods in a year.
    2. Generate Customer-Period Combinations: A Cartesian product (cross join) is performed between customers 
       and periods to ensure every customer has an entry for each period.
    3. Left Join Data: The customer-period combinations are joined with the source data to ensure all customer-period 
       combinations exist in the dataset.
    4. Handle Missing Data: Null values in the metric columns (like RSV, GSV) are replaced with zeros using `coalesce`.
    5. Group and Aggregate Metrics: The data is grouped by the specified grouping columns (e.g., customer, market) 
       and the sum of the specified metrics is calculated for each group.
    6. Calculate Year-over-Year or Period-over-Period Growth: A window specification is used to calculate lag values 
       for previous periods or years, depending on the condition.
    7. Check Variance Range: A condition is applied to check if the calculated variance falls within a specified range. 
       If not, the function flags it as a failure.
    8. formula: (rsv/previous_rsv)-1

In [0]:
def get_rsv_variances(
    source_df, metric_cols, conditions, year, period, cust
) -> DataFrame:
    """
    Computes RSV (Retail Sales Value) variance for a given customer across time,
    comparing either with the previous year or the previous period.

    The function generates all customer-period combinations, fills missing data with 0s,
    and calculates the period-over-period variance.
    It checks whether the variance lies within predefined thresholds and returns
    a result dictionary along with the processed DataFrame.

    Parameters:
    source_df (DataFrame): Input Spark DataFrame with sales data.
    metric_cols (list): List of metric column names (e.g., ["rsv"]).
    conditions (str): Either 'p_year' (previous year) or any other string (previous period).
    year (int or str): The year to validate.
    period (int or str): The period to validate.
    cust (str): Customer name.

    Returns:
    tuple: (result_dict, DataFrame)
        result_dict (dict): A dictionary summarizing the result and status of the variance check.
        DataFrame: The DataFrame with calculated variance and status column.
    """
    try:
        grouping_cols = RSV_VARIANCES_GROUPING_COLS
        period_col = RSV_VARIANCES_PERIOD_COL
        distinct_cols = RSV_VARIANCES_DISTINCT_COL
        partitionBy = RSV_VARIANCES_PARTITIONBY
        orderBy = RSV_VARIANCES_ORDERBY
        join_type = "left"

        periods = [(i,) for i in range(1, 14)]
        full_periods_df = spark.createDataFrame(periods, [f"{period_col}"])

        customers = source_df.select(*distinct_cols).distinct()
        full_combinations_df = customers.crossJoin(full_periods_df)

        df_full = full_combinations_df.join(source_df, on=grouping_cols, how=join_type)

        for metric in metric_cols:
            df_with_zeros = df_full.withColumn(
                f"{metric}", coalesce(f"{metric}", lit(0))
            )

        df_yoy_growth = df_with_zeros.groupBy(grouping_cols).agg(
            *[round(sum(col), 4).alias(col) for col in metric_cols]
        )

        window_spec = Window.partitionBy(partitionBy).orderBy(*orderBy)

        # - If `conditions == "p_year"`, the function will compute the YoY growth by comparing the current period's value
        #   to the value from the same period in the previous year (lag of 13 periods).
        # - If `conditions != "p_year"`, the function will compute the PoP growth by comparing the current period's value
        #   to the value from the previous period (lag of 1 period).
        for metric in metric_cols:
            if conditions == "p_year":
                df_yoy_growth = (
                    df_yoy_growth.withColumn(
                        f"previous_{metric}", lag(f"{metric}", 13).over(window_spec)
                    )
                    .withColumn(
                        f"YoY_Growth_{metric}",
                        round(((col(f"{metric}") / col(f"previous_{metric}")) - 1), 4),
                    )
                    .na.fill(0)
                )
            else:
                df_yoy_growth = (
                    df_yoy_growth.withColumn(
                        f"previous_{metric}", lag(f"{metric}", 1).over(window_spec)
                    )
                    .withColumn(
                        f"YoY_Growth_{metric}",
                        round(((col(f"{metric}") / col(f"previous_{metric}")) - 1), 4),
                    )
                    .na.fill(0)
                )
        col_name = [
            col_ for col_ in df_yoy_growth.columns if col_.startswith("YoY_Growth_")
        ][0]
        df = df_yoy_growth.filter(
            (F.col("year") == year)
            & (F.col("period") == period)
            & (F.col("Market") == market)
        ).withColumn(
            "is_in_range",
            when(
                (df_yoy_growth[f"{col_name}"] >= RSV_VARIANCES_THRESHOLD_LOWER)
                & (df_yoy_growth[f"{col_name}"] <= RSV_VARIANCES_THRESHOLD_UPPER),
                True,
            ).otherwise(False),
        )
        status = df.filter(df["is_in_range"] == False).count() == 0
        if status:
            df = df
            msg = (
                f'{{"YEAR": "{year}", "PERIOD": "{period}", "RETAILER": "{cust}", "STATUS": "Current Period X Previous Year" }}'
                if conditions == "p_year"
                else f'{{"YEAR": "{year}", "PERIOD": "{period}", "RETAILER": "{cust}", "STATUS":"Current Period X Previous Period" }}'
            )
            value = (
                "RSV Variance Percentage between the Current Period and the Previous Year Passed."
                if conditions == "p_year"
                else "RSV Variance Percentage between the Current Period and the Previous Period Passed."
            )
            result = {
                "MESSAGE": msg,
                "STATUS": success,
                "DATASET": "column",
                "VALUE": value,
                "TYPE_OF_DATASET": "Column",
            }
        else:
            df = df
            msg = (
                f'{{"YEAR": "{year}", "PERIOD": "{period}", "RETAILER": "{cust}", "STATUS": "Current Period X Previous Year"  }}'
                if conditions == "p_year"
                else f'{{"YEAR": "{year}", "PERIOD": "{period}", "RETAILER": "{cust}", "STATUS":"Current Period X Previous Period" }}'
            )
            value = (
                "RSV Variance Percentage between the Current Period and the Previous Year Failed."
                if conditions == "p_year"
                else "RSV Variance Percentage between the Current Period and the Previous Period Failed."
            )
            result = {
                "MESSAGE": msg,
                "STATUS": success,
                "DATASET": "column",
                "VALUE": value,
                "TYPE_OF_DATASET": "Column",
            }

    except Exception:
        df = df
        result = {
            "MESSAGE": f'{{"YEAR": "{year}", "PERIOD": "{period}", "RETAILER": "{cust}", "STATUS":"ERROR"  }}',
            "STATUS": success,
            "DATASET": "DATASET",
            "VALUE": f"Compare RSV Variance Percentage Failed. An error occurred.",
            "TYPE_OF_DATASET": "DATASET",
        }
    return result, df

In [0]:
# Calculate the RSV variances for a given retailer and period. 

# This function calculates the Year-over-Year (YoY) variance for the specified 
# metric columns (e.g., 'rsv') between the current period and previous year, 
# or between the current period and previous period based on the condition.
kpi_list = ["rsv"]

for cust in retailer_list:
    data_ = data.filter(F.col("customer_name") == f"{cust}")
    result, df = get_rsv_variances(data_, ["rsv"], "s_year", year, period, cust)
    print(result)
    display(df)

{'MESSAGE': '{"YEAR": "2025", "PERIOD": "03", "RETAILER": "MEIJER", "STATUS":"Current Period X Previous Period" }', 'STATUS': True, 'DATASET': 'column', 'VALUE': 'RSV Variance Percentage between the Current Period and the Previous Period Failed.', 'TYPE_OF_DATASET': 'Column'}


Market,year,period,customer_name,rsv,previous_rsv,YoY_Growth_rsv,is_in_range
United States,2025,3,MEIJER,227189.65,503302.58,-0.5486,False


# RSV_Variances_between_last_periods

In [0]:
# Calculate the RSV variances for a given retailer and period. 

# This function calculates the Year-over-Year (YoY) variance for the specified 
# metric columns (e.g., 'rsv') between the current period and previous year, 
# or between the current period and previous period based on the condition.
kpi_list = ["rsv"]

for cust in retailer_list:
    data_ = data.filter(F.col("customer_name") == f"{cust}")
    result, df  = get_rsv_variances(data_, ["rsv"], "p_year", year, period, cust)
    print(result)
    display(df)

{'MESSAGE': '{"YEAR": "2025", "PERIOD": "03", "RETAILER": "MEIJER", "STATUS": "Current Period X Previous Year"  }', 'STATUS': True, 'DATASET': 'column', 'VALUE': 'RSV Variance Percentage between the Current Period and the Previous Year Failed.', 'TYPE_OF_DATASET': 'Column'}


Market,year,period,customer_name,rsv,previous_rsv,YoY_Growth_rsv,is_in_range
United States,2025,3,MEIJER,227189.65,421377.86,-0.4608,False


# GSV_Per_of_RSV

In [0]:
def get_gsv_per_of_rsv(source_df, metric_cols, year, period) -> DataFrame:
    """
    This function calculates the **GSV as a percentage of RSV** for a given customer, 
    year, and period, and returns a filtered DataFrame that meets specific conditions 
    on the GSV/RSV ratio.

    It calculates the previous and current GSV/RSV ratios, performs YoY growth calculations, 
    and ranks customers based on these ratios, ultimately filtering and returning rows based 
    on predefined bounds.

    Parameters:
    -----------
    source_df : DataFrame
        The source DataFrame containing the sales data for various metrics like RSV, GSV, etc.
    
    metric_cols : list
        A list of metric columns (e.g., ['rsv', 'gsv']) to calculate ratios for.
    
    year : int
        The year for which GSV/RSV percentage is calculated.
    
    period : int
        The period (within the year) for which GSV/RSV percentage is calculated.

    Returns:
    --------
    DataFrame :
        A DataFrame filtered based on GSV/RSV conditions with two DataFrames:
        - `df_filtered`: Contains the rows that do not meet the predefined bounds.
        - `df_sorted_with_rank`: Contains all the rows sorted and ranked.
    """
    # Define grouping columns, period column, etc.
    grouping_cols = RSV_VARIANCES_GROUPING_COLS  # Grouping columns like customer, market, etc.
    period_col = RSV_VARIANCES_PERIOD_COL  # Period column name (e.g., "period")
    distinct_cols = RSV_VARIANCES_DISTINCT_COL  # Distinct columns like customer name, market, etc.
    partitionBy = RSV_VARIANCES_PARTITIONBY  # Partitioning columns for window functions
    orderBy = RSV_VARIANCES_ORDERBY  # Columns for sorting in window functions
    join_type = "left"  # Type of join to use for merging DataFrames

    # Create a list of periods (1 to 13) for all months of the year
    periods = [(i,) for i in range(1, 14)]  # Periods 1 to 13
    full_periods_df = spark.createDataFrame(periods, [f"{period_col}"])  # DataFrame with all periods

    # Get distinct customers from the source DataFrame
    customers = source_df.select(*distinct_cols).distinct()  # Select distinct customers and markets
    full_combinations_df = customers.crossJoin(full_periods_df)  # Cross join to create combinations of customers and periods

    # Join the full combinations DataFrame with the source data on the grouping columns
    df_full = full_combinations_df.join(source_df, on=grouping_cols, how=join_type)

    # Replace null values with zeros for each metric column (e.g., 'rsv', 'gsv')
    for metric in metric_cols:
        df_with_zeros = df_full.withColumn(f"{metric}", coalesce(f"{metric}", lit(0)))

    # Group the data by the grouping columns and aggregate the metrics (e.g., sum of 'rsv' and 'gsv')
    df_yoy_growth = df_with_zeros.groupBy(grouping_cols).agg(
        *[round(sum(col), 4).alias(col) for col in metric_cols]  # Sum the metrics and round to 4 decimal places
    )

    # Define window specification for partitioning by customer and sorting by the columns specified in `orderBy`
    window_spec = Window.partitionBy(partitionBy).orderBy(*orderBy)

    # Calculate the previous period values for the metrics (lag function)
    for metric in metric_cols:
        df_yoy_growth = df_yoy_growth.withColumn(
            f"previous_{metric}", lag(f"{metric}", 1).over(window_spec)  # Get the value of the previous period
        ).na.fill(0)  # Fill null values with 0

    # Add new columns for the percentage of GSV over RSV (for both current and previous periods)
    df_yoy_growth_new = df_yoy_growth.withColumn(
        "previous_GSV_RSV_Percentage",
        when(
            (col("previous_rsv") != 0) & (col("previous_gsv") != 0),  # Ensure no division by zero
            (col("previous_gsv") / col("previous_rsv")) * 100,  # Calculate percentage of previous GSV over RSV
        ).otherwise(0),  # If either previous RSV or GSV is 0, set the percentage to 0
    ).withColumn(
        "current_GSV_RSV_Percentage",
        when(
            (col("rsv") != 0) & (col("gsv") != 0),  # Ensure no division by zero
            (col("gsv") / col("rsv")) * 100  # Calculate percentage of current GSV over RSV
        ).otherwise(0),  # If either current RSV or GSV is 0, set the percentage to 0
    )

    # Filter data for the specified year and sort by customer and the previous GSV/RSV percentage
    df = df_yoy_growth_new.filter(f"year == {year}").orderBy(
        "customer_name", "previous_GSV_RSV_Percentage", ascending=True  # Sorting by customer name and GSV/RSV percentage
    )

    # Define a window spec for row numbers to rank customers based on GSV/RSV percentage
    window_spec = Window.orderBy("customer_name", "previous_GSV_RSV_Percentage")

    # Add a rank column to the DataFrame based on sorting order
    df_sorted_with_rank = df.withColumn("rank", row_number().over(window_spec))

    # Get the 3rd and 4th rows based on the rank and calculate the average percentage
    third_value = (
        df_sorted_with_rank.filter(col("rank") == 3)  # Filter the 3rd row
        .select("previous_GSV_RSV_Percentage")  # Select the previous GSV/RSV percentage column
        .collect()[0][0]  # Collect the value from the DataFrame
    )
    fourth_value = (
        df_sorted_with_rank.filter(col("rank") == 4)  # Filter the 4th row
        .select("previous_GSV_RSV_Percentage")  # Select the previous GSV/RSV percentage column
        .collect()[0][0]  # Collect the value from the DataFrame
    )

    # Calculate the average of the 3rd and 4th values
    average_3rd_4th = (third_value + fourth_value) / 2

    # Multiply the average by 95% to calculate the lower bound
    lower_bound = average_3rd_4th * GSV_PER_OF_RSV_MIN_BOUND

    # Get the 10th and 11th rows based on the rank and calculate the average percentage
    tenth_value = (
        df_sorted_with_rank.filter(col("rank") == 10)  # Filter the 10th row
        .select("previous_GSV_RSV_Percentage")  # Select the previous GSV/RSV percentage column
        .collect()[0][0]  # Collect the value from the DataFrame
    )
    eleventh_value = (
        df_sorted_with_rank.filter(col("rank") == 11)  # Filter the 11th row
        .select("previous_GSV_RSV_Percentage")  # Select the previous GSV/RSV percentage column
        .collect()[0][0]  # Collect the value from the DataFrame
    )

    # Calculate the average of the 10th and 11th values
    average_10rd_11th = (tenth_value + eleventh_value) / 2

    # Multiply the average by 105% to calculate the upper bound
    upper_bound = average_10rd_11th * GSV_PER_OF_RSV_MAX_BOUND

    # Add columns for lower and upper bounds in the DataFrame
    df_sorted_with_rank = df_sorted_with_rank.withColumn(
        "lower_bound", lit(lower_bound)  # Add a column for the lower bound
    ).withColumn("upper_bound", lit(upper_bound))  # Add a column for the upper bound

    # Filter out rows where the current GSV/RSV percentage is outside the bounds
    df_filtered = (
        df_sorted_with_rank.filter(~(col("current_GSV_RSV_Percentage") >= lower_bound))  # Below the lower bound
        .filter(~(col("current_GSV_RSV_Percentage") <= upper_bound))  # Above the upper bound
    )

    # Further filter by the specified year, period, and market
    df_filtered = df_filtered.filter(
            (col("year") == year)
            & (col("period") == period)
            & (F.col("Market") == market)
        )

    # Return the filtered DataFrame and the sorted DataFrame with rankings
    return df_filtered, df_sorted_with_rank


# Example usage
data_ = data.filter((F.col("customer_name") == f"{name_of_retailer}"))  # Filter data for a specific retailer
df_filtered, df_sorted_with_rank = get_gsv_per_of_rsv(data_, ["rsv", "gsv"], year, period)

# Iterate through retailer list to check the filtered results
for cust in retailer_list:
    if df_filtered.count() != 0:  # If the filtered DataFrame is not empty, set success to False
        success = False
        result = df_filtered
    else:  # Otherwise, set success to True
        success = True
        result = df_filtered

print(success)  # Print whether the success condition is met
display(result)  # Display the result DataFrame


True


Market,year,period,customer_name,rsv,gsv,previous_rsv,previous_gsv,previous_GSV_RSV_Percentage,current_GSV_RSV_Percentage,rank,lower_bound,upper_bound


# NSV_Per_of_GSV

In [0]:
def get_nsv_per_of_gsv(source_df, metric_cols, year, period) -> DataFrame:
    """
    This function calculates the **NSV as a percentage of GSV** for a given customer, 
    year, and period, and returns a filtered DataFrame that meets specific conditions 
    on the NSV/GSV ratio.

    It calculates the previous and current NSV/GSV ratios, performs YoY growth calculations, 
    and ranks customers based on these ratios, ultimately filtering and returning rows based 
    on predefined bounds.

    Parameters:
    -----------
    source_df : DataFrame
        The source DataFrame containing the sales data for various metrics like NSV, GSV, etc.
    
    metric_cols : list
        A list of metric columns (e.g., ['nsv', 'gsv']) to calculate ratios for.
    
    year : int
        The year for which NSV/GSV percentage is calculated.
    
    period : int
        The period (within the year) for which NSV/GSV percentage is calculated.

    Returns:
    --------
    DataFrame :
        A DataFrame filtered based on NSV/GSV conditions with two DataFrames:
        - `df_filtered`: Contains the rows that do not meet the predefined bounds.
        - `df_sorted_with_rank`: Contains all the rows sorted and ranked.
    """
    # Define grouping columns, period column, etc.
    grouping_cols = RSV_VARIANCES_GROUPING_COLS  # Grouping columns like customer, market, etc.
    period_col = RSV_VARIANCES_PERIOD_COL  # Period column name (e.g., "period")
    distinct_cols = RSV_VARIANCES_DISTINCT_COL  # Distinct columns like customer name, market, etc.
    partitionBy = RSV_VARIANCES_PARTITIONBY  # Partitioning columns for window functions
    orderBy = RSV_VARIANCES_ORDERBY  # Columns for sorting in window functions
    join_type = "left"  # Type of join to use for merging DataFrames

    # Create a list of periods (1 to 13) for all months of the year
    periods = [(i,) for i in range(1, 14)]  # Periods 1 to 13
    full_periods_df = spark.createDataFrame(periods, [f"{period_col}"])  # DataFrame with all periods

    # Get distinct customers from the source DataFrame
    customers = source_df.select(*distinct_cols).distinct()  # Select distinct customers and markets
    full_combinations_df = customers.crossJoin(full_periods_df)  # Cross join to create combinations of customers and periods

    # Join the full combinations DataFrame with the source data on the grouping columns
    df_full = full_combinations_df.join(source_df, on=grouping_cols, how=join_type)

    # Replace null values with zeros for each metric column (e.g., 'nsv', 'gsv')
    for metric in metric_cols:
        df_with_zeros = df_full.withColumn(f"{metric}", coalesce(f"{metric}", lit(0)))

    # Group the data by the grouping columns and aggregate the metrics (e.g., sum of 'nsv' and 'gsv')
    df_yoy_growth = df_with_zeros.groupBy(grouping_cols).agg(
        *[round(sum(col), 4).alias(col) for col in metric_cols]  # Sum the metrics and round to 4 decimal places
    )

    # Define window specification for partitioning by customer and sorting by the columns specified in `orderBy`
    window_spec = Window.partitionBy(partitionBy).orderBy(*orderBy)

    # Calculate the previous period values for the metrics (lag function)
    for metric in metric_cols:
        df_yoy_growth = df_yoy_growth.withColumn(
            f"previous_{metric}", lag(f"{metric}", 1).over(window_spec)  # Get the value of the previous period
        ).na.fill(0)  # Fill null values with 0

    # Add new columns for the percentage of NSV over GSV (for both current and previous periods)
    df_yoy_growth_new = df_yoy_growth.withColumn(
        "previous_nsv_gsv_Percentage",
        when(
            (col("previous_nsv") != 0) & (col("previous_gsv") != 0),  # Ensure no division by zero
            (col("previous_nsv") / col("previous_gsv")) * 100,  # Calculate percentage of previous NSV over GSV
        ).otherwise(0),  # If either previous GSV or NSV is 0, set the percentage to 0
    ).withColumn(
        "current_nsv_gsv_Percentage",
        when(
            (col("nsv") != 0) & (col("gsv") != 0),  # Ensure no division by zero
            (col("nsv") / col("gsv")) * 100  # Calculate percentage of current NSV over GSV
        ).otherwise(0),  # If either current GSV or NSV is 0, set the percentage to 0
    )

    # Filter data for the specified year and sort by customer and the previous NSV/GSV percentage
    df = df_yoy_growth_new.filter(f"year == {year}").orderBy(
        "customer_name", "previous_nsv_gsv_Percentage", ascending=True  # Sorting by customer name and NSV/GSV percentage
    )

    # Define a window spec for row numbers to rank customers based on NSV/GSV percentage
    window_spec = Window.orderBy("customer_name", "previous_nsv_gsv_Percentage")

    # Add a rank column to the DataFrame based on sorting order
    df_sorted_with_rank = df.withColumn("rank", row_number().over(window_spec))

    # Get the 3rd and 4th rows based on the rank and calculate the average percentage
    third_value = (
        df_sorted_with_rank.filter(col("rank") == 3)  # Filter the 3rd row
        .select("previous_nsv_gsv_Percentage")  # Select the previous NSV/GSV percentage column
        .collect()[0][0]  # Collect the value from the DataFrame
    )
    fourth_value = (
        df_sorted_with_rank.filter(col("rank") == 4)  # Filter the 4th row
        .select("previous_nsv_gsv_Percentage")  # Select the previous NSV/GSV percentage column
        .collect()[0][0]  # Collect the value from the DataFrame
    )

    # Calculate the average of the 3rd and 4th values
    average_3rd_4th = (third_value + fourth_value) / 2

    # Multiply the average by 95% to calculate the lower bound
    lower_bound = average_3rd_4th * NSV_PER_OF_GSV_MIN_BOUND

    # Get the 10th and 11th rows based on the rank and calculate the average percentage
    tenth_value = (
        df_sorted_with_rank.filter(col("rank") == 10)  # Filter the 10th row
        .select("previous_nsv_gsv_Percentage")  # Select the previous NSV/GSV percentage column
        .collect()[0][0]  # Collect the value from the DataFrame
    )
    eleventh_value = (
        df_sorted_with_rank.filter(col("rank") == 11)  # Filter the 11th row
        .select("previous_nsv_gsv_Percentage")  # Select the previous NSV/GSV percentage column
        .collect()[0][0]  # Collect the value from the DataFrame
    )

    # Calculate the average of the 10th and 11th values
    average_10rd_11th = (tenth_value + eleventh_value) / 2

    # Multiply the average by 105% to calculate the upper bound
    upper_bound = average_10rd_11th * NSV_PER_OF_GSV_MAX_BOUND

    # Add columns for lower and upper bounds in the DataFrame
    df_sorted_with_rank = df_sorted_with_rank.withColumn(
        "lower_bound", lit(lower_bound)  # Add a column for the lower bound
    ).withColumn("upper_bound", lit(upper_bound))  # Add a column for the upper bound

    # Filter out rows where the current NSV/GSV percentage is outside the bounds
    df_filtered = (
        df_sorted_with_rank.filter(~(col("current_nsv_gsv_Percentage") >= lower_bound))  # Below the lower bound
        .filter(~(col("current_nsv_gsv_Percentage") <= upper_bound))  # Above the upper bound
    )

    # Further filter by the specified year, period, and market
    df_filtered = df_filtered.filter(
            (col("year") == year)
            & (col("period") == period)
            & (F.col("Market") == market)
        )

    # Return the filtered DataFrame and the sorted DataFrame with rankings
    return df_filtered, df_sorted_with_rank


# Example usage
data_ = data.filter((F.col("customer_name") == f"{name_of_retailer}"))  # Filter data for a specific retailer
df_filtered, df_sorted_with_rank = get_nsv_per_of_gsv(data_, ["nsv", "gsv"], year, period)

# Iterate through retailer list to check the filtered results
for cust in retailer_list:
    if df_filtered.count() != 0:  # If the filtered DataFrame is not empty, set success to False
        success = False
        result = df_filtered
    else:  # Otherwise, set success to True
        success = True
        result = df_filtered

print(success)  # Print whether the success condition is met
display(result)  # Display the result DataFrame


True


Market,year,period,customer_name,nsv,gsv,previous_nsv,previous_gsv,previous_nsv_gsv_Percentage,current_nsv_gsv_Percentage,rank,lower_bound,upper_bound


# YTD_for_RSV

In [0]:
def YTD_Thresholds_for_RSV(source_df, metric_cols, year, period) -> DataFrame:
    """
    This function calculates Year-to-Date (YTD) RSV thresholds based on current and previous 
    periods' data, comparing them to defined threshold bounds. It filters out results that fall 
    outside the thresholds and returns the filtered DataFrame for a specific year and period.

    Parameters:
    -----------
    source_df : DataFrame
        The source DataFrame containing RSV data (among other metrics).
    
    metric_cols : list
        A list of metric columns (e.g., ['rsv']) to calculate thresholds for.
    
    year : int
        The year for which RSV thresholds are calculated.
    
    period : int
        The period within the year (e.g., month or quarter) for which thresholds are calculated.

    Returns:
    --------
    DataFrame :
        A DataFrame filtered based on RSV thresholds with the rows meeting specific conditions.
    """

    # Define the grouping columns, period column, and other variables
    grouping_cols = GROUPING_COLS  # Grouping columns like customer, market, etc.
    period_col = PERIOD_COL  # Period column name (e.g., "period")
    distinct_cols = DISTINCT_COL  # Distinct columns like customer name, market, etc.
    partitionBy = PARTITIONBY  # Partitioning columns for window functions
    orderBy = ORDERBY  # Columns for sorting in window functions
    join_type = "left"  # Type of join to use for merging DataFrames

    # Create a list of periods (1 to 13) to cover all months of the year (if it's monthly data)
    periods = [(i,) for i in range(1, 14)]  # Periods 1 to 13
    full_periods_df = spark.createDataFrame(periods, [f"{period_col}"])  # DataFrame with all periods

    # Get distinct customers and join them with all periods (cross join)
    customers = source_df.select(*distinct_cols).distinct()  # Get distinct customers
    full_combinations_df = customers.crossJoin(full_periods_df)  # Cross join customers with periods

    # Perform a left join to combine the full combinations with the source data
    df_full = full_combinations_df.join(source_df, on=grouping_cols, how=join_type)

    # Replace null values with zeros for each metric column (e.g., 'rsv')
    for metric in metric_cols:
        df_with_zeros = df_full.withColumn(f"{metric}", coalesce(f"{metric}", lit(0)))

    # Group the data by the grouping columns and aggregate the metrics (e.g., sum of 'rsv')
    df_yoy_growth = df_with_zeros.groupBy(grouping_cols).agg(
        *[round(sum(col), 4).alias(col) for col in metric_cols]  # Sum the metrics and round to 4 decimal places
    )

    # Define window specification to partition by customer, market, and year
    window_spec = Window.partitionBy(partitionBy).orderBy(*orderBy)

    # Calculate the previous period values for the metrics (lag function)
    for metric in metric_cols:
        df_yoy_growth = df_yoy_growth.withColumn(
            f"previous_{metric}", lag(f"{metric}", 13).over(window_spec)  # Get the value of the previous period (13 periods back)
        ).na.fill(0)  # Fill null values with 0

    # Define a window spec to calculate the sum of RSV for current and previous periods
    window_current = Window.partitionBy("Market", "year", "customer_name").orderBy(
        "period"
    )

    # Calculate the current period and previous period RSV values, and calculate the percentage difference
    df_current = (
        df_yoy_growth.withColumn(
            "RSV_current_period", F.sum("rsv").over(window_current)  # Sum of RSV for current period
        ).withColumn("RSV_previous_period", F.sum("previous_rsv").over(window_current))  # Sum of RSV for previous period
    ).withColumn(
        "Current_periodvsPrevious_year",  # Calculate percentage change between current and previous periods
        when(
            (col("RSV_current_period") != 0) & (col("RSV_previous_period") != 0),  # Ensure no division by zero
            round((col("RSV_current_period") / col("RSV_previous_period")) - 1, 4),  # Percentage change from previous period
        ).otherwise(0),  # If either period is zero, set the percentage change to 0
    )

    # Filter the data based on predefined threshold bounds (lower and upper limits for RSV percentage change)
    df_filtered = df_current.filter(
        ~(
            (F.col("Current_periodvsPrevious_year") >= YTD_THRESHOLD_RSV_LOWER)
            & (F.col("Current_periodvsPrevious_year") <= YTD_THRESHOLD_RSV_UPPER)
        )
    )

    # Further filter the data for the specified year, period, and market
    df_filtered = df_filtered.filter(
        (col("year") == year) & (col("period") == period) & (F.col("Market") == market)
    )

    # Return the filtered DataFrame
    return df_filtered


# Example usage:
# Filter the data for a specific retailer
data_ = data.filter((F.col("customer_name") == f"{name_of_retailer}"))
df_filtered = YTD_Thresholds_for_RSV(data_, ['rsv'], year, period)

# Iterate through the retailer list to check if the filtered results are empty or not
for cust in retailer_list:
    if df_filtered.count() != 0:  # If the filtered DataFrame is not empty, set success to False
        success = False
        result = df_filtered
    else:  # Otherwise, set success to True
        success = True
        result = df_filtered

print(success)  # Print whether the success condition is met
display(df_filtered)  # Display the result DataFrame


True


Market,year,period,customer_name,brand_name,channel_format,rsv,previous_rsv,RSV_current_period,RSV_previous_period,Current_periodvsPrevious_year


# YTD_for_GSV

In [0]:
def YTD_Thresholds_for_GSV(source_df, metric_cols, year, period) -> DataFrame:
    """
    This function calculates Year-to-Date (YTD) GSV thresholds based on current and previous 
    periods' data, comparing them to defined threshold bounds. It filters out results that fall 
    outside the thresholds and returns the filtered DataFrame for a specific year and period.

    Parameters:
    -----------
    source_df : DataFrame
        The source DataFrame containing GSV data (among other metrics).
    
    metric_cols : list
        A list of metric columns (e.g., ['gsv']) to calculate thresholds for.
    
    year : int
        The year for which GSV thresholds are calculated.
    
    period : int
        The period within the year (e.g., month or quarter) for which thresholds are calculated.

    Returns:
    --------
    DataFrame :
        A DataFrame filtered based on GSV thresholds with the rows meeting specific conditions.
    """

    # Define the grouping columns, period column, and other variables
    grouping_cols = GROUPING_COLS  # Grouping columns like customer, market, etc.
    period_col = PERIOD_COL  # Period column name (e.g., "period")
    distinct_cols = DISTINCT_COL  # Distinct columns like customer name, market, etc.
    partitionBy = PARTITIONBY  # Partitioning columns for window functions
    orderBy = ORDERBY  # Columns for sorting in window functions
    join_type = "left"  # Type of join to use for merging DataFrames

    # Create a list of periods (1 to 13) to cover all months of the year (if it's monthly data)
    periods = [(i,) for i in range(1, 14)]  # Periods 1 to 13
    full_periods_df = spark.createDataFrame(periods, [f"{period_col}"])  # DataFrame with all periods

    # Get distinct customers and join them with all periods (cross join)
    customers = source_df.select(*distinct_cols).distinct()  # Get distinct customers
    full_combinations_df = customers.crossJoin(full_periods_df)  # Cross join customers with periods

    # Perform a left join to combine the full combinations with the source data
    df_full = full_combinations_df.join(source_df, on=grouping_cols, how=join_type)

    # Replace null values with zeros for each metric column (e.g., 'gsv')
    for metric in metric_cols:
        df_with_zeros = df_full.withColumn(f"{metric}", coalesce(f"{metric}", lit(0)))

    # Group the data by the grouping columns and aggregate the metrics (e.g., sum of 'gsv')
    df_yoy_growth = df_with_zeros.groupBy(grouping_cols).agg(
        *[round(sum(col), 4).alias(col) for col in metric_cols]  # Sum the metrics and round to 4 decimal places
    )

    # Define window specification to partition by customer, market, and year
    window_spec = Window.partitionBy(partitionBy).orderBy(*orderBy)

    # Calculate the previous period values for the metrics (lag function)
    for metric in metric_cols:
        df_yoy_growth = df_yoy_growth.withColumn(
            f"previous_{metric}", lag(f"{metric}", 13).over(window_spec)  # Get the value of the previous period (13 periods back)
        ).na.fill(0)  # Fill null values with 0

    # Define a window spec to calculate the sum of GSV for current and previous periods
    window_current = Window.partitionBy("Market", "year", "customer_name").orderBy(
        "period"
    )

    # Calculate the current period and previous period GSV values, and calculate the percentage difference
    df_current = (
        df_yoy_growth.withColumn(
            "GSV_current_period", F.sum("gsv").over(window_current)  # Sum of GSV for current period
        ).withColumn("GSV_previous_period", F.sum("previous_gsv").over(window_current))  # Sum of GSV for previous period
    ).withColumn(
        "Current_periodvsPrevious_year",  # Calculate percentage change between current and previous periods
        when(
            (col("GSV_current_period") != 0) & (col("GSV_previous_period") != 0),  # Ensure no division by zero
            round((col("GSV_current_period") / col("GSV_previous_period")) - 1, 4),  # Percentage change from previous period
        ).otherwise(0),  # If either period is zero, set the percentage change to 0
    )

    # Filter the data based on predefined threshold bounds (lower and upper limits for GSV percentage change)
    df_filtered = df_current.filter(
        ~(
            (F.col("Current_periodvsPrevious_year") >= YTD_THRESHOLD_GSV_LOWER)
            & (F.col("Current_periodvsPrevious_year") <= YTD_THRESHOLD_GSV_UPPER)
        )
    )

    # Further filter the data for the specified year, period, and market
    df_filtered = df_filtered.filter(
        (col("year") == year) & (col("period") == period) & (F.col("Market") == market)
    )

    # Return the filtered DataFrame
    return df_filtered


# Example usage:
# Filter the data for a specific retailer
data_ = data.filter((F.col("customer_name") == f"{name_of_retailer}"))
df_filtered = YTD_Thresholds_for_GSV(data_, ['gsv'], year, period)

# Iterate through the retailer list to check if the filtered results are empty or not
for cust in retailer_list:
    if df_filtered.count() != 0:  # If the filtered DataFrame is not empty, set success to False
        success = False
        result = df_filtered
    else:  # Otherwise, set success to True
        success = True
        result = df_filtered

print(success)  # Print whether the success condition is met
display(result)  # Display the result DataFrame


True


Market,year,period,customer_name,brand_name,channel_format,gsv,previous_gsv,GSV_current_period,GSV_previous_period,Current_periodvsPrevious_year


# YTD_for_NSV

In [0]:
def YTD_Thresholds_for_NSV(source_df, metric_cols, year, period) -> DataFrame:
    """
    This function calculates the Year-to-Date (YTD) thresholds for NSV (Net Sales Value) 
    by comparing the current period against the previous period and applying filters based on thresholds.

    Parameters:
    source_df (DataFrame): The source DataFrame containing the data for customers, NSV, etc.
    metric_cols (list): A list of metric columns to work with (e.g., ['nsv']).
    year (int): The year for which the calculations should be done.
    period (int): The specific period (month, quarter, etc.) to focus on.

    Returns:
    DataFrame: A filtered DataFrame based on the YTD threshold calculations.
    """
    
    # Define the columns used for grouping, partitioning, and ordering
    grouping_cols = GROUPING_COLS  # Columns to group data by (e.g., customer_name, market)
    period_col = PERIOD_COL  # Column name representing the period (e.g., month or quarter)
    distinct_cols = DISTINCT_COL  # Columns to use for selecting distinct customers
    partitionBy = PARTITIONBY  # Columns for partitioning in window functions
    orderBy = ORDERBY  # Columns for ordering data in window functions
    join_type = "left"  # Use left join to ensure all customer-period combinations are kept
    
    # Create a DataFrame representing all periods from 1 to 13 (e.g., months or quarters)
    periods = [(i,) for i in range(1, 14)]  # Creating a list of tuples [(1,), (2,), ..., (13,)]
    full_periods_df = spark.createDataFrame(periods, [f"{period_col}"])  # Create a DataFrame with periods 1 to 13
    
    # Get the distinct customers from the source DataFrame
    customers = source_df.select(*distinct_cols).distinct()  # Selecting distinct customer data from source
    
    # Cross join the customers with all possible periods to ensure each customer has data for every period
    full_combinations_df = customers.crossJoin(full_periods_df)  # All combinations of customers and periods
    
    # Perform a left join with the original source data to include data for every customer-period combination
    df_full = full_combinations_df.join(source_df, on=grouping_cols, how=join_type)  # Join on the specified grouping columns
    
    # Replace any null values in the metric columns (e.g., 'nsv') with 0 to ensure valid calculations
    for metric in metric_cols:
        df_with_zeros = df_full.withColumn(f"{metric}", coalesce(f"{metric}", lit(0)))  # Replace null values with 0
    
    # Group by the specified columns and aggregate the values of the metric columns (e.g., 'nsv') by summing them
    df_yoy_growth = df_with_zeros.groupBy(grouping_cols).agg(
        *[round(sum(col), 4).alias(col) for col in metric_cols]  # Summing and rounding each metric column
    )
    
    # Define a window specification for partitioning by market, year, and customer, and ordering by period
    window_spec = Window.partitionBy(partitionBy).orderBy(*orderBy)  # Specify window for partitioning by relevant columns
    
    # Add a column to calculate the value of the previous period's metric for each row using the lag function
    for metric in metric_cols:
        df_yoy_growth = df_yoy_growth.withColumn(
            f"previous_{metric}", lag(f"{metric}", 13).over(window_spec)  # Get the previous period value for the metric
        ).na.fill(0)  # Fill missing values with 0 for the previous period
    
    # Define a window for the current period calculation
    window_current = Window.partitionBy("Market", "year", "customer_name").orderBy("period")  # Define the current period window
    
    # Calculate the sum for the current and previous period for NSV
    df_current = (
        df_yoy_growth.withColumn(
            "NSV_current_period", F.sum("nsv").over(window_current)  # Sum of current period NSV
        ).withColumn("NSV_previous_period", F.sum("previous_nsv").over(window_current))  # Sum of previous period NSV
    ).withColumn(
        "Current_periodvsPrevious_year",
        when(
            (col("NSV_current_period") != 0) & (col("NSV_previous_period") != 0),
            round((col("NSV_current_period") / col("NSV_previous_period")) - 1, 4),  # Calculate the YoY growth percentage
        ).otherwise(0),  # If either period is 0, the growth is set to 0
    )
    
    # Apply filters based on YTD thresholds to exclude values within the threshold range
    df_filtered = df_current.filter(
        ~(
            (F.col("Current_periodvsPrevious_year") >= YTD_THRESHOLD_NSV_LOWER) & 
            (F.col("Current_periodvsPrevious_year") <= YTD_THRESHOLD_NSV_UPPER)  # Exclude values within the threshold range
        )
    )
    
    # Apply additional filters based on year, period, and market
    df_filtered = df_filtered.filter(
        (col("year") == year) & (col("period") == period) & (F.col("Market") == market)  # Filter data for the specified year, period, and market
    )
    
    # Return the filtered DataFrame
    return df_filtered


# Apply the function to a filtered dataset for the given retailer
data_ = data.filter(
    (F.col("customer_name") == f"{name_of_retailer}")  # Filter the data for a specific retailer
)

# Call the YTD_Thresholds_for_NSV function to calculate thresholds for the given metric ('nsv')
df_filtered = YTD_Thresholds_for_NSV(data_, ['nsv'], year, period)

# Iterate over the list of retailers to check whether the filtered DataFrame has data
for cust in retailer_list:
    if df_filtered.count() != 0:
        success = False  # If there is data, mark as unsuccessful
        result = df_filtered  # Store the filtered data as the result
    else:
        success = True  # If there is no data, mark as successful
        result = df_filtered  # Store the filtered data as the result

# Print the success flag and display the result
print(success)
display(result)


True


Market,year,period,customer_name,brand_name,channel_format,nsv,previous_nsv,NSV_current_period,NSV_previous_period,Current_periodvsPrevious_year


# GSV_RSV_MAP

In [0]:
# Filter data based on the retailer's name.
data_ = data.filter(F.col("customer_name") == f"{name_of_retailer}")
# Here, we are filtering the `data` DataFrame to include only rows where the `customer_name` matches the specified `name_of_retailer`.

# Filter data based on the year, period, and market, and create a new column 'condition_met'.
df = data_.filter(
    (F.col("year") == f"{year}")  # Filter rows where the `year` matches the specified `year`
    & (F.col("period") == f"{period}")  # Filter rows where the `period` matches the specified `period`
    & (F.col("Market") == market)  # Filter rows where the `Market` matches the specified `market`
).withColumn("condition_met", (F.col("gsv") == 0.0) & (F.col("rsv") > 0))  
# Create a new column `condition_met` that is `True` when `gsv` (Gross Sales Value) is 0.0 and `rsv` (Returned Sales Value) is greater than 0.

# Filter the DataFrame to only include rows where the 'condition_met' column is True.
df_filtered = df.filter(F.col("condition_met") == True)
# Filter the `df` DataFrame to only include rows where the `condition_met` column is `True` (i.e., where `gsv` is 0 and `rsv` > 0).

# Iterate through the list of retailers and check if there is any data that meets the condition.
for cust in retailer_list:
    # If there are any rows in df_filtered (i.e., count is not 0), set success to False.
    if df_filtered.count() != 0:  
        success = False
        result = df_filtered  # Store the filtered DataFrame
    else:
        # If no rows meet the condition, set success to True (no data found).
        success = True
        result = df_filtered  # Store the filtered DataFrame (even if it's empty)

# Output the success flag and display the filtered results.
print(success)  # Print whether the condition was met for any rows.
display(result)  # Display the filtered DataFrame (it may be empty if no conditions were met).


True


advertising_constribution,controllable_contribution,third_party_volume_tonnes,Market,Region,occasion,product_description,technology,ZREP,product_id,EAN,year,period,units_sold,rsv,gsv,te,nsv,mac,cogs,customer_name,channel_format,currency_code,sub_category_name,super_category_name,brand_name,sub_brand_name,manufacturer_name,iso2_letter_country_code,is_competitor_flag,economic_cell_group_code,Casefill_Quantity,Agreed_Order_Quantity,CSL,product_code,ssa,condition_met


# Historical_Check

In [0]:
def get_metrics(source_df, grouping_cols: list[str]) -> DataFrame:
    """
    Calculate aggregated metrics (RSV) for the given DataFrame by grouping by specified columns.
    
    Args:
        source_df (DataFrame): The source DataFrame containing the data.
        grouping_cols (list[str]): List of columns by which to group the data.

    Returns:
        DataFrame: A new DataFrame with the sum of "rsv" for each group of the specified columns, rounded to 4 decimal places.
    """
    # Cast the "rsv" column to double type for accurate aggregation.
    df = source_df.withColumn("rsv", col("rsv").cast("double"))
    
    # Group by the provided columns and aggregate the sum of "rsv", rounding to 4 decimal places.
    df = df.groupBy(grouping_cols).agg(round(sum("rsv"), 4).alias("rsv"))
    
    return df


def compare_df(
    row_file,
    final_file,
    key_col,
    output_file_path,
    rel_tol,
    compare_for,
    filter_condition=None,
    group_by_col=None,
    sum_cols=None,
):
    """
    Compare two DataFrames based on key columns, specific columns to compare, and aggregation settings.
    
    Args:
        row_file (DataFrame): The first DataFrame to compare.
        final_file (DataFrame): The second DataFrame to compare.
        key_col (list[str]): List of key columns to join the DataFrames on.
        output_file_path (str or None): Path to save the output if mismatches are found.
        rel_tol (float): The relative tolerance for the comparison.
        compare_for (list[str]): List of columns to compare.
        filter_condition (str or None, optional): Filter condition to apply on the DataFrames.
        group_by_col (str or None, optional): Grouping columns for aggregation.
        sum_cols (list[str] or None, optional): Columns to sum during aggregation.

    Returns:
        tuple: A tuple where the first element is a boolean indicating success or failure, and the second element
               is a DataFrame containing any mismatched rows if failure occurred.
    """
    if compare_for:
        # Select only the key columns and the columns to compare in both DataFrames
        row_file = row_file.select(*key_col, *compare_for)
        final_file = final_file.select(*key_col, *compare_for)

    if group_by_col:
        # Group the DataFrames by the specified columns and sum the columns
        row_file = row_file.groupBy(group_by_col).agg(
            *[sum(col).alias(col) for col in sum_cols]
        )
        final_file = final_file.groupBy(group_by_col).agg(
            *[round(sum(col), 4).alias(col) for col in sum_cols]
        )

    if filter_condition:
        # Apply any filter condition to the DataFrames
        row_file = row_file.filter(filter_condition)
        final_file = final_file.filter(filter_condition)

    # Compare the two DataFrames using the SparkSQLCompare class
    compare_obj = SparkSQLCompare(spark, row_file, final_file, key_col, rel_tol)

    if compare_obj.all_mismatch().count() != 0:
        # Return False if there are mismatches in the comparison
        return False, compare_obj.all_mismatch()
    
    # Return True if no mismatches are found, along with an empty DataFrame
    return compare_obj.all_mismatch().count() == 0, spark.createDataFrame([], row_file.schema)


def source_df():
    """
    Retrieves and processes retailer-specific sales data, returning a unified DataFrame.

    Depending on the retailer name provided in the `name_of_retailer` variable,
    this function loads data from different sources, performs necessary transformations,
    and returns a unified DataFrame containing the following columns:
        - Market: The market where the data was collected (always "United States").
        - customer_name: The name of the retailer.
        - rsv: The retail sales value.
        - period: The period for the sales data.
        - year: The year of the sales data.

    The function handles various retailers including "OFFICEDEPOT", "HARRISTEETER", 
    "STAPLES", "SAMSCLUB", "WALMART", "MCDONALDS", "HEB", "KROGER", etc.

    Returns:
        DataFrame: A DataFrame containing the processed sales data for the specified retailer.
    """
    if name_of_retailer == "OFFICEDEPOT":
        result = (
            (spark.table("dcom_gsc_silver.us_officedepot_sales"))
            .withColumn("MarsPeriod", concat(lit("20"), col("Period")))
            .withColumn("year", col("MarsPeriod").substr(1, 4))
            .withColumn("period", col("MarsPeriod").substr(5, 2))
            .withColumnRenamed("CY_GSV", "rsv")
            .withColumnRenamed("Customer", "customer_name")
            .withColumn("Market", lit("United States"))
        ).select("Market", "customer_name", "rsv", "period", "year")

    elif name_of_retailer == "HARRISTEETER":
        harris_sales_df = spark.table("dcom_gsc_silver.us_harristeeter_sales")
        harris_sales_df = deduplicate_dataframe(
            harris_sales_df,
            ["sheetname", "upc"],
            "insert_date",
        )
        delta_df = (
            harris_sales_df.withColumn(
                "DateID1",
                (concat("Year", lit("-"), "Month", lit("-"), lit(1), lit(2))).cast(
                    "date"
                ),
            )
            .withColumn("DateID", last_day(col("DateID1")))
            .drop("DateID1")
        )

        df = delta_df.groupBy("Year", "Month", "DateID").agg(sum("Total_sales"))

        df = df.withColumn("Days", substring(col("DateID"), -2, 2))
        df = df.withColumn("Days", col("Days").cast("integer")).withColumn(
            "Month", col("Month").cast("integer")
        )
        df = df.withColumn("AvgDailyRSV", col("sum(Total_sales)") / col("Days"))

        mars_calendar = spark.table("raw_manual_us.mars_calendar")
        final_df = df.join(
            mars_calendar,
            (df.Year == mars_calendar.YEAR) & (df.Month == mars_calendar.MONTH_NUMBER),
            how="inner",
        ).select(df["*"], mars_calendar.MARSYEAR, mars_calendar.MARSPERIOD)

        final_df = final_df.withColumn("MARSYEAR", col("MARSYEAR").cast("string"))
        filtered_df = final_df.groupBy("MARSPERIOD", "MARSYEAR").agg(
            sum("AvgDailyRSV").alias("sum_rsv")
        )

        result = (
            (filtered_df)
            .withColumnRenamed("MARSPERIOD", "period")
            .withColumnRenamed("MARSYEAR", "year")
            .withColumn("period", lpad(col("period").cast("string"), 2, "0"))
            .withColumn("customer_name", lit("HARRISTEETER"))
            .withColumnRenamed("sum_rsv", "rsv")
            .withColumn("Market", lit("United States"))
        ).select("Market", "customer_name", "rsv", "period", "year")

    elif name_of_retailer == "STAPLES":
        result = (
            spark.table("dcom_gsc_silver.us_staples_sales")
            .withColumn("period_col", concat(lit("20"), col("period")))
            .withColumn("year", col("period_col").substr(1, 4))
            .withColumn("period", col("period_col").substr(5, 2))
            .withColumn("customer_name", upper(col("customer")))
            .withColumnRenamed("CY_GSV", "rsv")
            .withColumn("Market", lit("United States"))
            .select("Market", "customer_name", "rsv", "period", "year")
        )

    elif name_of_retailer == "SAMSCLUB":
        result = (
            (
                spark.table("dcom_gsc_silver.us_samsclub_sales")
                .select("year", "period", "digital_rsv_dollars")
                .withColumnRenamed("digital_rsv_dollars", "rsv")
                .withColumn("Market", lit("United States"))
                .withColumn("customer_name", lit("SAMSCLUB"))
            ).select("Market", "customer_name", "rsv", "period", "year")
        ).withColumn(
            "period", F.lpad(F.regexp_replace(F.col("period"), "P", ""), 2, "0")
        )

    elif name_of_retailer == "WALMART":
        result = (
            (
                (spark.table("dcom_gsc_silver.us_walmart_sales"))
                .select("year", "period", "digital_rsv_dollars")
                .withColumnRenamed("digital_rsv_dollars", "rsv")
                .withColumn("Market", lit("United States"))
                .withColumn("customer_name", lit("WALMART"))
            ).select("Market", "customer_name", "rsv", "period", "year")
        ).withColumn(
            "period", F.lpad(F.regexp_replace(F.col("period"), "P", ""), 2, "0")
        )

    elif name_of_retailer == "MCDONALDS":
        result = (
            spark.table("dcom_gsc_silver.us_mcd_sales_dq_check")
            .withColumn("year", col("Date").substr(1, 4))
            .withColumn("period", col("Date").substr(5, 2))
            .withColumn("customer_name", lit("MCDONALDS"))
            .withColumnRenamed("Sales_Value", "rsv")
            .withColumn("Market", lit("United States"))
            .select("Market", "customer_name", "rsv", "period", "year")
        )

    elif name_of_retailer == "HEB":
        data = (spark.table("dcom_gsc_silver.us_heb_sales"))
        result_df = dim_calendar_date.join(
            data, dim_calendar_date.calendar_date == data.sales_week_beg_date, "inner"
        )
        result_df2 = result_df.join(
            dim_calendar_period,
            result_df.period_id == dim_calendar_period.calendar_period_id,
            "inner",
        )
        result_df4 = (
            (
                result_df2.withColumn(
                    "year", F.split(F.col("period_name"), "P")[0]
                ).withColumn("period", F.split(F.col("period_name"), "P")[1])
            )
            .withColumnRenamed("Total_Retail", "rsv")
            .withColumn("Market", lit("United States"))
            .withColumn("customer_name", lit("HEB"))
        )
        result = result_df4.select(
            "Market", "customer_name", "rsv", "period", "year"
        )

    elif name_of_retailer == "KROGER":
        kroger_sales_ = spark.table("dcom_gsc_silver.us_kroger_sales")
        kroger_sales_df = kroger_sales_.join(
            dim_calendar_date,
            dim_calendar_date["calendar_date"] == kroger_sales_["PERIOD"],
            how="left",
        )

        result = (
            (
                kroger_sales_df.join(
                    dim_calendar_period,
                    dim_calendar_period["calendar_period_id"]
                    == kroger_sales_df["period_id"],
                    how="left",
                )
            )
            .withColumn("year", col("period_name").substr(1, 4))
            .withColumn("Market", lit("United States"))
            .withColumn("period", col("period_name").substr(6, 2))
            .withColumnRenamed("SALES", "rsv")
            .withColumn("customer_name", lit("KROGER"))
        ).select("Market", "customer_name", "rsv", "period", "year")

    elif name_of_retailer in [
        "AHOLD",
        "ALBSCO",
        "GIANTEAGLE",
        "HYVEE",
        "LOWESFOOD",
        "MEIJER",
        "RALEYS",
        "RITEAID",
        "SHOPRITE",
        "SMARTFINAL",
        "TARGET",
    ]:
        df_Customer = spark.createDataFrame(pd.DataFrame(DATA))

        df_Sales = (
            (spark.table("dcom_gsc_silver.us_nielsen_sales"))
            .filter((col("db_code") == "USMWCONFTOTAL") & (col("country") == "US"))
            .filter(col("customer_id").isin(CUSTOMER_ID))
        )
        df_Dim = (spark.table("dcom_gsc_silver.us_nielsen_product")).filter(
            (col("db_code") == "USMWCONFTOTAL") & (col("country") == "US")
        )

        df_joined = df_Sales.join(
            df_Dim, df_Sales.product_id == df_Dim.product_id, "left"
        )

        df_req = (
            df_joined.filter(col("global_manufacturer") == "MARS WRIGLEY")
            .filter((col("units") > 0.0) & (col("value") > 0.0))
            .groupby("customer_id", "nielsen_calendar_id")
            .agg(sum("units"), sum("value"))
        ).withColumn("week_id", split(col("nielsen_calendar_id"), "_").getItem(2))

        Nielsen_df = (
            (
                df_req.join(
                    dim_calendar_date,
                    df_req.week_id == dim_calendar_date.calendar_date2,
                    "left",
                ).join(
                    df_Customer, df_req.customer_id == df_Customer.Customer_ID, "left"
                )
            )
            .withColumnRenamed("sum(value)", "rsv")
            .withColumnRenamed("CustomerName", "customer_name")
        )

        Nielsen_final = (
            (
                Nielsen_df.join(
                    dim_calendar_period,
                    dim_calendar_period["calendar_period_id"]
                    == Nielsen_df["period_id"],
                    how="left",
                )
            )
            .withColumn("year", col("period_name").substr(1, 4))
            .withColumn("Market", lit("United States"))
            .withColumn("period", col("period_name").substr(6, 2))
        ).select("Market", "customer_name", "rsv", "period", "year")
        result = Nielsen_final.filter(F.col("customer_name") == f"{name_of_retailer}")

    elif name_of_retailer in ["MASON", "RICHARDS"]:
        df_US_DDAAS_2 = spark.table("dcom_gsc_silver.us_gti_sales")
        df_req_2 = (
            (
                (
                    df_US_DDAAS_2.filter(col("RetailerID").isin(f"{name_of_retailer}"))
                    .groupBy("RetailerID", "DateID")
                    .agg(sum("SoldProductShippedValue"))
                )
                .withColumnRenamed("DateID", "Date_ID")
                .withColumn("#Days", substring(col("Date_ID"), -2, 2))
                .withColumn("Cal_Month", substring(col("Date_ID"), 1, 6))
                .withColumn("#Days", col("#Days").cast("int"))
                .withColumn(
                    "AvgDailyRSV", col("sum(SoldProductShippedValue)") / col("#Days")
                )
            )
            .withColumnRenamed("AvgDailyRSV", "rsv")
            .withColumnRenamed("RetailerID", "customer_name")
        )

        DDAS_2 = dim_calendar_date.join(
            df_req_2, dim_calendar_date.calendar_date2 == df_req_2.Date_ID, "inner"
        )
        result = (
            (
                dim_calendar_period.join(
                    DDAS_2,
                    dim_calendar_period.calendar_period_id == DDAS_2.period_id,
                    "inner",
                )
            )
            .withColumn("year", col("period_name").substr(1, 4))
            .withColumn("Market", lit("United States"))
            .withColumn("period", col("period_name").substr(6, 2))
        ).select("Market", "customer_name", "rsv", "period", "year")

    elif name_of_retailer in ["AMAZON"]:
        df = spark.table("trusted_raw.avc_sales_manufacturing_retail_day_us")
        result_df = (
            df.groupBy("START_DATE")
            .agg(
                F.sum("SHIPPED_REVENUE").alias("rsv"),
                F.sum("SHIPPED_UNITS").alias("unit"),
                F.year("START_DATE").alias("CalYear"),
                F.month("START_DATE").alias("CalMonth"),
                F.dayofmonth("START_DATE").alias("CalDay"),
            )
            .withColumn(
                "Date_ID",
                F.concat(
                    F.col("CalYear"),
                    F.lpad(F.col("CalMonth"), 2, "0"),
                    F.lpad(F.col("CalDay"), 2, "0"),
                ),
            )
        )
        amazon_df = result_df.join(
            dim_calendar_date,
            result_df.Date_ID == dim_calendar_date.calendar_date2,
            "inner",
        )

        amazon_1 = amazon_df.join(
            dim_calendar_period,
            dim_calendar_period["calendar_period_id"] == amazon_df["period_id"],
            how="left",
        )

        result = (
            amazon_1.withColumn("year", col("period_name").substr(1, 4))
            .withColumn("Market", lit("United States"))
            .withColumn("customer_name", lit("AMAZON-US"))
            .withColumn("period", col("period_name").substr(6, 2))
        ).select("Market", "customer_name", "rsv", "period", "year")

    elif name_of_retailer in ["BJS", "COSTCO", "CVS", "DOLLARGENERAL", "WALGREENS"]:
        data = spark.table("dcom_gsc_silver.us_gti_sales")
        df_US_DDAAS_1 = data.withColumn(
            "SoldProductShippedQty", col("SoldProductShippedQty").cast("double")
        ).withColumn(
            "SoldProductShippedValue", col("SoldProductShippedValue").cast("double")
        )

        df_req_1 = (
            (
                df_US_DDAAS_1.filter(
                    (col("SoldProductShippedValue") > 0.0)
                    & (col("SoldProductShippedQty") > 0.0)
                )
                .groupBy("RetailerID", "DateID")
                .agg(sum("SoldProductShippedValue"))
            )
            .withColumnRenamed("DateID", "Date_ID")
            .withColumnRenamed("sum(SoldProductShippedValue)", "rsv")
            .withColumnRenamed("RetailerID", "customer_name")
        )

        DDAS_ = df_req_1.join(
            dim_calendar_date,
            df_req_1.Date_ID == dim_calendar_date.calendar_date2,
            "inner",
        )

        DDAS_1 = (
            (
                DDAS_.join(
                    dim_calendar_period,
                    dim_calendar_period["calendar_period_id"] == DDAS_["period_id"],
                    how="left",
                )
            )
            .withColumn("year", col("period_name").substr(1, 4))
            .withColumn("Market", lit("United States"))
            .withColumn("period", col("period_name").substr(6, 2))
        ).select("Market", "customer_name", "rsv", "period", "year")
        result = DDAS_1.filter(F.col("customer_name") == f"{name_of_retailer}")

    elif name_of_retailer in ["DOORDASH", "INSTACART", "UBER", "SHIPT"]:
        odd = (
            (spark.table("dcom_gsc_silver.us_odd_sales_refactored"))
            .withColumn("period", lpad(col("period").cast("string"), 2, "0"))
            .withColumn("Market", lit("United States"))
            .select("Market", "customer_name", "rsv", "period", "year")
        )
        result = odd.filter(F.col("customer_name") == f"{name_of_retailer}")

    elif name_of_retailer == "SONIC":
        SONIC = (
            (spark.table("dcom_gsc_bronze.sharepoint_us_sonic_manual"))
            .withColumnRenamed("TOTALDOLLARS", "rsv")
            .withColumnRenamed("INVOICEYEAR", "year")
            .withColumnRenamed("MARSPERIOD", "period")
            .withColumn("customer_name", lit("SONIC"))
            .withColumn("Market", lit("United States"))
            .withColumn("period", lpad(col("period").cast("string"), 2, "0"))
            .select("Market", "customer_name", "rsv", "period", "year")
        )
        result = SONIC.filter(F.col("customer_name") == f"{name_of_retailer}")

    else:
        schema = StructType([StructField("column1", StringType(), True)])
        result = spark.createDataFrame([], schema)
    return result


In [0]:
def rsv_calculate(metrics_features, source_data, retailer, year, period):
    """
    Compares the RSV (Retail Sales Value) metrics between two DataFrames: 
    metrics_features and source_data, for a specific retailer, year, and period.
    
    Args:
        metrics_features (DataFrame): The DataFrame containing the metrics data.
        source_data (DataFrame): The source data to compare against.
        retailer (str): The name of the retailer to filter and compare.
        year (str): The year for which the comparison is being performed.
        period (str): The period (e.g., month) for which the comparison is being performed.
    
    Returns:
        tuple: 
            - success (bool): Whether the comparison was successful.
            - mismatched_data (DataFrame): Data that failed the comparison (if any).
    """
    
    # Define the key columns to be used for comparison
    key_col = ["Market", "customer_name", "year", "period"]
    
    # Define the relative tolerance value for comparison (to allow for small differences)
    rel_tol = RSV_COMPARE_VALUE
    
    # The columns on which to compare the data
    compare_for = ["rsv"]
    
    # Output file path for saving mismatched data (not used here)
    output_file_path = None
    
    # Group by columns (not used here)
    group_by_col = None
    
    # Columns to sum (not used here)
    sum_cols = None
    
    # Call compare_df function to perform the comparison between the source and metrics data
    result_status, mismatched_data = compare_df(
        source_data,
        metrics_features.filter(
            metrics_features["customer_name"].isin(f"{retailer}")
        ),
        key_col,
        output_file_path,
        rel_tol,
        compare_for,
        filter_condition=None,
        group_by_col=None,
        sum_cols=None,
    )
    
    # If comparison was successful, return True, otherwise handle mismatches
    if result_status:
        success = True
    else:
        # If retailer is "AODcomm", consider the comparison successful even if mismatches exist
        if retailer == "AODcomm":
            success = True
        else:
            success = False
    
    return success, mismatched_data


# Fetch the source data from the source_df() function
source_ = source_df()

# If the source data is not empty, perform further processing
if source_.count() != 0:
    
    # Filter the source data by year, period, and market
    source = source_.filter(
        (F.col("year") == f"{year}")
        & (F.col("period") == f"{period}")
        & (F.col("Market") == market)
    )
    
    # Filter the metrics data by year, period, retailer, and market
    data_ = (data).filter(
        (F.col("year") == f"{year}")
        & (F.col("period") == f"{period}")
        & (F.col("customer_name") == f"{name_of_retailer}")
        & (F.col("Market") == market)
    )

    # Call the get_metrics function to extract the required metrics for both the data and source
    metrics_features = get_metrics(
        data_, ["Market", "customer_name", "year", "period"]
    )
    source_data = get_metrics(
        source, ["Market", "customer_name", "year", "period"]
    )

    # Iterate over the list of retailers and perform the RSV calculation and comparison
    for retailer in retailer_list:
        # Call the rsv_calculate function to compare metrics for each retailer
        result_status, mismatched_data = rsv_calculate(
            metrics_features, source_data, retailer, year, period
        )
        
        # Print the comparison result status (True if successful, False if there are mismatches)
        print(result_status)
        
        # Display the mismatched data, if any
        display(mismatched_data)


False


market,customer_name,year,period,rsv_df1,rsv_df2
United States,MEIJER,2025,3,226366.11,227189.65


# DQ TABLE

In [0]:
table = spark.table("dcom_gsc_dq.dq_results")

# change the process_name and process_date based on your needs
filtered = table.filter(table["PROCESS_NAME"] == "pl_features_global")\
    .filter(table["PROCESS_DATE"] == "20250418")
display(table)

DATAFRAME_NAME,TEST_NAME,MESSAGE,APP_ID,STATUS,DATASET,VALUE,TYPE_OF_DATASET,DQ_CHECK_TIMESTAMP,PROJECT_NAME,PROCESS_NAME,PROCESS_DATE
pl_features_global,rsv_var_bw_periods_current_year,"{""YEAR"": ""2025"", ""PERIOD"": ""02"", ""RETAILER"": ""HARRISTEETER"", ""OTHER"": ""Current Period X Previous Period"" }",app-20250417191302-0000,Failure,column,RSV Variance Percentage between the Current Period and the Previous Period Failed.,Column,2025-04-17T19:34:31.018Z,simpel2,GLOBAL_PHOENIX_US_PNL_FEATURE_CLEANED,20250417
pl_features_global,rsv_var_bw_periods_current_year,"{""YEAR"": ""2025"", ""PERIOD"": ""02"", ""RETAILER"": ""AODcomm"", ""OTHER"": ""Current Period X Previous Period"" }",app-20250417120713-0000,Success,column,RSV Variance Percentage between the Current Period and the Previous Period Passed.,Column,2025-04-17T12:34:05.899Z,simpel2,GLOBAL_PHOENIX_US_PNL_FEATURE_CLEANED,20250417
pl_features_global,rsv_var_bw_periods_current_year,"{""YEAR"": ""2025"", ""PERIOD"": ""02"", ""RETAILER"": ""AMAZON"", ""OTHER"": ""Current Period X Previous Period"" }",app-20250417124737-0000,Success,column,RSV Variance Percentage between the Current Period and the Previous Period Passed.,Column,2025-04-17T13:11:42.07Z,simpel2,GLOBAL_PHOENIX_US_PNL_FEATURE_CLEANED,20250417
pl_features_global,rsv_var_bw_periods_current_year,"{""YEAR"": ""2025"", ""PERIOD"": ""02"", ""RETAILER"": ""SONIC"", ""OTHER"": ""Current Period X Previous Period"" }",app-20250417153919-0000,Success,column,RSV Variance Percentage between the Current Period and the Previous Period Passed.,Column,2025-04-17T16:03:52.035Z,simpel2,GLOBAL_PHOENIX_US_PNL_FEATURE_CLEANED,20250417
pl_features_global,rsv_var_bw_periods_last_year,"{""YEAR"": ""2025"", ""PERIOD"": ""02"", ""RETAILER"": ""HARRISTEETER"", ""OTHER"": ""Current Period X Previous Year"" }",app-20250417191302-0000,Failure,column,RSV Variance Percentage between the Current Period and the Previous Year Failed.,Column,2025-04-17T19:34:31.018Z,simpel2,GLOBAL_PHOENIX_US_PNL_FEATURE_CLEANED,20250417
pl_features_global,rsv_var_bw_periods_current_year,"{""YEAR"": ""2025"", ""PERIOD"": ""02"", ""RETAILER"": ""BJS"", ""OTHER"": ""Current Period X Previous Period"" }",app-20250417181959-0000,Failure,column,RSV Variance Percentage between the Current Period and the Previous Period Failed.,Column,2025-04-17T18:42:37.424Z,simpel2,GLOBAL_PHOENIX_US_PNL_FEATURE_CLEANED,20250417
pl_features_global,rsv_var_bw_periods_current_year,"{""YEAR"": ""2025"", ""PERIOD"": ""02"", ""RETAILER"": ""WALMART"", ""OTHER"": ""Current Period X Previous Period"" }",app-20250417163352-0000,Failure,column,RSV Variance Percentage between the Current Period and the Previous Period Failed.,Column,2025-04-17T17:21:59.591Z,simpel2,GLOBAL_PHOENIX_US_PNL_FEATURE_CLEANED,20250417
pl_features_global,rsv_var_bw_periods_last_year,"{""YEAR"": ""2025"", ""PERIOD"": ""02"", ""RETAILER"": ""AODcomm"", ""OTHER"": ""Current Period X Previous Year"" }",app-20250417120713-0000,Success,column,RSV Variance Percentage between the Current Period and the Previous Year Passed.,Column,2025-04-17T12:34:05.899Z,simpel2,GLOBAL_PHOENIX_US_PNL_FEATURE_CLEANED,20250417
pl_features_global,rsv_var_bw_periods_last_year,"{""YEAR"": ""2025"", ""PERIOD"": ""02"", ""RETAILER"": ""AMAZON"", ""OTHER"": ""Current Period X Previous Year"" }",app-20250417124737-0000,Success,column,RSV Variance Percentage between the Current Period and the Previous Year Passed.,Column,2025-04-17T13:11:42.07Z,simpel2,GLOBAL_PHOENIX_US_PNL_FEATURE_CLEANED,20250417
pl_features_global,rsv_var_bw_periods_current_year,"{""YEAR"": ""2025"", ""PERIOD"": ""02"", ""RETAILER"": ""HEB"", ""OTHER"": ""Current Period X Previous Period"" }",app-20250417163352-0000,Failure,column,RSV Variance Percentage between the Current Period and the Previous Period Failed.,Column,2025-04-17T17:51:25.221Z,simpel2,GLOBAL_PHOENIX_US_PNL_FEATURE_CLEANED,20250417
