In [1]:
import requests, zipfile, io, re, pandas as pd
from pyspark.sql import SparkSession

# Configuration
# Reuse the active Spark session if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()
# OneLake (abfss) URI of the Lakehouse "Files" folder; parquet output is written beneath it.
lakehouse_path = "abfss://1dcd65a7-d5a3-4e2b-a110-db438703b7b5@onelake.dfs.fabric.microsoft.com/f8c35c71-7fa4-4e87-9b9a-91435e298fb7/Files"
# Language segment appended to the WDS request URL.
language = "en"
# Base URL of the WDS getFullTableDownloadCSV endpoint.
# NOTE: despite the name, download_dataset requests it with GET.
post_url = "https://www150.statcan.gc.ca/t1/wds/rest/getFullTableDownloadCSV"
# Maps each StatCan table ID to the description embedded in the output folder/table names.
datasets = {
    "12100175": "trade_by_province",
    "12100011": "trade_by_country",  
    "12100163": "trade_by_commodity",
    "12100168": "trade_price_volume_indices"
}

# Download ZIP folder from StatCan WDS API
def download_dataset(dataset_id: str, *, timeout: float = 60) -> io.BytesIO:
    """Download the full-table ZIP archive for one StatCan dataset.

    The WDS endpoint is queried first; its JSON response carries the actual
    archive URL in the ``object`` property, which is then downloaded.

    Args:
        dataset_id: StatCan table ID appended to the WDS endpoint URL.
        timeout: Seconds to wait on each HTTP request before giving up.
            Without a timeout a stalled server would hang the run forever.

    Returns:
        An in-memory buffer holding the raw bytes of the ZIP archive.

    Raises:
        requests.HTTPError: If either request returns an error status.
        requests.Timeout: If either request exceeds ``timeout``.
        ValueError: If the WDS response has no ``object`` download link.
    """
    print("1. Navigating to StatCan WDS API")
    api_url = f"{post_url}/{dataset_id}/{language}"
    api_resp = requests.get(api_url, timeout=timeout)
    api_resp.raise_for_status()

    print("\t1.1. Fetching ZIP folder download link")
    zip_folder_download_link = api_resp.json().get("object")
    if not zip_folder_download_link:
        raise ValueError(f"Error: No 'object' property found for dataset {dataset_id}.")

    print(f"\t1.2. Downloading ZIP folder: {zip_folder_download_link}")
    # The whole body is buffered in memory anyway, so streaming buys nothing;
    # the context manager releases the connection even if the status check fails.
    with requests.get(zip_folder_download_link, timeout=timeout) as zip_folder_download_resp:
        zip_folder_download_resp.raise_for_status()
        return io.BytesIO(zip_folder_download_resp.content)

# Extract and filter CSV file from ZIP folder
def filter_dataset(zip_folder_bytes: io.BytesIO, dataset_id) -> pd.DataFrame:
    """Extract the data CSV from a ZIP archive and keep only the wanted rows.

    The first ``.csv`` member whose name does not contain ``MetaData`` is read
    in chunks. Rows are kept when the column whose name contains ``ref_date``
    parses to a date on or after 2020-01-01. For datasets 12100175 and
    12100011 that also have a "principal trading partners" column, rows are
    further restricted to "United States". Column names of the result are
    normalised with ``clean_dataset_column_names``.

    Args:
        zip_folder_bytes: In-memory ZIP archive.
        dataset_id: Table ID, used to decide whether the trading-partner
            filter applies and to label error messages.

    Returns:
        The filtered rows, or an empty DataFrame when nothing matches
        (including when no ``ref_date`` column exists).

    Raises:
        ValueError: If the archive contains no data CSV file.
    """
    print("2. Extracting CSV file from ZIP folder")
    cutoff = "2020-01-01"
    us_only_datasets = ("12100175", "12100011")
    filtered_chunks = []

    with zipfile.ZipFile(zip_folder_bytes) as zip_folder:
        csv_filename = next(
            (
                filename for filename in zip_folder.namelist()
                if filename.endswith(".csv") and "MetaData" not in filename
            ),
            None,
        )
        if csv_filename is None:
            # A bare next() would leak an uninformative StopIteration here.
            raise ValueError(
                f"Error: No data CSV file found in ZIP folder for dataset {dataset_id}."
            )

        print(f"\t2.1. Filtering CSV file: {csv_filename}")
        # Open the member in a context manager so the handle is always closed.
        with zip_folder.open(csv_filename) as csv_file:
            for chunk in pd.read_csv(csv_file, chunksize=100_000, low_memory=False):
                period_column = next(
                    (column for column in chunk.columns if "ref_date" in column.lower()),
                    None,
                )
                if not period_column:
                    continue
                principal_trading_partner_column = next(
                    (
                        column for column in chunk.columns
                        if "principal trading partners" in column.lower()
                    ),
                    None,
                )

                # Unparseable dates become NaT and therefore fail the cutoff test.
                chunk[period_column] = pd.to_datetime(chunk[period_column], errors="coerce")
                keep = chunk[period_column] >= cutoff
                if principal_trading_partner_column and dataset_id in us_only_datasets:
                    keep &= chunk[principal_trading_partner_column] == "United States"

                filtered_chunk = chunk[keep]
                if not filtered_chunk.empty:
                    filtered_chunks.append(filtered_chunk)

    if not filtered_chunks:
        print("Error: No matching rows found in dataset.")
        return pd.DataFrame()

    df_filtered = pd.concat(filtered_chunks, ignore_index=True)
    df_filtered = clean_dataset_column_names(df_filtered)
    print(f"\t2.2. Retaining {len(df_filtered):,} filtered rows")
    return df_filtered

def clean_dataset_column_names(df: pd.DataFrame) -> pd.DataFrame:
    """Rename the columns of ``df`` in place to underscore-separated names.

    Each space, comma, semicolon, brace, parenthesis, newline, tab or equals
    sign becomes an underscore; runs of underscores collapse to one, and
    leading/trailing underscores are removed.

    Args:
        df: Frame whose column labels are strings.

    Returns:
        The same ``df`` object, with its columns renamed.
    """
    def _tidy(label):
        # Swap disallowed characters first, then squeeze and trim underscores.
        swapped = re.sub(r'[ ,;{}()\n\t=]', '_', label)
        return re.sub(r'_+', '_', swapped).strip('_')

    df.columns = [_tidy(label) for label in df.columns]
    return df

# Save dataset to Lakehouse Files and Tables folders (bronze layer)
def save_dataset_to_lakehouse(df: pd.DataFrame, dataset_id: str, dataset_description: str):
    """Write ``df`` to the Lakehouse as parquet files and as a table.

    An empty frame is reported and skipped. Otherwise the frame is converted
    to a Spark DataFrame, written as parquet (two partitions, overwrite) under
    ``lakehouse_path``, and saved (overwrite) as a table of the same name.

    Args:
        df: Rows to persist.
        dataset_id: Table ID of the dataset (not used when building names).
        dataset_description: Description embedded in the folder/table name.
    """
    if not df.empty:
        print(f"3. Saving dataset to Bronze layer Lakehouse")
        # The Files folder and the table share one bronze-layer name.
        bronze_name = f"bronze_napcs_ontario_us_{dataset_description}_data"
        bronze_frame = spark.createDataFrame(df)

        # Save to Files folder as parquet
        parquet_writer = bronze_frame.repartition(2).write.mode("overwrite")
        parquet_writer.parquet(f"{lakehouse_path}/{bronze_name}")

        # Save to Tables folder
        table_writer = bronze_frame.write.mode("overwrite")
        table_writer.saveAsTable(bronze_name)
        return

    print(f"Error: No rows found in dataset. Skipping.")

# Main loop
# Ingest every configured dataset in turn: download, filter, then save.
for dataset_id, dataset_description in datasets.items():
    try:
        print(f"\nStarting dataset {dataset_id} ingestion:")
        zip_folder_bytes = download_dataset(dataset_id)
        df_filtered = filter_dataset(zip_folder_bytes, dataset_id)
        save_dataset_to_lakehouse(df_filtered, dataset_id, dataset_description)
        print(f"Succeeded")
    except Exception as e:
        # Best-effort: report the failure and carry on with the next dataset.
        print(f"Error processing dataset {dataset_id}: {e}")


StatementMeta(, dc0a92e6-e24e-46e5-ad53-2e9eab7ccc29, 3, Finished, Available, Finished)


Starting dataset 12100175 ingestion:
1. Navigating to StatCan WDS API
	1.1. Fetching ZIP folder download link
	1.2. Downloading ZIP folder: https://www150.statcan.gc.ca/n1/tbl/csv/12100175-eng.zip
2. Extracting CSV file from ZIP folder
	2.1. Filtering CSV file: 12100175.csv
	2.2. Retaining 25,636 filtered rows
3. Saving dataset to Bronze layer Lakehouse
Succeeded

Starting dataset 12100011 ingestion:
1. Navigating to StatCan WDS API
	1.1. Fetching ZIP folder download link
	1.2. Downloading ZIP folder: https://www150.statcan.gc.ca/n1/tbl/csv/12100011-eng.zip
2. Extracting CSV file from ZIP folder
	2.1. Filtering CSV file: 12100011.csv
	2.2. Retaining 612 filtered rows
3. Saving dataset to Bronze layer Lakehouse
Succeeded

Starting dataset 12100163 ingestion:
1. Navigating to StatCan WDS API
	1.1. Fetching ZIP folder download link
	1.2. Downloading ZIP folder: https://www150.statcan.gc.ca/n1/tbl/csv/12100163-eng.zip
2. Extracting CSV file from ZIP folder
	2.1. Filtering CSV file: 121001