In [2]:
from pathlib import Path
import csv
import json
import shutil
from datetime import datetime, timedelta, timezone
import gzip, csv, shutil

In [4]:
# Constants
ROOT = Path("Pipeline_watch_data")
LANDING = ROOT / "landing"
REFERENCE = ROOT / "reference"
REPORTS = ROOT / "reports"
BRONZE = ROOT / "bronze"
QUAR = ROOT / "quarantine"
REJECTS = ROOT/ "rejects"

for folder in [REFERENCE, REPORTS, BRONZE, QUAR, REJECTS]:
    folder.mkdir(parents=True, exist_ok=True)

In [5]:
#Necssary constants
FRESHNESS_HOURS = 100
MIN_RECORDS = 1000
MAX_ERROR_RATE = 0.02
LAT_MAX = 60000
BYTES_MAX = 5_000_000
MAX_FILE_SIZE_MB = 50
REQUIRED_COULUMNS = {"event_time",
                     "user_id",
                     "session_id",
                     "country",
                     "page",
                     "referrer",
                     "bytes_sent",
                     "latency_ms"}
UNKNOWN_REF={"", "unknown", "n/a"}

In [6]:
allowed = {}
with open(REFERENCE / "allowed_countries.csv","r") as f:
    for row in csv.DictReader(f):
        allowed.setdefault(row['tenant'], set()).add(row['allowed_country'])

with open(REFERENCE/ "baseline_latency.json", "r") as f:
    baseline_latency = json.load(f)
    print(baseline_latency)

{'tenantA': 300, 'tenantB': 450, 'tenantC': 400}


In [7]:
# at top
now_utc = datetime.now(timezone.utc)
files = sorted(LANDING.glob("*.csv.gz"))
stage_status = {}

for fpath in files:
    stem = fpath.name.replace(".csv.gz", "")
    record = {'file_name': fpath.name, 'stage':'file_checks', "status": None, "reason": None}

    try:
        # safer split in case tenant has underscores later
        tenant, hour_str = stem.split("_", 1)         # e.g. tenantA , 2025-10-03T16

        # parse exactly the pattern in filenames
        file_hour = datetime.strptime(hour_str, "%Y-%m-%dT%H").replace(tzinfo=timezone.utc)

        # if you reach here, parsing worked — do your normal processing...
        record["status"] = "OK"
        stage_status[fpath.name] = record

    except Exception as e:
        record["status"] = 'REJECT'
        record["reason"] = ['BAD_NAME', str(e)]
        stage_status[fpath.name] = record
        dest = REJECTS / 'tenants' / 'unknown' / 'date=unknown' / 'hour=unknown'
        dest.mkdir(parents=True, exist_ok=True)
        shutil.copy2(fpath, dest / fpath.name)
        continue

    #Check if the file size is too large
    size_ok = 0 < fpath.stat().st_size < MAX_FILE_SIZE_MB * 1024 * 1024
    if fpath.stat().st_size > MAX_FILE_SIZE_MB * 1024 * 1024:
           print(f"File {fpath} is too large: {fpath.stat().st_size / (1024*1024):.2f)} MB")

    fresh_enough = (now_utc - file_hour) <= timedelta(hours=FRESHNESS_HOURS)
    if fresh_enough:
        print(f"File {fpath} is fresh enough: {now_utc - file_hour}")
    
    if not size_ok or not fresh_enough:
        reasons = []
        if not size_ok: reasons.append('BAD_SIZE')
        if not fresh_enough: reasons.append('STALE')
        record.update({'status':'REJECT', 'reason':reasons, 'tenant': tenant, 'file_hour_utc': file_hour.isoformat()})
        stage_status[fpath.name] = record
        dest = REJECTS / 'tenants' / 'unknown' / 'date=unknown' / 'hour=unknown'
        dest.mkdir(parents=True, exist_ok=True)
        shutil.copy2(fpath, dest / fpath.name) 
    else:
         record.update({'status': 'OK_TO_READ', 'tenant': tenant, 'file_hour_utc': file_hour.isoformat()})
         stage_status[fpath] = record

File Pipeline_watch_data\landing\tenantA_2025-10-03T16.csv.gz is fresh enough: 4 days, 2:02:07.404433
File Pipeline_watch_data\landing\tenantA_2025-10-03T17.csv.gz is fresh enough: 4 days, 1:02:07.404433
File Pipeline_watch_data\landing\tenantA_2025-10-03T18.csv.gz is fresh enough: 4 days, 0:02:07.404433
File Pipeline_watch_data\landing\tenantB_2025-10-03T17.csv.gz is fresh enough: 4 days, 1:02:07.404433
File Pipeline_watch_data\landing\tenantB_2025-10-03T18.csv.gz is fresh enough: 4 days, 0:02:07.404433
File Pipeline_watch_data\landing\tenantC_2025-10-03T17.csv.gz is fresh enough: 4 days, 1:02:07.404433
File Pipeline_watch_data\landing\tenantC_2025-10-03T18.csv.gz is fresh enough: 4 days, 0:02:07.404433


In [8]:
stage_status

{'tenant_-10-03T16.csv.gz': {'file_name': 'tenant_-10-03T16.csv.gz',
  'stage': 'file_checks',
  'status': 'REJECT',
  'reason': ['BAD_NAME',
   "time data '-10-03T16' does not match format '%Y-%m-%dT%H'"]},
 'tenantA_2025-10-03T16.csv.gz': {'file_name': 'tenantA_2025-10-03T16.csv.gz',
  'stage': 'file_checks',
  'status': 'OK_TO_READ',
  'reason': None,
  'tenant': 'tenantA',
  'file_hour_utc': '2025-10-03T16:00:00+00:00'},
 WindowsPath('Pipeline_watch_data/landing/tenantA_2025-10-03T16.csv.gz'): {'file_name': 'tenantA_2025-10-03T16.csv.gz',
  'stage': 'file_checks',
  'status': 'OK_TO_READ',
  'reason': None,
  'tenant': 'tenantA',
  'file_hour_utc': '2025-10-03T16:00:00+00:00'},
 'tenantA_2025-10-03T17.csv.gz': {'file_name': 'tenantA_2025-10-03T17.csv.gz',
  'stage': 'file_checks',
  'status': 'OK_TO_READ',
  'reason': None,
  'tenant': 'tenantA',
  'file_hour_utc': '2025-10-03T17:00:00+00:00'},
 WindowsPath('Pipeline_watch_data/landing/tenantA_2025-10-03T17.csv.gz'): {'file_name': 

In [9]:
# Schema validation
for fname, record in stage_status.items():  # fname is string like 'tenantA_2025-10-03T16.csv.gz'
    if record['status'] != 'OK_TO_READ':
        continue

    tenant = record['tenant']
    file_hour = datetime.fromisoformat(record['file_hour_utc'])

    file_path = LANDING / fname   # ✅ join folder + filename

    if not file_path.exists():
        print(f"⚠️ Missing file: {file_path}")
        continue

    with gzip.open(file_path, 'rt', newline='', encoding='utf-8') as gz:
        reader = csv.DictReader(gz)
        cols = set(reader.fieldnames or [])
        if not REQUIRED_COULUMNS.issubset(cols):
            missing = REQUIRED_COULUMNS - cols
            record.update({'status': 'REJECT', 'reason': ['MISSING_COLUMNS', *missing]})
            dest = (
                REJECTS / 'tenants' / tenant /
                f"date={file_hour.date()}" /
                f"hour={file_hour.hour:02d}"
            )
            dest.mkdir(parents=True, exist_ok=True)
            shutil.copy2(file_path, dest / file_path.name)
            print(f"🚫 File {fname} is missing columns: {missing}")
            continue

    print(f"✅ File {fname} passed schema check for tenant {tenant} at hour {file_hour}")

✅ File tenantA_2025-10-03T16.csv.gz passed schema check for tenant tenantA at hour 2025-10-03 16:00:00+00:00
⚠️ Missing file: Pipeline_watch_data\landing\Pipeline_watch_data\landing\tenantA_2025-10-03T16.csv.gz
✅ File tenantA_2025-10-03T17.csv.gz passed schema check for tenant tenantA at hour 2025-10-03 17:00:00+00:00
⚠️ Missing file: Pipeline_watch_data\landing\Pipeline_watch_data\landing\tenantA_2025-10-03T17.csv.gz
✅ File tenantA_2025-10-03T18.csv.gz passed schema check for tenant tenantA at hour 2025-10-03 18:00:00+00:00
⚠️ Missing file: Pipeline_watch_data\landing\Pipeline_watch_data\landing\tenantA_2025-10-03T18.csv.gz
✅ File tenantB_2025-10-03T17.csv.gz passed schema check for tenant tenantB at hour 2025-10-03 17:00:00+00:00
⚠️ Missing file: Pipeline_watch_data\landing\Pipeline_watch_data\landing\tenantB_2025-10-03T17.csv.gz
✅ File tenantB_2025-10-03T18.csv.gz passed schema check for tenant tenantB at hour 2025-10-03 18:00:00+00:00
⚠️ Missing file: Pipeline_watch_data\landing\Pi