<a href="https://colab.research.google.com/github/MrE2ooo/automated-happiness-data-pipeline/blob/main/world_happiness_etl_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# 1. Install Prefect and other dependencies
!pip install -U prefect pandas kagglehub supabase nest_asyncio

# 2. Login with your API KEY
!prefect cloud login -k Login with your API KEY

# 3. Force the workspace to sync (run this if the dashboard still shows 0)
!prefect cloud workspace set

Collecting pandas
  Downloading pandas-2.3.3-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl.metadata (91 kB)
[2K     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m91.2/91.2 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
Collecting kagglehub
  Downloading kagglehub-0.4.0-py3-none-any.whl.metadata (38 kB)
Collecting kagglesdk<1.0,>=0.1.14 (from kagglehub)
  Downloading kagglesdk-0.1.14-py3-none-any.whl.metadata (13 kB)
Downloading pandas-2.3.3-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl (12.4 MB)
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m12.4/12.4 MB[0m [31m92.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading kagglehub-0.4.0-py3-none-any.whl (69 kB)
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚

In [3]:
import pandas as pd
import kagglehub
import nest_asyncio
from supabase import create_client
from prefect import flow, task

# Fix for Colab event loop
nest_asyncio.apply()

# --- CREDENTIALS ---
SUPABASE_URL = "SUPABASE_URL"
SUPABASE_KEY = "SUPABASE_KEY"
supabase = create_client(SUPABASE_URL, SUPABASE_KEY)

# --- TASKS ---
@task(name="Cleanup Old Data")
def clear_supabase_table():
    print("üßπ Clearing old data from Supabase...")
    supabase.table("world_happiness_2016").delete().neq("id", 0).execute()

@task(name="Extract 2016 Data")
def extract_2016():
    print("üì• Downloading dataset...")
    path = kagglehub.dataset_download("unsdsn/world-happiness")
    return pd.read_csv(f"{path}/2016.csv")

@task(name="Transform for Supabase")
def transform_2016(df):
    mapping = {
        'Country': 'country', 'Region': 'region', 'Happiness Rank': 'happiness_rank',
        'Happiness Score': 'happiness_score', 'Lower Confidence Interval': 'lower_confidence_interval',
        'Upper Confidence Interval': 'upper_confidence_interval', 'Economy (GDP per Capita)': 'economy_gdp_per_capita',
        'Family': 'family', 'Health (Life Expectancy)': 'health_life_expectancy',
        'Freedom': 'freedom', 'Trust (Government Corruption)': 'trust_government_corruption',
        'Generosity': 'generosity', 'Dystopia Residual': 'dystopia_residual'
    }
    df_renamed = df.rename(columns=mapping)
    return df_renamed[list(mapping.values())].to_dict(orient='records')

@task(name="Load to Supabase")
def load_to_supabase(data):
    print(f"üöÄ Uploading {len(data)} rows...")
    return supabase.table("world_happiness_2016").insert(data).execute()

# --- THE FLOW ---
@flow(name="World Happiness 2016 Pipeline")
def happiness_flow():
    clear_supabase_table()
    raw_df = extract_2016()
    clean_json = transform_2016(raw_df)
    load_to_supabase(clean_json)

In [1]:
# Run this cell to "Force" a run into the dashboard
from prefect import flow

@flow(name="Emergency-Sync-Flow")
def sync_check():
    print("Syncing with Prefect Cloud...")

if __name__ == "__main__":
    sync_check() # This runs it locally but reports to the cloud

INFO:prefect.flow_runs:Beginning flow run 'flying-lobster' for flow 'Emergency-Sync-Flow'
INFO:prefect.flow_runs:View at https://app.prefect.cloud/account/8cad1fd4-fdea-4780-978a-21f1f3b030a6/workspace/3fe9e30a-0912-4e18-a48a-7fcf239af3bd/runs/flow-run/06965135-8ec4-732b-8000-41decd01d607
INFO:prefect.flow_runs:Finished in state Completed()


Syncing with Prefect Cloud...


In [4]:
if __name__ == "__main__":
    print("üöÄ Launching the World Happiness 2016 Pipeline...")

    # Running the flow directly ensures it appears in your 'Runs' tab right now
    happiness_flow()

    print("‚úÖ Check your Prefect Dashboard 'Runs' tab for 'World Happiness 2016 Pipeline'!")

üöÄ Launching the World Happiness 2016 Pipeline...


INFO:prefect.flow_runs:Beginning flow run 'nondescript-bulldog' for flow 'World Happiness 2016 Pipeline'
INFO:prefect.flow_runs:View at https://app.prefect.cloud/account/8cad1fd4-fdea-4780-978a-21f1f3b030a6/workspace/3fe9e30a-0912-4e18-a48a-7fcf239af3bd/runs/flow-run/0696513e-e0ec-7dd1-8000-f9d489733ce8


üßπ Clearing old data from Supabase...


INFO:prefect.task_runs:Finished in state Completed()


üì• Downloading dataset...
Using Colab cache for faster access to the 'world-happiness' dataset.


INFO:prefect.task_runs:Finished in state Completed()
INFO:prefect.task_runs:Finished in state Completed()


üöÄ Uploading 157 rows...


INFO:prefect.task_runs:Finished in state Completed()
INFO:prefect.flow_runs:Finished in state Completed()


‚úÖ Check your Prefect Dashboard 'Runs' tab for 'World Happiness 2016 Pipeline'!
