###### G10 Q1 Customer Booking ETL Process
### Created by: Ruhan Ahmed
### Created on: 01/07/2025
### Last updated by: Ruhan Ahmed
### Last updated on: 06/07/2025

# Extracting: 
### The data has been exported using Adobe Acrobat Pro text form features into multiple sheets

In [1]:
import pandas as pd

def load_and_combine_excel(file_path: str) -> pd.DataFrame:
    """
    Reads all sheets from an Excel file and concatenates them into a single DataFrame.

    :param file_path: Path to the Excel file.
    :return: Combined DataFrame from all sheets.
    """

    xls = pd.ExcelFile(file_path)
    return pd.concat([xls.parse(sheet) for sheet in xls.sheet_names], ignore_index=True)

# Base Lakehouse directory
LAKEHOUSE_BASE = (
    "abfss://fcadf7d8-0448-4cbc-a324-f9d13c00817d@onelake.dfs.fabric.microsoft.com/"
    "472b431a-5c41-4e10-994e-6ba64c998caa/Files/"
)

# Month names to construct file paths
months = ["January", "February", "March"]

# Load and combine all Q1 Excel files
q1_orders_df = pd.concat(
    [
        load_and_combine_excel(f"{LAKEHOUSE_BASE}{i:02d} - {month} pdf orders.xlsx")
        for i, month in enumerate(months, start=1)
    ],
    ignore_index=True
)

# Display combined DataFrame
display(q1_orders_df)


StatementMeta(, 5383f715-e176-41e4-80d2-aff1cef3a52a, 3, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, cc1926c7-0340-4117-a9da-6105ad546e14)

# Transform:
### Renaming & Dropping columns

In [5]:
def clean_orders(df: pd.DataFrame) -> pd.DataFrame:
    """
    Cleans the raw orders DataFrame by:
    - Renaming columns
    - Converting data types
    - Handling missing values

    :param df: The dataframe to be cleaned
    :return: The cleaned dataframe
    """

    # Rename relevant columns
    rename_map: dict[str, str] = {
        "Unnamed: 0": "source_file",
        "Text Field 63": "order_date",
        "Text Field 65": "date_required",
        "Text Field 64": "booking_number",
        "Text Field 66": "invoice_number",
        "Text Field 60": "production_order_number",
        "Text Field 61": "despatch_date",
        "Text Field 62": "despatch_number",
        "Text Field 118": "company_name",
        "Text Field 130": "ordered_by",
        "Text Field 180": "contact_number",
        "Text Field 178": "customer_order_number",

        # Booking data columns
        "Text Field 71": "quantity_field_1",
        "Text Field 135": "size_field_1",
        "Text Field 141": "paper_field_1",
        "Text Field 90": "description_field_1",
        "Text Field 102": "zero_rated_field_1",
        "Text Field 1010": "taxable_field_1",

        "Text Field 78": "quantity_field_2",
        "Text Field 136": "size_field_2",
        "Text Field 142": "paper_field_2",
        "Text Field 91": "description_field_2",
        "Text Field 103": "zero_rated_field_2",
        "Text Field 1011": "taxable_field_2",

        "Text Field 79": "quantity_field_3",
        "Text Field 137": "size_field_3",
        "Text Field 143": "paper_field_3",
        "Text Field 92": "description_field_3",
        "Text Field 104": "zero_rated_field_3",
        "Text Field 1012": "taxable_field_3",

        "Text Field 80": "quantity_field_4",
        "Text Field 138": "size_field_4",
        "Text Field 144": "paper_field_4",
        "Text Field 93": "description_field_4",
        "Text Field 105": "zero_rated_field_4",
        "Text Field 1013": "taxable_field_4",

        "Text Field 134": "special_info_field_1",
        "Text Field 179": "special_info_field_2",

        # Delivery data columns
        "Text Field 37": "delivery_contact",
        "Text Field 131": "delivery_mobile_number",
        "Text Field 132": "delivery_email",
        "Text Field 38": "delivery_address",
        "Text Field 133": "delivery_instructions"
    }
    
    df = df.rename(columns=rename_map)

    # Drop unused columns because it's empty boxes to be handwritten for printers downstairs
    # Also cost columns because I have the sales dataset which is more accurate
    df.drop(columns=[
        "production_order_number",
        "despatch_date",
        "despatch_number",
        "customer_order_number",
        "invoice_number",
        "zero_rated_field_1", "zero_rated_field_2", "zero_rated_field_3", "zero_rated_field_4",
        "taxable_field_1", "taxable_field_2", "taxable_field_3", "taxable_field_4",
        "special_info_field_1", "special_info_field_2"
    ], inplace=True, errors="ignore")

    # Drop completely empty rows
    df.dropna(how="all", inplace=True)

    # Reset index
    df = df.reset_index(drop=True)

    return df

df = clean_orders(q1_orders_df)

display(df)

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 2a865dc6-d5b0-4820-ba85-bcfce34a2b94)

In [6]:
def parse_quantity(value: str | float) -> float | None:
    """
    Parse a quantity value into a float.

    Interprets strings like '2.5K' or '3 thousand' and converts them into float values.
    Cleans out irrelevant characters and handles invalid or missing data.

    :param value: A quantity value as a string or float. Can include suffixes like 'K' or 'thousand'.
    :return: Parsed numeric value as float, or None if parsing fails or value is null.
    """

    if pd.isnull(value):
        return None
    
    try:
        # Convert to string and clean
        value_str = str(value).lower()
        
        # Remove common words/symbols
        for remove_str in ["copies", "'", ",", " ", "units", "quantity", "qty", ":", "-"]:
            value_str = value_str.replace(remove_str, "")
            
        value_str = value_str.strip()
        
        # Handle thousand indicators (k, thousand)
        if value_str.endswith("k"):
            return float(value_str[:-1]) * 1000
        elif value_str.endswith("thousand"):
            return float(value_str[:-8]) * 1000
    
        # Try direct conversion if no special suffixes
        return float(value_str)
        
    except (ValueError, AttributeError):
        return None

def convert_data_types(df):
    """
    Converts and cleans data types for relevant fields in a DataFrame.

    - Parses quantity-related fields using :func:`parse_quantity`.
    - Converts booking numbers to nullable integers.
    - Cleans string fields by filling missing values and stripping whitespace.

    :param df: DataFrame containing raw data.
    :return: DataFrame with cleaned and converted data types.
    """

    for quantity_field in ["quantity_field_1", "quantity_field_2", "quantity_field_3", "quantity_field_4"]:
        df[quantity_field] = df.get(quantity_field).apply(parse_quantity)

    # type: int
    for number in ["booking_number"]:
        df[number] = pd.to_numeric(df[number], errors='coerce').astype('Int64')

    # Clean up client name and product type
    # type: str
    for name in ["ordered_by", "delivery_contact"]:
        df[name] = df.get(name, "").fillna("Unknown").astype(str).str.strip()

    return df

df = convert_data_types(df)

display(df)
print(df.dtypes)

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 8, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 6304cabc-7597-4dce-b96d-2ede11d62586)

source_file                object
delivery_contact           object
delivery_address           object
order_date                 object
booking_number              Int64
date_required              object
quantity_field_1          float64
quantity_field_2          float64
quantity_field_3          float64
quantity_field_4          float64
description_field_1        object
description_field_2        object
description_field_3        object
description_field_4        object
company_name               object
ordered_by                 object
delivery_mobile_number     object
delivery_email             object
delivery_instructions      object
size_field_1               object
size_field_2               object
size_field_3               object
size_field_4               object
paper_field_1              object
paper_field_2              object
paper_field_3              object
paper_field_4              object
contact_number             object
dtype: object


In [7]:
def extract_from_source_file(df: pd.DataFrame) -> pd.DataFrame:
    """
    Splits 'source_file' into booking_number, company_name, and job_name.
    Replaces 'company_name' with the extracted version and validates consistency.
    
    :param df: Cleaned DataFrame with a 'source_file' column.
    :return: Updated DataFrame with separated and validated columns.
    """

    # Remove .pdf suffix and split by " - "
    split_cols = df["source_file"].str.replace(".pdf", "", regex=False).str.split(" - ", n=2, expand=True)
    df["booking_number_extracted"] = pd.to_numeric(split_cols[0], errors="coerce").astype('Int64')
    df["company_name"] = split_cols[1].str.strip().str.title()
    df["job_name"] = split_cols[2].str.strip().str.title()

    # Optional: Check for mismatch
    mismatches = df[df["booking_number_extracted"] != df["booking_number"]]
    if not mismatches.empty:
        print("⚠️ Warning: Booking number mismatch found in some rows.")
        display(mismatches[["source_file", "booking_number", "booking_number_extracted"]])

    # Drop original column
    df.drop(columns=["source_file"], inplace=True)

    return df

df = extract_from_source_file(df)
display(df)

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 9, Finished, Available, Finished)



SynapseWidget(Synapse.DataFrame, 62b1ebc2-c431-43c9-a6c5-24b2ef9ef6ec)

SynapseWidget(Synapse.DataFrame, f28790ca-ead8-45ab-84c8-525b231decf0)

In [8]:
# Replace incorrect booking numbers
df["booking_number"] = df["booking_number_extracted"]

# Drop the extracted column
df.drop(columns=["booking_number_extracted"], inplace=True)

display(df)

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 10, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, b7aee50c-55cd-4cee-906c-71d1c29ccfdd)

In [10]:
def reorder_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Reorders the DataFrame columns in a specific layout:
    - booking_number, company_name, job_name, ordered_by, contact_number
    - Followed by booking fields in order (quantity, size, paper, description for each set)
    - Followed by any remaining columns

    :param df: The input DataFrame with cleaned and enriched data
    :return: A DataFrame with columns reordered according to the specified logic
    """

    # Desired initial columns
    main_columns = [
        "booking_number",
        "company_name",     # from extracted data
        "job_name",         # from extracted data
        "order_date",
        "date_required",
        "ordered_by",
        "contact_number"
    ]

    # Define the desired column order
    booking_fields = []
    for i in range(1, 5):
        booking_fields.extend([
            f"quantity_field_{i}",
            f"size_field_{i}",
            f"paper_field_{i}",
            f"description_field_{i}"
        ])

    # Combine all columns in order
    final_column_order = main_columns + booking_fields

    # Add any remaining columns not listed above
    remaining_columns = [col for col in df.columns if col not in final_column_order]
    final_column_order += remaining_columns

    # Reorder the DataFrame
    df = df[final_column_order]

    return df

df = reorder_columns(df)
display(df)

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 12, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 7b8ded9c-6f65-44d8-ace6-2221417285cb)

# Transforming:
# booking_number

In [11]:
null_booking_num_check1 = df[df["booking_number"].isna()]
display(null_booking_num_check1)

duplicates_check1 = df[df["booking_number"].duplicated(keep=False)]
display(duplicates_check1)

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 13, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 335a0771-7d00-4fcc-8856-61bed470d01d)

SynapseWidget(Synapse.DataFrame, 837fc6d8-2579-47c6-b788-36b5448c694e)

In [12]:
def booking_number_clean(df: pd.DataFrame) -> pd.DataFrame:
    """
    Cleans the DataFrame by removing rows with missing `booking_number`
    and deduplicating on that column.

    - Drops rows where `booking_number` is null.
    - Removes duplicates based on `booking_number`, keeping only the last occurrence.

    :param df: The input DataFrame containing a `booking_number` column.
    :return: Cleaned DataFrame with non-null, deduplicated `booking_number` entries.
    """

    df = df[df["booking_number"].notna()].copy()
    df = df[~df.duplicated(subset="booking_number", keep="last")]


    return df

df = booking_number_clean(df)

display(df)

null_booking_check2 = df[df["booking_number"].isna()]
display(null_booking_check2)

duplicates_check2 = df[df["booking_number"].duplicated(keep=False)]
display(duplicates_check2)

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 14, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 7a1d55b4-119c-488a-a9c3-b1779439639a)

SynapseWidget(Synapse.DataFrame, 125c6b20-c609-4540-b9bb-b9ba4e7edf95)

SynapseWidget(Synapse.DataFrame, 12db7e9a-b82d-4c38-ac86-653caa2bd536)

# Transforming:
# company_name, job_name

In [13]:
null_job_name_check1 = df[df["job_name"].isna()]
display(null_job_name_check1)

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 15, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 90e87bb5-0e64-4a73-a507-0250bd9933a1)

In [14]:
def company_job_clean(df: pd.DataFrame) -> pd.DataFrame:
    """
    Manually updates `company_name` and `job_name` fields based on specific `booking_number` values.

    Applies hardcoded corrections to ensure the `company_name` and `job_name` columns have
    accurate information for known booking numbers.

    :param df: The input DataFrame with columns `booking_number`, `company_name`, and `job_name`.
    :return: The updated DataFrame with corrected values for specific bookings.
    """

    updates: dict[int, tuple[str, str]] = {
        2501006: ("Ahead Design", "Spicy King"),
        2501010: ("One Step", "Golden Dragon"),
        2502416: ("We Design", "Lotus House"),
        2502424: ("We Design", "Top Chef"),
        2502583: ("We Design", "China King"),
        2502584: ("We Design", "China Rose"),
        2502588: ("We Design", "Happy Gourmet"),
        2502589: ("We Design", "Lams Kitchen"),
        2503415: ("We Design", "China Hall"),
        2503502: ("We Design", "The Bear Inn"),
    }

    for booking, (company, job) in updates.items():
        df.loc[df["booking_number"] == booking, "company_name"] = company
        df.loc[df["booking_number"] == booking, "job_name"] = job

    return df

df = company_job_clean(df)

null_job_name_check2 = df[df["job_name"].isna()]
display(null_job_name_check2)

display(df)

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 16, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 59e43362-a99a-44e0-8345-43d49ff93373)

SynapseWidget(Synapse.DataFrame, 7f70fb12-b13f-41b0-939b-175631ffab8e)

In [15]:
unique = df["company_name"].unique()

display(pd.DataFrame(sorted(unique), columns=["Unique Company Names"]))

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 17, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e91d9485-5ba2-4a44-833b-f39812ff6158)

In [16]:
# Mapping dictionary keys are now Title Case
company_relabel_map: dict[str, str] = {
    "16 Print": "16 Printing",
    "A&D Print": "A&D Printing",
    "Amaze Media": "Amaze Media Pro",
    "Aston Design": "Aston Design & Print",
    "Aston Print": "Aston Design & Print",
    "B1 Group": "B1 Media",
    "Bdsigns": "Bd Signs",
    "D&A Print": "D&A Printing",
    "Da Print": "Da Print Ltd",
    "Ema": "Ema Embroidery",
    "Impress": "Impress Media Uk",
    "Impress Media": "Impress Media Uk",
    "Inspire": "Inspire Print Ltd",
    "Inspire Print": "Inspire Print Ltd",
    "Jnb": "Jnb Prints",
    "Kabir Design": "Kabir Design & Print",
    "Menu Lane": "Menulane",
    "Metro Printers": "Metro Print",
    "Pixels Design": "Pixels Design Studio",
    "Premier Links": "Premier Linkz",
    "Print Today": "Print Today Uk",
    "Sign Genious": "Sign Genius",
    "Simply Menus": "Simply Menus Ltd",
    "Ultimate": "Ultimate Print & Design",
    "Ultimate Print": "Ultimate Print & Design",
    "Wang": "Wangland",
    "Wedesign": "We Design"
}

def relabel_company(name: str) -> str:
    """
    Standardizes company names using a predefined mapping dictionary.

    Converts the input string to title case, trims whitespace, and maps it to a
    standardized name if a match is found in the mapping dictionary. If no match is found,
    returns the title-cased name unchanged.

    :param name: Raw company name string.
    :return: Standardized company name.
    """

    if not isinstance(name, str):
        return name
    name_title = name.title().strip()

    # Lookup in title case keys
    new_name = company_relabel_map.get(name_title, name_title)
    return new_name

df["company_name"] = df["company_name"].apply(relabel_company)

unique = df["company_name"].unique()
display(pd.DataFrame(sorted(unique), columns=["Unique Company Names"]))


StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 18, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 44889a92-32f1-456d-a7c6-bca9c1bf2a02)

# Transforming:
# order_date, date_required

In [17]:
import pandas as pd
import numpy as np
import re
from datetime import timedelta

def clean_dates_with_ranges(df: pd.DataFrame) -> pd.DataFrame:
    """
    Cleans the 'date_required' column in a DataFrame by parsing various formats.

    This function handles:
    - Standard date formats
    - Unix timestamps (milliseconds)
    - String ranges like '2-5 days', adjusting from 'order_date'

    :param df: A DataFrame containing 'date_required' and 'order_date' columns.
    :return: The DataFrame with cleaned 'order_date' and 'date_required' columns.
    """
    
    def parse_date_required(val, order_date: pd.Timestamp) -> pd.Timestamp | float:
        """
        Parses a single value from 'date_required' using multiple strategies.

        :param val: Raw value from 'date_required' column.
        :param order_date: Corresponding 'order_date' for fallback computation.
        :return: Parsed timestamp or NaN if parsing fails.
        """
        
        if pd.isnull(val):
            return np.nan

        # Case 1: Already a valid date
        try:
            parsed = pd.to_datetime(val, errors='coerce', dayfirst=True)
            if pd.notnull(parsed):
                return parsed
        except Exception:
            pass

        # Case 2: Unix timestamp
        try:
            val_float = float(val)
            if val_float > 1e12:
                return pd.to_datetime(val_float, unit='ms')
        except Exception:
            pass

        # Case 3: String like '1-3 days', '4-7 days urgent'
        if isinstance(val, str) and isinstance(order_date, pd.Timestamp):
            match = re.findall(r'\d+', val)
            if match:
                max_days = max(map(int, match))
                return order_date + timedelta(days=max_days)

        return np.nan

    # Ensure order_date is parsed correctly first
    df["order_date"] = pd.to_datetime(df["order_date"], errors="coerce", dayfirst=True)

    # Now clean date_required based on order_date
    df["date_required"] = df.apply(
        lambda row: parse_date_required(row["date_required"], row["order_date"]),
        axis=1
    )

    return df


def fix_date(date: pd.Timestamp) -> pd.Timestamp:
    """
    Adjusts dates from 2024 or 2025 based on certain conditions.

    - If date is in 2024 and not in December, it swaps the day and month, and sets year to 2025.
    - If date is in 2025 but outside Jan–Mar, it also swaps day/month to attempt a Jan–Mar correction.
    - Swapping is only accepted if it results in a valid date within Jan–Mar 2025.

    :param date: A datetime object to potentially correct.
    :return: Corrected or original timestamp.
    """

    year = date.year
    month = date.month
    day = date.day

    if year == 2024:
        if month != 12:
            # Swap day and month and set year to 2025
            try:
                swapped = pd.Timestamp(year=2025, month=day, day=month)
                if swapped.month in [1, 2, 3]:
                    return swapped
            except ValueError:
                # Invalid swapped date, keep original
                return date
    elif year == 2025:
        if month not in [1, 2, 3]:
            # Swap day and month keeping year 2025
            try:
                swapped = pd.Timestamp(year=2025, month=day, day=month)
                if swapped.month in [1, 2, 3]:
                    return swapped
            except ValueError:
                return date
    return date

df = clean_dates_with_ranges(df)

df['order_date'] = df['order_date'].apply(fix_date)
display(df)

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 19, Finished, Available, Finished)

  parsed = pd.to_datetime(val, errors='coerce', dayfirst=True)


SynapseWidget(Synapse.DataFrame, e837f190-ca40-49e2-9b65-de90092e6c98)

In [18]:
# Filter rows where order_date year is 2024
orders_2024 = df[df['order_date'].dt.year == 2024]

display(orders_2024)

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 20, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 7b9cf3fa-cfa0-4e65-b773-ef5cf105d1a0)

In [19]:
invalid_2024 = df[(df['order_date'].dt.year == 2024) & (df["order_date"].dt.month != 12)]

display(invalid_2024)

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 21, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, bf7dbef7-66e1-4fdc-a709-0d8062147804)

In [20]:
# For 2024 dates that are not in December, change year to 2025
mask = (df['order_date'].dt.year == 2024) & (df['order_date'].dt.month != 12)

# Use apply with a function to update the year
df.loc[mask, 'order_date'] = df.loc[mask, 'order_date'].apply(
    lambda d: d.replace(year=2025)
)

# Convert order_date to datetime if it's not already
df['order_date'] = pd.to_datetime(df['order_date'], errors='coerce')

# Replace the specific date for 2502571
# Having checked the invoice data and using context
df.loc[df['order_date'] == pd.Timestamp('2025-04-26'), 'order_date'] = pd.Timestamp('2025-02-26')

invalid_2024_check = df[(df['order_date'].dt.year == 2024) & (df["order_date"].dt.month != 12)]

display(invalid_2024_check)

# Checking row to see if the date has been correctly applied
display(df[df['booking_number'] == 2502571])

# Set date_required to NaT if it's not after order_date
df.loc[df['date_required'] <= df['order_date'], 'date_required'] = pd.NaT


StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 22, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 5505bf77-ad65-4a8c-93d2-390ef756d695)

SynapseWidget(Synapse.DataFrame, eeedc786-9eeb-4689-b388-84757af5f331)

# Transformation:
# ordered_by, delivery_contact

In [22]:
columns_to_title = ['ordered_by', 'delivery_contact']

for col in columns_to_title:
    df[col] = df[col].apply(lambda x: x.title() if isinstance(x, str) else x)
    
display(df)

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 24, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 785cdd8b-7113-4857-9ef5-e553f64d0bd2)

# Transformation:
# size_fields

In [23]:
size_fields = ['size_field_1', 'size_field_2', 'size_field_3', 'size_field_4']

# Check if any of these columns contain 'A4' anywhere in the string
mask = df[size_fields].apply(lambda col: col.str.contains('A4', na=False))

# Keep rows where any column contains 'A4'
filtered_df = df[mask.any(axis=1)]

display(filtered_df)
print(filtered_df.dtypes)

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 25, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e53cccd2-f14d-4e4f-a465-2663ac59ec6a)

booking_number                     Int64
company_name                      object
job_name                          object
order_date                datetime64[ns]
date_required             datetime64[ns]
ordered_by                        object
contact_number                    object
quantity_field_1                 float64
size_field_1                      object
paper_field_1                     object
description_field_1               object
quantity_field_2                 float64
size_field_2                      object
paper_field_2                     object
description_field_2               object
quantity_field_3                 float64
size_field_3                      object
paper_field_3                     object
description_field_3               object
quantity_field_4                 float64
size_field_4                      object
paper_field_4                     object
description_field_4               object
delivery_contact                  object
delivery_address

In [24]:
import re

def relabel_sizes(value: str | float | None) -> str | float | None:
    """
    Normalize and relabel paper size strings into standardized labels.

    This function processes an input value representing paper size descriptions,
    normalizes whitespace and case, and then applies pattern matching 
    to classify and relabel sizes consistently.

    Relabeling rules:

    - 'A4+', 'A4 plus' → 'A4+'
    - 'A4' → 'A4'
    - 'A3L', 'A3 long', 'long A3' → 'SRA3'
    - 'A3' → 'A3'
    - 'A2' → 'A2'
    - 'A1' → 'A1'
    - 'A0' → 'A0'
    - 'A5' with '6pp' → 'A5 6pp'
    - 'A5' without '6pp' → 'A5'
    - 'A6' → 'A6'
    - 'A7' → 'A7'
    - 'B4' → 'B4'
    - Contains 'bus' → 'Business Cards'
    - Contains 'ncr' → 'NCR Pads'
    - Contains 'dl' → 'DL'
    - Anything else → 'custom'

    :param value: Input value (string or NaN).
    :return: Standardized label or original value if NaN.
    """

    if value is None or (isinstance(value, float) and pd.isna(value)):
        return value

    val_norm = re.sub(r'\s+', ' ', str(value).lower()).strip()

    if re.search(r'a4\+|a4 plus', val_norm):
        return 'A4+'
    if re.search(r'\ba4\b', val_norm):
        return 'A4'
    if re.search(r'\b(a3l|a3 long|long a3)\b', val_norm):
        return 'SRA3'
    if re.search(r'\ba3\b', val_norm):
        return 'A3'
    if re.search(r'\ba2\b', val_norm):
        return 'A2'
    if re.search(r'\ba1\b', val_norm):
        return 'A1'
    if re.search(r'\ba0\b', val_norm):
        return 'A0'
    if re.search(r'\ba5\b', val_norm):
        if '6pp' in val_norm:
            return 'A5 6pp'
        else:
            return 'A5'
    if re.search(r'\ba6\b', val_norm):
        return 'A6'
    if re.search(r'\ba7\b', val_norm):
        return 'A7'
    if re.search(r'\bb4\b', val_norm):
        return 'B4'
    if 'bus' in val_norm:
        return 'Business Cards'
    if 'dl' or 'ncr' in val_norm:
        return 'DL'

    # Anything else
    return 'custom'

for column in size_fields:
    df[column] = df[column].apply(relabel_sizes)

display(df)


StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 26, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a01b4962-e26d-4ec1-b6d0-18c22b5ccef0)

# Transformation:
# paper_field_1

In [25]:
# Count of nulls in paper_field_1
null_count = df['paper_field_1'].isnull().sum()
print(f"Number of null values in paper_field_1: {null_count}")

# Display rows where paper_field_1 is null
null_paper_check1 = df[df['paper_field_1'].isnull()]
display(null_paper_check1)


StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 27, Finished, Available, Finished)

Number of null values in paper_field_1: 10


SynapseWidget(Synapse.DataFrame, 9feeb804-5cff-4312-b712-301aadd9be9d)

In [26]:
df['paper_field_1'] = df['paper_field_1'].fillna('banner/poster')

null_paper_check2 = df[df['paper_field_1'].isnull()]
display(null_paper_check2)

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 28, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, aa4c57b3-1abf-431f-907f-550c7804877d)

In [27]:
def normalize_gsm(value: str | float | None) -> str | float | None:
    """
    Normalize paper weight strings by extracting the first number and appending 'gsm'.

    If the input is a string containing a number (e.g., '130gsm', '150gloss', '170gsm Silk'),
    this function extracts the first number and returns it followed by 'gsm' (e.g., '130gsm').

    If the input is not a string or does not contain any numbers, the original value is returned unchanged.

    :param value: The input value to normalize, typically a string or possibly NaN/None.
    :return: The normalized gsm string or the original value if no number is found or input is not string.
    """

    if not isinstance(value, str):
        return value
    
    # Search for the first number in the string
    match = re.search(r'(\d+)', value)
    if match:
        return f"{match.group(1)}gsm"
    else:
        return value

paper_fields = ['paper_field_1', 'paper_field_2', 'paper_field_3', 'paper_field_4']

for col in paper_fields:
    df[col] = df[col].apply(normalize_gsm)

display(df)


StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 29, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 00b4120b-132c-40ed-ba86-fac3b621a0d3)

# Transformation:
# delivery_address

In [28]:
df.loc[df['delivery_address'].str.lower().str.contains('pick up|collection', na=False), 'delivery_address'] = 'Collection'

display(df)

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 30, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 8db8b955-fb1c-45d5-87d1-6b7aa7e12cca)

In [29]:
import re

postcode_col = 'delivery_address'

# Updated UK postcode pattern allowing optional trailing comma/period/space
uk_postcode_pattern = r'(GIR 0AA|[A-Z]{1,2}\d{1,2} ?\d[ABD-HJLNP-UW-Z]{2}|[A-Z]{1,2}\d[A-Z] ?\d[ABD-HJLNP-UW-Z]{2})[,.\s]*$'

def has_valid_postcode(address: str) -> bool:
    if not isinstance(address, str):
        return False
    address_upper = address.upper().strip()
    # Check if postcode occurs near the end allowing trailing punctuation
    return bool(re.search(uk_postcode_pattern, address_upper))

valid_postcode_mask = df[postcode_col].apply(has_valid_postcode)

invalid_postcode_rows = df[~valid_postcode_mask]

display(invalid_postcode_rows)

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 31, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, da98bc2f-87cf-4801-ba37-1f510a431597)

In [30]:
# Dictionary of patterns and their replacements
address_replacements = {
    "Ultimate Print Office": "Ossington Rd, Kneesall, Newark NG22 0AB",
    "address": "Menu Address",
    "menu": "Menu Address",
    "us": "Company Address"
}

# Apply replacements
for keyword, replacement in address_replacements.items():
    df.loc[df['delivery_address'].str.contains(keyword, case=False, na=False), 'delivery_address'] = replacement


display(df)

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 32, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a0eb6ce6-40a7-4512-b830-cc977a869084)

# Transformation: 
# contact_number, delivery_mobile_number

In [31]:
def clean_phone_number(number: str) -> str:
    """
    Cleans UK phone numbers by replacing '+44' with '0', removing non-digits,
    and adding a space after the first 5 digits.

    :param number: Original phone number string
    :return: Cleaned phone number in the format '07986 123456'
    """

    if not isinstance(number, str):
        return number

    # Replace +44 with 0
    number = number.replace('+44', '0')

    # Remove all non-digit characters
    digits = re.sub(r'\D', '', number)

    # Format with space after 5 digits if long enough
    if len(digits) > 5:
        cleaned = f"{digits[:5]} {digits[5:]}"
    else:
        cleaned = digits

    # Truncate to max 12 characters to avoid multiple numbers stuck together
    # Only gets the first number
    return str(cleaned[:12])

# Apply to both columns

for mobile_number in ["contact_number", "delivery_mobile_number"]:
    df[mobile_number] = df[mobile_number].apply(clean_phone_number)

display(df)

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 33, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 8ad2b8d5-d09c-4f93-b4bd-0d2a90784408)

In [32]:
for col in df.columns:
    print(f"Column: {col}, unique types: ", df[col].map(type).unique())

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 34, Finished, Available, Finished)

Column: booking_number, unique types:  [<class 'int'>]
Column: company_name, unique types:  [<class 'str'>]
Column: job_name, unique types:  [<class 'str'>]
Column: order_date, unique types:  [<class 'pandas._libs.tslibs.timestamps.Timestamp'>
 <class 'pandas._libs.tslibs.nattype.NaTType'>]
Column: date_required, unique types:  [<class 'pandas._libs.tslibs.nattype.NaTType'>
 <class 'pandas._libs.tslibs.timestamps.Timestamp'>]
Column: ordered_by, unique types:  [<class 'str'>]
Column: contact_number, unique types:  [<class 'float'> <class 'str'>]
Column: quantity_field_1, unique types:  [<class 'float'>]
Column: size_field_1, unique types:  [<class 'str'> <class 'float'>]
Column: paper_field_1, unique types:  [<class 'str'> <class 'int'>]
Column: description_field_1, unique types:  [<class 'str'> <class 'datetime.datetime'> <class 'float'>]
Column: quantity_field_2, unique types:  [<class 'float'>]
Column: size_field_2, unique types:  [<class 'float'> <class 'str'>]
Column: paper_field_

In [33]:
# List of columns with mixed types
mixed_type_cols = [
    'contact_number', 'size_field_1', 'paper_field_1', 'description_field_1',
    'size_field_2', 'paper_field_2', 'description_field_2',
    'size_field_3', 'paper_field_3', 'description_field_3',
    'size_field_4', 'paper_field_4', 'description_field_4',
    'delivery_address', 'delivery_mobile_number', 'delivery_email',
    'delivery_instructions',
]

# Convert all mixed-type columns to string (handle NaNs properly)
for col in mixed_type_cols:
    df[col] = df[col].astype(str).replace('nan', None)

for col in df.columns:
    print(col)

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 35, Finished, Available, Finished)

booking_number
company_name
job_name
order_date
date_required
ordered_by
contact_number
quantity_field_1
size_field_1
paper_field_1
description_field_1
quantity_field_2
size_field_2
paper_field_2
description_field_2
quantity_field_3
size_field_3
paper_field_3
description_field_3
quantity_field_4
size_field_4
paper_field_4
description_field_4
delivery_contact
delivery_address
delivery_mobile_number
delivery_email
delivery_instructions


# Transforming:
### Normalising database to 3NF
### Sales

In [34]:
import pandas as pd

# Load Excel
sales_df = pd.read_excel(LAKEHOUSE_BASE + "SALES DATA.xlsx")

# Keep only necessary columns
sales_df = sales_df[["No.", "Date", "Amount", "Sales Order No"]]

# Rename for consistency
sales_df.columns = ["invoice_number", "invoice_date", "amount", "booking_number"]

# Change invoice_date to date time
sales_df["invoice_date"] = pd.to_datetime(sales_df["invoice_date"], errors="coerce", dayfirst=True)

display(sales_df)

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 36, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 186c54a8-57ac-44fb-811d-ab5f3735d84f)

### Customers

In [35]:
# 1. Extract unique customers from df
customers_df = df[["company_name", "contact_number"]].drop_duplicates().reset_index(drop=True)

# 2. Assign a customer_id
customers_df["customer_id"] = customers_df.index + 1

# 3. Merge customer_id back into df so we can use it downstream
df = df.merge(customers_df, on=["company_name", "contact_number"], how="left")

display(customers_df)

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 37, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 6d0fa189-3c3d-419c-a64e-3507734891e3)

### Bookings

In [36]:
bookings_df = df[[
    "booking_number",
    "job_name",
    "order_date",
    "date_required",
    "customer_id"
]].drop_duplicates()

display(bookings_df)

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 38, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 9679fbd6-972c-45ee-bf79-3f5d5d5a2bc3)

### Items

In [37]:
# Start by creating a list to store each item
items = []

# Go through each row in df
for _, row in df.iterrows():
    booking_number = row["booking_number"]
    
    # We expect up to 4 item fields per booking
    for i in range(1, 5):
        quantity = row.get(f"quantity_field_{i}")
        size = row.get(f"size_field_{i}")
        paper = row.get(f"paper_field_{i}")
        description = row.get(f"description_field_{i}")
        
        # Skip empty items (you can adjust this logic if needed)
        if pd.notna(quantity) or pd.notna(size) or pd.notna(paper) or pd.notna(description):
            items.append({
                "booking_number": booking_number,
                "item_number": i,
                "quantity": quantity,
                "size": size,
                "paper": paper,
                "description": description
            })

# Create the normalized item table
# And drops the null-quantity values
items_df = pd.DataFrame(items).dropna(subset=["quantity"])

display(items_df)


StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 39, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 2b783d03-a5c2-424f-8fde-cd60dbe0b064)

SynapseWidget(Synapse.DataFrame, 351eb558-74d3-4980-bddb-be950d0c67ba)

SynapseWidget(Synapse.DataFrame, def9bbaf-179f-4657-9f5a-562c305c7f6b)

SynapseWidget(Synapse.DataFrame, cde9b4e1-39c2-4737-974a-0ee51b0a8051)

In [38]:
for types in [items_df, bookings_df, customers_df, sales_df]:
    print(types.dtypes)

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 40, Finished, Available, Finished)

booking_number      int64
item_number         int64
quantity          float64
size               object
paper              object
description        object
dtype: object
booking_number             Int64
job_name                  object
order_date        datetime64[ns]
date_required     datetime64[ns]
customer_id                int64
dtype: object
company_name      object
contact_number    object
customer_id        int64
dtype: object
invoice_number             int64
invoice_date      datetime64[ns]
amount                   float64
booking_number             int64
dtype: object


# Load:
### Converting to Spark dataframe
### to save as deltatable in the lakehouse

In [39]:
items_sdf = spark.createDataFrame(items_df).write.format("delta").saveAsTable("items")
bookings_sdf = spark.createDataFrame(bookings_df).write.format("delta").saveAsTable("bookings")
customers_sdf = spark.createDataFrame(customers_df).write.format("delta").saveAsTable("customers")
sales_sdf = spark.createDataFrame(sales_df).write.format("delta").saveAsTable("sales")

StatementMeta(, b166b3a6-3fd6-4647-bb09-22b2edcc894f, 41, Finished, Available, Finished)

AnalysisException: [TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create table or view `g10_q1_lakehouse`.`items` because it already exists.
Choose a different name, drop or replace the existing object, or add the IF NOT EXISTS clause to tolerate pre-existing objects.

# 1. Total Sales per Customer

In [1]:
%%sql
SELECT
  c.customer_id,
  c.company_name,
  ROUND(SUM(s.amount), 2) AS total_revenue
FROM customers c
JOIN bookings b ON c.customer_id = b.customer_id
JOIN sales s ON b.booking_number = s.booking_number
GROUP BY c.customer_id, c.company_name
ORDER BY total_revenue DESC;


StatementMeta(, 13f7e730-2c88-40a5-a872-e23e5821c0f1, 2, Finished, Available, Finished)

<Spark SQL result set with 111 rows and 3 fields>

# 2. Top 5 Customers by Revenue

In [2]:
%%sql
WITH cust_rev AS (
  SELECT b.customer_id, SUM(s.amount) AS revenue
  FROM bookings b
  JOIN sales s ON b.booking_number = s.booking_number
  GROUP BY b.customer_id
)
SELECT
  cr.customer_id,
  c.company_name,
  cr.revenue
FROM cust_rev cr
JOIN customers c ON cr.customer_id = c.customer_id
ORDER BY cr.revenue DESC
LIMIT 5;


StatementMeta(, 47dbc66f-eb63-4fa9-8456-c1f906f93e3e, 3, Finished, Available, Finished)

<Spark SQL result set with 5 rows and 3 fields>

# 3. Monthly Sales Trend

In [3]:
%%sql
SELECT
  DATE_TRUNC('month', s.invoice_date) AS month,
  SUM(s.amount) AS monthly_revenue
FROM sales s
GROUP BY 1
ORDER BY 1;


StatementMeta(, 47dbc66f-eb63-4fa9-8456-c1f906f93e3e, 4, Finished, Available, Finished)

<Spark SQL result set with 4 rows and 2 fields>

# 4. Average Invoice Amount by Month

In [11]:
%%sql
SELECT
  DATE_TRUNC('month', invoice_date) AS month,
  ROUND(AVG(amount), 2) AS avg_invoice_amt
FROM sales
GROUP BY 1
ORDER BY 1;


StatementMeta(, 47dbc66f-eb63-4fa9-8456-c1f906f93e3e, 12, Finished, Available, Finished)

<Spark SQL result set with 4 rows and 2 fields>

# 5. Total Quantity Sold by Size

In [5]:
%%sql
SELECT
  i.size,
  SUM(i.quantity) AS total_quantity
FROM items i
GROUP BY i.size
ORDER BY total_quantity DESC;


StatementMeta(, 47dbc66f-eb63-4fa9-8456-c1f906f93e3e, 6, Finished, Available, Finished)

<Spark SQL result set with 15 rows and 2 fields>

# 6. Revenue by Paper Type

In [6]:
%%sql
SELECT
  i.paper,
  SUM(s.amount) AS revenue
FROM items i
JOIN bookings b ON i.booking_number = b.booking_number
JOIN sales s ON b.booking_number = s.booking_number
GROUP BY i.paper
ORDER BY revenue DESC;


StatementMeta(, 47dbc66f-eb63-4fa9-8456-c1f906f93e3e, 7, Finished, Available, Finished)

<Spark SQL result set with 40 rows and 2 fields>

# 7. Average Lead Time (Order to Required Date)

In [7]:
%%sql
SELECT
  AVG(DATEDIFF(b.date_required, b.order_date)) AS avg_lead_days
FROM bookings b;


StatementMeta(, 47dbc66f-eb63-4fa9-8456-c1f906f93e3e, 8, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 1 fields>

# 8. Items per Booking Distribution

In [8]:
%%sql
SELECT
  item_count,
  COUNT(*) AS num_bookings
FROM (
  SELECT booking_number, COUNT(*) AS item_count
  FROM items
  GROUP BY booking_number
) t
GROUP BY item_count
ORDER BY item_count;


StatementMeta(, 47dbc66f-eb63-4fa9-8456-c1f906f93e3e, 9, Finished, Available, Finished)

<Spark SQL result set with 4 rows and 2 fields>

# 9. Bookings without any Sales (Unbilled Orders)

In [13]:
%%sql
SELECT
  b.booking_number,
  b.job_name,
  c.company_name
FROM bookings b
LEFT JOIN sales s ON b.booking_number = s.booking_number
JOIN customers c ON b.customer_id = c.customer_id
WHERE s.booking_number IS NULL;


StatementMeta(, 47dbc66f-eb63-4fa9-8456-c1f906f93e3e, 14, Finished, Available, Finished)

<Spark SQL result set with 700 rows and 3 fields>

# 10. Backlog Orders (Required Date After Today)

In [10]:
%%sql
SELECT
  b.booking_number,
  b.job_name,
  b.date_required,
  DATEDIFF(b.date_required, CURRENT_DATE()) AS days_until_due
FROM bookings b
WHERE b.date_required > CURRENT_DATE()
ORDER BY days_until_due;


StatementMeta(, 47dbc66f-eb63-4fa9-8456-c1f906f93e3e, 11, Finished, Available, Finished)

<Spark SQL result set with 20 rows and 4 fields>