In [1]:
### Import Libraries
# Azure Data Lake libraries
import common.utils.azure_data_lake_interface as adl

# data cleansing libraries
from common.utils.data_cleansing import clean_illegal_chars_in_column

# config libraries
import common.config
from common.utils.configuration_management import load_config

# Data analysis libraries
from pandas import DataFrame, Timedelta, to_datetime

In [2]:
def find_declining_margins(
        df: DataFrame,
        days: int,
        tolerance: float = 0.0,
        date_col: str = "created_date",
        sku_col: str = "sku",
        gpp_col: str = "gross_profit_percent",
        trans_id_col: str = "tranid",
) -> DataFrame:
    """
    Identifies products with declining gross profit margins within a specified time window.

    This function takes transaction data, computes the decline in gross profit percent for
    each SKU over a specified period, and filters for transactions where this decline
    exceeds a provided tolerance level. The final result is returned as a subset DataFrame
    meeting these criteria.

    Args:
        df (DataFrame): Input DataFrame containing transactional data with gross profit
            percent and dates of transactions.
        days (int): Number of days to consider for the cutoff window. Only transactions
            within this number of days from the latest transaction date are considered.
        tolerance (float, optional): Minimum difference between the previous gross profit
            percentage and the current one to qualify as a "decline". Default is 0.0.
        date_col (str, optional): Name of the column in the DataFrame representing the
            transaction dates. Default is "created_date".
        sku_col (str, optional): Name of the column in the DataFrame identifying the
            stock keeping unit (SKU). Default is "sku".
        gpp_col (str, optional): Name of the column in the DataFrame representing the
            gross profit percentage (GPP). Default is "gross_profit_percent".
        trans_id_col (str, optional): Name of the column in the DataFrame representing the
            transaction identifier. Default is "tranid".

    Returns:
        DataFrame: A subset of the input DataFrame containing rows where the gross profit
        percentage for a product has declined by more than the specified tolerance within
        the defined time window. The returned DataFrame includes columns for the current
        and previous gross profit percentages, transaction dates, and transaction IDs.
    """

    df = df.copy()

    # 1. Parse and sort
    df[date_col] = to_datetime(df[date_col], errors='raise')
    df = df.sort_values([sku_col, date_col])

    # 2. Compute the prior-value column
    df['prev_gpp'] = df.groupby(sku_col)[gpp_col].shift(1)
    df['prev_trans_date'] = df.groupby(sku_col)[date_col].shift(1)
    if df[trans_id_col].dtype == 'string':
        df['prev_trans_id'] = df.groupby(sku_col)[trans_id_col].shift(1).fillna('Not Specified').astype('string')
    else:
        df['prev_trans_id'] = df.groupby(sku_col)[trans_id_col].shift(1).fillna(0).astype(int)

    # 3. Compute cutoff
    cutoff = df['created_date'].max() - Timedelta(days=days)

    # 4. Filter: within window AND declined
    mask_window = df[date_col] >= cutoff
    mask_declined = (df["prev_gpp"] - df[gpp_col]) > tolerance
    result = df[mask_window & mask_declined].copy()

    # Calculate gpp_difference immediately after prev_gpp
    result["gpp_difference"] = round(result["gross_profit_percent"] - result["prev_gpp"], 2)

    # Reorder columns to put gpp_difference after prev_gpp
    cols = list(result.columns)
    gpp_diff_idx = cols.index("gpp_difference")
    prev_gpp_idx = cols.index("prev_gpp")
    cols.insert(prev_gpp_idx + 1, cols.pop(gpp_diff_idx))
    result = result[cols]

    return result


In [12]:
from openpyxl import load_workbook
from openpyxl.worksheet.worksheet import Worksheet
from openpyxl.styles import Font
from openpyxl.utils import get_column_letter
from typing import Optional, List


class ExcelFormatterError(Exception):
    """Raised when formatting cannot be applied to the Excel file."""
    pass


def format_worksheet(
    file_path: str,
    sheet_name: Optional[str] = None,
    font_size: int = 14,
    freeze_top_row: bool = True,
    autofit_columns: bool = True,
    max_col_width: int = 50,
    padding: int = 2,
    date_columns: Optional[List[str]] = None,
    date_number_format: str = "yyyy-mm-dd",
    number_columns: Optional[List[str]] = None,
    number_format: str = "0.00",
    currency_columns: Optional[List[str]] = None,
    currency_number_format: str = "$#,##0.00",
    enable_filter: bool = True,
    verbose: bool = False
) -> None:
    """
    Apply formatting to an Excel worksheet, including:
      1. Font size
      2. Freeze top row
      3. Auto-fit columns
      4. Date, number, and currency formatting
      5. Auto-filter
    """
    wb = load_workbook(file_path)
    ws: Worksheet = wb[sheet_name] if sheet_name else wb.active

    max_col = ws.max_column
    max_row = ws.max_row

    # 1. Font sizing
    for row in ws.iter_rows(min_row=1, max_row=max_row, min_col=1, max_col=max_col):
        for cell in row:
            if cell.value is not None:
                cell.font = Font(size=font_size)

    # 2. Freeze top row
    if freeze_top_row:
        ws.freeze_panes = "A2"

    # 3. Auto-fit columns
    if autofit_columns:
        for col_idx in range(1, max_col + 1):
            col_letter = get_column_letter(col_idx)
            max_length = 0
            for cell in ws[col_letter]:
                if cell.value is not None:
                    length = len(str(cell.value))
                    if length > max_length:
                        max_length = length
            ws.column_dimensions[col_letter].width = min(max_length + padding, max_col_width)

    # Map headers to columns (case-insensitive)
    header_row = next(ws.iter_rows(min_row=1, max_row=1, values_only=False))
    header_to_col = {
        str(cell.value).strip().lower(): idx + 1
        for idx, cell in enumerate(header_row)
        if cell.value is not None
    }
    if verbose:
        print("Header mapping:", header_to_col)

    def apply_number_format(headers: List[str], fmt: str, label: str):
        for header in headers or []:
            key = header.strip().lower()
            col_idx = header_to_col.get(key)
            if not col_idx:
                if verbose:
                    print(f"Warning: {label} header '{header}' not found.")
                continue
            for row_idx in range(2, max_row + 1):
                cell = ws.cell(row=row_idx, column=col_idx)
                if cell.value is not None:
                    cell.number_format = fmt

    apply_number_format(date_columns, date_number_format, "Date")
    apply_number_format(number_columns, number_format, "Number")
    apply_number_format(currency_columns, currency_number_format, "Currency")

    # 7. Enable filtering
    if enable_filter:
        last_col_letter = get_column_letter(max_col)
        ws.auto_filter.ref = f"A1:{last_col_letter}{max_row}"

    wb.save(file_path)

In [4]:
# attach to the data lake
config = load_config(common.config, "datalake_config.json")
service_client = adl.get_azure_service_client(config["blob_url"])
file_system_client = adl.get_azure_file_system_client(service_client, "consolidated")

# get data
data_state = "curated"
trans_type = "Estimate"
filename = f"transaction/{trans_type}ItemLineItems_{data_state}.parquet"
df = adl.get_parquet_file_from_data_lake(file_system_client, f"{data_state}/netsuite", filename)

In [13]:
lookback = 5
margin_declines = find_declining_margins(df, days=lookback)
print(f"{margin_declines.shape[0]} declining margins found.")
df = clean_illegal_chars_in_column(df, "description")
margin_declines.to_excel(f'../excel_outputs/{trans_type}ItemLineItems_margin_declines_in_past_{lookback}_days.xlsx', index=False)

319 declining margins found.


In [14]:
format_worksheet(
    f'../excel_outputs/{trans_type}ItemLineItems_margin_declines_in_past_{lookback}_days.xlsx',
    sheet_name="Sheet1",
    date_columns=["created_date", "prev_trans_date"],
    number_columns=["quantity", "labor_hours", "gross_profit_percent", "prev_gpp", "gpp_difference"],
    currency_columns=[
        "unit_cost", "highest_quoted_cost", "highest_recent_cost", "highest_cost",
        "handling_cost", "unit_price", "total_cost", "total_amount", "gross_profit"
    ],
)