1. LLM vs compound ai system => RAG
2. Compound ai system control logic can be:
    - rule-based: coded by humans
    - [AGENT] ai-based: dynamically planned by the ai every time (LLM reasoning capability improvement made this possible)
3. [AGENT]
    - reason (decompose the task)
    - act, by calling tools
        - external apis
        - search the web
        - vector database
        - calculator
        - some other specialist LLMs
        - ...
    - access memory
4. Different Agent types:
    - ReAct (reason + act): user query => LLM plan / think => execute (maybe using external tools) => observe tool return {ko: iterate / ok: answer}
5. Designing compound systems
    - this is a sliding scale of LLM autonomy from full rule-based to full ai-based
    - when the scope is narrow and well defined, it is more efficient to design a rule-based system (e.g. the user is not expected to ask about the weather)
    - the agentic approach is useful when the deterministic control flow would get too complicated (so we accept the time and cost of each individual execution, compared to the time and cost that developing the programmatic approach would take)
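
The ReAct loop from item 4 can be sketched as follows. This is a minimal illustration rather than a real agent: `fake_llm_plan` is a hypothetical stand-in for the LLM's plan / think step, and the tool registry holds a single toy calculator.

```python
from typing import Callable


def calculator(expression: str) -> str:
    """Toy tool: evaluates 'a op b' for +, - and * only."""
    a, op, b = expression.split()
    x, y = float(a), float(b)
    results = {"+": x + y, "-": x - y, "*": x * y}
    return str(results[op])


# Hypothetical tool registry (an assumption for this sketch).
TOOLS: dict[str, Callable[[str], str]] = {"calculator": calculator}


def fake_llm_plan(query: str, observations: list[str]) -> dict:
    """Stand-in for the LLM: call the calculator once, then answer."""
    if not observations:
        return {"action": "calculator", "input": query}
    return {"action": "answer", "input": observations[-1]}


def react(query: str, plan=fake_llm_plan, max_steps: int = 5) -> str:
    observations: list[str] = []
    for _ in range(max_steps):
        step = plan(query, observations)          # reason
        if step["action"] == "answer":            # ok: answer
            return step["input"]
        tool = TOOLS[step["action"]]              # act
        observations.append(tool(step["input"]))  # observe, then iterate
    return "No answer within the step budget."


print(react("6 * 7"))
```

The `max_steps` budget is the rule-based safeguard around the AI-planned part: it bounds the time and cost of a single execution.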

In [1]:
from claude_chat_bot.SDMX_DataFlow import Dataflow
from LLM import model

# Agent demo:
1. get the sdmx descriptor for a dataflow
2. tokenize the whole of it to estimate cost (and conclude that parsing is king)
3. dict / parsed JSON based solution

In [2]:
fin_perstud = {"agency": "OECD.EDU.IMEP",
"id": "DSD_EAG_UOE_FIN@DF_UOE_INDIC_FIN_PERSTUD",
"version": "1.0",
"name": "Expenditure on educational institutions per full-time equivalent student",
"description": "This dataset contains data on expenditure per full-time equivalent student and per full-time equivalent student as a percentage of GPD per capita. The default table displays data for 2020 in current USD PPP and as a percentage of GDP per capita, from all expenditure sources, and unfiltered by type of expenditure. The selection can be changed to display data: by year, by source of expenditure, by destination of expenditure and by type of expenditure. Please note that some categories are mutually exclusive. </p><p>For more information, please consult <a href=\"https://www.oecd-ilibrary.org/sites/e13bef63-en/1/3/4/index.html?itemId=%20/content/publication/e13bef63-en&_csp_=a4f4b3d408c9dd70d167f10de61b8717&itemIGO=oecd&itemContentType=book\"><i>Education at a Glance 2023</i></a>. Additional details regarding the methodology used, references to the sources, and specific notes for each country can be found in <a href=\"https://www.oecd-ilibrary.org/sites/301fa18e-en/index.html?itemId=/content/component/301fa18e-en\"><i>Education at a Glance 2023 Sources, Methodologies and Technical Notes</i></a>."
}

dataflow_details_url = f'https://sdmx.oecd.org/public/rest/dataflow/{fin_perstud["agency"]}/{fin_perstud["id"]}/{fin_perstud["version"]}?references=all'
dataflow_details_url

'https://sdmx.oecd.org/public/rest/dataflow/OECD.EDU.IMEP/DSD_EAG_UOE_FIN@DF_UOE_INDIC_FIN_PERSTUD/1.0?references=all'

In [3]:
df_info = Dataflow(dataflow_details_url)

In [4]:
df_info.populate_variables()

No codelist found for TIME_PERIOD


In [5]:
# https://openai.com/api/pricing/
MODEL_PRICING_PER_M_TOKENS = {
    'gpt-4o': {'prompt_tokens': 5.00, 'completion_tokens': 15.00},
    'gpt-4o-2024-08-06': {'prompt_tokens': 2.50, 'completion_tokens': 10.00},
    'gpt-4o-mini': {'prompt_tokens': 0.150, 'completion_tokens': 0.600},
    'gpt-4o-mini-2024-07-18': {'prompt_tokens': 0.150, 'completion_tokens': 0.600},
    'o1-preview': {'prompt_tokens': 15.00, 'completion_tokens': 60.00}
}

In [6]:
# print the pickled size (in bytes) and the token count of each instance variable
import pickle
import tiktoken

# Load the encoding used for token counting.
# Note: "gpt-4" resolves to the `cl100k_base` encoding, while the gpt-4o models priced above use `o200k_base`,
# so the counts below are only approximate for them.
encoding = tiktoken.encoding_for_model("gpt-4")

for var in vars(df_info):
    content = getattr(df_info, var)
    print(f'{var}: {len(pickle.dumps(content))}')
    text = str(content)
    # Tokenize the string
    tokens = encoding.encode(text)
    print(f"Number of tokens: {len(tokens)}")
    

url: 131
Number of tokens: 44
df_details_json: 476481
Number of tokens: 223153
df_name: 87
Number of tokens: 11
df_description: 1121
Number of tokens: 274
df_dimension_names: 522
Number of tokens: 147
df_code_names: 3093
Number of tokens: 1029


In [7]:
interesting_session_variables = ['df_name', 'df_description', 'df_dimension_names', 'df_code_names']

# Create a subset dictionary directly from the dataclass instance
interesting_content = {var_name: getattr(df_info, var_name) for var_name in interesting_session_variables}

# Serialize the subset dictionary to a JSON string
import json
json_str = json.dumps(interesting_content, indent=4)

# Print the JSON string
print(json_str)


{
    "df_name": "Expenditure on educational institutions per full-time equivalent student",
    "df_description": "This dataset contains data on expenditure per full-time equivalent student and per full-time equivalent student as a percentage of GPD per capita. The default table displays data for 2020 in current USD PPP and as a percentage of GDP per capita, from all expenditure sources, and unfiltered by type of expenditure. The selection can be changed to display data: by year, by source of expenditure, by destination of expenditure and by type of expenditure. Please note that some categories are mutually exclusive. </p><p>For more information, please consult <a href=\"https://www.oecd-ilibrary.org/sites/e13bef63-en/1/3/4/index.html?itemId=%20/content/publication/e13bef63-en&_csp_=a4f4b3d408c9dd70d167f10de61b8717&itemIGO=oecd&itemContentType=book\"><i>Education at a Glance 2023</i></a>. Additional details regarding the methodology used, references to the sources, and specific notes 

In [8]:
interesting_content.keys()

dict_keys(['df_name', 'df_description', 'df_dimension_names', 'df_code_names'])

# Agentic approach

In [9]:
#persona = """You are a data analyst working for a government agency. You have been tasked with analyzing the expenditure on educational institutions per full-time equivalent student. You need to understand the dataset's structure, including the dimension names and code names, to effectively filter and analyze the data. Your goal is to provide insights and recommendations based on the dataset to inform policy decisions and resource allocation in the education sector."""
    
def select_info(user_question):
    persona = """
    You are a coordinator helping data analysts at a government agency. 
    You have been asked to help them find the best data source to answer a specific user question."""
    prompt = f"""
    Please select the best information from the sources listed below to answer the user's question. 
    You are only allowed to choose one source. 
    Please respect the formatting! For example: your answer should look like "df_code_names".
    If you are not able to find the answer in the sources listed below, please select "None".

    ### Sources:
    1. [df_name]: Name of the dataset. Usually 5-10 words.
    2. [df_description]: Detailed description of the dataset, including what it contains and how it can be filtered.
    3. [df_dimension_names]: The list of all dimension codes of the DataFlow as keys and their corresponding names in English as values.
    - Example:
        - 'EDUCATION_LEV': 'Education level'
        - 'EXP_SOURCE': 'Financing source'
    4. [df_code_names]: The list of all codes used in the DataFlow as keys and their corresponding names in English as values.
        - Example:
            - 'EXP_SOURCE':
                - 'S13': 'General government'
                - 'S1D_NON_EDU': 'Private sector'

    ### User Question:
    {user_question}
    """
    return model(prompt, persona)

In [10]:
user_question = "What is the name of the dataset?"
key, cost = select_info(user_question)
print(f"Selected source: {key}, Cost: {cost}")
interesting_content[key]

Selected source: df_name, Cost: 4.65e-05


'Expenditure on educational institutions per full-time equivalent student'

In [11]:
def answer_question(user_question, interesting_content):
    key, cost = select_info(user_question)
    print(f"Selected source: {key}, Cost: {cost}")

    persona = """You are a helpful data analyst working for OECD."""
    prompt = f"""
    Please provide the answer to the following user question: 
    {user_question}
    Please use the information from the source that was selected as the best source to answer the question:
    {interesting_content[key]}
    """

    return model(prompt, persona)

In [12]:
user_question = "What is the name of the dataset?"
answer_question(user_question, interesting_content)

Selected source: df_name, Cost: 4.65e-05


('The name of the dataset is "Expenditure on educational institutions per full-time equivalent student."',
 2.22e-05)

In [18]:
user_question = "What is the definition of the education level 'ISCED11_34_44'?"
answer_question(user_question, interesting_content)

Selected source: None, Cost: 4.74e-05


KeyError: 'None'
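
The `KeyError` above arises because the selector replied "None", which is not a key of `interesting_content`. One possible guard is sketched below under stated assumptions: `stub_model` is a hypothetical stand-in for `model` from `LLM` (assumed, as in the cells above, to return a `(text, cost)` tuple), and `pick_source` is an illustrative helper name, not part of the notebook.

```python
from typing import Optional

VALID_SOURCES = ("df_name", "df_description", "df_dimension_names", "df_code_names")


def stub_model(prompt: str, persona: str) -> tuple[str, float]:
    """Hypothetical stand-in for the real `model` call; always declines."""
    return '"None"', 0.0


def pick_source(user_question: str, select=stub_model) -> tuple[Optional[str], float]:
    """Return a validated source key, or None when the reply is not a known key."""
    reply, cost = select(user_question, "coordinator persona")
    key = reply.strip().strip("\"'[]")  # tolerate quotes or brackets around the key
    return (key if key in VALID_SOURCES else None), cost


key, cost = pick_source("What is the definition of the education level 'ISCED11_34_44'?")
if key is None:
    print("No suitable source found; falling back to another strategy.")
else:
    print(f"Selected source: {key}")
```

Validating the reply against the known keys also catches formatting drift, such as a reply of `[df_name]` instead of `df_name`.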

# Embed the metadata for better search
- going through items individually to prepare adequate phrasing

In [83]:

def flatten_name_and_description(info):

    return [
          ("The DataFrame's name", info.df_name)
        , (info.df_name, "The DataFrame's name")
        , ("The DataFrame's description", info.df_description)
        , (info.df_description, "The DataFrame's description")
    ]

def flatten_dimensions(info):
    ans = []
    for code, name in info.df_dimension_names.items():
        meta_statement = f"The name that corresponds to the dimension code: '{code}' is {name}."
        ans.append((code, meta_statement))
        ans.append((name, meta_statement))
        ans.append((f"What name corresponds to the dimension code '{code}'?", meta_statement))
        ans.append((f"What is the dimension code for '{name}'?", meta_statement))
    return ans

def flatten_codes(info):
    ans = []
    for code_list_id in info.df_code_names:
        for code, name in info.df_code_names[code_list_id].items():
            meta_statement = f"The English name of the code '{code}' within the code list ID '{code_list_id}' is '{name}'."
            ans.append((code, meta_statement))
            ans.append((name, meta_statement))
            ans.append((f"What is the English name of the code '{code}' within the code list ID '{code_list_id}'?", meta_statement))
            ans.append((f"What is the code for '{name}' within the code list ID '{code_list_id}'?", meta_statement))
    return ans


In [84]:
flat_info_for_embedding = []
flat_info_for_embedding.extend(flatten_name_and_description(df_info))
flat_info_for_embedding.extend(flatten_dimensions(df_info))
flat_info_for_embedding.extend(flatten_codes(df_info))
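
The `(search text, statement)` pairs are intended to be embedded and searched. The lookup step could look roughly like the sketch below. Note the assumptions: `toy_embed` is a bag-of-words stand-in for a real embedding model, and the pairs are a hand-written sample in the shape that `flatten_dimensions` produces.

```python
import math
import re
from collections import Counter


def toy_embed(text: str) -> Counter:
    """Stand-in for a real embedding model: lower-cased word counts."""
    return Counter(re.findall(r"[a-z0-9_]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def best_match(question: str, pairs: list[tuple[str, str]]) -> tuple[str, float]:
    """Return the statement whose search text is most similar to the question."""
    q = toy_embed(question)
    score, statement = max((cosine(q, toy_embed(text)), stmt) for text, stmt in pairs)
    return statement, score


# Hand-written sample pairs (search text, statement).
pairs = [
    ("EXP_SOURCE", "The name that corresponds to the dimension code: 'EXP_SOURCE' is Financing source."),
    ("Financing source", "The name that corresponds to the dimension code: 'EXP_SOURCE' is Financing source."),
    ("EDUCATION_LEV", "The name that corresponds to the dimension code: 'EDUCATION_LEV' is Education level."),
    ("Education level", "The name that corresponds to the dimension code: 'EDUCATION_LEV' is Education level."),
]

statement, score = best_match("What does EXP_SOURCE mean?", pairs)
print(statement)
```

Storing several search texts per statement (code, name, and question phrasings) lets different user wordings resolve to the same fact.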

In [85]:
# Export the flattened information to a JSON file
import json
with open('flat_info_for_embedding.json', 'w') as f:
    json.dump(flat_info_for_embedding, f, indent=4)

In [86]:
flat_info_for_embedding

[("The DataFrame's name",
  'Expenditure on educational institutions per full-time equivalent student'),
 ('Expenditure on educational institutions per full-time equivalent student',
  "The DataFrame's name"),
 ("The DataFrame's description",
  'This dataset contains data on expenditure per full-time equivalent student and per full-time equivalent student as a percentage of GPD per capita. The default table displays data for 2020 in current USD PPP and as a percentage of GDP per capita, from all expenditure sources, and unfiltered by type of expenditure. The selection can be changed to display data: by year, by source of expenditure, by destination of expenditure and by type of expenditure. Please note that some categories are mutually exclusive. </p><p>For more information, please consult <a href="https://www.oecd-ilibrary.org/sites/e13bef63-en/1/3/4/index.html?itemId=%20/content/publication/e13bef63-en&_csp_=a4f4b3d408c9dd70d167f10de61b8717&itemIGO=oecd&itemContentType=book"><i>Educa

In [61]:
# Define a mapping of old keys to new keys
key_mapping = {
    'df_name': 'full_name_of_the_dataflow',
    'df_description': 'description_of_the_dataflow',
    'df_dimension_names': 'dimension_codes_and_their_english_names',
    'df_code_names': 'code_list_codes_and_their_english_names'
}

# Create a new dictionary with updated key names
new_interesting_content = {key_mapping.get(k, k): v for k, v in interesting_content.items()}

# Print the new dictionary
new_interesting_content.keys()

dict_keys(['full_name_of_the_dataflow', 'description_of_the_dataflow', 'dimension_codes_and_their_english_names', 'code_list_codes_and_their_english_names'])

The `flatten_dictionary` function defined in the next cell does the following:

1. It uses a recursive approach to traverse the dictionary.
2. For each leaf node (where the value is not a dictionary or list), it creates two entries in the result list:
    - One with the key as the main key and (position, value) as the value.
    - Another with the value as the main key and (position, key) as the value.
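3. For 'df_name' and 'df_description', it is meant to prefix 'Code' to the key and 'Name' to the value in the resulting structures. Note that the check only looks at the first element of the path, which is empty for top-level keys, so this branch is not taken for top-level string values.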
4. The position is represented as a dot-separated string of the keys leading to the leaf node.

In [66]:
def flatten_dictionary(d):
    result = []
    
    def flatten(data, path=None):
        if path is None:
            path = []
        
        if isinstance(data, dict):
            for key, value in data.items():
                new_path = path + [str(key)]
                if isinstance(value, (dict, list)):
                    flatten(value, new_path)
                else:
                    position = '.'.join(map(str, new_path))
                    if path and path[0] in ['df_name', 'df_description']:
                        result.append({f"Code: {key}": (position, value)})
                        result.append({f"Name: {value}": (position, key)})
                    else:
                        result.append({key: (position, value)})
                        result.append({value: (position, key)})
        elif isinstance(data, list):
            for i, item in enumerate(data):
                flatten(item, path + [str(i)])
    
    flatten(d)
    return result

In [67]:
flattened = flatten_dictionary(new_interesting_content)
for item in flattened:
    print(item)

{'full_name_of_the_dataflow': ('full_name_of_the_dataflow', 'Expenditure on educational institutions per full-time equivalent student')}
{'Expenditure on educational institutions per full-time equivalent student': ('full_name_of_the_dataflow', 'full_name_of_the_dataflow')}
{'description_of_the_dataflow': ('description_of_the_dataflow', 'This dataset contains data on expenditure per full-time equivalent student and per full-time equivalent student as a percentage of GPD per capita. The default table displays data for 2020 in current USD PPP and as a percentage of GDP per capita, from all expenditure sources, and unfiltered by type of expenditure. The selection can be changed to display data: by year, by source of expenditure, by destination of expenditure and by type of expenditure. Please note that some categories are mutually exclusive. </p><p>For more information, please consult <a href="https://www.oecd-ilibrary.org/sites/e13bef63-en/1/3/4/index.html?itemId=%20/content/publication/e

In [65]:
# export the flattened dictionary to a JSON file
import json
with open('flattened_interesting_content.json', 'w') as f:
    json.dump(flattened, f, indent=4)

Assuming each answer is always 100 tokens, independent of input size, we can calculate the cost for 1000 calls as follows:

**Model Pricing:**
- **gpt-4o**: $5.00 / 1M input tokens, $15.00 / 1M output tokens
- **gpt-4o-mini**: $0.150 / 1M input tokens, $0.600 / 1M output tokens
- **o1-preview**: $15.00 / 1M input tokens, $60.00 / 1M output tokens

```python
1000 * len(tokens) * MODEL_PRICING_PER_M_TOKENS[model_name]['prompt_tokens'] / 1000000  # Cost of prompt
1000 * 100 * MODEL_PRICING_PER_M_TOKENS[model_name]['completion_tokens'] / 1000000      # Cost of completion
```
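
The two expressions can be wrapped into one helper. This is a sketch: `estimate_cost` and `PRICING` are illustrative names, the prices are copied from the pricing list above, and the token counts are the ones measured earlier for `df_details_json` and `df_code_names`.

```python
# Prices in USD per 1M tokens, copied from the pricing list above.
PRICING = {
    "gpt-4o": {"prompt_tokens": 5.00, "completion_tokens": 15.00},
    "gpt-4o-mini": {"prompt_tokens": 0.150, "completion_tokens": 0.600},
    "o1-preview": {"prompt_tokens": 15.00, "completion_tokens": 60.00},
}


def estimate_cost(prompt_tokens: int, model_name: str,
                  completion_tokens: int = 100, calls: int = 1000) -> float:
    """Total USD cost of `calls` identical requests."""
    price = PRICING[model_name]
    per_call = (prompt_tokens * price["prompt_tokens"]
                + completion_tokens * price["completion_tokens"]) / 1_000_000
    return calls * per_call


raw_cost = estimate_cost(223_153, "gpt-4o")    # raw SDMX metadata
parsed_cost = estimate_cost(1_029, "gpt-4o")   # parsed code names only
print(f"raw: ${raw_cost:.2f}, parsed: ${parsed_cost:.2f}, ratio: {raw_cost / parsed_cost:.0f}x")
```

Because the completion cost is fixed at 100 tokens per call, the prompt size dominates the total, which is why parsing the metadata before prompting pays off.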

In [46]:
df_info.df_details_json



text = str(df_info.df_details_json)
# Tokenize the string
tokens = encoding.encode(text)
print(f"Cost of prompt: {len(tokens) * MODEL_PRICING_PER_M_TOKENS['gpt-4o']['prompt_tokens'] / 1000000}")
print(f"Cost of completion: {100 * MODEL_PRICING_PER_M_TOKENS['gpt-4o']['completion_tokens'] / 1000000}")

Cost of prompt: 1.115765
Cost of completion: 0.0015


In [47]:
import tiktoken
encoding = tiktoken.encoding_for_model("gpt-4")
# json to text to tokens:
raw_json_text = str(df_info.df_details_json)
raw_json_tokens = encoding.encode(raw_json_text)
parsed_metadata_text = str(df_info.df_code_names)
parsed_metadata_tokens = encoding.encode(parsed_metadata_text)
completion_tokens = 100

for model_name in ("gpt-4o","gpt-4o-mini", "o1-preview"):
    print(f"Model: {model_name}:")
    raw_prompt_cost = 1000 * len(raw_json_tokens) * MODEL_PRICING_PER_M_TOKENS[model_name]['prompt_tokens'] / 1000000
    parsed_prompt_cost = 1000 * len(parsed_metadata_tokens) * MODEL_PRICING_PER_M_TOKENS[model_name]['prompt_tokens'] / 1000000
    completion_cost = 1000 * completion_tokens * MODEL_PRICING_PER_M_TOKENS[model_name]['completion_tokens'] / 1000000
    print(f"Total cost for 1000 calls on the raw SDMX metadata: ${raw_prompt_cost + completion_cost}")
    print(f"Total cost for 1000 calls on the parsed code names: ${parsed_prompt_cost + completion_cost}")
    print("==========================")



Model: gpt-4o:
Total cost for 1000 calls on the raw SDMX metadata: $1117.265
Total cost for 1000 calls on the parsed code names: $6.645
Model: gpt-4o-mini:
Total cost for 1000 calls on the raw SDMX metadata: $33.53295
Total cost for 1000 calls on the parsed code names: $0.21434999999999998
Model: o1-preview:
Total cost for 1000 calls on the raw SDMX metadata: $3353.295
Total cost for 1000 calls on the parsed code names: $21.435000000000002


In [48]:
! pip install tiktoken


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [49]:
import tiktoken

# Load the encoding for the GPT model you're using
# For GPT-4 or GPT-3.5, you can use the `cl100k_base` encoding
encoding = tiktoken.encoding_for_model("gpt-4")

# Your string
text = "This is a sample string to estimate token usage."

# Tokenize the string
tokens = encoding.encode(text)

# Number of tokens
num_tokens = len(tokens)

# https://platform.openai.com/tokenizer
print(f"Number of tokens: {num_tokens}")

Number of tokens: 10


In [50]:
import requests

class SDMXAgent:
    """
    We can only fire up the agent with the meta dataflow URL already set.
    (The LLM is also required.)
    TODO: print confirmation messages to each step.
    """
    def __init__(self, model, url):
        self.url = url
        self.model = model
        self.df_details_json = None
        self.df_name = None
        self.df_description = None
        self.df_dimension_names = None
        self.df_codes = None

    def populate_dataflow_variables(self):
        self.get_dataflow_details_json()
        self.get_dataflow_name()
        self.get_dataflow_description()
        self.get_dataflow_dimensions_names()
        self.get_dataflow_code_names()

    def get_dataflow_details_json(self):
        dataflow_details_result = requests.get(self.url, headers={
            'Accept': 'application/vnd.sdmx.structure+json;version=1.0;urn=true'
            })
        if str(dataflow_details_result.status_code)[0] != '2':
            print(f'Error getting dataflow details: {dataflow_details_result.status_code}')
        else:
            self.df_details_json = dataflow_details_result.json()['data']
            
    def get_dataflow_name(self):
        self.df_name = self.df_details_json['dataflows'][0]['name']

    def get_dataflow_description(self):
        self.df_description = self.df_details_json['dataflows'][0]['description']

    def get_dataflow_dimensions_names(self):
        self.df_dimension_names = dict()
        for item in self.df_details_json['conceptSchemes'][0]['concepts']:
            self.df_dimension_names[item['id']] = item['name']

    def get_dataflow_code_names(self):
        pass

    def get_dataflow_constrainted_codes_and_names(dataflow_details_json: dict) -> dict:
        code_list_urns = get_code_list_urns(dataflow_details_json)
        code_list_id_urns = dict()
        for code_list_item in dataflow_details_json['codelists']:
            code_list_id_urns[code_list_item['links'][0]['urn']] = [(i['id'], i['name']) for i in code_list_item['codes']]
        code_list_id_urns
        attributes = parse_SDMX_contentConstraints_cubeRegions_keyValues(dataflow_details_json['contentConstraints'][0]['cubeRegions'][0]['keyValues'])

        constrained_codes_and_names = dict()
        for dimension_code in attributes:
            codes = attributes[dimension_code]
            try:
                all_codes_and_names = code_list_id_urns[code_list_urns[dimension_code]]
                all_codes_and_names = {code: name for code, name in all_codes_and_names}
            except:
                all_codes_and_names = {code: 'No codelist found' for code in codes}
                print(f'No codelist found for {dimension_code}')
            constrained_codes_and_names[dimension_code] = {code: all_codes_and_names[code] for code in codes}

        return constrained_codes_and_names


In [51]:
from datetime import datetime
import requests

class Dataflow:
    """
    We can only fire up the agent with the meta dataflow URL already set.
    TODO: print confirmation messages to each step.
    """
    def __init__(self, url):
        self.url = url
        self.df_details_json = None
        self.df_name = None
        self.df_description = None
        self.df_dimension_names = None
        self.df_code_names = None

    def populate_variables(self):
        self.get_df_details_json()
        self.get_df_name()
        self.get_df_description()
        self.get_df_dimensions_names()
        self.get_df_constrainted_codes_and_names()

    def get_df_details_json(self):
        dataflow_details_result = requests.get(self.url, headers={
            'Accept': 'application/vnd.sdmx.structure+json;version=1.0;urn=true'
            })
        if str(dataflow_details_result.status_code)[0] != '2':
            print(f'Error getting dataflow details: {dataflow_details_result.status_code}')
        else:
            self.df_details_json = dataflow_details_result.json()['data']
            
    def get_df_name(self):
        self.df_name = self.df_details_json['dataflows'][0]['name']

    def get_df_description(self):
        self.df_description = self.df_details_json['dataflows'][0]['description']

    def get_df_dimensions_names(self):
        self.df_dimension_names = dict()
        for item in self.df_details_json['conceptSchemes'][0]['concepts']:
            self.df_dimension_names[item['id']] = item['name']

    def get_df_constrainted_codes_and_names(self):
        code_list_urns = self.__get_code_list_urns()
        code_list_id_urns = dict()
        for code_list_item in self.df_details_json['codelists']:
            code_list_id_urns[code_list_item['links'][0]['urn']] = [(i['id'], i['name']) for i in code_list_item['codes']]
        code_list_id_urns
        attributes = self.__parse_SDMX_contentConstraints_cubeRegions_keyValues()

        constrained_codes_and_names = dict()
        for dimension_code in attributes:
            codes = attributes[dimension_code]
            try:
                all_codes_and_names = code_list_id_urns[code_list_urns[dimension_code]]
                all_codes_and_names = {code: name for code, name in all_codes_and_names}
            except KeyError:
                all_codes_and_names = {code: 'No codelist found' for code in codes}
                print(f'No codelist found for {dimension_code}')
            constrained_codes_and_names[dimension_code] = {code: all_codes_and_names[code] for code in codes}

        self.df_code_names = constrained_codes_and_names
    
    def __get_code_list_urns(self) -> dict:
        code_list_urns = dict()

        for dsd_i in self.df_details_json['dataStructures'][0]['dataStructureComponents']['dimensionList']['dimensions']:
            code_list_urns[dsd_i['id']] = dsd_i['localRepresentation']['enumeration']

        return code_list_urns
    
    def __parse_SDMX_contentConstraints_cubeRegions_keyValues(self) -> dict:
        attributes = dict()
        for item in self.df_details_json['contentConstraints'][0]['cubeRegions'][0]['keyValues']:
            _, value_type = item
            attr_id = item['id']
            if value_type == 'values':
                attributes[attr_id] = item[value_type]
            elif value_type == 'timeRange':
                attributes[attr_id] = self.__get_year_from_iso_date(item[value_type])
            else:
                raise ValueError(f'Unknown value type: {value_type}')
        return attributes
    
    def __get_year_from_iso_date(self, sdmx_time_period_obj: dict) -> list:

        # Extract the iso datetime string
        sdmx_iso_start_period = sdmx_time_period_obj['startPeriod']['period']
        sdmx_iso_end_period = sdmx_time_period_obj['endPeriod']['period']

        # convert the start and end period to datetime objects and extract the year
        start_year = datetime.fromisoformat(sdmx_iso_start_period).year
        end_year = datetime.fromisoformat(sdmx_iso_end_period).year
        
        return [start_year, end_year]

    


In [52]:
fin_perstud = {"agency": "OECD.EDU.IMEP",
"id": "DSD_EAG_UOE_FIN@DF_UOE_INDIC_FIN_PERSTUD",
"version": "1.0",
"name": "Expenditure on educational institutions per full-time equivalent student",
"description": "This dataset contains data on expenditure per full-time equivalent student and per full-time equivalent student as a percentage of GPD per capita. The default table displays data for 2020 in current USD PPP and as a percentage of GDP per capita, from all expenditure sources, and unfiltered by type of expenditure. The selection can be changed to display data: by year, by source of expenditure, by destination of expenditure and by type of expenditure. Please note that some categories are mutually exclusive. </p><p>For more information, please consult <a href=\"https://www.oecd-ilibrary.org/sites/e13bef63-en/1/3/4/index.html?itemId=%20/content/publication/e13bef63-en&_csp_=a4f4b3d408c9dd70d167f10de61b8717&itemIGO=oecd&itemContentType=book\"><i>Education at a Glance 2023</i></a>. Additional details regarding the methodology used, references to the sources, and specific notes for each country can be found in <a href=\"https://www.oecd-ilibrary.org/sites/301fa18e-en/index.html?itemId=/content/component/301fa18e-en\"><i>Education at a Glance 2023 Sources, Methodologies and Technical Notes</i></a>."
}

dataflow_details_url = f'https://sdmx.oecd.org/public/rest/dataflow/{fin_perstud["agency"]}/{fin_perstud["id"]}/{fin_perstud["version"]}?references=all'
dataflow_details_url

'https://sdmx.oecd.org/public/rest/dataflow/OECD.EDU.IMEP/DSD_EAG_UOE_FIN@DF_UOE_INDIC_FIN_PERSTUD/1.0?references=all'

In [53]:
df_info = Dataflow(dataflow_details_url)

In [54]:
df_info.populate_variables()

No codelist found for TIME_PERIOD


In [55]:
# list of instance variables:
vars(df_info).keys()

dict_keys(['url', 'df_details_json', 'df_name', 'df_description', 'df_dimension_names', 'df_code_names'])

In [56]:
# print the memory size of instance variables
import pickle
for var in vars(df_info):
    print(f'{var}: {len(pickle.dumps(getattr(df_info, var)))}')

url: 131
df_details_json: 476481
df_name: 87
df_description: 1121
df_dimension_names: 522
df_code_names: 3093


In [57]:
df_info.df_dimension_names

{'MEASURE': 'Measure',
 'REF_AREA': 'Reference area',
 'EDUCATION_LEV': 'Education level',
 'INTENSITY': 'Intensity',
 'EXP_SOURCE': 'Financing source',
 'EXP_DESTINATION': 'Destination of expenditure',
 'EXPENDITURE_TYPE': 'Type of expenditure',
 'INST_TYPE_EDU': 'Type of educational institution',
 'PRICE_BASE': 'Price base',
 'BASE_PER': 'Base period',
 'TIME_PERIOD': 'Time period',
 'OBS_VALUE': 'Observation value',
 'OBS_STATUS': 'Observation status',
 'UNIT_MULT': 'Unit multiplier',
 'UNIT_MEASURE': 'Unit of measure',
 'DECIMALS': 'Decimals'}

In [58]:
df_info.df_code_names['TIME_PERIOD']

{1995: 'No codelist found', 2021: 'No codelist found'}

In [59]:
df_info.df_code_names['EXPENDITURE_TYPE']

{'ASERV': 'Expenditure for ancillary services',
 'CORE': 'Expenditure for core services',
 'DIR_EXP': 'Expenditure for educational institutions',
 'NORD': 'Excluding research and development (R&D)',
 'RD': 'Expenditure for R&D in educational institutions'}

# Refactoring the SDMX class

In [33]:
from datetime import datetime
import requests
from typing import Dict, List, Tuple, Union

class Dataflow:
    """
    A class to extract and manage information from an SDMX dataflow.
    """
    def __init__(self, url: str):
        self.url = url
        self.df_details_json: Dict = {}
        self.df_name: str = ""
        self.df_description: str = ""
        self.df_dimension_names: Dict[str, str] = {}
        self.df_code_names: Dict[str, Dict[str, str]] = {}

    def populate_variables(self):
        """Populate all variables with data from the dataflow."""
        self._get_df_details_json()
        self._extract_dataflow_info()
        self._extract_dimension_names()
        self._extract_constrained_codes_and_names()

    def _get_df_details_json(self):
        """Fetch the dataflow details JSON from the URL."""
        headers = {'Accept': 'application/vnd.sdmx.structure+json;version=1.0;urn=true'}
        response = requests.get(self.url, headers=headers)
        response.raise_for_status()
        self.df_details_json = response.json()['data']

    def _extract_dataflow_info(self):
        """Extract basic dataflow information."""
        dataflow = self.df_details_json['dataflows'][0]
        self.df_name = dataflow['name']
        self.df_description = dataflow['description']

    def _extract_dimension_names(self):
        """Extract dimension names from the dataflow."""
        concepts = self.df_details_json['conceptSchemes'][0]['concepts']
        self.df_dimension_names = {item['id']: item['name'] for item in concepts}

    def _extract_constrained_codes_and_names(self):
        """Extract constrained codes and names from the dataflow."""
        code_list_urns = self._get_code_list_urns()
        code_list_id_urns = self._get_code_list_id_urns()
        attributes = self._parse_content_constraints()

        for dimension_code, codes in attributes.items():
            try:
                all_codes_and_names = dict(code_list_id_urns[code_list_urns[dimension_code]])
            except KeyError:
                all_codes_and_names = {code: 'No codelist found' for code in codes}
                print(f'No codelist found for {dimension_code}')
            
            self.df_code_names[dimension_code] = {
                code: all_codes_and_names.get(code, 'Unknown code')
                for code in codes
            }

    def _get_code_list_urns(self) -> Dict[str, str]:
        """Get code list URNs from the dataflow."""
        dimensions = self.df_details_json['dataStructures'][0]['dataStructureComponents']['dimensionList']['dimensions']
        return {
            dim['id']: dim['localRepresentation']['enumeration']
            for dim in dimensions
        }

    def _get_code_list_id_urns(self) -> Dict[str, List[Tuple[str, str]]]:
        """Get code list ID URNs from the dataflow."""
        return {
            codelist['links'][0]['urn']: [(code['id'], code['name']) for code in codelist['codes']]
            for codelist in self.df_details_json['codelists']
        }

    def _parse_content_constraints(self) -> Dict[str, Union[List[str], List[int]]]:
        """Parse content constraints from the dataflow."""
        attributes = {}
        for item in self.df_details_json['contentConstraints'][0]['cubeRegions'][0]['keyValues']:
            attr_id = item['id']
            value_type = next(key for key in item.keys() if key not in ['id', 'type'])
            
            if value_type == 'values':
                attributes[attr_id] = item[value_type]
            elif value_type == 'timeRange':
                attributes[attr_id] = self._get_year_range(item[value_type])
            else:
                raise ValueError(f'Unknown value type: {value_type}')
        return attributes

    @staticmethod
    def _get_year_range(time_period_obj: Dict[str, Dict[str, str]]) -> List[int]:
        """Extract year range from a time period object."""
        start_year = datetime.fromisoformat(time_period_obj['startPeriod']['period']).year
        end_year = datetime.fromisoformat(time_period_obj['endPeriod']['period']).year
        return [start_year, end_year]
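
The constraint parsing above can be tried without a network call. The following is a minimal sketch, assuming a `keyValues` list shaped like the one the class reads. The dimension IDs, codes, and dates are invented, and `get_year_range` / `parse_key_values` are hypothetical stand-alone versions of the methods. The timestamps deliberately carry no trailing `Z`, since `datetime.fromisoformat` only accepts that suffix from Python 3.11 on.

```python
from datetime import datetime


def get_year_range(time_range):
    """Return [start_year, end_year] from a timeRange object."""
    start = datetime.fromisoformat(time_range["startPeriod"]["period"]).year
    end = datetime.fromisoformat(time_range["endPeriod"]["period"]).year
    return [start, end]


def parse_key_values(key_values):
    """Map each constrained dimension ID to its allowed codes or year range."""
    parsed = {}
    for item in key_values:
        # The payload key is whichever key is neither 'id' nor 'type'.
        value_type = next(key for key in item if key not in ("id", "type"))
        if value_type == "values":
            parsed[item["id"]] = item["values"]
        elif value_type == "timeRange":
            parsed[item["id"]] = get_year_range(item["timeRange"])
        else:
            raise ValueError(f"Unknown value type: {value_type}")
    return parsed


# Invented sample payload, not taken from the real dataflow.
key_values = [
    {"id": "REF_AREA", "values": ["FRA", "DEU"]},
    {
        "id": "TIME_PERIOD",
        "timeRange": {
            "startPeriod": {"period": "2012-01-01T00:00:00"},
            "endPeriod": {"period": "2020-12-31T00:00:00"},
        },
    },
]

parsed = parse_key_values(key_values)
print(parsed)
```

Note that a `timeRange` is reduced to its two boundary years, not expanded into every year in between.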

In [34]:
df_info = Dataflow(dataflow_details_url)

In [35]:
df_info.populate_variables()

No codelist found for TIME_PERIOD
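
The message above comes from the `except KeyError` branch of `_extract_constrained_codes_and_names`: the content constraint lists `TIME_PERIOD`, but the chained lookup `code_list_id_urns[code_list_urns[dimension_code]]` finds no entry for it. A plausible reason, stated here as an assumption about this payload, is that the time dimension is not enumerated by a codelist. A minimal sketch of the same lookup and fallback on invented toy data (`resolve_code_names` is a hypothetical stand-alone helper):

```python
# Toy stand-ins for the three intermediate structures; all values are invented.
code_list_urns = {"REF_AREA": "urn:example:CL_AREA"}
code_list_id_urns = {"urn:example:CL_AREA": [("FRA", "France"), ("DEU", "Germany")]}
constraints = {"REF_AREA": ["FRA", "XXX"], "TIME_PERIOD": [2012, 2020]}


def resolve_code_names(constraints, code_list_urns, code_list_id_urns):
    """Attach a human-readable name to every constrained code."""
    resolved = {}
    for dimension, codes in constraints.items():
        try:
            names = dict(code_list_id_urns[code_list_urns[dimension]])
        except KeyError:
            # Either the dimension has no codelist URN or the URN is unknown.
            names = {code: "No codelist found" for code in codes}
        resolved[dimension] = {code: names.get(code, "Unknown code") for code in codes}
    return resolved


resolved = resolve_code_names(constraints, code_list_urns, code_list_id_urns)
print(resolved)
```

The two fallbacks are distinct: `'No codelist found'` marks a whole dimension without a codelist, while `'Unknown code'` marks a single code missing from an existing codelist.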


In [36]:
# print the pickled size (in bytes) of each instance variable, as a rough proxy for its memory footprint
import pickle
for var in vars(df_info):
    print(f'{var}: {len(pickle.dumps(getattr(df_info, var)))}')

url: 131
df_details_json: 476481
df_name: 87
df_description: 1121
df_dimension_names: 522
df_code_names: 3093


In [37]:
from dataclasses import dataclass, field
from datetime import datetime
import requests
#from typing import Dict, List, Tuple, Union

@dataclass
class Dataflow:
    """
    A dataclass to extract and manage information from an SDMX dataflow.
    """
    url: str
    df_details_json: dict = field(default_factory=dict)
    df_name: str = ""
    df_description: str = ""
    df_dimension_names: dict[str, str] = field(default_factory=dict)
    df_code_names: dict[str, dict[str, str]] = field(default_factory=dict)

    ACCEPT_HEADER = {'Accept': 'application/vnd.sdmx.structure+json;version=1.0;urn=true'}

    def populate_variables(self):
        """Populate all variables with data from the dataflow."""
        self._get_df_details_json()
        self._extract_dataflow_info()
        self._extract_dimension_names()
        self._extract_constrained_codes_and_names()

    def _get_df_details_json(self):
        """Fetch the dataflow details JSON from the URL."""
        response = requests.get(self.url, headers=self.ACCEPT_HEADER)
        response.raise_for_status()
        self.df_details_json = response.json()['data']

    def _extract_dataflow_info(self):
        """Extract basic dataflow information."""
        dataflow = self.df_details_json['dataflows'][0]
        self.df_name = dataflow['name']
        self.df_description = dataflow['description']

    def _extract_dimension_names(self):
        """Extract dimension names from the dataflow."""
        concepts = self.df_details_json['conceptSchemes'][0]['concepts']
        self.df_dimension_names = {item['id']: item['name'] for item in concepts}

    def _extract_constrained_codes_and_names(self):
        """Extract constrained codes and names from the dataflow."""
        code_list_urns = self._get_code_list_urns()
        code_list_id_urns = self._get_code_list_id_urns()
        attributes = self._parse_content_constraints()

        for dimension_code, codes in attributes.items():
            try:
                all_codes_and_names = dict(code_list_id_urns[code_list_urns[dimension_code]])
            except KeyError:
                all_codes_and_names = {code: 'No codelist found' for code in codes}
                print(f'No codelist found for {dimension_code}')
            
            self.df_code_names[dimension_code] = {
                code: all_codes_and_names.get(code, 'Unknown code')
                for code in codes
            }

    def _get_code_list_urns(self) -> dict[str, str]:
        """Map each dimension ID to the URN of its codelist."""
        dimensions = self.df_details_json['dataStructures'][0]['dataStructureComponents']['dimensionList']['dimensions']
        return {
            dim['id']: dim['localRepresentation']['enumeration']
            for dim in dimensions
        }

    def _get_code_list_id_urns(self) -> dict[str, list[tuple[str, str]]]:
        """Map each codelist URN to its list of (code ID, code name) pairs."""
        return {
            codelist['links'][0]['urn']: [(code['id'], code['name']) for code in codelist['codes']]
            for codelist in self.df_details_json['codelists']
        }

    def _parse_content_constraints(self) -> dict[str, list[str] | list[int]]:
        """Parse content constraints from the dataflow."""
        attributes = {}
        for item in self.df_details_json['contentConstraints'][0]['cubeRegions'][0]['keyValues']:
            attr_id = item['id']
            value_type = next(key for key in item.keys() if key not in ['id', 'type'])
            
            if value_type == 'values':
                attributes[attr_id] = item[value_type]
            elif value_type == 'timeRange':
                attributes[attr_id] = self._get_year_range(item[value_type])
            else:
                raise ValueError(f'Unknown value type: {value_type}')
        return attributes

    @staticmethod
    def _get_year_range(time_period_obj: dict[str, dict[str, str]]) -> list[int]:
        """Extract year range from a time period object."""
        start_year = datetime.fromisoformat(time_period_obj['startPeriod']['period']).year
        end_year = datetime.fromisoformat(time_period_obj['endPeriod']['period']).year
        return [start_year, end_year]

In [38]:
#from __future__ import annotations  # Only needed on Python < 3.10: defers annotation evaluation so `X | Y` in type hints does not raise
from dataclasses import dataclass, field
from datetime import datetime
import requests

@dataclass
class Dataflow:
    """
    A dataclass to extract and manage information from an SDMX dataflow.
    """
    url: str
    df_details_json: dict = field(default_factory=dict)
    df_name: str = ""
    df_description: str = ""
    df_dimension_names: dict[str, str] = field(default_factory=dict)
    df_code_names: dict[str, dict[str, str]] = field(default_factory=dict)

    ACCEPT_HEADER = {'Accept': 'application/vnd.sdmx.structure+json;version=1.0;urn=true'}

    def populate_variables(self) -> None:
        """Populate all variables with data from the dataflow."""
        self._get_df_details_json()
        self._extract_dataflow_info()
        self._extract_dimension_names()
        self._extract_constrained_codes_and_names()

    def _get_df_details_json(self) -> None:
        """Fetch the dataflow details JSON from the URL."""
        response = requests.get(self.url, headers=self.ACCEPT_HEADER)
        response.raise_for_status()
        self.df_details_json = response.json()['data']

    def _extract_dataflow_info(self) -> None:
        """Extract basic dataflow information."""
        dataflow = self.df_details_json['dataflows'][0]
        self.df_name = dataflow['name']
        self.df_description = dataflow['description']

    def _extract_dimension_names(self) -> None:
        """Extract dimension names from the dataflow."""
        concepts = self.df_details_json['conceptSchemes'][0]['concepts']
        self.df_dimension_names = {item['id']: item['name'] for item in concepts}

    def _extract_constrained_codes_and_names(self) -> None:
        """Extract constrained codes and names from the dataflow."""
        code_list_urns = self._get_code_list_urns()
        code_list_id_urns = self._get_code_list_id_urns()
        attributes = self._parse_content_constraints()

        for dimension_code, codes in attributes.items():
            try:
                all_codes_and_names = dict(code_list_id_urns[code_list_urns[dimension_code]])
            except KeyError:
                all_codes_and_names = {code: 'No codelist found' for code in codes}
                print(f'No codelist found for {dimension_code}')
            
            self.df_code_names[dimension_code] = {
                code: all_codes_and_names.get(code, 'Unknown code')
                for code in codes
            }

    def _get_code_list_urns(self) -> dict[str, str]:
        """Map each dimension ID to the URN of its codelist."""
        dimensions = self.df_details_json['dataStructures'][0]['dataStructureComponents']['dimensionList']['dimensions']
        return {
            dim['id']: dim['localRepresentation']['enumeration']
            for dim in dimensions
        }

    def _get_code_list_id_urns(self) -> dict[str, list[tuple[str, str]]]:
        """Map each codelist URN to its list of (code ID, code name) pairs."""
        return {
            codelist['links'][0]['urn']: [(code['id'], code['name']) for code in codelist['codes']]
            for codelist in self.df_details_json['codelists']
        }

    def _parse_content_constraints(self) -> dict[str, list[str] | list[int]]:
        """Parse content constraints from the dataflow."""
        attributes = {}
        for item in self.df_details_json['contentConstraints'][0]['cubeRegions'][0]['keyValues']:
            attr_id = item['id']
            value_type = next(key for key in item.keys() if key not in ['id', 'type'])
            
            if value_type == 'values':
                attributes[attr_id] = item[value_type]
            elif value_type == 'timeRange':
                attributes[attr_id] = self._get_year_range(item[value_type])
            else:
                raise ValueError(f'Unknown value type: {value_type}')
        return attributes

    @staticmethod
    def _get_year_range(time_period_obj: dict[str, dict[str, str]]) -> list[int]:
        """Extract year range from a time period object."""
        start_year = datetime.fromisoformat(time_period_obj['startPeriod']['period']).year
        end_year = datetime.fromisoformat(time_period_obj['endPeriod']['period']).year
        return [start_year, end_year]

In [39]:
df_info = Dataflow(dataflow_details_url)

In [40]:
df_info.populate_variables()

No codelist found for TIME_PERIOD


In [41]:
# print the pickled size (in bytes) of each instance variable, as a rough proxy for its memory footprint
import pickle
for var in vars(df_info):
    print(f'{var}: {len(pickle.dumps(getattr(df_info, var)))}')

url: 131
df_details_json: 476481
df_name: 87
df_description: 1121
df_dimension_names: 522
df_code_names: 3093
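
With the parsed fields this small, they can be flattened into a short text block instead of passing the raw JSON around. The sketch below is one possible way to do that, not part of the `Dataflow` class: `render_dataflow_summary` is a hypothetical helper and the sample values are invented.

```python
def render_dataflow_summary(name, description, dimension_names, code_names):
    """Render the parsed dataflow fields as a compact plain-text summary."""
    lines = [f"Dataflow: {name}", f"Description: {description}", "Dimensions:"]
    for dim_id, codes in code_names.items():
        # Fall back to the raw ID when no human-readable dimension name exists.
        label = dimension_names.get(dim_id, dim_id)
        rendered = ", ".join(f"{code} ({code_name})" for code, code_name in codes.items())
        lines.append(f"- {dim_id} [{label}]: {rendered}")
    return "\n".join(lines)


# Invented sample values, shaped like df_dimension_names and df_code_names.
summary = render_dataflow_summary(
    name="Toy dataflow",
    description="Invented example.",
    dimension_names={"REF_AREA": "Reference area"},
    code_names={
        "REF_AREA": {"FRA": "France", "DEU": "Germany"},
        "TIME_PERIOD": {2012: "No codelist found"},
    },
)
print(summary)
```

With a populated instance, the call would take `df_info.df_name`, `df_info.df_description`, `df_info.df_dimension_names`, and `df_info.df_code_names` as arguments.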
