In [2]:
pip install pandas schedule

Note: you may need to restart the kernel to use updated packages.


In [3]:
import pandas as pd

# Create the dataframe with your telecom data
data = {
    "customer_id": [1001, 1002, 1003, 1004, 1005],
    "data_used_gb": [5.2, None, 7.8, 15.6, 3.4],
    "calls_made": [25, 40, 32, 55, 18],
    "revenue_inr": [180, 280, 210, None, 120],
    "region": ["delhi", "Mumbai", "chennai", "DELHI", "Kolkata"],
    "date": ["2025/09/25", "2025-09-25", "25-09-2025", "2025-09-25", "2025-09-25"]
}

# Create a DataFrame
telecom_df = pd.DataFrame(data)

# Save to CSV
telecom_df.to_csv("telecom_raw.csv", index=False)

print("✅ telecom_raw.csv saved successfully.")



✅ telecom_raw.csv saved successfully.


In [5]:
#Importing libraries

import os
import time
import pandas as pd
import schedule
from datetime import datetime


In [24]:
#Setting up file paths

RAW_PATH = "telecom_raw.csv"
OUT_DIR  = "output"
OUT_PATH = os.path.join(OUT_DIR, "telecom_cleaned.csv")
TMP_PATH = os.path.join(OUT_DIR, "telecom_cleaned.tmp.csv")
LOG_PATH = os.path.join(OUT_DIR, "etl_run.log")


In [26]:
#creates output folder
os.makedirs(OUT_DIR, exist_ok=True)

#Logs function
def log(msg: str):
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(LOG_PATH, "a", encoding="utf-8") as f:
        f.write(f"[{ts}] {msg}\n")
    print(f"[{ts}] {msg}")



In [29]:
#Data cleaning function
def clean_frame(df: pd.DataFrame) -> pd.DataFrame:
 
   
# 1) Standardize text columns - Fix text formatting
    if "region" in df.columns:
        df["region"] = df["region"].astype(str).str.strip().str.title()





    # 2) Fill missing numeric values with robust medians – missing numbers

    for col in ["data_used_gb", "calls_made", "revenue_inr"]:
        if col in df.columns:
            if pd.api.types.is_numeric_dtype(df[col]) is False:
                df[col] = pd.to_numeric(df[col], errors="coerce")
            df[col] = df[col].fillna(df[col].median())


    # 3) Parse date; coerce + fill default - Fix date problems

    if "date" in df.columns:
        df["date"] = pd.to_datetime(df["date"], errors="coerce")
        df["date"] = df["date"].fillna(pd.Timestamp("2025-09-25"))

    # 4) De-duplicate but keep first; log how many were removed - Remove duplicates

    if {"customer_id", "date"}.issubset(df.columns):
        before = len(df)
        df = df.drop_duplicates(subset=["customer_id", "date"], keep="first")
        log(f"Deduplicated: removed {before - len(df)} duplicate row(s).")

    # 5) Clip clearly invalid ranges (safety net) - Keep data realistic

    if "data_used_gb" in df.columns:
        df["data_used_gb"] = df["data_used_gb"].clip(lower=0, upper=100)
    if "revenue_inr" in df.columns:
        df["revenue_inr"] = df["revenue_inr"].clip(lower=0)

    return df



In [30]:
def etl_job():
    try:
        log("Starting ETL...")
        if not os.path.exists(RAW_PATH):
            log(f"Raw file not found: {RAW_PATH}")
            return

        # EXTRACT
        df = pd.read_csv(RAW_PATH)

        # TRANSFORM
        df = clean_frame(df)

        # LOAD (atomic write: write tmp then rename -> avoids half-written files)
        df.to_csv(TMP_PATH, index=False)
        os.replace(TMP_PATH, OUT_PATH)

        log(f"ETL completed successfully. Rows written: {len(df)}.")
    except Exception as e:
        log(f"ETL failed: {e}")


In [31]:
schedule.clear()
schedule.every(10).seconds.do(etl_job)

runs = 3  # run 3 times then stop
for _ in range(runs):
    schedule.run_pending()
    time.sleep(10)   # wait for the next tick

print("✅ Done. Scheduler exited after", runs, "runs.")



[2025-10-29 15:22:41] Starting ETL...
[2025-10-29 15:22:41] Deduplicated: removed 0 duplicate row(s).
[2025-10-29 15:22:41] ETL completed successfully. Rows written: 5.
[2025-10-29 15:22:51] Starting ETL...
[2025-10-29 15:22:51] Deduplicated: removed 0 duplicate row(s).
[2025-10-29 15:22:52] ETL completed successfully. Rows written: 5.
✅ Done. Scheduler exited after 3 runs.
