In [0]:
import pandas as pd

# Synthetic raw subscription records with deliberately injected quality
# problems for the cleaning step: invalid / unrealistic ages, missing and
# duplicated user_ids, a missing monthly_fee and mixed-case status values.
raw_data = [
    # valid
    {"user_id": "U001", "status": "Active", "date": "2025-08-01", "monthly_fee": 16.99, "age": 24},
    # invalid age (negative)
    {"user_id": "U002", "status": "Inactive", "date": "2025-01-15", "monthly_fee": 27.99, "age": -7},
    # missing user_id
    {"user_id": None, "status": "Active", "date": "2026-02-03", "monthly_fee": 49.99, "age": 30},
    # duplicate user_id (also upper-case status "ACTIVE")
    {"user_id": "U001", "status": "ACTIVE", "date": "2025-02-04", "monthly_fee": 16.99, "age": 28},
    # missing monthly_fee (imputed during cleaning); lower-case status
    {"user_id": "U003", "status": "inactive", "date": "2025-11-20", "monthly_fee": None, "age": 22},
    # invalid age (string)
    {"user_id": "U004", "status": "Active", "date": "2025-12-05", "monthly_fee": 27.99, "age": "twenty"},
    # missing user_id
    {"user_id": None, "status": "Inactive", "date": "2025-09-25", "monthly_fee": 23.99, "age": 35},
    # duplicate user_id
    {"user_id": "U003", "status": "Active", "date": "2025-12-06", "monthly_fee": 44.99, "age": 40},
    # valid
    {"user_id": "U005", "status": "Inactive", "date": "2026-01-30", "monthly_fee": 28.99, "age": 27},
    # unrealistic age
    {"user_id": "U006", "status": "Active", "date": "2025-12-02", "monthly_fee": 17.99, "age": 250},
]

# Persist the untouched raw data before any cleaning is applied
df = pd.DataFrame(raw_data)
df.to_csv("subscription.csv", index=False)

### Clean and validate data

In [0]:
# Clean and validate the raw subscription records.
# Order matters: untrustworthy records (missing or duplicated user_id) are
# removed FIRST, so the median used for imputation is computed only from the
# rows that actually survive cleaning.

MAX_PLAUSIBLE_AGE = 120  # ages above this are treated as data-entry errors

# Drop records where user_id is missing or duplicated (keep first occurrence).
# reset_index gives a clean 0..n-1 index and an independent frame, so the
# column assignments below do not operate on a filtered view.
df = df.dropna(subset=["user_id"])
df = df.drop_duplicates(subset=["user_id"], keep="first").reset_index(drop=True)

# Standardise status: strip stray whitespace, then lowercase
df["status"] = df["status"].str.strip().str.lower()

# Convert date column to proper datetime (unparseable values -> NaT)
df["date"] = pd.to_datetime(df["date"], errors="coerce")

# Ensure age is numeric (invalid -> NaN) and within a plausible range
df["age"] = pd.to_numeric(df["age"], errors="coerce")
df.loc[(df["age"] < 0) | (df["age"] > MAX_PLAUSIBLE_AGE), "age"] = None

# Replace missing monthly_fee with the median fee of the retained records
median_fee = df["monthly_fee"].median()
df["monthly_fee"] = df["monthly_fee"].fillna(median_fee)

### Engineer new analytical features and answer business questions

In [0]:
# Engineer analytical features used by the business questions and the DB load.

# Fixed reference date (instead of the wall clock) so results are reproducible
today = pd.Timestamp("2026-02-06")
DAYS_PER_MONTH = 30  # simple 30-day billing-month approximation

# subscription_length_days: days from the subscription date to `today`.
# NaN when the date could not be parsed; negative for future-dated records.
df["subscription_length_days"] = (today - df["date"]).dt.days

# is_active_subscription (1/0)
df["is_active_subscription"] = (df["status"] == "active").astype(int)

# months_active: whole billing months elapsed. Negative lengths are clipped to
# 0 so future-dated records cannot produce negative revenue, and unknown
# lengths (NaN) become 0 because astype(int) would otherwise raise.
df["months_active"] = (
    (df["subscription_length_days"].clip(lower=0) // DAYS_PER_MONTH)
    .fillna(0)
    .astype(int)
)

# revenue_estimate = monthly_fee x months_active
# NOTE: revenue accrues up to `today` regardless of status, so inactive users
# are likely overestimated; partial months are not counted.
df["revenue_estimate"] = df["monthly_fee"] * df["months_active"]


### Integrate an external public API with error handling

In [0]:
import requests

# Simple (synthetic) country assignment: AU for U001/U002, US for U003/U004,
# IN for everyone else
COUNTRY_BY_USER = {"U001": "AU", "U002": "AU", "U003": "US", "U004": "US"}
df["country_code"] = df["user_id"].map(COUNTRY_BY_USER).fillna("IN")

# Request only the country codes actually present in the data
requested_codes = ",".join(sorted(df["country_code"].unique()))

# Call the public REST Countries API. Any failure (network, HTTP status, or a
# body that is not valid JSON) is reported and the pipeline continues without
# enrichment instead of crashing.
countries = []
if requested_codes:
    try:
        response = requests.get(
            f"https://restcountries.com/v3.1/alpha?codes={requested_codes}",
            timeout=5,
        )
        response.raise_for_status()
        countries = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError also covers JSON decode errors on requests versions where
        # they are not RequestException subclasses
        print("API error:", e)
        countries = []

# Build lookup dictionary, skipping any entry that is not shaped as expected
if not isinstance(countries, list):
    countries = []
country_lookup = {
    c["cca2"]: c["name"]["common"]
    for c in countries
    if isinstance(c, dict)
    and "cca2" in c
    and isinstance(c.get("name"), dict)
    and "common" in c["name"]
}

# Map country names to dataframe (NaN where the API returned no name)
df["country_name"] = df["country_code"].map(country_lookup)

df.head()


### Load cleaned data into a relational database

In [0]:
import pandas as pd
from sqlalchemy import create_engine, Column, String, Integer, Float, Date
from sqlalchemy.orm import declarative_base, sessionmaker

# Declarative base that the ORM table classes below register with
Base = declarative_base()

# SQLite database file (relative path, so created in the working directory);
# SQL statement echo is disabled
engine = create_engine("sqlite:///streamsmart.db", echo=False)

# Define tables
class User(Base):
    """ORM model for the ``users`` table: one row per user in the cleaned DataFrame."""

    __tablename__ = "users"
    user_id = Column(String, primary_key=True)  # de-duplicated user identifier
    age = Column(Integer)  # NULL when the raw age was non-numeric or out of range
    country_code = Column(String(2))  # two-letter code assigned in the API cell
    country_name = Column(String)  # name from the REST Countries lookup; NULL if unavailable

from sqlalchemy import ForeignKey


class Subscription(Base):
    """ORM model for the ``subscriptions`` table: one subscription row per user.

    ``user_id`` is both the primary key (cleaning keeps one record per user)
    and a foreign key to ``users.user_id``, so the schema records that every
    subscription belongs to an existing user. SQLite enforces this only when
    ``PRAGMA foreign_keys=ON`` is set on the connection.
    """

    __tablename__ = "subscriptions"
    user_id = Column(String, ForeignKey("users.user_id"), primary_key=True)
    status = Column(String)  # normalised lower-case status ("active"/"inactive")
    start_date = Column(Date)  # subscription start date
    monthly_fee = Column(Float)  # median-imputed where the raw fee was missing
    is_active_subscription = Column(Integer)  # 1 if status == "active", else 0
    months_active = Column(Integer)  # whole 30-day months up to the reference date
    revenue_estimate = Column(Float)  # monthly_fee * months_active

class AnalyticsSummary(Base):
    """ORM model for ``analytics_summary``: per-country aggregates of the cleaned data."""

    __tablename__ = "analytics_summary"
    country_code = Column(String(2), primary_key=True)  # one summary row per country code
    country_name = Column(String)
    user_count = Column(Integer)  # number of users with this country code
    total_revenue = Column(Float)  # sum of revenue_estimate over those users
    active_subscriptions = Column(Integer)  # sum of the 1/0 is_active_subscription flags

# Create tables (drop first so re-running the cell starts from an empty schema)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)

Session = sessionmaker(bind=engine)


def _none_if_na(value):
    """Return None for a pandas missing value (NaN/NaT/None), else ``value`` unchanged.

    Converting explicitly guarantees NULL is stored, rather than relying on
    how the database driver happens to bind NaN or NaT.
    """
    return None if pd.isna(value) else value


# Build USERS rows
users = [
    User(
        user_id=row["user_id"],
        age=None if pd.isna(row["age"]) else int(row["age"]),
        country_code=row["country_code"],
        country_name=_none_if_na(row["country_name"]),
    )
    for _, row in df.iterrows()
]

# Build SUBSCRIPTIONS rows (start_date is NULL when the raw date was unparseable)
subscriptions = [
    Subscription(
        user_id=row["user_id"],
        status=row["status"],
        start_date=None if pd.isna(row["date"]) else row["date"].date(),
        monthly_fee=float(row["monthly_fee"]),
        is_active_subscription=int(row["is_active_subscription"]),
        months_active=int(row["months_active"]),
        revenue_estimate=float(row["revenue_estimate"]),
    )
    for _, row in df.iterrows()
]

# Build ANALYTICS SUMMARY rows. Group by the primary key only: also grouping
# by country_name would silently drop every country whose name is NaN (e.g.
# after an API failure), leaving the summary table empty.
summary_df = (
    df.groupby("country_code")
    .agg(
        country_name=("country_name", "first"),
        user_count=("user_id", "count"),
        total_revenue=("revenue_estimate", "sum"),
        active_subscriptions=("is_active_subscription", "sum"),
    )
    .reset_index()
)

summaries = [
    AnalyticsSummary(
        country_code=row["country_code"],
        country_name=_none_if_na(row["country_name"]),
        user_count=int(row["user_count"]),
        total_revenue=float(row["total_revenue"]),
        active_subscriptions=int(row["active_subscriptions"]),
    )
    for _, row in summary_df.iterrows()
]

# Insert all three tables in ONE transaction: committed together on success,
# rolled back together on any error, and the session is closed afterwards
with Session.begin() as session:
    session.bulk_save_objects(users)
    session.bulk_save_objects(subscriptions)
    session.bulk_save_objects(summaries)

# Verification (short-lived session, closed on exit)
with Session() as session:
    print("Users:", session.query(User).count())
    print("Subscriptions:", session.query(Subscription).count())
    print("Analytics rows:", session.query(AnalyticsSummary).count())


In [0]:
import pandas as pd

# Read every table back out of SQLite to confirm the load round-trips
users_df = pd.read_sql("SELECT * FROM users", engine)
subscriptions_df = pd.read_sql("SELECT * FROM subscriptions", engine)
analytics_df = pd.read_sql("SELECT * FROM analytics_summary", engine)

# Print each table under its heading, in load order
for heading, table in (
    ("\nUSERS TABLE", users_df),
    ("\nSUBSCRIPTIONS TABLE", subscriptions_df),
    ("\nANALYTICS SUMMARY TABLE", analytics_df),
):
    print(heading)
    print(table)


In [0]:
# Export the cleaned, feature-enriched frame (without the pandas index)
df.to_csv(path_or_buf="subscription_cleaned.csv", index=False)

### Questions to Answer
1. When is it better to drop records vs. impute values, and how does this depend on business context?
  - It is better to drop records when key information (like user_id) is missing or duplicated, because the record cannot be trusted.
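  - It is better to impute values when only non-critical data is missing and enough similar data exists, such as replacing a missing fee with the median.
  - Business context decides which fields count as critical. In revenue reporting, an imputed fee feeds directly into the revenue estimate. If fee accuracy matters more than keeping every row, it can be safer to flag or drop the record than to impute it.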
2. What assumptions are you making when estimating revenue this way, and how could they bias conclusions?
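  - The revenue estimate assumes users are billed every month and remain subscribed for the full calculated period.
  - It also assumes 30-day months at the user's current fee. It counts only whole months, so a user in their first month contributes no revenue. It accrues revenue up to the reference date regardless of status, so inactive users are treated as if they never cancelled.
  - This may overestimate revenue if users cancel early, pause subscriptions, or receive discounts, and underestimate it for very recent sign-ups.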
3. Why should API failures not crash a data pipeline?
  - APIs are external services and can fail due to network or service issues.
  - If a pipeline crashes because of an API failure, the entire data process stops. Error handling lets the pipeline continue to run safely, for example by leaving the enrichment fields empty instead of losing the whole run.
4. Why is schema design just as important as data cleaning?
  - Schema design ensures data is stored with correct types, keys, and structure.
  - Without a proper schema, even clean data can become inconsistent, duplicated, or unreliable.
5. How does version control improve trust in data outputs?
  - Version control tracks every change made to code, and to data when data files (or references to data versions) are committed alongside it.
  - This allows results to be reproduced, audited, and rolled back, which increases trust in the final outputs.