## Setup SDMX Lab Jupyter Notebook environment


In [None]:
# Clone the repo
#PROD
#!git clone https://github.com/HMS-Analytical-Software/hms-sdmx-lab-notebooks.git
#%cd hms-sdmx-lab-notebooks/notebooks/data-pipelines
#DEV
!git clone https://github.com/Buffett65/data-pipelines.git
%cd data-pipelines
root_dir="/content/data-pipelines"

# Install dependencies
!pip install -r requirements.txt

In [None]:
from pathlib import Path
import os
import json
import requests
import certifi
from lxml import etree
from getpass import getpass
from requests.auth import HTTPBasicAuth
from dataclasses import dataclass

@dataclass
class MyConfig:
    """Notebook-wide settings read from ``config.json`` in the repository root.

    Every value is a plain class attribute evaluated once, when this cell
    runs. None of them is annotated, so ``@dataclass`` generates no fields
    and ``MyConfig()`` takes no arguments.

    Attributes:
        config: Parsed content of ``{root_dir}/config.json``.
        _test: Value of the ``TEST`` key.
        agency: Value of the ``AGENCY`` key.
        flowid: Value of the ``FLOWID`` key.
        endpoint: Value of the ``ENDPOINT`` key.
        out_dir: ``{root_dir}/data``, the directory output files are written to.
        headers: Default request headers asking for an XML response.

    Raises:
        FileNotFoundError: If ``config.json`` is missing under ``root_dir``.
        KeyError: If one of the keys above is absent from the file.
    """
    # Load config variables.
    # Read as explicit UTF-8 (instead of the locale default) and parse in one
    # expression, so no file-handle attribute is left behind on the class.
    config = json.loads(Path(root_dir, "config.json").read_text(encoding="utf-8"))
    _test = config["TEST"]
    agency = config["AGENCY"]
    flowid = config["FLOWID"]
    endpoint = config["ENDPOINT"]
    #
    out_dir = f"{root_dir}/data"

    headers = {"Accept": "application/xml"}  # we'll ask for SDMX-ML data via ?format=sdmx-3.0

# Single shared configuration object used by the cells below.
cfg = MyConfig()

Set up your credentials for basic authorization. These will be used to authenticate REST API calls to the Lab instance.

In [None]:
# Obtain Lab instance endpoint
sdmx_lab=True # if True, need to use 'auth' on *ALL* api calls, not just on POST calls.

# Environment variables take precedence; otherwise prompt interactively.
# Surrounding whitespace and a trailing "/" are removed so derived URLs never contain "//".
lab_url = (os.environ.get("USER_LAB") or input("Enter URL of your Lab space: ")).strip().rstrip("/")
fmr_url = f"{lab_url}/fmr"

# Obtain authentication credentials
user = os.environ.get("USER_ID") or input("Username: ")
# getpass() does not echo the typed password, unlike input().
password = os.environ.get("PASS") or getpass("Password: ")
auth = HTTPBasicAuth(username=user, password=password)

# The password is deliberately NOT printed: cell output is saved with the notebook
# and would leak the secret to anyone the notebook is shared with.
print(f"User: {user}")
print(f"Lab URL: {lab_url}")
print(f"FMR URL: {fmr_url}")

## SDMX API


### Setup Environment for SDMX API examples and exercises.


In [None]:

import requests, urllib.parse as up
import xml.etree.ElementTree as ET
from typing import Dict, Any, List, Tuple, Optional
import json
import io
import os
import pandas as pd



def get_csv(url, headers=None, params=None, auth=None, timeout=60):
    """Issue a GET request and return the response, decoded as UTF-8.

    Args:
        url: Full URL of the endpoint to call.
        headers: Optional dict of HTTP request headers.
        params: Optional dict of query-string parameters.
        auth: Optional ``requests`` auth object or ``(user, password)`` tuple;
            needed for endpoints that require authentication on every call.
        timeout: Seconds to wait for the server before giving up (default 60).

    Returns:
        The ``requests.Response``; its ``encoding`` is set to ``"utf-8"`` so
        that ``.text`` is decoded as UTF-8.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    r = requests.get(url, params=params, headers=headers, auth=auth, timeout=timeout)
    r.raise_for_status()
    # (optional but safe) ensure correct decoding before you touch .text
    # SDMX-CSV is UTF-8, so force UTF-8 to avoid stray characters
    r.encoding = "utf-8"
    return r

### EXAMPLE: SDMX REST API - get structure
###          Scenario: get all dataflows, return JSON


### Version A: bespoke code


In [None]:
# Retrieve all dataflows, based on config parameters.
# Example --- sdmx-json structure call

# Make an API structure call requesting all dataflows
r = requests.get(
    f"{cfg.endpoint}/structure/dataflow/*/*/*",
    params={"format": "sdmx-json"},
    headers=None,
    timeout=60,
)
r.raise_for_status()

flows = r.json()

# Create List of all dataflows (catalog).
# Built with a comprehension so that no per-flow variables (in particular a
# rebinding of the builtin `id`) leak into the notebook's global namespace.
dfs = [
    {
        "agency": flow.get("agencyID"),
        "id": flow.get("id"),
        "version": flow.get("version"),
        "name": flow.get("name"),
        "description": flow.get("description"),
    }
    # Fall back to an empty list when the message carries no dataflows.
    for flow in flows.get("data", {}).get("dataflows", [])
]

dfs[:10]  # peek

### Version B: using pysdmx


In [None]:

# Same catalog request as Version A, delegated to the pysdmx registry client.
from pysdmx.api.fmr import RegistryClient

# The client is pointed at the endpoint taken from the notebook configuration.
fmr_client = RegistryClient(str(cfg.endpoint))

# Three wildcards - presumably agency / id / version, mirroring the REST path used in Version A.
dfs = fmr_client.get_dataflows("*", "*", "*")

dfs[:10]  # peek

### EXAMPLE: SDMX REST API - get structure
###          Scenario: get all dataflows, return SDMX-ML


### Version A: bespoke code


In [None]:

# 1) Make an API structure call, retrieve all dataflows (catalog)

# Call an SDMX API endpoint requesting an SDMX-ML response.
# The response is stored in `r`, the same name that is checked and parsed below.
r = requests.get(f"{cfg.endpoint}/structure/dataflow/{cfg.agency}/*/*",
                 params={"format":"sdmx-3.0", "prettyPrint":True},
                 headers={"Accept": "application/xml"},
                 timeout=60)
r.raise_for_status()

# Parse the XML into an element tree.
root = ET.fromstring(r.content)

# Extract all namespaces declared in the document as {prefix: uri}.
# iterparse needs a bytes stream, hence the BytesIO wrapper.
namespaces = dict(
    ns for _, ns in ET.iterparse(io.BytesIO(r.content), events=["start-ns"])
)


def _child_text(element, path, ns):
    """Return the text of the first child of `element` matching `path`, or None if absent."""
    child = element.find(path, ns)
    return child.text if child is not None else None


# Create List of all dataflows (catalog).
# NOTE(review): the lookups rely on the document declaring the "str" and "com"
# namespace prefixes - confirm against the actual response.
dfs = [
    {
        "agency": flow.attrib.get("agencyID"),
        "id": flow.attrib.get("id"),
        "version": flow.attrib.get("version"),
        "name": _child_text(flow, "com:Name", namespaces),
        "description": _child_text(flow, "com:Description", namespaces),
    }
    for flow in root.findall(".//str:Dataflow", namespaces)
]

dfs[:10]  # peek

### Version B: using pysdmx


In [None]:

# pysdmx variant of the dataflow catalog request.
from pysdmx.api.fmr import RegistryClient

# Registry client bound to the configured endpoint.
fmr_client = RegistryClient(str(cfg.endpoint))

# All three positional arguments are wildcards, so nothing is filtered out here.
dfs = fmr_client.get_dataflows("*", "*", "*")

dfs[:10]  # peek

### EXAMPLE: SDMX REST API - get data
###          Scenario: get dataset, return CSV, save to file.


In [None]:
# Ensure output directory exists
os.makedirs(cfg.out_dir, exist_ok=True)

# Build filename from the configured flow id.
# Agency and flow id are read from cfg rather than from variables that happen
# to be left over from earlier cells.
filename = f"{cfg.flowid}.csv"
out_path = os.path.join(cfg.out_dir, filename)

# Make an API data call, retrieve the latest 1 observation, in csv format, for the provided agency/dataflow/version
resp = requests.get(f"{cfg.endpoint}/data/dataflow/{cfg.agency}/{cfg.flowid}/*",
                    params={"format":"sdmx-csv", "lastNobservations":"1"},
                    headers={"Accept": "application/text"},
                    timeout=60)

resp.raise_for_status()

# (optional but safe) ensure correct decoding before you touch .text
# SDMX-CSV is UTF-8, so force UTF-8 to avoid stray characters
resp.encoding = "utf-8"

# Get csv into a pandas dataframe
dataset = pd.read_csv(
    io.StringIO(resp.text),
    dtype=str,              # read every column as a string
    keep_default_na=True,   # empty fields -> NaN
)

# 1st 10 lines of the Pandas Dataframe
dataset.head(10)

### Drop the first column of a pandas DataFrame and save to CSV.


In [None]:

# Build a copy of the frame without its leading column; `dataset` itself is left as-is.
dataset_out = dataset.drop(columns=dataset.columns[0])

# Write the trimmed frame to the path prepared in the previous cell, without the index.
dataset_out.to_csv(path_or_buf=out_path, index=False)

# Report row count, remaining column count and destination.
print(f"Saved {dataset.shape[0]} rows x {dataset_out.shape[1]} cols to {out_path}")

Data Processing Services (FMR)

This notebook demonstrates using REST API calls to FMR running inside an SDMX Lab instance.

### Submitting structural metadata to FMR


In [None]:
from pathlib import Path
import json
import requests
import certifi
from lxml import etree
from getpass import getpass
from requests.auth import HTTPBasicAuth

## Setup FMR Structures for the data pipelines session

ie. Load all data pipelines structures to FMR via REST-API

This step demonstrates how to submit a file of structures to FMR using the REST endpoint.

In [None]:
# Prepare SDMX structural metadata artefacts
endpoint = f"{fmr_url}/ws/secure/sdmxapi/rest"

path = f"{root_dir}/structures/AllStructures.json"

headers = {"Content-Type": "application/json"}

# Submit and upload all structures to FMR.
# The file is opened in a `with` block so the handle is closed once the upload
# finishes (or fails), and is streamed as the request body.
with open(path, "rb") as payload:
    response = requests.post(
        endpoint,
        data=payload,
        headers=headers,
        auth=(user, password),
        # NOTE(review): verify=False disables TLS certificate verification, so the
        # credentials are sent to an unverified server. Confirm the Lab instance
        # really needs this; otherwise drop it or pass verify=certifi.where().
        verify=False,
        # Without a timeout the cell can hang forever on an unresponsive server.
        timeout=300,
    )

In [None]:
# Report the outcome of the submission: only 200 and 201 count as success.
if response.status_code not in (200, 201):
    print(f"Failed to create resource. Status code: {response.status_code}")
else:
    print("Resource successfully created in FMR.")
# The response body is shown in both cases.
print("Response:", response.text)

### Retrieve DSDs in submitted dataflow from FMR via REST-API

This step demonstrates how to retrieve the individual DSDs in the submitted dataflow from FMR using the REST endpoint.

### Data Validation

Validate files against the structures in the registry: do they pass FMR's 9 validation rules?
Refer to and run the Jupyter Notebook in SDMX Lab that was provided with the SDMX.IO learning resources materials.



### Transcoding

Recode files - remap and change coding, change shape using Structure Maps and Representation Maps.

Refer to the SDMX.IO learning resources materials. NB: these need adaptation to run in SDMX LAB.


In [None]:
# # %% [markdown]
# ### Transcoding with Verification
#
# Refer to SDMX.IO learning resources materials. NB: need adaptation to run in SDMX LAB.

# Reference Metadata

This notebook demonstrates Reference Metadata
### 

The essence of the reference metadata section is to demonstrate the elements to be created for a simple data pipeline
system controlled by reference metadata (loosely coupled business logic, versioned, ...). In brief, metadata-driven processes.

**Reference Metadata**
Category Scheme: A tree representing data pipeline process/workflow.
   [] Dataflows are attached to the tree IFF they are to be processed as a part of the workflow
   [] Annotation attached to Category Scheme to indicate the associated Metadataflow
   [] A metadata report is created for each dataflow. It has the 'process settings' for the dataflow and is attached to the dataflow
Codelists: For decision trees
Metadata Structure Definition: 
Metadataflows: One per process
Metadatasets: One per dataflow. The process settings for this dataflow.
Provision Agreements: according to needs of process workflow

See artifacts in FMR (after AllStructures.json is loaded)
See https://py.sdmx.io section on 'processes'


### 


In [None]:
from pathlib import Path
import json
import requests
import certifi
from lxml import etree

### POST A METADATA REPORT TO DATAFLOW

URL to POST a metadata report:  {fmr_url}/ws/secure/sdmx/v2/metadata
URL to GET a metadata report: {fmr_url}/sdmx/v2/metadata

In [None]:
from pathlib import Path
import json
import requests
import certifi
from lxml import etree
from getpass import getpass
from requests.auth import HTTPBasicAuth

# `root_dir` is intentionally NOT reassigned here: the repository root set in the
# setup cell is reused, so the paths below resolve wherever the repo was cloned
# (the previous machine-specific override pointed at one developer's home directory).

# Reference metadata reports to submit, and the FMR endpoint that accepts them.
endpoint = f"{fmr_url}/ws/secure/sdmx/v2/metadata"
# A list (not a set) so the reports are always submitted in the order written.
reports = [
    f"{root_dir}/data/PID001-pass.json",
    f"{root_dir}/data/PID001_DF2-pass.json",
]

headers = {"Content-Type": "application/json"}

for path in reports:
    # Submit and upload report to FMR.
    # `with` closes each file handle as soon as its upload has finished.
    with open(path, "rb") as payload:
        response = requests.post(
            endpoint,
            data=payload,
            headers=headers,
            auth=(user, password),
            # NOTE(review): verify=False disables TLS certificate verification -
            # confirm the Lab instance requires it.
            verify=False,
            # Avoid hanging forever on an unresponsive server.
            timeout=300,
        )

    # Display response
    if response.status_code in [200, 201]:
        print("Resource successfully created in FMR.")
        print("Response:", response.text)
    else:
        print(f"Failed to create resource. Status code: {response.status_code}")
        print("Response:", response.text)

# NOTE: replace with pysdmx calls once SDMX LAB auth access is ready.
# 
# next: open FMR UI, open dataflow, confirm metadata report is attached to dataflow.

### GET A METADATA REPORT FOR A DATAFLOW

URL to POST a metadata report:  {fmr_url}/ws/secure/sdmx/v2/metadata
URL to GET a metadata report: {fmr_url}/sdmx/v2/metadata


In [None]:
from pathlib import Path
import json
import requests
import certifi
from lxml import etree
from getpass import getpass
from requests.auth import HTTPBasicAuth

# Endpoint returning the metadatasets of agency BIS.SDMXIO (any id, any version).
endpoint = f"{fmr_url}/sdmx/v2/metadata/metadataset/BIS.SDMXIO/*/*"
# NOTE(review): "Content-Type" describes a request body, which a GET does not send;
# an "Accept" header may be what was intended - confirm against the FMR API.
headers = {"Content-Type": "application/json"}

# Retrieve metadatasets from FMR for http://localhost:8080/sdmx/v2/metadata/metadataset
# ie. for BIS.SDMXIO/*/*
#
# One call covers both cases: credentials are sent only when running against
# SDMX Lab (sdmx_lab is True); auth=None makes an unauthenticated request.
response = requests.get(
    endpoint,
    headers=headers,
    auth=(user, password) if sdmx_lab else None,
    # NOTE(review): verify=False disables TLS certificate verification -
    # confirm the Lab instance requires it.
    verify=False,
    # Avoid hanging forever on an unresponsive server.
    timeout=60,
)

# Display response (this is a retrieval, so the messages say so).
if response.status_code in [200, 201]:
    print("Metadata successfully retrieved from FMR.")
    print("Response:", response.text)
else:
    print(f"Failed to retrieve metadata. Status code: {response.status_code}")
    print("Response:", response.text)

#TODO - do the same thing now using pysdmx
#
# Open output in VS CODE or text editor ... and look at content.

# Data Pipelines

This notebook demonstrates the Data Pipeline workflow

### 

The essence of the data pipeline section is to take the building blocks seen earlier and string them together 
into a sequence of tasks guided by the content of the metadata report attached to each dataflow within
the Data Pipelines Category Scheme.

**Data Pipeline metadata:**
Category Scheme: A tree representing data pipeline process/workflow.
   [] Dataflows are attached to the tree IFF they are to be processed as a part of the workflow
   [] Annotation attached to Category Scheme to indicate the associated Metadataflow
   [] A metadata report is created for each dataflow. It has the 'process settings' for the dataflow and is attached to the dataflow

Processing: 
   Events trigger the pipeline. 
   The category scheme is traversed and each dataflow is processed according to the attached metadataflow.

See https://py.sdmx.io section on 'processes'


In [None]:
from pathlib import Path
import json
import requests
import certifi
from lxml import etree