# Notes
- Make sure Statistics Canada knows about these issues:
    - When downloading the XML for product_id 98100404 it just returns the structure document, not the data document
    - The releaseTime value is in Eastern Standard Zone for https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata and UTC for https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite
    - The releaseTime value is different when getting it from https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite and https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata . For example productId 10100007
    - Currently having an issue with processing productId 13100442. Expected 18 fields but saw 19 at line 162
    - There are cases where the `REF_DATE` is a range, ex. 2023/2024
        - For productId 17100022 the period is from July 1 to June 30 (seen in the metadata), so can't just use January 1, 2023 and December 31, 2024
    - There can be times when a table's `VALUE` field can have both float and integer data types. For example, productId 11100025 has `DECIMAL` values of 0 and 1, corresponding to integer and float data types
    - Need to look into cases where there is no `DGUID`, but the `GEO` value tells you something like `All census metropolitan areas` (productId 11100025). I guess I can do the linking to the CMAs on the API side

In [1]:
from datetime import datetime
import glob
from multiprocessing import Pool
import json
import os
import sqlite3
import zipfile
from zoneinfo import ZoneInfo

import pandas as pd
import polars as pl
import requests
from tqdm import tqdm

data_folder = "/data/tables"
input_folder = f"{data_folder}/input"
scratch_folder = f"{data_folder}/scratch"
output_folder = f"{data_folder}/output"

if not os.path.exists(f"{data_folder}/processing.db"):
    con = sqlite3.connect(f"{data_folder}/processing.db", timeout=60.0)
    cur = con.cursor()
    cur.executescript("""
        CREATE TABLE IF NOT EXISTS downloaded (
          product_id TEXT PRIMARY KEY,
          last_updated TEXT,
          last_processed TEXT
        );

        CREATE TABLE IF NOT EXISTS cubes (
          product_id TEXT PRIMARY KEY,
          last_updated TEXT
        );
    """)
    con.commit()
else:
    con = sqlite3.connect(f"{data_folder}/processing.db")
    cur = con.cursor()

In [2]:
def setup():
    """
    Makes data folders
    """
    folders_to_create = [data_folder, input_folder, 
                         scratch_folder, output_folder,
                         f"{input_folder}/en", f"{output_folder}/en",
                         f"{input_folder}/fr", f"{output_folder}/fr",
                         f"{input_folder}/metadata"]
    for folder in folders_to_create:
        if not os.path.exists(folder):
            print(f"Making folder {folder}")
            os.mkdir(folder)

In [3]:
setup()

In [4]:
def update_last_downloaded(product_id):
    """
    Updates SQLite database with the last time the table was updated
    The datetime is in Eastern timezone, so have to convert to UTC to
    be consistent with https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite
    """
    filepath = f"{input_folder}/metadata/{product_id}.json"
    print(f"Reading metadata {filepath}")
    with open(filepath, 'r') as fp:
        metadata = json.load(fp)
    product_id = metadata.get("object").get("productId")
    last_updated = metadata.get("object").get("releaseTime")
    # Convert last_updated to UTC since /getAllcubesListLite uses UTC
    last_updated = datetime.strptime(last_updated, "%Y-%m-%dT%H:%M")
    last_updated = last_updated.replace(tzinfo=ZoneInfo("America/Toronto"))
    last_updated = last_updated.astimezone(ZoneInfo("UTC")).isoformat()

    data = (product_id, last_updated)
    cur.execute("SELECT product_id FROM downloaded WHERE product_id = ?", (product_id,))
    result = cur.fetchone()
    if not result:
        cur.execute("INSERT INTO downloaded (product_id, last_updated) VALUES (?, ?)", data)
    else:
        cur.execute("UPDATE downloaded SET last_updated = ? WHERE product_id = ?", (last_updated, product_id))

    con.commit()

In [5]:
def update_last_processed(product_id):
    time_finished_processing = datetime.now().isoformat()
    cur.execute("UPDATE downloaded SET last_processed = ? WHERE product_id = ?", (time_finished_processing, product_id))
    con.commit()

In [6]:
def update_tables():
    """
    This currently does not work as expected because Statistics Canada has discrepancies.
    The "releaseTime" listed in https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite
    for every pdocutId is not the same as "releaseTime" listed when making a POST 
    https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata , for example:
    [{"productId":10100007}]
    """
    cur.execute("""
    DELETE FROM cubes;
    """)
    con.commit()
    response = requests.get("https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite").json()
    cubes_metadata = pl.from_dicts(response)[['productId', 'releaseTime']]
    cubes_metadata = cubes_metadata.rename({"productId": "product_id", "releaseTime": "last_updated"})
    cubes_metadata = cubes_metadata.rows()
    cubes_metadata_new =  []
    for cube in cubes_metadata:
        product_id, last_updated = cube
        # Update the date field so it is formatted the same as date field in downloaded table
        last_updated = datetime.strptime(last_updated, "%Y-%m-%dT%H:%M:%SZ").astimezone(ZoneInfo("UTC"))
        last_updated = last_updated.isoformat()
        cubes_metadata_new.append((product_id, last_updated))
        
    cur.executemany("INSERT INTO cubes VALUES(?, ?)", cubes_metadata_new)
    con.commit()

    cur.execute("""
    SELECT a.product_id
    FROM downloaded AS a,
         cubes AS b
    WHERE a.product_id = b.product_id
    AND b.last_updated > a.last_updated
    """)
    """
    results = cur.fetchall()
    for result in results:
        product_id = result[0]
        print(f"Updating product_id: {product_id}")
        download_cube(product_id)
        process_cube(product_id)
    """

In [7]:
update_tables()

In [8]:
def compute_ref_date_bounds(df):
    """
    TODO: There are cases where the REF_DATE is a range, ex. 2023/2024.
    For productId 17100022 the period is from July 1 to June 30 (seen in the metadata), so can't just 
    use January 1, 2023 and December 31, 2024
    """
    series = df["REF_DATE"]

    # Initialize the two new columns with NaT
    df["REF_START_DATE"] = pd.NaT
    df["REF_END_DATE"] = pd.NaT

    # Skip rows that contain slashes
    valid_mask = ~series.str.contains("/", na=False)

    # Case 1: YYYY-MM-DD
    full_mask = valid_mask & series.str.fullmatch(r"\d{4}-\d{2}-\d{2}")
    parsed_full = pd.to_datetime(series[full_mask], format="%Y-%m-%d", errors="coerce")
    df.loc[full_mask, "REF_START_DATE"] = parsed_full
    df.loc[full_mask, "REF_END_DATE"] = parsed_full

    # Case 2: YYYY-MM
    month_mask = valid_mask & series.str.fullmatch(r"\d{4}-\d{2}")
    parsed_month = pd.to_datetime(series[month_mask], format="%Y-%m", errors="coerce")
    df.loc[month_mask, "REF_START_DATE"] = parsed_month
    df.loc[month_mask, "REF_END_DATE"] = parsed_month + pd.to_timedelta(
        parsed_month.dt.days_in_month - 1, unit='D'
    )

    # Case 3: YYYY
    year_mask = valid_mask & series.str.fullmatch(r"\d{4}")
    parsed_year = pd.to_datetime(series[year_mask], format="%Y", errors="coerce")
    df.loc[year_mask, "REF_START_DATE"] = parsed_year
    df.loc[year_mask, "REF_END_DATE"] = parsed_year + pd.offsets.YearEnd(0)

    # Move columns after REF_DATE
    ref_idx = df.columns.get_loc("REF_DATE")
    cols = list(df.columns)
    cols.remove("REF_START_DATE")
    cols.remove("REF_END_DATE")
    cols[ref_idx + 1:ref_idx + 1] = ["REF_START_DATE", "REF_END_DATE"]

    return df[cols]

In [9]:
def convert_to_lowest_type(df):
    """
    Convert columns to the best possible dtypes
    For example, if the column is numerical and has a maximum value of 32,000 
    we can assign it a type of int16
    """
    dtypes = pd.DataFrame(df.dtypes)
    # Downcast to the smallest numerical dtype
    for row in dtypes.itertuples():
        column = row[0]
        the_type = str(row[1])
        if the_type == 'Int64':
            df[column] = pd.to_numeric(df[column], downcast='integer')

    return df

In [10]:
def extract_zipfile(product_id, language):
    """
    It is faster to extract the zip file and read the CSV, than open
    via zipfile and then Pandas
    """
    zip_file = f"{input_folder}/{language}/{product_id}.zip"
    with zipfile.ZipFile(zip_file) as myzip:
        print(f"Extracting {zip_file} to {scratch_folder}")
        myzip.extractall(path=scratch_folder)

def get_cube_metadata(product_id):
    url = f"https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata"
    cubes_payload = [{"productId": product_id}]
    result = requests.post(url, json=cubes_payload)
    result = result.json()[0]
    return result

def download_cube(product_id, language="en"):
    """
    Downloads the English CSV for a specific table
    """
    download_url = f"https://www150.statcan.gc.ca/t1/wds/rest/getFullTableDownloadCSV/{product_id}/en"
    response = requests.get(download_url).json()
    zip_url = response['object']
    zip_file_name = f"{input_folder}/{language}/{product_id}.zip"
    print(f"Downloading {zip_url} to {zip_file_name}")
    response = requests.get(zip_url, stream=True, headers={"user-agent": None})
    progress_bar = tqdm(
        desc=zip_file_name,
        total=int(response.headers.get("content-length", 0)),
        unit="B",
        unit_scale=True
    )
    with open(zip_file_name, "wb") as handle:
        for chunk in response.iter_content(chunk_size=512):
            if chunk:  # filter out keep-alive new chunks
                handle.write(chunk)
                progress_bar.update(len(chunk))
        progress_bar.close()

In [11]:
def cleanup_product(product_id):
    """
    Remove the scratch files for a given productId
    """
    print(f"Removing scratch files for productId {product_id}")
    os.remove(f"{scratch_folder}/{product_id}.csv")
    os.remove(f"{scratch_folder}/{product_id}_MetaData.csv")

In [12]:
"""
Examples: 
- productId 43100011 has all with DECIMAL = 1 (float64)
- productId 17100009 has DECIMAL = 0 (int64)
- productId 35100076 has multiple DECIMAL precisions [0, 1, 2] (int64, float64, float64)
- The duplicate column issue just happens with two column names in all data products ("Value", "VALUE", and "Status", "STATUS")
    - productId 10100164 has two columns named the same "Value" and "VALUE". DuckDB treats column names in a case insensitve manner, so 
    "Value" and "VALUE" are the same. So we will need to rename "Value" to "Value.1"
    - productId 13100902 has two columns named the same "Status" and "STATUS". We will need to rename "Status" to "STATUS"
- productId 13100442 has 18 fields, but 19 fields were seen in line 162
- There are cases where the "DECIMALS" column does not exist in the CSV. productId 98100001 is one example.
  In this case, we do let the .read_csv method guess the data types
"""

product_id = "98100001"
#def process_cube(product_id, language="en"):
language = "en"
cur.execute("SELECT product_id FROM downloaded WHERE product_id = ?", (product_id,))
result = cur.fetchone()
if result:
    print(f"Already processed {product_id}")
    #return
extract_zipfile(product_id, language)
"""
The pandas column reader is better than the Polars one
Here is an example where polars was not reading it right:
https://www150.statcan.gc.ca/n1/tbl/csv/98100404-eng.zip
"""
# Get metadata
#metadata_file = f"{input_folder}/metadata/{product_id}.json"
#metadata = get_cube_metadata(product_id)
#print(f"Writing metadata file {metadata_file}")
#with open(metadata_file, "w") as outfile:
#    json.dump(metadata, outfile)
# Read CSV using Pandas
product_csv = f"{scratch_folder}/{product_id}.csv"
print(f"Reading {product_csv}")
parameters = {
    "filepath_or_buffer": product_csv,
    "engine": "c",
    "nrows": 100000,
    "dtype": {}
}

columns = pd.read_csv(product_csv, nrows=0).columns
print(columns)

Extracting /data/tables/input/en/98100001.zip to /data/tables/scratch
Reading /data/tables/scratch/98100001.csv
Index(['REF_DATE', 'GEO', 'DGUID', 'Coordinate',
       'Population and dwelling counts (11): Population, 2021 [1]', 'Symbols',
       'Population and dwelling counts (11): Population, 2016 [2]',
       'Symbols.1',
       'Population and dwelling counts (11): Population percentage change, 2016 to 2021 [3]',
       'Symbols.2',
       'Population and dwelling counts (11): Total private dwellings, 2021 [4]',
       'Symbols.3',
       'Population and dwelling counts (11): Total private dwellings, 2016 [5]',
       'Symbols.4',
       'Population and dwelling counts (11): Total private dwellings percentage change, 2016 to 2021 [6]',
       'Symbols.5',
       'Population and dwelling counts (11): Private dwellings occupied by usual residents, 2021 [7]',
       'Symbols.6',
       'Population and dwelling counts (11): Private dwellings occupied by usual residents, 2016 [8]',
   

In [13]:
columns_to_rename = ['Value', 'Status']
for column in columns_to_rename:
    if column in columns:
        print(f"Renaming '{column}' to '{column}.1'")
        columns = [f'{column}.1' if x == column else x for x in columns]
        # Explicitly tell pandas to not read column names from CSV
        parameters["header"] = 0
        parameters["names"] = columns

In [14]:
columns_always_int_8 = ["DECIMALS", "SCALAR_ID"]
for column in columns_always_int_8:
    if column in columns:
        parameters["dtype"][column] = 'int8'

columns_always_int_16 = ["UOM_ID"]
for column in columns_always_int_16:
    if column in columns:
        parameters["dtype"][column] = 'int16'

# GEO, DGUID should always be string
columns_always_string = ["REF_DATE", "GEO", "DGUID"]
for column in columns_always_string:
    if column in columns:
        parameters["dtype"][column] = 'string'

# The remaining columns should be string, with the exception of VALUE
# Added "DECIMAL" check as there can be numeric columns that are not the VALUE column
if "DECIMALS" in columns:
    for column in columns:
        if column not in columns_always_int_8 and column not in columns_always_int_16 and column != "VALUE":
            parameters["dtype"][column] = 'string'

print(parameters)
if not parameters["dtype"]:
    del parameters["dtype"]

print(f"Reading {product_csv} as a Pandas dataframe")
df = pd.read_csv(**parameters)

{'filepath_or_buffer': '/data/tables/scratch/98100001.csv', 'engine': 'c', 'nrows': 100000, 'dtype': {'REF_DATE': 'string', 'GEO': 'string', 'DGUID': 'string'}}
Reading /data/tables/scratch/98100001.csv as a Pandas dataframe


In [15]:
df.head()

Unnamed: 0,REF_DATE,GEO,DGUID,Coordinate,"Population and dwelling counts (11): Population, 2021 [1]",Symbols,"Population and dwelling counts (11): Population, 2016 [2]",Symbols.1,"Population and dwelling counts (11): Population percentage change, 2016 to 2021 [3]",Symbols.2,...,"Population and dwelling counts (11): Private dwellings occupied by usual residents, 2021 [7]",Symbols.6,"Population and dwelling counts (11): Private dwellings occupied by usual residents, 2016 [8]",Symbols.7,"Population and dwelling counts (11): Private dwellings occupied by usual residents percentage change, 2016 to 2021 [9]",Symbols.8,"Population and dwelling counts (11): Land area in square kilometres, 2021 [10]",Symbols.9,"Population and dwelling counts (11): Population density per square kilometre, 2021 [11]",Symbols.10
0,2021,Canada,2021A000011124,1,36991981,,35151728,,5.2,,...,14978941,,14072079,,6.4,,8788702.8,,4.2,
1,2021,Newfoundland and Labrador,2021A000210,2,510550,,519716,,-1.8,,...,223253,,218673,,2.1,,358170.37,,1.4,
2,2021,Prince Edward Island,2021A000211,3,154331,,142907,,8.0,,...,64570,,59472,,8.6,,5681.18,,27.2,
3,2021,Nova Scotia,2021A000212,4,969383,,923598,,5.0,,...,428228,,401990,,6.5,,52824.71,,18.4,
4,2021,New Brunswick,2021A000213,5,775610,,747101,,3.8,,...,337651,,319773,,5.6,,71248.5,,10.9,


In [16]:
df.dtypes

REF_DATE                                                                                                                  string[python]
GEO                                                                                                                       string[python]
DGUID                                                                                                                     string[python]
Coordinate                                                                                                                         int64
Population and dwelling counts (11): Population, 2021 [1]                                                                          int64
Symbols                                                                                                                          float64
Population and dwelling counts (11): Population, 2016 [2]                                                                          int64
Symbols.1                                

In [17]:
if "DECIMALS" in columns:
    unique_decimal_values = df["DECIMALS"].unique()
    print(unique_decimal_values)
    if any(unique_decimal_values):
        """
        A table can have both float and integer in the VALUE field. 
        productId 11100025 is an example
        So if we have unique values for DECIMALS to be [0,1], then we convert to float64
        """
        convert_dict = {"VALUE": "float64"}
        print(convert_dict)
        df = df.astype(convert_dict)
    elif 0 in (unique_decimal_values):
        if df["VALUE"].dtype != "Int64":
            # If DECIMALS = [0]
            convert_dict = {"VALUE": "Int64"}
            print(convert_dict)
            df = df.astype(convert_dict)
else:
    parameters = {
        "convert_string": True,
        "convert_boolean": False
    }
    print("DECIMALS not in columns, using .convert_dtypes")
    df = df.convert_dtypes(**parameters)

DECIMALS not in columns, using .convert_dtypes


In [18]:
df.dtypes

REF_DATE                                                                                                                  string[python]
GEO                                                                                                                       string[python]
DGUID                                                                                                                     string[python]
Coordinate                                                                                                                         Int64
Population and dwelling counts (11): Population, 2021 [1]                                                                          Int64
Symbols                                                                                                                            Int64
Population and dwelling counts (11): Population, 2016 [2]                                                                          Int64
Symbols.1                                

In [19]:
df = convert_to_lowest_type(df)

In [20]:
df = compute_ref_date_bounds(df)

In [21]:
df.head()

Unnamed: 0,REF_DATE,REF_START_DATE,REF_END_DATE,GEO,DGUID,Coordinate,"Population and dwelling counts (11): Population, 2021 [1]",Symbols,"Population and dwelling counts (11): Population, 2016 [2]",Symbols.1,...,"Population and dwelling counts (11): Private dwellings occupied by usual residents, 2021 [7]",Symbols.6,"Population and dwelling counts (11): Private dwellings occupied by usual residents, 2016 [8]",Symbols.7,"Population and dwelling counts (11): Private dwellings occupied by usual residents percentage change, 2016 to 2021 [9]",Symbols.8,"Population and dwelling counts (11): Land area in square kilometres, 2021 [10]",Symbols.9,"Population and dwelling counts (11): Population density per square kilometre, 2021 [11]",Symbols.10
0,2021,2021-01-01,2021-12-31,Canada,2021A000011124,1,36991981,,35151728,,...,14978941,,14072079,,6.4,,8788702.8,,4.2,
1,2021,2021-01-01,2021-12-31,Newfoundland and Labrador,2021A000210,2,510550,,519716,,...,223253,,218673,,2.1,,358170.37,,1.4,
2,2021,2021-01-01,2021-12-31,Prince Edward Island,2021A000211,3,154331,,142907,,...,64570,,59472,,8.6,,5681.18,,27.2,
3,2021,2021-01-01,2021-12-31,Nova Scotia,2021A000212,4,969383,,923598,,...,428228,,401990,,6.5,,52824.71,,18.4,
4,2021,2021-01-01,2021-12-31,New Brunswick,2021A000213,5,775610,,747101,,...,337651,,319773,,5.6,,71248.5,,10.9,


In [22]:
df.dtypes

REF_DATE                                                                                                                  string[python]
REF_START_DATE                                                                                                            datetime64[ns]
REF_END_DATE                                                                                                              datetime64[ns]
GEO                                                                                                                       string[python]
DGUID                                                                                                                     string[python]
Coordinate                                                                                                                          Int8
Population and dwelling counts (11): Population, 2021 [1]                                                                          Int32
Symbols                                  

In [48]:
output_parquet = f"{output_folder}/{language}/{product_id}.parquet"
print(f"Exporting dataframe as parquet to {output_parquet}")
parameters = {
    "path": output_parquet,
    "engine": "pyarrow",
    "compression": "zstd",
    "index": False,
    "compression_level": 22
}
df.to_parquet(**parameters)

Exporting dataframe as parquet to /data/tables/output/en/12100013.parquet


In [None]:
cleanup_product(product_id)
update_last_downloaded(product_id)
update_last_processed(product_id)