In [1]:
import polars as pl

# Function to calculate Moving Annual Total (MAT)
def calculate_mat(data: pl.DataFrame, metrics: list[str], date_col: str, group_col: str):
    """
    Calculate Moving Annual Total (MAT) for given metrics using rolling sum.

    Parameters:
    - data: Polars DataFrame containing sales data.
    - metrics: List of metrics for which MAT needs to be calculated.
    - date_col: Column containing dates.
    - group_col: Column to group data by (e.g., product_name).
    """
    # Check if the column's datatype is not Date or Datetime
    column_dtype = data[date_col].dtype
    if column_dtype != pl.Date and column_dtype != pl.Datetime:
        try:
            #convert the date column to the Date type
            data = data.with_columns(pl.col(date_col).str.strptime(pl.Date, "%Y-%m-%d"))
        except Exception as e:
            raise ValueError(f"Failed to convert the {date_col} column to date format. Error: {e}")

    aggregated_data = data.group_by([group_col, date_col]).agg(
        [pl.col(metric).sum().alias(f"{metric}") for metric in metrics]
    )

    # Calculate MAT for each metric
    for metric in metrics:
        aggregated_data = aggregated_data.with_columns(
        pl.col(metric).rolling_sum(window_size=12, min_periods=12).over([group_col]).alias(f"MAT_{metric}"),
        pl.col(metric).rolling_mean(window_size=12, min_periods=12).over([group_col]).alias(f"MAA_{metric}")
    )

    return aggregated_data



def calculate_growth_kpis(data: pl.DataFrame, metrics: list[str], date_col: str, group_col: str, mat_columns: list[str] = None,):

    """
    Calculate growth-related KPIs (YoY, MoM, QoQ) as percentage growth for given metrics.

    Parameters:
    - data: Polars DataFrame containing sales data.
    - metrics: List of metrics for which growth KPIs need to be calculated.
    - date_col: Column containing dates.
    - group_col: Column to group data by (e.g., product_name).
    - mat_columns: Optional list of MAT columns for additional MAT-based growth calculations.
    """
    # Combine metrics and MAT columns (if provided)
    all_metrics = metrics + (mat_columns if mat_columns else [])

    column_dtype = data[date_col].dtype
    if column_dtype != pl.Date and column_dtype != pl.Datetime:
        try:
            #convert the date column to the Date type
            data = data.with_columns(pl.col(date_col).str.strptime(pl.Date, "%Y-%m-%d"))
        except Exception as e:
            raise ValueError(f"Failed to convert the {date_col} column to date format. Error: {e}")

    # Ensure date column is of the correct type (date)
    # data = data.with_columns(pl.col(date_col).cast(pl.Date))

    # Calculate growth KPIs for each metric
    for metric in all_metrics:
        # value of the previous year same month for each metric
        previous_year_value = pl.col(metric).shift(12).alias(f"previous_year_{metric}")
        data = data.with_columns(previous_year_value)

        #value of the previous month for each metric
        previous_month_value = pl.col(metric).shift(1).alias(f"previous_month_{metric}")
        data = data.with_columns(previous_month_value)

        # Year-over-Year (YoY) Growth in Percentage
        data = data.with_columns(
            (
                (pl.col(metric) - pl.col(metric).shift(12))
                / pl.col(metric).shift(12)
                * 100
            )
            .over([group_col])
            .alias(f"YoY_{metric}_growth_percentage")
        )

        # Month-over-Month (MoM) Growth in Percentage
        data = data.with_columns(
            (
                (pl.col(metric) - pl.col(metric).shift(1))
                / pl.col(metric).shift(1)
                * 100
            )
            .over([group_col])
            .alias(f"MoM_{metric}_growth_percentage")
        )

        #Quarter-over-Quarter (QoQ) Growth in Percentage
        current_quarter_sum = pl.col(metric).rolling_sum(window_size=3, min_periods=3).over(group_col).alias(f"{metric}_current_quarter")
        previous_quarter_sum = current_quarter_sum.shift(3).alias(f"{metric}_previous_quarter")
        qoq_growth = (
            (current_quarter_sum - previous_quarter_sum)
            / previous_quarter_sum
            * 100
        ).alias(f"QoQ_{metric}_growth_percentage")
        data = data.with_columns([current_quarter_sum, previous_quarter_sum, qoq_growth])



    return data


def main(
    data: pl.DataFrame,
    metrics: list[str],
    date_col: str ,
    group_col: str,
    include_mat: bool = True,
):
    """
    Main function to perform sales analysis.

    Parameters:
    - data: Polars DataFrame containing sales data.
    - metrics: List of metrics for analysis.
    - date_col: Column containing dates.
    - group_col: Column to group data by (e.g., product_name).
    - include_mat: Boolean flag to indicate whether MAT calculations should be included.
    """

    print("Starting sales analysis...")

    # Optionally calculate MAT values
    # mat_columns = []
    # if include_mat:
    #     print("Calculating MAT values...")
    #     mat_df = calculate_mat(data, metrics, date_col, group_col)
    #     # Extract MAT column names
    #     mat_columns = [f"MAT_{metric}" for metric in metrics]
    #     print(mat_columns)
    # else:
    #     print("Skipping MAT calculations...")
    #     mat_df = data
    # mat_columns = []

    if include_mat:
      print("Calculating MAT values...")
      mat_columns = []

      mat_df = calculate_mat(data, metrics, date_col, group_col)
      print(mat_df)
      # Extract MAT column names
      mat_columns = [f"MAT_{metric}" for metric in metrics]
      if mat_columns:
        print(f"MAT columns: {mat_columns}")
      else:
        print("No MAT columns calculated.")
      # Calculate growth KPIs
      print("Calculating growth KPIs...")
      kpi_df = calculate_growth_kpis(mat_df, metrics, date_col, group_col, mat_columns)
    else:
      print("Skipping MAT calculations and calculating growth KPIs...")
      kpi_df = calculate_growth_kpis(data, metrics, date_col, group_col, mat_columns=None)

    print("Analysis complete.")
    return kpi_df


In [2]:
import polars as pl

# Sample data
data = pl.DataFrame({
    "date": [
        "2023-01-01", "2023-02-01", "2023-03-01", "2023-04-01", "2023-05-01",
        "2023-06-01", "2023-07-01", "2023-08-01", "2023-09-01", "2023-10-01",
        "2023-11-01", "2023-12-01", "2024-01-01", "2024-02-01", "2024-03-01"
    ]*2,
    "product_name": ["Product_A", "Product_B"] * 15,
    "sales_quantity": [100, 120, 130, 140, 150, 160, 170, 180, 190, 200, 210, 220, 230, 240, 250]*2,
    "sales_value": [1000, 1200, 1300, 1400, 1500, 1600, 1700, 1800, 1900, 2000, 2100, 2200, 2300, 2400, 2500]*2,
})
data

date,product_name,sales_quantity,sales_value
str,str,i64,i64
"""2023-01-01""","""Product_A""",100,1000
"""2023-02-01""","""Product_B""",120,1200
"""2023-03-01""","""Product_A""",130,1300
"""2023-04-01""","""Product_B""",140,1400
"""2023-05-01""","""Product_A""",150,1500
…,…,…,…
"""2023-11-01""","""Product_B""",210,2100
"""2023-12-01""","""Product_A""",220,2200
"""2024-01-01""","""Product_B""",230,2300
"""2024-02-01""","""Product_A""",240,2400


In [3]:
metrics = ["sales_quantity", "sales_value"]
date = 'date'
group_col = 'product_name'
result = main(data, metrics, date, group_col, include_mat=True)
result

Starting sales analysis...
Calculating MAT values...
shape: (30, 8)
┌────────────┬────────────┬────────────┬───────────┬───────────┬───────────┬───────────┬───────────┐
│ product_na ┆ date       ┆ sales_quan ┆ sales_val ┆ MAT_sales ┆ MAA_sales ┆ MAT_sales ┆ MAA_sales │
│ me         ┆ ---        ┆ tity       ┆ ue        ┆ _quantity ┆ _quantity ┆ _value    ┆ _value    │
│ ---        ┆ date       ┆ ---        ┆ ---       ┆ ---       ┆ ---       ┆ ---       ┆ ---       │
│ str        ┆            ┆ i64        ┆ i64       ┆ i64       ┆ f64       ┆ i64       ┆ f64       │
╞════════════╪════════════╪════════════╪═══════════╪═══════════╪═══════════╪═══════════╪═══════════╡
│ Product_B  ┆ 2023-09-01 ┆ 190        ┆ 1900      ┆ null      ┆ null      ┆ null      ┆ null      │
│ Product_B  ┆ 2023-03-01 ┆ 130        ┆ 1300      ┆ null      ┆ null      ┆ null      ┆ null      │
│ Product_A  ┆ 2023-04-01 ┆ 140        ┆ 1400      ┆ null      ┆ null      ┆ null      ┆ null      │
│ Product_B  ┆ 2023-02-

  pl.col(metric).rolling_sum(window_size=12, min_periods=12).over([group_col]).alias(f"MAT_{metric}"),
  pl.col(metric).rolling_mean(window_size=12, min_periods=12).over([group_col]).alias(f"MAA_{metric}")
  current_quarter_sum = pl.col(metric).rolling_sum(window_size=3, min_periods=3).over(group_col).alias(f"{metric}_current_quarter")


product_name,date,sales_quantity,sales_value,MAT_sales_quantity,MAA_sales_quantity,MAT_sales_value,MAA_sales_value,previous_year_sales_quantity,previous_month_sales_quantity,YoY_sales_quantity_growth_percentage,MoM_sales_quantity_growth_percentage,sales_quantity_current_quarter,sales_quantity_previous_quarter,QoQ_sales_quantity_growth_percentage,previous_year_sales_value,previous_month_sales_value,YoY_sales_value_growth_percentage,MoM_sales_value_growth_percentage,sales_value_current_quarter,sales_value_previous_quarter,QoQ_sales_value_growth_percentage,previous_year_MAT_sales_quantity,previous_month_MAT_sales_quantity,YoY_MAT_sales_quantity_growth_percentage,MoM_MAT_sales_quantity_growth_percentage,MAT_sales_quantity_current_quarter,MAT_sales_quantity_previous_quarter,QoQ_MAT_sales_quantity_growth_percentage,previous_year_MAT_sales_value,previous_month_MAT_sales_value,YoY_MAT_sales_value_growth_percentage,MoM_MAT_sales_value_growth_percentage,MAT_sales_value_current_quarter,MAT_sales_value_previous_quarter,QoQ_MAT_sales_value_growth_percentage
str,date,i64,i64,i64,f64,i64,f64,i64,i64,f64,f64,i64,i64,f64,i64,i64,f64,f64,i64,i64,f64,i64,i64,f64,f64,i64,i64,f64,i64,i64,f64,f64,i64,i64,f64
"""Product_B""",2023-09-01,190,1900,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
"""Product_B""",2023-03-01,130,1300,,,,,,190,,-31.578947,,,,,1900,,-31.578947,,,,,,,,,,,,,,,,,
"""Product_A""",2023-04-01,140,1400,,,,,,130,,,,,,,1300,,,,,,,,,,,,,,,,,,,
"""Product_B""",2023-02-01,120,1200,,,,,,140,,-7.692308,440,,,,1400,,-7.692308,4400,,,,,,,,,,,,,,,,
"""Product_B""",2023-11-01,210,2100,,,,,,120,,75.0,460,,,,1200,,75.0,4600,,,,,,,,,,,,,,,,
…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…
"""Product_A""",2023-11-01,210,2100,2190,182.5,21900,1825.0,190,100,61.538462,110.0,550,570,-3.508772,1900,1000,61.538462,110.0,5500,5700,-3.508772,,2110,,3.791469,6450,,,,21100,,3.791469,64500,,
"""Product_B""",2023-01-01,100,1000,2120,176.666667,21200,1766.666667,160,210,-47.368421,-41.176471,470,640,-26.5625,1600,2100,-47.368421,-41.176471,4700,6400,-26.5625,,2190,,-4.072398,,,,,21900,,-4.072398,,,
"""Product_B""",2024-02-01,240,2400,2230,185.833333,22300,1858.333333,220,100,84.615385,140.0,510,490,4.081633,2200,1000,84.615385,140.0,5100,4900,4.081633,,2120,,5.188679,6560,,,,21200,,5.188679,65600,,
"""Product_A""",2024-01-01,230,2300,2240,186.666667,22400,1866.666667,170,240,27.777778,9.52381,540,550,-1.818182,1700,2400,27.777778,9.52381,5400,5500,-1.818182,,2230,,2.283105,6540,6450,1.395349,,22300,,2.283105,65400,64500,1.395349
