<a href="https://colab.research.google.com/github/arunabh-alt/CI-CD_Pipeline_Practice/blob/main/ELT_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
!pip install --upgrade opentelemetry-sdk opentelemetry-api


Collecting opentelemetry-sdk
  Downloading opentelemetry_sdk-1.32.1-py3-none-any.whl.metadata (1.6 kB)
Collecting opentelemetry-semantic-conventions==0.53b1 (from opentelemetry-sdk)
  Downloading opentelemetry_semantic_conventions-0.53b1-py3-none-any.whl.metadata (2.5 kB)
Downloading opentelemetry_sdk-1.32.1-py3-none-any.whl (118 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m119.0/119.0 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading opentelemetry_semantic_conventions-0.53b1-py3-none-any.whl (188 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m188.4/188.4 kB[0m [31m8.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: opentelemetry-semantic-conventions, opentelemetry-sdk
  Attempting uninstall: opentelemetry-semantic-conventions
    Found existing installation: opentelemetry-semantic-conventions 0.37b0
    Uninstalling opentelemetry-semantic-conventions-0.37b0:
      Successfully uninstalled opentelemetry-semantic

In [5]:
# Install necessary packages
!pip install --quiet prefect sqlalchemy pandas matplotlib requests


In [7]:
!pip install "prefect<2"

Collecting prefect<2
  Downloading prefect-1.4.1-py3-none-any.whl.metadata (27 kB)
Collecting croniter>=0.3.24 (from prefect<2)
  Downloading croniter-6.0.0-py2.py3-none-any.whl.metadata (32 kB)
Collecting marshmallow>=3.0.0b19 (from prefect<2)
  Downloading marshmallow-4.0.0-py3-none-any.whl.metadata (7.4 kB)
Collecting marshmallow-oneofschema>=2.0.0b2 (from prefect<2)
  Downloading marshmallow_oneofschema-3.1.1-py3-none-any.whl.metadata (5.0 kB)
Collecting mypy-extensions>=0.4.0 (from prefect<2)
  Downloading mypy_extensions-1.1.0-py3-none-any.whl.metadata (1.1 kB)
Collecting marshmallow>=3.0.0b19 (from prefect<2)
  Downloading marshmallow-3.26.1-py3-none-any.whl.metadata (7.3 kB)
Downloading prefect-1.4.1-py3-none-any.whl (606 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m606.6/606.6 kB[0m [31m13.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading croniter-6.0.0-py2.py3-none-any.whl (25 kB)
Downloading marshmallow_oneofschema-3.1.1-py3-none-any.whl (5.7 kB)
Downl

In [1]:
import sqlite3
import pandas as pd
import requests
import matplotlib.pyplot as plt
from sqlalchemy import create_engine
from prefect import task, Flow
from prefect.schedules import IntervalSchedule
from datetime import timedelta

Extract

In [2]:
@task
def fetch_data(country_slug : str) -> pd.DataFrame:
  url = f"https://api.covid19api.com/dayone/country/{country_slug}"
  resp = requests.get(url)
  resp.raise_for_status()
  data = resp.join()
  df = pd.json_normalize(data)

  return df[ ["Country", "CountryCode", "Province", "City", "CityCode",
        "Lat", "Lon", "Confirmed", "Deaths", "Recovered", "Active", "Date"]]

Load

In [9]:
DB_URI = "sqlite:///covid_data.db"
engine = create_engine(DB_URI, echo= False)

@task
def load_raw(df: pd.DataFrame):

    df.to_sql("raw_cases", engine, if_exists="append", index=False)

Transform

1. Build daily totals with SQL

In [4]:
@task
def transform_daily():
  sql = """
  CREATE TABLE IF NOT EXISTS daily_total AS
  SELECT
    DATE(Date) AS day,
    SUM(Confirmed) AS total_confirmed,
    SUM(Deaths) AS total_deaths,
    SUM(Recovered) AS total_recovered,
  FROM raw_cases
  GROUP BY day
  ORDER BY day;
  """
  with engine.begin() as conn:
    conn.execute(sql)


2. Compute 7-day moving average with pandas

In [5]:
@task
def transform_moving_avg() -> pd.DataFrame:
  df = pd.read_sql("SELECT * FROM daily_totals", engine, parse_dates = ["day"])
  df = df.sort_values("day")
  df["new_confirmed"] = df["total_confirmed"].diff().fillna(0)
  df["ma_7"] = df["new_confirmed"].rolling(window = 7).mean()

  df.to_sql("moving_avg", engine, if_exists="replace", index=False)
  return df

Visualize

In [6]:
@task
def plot_trends(df: pd.DataFrame):
    plt.figure(figsize=(10,4))
    plt.plot(df["day"], df["new_confirmed"], label="Daily New Cases")
    plt.plot(df["day"], df["ma_7"], label="7‑Day MA")
    plt.title("COVID‑19 Daily New Cases & 7‑Day MA")
    plt.xlabel("Date")
    plt.ylabel("Cases")
    plt.legend()
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

In [None]:
schedule = IntervalSchedule(interval= timedelta(days=1))

with Flow("covid-elt-pipeline", schedule = schedule) as flow:
  raw_df = fetch_data(country_slug="china")
  _  = load_raw(raw_df)
  _  = transform_daily()
  mov_avg_df = transform_moving_avg()
  _ = plot_trends(mov_avg_df)

flow.run()

[2025-05-07 13:48:20+0000] INFO - prefect.covid-elt-pipeline | Waiting for next scheduled run at 2025-05-08 00:00:00+00:00


INFO:prefect.covid-elt-pipeline:Waiting for next scheduled run at 2025-05-08 00:00:00+00:00
