<a href="https://colab.research.google.com/github/FleaBusyBeeBergs/dtsa5506-pipeline/blob/main/statcan_etl.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [22]:
# file handling
import pandas as pd
import os
from google.colab import files

# http requests
import requests

# vis
import matplotlib.pyplot as plt


import xml.etree.ElementTree as ET

In [12]:
# Base URL of the StatCan SDMX REST "vector" endpoint.
# Named BASE_URL (upper case, the config-constant convention) so code reading
# `BASE_URL` resolves on a fresh kernel; defining only the lowercase name made
# Restart & Run All fail with NameError for every series.
BASE_URL = 'https://www150.statcan.gc.ca/t1/wds/sdmx/statcan/rest/vector/'
base_url = BASE_URL  # backward-compatible alias for the original lowercase name

In [16]:
# Prefix -> namespace-URI map for SDMX 2.1 messages, passed as the
# `namespaces` argument to ElementTree's find()/findall() so queries can be
# written with prefixes such as "generic:Series".
NAMESPACES = dict(
    message="http://www.sdmx.org/resources/sdmxml/schemas/v2_1/message",
    generic="http://www.sdmx.org/resources/sdmxml/schemas/v2_1/data/generic",
    common="http://www.sdmx.org/resources/sdmxml/schemas/v2_1/common",
)


In [44]:
class StatCanETL:
    """Extract-transform-load pipeline for Statistics Canada SDMX vector series.

    For every row of ``table_df`` the pipeline downloads one vector from the
    StatCan SDMX REST service, parses the generic-SDMX XML into a two-column
    DataFrame (``Date``, ``Value``), and stores it both in ``self.data_objects``
    and as a global variable named ``<name>_df``.

    ``table_df`` must provide the columns ``name``, ``vector``, ``start``,
    ``end`` and ``frequency`` (the columns ``run`` reads).
    """

    # Endpoint of the StatCan SDMX REST "vector" resource. Kept on the class so
    # the pipeline does not depend on a notebook-level constant being defined
    # (and spelled) correctly in some other cell.
    DEFAULT_BASE_URL = "https://www150.statcan.gc.ca/t1/wds/sdmx/statcan/rest/vector/"

    def __init__(self, table_df, base_url=None, timeout=30):
        """Create a pipeline.

        Args:
            table_df: DataFrame of series metadata, one row per series.
            base_url: Optional override for the API endpoint; defaults to
                ``DEFAULT_BASE_URL``.
            timeout: Seconds to wait for each HTTP request before giving up.
        """
        self.table_df = table_df
        self.base_url = base_url or self.DEFAULT_BASE_URL
        self.timeout = timeout
        self.data_objects = {}  # "<name>_df" -> DataFrame, filled by load()
        self.errors = {}  # series name -> exception raised while processing it (filled by run())

    def extract(self, vector, start_period, end_period):
        """Download the SDMX XML for one vector and return the raw response bytes.

        Raises:
            ValueError: if the API answers with a status other than 200.
            Exceptions from ``requests`` (e.g. a timeout) propagate unchanged.
        """
        url = (
            f"{self.base_url}{vector}"
            f"?startPeriod={start_period}&endPeriod={end_period}&detail=full"
        )
        # Without a timeout a stalled connection would block the notebook forever.
        response = requests.get(url, timeout=self.timeout)

        if response.status_code != 200:
            raise ValueError(f"Failed to fetch data for vector {vector}. HTTP Status: {response.status_code}")
        return response.content

    def transform(self, xml_content, frequency):
        """Parse an SDMX generic-data XML response into a DataFrame.

        Args:
            xml_content: Raw XML (bytes or str) as returned by ``extract``.
            frequency: Series frequency. ``"quarterly"`` dates ("YYYY-Qn") are
                converted to the first day of the quarter ("YYYY-MM-DD"); other
                dates are kept as the strings found in the response.

        Returns:
            DataFrame with columns ``Date`` and ``Value``. The columns are present
            even when the series has no observations. Observations without a
            value get ``NaN``.

        Raises:
            ValueError: if the response has no ``generic:Series`` element, or an
                observation lacks its ``ObsDimension`` date.
        """
        root = ET.fromstring(xml_content)

        # Only the first Series in the message is read.
        series = root.find(".//generic:Series", NAMESPACES)
        if series is None:
            raise ValueError("No Series element found in the XML response.")

        records = []
        for obs in series.findall(".//generic:Obs", NAMESPACES):
            obs_dim = obs.find(".//generic:ObsDimension", NAMESPACES)
            if obs_dim is None or "value" not in obs_dim.attrib:
                raise ValueError("Observation without an ObsDimension date in the XML response.")
            date = obs_dim.attrib["value"]

            if frequency == "quarterly":
                date = self.convert_quarter_to_date(date)

            # NOTE(review): assumed that unavailable/suppressed observations can come
            # without an ObsValue; keep the row as NaN instead of aborting the series.
            obs_val = obs.find(".//generic:ObsValue", NAMESPACES)
            raw_value = None if obs_val is None else obs_val.attrib.get("value")
            value = float("nan") if raw_value in (None, "") else float(raw_value)

            records.append({"Date": date, "Value": value})

        # Explicit columns so an empty series still yields the expected schema.
        return pd.DataFrame(records, columns=["Date", "Value"])

    @staticmethod
    def convert_quarter_to_date(quarter_str):
        """Convert a quarterly period "YYYY-Qn" to its first day "YYYY-MM-DD".

        Raises:
            ValueError: if ``quarter_str`` is not of the form "YYYY-Q1".."YYYY-Q4".
        """
        quarter_start_month = {"1": "01", "2": "04", "3": "07", "4": "10"}
        year, separator, quarter = str(quarter_str).partition("-Q")
        month = quarter_start_month.get(quarter)
        if not separator or not year.isdigit() or month is None:
            raise ValueError(f"Invalid quarter format: {quarter_str}")
        return f"{year}-{month}-01"

    def load(self, df, name):
        """Store ``df`` as ``<name>_df`` in ``self.data_objects`` and as a global.

        The global is written into ``globals()`` of the module defining this
        class (the notebook namespace when run in the notebook).
        """
        variable_name = f"{name}_df"
        globals()[variable_name] = df  # Dynamically create a global variable
        self.data_objects[variable_name] = df  # Store in a dictionary for easy access
        print(f"Data for {name} saved to object {variable_name}")

    def run(self):
        """Run extract -> transform -> load for every row of ``table_df``.

        A failure in one series is reported (printed and recorded in
        ``self.errors``) and does not stop the remaining series.
        """
        self.errors = {}
        for _, row in self.table_df.iterrows():
            name = row["name"]
            print(f"Processing {name}...")
            try:
                xml_content = self.extract(row["vector"], row["start"], row["end"])
                df = self.transform(xml_content, row["frequency"])
                self.load(df, name)
            except Exception as exc:
                # Best-effort by design: keep going, but keep the exception so
                # callers can inspect pipeline.errors instead of scraping output.
                self.errors[name] = exc
                print(f"Error processing {name}: {exc}")

In [45]:
# Series metadata: each key is a column, each list position one series
# (wage, raw, tax, productivity, cpi). `description` and `url` are currently empty.
tables = dict(
    name=['wage', 'raw', 'tax', 'productivity', 'cpi'],
    tableid=[14100223, 18100268, 11100058, 36100206, 18100004],
    vector=['v79311153', 'v1230998135', 'v122807833', 'v1409153', 'v41690973'],
    description=[''] * 5,
    frequency=['monthly', 'monthly', 'annual', 'quarterly', 'monthly'],
    start=['2001-01', '2001-01', '2001-01', '2001-Q1', '2001-01'],
    end=['2024-12', '2024-12', '2024-12', '2024-Q4', '2024-12'],
    url=[''] * 5,
)

# One row per series; this is the frame the ETL pipeline iterates over.
table_df = pd.DataFrame.from_dict(tables)

In [46]:
# Build the pipeline from the series metadata and run extract -> transform -> load
# for every row; results are collected in pipeline.data_objects (and as <name>_df globals).
pipeline = StatCanETL(table_df=table_df)
pipeline.run()

Processing wage...
Data for wage saved to object wage_df
Processing raw...
Data for raw saved to object raw_df
Processing tax...
Data for tax saved to object tax_df
Processing productivity...
Data for productivity saved to object productivity_df
Processing cpi...
Data for cpi saved to object cpi_df


In [50]:
# Quick sanity check: the name and first three rows of every loaded series.
for df_name, frame in pipeline.data_objects.items():
    print(f"{df_name}\n{frame.head(3)}")

wage_df
      Date   Value
0  2001-01  657.14
1  2001-02  653.59
2  2001-03  655.14
raw_df
      Date  Value
0  2001-01   65.2
1  2001-02   65.8
2  2001-03   65.5
tax_df
   Date  Value
0  2001   14.0
1  2002   13.8
2  2003   13.8
productivity_df
         Date   Value
0  2001-01-01  85.173
1  2001-04-01  86.150
2  2001-07-01  86.665
cpi_df
      Date  Value
0  2001-01   96.3
1  2001-02   96.8
2  2001-03   97.1
