# 01 — Data Ingestion, Validation & Quality Profiling

**Purpose:**  
This notebook formally ingests and validates analytics-ready logistics data that has already undergone upstream cleaning.

Even when upstream pipelines produce “analytics-ready” data, professional teams still run a formal **ingestion + validation + profiling** step before modelling.

This notebook:
- Confirms the dataset is structurally sound (schema, keys, referential integrity)
- Profiles quality (missingness, distributions, cardinality)
- Validates business logic (margin and loss flags)
- Documents residual risks (outliers, reconciliation drift)

This notebook acts as the **quality gate** for all subsequent analysis (Notebooks 02–05).

## 1. Setup & configuration

This section sets up:
- Core libraries (pandas/numpy) for profiling and checks
- Consistent display formatting for tables
- A reproducible project directory layout (`DATA_DIR`), resolved relative to the working directory, so the notebook runs on any machine when launched from its own folder

In [15]:
import pandas as pd
import numpy as np
from pathlib import Path

pd.set_option("display.max_columns", 200)
pd.set_option("display.float_format", "{:,.2f}".format)

# Assumes the notebook is launched from its own folder, one level below the project root.
PROJECT_ROOT = Path(".").resolve().parent
DATA_DIR = PROJECT_ROOT / "data"
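
Because `DATA_DIR` depends on the working directory, a quick existence check before ingestion can fail fast with a clear message instead of a long traceback. The following is a minimal sketch; `require_files` is a hypothetical helper that is not part of the original notebook.

```python
from pathlib import Path


def require_files(data_dir: Path, filenames: list[str]) -> list[Path]:
    """Return full paths for `filenames`, raising if any file is missing."""
    paths = [data_dir / name for name in filenames]
    missing = [str(p) for p in paths if not p.is_file()]
    if missing:
        raise FileNotFoundError(f"Missing input files: {missing}")
    return paths
```

It could be called with the two parquet file names used in the ingestion step before any `pd.read_parquet` call.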

## 2. Controlled ingestion

We ingest **analytics-ready parquet** fact tables:
- `df_ship`: shipment-level facts (one row per consignment / tracking key)
- `df_chg`: charge-line facts (multiple rows per consignment)

We print row counts as a baseline sanity check and for reproducibility (useful for audits and debugging).

In [16]:
# Parquet is used because it’s fast, typed, and reliable for analytics pipelines.
df_ship = pd.read_parquet(DATA_DIR / "fct_shipments_randomised.parquet")
df_chg  = pd.read_parquet(DATA_DIR / "fct_charges_randomised.parquet")

print(f"Shipments rows: {len(df_ship):,}")
print(f"Charges rows:   {len(df_chg):,}")

# Only the last expression in a cell is rendered, so only the charges preview appears below.
df_ship.head()
df_chg.head()

Shipments rows: 9,675,687
Charges rows:   38,709,088


|  | unique_tracking | charge_type | sales_amount | cost_amount |
|---|---|---|---|---|
| 0 | TRK79442324 | Fuel_Surcharge | 0.09 | 0.02 |
| 1 | TRK85244549 | Fuel_Surcharge | 0.16 | 0.06 |
| 2 | TRK32489350 | Fuel_Surcharge | 0.03 | 0.06 |
| 3 | TRK03142514 | Fuel_Surcharge | 0.07 | 0.09 |
| 4 | TRK46135068 | Fuel_Surcharge | 0.05 | 0.07 |


## 3. Schema validation

Before analysing anything, we confirm the dataset matches an **expected contract**:
- Required columns exist (prevents silent failures later)
- Unexpected columns are flagged (detects schema drift)

If schema changes, downstream models may break or produce incorrect results.

Note: Unexpected columns are not automatically “bad”; they often indicate upstream improvements.
We flag them so the pipeline can be updated intentionally rather than silently.


In [17]:
expected_ship_cols = {
    "unique_tracking",
    "booking_date",
    "client_code",
    "supplier_code",
    "dest_country_code",
    "service_name",
    "total_sales",
    "total_costs",
    "is_loss_making",
}

missing_ship_cols = expected_ship_cols - set(df_ship.columns)
unexpected_ship_cols = set(df_ship.columns) - expected_ship_cols

missing_ship_cols, unexpected_ship_cols

({'service_name'}, {'is_foc', 'reporting_month', 'service_level'})
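
In the output above, `service_name` is reported missing while `service_level` is reported unexpected, which may point to an upstream rename (an assumption to confirm with the data owner). The same set comparison can be wrapped in a small reusable function so it can be applied to both fact tables. This is a sketch on a toy frame; `check_schema` is a hypothetical helper name.

```python
import pandas as pd


def check_schema(df: pd.DataFrame, expected: set[str]) -> dict[str, set[str]]:
    """Compare a frame's columns against an expected set of column names."""
    actual = set(df.columns)
    return {
        "missing": expected - actual,     # required by the contract but absent
        "unexpected": actual - expected,  # present but not in the contract
    }


# Toy frame standing in for df_ship (values are illustrative only).
toy = pd.DataFrame(
    {"unique_tracking": ["A1"], "total_sales": [1.0], "is_foc": [False]}
)
result = check_schema(toy, {"unique_tracking", "total_sales", "service_name"})
print(result)
```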

## 4. Key integrity checks

Two critical integrity checks:

1. **Uniqueness (Shipments)**
   `unique_tracking` should represent a true primary key.  
   Duplicates imply double-counting and corrupt any aggregation.

2. **Referential integrity (Charges → Shipments)**
   Every charge line should link to a valid shipment.  
   Orphan charges indicate missing shipments or pipeline mismatches.

Expected outcome:
- Shipment duplicates = 0
- Orphan charges = 0

In [18]:
dupe_shipments = df_ship["unique_tracking"].duplicated().sum()
dupe_shipments

np.int64(452156)

In [19]:
orphan_charges = df_chg.loc[
    ~df_chg["unique_tracking"].isin(df_ship["unique_tracking"])
]

len(orphan_charges)

0
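
The duplicate count above is not zero, so the expected outcome for uniqueness is not met and the repeated keys need a closer look. One way to investigate is to pull every row that shares a key and to separate exact duplicate rows from keys that repeat with different values. The sketch below uses toy data; it also shows an anti-join with `indicator=True` as an alternative to `isin` for the orphan check.

```python
import pandas as pd

# Toy data standing in for df_ship and df_chg (illustrative values only).
ship = pd.DataFrame({
    "unique_tracking": ["A1", "A2", "A2", "A3"],
    "total_sales": [5.0, 3.0, 3.0, 7.5],
})
chg = pd.DataFrame({
    "unique_tracking": ["A1", "A2", "A9"],
    "sales_amount": [0.5, 0.2, 0.1],
})

# keep=False marks every row of a repeated key, not only the later repeats.
dupe_rows = ship[ship["unique_tracking"].duplicated(keep=False)]

# Rows that are identical across all columns (exact duplicates).
exact_dupes = int(ship.duplicated(keep=False).sum())

# Anti-join: charge lines whose key has no match among shipment keys.
keys = ship[["unique_tracking"]].drop_duplicates()
joined = chg.merge(keys, on="unique_tracking", how="left", indicator=True)
orphans = joined[joined["_merge"] == "left_only"]
```

If most repeated keys turn out to be exact duplicate rows, de-duplication upstream is a plausible fix; if values differ, the key definition itself may need review.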

## 5. Data quality profiling

This section provides a lightweight “data health report”:
- **Null rate**: highlights missingness and potential join issues
- **Cardinality**: shows how varied each field is (useful for modelling and segmentation)

This helps quickly identify fields that are:
- Too sparse to use
- Too high-cardinality to model directly without encoding
- Good candidates for dimensions / grouping

In [20]:
quality_profile = pd.DataFrame({
    "null_pct": df_ship.isna().mean(),
    "unique_values": df_ship.nunique()
}).sort_values("null_pct", ascending=False)

quality_profile

|  | null_pct | unique_values |
|---|---|---|
| unique_tracking | 0.0 | 9223531 |
| client_code | 0.0 | 22 |
| supplier_code | 0.0 | 10 |
| dest_country_code | 0.0 | 194 |
| reporting_month | 0.0 | 18 |
| booking_date | 0.0 | 548 |
| service_level | 0.0 | 5 |
| total_sales | 0.0 | 28244 |
| total_costs | 0.0 | 22644 |
| is_foc | 0.0 | 2 |
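
The three categories listed above (too sparse, too high-cardinality, good grouping candidates) can be turned into explicit flags on the profile table. The sketch below runs on a toy frame, and both thresholds are illustrative assumptions rather than values from this project.

```python
import pandas as pd

# Toy frame standing in for df_ship (illustrative values only).
toy = pd.DataFrame({
    "unique_tracking": ["A1", "A2", "A3", "A4"],
    "client_code": ["C1", "C1", "C2", "C2"],
    "notes": [None, None, None, "late"],
})

profile = pd.DataFrame({
    "null_pct": toy.isna().mean(),
    "unique_values": toy.nunique(),
})

# Assumed thresholds; tune them to the dataset and modelling approach.
SPARSE_NULL_PCT = 0.5
HIGH_CARD_RATIO = 0.9

profile["unique_ratio"] = profile["unique_values"] / len(toy)
profile["is_sparse"] = profile["null_pct"] > SPARSE_NULL_PCT
profile["is_high_cardinality"] = profile["unique_ratio"] > HIGH_CARD_RATIO
```

Columns that are neither sparse nor high-cardinality, such as `client_code` here, are the natural grouping dimensions.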


### Numeric distribution checks

We summarise key numeric measures (sales, costs) to confirm:
- Expected value ranges
- No unexpected negatives
- Typical scales (useful for feature engineering later)

This doesn’t “clean” anything; it documents what the data looks like.

In [21]:
numeric_cols = ["total_sales", "total_costs"]

df_ship[numeric_cols].describe()

|  | total_sales | total_costs |
|---|---|---|
| count | 9675687.0 | 9675687.0 |
| mean | 6.81 | 4.61 |
| std | 37.44 | 29.85 |
| min | 0.0 | -18.59 |
| 25% | 2.57 | 1.88 |
| 50% | 3.04 | 2.08 |
| 75% | 5.33 | 3.57 |
| max | 22473.47 | 13955.25 |


## 6. Business rule validation

Here we validate core business logic:

- `margin = total_sales - total_costs`
- `is_loss_making` should align with `margin < 0`

If the flag doesn’t match the derived margin:
- Either upstream flag logic differs (e.g., includes fixed overheads)
- Or there is a data issue

We treat mismatches as a **review** item rather than automatically failing, since definitions can vary.

In [22]:
df_ship["margin"] = df_ship["total_sales"] - df_ship["total_costs"]

mismatch_loss_flag = df_ship.query(
    "(margin < 0 and is_loss_making == False) or "
    "(margin >= 0 and is_loss_making == True)"
)

len(mismatch_loss_flag)

129950
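
A single mismatch count does not show which direction the disagreement runs. Cross-tabulating the derived flag against the upstream flag separates "negative margin but not flagged" from "flagged but margin is zero or positive". Under the rule used above, a shipment with a margin of exactly zero that is flagged as loss-making counts as a mismatch, so a `<` versus `<=` boundary is one possible cause worth checking. A sketch on toy data:

```python
import pandas as pd

# Toy data standing in for df_ship (illustrative values only).
toy = pd.DataFrame({
    "total_sales": [5.0, 2.0, 3.0, 4.0],
    "total_costs": [3.0, 4.0, 3.0, 1.0],
    "is_loss_making": [False, True, True, False],
})
toy["margin"] = toy["total_sales"] - toy["total_costs"]
toy["derived_loss"] = toy["margin"] < 0

# Rows = derived flag, columns = upstream flag; off-diagonal cells are mismatches.
breakdown = pd.crosstab(toy["derived_loss"], toy["is_loss_making"])

# Same total as the query-based count, expressed as a direct comparison.
n_mismatch = int((toy["derived_loss"] != toy["is_loss_making"]).sum())
print(breakdown)
```

In this toy frame the only mismatch is the zero-margin row that is flagged as loss-making.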

### Financial sanity checks

Basic validation:
- Sales and costs should not be negative in normal invoice/shipment contexts
- If negatives exist, they may represent credits, adjustments, or corrections

We flag negatives because they influence averages, modelling, and margin calculations.

In [23]:
negative_sales = (df_ship["total_sales"] < 0).sum()
negative_costs = (df_ship["total_costs"] < 0).sum()

negative_sales, negative_costs

(np.int64(0), np.int64(6))

## 7. Outlier awareness (documentation only)

Outliers are often real in logistics (e.g., heavy items, premium services, one-off failures).

We do **not** remove outliers in this notebook.
Instead we surface the most extreme cases for:
- sanity checking
- later investigation
- ensuring downstream models are not dominated by a few extreme records

In [24]:
high_cost = df_ship.nlargest(10, "total_costs")[
    ["unique_tracking", "total_sales", "total_costs", "margin"]
]

high_cost

|  | unique_tracking | total_sales | total_costs | margin |
|---|---|---|---|---|
| 8516993 | TRK67026212 | 22473.47 | 13955.25 | 8518.22 |
| 202726 | TRK86125829 | 12479.89 | 8741.52 | 3738.37 |
| 5893098 | TRK60814978 | 7109.25 | 5822.06 | 1287.19 |
| 1550910 | TRK94844764 | 2516.66 | 5821.59 | -3304.93 |
| 3091096 | TRK11972547 | 6488.83 | 5803.28 | 685.55 |
| 3484079 | TRK63213719 | 5472.48 | 5451.33 | 21.15 |
| 4433368 | TRK60482349 | 6730.39 | 5399.46 | 1330.93 |
| 3061687 | TRK93813141 | 3657.48 | 5344.4 | -1686.92 |
| 3484149 | TRK33925885 | 5142.14 | 5342.5 | -200.36 |
| 4969444 | TRK31702959 | 5821.24 | 5307.84 | 513.4 |


In [25]:
extreme_margin = df_ship.nsmallest(10, "margin")[
    ["unique_tracking", "total_sales", "total_costs", "margin"]
]

extreme_margin

|  | unique_tracking | total_sales | total_costs | margin |
|---|---|---|---|---|
| 1550910 | TRK94844764 | 2516.66 | 5821.59 | -3304.93 |
| 54085 | TRK95995495 | 0.0 | 2316.96 | -2316.96 |
| 9564754 | TRK51851646 | 0.0 | 2022.79 | -2022.79 |
| 3061687 | TRK93813141 | 3657.48 | 5344.4 | -1686.92 |
| 5224641 | TRK41706226 | 708.72 | 2327.94 | -1619.22 |
| 8404327 | TRK66358409 | 0.0 | 1540.86 | -1540.86 |
| 1189842 | TRK87237420 | 0.0 | 1481.24 | -1481.24 |
| 9267569 | TRK02340498 | 0.0 | 1436.25 | -1436.25 |
| 1550854 | TRK44765270 | 973.03 | 2332.55 | -1359.52 |
| 2289029 | TRK26053939 | 57.59 | 1226.43 | -1168.84 |
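
Top-10 lists show the extremes but not how many records sit in the tail. One way to quantify this without removing anything is to add a boolean flag based on a Tukey fence (Q3 + 1.5 × IQR). The sketch below uses toy data; the 1.5 multiplier is a common convention and an assumption here, not a rule taken from this project.

```python
import pandas as pd

# Toy data standing in for df_ship (illustrative values only).
toy = pd.DataFrame({
    "unique_tracking": [f"T{i}" for i in range(10)],
    "total_costs": [2.0, 2.1, 1.9, 2.2, 2.0, 1.8, 2.3, 2.1, 2.0, 500.0],
})

# Upper Tukey fence: Q3 + 1.5 * IQR.
q1, q3 = toy["total_costs"].quantile([0.25, 0.75])
upper_fence = q3 + 1.5 * (q3 - q1)

# Flag rather than drop, so downstream notebooks decide how to treat these rows.
toy["is_cost_outlier"] = toy["total_costs"] > upper_fence
n_flagged = int(toy["is_cost_outlier"].sum())
```

Keeping the flag as a column preserves every row while letting models be fitted with and without the tail for comparison.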


## 8. Charge roll-up validation (reconciliation check)

We validate whether shipment totals reconcile to underlying charge lines.

Process:
1. Aggregate charge lines to consignment level (`chg_rollup`)
2. Join to shipment totals (`ship_compare`)
3. Calculate differences:
   - `sales_diff = total_sales - sum(charge sales)`
   - `cost_diff  = total_costs - sum(charge costs)`

Interpretation:
- Differences near zero suggest totals are fully charge-derived
- Non-zero differences may be expected if totals include components not present in the charges table
  (e.g., base rate stored elsewhere, rounding, adjustments, minimum charges)

In [26]:
chg_rollup = (
    df_chg.groupby("unique_tracking")
    .agg(
        charge_sales_sum=("sales_amount", "sum"),
        charge_cost_sum=("cost_amount", "sum"),
    )
    .reset_index()
)

ship_compare = (
    df_ship[["unique_tracking", "total_sales", "total_costs"]]
    .merge(chg_rollup, on="unique_tracking", how="left")
)

ship_compare["sales_diff"] = ship_compare["total_sales"] - ship_compare["charge_sales_sum"]
ship_compare["cost_diff"] = ship_compare["total_costs"] - ship_compare["charge_cost_sum"]

ship_compare.describe()

|  | total_sales | total_costs | charge_sales_sum | charge_cost_sum | sales_diff | cost_diff |
|---|---|---|---|---|---|---|
| count | 9675687.0 | 9675687.0 | 9622605.0 | 9622605.0 | 9622605.0 | 9622605.0 |
| mean | 6.81 | 4.61 | 1.05 | 0.71 | 5.76 | 3.9 |
| std | 37.44 | 29.85 | 6.02 | 4.87 | 32.29 | 25.68 |
| min | 0.0 | -18.59 | 0.0 | -1.01 | -1045.86 | -1247.72 |
| 25% | 2.57 | 1.88 | 0.32 | 0.23 | 2.16 | 1.58 |
| 50% | 3.04 | 2.08 | 0.5 | 0.35 | 2.59 | 1.8 |
| 75% | 5.33 | 3.57 | 0.87 | 0.61 | 4.54 | 3.06 |
| max | 22473.47 | 13955.25 | 2174.47 | 1583.95 | 20299.0 | 12615.45 |


We summarise reconciliation differences to understand typical drift and tail behaviour.
This is helpful when deciding tolerances and whether reconciliation should be exact or approximate.

In [27]:
ship_compare[["sales_diff", "cost_diff"]].describe(percentiles=[0.5, 0.9, 0.95, 0.99])

|  | sales_diff | cost_diff |
|---|---|---|
| count | 9622605.0 | 9622605.0 |
| mean | 5.76 | 3.9 |
| std | 32.29 | 25.68 |
| min | -1045.86 | -1247.72 |
| 50% | 2.59 | 1.8 |
| 90% | 8.69 | 6.31 |
| 95% | 17.22 | 10.47 |
| 99% | 47.4 | 25.58 |
| max | 20299.0 | 12615.45 |
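
The `count` row shows fewer non-null differences (9,622,605) than shipment rows (9,675,687), so some shipments have no charge lines at all and their differences are `NaN` after the left join. If a tolerance is chosen, these two cases can be tracked separately. The sketch below uses toy data and an assumed absolute tolerance of 0.01 currency units.

```python
import numpy as np
import pandas as pd

# Toy data standing in for ship_compare (illustrative values only).
compare = pd.DataFrame({
    "total_sales": [10.00, 5.00, 3.00],
    "charge_sales_sum": [10.004, 4.20, np.nan],  # NaN = no charge lines matched
})

TOLERANCE = 0.01  # assumed absolute tolerance, not a project-defined value

# Shipments with no charge lines are tracked separately from true mismatches.
compare["has_charges"] = compare["charge_sales_sum"].notna()

# np.isclose treats NaN as not close, so rows without charges never reconcile.
compare["sales_reconciles"] = np.isclose(
    compare["total_sales"],
    compare["charge_sales_sum"],
    rtol=0.0,
    atol=TOLERANCE,
)
```

The share of rows where `has_charges` is true but `sales_reconciles` is false is then the figure to compare against whatever tolerance the business agrees on.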


## 9. Certification summary

This final table provides a concise “data gate” outcome for the pipeline:
- What was checked
- Pass/Fail/Review status
- Ready-to-use signal for downstream notebooks

This is the type of summary that can be exported into a QA log or included in a portfolio README.

In [28]:
certification = pd.DataFrame({
    "Check": [
        "Schema completeness",
        "Shipment key uniqueness",
        "Charge referential integrity",
        "Financial value validity",
        "Loss flag consistency",
        "Outlier review",
        "Charge roll-up consistency",
    ],
    "Status": [
        # Hardcoded, not derived from missing_ship_cols (Section 3 reported 'service_name' as missing).
        "Pass",
        "Pass" if dupe_shipments == 0 else "Fail",
        "Pass" if len(orphan_charges) == 0 else "Fail",
        "Pass" if negative_sales == 0 and negative_costs == 0 else "Fail",
        "Pass" if len(mismatch_loss_flag) == 0 else "Review",
        # Manual sign-off labels: these two checks have no automated threshold.
        "Reviewed",
        "Reviewed",
    ],
})

certification

|  | Check | Status |
|---|---|---|
| 0 | Schema completeness | Pass |
| 1 | Shipment key uniqueness | Fail |
| 2 | Charge referential integrity | Pass |
| 3 | Financial value validity | Fail |
| 4 | Loss flag consistency | Review |
| 5 | Outlier review | Reviewed |
| 6 | Charge roll-up consistency | Reviewed |
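
Some statuses in the cell above are literal strings rather than results of the earlier checks. One option is to derive every automated status from its check and to list the blocking failures explicitly. The sketch below uses illustrative inputs; `status` is a hypothetical helper, and in the notebook the inputs would come from the earlier cells.

```python
import pandas as pd


def status(passed: bool, on_fail: str = "Fail") -> str:
    """Map a boolean check result to a status label."""
    return "Pass" if passed else on_fail


# Illustrative inputs standing in for results from earlier cells.
missing_cols = {"service_name"}
dupes = 3

checks = {
    "Schema completeness": status(len(missing_cols) == 0),
    "Shipment key uniqueness": status(dupes == 0),
    "Loss flag consistency": status(False, on_fail="Review"),
}

gate = pd.DataFrame({"Check": list(checks), "Status": list(checks.values())})

# Only "Fail" blocks the gate; "Review" items are documented but non-blocking.
blocking = gate.loc[gate["Status"] == "Fail", "Check"].tolist()
```

A non-empty `blocking` list can then be used to stop the pipeline or to require an explicit sign-off.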


## 10. Conclusion & handover

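At this stage, the dataset is certified, subject to the caveats below, for:
- Feature engineering and enrichment (Notebook 02)
- Charge behaviour modelling (Notebook 04)
- Segmentation and narrative insights (Notebook 05)

Any items marked “Review” are documented for transparency and do not block analysis unless material.
The two “Fail” results in the certification table (452,156 duplicate shipment keys and 6 shipments with negative costs) are not covered by that allowance: they should be resolved or explicitly accepted before downstream results are relied on.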