In [2]:
import os
import numpy as np
import pandas as pd

# Seed and shared Generator used by the random draws in this notebook.
SEED = 42
rng = np.random.default_rng(SEED)

# When the kernel runs inside a folder named "notebooks", the project root is
# its parent; otherwise the current working directory is used as the root.
_cwd = os.getcwd()
if os.path.basename(_cwd) == "notebooks":
    PROJECT_ROOT = os.path.abspath(os.path.join(_cwd, ".."))
else:
    PROJECT_ROOT = _cwd

RAW_DIR = os.path.join(PROJECT_ROOT, "data", "raw")
os.makedirs(RAW_DIR, exist_ok=True)


def _raw_csv(filename):
    """Return the path of `filename` inside RAW_DIR."""
    return os.path.join(RAW_DIR, filename)


# Fact tables: the date column of each is parsed to datetime on load.
fact_machine_sales = pd.read_csv(_raw_csv("fact_machine_sales.csv"), parse_dates=["sale_date"])
fact_telematics = pd.read_csv(_raw_csv("fact_telematics_daily.csv"), parse_dates=["snapshot_date"])

# Dimension tables.
dim_dealer = pd.read_csv(_raw_csv("dim_dealer.csv"))
dim_failure = pd.read_csv(_raw_csv("dim_failure_code.csv"))
dim_machine_model = pd.read_csv(_raw_csv("dim_machine_model.csv"))

print("Loaded inputs ✅")


Loaded inputs ✅


In [3]:
# Attach each machine's sale attributes to its daily telematics rows.
sales_columns = ["machine_id", "sale_date", "dealer_id", "model_id", "warranty_months"]
tele = pd.merge(fact_telematics, fact_machine_sales[sales_columns], on="machine_id", how="left")

# Days elapsed between sale and snapshot; negative values are floored at 0.
days_since_sale = (tele["snapshot_date"] - tele["sale_date"]).dt.days
tele["age_days"] = days_since_sale.clip(lower=0)

# Each component is scaled so that a larger value means more risk.
health_component = (100 - tele["health_score"]) / 100.0          # lower health score -> larger value
error_component = np.clip(tele["error_code_count"] / 6.0, 0, 1)  # saturates at 6 error codes
engine_hours_p95 = tele["engine_hours"].quantile(0.95)
usage_component = np.clip(tele["engine_hours"] / engine_hours_p95, 0, 1)  # relative to the 95th percentile
age_component = np.clip(tele["age_days"] / 365.0, 0, 1)          # saturates at 365 days

# Weighted blend (weights 0.45 / 0.30 / 0.15 / 0.10), accumulated in that order.
blended = 0.45 * health_component
blended = blended + 0.30 * error_component
blended = blended + 0.15 * usage_component
blended = blended + 0.10 * age_component

# Store the score, limited to [0, 1].
tele["risk_score"] = np.clip(blended, 0, 1)

tele[["machine_id", "snapshot_date", "health_score", "error_code_count", "engine_hours", "risk_score"]].head()


Unnamed: 0,machine_id,snapshot_date,health_score,error_code_count,engine_hours,risk_score
0,MC00001,2023-03-07,98.9,0,5.12,0.090378
1,MC00001,2023-03-08,94.2,0,4.86,0.107464
2,MC00001,2023-03-09,94.1,0,7.05,0.144729
3,MC00001,2023-03-10,97.2,0,5.51,0.105357
4,MC00001,2023-03-11,91.9,0,4.88,0.11897


In [4]:
# Job-volume targets: breakdowns are 55% of all service jobs.
TARGET_TOTAL_JOBS = 12000
TARGET_BREAKDOWNS = int(TARGET_TOTAL_JOBS * 0.55)

# Draw three times the needed rows, weighted by risk_score, then keep a single
# row per (machine, snapshot date) so a machine has at most one breakdown a day.
candidate = (
    tele
    .sample(n=TARGET_BREAKDOWNS * 3, weights="risk_score", random_state=SEED)
    .drop_duplicates(subset=["machine_id", "snapshot_date"])
)

# The first TARGET_BREAKDOWNS candidates become the breakdown events.
breakdowns = candidate.head(TARGET_BREAKDOWNS).copy()
breakdowns["job_type"] = "Breakdown"

len(breakdowns), breakdowns["risk_score"].describe()


(6600,
 count    6600.000000
 mean        0.245231
 std         0.081808
 min         0.032404
 25%         0.187282
 50%         0.239576
 75%         0.297514
 max         0.573294
 Name: risk_score, dtype: float64)

In [5]:
# One list of failure codes per severity level in dim_failure.
fail_low, fail_med, fail_high = (
    dim_failure.loc[dim_failure["severity"] == level, "failure_code"].tolist()
    for level in ("Low", "Medium", "High")
)

def pick_failure_code(risk):
    """Draw a failure code whose severity pool depends on `risk`.

    risk > 0.65 : High pool with probability 0.65, otherwise Medium.
    risk > 0.40 : Medium pool with probability 0.70, otherwise Low.
    otherwise   : Low pool with probability 0.75, otherwise Medium.

    Uses the notebook-level `rng` and the `fail_low` / `fail_med` / `fail_high`
    lists. One `rng.random()` draw selects the pool, then `rng.choice` picks a
    code from it.

    NOTE(review): the breakdown sample displayed in this notebook has a maximum
    risk_score of about 0.57, below the 0.65 cut-off - confirm whether the
    High-severity branch is expected to be reachable.
    """
    if risk > 0.65:
        preferred, alternate, p_preferred = fail_high, fail_med, 0.65
    elif risk > 0.40:
        preferred, alternate, p_preferred = fail_med, fail_low, 0.70
    else:
        preferred, alternate, p_preferred = fail_low, fail_med, 0.75

    pool = preferred if rng.random() < p_preferred else alternate
    return rng.choice(pool)

# Draw one failure code per breakdown from that row's risk score.
breakdowns["failure_code"] = breakdowns["risk_score"].map(pick_failure_code)

# Look up severity and typical downtime for the drawn code.
failure_attrs = dim_failure[["failure_code", "severity", "typical_downtime_days"]]
breakdowns = pd.merge(breakdowns, failure_attrs, on="failure_code", how="left")

breakdowns.head()


Unnamed: 0,snapshot_date,machine_id,engine_hours,idle_pct,fuel_consumption_liters,error_code_count,health_score,sale_date,dealer_id,model_id,warranty_months,age_days,risk_score,job_type,failure_code,severity,typical_downtime_days
0,2024-07-04,MC00563,4.7,7.4,84.08,1,87.5,2023-12-26,D021,M01,24,191,0.236999,Breakdown,F006,Medium,4
1,2025-08-13,MC01426,7.89,38.2,151.74,1,85.1,2024-10-20,D024,M02,12,297,0.330066,Breakdown,F006,Medium,4
2,2024-08-22,MC01098,7.44,36.1,129.81,1,88.5,2023-11-13,D031,M02,12,283,0.303422,Breakdown,F003,Low,1
3,2023-06-20,MC00898,10.61,24.5,275.64,1,90.5,2023-02-21,D035,M05,12,119,0.275353,Breakdown,F001,Medium,3
4,2024-02-18,MC00235,5.57,32.1,148.76,1,92.3,2023-12-26,D003,M06,12,54,0.192381,Breakdown,F008,Medium,5


In [6]:
# Preventive jobs are 35% of the total; inspections take whatever remains
# after breakdowns and preventive jobs.
TARGET_PREVENTIVE = int(TARGET_TOTAL_JOBS * 0.35)
TARGET_INSPECTION = TARGET_TOTAL_JOBS - TARGET_BREAKDOWNS - TARGET_PREVENTIVE

machines = fact_machine_sales["machine_id"].tolist()

# Draw a preventive-visit count (0 to 4) for every machine.
preventive_counts = rng.choice([0, 1, 2, 3, 4], size=len(machines), p=[0.15, 0.30, 0.30, 0.18, 0.07])
total_prev = preventive_counts.sum()

# Rescale the counts towards TARGET_PREVENTIVE, then cap them at 4 per machine.
# The floor and the cap mean the resulting total can fall short of the target,
# and no later cell in this notebook tops preventive jobs up.
scale = TARGET_PREVENTIVE / max(total_prev, 1)
preventive_counts = np.floor(preventive_counts * scale).astype(int)
preventive_counts = np.clip(preventive_counts, 0, 4)

# Resolve every machine's sale_date once instead of filtering the whole sales
# frame inside the loop. The first listed row per machine is kept, matching
# the previous `.iloc[0]` lookup.
sale_date_by_machine = (
    fact_machine_sales
    .drop_duplicates(subset="machine_id")
    .set_index("machine_id")["sale_date"]
    .to_dict()
)

prev_rows = []
date_min = pd.Timestamp("2023-01-01")
date_max = pd.Timestamp("2025-12-31")

for mid, k in zip(machines, preventive_counts):
    if k == 0:
        continue
    sale_date = sale_date_by_machine[mid]
    # Visits start 15 days after the sale, never before the window opens.
    start = max(sale_date + pd.Timedelta(days=15), date_min)
    end = date_max
    if start >= end:
        continue
    # k timestamps drawn uniformly (in whole seconds) from [start, end),
    # then truncated to midnight.
    chosen = pd.to_datetime(rng.integers(start.value // 10**9, end.value // 10**9, size=k), unit="s").normalize()
    for d in chosen:
        prev_rows.append((mid, d))

# Two draws landing on the same day for one machine collapse into one visit.
preventive = pd.DataFrame(prev_rows, columns=["machine_id", "service_date"]).drop_duplicates()
# Trim to the target when there are too many rows; a shortfall is left as-is.
if len(preventive) > TARGET_PREVENTIVE:
    preventive = preventive.sample(n=TARGET_PREVENTIVE, random_state=SEED)
preventive["job_type"] = "Preventive"
preventive["failure_code"] = pd.NA

len(preventive)


3315

In [7]:
# Inspection jobs: fewer than preventive ones and without a failure code.
# They are sampled from the preventive visits, so every inspection reuses the
# (machine_id, service_date) pair of a preventive job.
# NOTE(review): this puts an inspection on the same machine and day as a
# preventive visit - confirm that this overlap is intended.
inspection = preventive.sample(n=min(TARGET_INSPECTION, len(preventive)), random_state=SEED).copy()
inspection["job_type"] = "Inspection"
inspection["failure_code"] = pd.NA

# Inspection and preventive row counts.
len(inspection), len(preventive)


(1200, 3315)

In [8]:
# Top up inspections when sampling from preventive visits produced too few.
if len(inspection) < TARGET_INSPECTION:
    need = TARGET_INSPECTION - len(inspection)
    window_start = pd.Timestamp("2023-01-01")
    window_end = pd.Timestamp("2025-12-31")

    # Earliest admissible date per machine: 15 days after its sale (the rule
    # used for preventive visits) and never before the window opens. Without
    # this an inspection could be dated before the machine was sold.
    first_sale = fact_machine_sales.drop_duplicates(subset="machine_id").set_index("machine_id")["sale_date"]
    earliest = (first_sale + pd.Timedelta(days=15)).clip(lower=window_start)
    # Machines with a missing sale_date or no room left in the window are
    # excluded (a NaT comparison evaluates to False).
    eligible = earliest[earliest < window_end]

    if len(eligible) > 0:
        picked = rng.choice(eligible.index.to_numpy(), size=need)
        # Per-row lower bound in whole seconds since the epoch; the upper bound is shared.
        lo = ((eligible.loc[picked] - pd.Timestamp("1970-01-01")) // pd.Timedelta(seconds=1)).to_numpy()
        hi = window_end.value // 10**9
        extra = pd.DataFrame({
            "machine_id": picked,
            "service_date": pd.to_datetime(rng.integers(lo, hi), unit="s").normalize(),
            "job_type": "Inspection",
            "failure_code": pd.NA
        })
        inspection = (
            pd.concat([inspection, extra], ignore_index=True)
            .drop_duplicates(subset=["machine_id", "service_date"])
            .head(TARGET_INSPECTION)
        )

len(inspection)


1200

In [9]:
# Shared column layout for the three job types.
job_columns = ["machine_id", "service_date", "job_type", "failure_code", "severity", "typical_downtime_days"]

# A breakdown's service date is the telematics snapshot date it was sampled from.
breakdowns_ren = breakdowns.rename(columns={"snapshot_date": "service_date"})

# Preventive and inspection jobs have no severity or typical downtime.
job_frames = [breakdowns_ren[job_columns]]
for scheduled in (preventive, inspection):
    job_frames.append(scheduled.assign(severity=pd.NA, typical_downtime_days=pd.NA)[job_columns])

service_jobs = pd.concat(job_frames, ignore_index=True)

# Drop repeats of the same job type for a machine on the same day. Jobs of
# different types sharing a machine and day are both kept.
service_jobs = service_jobs.drop_duplicates(subset=["machine_id", "service_date", "job_type"])

len(service_jobs), service_jobs["job_type"].value_counts()


(11115,
 job_type
 Breakdown     6600
 Preventive    3315
 Inspection    1200
 Name: count, dtype: int64)

In [10]:
# Attach dealer, model and sale attributes of the serviced machine.
sales_attrs = fact_machine_sales[["machine_id", "dealer_id", "model_id", "sale_date", "warranty_months"]]
service_jobs = pd.merge(service_jobs, sales_attrs, on="machine_id", how="left")

# The warranty length is approximated as 30 days per warranty month.
warranty_span = pd.to_timedelta(service_jobs["warranty_months"] * 30, unit="D")
service_jobs["warranty_end_date"] = service_jobs["sale_date"] + warranty_span

# 1 when the job falls on or before the warranty end date, else 0.
within_warranty = service_jobs["service_date"] <= service_jobs["warranty_end_date"]
service_jobs["is_warranty"] = within_warranty.astype(int)


In [11]:
# Add the tier of the dealer handling each job.
dealer_tiers = dim_dealer[["dealer_id", "tier"]]
service_jobs = pd.merge(service_jobs, dealer_tiers, on="dealer_id", how="left")

# Factor applied to a breakdown's mean downtime, keyed by dealer tier.
tier_multiplier = dict(A=0.85, B=1.00, C=1.15)

def compute_downtime(row):
    """Return the downtime in whole days for one service-job row.

    Inspection : always 0.
    Preventive : a normal(0.8, 0.5) draw limited to [0, 3], then truncated
                 towards zero by `int`.
    Breakdown  : a normal draw centred on `typical_downtime_days` (3 when
                 missing) times the dealer-tier multiplier (1.0 for tiers not
                 in `tier_multiplier`), with standard deviation 1.2, rounded
                 and limited to [1, 20].

    Reads the notebook-level `rng` and `tier_multiplier`.
    """
    job = row["job_type"]

    if job == "Inspection":
        return 0

    if job == "Preventive":
        draw = rng.normal(0.8, 0.5)
        return int(np.clip(draw, 0, 3))

    # Breakdown
    typical = row["typical_downtime_days"]
    base = 3 if pd.isna(typical) else typical
    mean_days = base * tier_multiplier.get(row["tier"], 1.0)
    draw = rng.normal(mean_days, 1.2)
    return int(np.clip(np.round(draw), 1, 20))

# Evaluate the downtime rule for every job, then summarise the result.
service_jobs = service_jobs.assign(downtime_days=service_jobs.apply(compute_downtime, axis=1))
service_jobs["downtime_days"].describe()


count    11115.000000
mean         1.488529
std          1.572187
min          0.000000
25%          0.000000
50%          1.000000
75%          2.000000
max          9.000000
Name: downtime_days, dtype: float64

In [12]:
def labor_hours(row):
    """Return labor hours (one decimal place) for one service-job row.

    A normal draw from the notebook-level `rng` is limited to a range that
    depends on the job type:

    Inspection : normal(1.5, 0.8) limited to [0.5, 6]
    Preventive : normal(3.0, 1.2) limited to [1, 10]
    Breakdown  : limited to [1, 24], with mean/sd chosen by severity -
                 High (10, 3), Medium (6, 2), anything else (3, 1.2)
    """
    job = row["job_type"]

    if job == "Inspection":
        mu, sd, floor_h, cap_h = 1.5, 0.8, 0.5, 6
    elif job == "Preventive":
        mu, sd, floor_h, cap_h = 3.0, 1.2, 1, 10
    else:
        # Breakdown: severity selects the distribution.
        floor_h, cap_h = 1, 24
        sev = row["severity"]
        if sev == "High":
            mu, sd = 10, 3
        elif sev == "Medium":
            mu, sd = 6, 2
        else:
            mu, sd = 3, 1.2

    hours = np.clip(rng.normal(mu, sd), floor_h, cap_h)
    return round(float(hours), 1)

# Labor rate (INR/hr) by dealer tier.
rate = {"A": 1500, "B": 1400, "C": 1300}

# Hours worked on each job.
service_jobs["labor_hours"] = service_jobs.apply(labor_hours, axis=1)

# Tiers without an entry in `rate` are billed at 1400.
service_jobs["labor_rate"] = service_jobs["tier"].map(rate).fillna(1400)

# Labor cost = hours x rate, rounded to a whole rupee.
service_jobs["service_cost_labor_inr"] = (
    service_jobs["labor_hours"]
    .mul(service_jobs["labor_rate"])
    .round(0)
    .astype(int)
)


In [13]:
# Add the category of each job's failure code. Jobs whose code has no match
# in dim_failure keep a missing category.
failure_categories = dim_failure[["failure_code", "failure_category"]]
service_jobs = pd.merge(service_jobs, failure_categories, on="failure_code", how="left")

def resolution_status(row):
    """Return "Fixed" or "Repeat" for one service-job row.

    Non-breakdown jobs are always "Fixed". For a breakdown the repeat
    probability is 0.14 for Electrical/Hydraulic failures and 0.08 otherwise,
    plus 0.03 when severity is High. One `rng.random()` draw from the
    notebook-level `rng` decides the outcome.
    """
    if row["job_type"] != "Breakdown":
        return "Fixed"

    repeat_p = 0.14 if row["failure_category"] in ("Electrical", "Hydraulic") else 0.08
    if row["severity"] == "High":
        repeat_p += 0.03

    if rng.random() < repeat_p:
        return "Repeat"
    return "Fixed"

# Decide the outcome of every job, then count Fixed vs Repeat.
service_jobs = service_jobs.assign(resolution_status=service_jobs.apply(resolution_status, axis=1))
service_jobs["resolution_status"].value_counts()


resolution_status
Fixed     10451
Repeat      664
Name: count, dtype: int64

In [16]:
# Machines present in the sales table but absent from the generated jobs.
all_machines = set(fact_machine_sales["machine_id"])
existing = set(service_jobs["machine_id"])

missing = sorted(all_machines.difference(existing))

print("Machines with NO service jobs:", len(missing))
print("Example missing:", missing[:10])



Machines with NO service jobs: 3
Example missing: ['MC00518', 'MC00932', 'MC01473']


In [17]:
# Give every machine without any service job a single inspection.
if len(missing) > 0:
    window_start = pd.Timestamp("2023-01-01")
    window_end = pd.Timestamp("2025-12-31")

    # Start from the uncovered machines and attach dealer/model/sale data.
    extra = pd.DataFrame({"machine_id": missing}).merge(
        fact_machine_sales[["machine_id", "dealer_id", "model_id", "sale_date", "warranty_months"]],
        on="machine_id",
        how="left"
    ).merge(
        dim_dealer[["dealer_id", "tier"]],
        on="dealer_id",
        how="left"
    )

    # Draw the service date after the machine's sale. Drawing from the whole
    # window regardless of sale_date could date a job before the sale and flag
    # it as in-warranty. The earliest admissible date is 15 days after the
    # sale (the rule used for preventive visits), never before the window
    # opens. A missing sale_date falls back to the window start.
    earliest = (extra["sale_date"] + pd.Timedelta(days=15)).clip(lower=window_start).fillna(window_start)
    lo = ((earliest - pd.Timestamp("1970-01-01")) // pd.Timedelta(seconds=1)).to_numpy()
    hi = window_end.value // 10**9
    # A machine sold too late to leave any room in the window gets its
    # earliest admissible date instead of a random draw.
    has_window = lo < hi
    drawn = rng.integers(np.where(has_window, lo, hi - 1), hi)
    extra["service_date"] = pd.to_datetime(np.where(has_window, drawn, lo), unit="s").normalize()

    extra["job_type"] = "Inspection"
    extra["failure_code"] = pd.NA
    extra["severity"] = pd.NA
    extra["typical_downtime_days"] = pd.NA

    # Warranty flag (30 days per warranty month, as for the other jobs)
    extra["warranty_end_date"] = extra["sale_date"] + pd.to_timedelta(extra["warranty_months"] * 30, unit="D")
    extra["is_warranty"] = (extra["service_date"] <= extra["warranty_end_date"]).astype(int)

    # Inspection fields: no downtime, a fixed 1.5 labor hours, always "Fixed".
    extra["downtime_days"] = 0
    extra["labor_hours"] = 1.5
    extra["labor_rate"] = extra["tier"].map({"A": 1500, "B": 1400, "C": 1300}).fillna(1400)
    extra["service_cost_labor_inr"] = (extra["labor_hours"] * extra["labor_rate"]).round(0).astype(int)
    extra["resolution_status"] = "Fixed"

    # Add to service_jobs
    service_jobs = pd.concat([service_jobs, extra], ignore_index=True)

print("✅ After patch, unique machines:", service_jobs["machine_id"].nunique())


✅ After patch, unique machines: 1500


In [None]:
# Order jobs by date, then machine, and number them SJ0000001, SJ0000002, ...
service_jobs = service_jobs.sort_values(["service_date", "machine_id"]).reset_index(drop=True)
service_jobs["service_job_id"] = [f"SJ{seq:07d}" for seq in range(1, len(service_jobs) + 1)]

# Columns published in the fact table, in output order.
output_columns = [
    "service_job_id",
    "machine_id",
    "service_date",
    "dealer_id",
    "model_id",
    "job_type",
    "failure_code",
    "downtime_days",
    "labor_hours",
    "service_cost_labor_inr",
    "is_warranty",
    "resolution_status",
]
fact_service_job = service_jobs.loc[:, output_columns].copy()

# Written into RAW_DIR, next to the input tables.
fact_service_job.to_csv(os.path.join(RAW_DIR, "fact_service_job.csv"), index=False)

fact_service_job.head()

In [19]:
print("Rows:", len(fact_service_job))
print(fact_service_job["job_type"].value_counts())

# Every job id must be unique.
assert fact_service_job["service_job_id"].is_unique, "service_job_id values are not unique"

# Every machine in the sales table must have at least one job. The expected
# count is taken from fact_machine_sales rather than hard-coded, so the check
# stays valid when the fleet size changes.
expected_machines = fact_machine_sales["machine_id"].nunique()
covered_machines = fact_service_job["machine_id"].nunique()
assert covered_machines == expected_machines, (
    f"{expected_machines - covered_machines} machine(s) have no service job"
)

print("fact_service_job ✅ created")


Rows: 11118
job_type
Breakdown     6600
Preventive    3315
Inspection    1203
Name: count, dtype: int64
fact_service_job ✅ created
