# Deel takehome assignment
Steps:
1. Loads raw CSVs 
2. Builds staging, fact, and dimension tables with **pandas**
3. Runs basic data-quality checks (tests)
4. Computes **day-over-day** balance changes (latest day only) and issues alerts (>50% change)
5. Optionally posts alerts to **Slack** via webhook

**Balance definition:** `balance = SUM(amount) - SUM(payment_amount)` per organization and date.



## 0) Prerequisites

In [3]:
# Install dependencies if needed 
# %pip install pandas requests


## 1) Imports & Configuration

In [5]:
## Imports
import os
import pandas as pd
from datetime import datetime
import numpy as np
import json
import requests

## Set paths to inputs, outputs and Webhook URL
INVOICES_CSV =  r"C:\Users\ajayk\invoices.csv"
ORGS_CSV = r"C:\Users\ajayk\organizations.csv"
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/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"
OUTPUT_DIR =  r"C:\Users\ajayk\local_pipeline_outputs"
os.makedirs(OUTPUT_DIR, exist_ok=True)

print("OUTPUT_DIR:", OUTPUT_DIR)
print("INVOICES_CSV:", INVOICES_CSV)
print("ORGS_CSV:", ORGS_CSV)
print("SLACK_WEBHOOK_URL set?:", bool(SLACK_WEBHOOK_URL))

OUTPUT_DIR: C:\Users\ajayk\local_pipeline_outputs
INVOICES_CSV: C:\Users\ajayk\invoices.csv
ORGS_CSV: C:\Users\ajayk\organizations.csv
SLACK_WEBHOOK_URL set?: True


## 2) Load Raw Data

In [7]:
def load_raw_data(invoices_csv=INVOICES_CSV, orgs_csv=ORGS_CSV):
    if invoices_csv and os.path.exists(invoices_csv) and orgs_csv and os.path.exists(orgs_csv):
        invoices = pd.read_csv(invoices_csv, parse_dates=["CREATED_AT"])
        invoices.columns = invoices.columns.str.lower()
        orgs = pd.read_csv(orgs_csv, parse_dates=["FIRST_PAYMENT_DATE","LAST_PAYMENT_DATE","CREATED_DATE"])
        orgs.columns = orgs.columns.str.lower()
        print("Loaded CSVs from disk.")
    else:
        print("CSV paths not provided or files not found. ")
    return invoices, orgs

invoices_raw, orgs_raw = load_raw_data()
invoices_raw.shape, orgs_raw.shape

Loaded CSVs from disk.


((12708796, 14), (21806, 6))

## 3) Staging Models (pandas)

In [9]:
## Basic data cleaning for stg_invoices
stg_invoices = invoices_raw.copy()
stg_invoices["created_date"] = pd.to_datetime(stg_invoices["created_at"]).dt.date
stg_invoices["amount"] = pd.to_numeric(stg_invoices["amount"], errors="coerce").fillna(0.0)
stg_invoices["payment_amount"] = pd.to_numeric(stg_invoices["payment_amount"], errors="coerce").fillna(0.0)
stg_invoices["organization_id"] = stg_invoices["organization_id"].astype(int)
stg_invoices["invoice_id"] = stg_invoices["invoice_id"].astype(int)

## Basic data cleaning for stg_organizations
stg_organizations = orgs_raw.copy()
stg_organizations["organization_id"] = stg_organizations["organization_id"].astype(int)


## 4) Build Fact & Dimension Tables

In [11]:
# fct_org_daily_balances
fct_org_daily_balances = (
    stg_invoices
    .groupby(["organization_id", "created_date"], as_index=False)
    .agg(total_amount=("amount", "sum"),
         total_payment_amount=("payment_amount", "sum"))
)
fct_org_daily_balances["daily_balance"] = (
    fct_org_daily_balances["total_amount"] - fct_org_daily_balances["total_payment_amount"]
)

agg = stg_invoices.groupby("organization_id").agg(
    total_invoices=("invoice_id", "nunique"),
    total_invoice_amount=("amount", "sum"),
    total_payment_amount=("payment_amount", "sum"),
)
dim_organizations = stg_organizations.merge(agg, on="organization_id", how="left")
# dim_organizations["total_invoices"] = dim_organizations["total_invoices"].fillna(0).astype(int)
# dim_organizations["total_invoice_amount"] = dim_organizations["total_invoice_amount"].fillna(0.0)
# dim_organizations["total_payment_amount"] = dim_organizations["total_payment_amount"].fillna(0.0)

# Save outputs
import os
fct_path = os.path.join(OUTPUT_DIR, "fct_org_daily_balances.csv")
dim_path = os.path.join(OUTPUT_DIR, "dim_organizations.csv")
stg_inv_path = os.path.join(OUTPUT_DIR, "stg_invoices.csv")
stg_org_path = os.path.join(OUTPUT_DIR, "stg_organizations.csv")
for path, df in [(fct_path, fct_org_daily_balances), (dim_path, dim_organizations),
              (stg_inv_path, stg_invoices), (stg_org_path, stg_organizations)]:
    df.to_csv(p, index=False)
    print("Saved:", path)

fct_org_daily_balances.head(5)

NameError: name 'p' is not defined

## 5) Data Quality Checks (Simple Tests)

In [None]:
errors = []

if fct_org_daily_balances[["organization_id", "created_date"]].isnull().any().any():
    errors.append("Nulls found in organization_id or created_date in fact table.")

if fct_org_daily_balances.duplicated(["organization_id", "created_date"]).any():
    errors.append("Duplicate (organization_id, created_date) rows in fact table.")

if stg_invoices["invoice_id"].isnull().any():
    errors.append("Null invoice_id in staging invoices.")

if (fct_org_daily_balances["total_amount"] < 0).any():
    errors.append("Negative total_amount found in fact table.")
if (fct_org_daily_balances["total_payment_amount"] < 0).any():
    errors.append("Negative total_payment_amount found in fact table.")

print("Row counts:")
print("  stg_invoices:", len(stg_invoices))
print("  stg_organizations:", len(stg_organizations))
print("  fct_org_daily_balances:", len(fct_org_daily_balances))
print("  dim_organizations:", len(dim_organizations))

if errors:
    print("\nTEST FAILURES:")
    for e in errors:
        print(" -", e)
else:
    print("\n All basic tests passed")

## 6) Day-over-Day Alerts (Latest Day Only)

In [None]:
fact = fct_org_daily_balances.sort_values(["organization_id", "created_date"]).copy()
fact["prev_balance"] = fact.groupby("organization_id")["daily_balance"].shift(1)

latest_day = f["created_date"].max()
latest = f[f["created_date"] == latest_day].copy()

def pct_change(curr, prev):
    if pd.isna(prev) or prev == 0:
        return np.nan
    return abs(curr - prev) / abs(prev)

latest["pct_change"] = latest.apply(lambda r: pct_change(r["daily_balance"], r["prev_balance"]), axis=1)

alerts = latest[(~latest["prev_balance"].isna()) & (latest["pct_change"] > 0.5)].copy()

alerts_path = os.path.join(OUTPUT_DIR, "alerts_latest_day.csv")
alerts.to_csv(alerts_path, index=False)
print(f"Latest day: {latest_day}")
print(f"Alerts saved: {alerts_path}")
alerts.head(10)

## 7)Send Alerts to Slack

In [None]:
import requests

def post_to_slack(webhook_url, text):
    try:
        resp = requests.post(webhook_url, json={"text": text}, timeout=10)
        return resp.status_code, resp.text
    except Exception as e:
        return None, str(e)

if len(SLACK_WEBHOOK_URL) > 10:
    if alerts.empty:
        msg = f"No balance changes >50% on latest day ({latest_day})."
        status, resp = post_to_slack(os.getenv("SLACK_WEBHOOK_URL"), msg)
        print("Slack response:", status, resp)
    else:
        for _, row in alerts.iterrows():
            msg = (f" *Balance Change Alert*\n"
                   f"*Organization:* {row.organization_id}\n"
                   f"*Date:* {row.created_date}\n"
                   f"*Previous:* {row.prev_balance}\n"
                   f"*Current:* {row.daily_balance}\n"
                   f"*Change:* {row.pct_change}\n")
            status, resp = post_to_slack(SLACK_WEBHOOK_URL, msg)
            print("Slack response:", status, resp)
else:
    print("SLACK_WEBHOOK_URL not set; skipping Slack notifications.")