In [0]:
%pip install google-genai eurostat --quiet

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
import eurostat
from google import genai
from pydantic import BaseModel
from pyspark.sql.functions import substring, length, regexp_replace, when, col, sum, max
from pyspark.sql.types import IntegerType
import re
import json

#### Eurostat Fact Tables
Description of eurostat module: https://pypi.org/project/eurostat/<br>
Eurostat database: https://ec.europa.eu/eurostat/databrowser/explore/all/

##### Define functions for data loading

In [0]:
def get_data_spark_df(code):
    """Download a Eurostat dataset and return it as a Spark DataFrame.

    The first record returned by ``eurostat.get_data`` is treated as the
    header; for each header label only the text before the first backslash
    is kept. Every remaining record becomes a row.

    Args:
        code: Dataset code passed unchanged to ``eurostat.get_data``.

    Returns:
        The result of ``spark.createDataFrame`` for the data rows and the
        cleaned header.
    """
    dataset = eurostat.get_data(code)
    raw_labels, records = dataset[0], dataset[1:]

    # keep only the part of each label that precedes the first backslash
    column_names = [label.partition('\\')[0] for label in raw_labels]

    return spark.createDataFrame(records, column_names)

def get_mapping_spark_df(code, param):
    """Load a Eurostat code dictionary as a two-column Spark DataFrame.

    Args:
        code: Dataset code passed to ``eurostat.get_dic``.
        param: Name of the dataset parameter whose dictionary is requested,
            also passed to ``eurostat.get_dic``.

    Returns:
        The result of ``spark.createDataFrame`` with the columns
        ``value`` and ``description``.
    """
    code_description_pairs = eurostat.get_dic(code, param)
    return spark.createDataFrame(code_description_pairs, ['value', 'description'])

# Two-letter country codes used to restrict the Eurostat tables to the 27 countries listed here.
# NOTE(review): Greece appears as "EL" rather than ISO "GR" - presumably to match Eurostat's geo codes; confirm.
eu_iso2_codes = [
    "AT", "BE", "BG", "HR", "CY", "CZ", "DK", "EE", "FI",
    "FR", "DE", "EL", "HU", "IE", "IT", "LV", "LT", "LU",
    "MT", "NL", "PL", "PT", "RO", "SK", "SI", "ES", "SE",
]

##### Population Split

In [0]:
# Population on 1 January by age and sex
code = 'demo_pjan'

# fetch the raw dataset from the API
df = get_data_spark_df(code)

# year columns that are kept and turned into rows
year_columns = ['2016','2019','2022']

# reshape to one row per (Age, Sex, Country_Code, Year)
df_transformed = (
    df
    .withColumnsRenamed({'geo':'Country_Code'})
    # remove 'Y' from the age code; try_cast gives NULL when the remainder is not an integer
    .withColumn('Age', regexp_replace('age','Y','').try_cast('int'))
    .filter(col('sex').isin(['M','F']))
    .filter(col('Country_Code').isin(eu_iso2_codes))
    # NOTE(review): every age code that does not cast to int is dropped here -
    # confirm that no needed age class is encoded non-numerically
    .filter(col('Age').isNotNull())
    .select('Age','Sex','Country_Code', *year_columns)
    .unpivot(['Age','Sex','Country_Code'], year_columns, 'Year', 'Population')
    .filter(col('Population') > 0)
)

# persist the result as a Unity Catalog table (overwritten on every run)
unity_catalog_table_name = 'workspace.general.population_split'
(
    df_transformed.write
    .option('mergeSchema', 'true')
    .mode('overwrite')
    .saveAsTable(unity_catalog_table_name)
)


##### Healthcare Expenditure

In [0]:
# health care expenditure by financing scheme
code = 'hlth_sha11_hf'

# fetch the dataset and the dictionaries for its 'icha11_hf' and 'unit' codes
data = get_data_spark_df(code)
map_health = get_mapping_spark_df(code,'icha11_hf')
map_unit = get_mapping_spark_df(code,'unit')

# base codes of expenditure classification
sha_11_codes = ['HF1','HF2','HF3','HF4']

# year columns that are kept and turned into rows
year_columns = ['2016','2019','2022']

# short label per base code; anything other than HF1-HF3 falls into 'Rest of world'
financing_type = (
    when(col('SHA_11') == 'HF1', 'Government and compulsory schemes')
    .when(col('SHA_11') == 'HF2', 'Voluntary schemes')
    .when(col('SHA_11') == 'HF3', 'Out-of-pocket payments')
    .otherwise('Rest of world')
)

# filter, attach descriptions, unpivot the years, then pivot so each unit becomes its own column
df_transformed = (
    data
    .withColumnsRenamed({'geo':'Country_Code', 'icha11_hf':'SHA_11'})
    .filter(col('Country_Code').isin(eu_iso2_codes) & col('SHA_11').isin(sha_11_codes))
    .join(map_health, col('SHA_11') == map_health.value, 'left')
    .join(map_unit, col('unit') == map_unit.value, 'left')
    .select(
        'SHA_11', 'Country_Code', *year_columns,
        map_health.description.alias('Financing_schema'),
        map_unit.description.alias('unit'))
    .withColumn('Financing_type', financing_type)
    .unpivot(
        ['Country_Code','Financing_schema','unit','SHA_11','Financing_type'],
        year_columns, 'Year', 'Value')
    .groupBy('Country_Code','Year','SHA_11','Financing_type','Financing_schema')
    .pivot('unit')
    .max('Value')
)

# the pivoted unit descriptions become column names: replace every character
# outside [0-9a-zA-Z_] with an underscore
sanitized_columns = [re.sub(r'[^0-9a-zA-Z_]','_',c) for c in df_transformed.columns]
df_final = df_transformed.toDF(*sanitized_columns)

# persist the result as a Unity Catalog table (overwritten on every run)
unity_catalog_table_name = 'workspace.eurostat.healthcare_expenditure'
(
    df_final.write
    .option('mergeSchema', 'true')
    .mode('overwrite')
    .saveAsTable(unity_catalog_table_name)
)


##### Own Health Assessment

In [0]:
# Self-perceived health by sex, age and educational attainment level
code = 'hlth_silc_02'

# fetch the dataset and its 'levels' dictionary, and read the population table saved earlier
data = get_data_spark_df(code)
map_levels = get_mapping_spark_df(code,'levels')
population = spark.read.table('workspace.general.population_split')

# values kept by the filters below
health_levels = ['VGOOD', 'GOOD', 'FAIR', 'BAD', 'VBAD']
age_groups = ['Y16-24', 'Y25-34', 'Y35-44', 'Y45-54', 'Y55-64', 'Y_GE65']
year_columns = ['2016','2019','2022']

# one row per (Age_Group, Country_Code, Sex, Health_Assesement, Year) holding a percentage
df_transformed = (
    data
    .withColumnsRenamed({'geo':'Country_Code', 'age':'Age_Group'})
    .filter(
        col('Country_Code').isin(eu_iso2_codes)
        & col('levels').isin(health_levels)
        & (col('isced11') == 'TOTAL')
        & col('sex').isin(['M','F'])
        & col('Age_Group').isin(age_groups))
    # relabel the oldest bucket so it matches the label used in the population join
    .withColumn('Age_Group', when(col('Age_Group') == 'Y_GE65', 'Y65+').otherwise(col('Age_Group')))
    .join(map_levels, data.levels == map_levels.value, 'left')
    .select('Age_Group','Country_Code','sex', *year_columns,
            map_levels.description.alias('Health_Assesement'))
    .unpivot(
        ['Age_Group','Country_Code','Sex','Health_Assesement'],
        year_columns, 'Year', 'Percentage')
)

# age bucket of each single-year population row; ages below 16 get '0', which matches no Age_Group
population_age_group = (
    when(col('p.Age') >= 65, 'Y65+')
    .when(col('p.Age') >= 55, 'Y55-64')
    .when(col('p.Age') >= 45, 'Y45-54')
    .when(col('p.Age') >= 35, 'Y35-44')
    .when(col('p.Age') >= 25, 'Y25-34')
    .when(col('p.Age') >= 16, 'Y16-24')
    .otherwise('0')
)

population_match = (
    (col('d.Country_Code') == col('p.Country_Code'))
    & (col('d.Sex') == col('p.Sex'))
    & (col('d.Year') == col('p.Year'))
    & (col('d.Age_Group') == population_age_group)
)

# join the actual population numbers to turn each percentage into a number of people
df_final = (
    df_transformed.alias('d')
    .join(population.alias('p'), population_match, 'left')
    .select('d.*', 'Population')
    # add up the single-year populations that fall into each age group
    .groupBy(df_transformed.columns)
    .agg(sum('Population').alias('Population'))
    .withColumn('Number_of_People', ((col('Percentage')/100)*col('Population')).cast('int'))
    .drop('Population', 'Percentage')
)

# persist the result as a Unity Catalog table (overwritten on every run)
unity_catalog_table_name = 'workspace.eurostat.own_health_assesement'
(
    df_final.write
    .option('mergeSchema', 'true')
    .mode('overwrite')
    .saveAsTable(unity_catalog_table_name)
)


#### Additional Dim Tables

##### Countries Information
Use the Gemini API for structured output: https://ai.google.dev/gemini-api/docs/structured-output?hl=pl<br>
Configuring google-genai module: https://pypi.org/project/google-genai/

In [0]:
# LLM helper that returns structured country information parsed from a JSON response
def ask_gemini(prompt:str) -> list:
    """Send ``prompt`` to Gemini and return the JSON-decoded answer.

    The request asks for an ``application/json`` response whose schema is a
    list of ``Country`` records (fields declared below). The API key is read
    with ``dbutils.secrets.get`` from scope ``ai-secrets``, key ``gemini-api``.

    Args:
        prompt: Text sent as the request contents.

    Returns:
        The result of ``json.loads`` on the response text.
    """

    class Country(BaseModel):
        Country_Name: str
        Country_ISO2_code: str
        Capital_City: str
        Currency_Name: str
        Currency_Code: str
        Estimated_Population: int
        Year_of_joining_EU: int

    api_key = dbutils.secrets.get(scope = "ai-secrets", key = "gemini-api")
    gemini = genai.Client(api_key=api_key)

    # request JSON output constrained to a list of Country records
    structured_output = {
        "response_mime_type": "application/json",
        "response_schema": list[Country],
    }
    reply = gemini.models.generate_content(
        model="gemini-2.5-flash",
        contents=prompt,
        config=structured_output,
    )

    return json.loads(reply.text)

# ask the model for the country records and load them into a Spark DataFrame
country_prompt = "List all current EU member countries and provide some additional information"
countries_dict = ask_gemini(country_prompt)
df = spark.createDataFrame(countries_dict)


In [0]:
# Derive extra classification columns from the LLM-generated country data.

# population thresholds (inhabitants) for the Country_Size buckets
small_country_max_population = 6_000_000
medium_country_max_population = 20_000_000

df_final = (
    df
    # The fact tables identify Greece as 'EL' (see eu_iso2_codes), whereas the ISO 3166
    # code an LLM is likely to return is 'GR'. Normalise it so Country_ISO2_code
    # can be joined to Country_Code in the fact tables.
    .withColumn('Country_ISO2_code',
        when(col('Country_ISO2_code') == 'GR', 'EL')
        .otherwise(col('Country_ISO2_code'))
    )
    .withColumn('Country_Size',
        when(col('Estimated_Population') <= small_country_max_population, 'Small')
        .when(col('Estimated_Population') <= medium_country_max_population, 'Medium')
        .otherwise('Big')
    )
    .withColumn('Currency_Zone',
        when(col('Currency_Code') == 'EUR', 'Euro Zone')
        .otherwise('National Currency')
    )
    .withColumn('EU_Member_Status',
        when(col('Year_of_joining_EU') < 2000, 'Old Member')
        .otherwise('New Member')
    )
)

# create UC table
unity_catalog_table_name = 'workspace.eurostat.dim_countries'
df_final.write.option('mergeSchema', 'true').mode('overwrite').saveAsTable(unity_catalog_table_name)


#### Semantic layer
Table and column comments are created here so that an AI agent has more context about the data and can generate better answers.<br>
For more complex data, metric views offer a more robust approach: https://docs.databricks.com/aws/en/metric-views/

In [0]:
%sql

USE CATALOG workspace;
USE SCHEMA eurostat;

-- own_health_assesement ------------------------------------------------------------------

-- -- table ------------------------------------------------------------------------------
COMMENT ON TABLE own_health_assesement IS
'This table contains the self-reported subjective 5-level health assesment of the population with additional split by age group, sex and year. Main numeric column for calculations is Number_of_People';

-- -- columns ---------------------------------------------------------------------------
-- The listed values must match what is stored: the oldest bucket is written as 'Y65+', not '65+'.
COMMENT ON COLUMN own_health_assesement.Age_Group IS
'Segements of the population by age group, defined within 10 year buckets, starting from 16 years old. Unique values are Y16-24, Y25-34, Y35-44, Y45-54, Y55-64, Y65+';

COMMENT ON COLUMN own_health_assesement.Country_Code IS
'ISO2 country codes of all current EU member states. Column is a foreign key to Country_ISO2_code column in dim_countries table where the additional country info like country name is stored';

COMMENT ON COLUMN own_health_assesement.Health_Assesement IS
'The subjective assesement of reported health status. It is measured using some variant of Likert scale, where "very bad" and "bad" are used to indicate negative assesemnt, "good" and "very good" are used to indicate positive assesment and "fair" means neutral';

COMMENT ON COLUMN own_health_assesement.Sex IS
'Sex of the population. Unique values are "M" for male and "F" for female';

COMMENT ON COLUMN own_health_assesement.Year IS
'Year of the research by eurostat. Unique values are 2016, 2019, 2022';

-- healthcare_expenditure ----------------------------------------------------------------
-- Attaches descriptions to the healthcare_expenditure table and its columns.
-- The unit columns below use the sanitized names produced when the table was written
-- (characters outside [0-9a-zA-Z_] replaced by underscores).

-- -- table ------------------------------------------------------------------------------
COMMENT ON TABLE healthcare_expenditure IS
'Table contains information about expenditures related to health and healthcare in all EU countries, measured using different units and classified usning SHA-11 classification system';

-- -- columns ----------------------------------------------------------------------------
-- key / classification columns
COMMENT ON COLUMN healthcare_expenditure.Country_Code IS
'ISO2 country codes of all current EU member states. Column is a foreign key to Country_ISO2_code column in dim_countries table where the additional country info like country name is stored';

COMMENT ON COLUMN healthcare_expenditure.Year IS
'Year of the research by eurostat. Unique values are 2016, 2019, 2022';

COMMENT ON COLUMN healthcare_expenditure.SHA_11 IS
'Codes from System of Health Accounts - statistical reference manual giving a comprehensive description of the financial flows in health care.';

COMMENT ON COLUMN healthcare_expenditure.Financing_schema IS
'Full cliassification names taken from SHA11 system.';

COMMENT ON COLUMN healthcare_expenditure.Financing_type IS
'Shorten names of SHA11 categories. Column can be used for comparing public vs private spending, with "Government and compulsory schemes" indicating public and "Out-of-pocket payments" + "Voulntary schemes" private. Other forth option is called "Rest of world"';

-- measure columns (one per unit)
COMMENT ON COLUMN healthcare_expenditure.Million_euro IS
'Expenditure in millions of euro. Sum of the column is the default total value of overall spending';

COMMENT ON COLUMN healthcare_expenditure.Euro_per_inhabitant IS
'Expenditure amount in euro calculated per single inidividual. Good for comparing spending between countries with different population';

COMMENT ON COLUMN healthcare_expenditure.Million_purchasing_power_standards__PPS_ IS
'Expenditure in millions of euro wieghted by Purchasing Power Standard for each country. Sum of the column is more balaced total value of overall spending adjusting for differences between local economies';

COMMENT ON COLUMN healthcare_expenditure.Purchasing_power_standard__PPS__per_inhabitant IS
'Expenditure per single inidividual in euro but wieghted by Purchasing Power Standard. Allow comparison between countries adjusted for differences in population size and local prices';

COMMENT ON COLUMN healthcare_expenditure.Million_units_of_national_currency IS
'Total expenditure in millions of national currency. To be avoided for comparisons between countries';

COMMENT ON COLUMN healthcare_expenditure.National_currency_per_inhabitant IS
'Expenditure per single inidividual in national currency. To be avoided for comparisons between countries';

COMMENT ON COLUMN healthcare_expenditure.Percentage_of_gross_domestic_product__GDP_ IS
'Percentage of healthcare expenditure in relation to Gross Domestic Product of the country. Sum of the column is best for comapring spending between countries';

-- dim_countries ---------------------------------------------------------------------------

-- -- table -------------------------------------------------------------------------------
COMMENT ON TABLE dim_countries IS
'Table contains additional information about all EU countries providing some additional context for data in other tables';

-- -- columns -----------------------------------------------------------------------------
COMMENT ON COLUMN dim_countries.Country_size IS
'Country classification by population size. Countires below 6 millions inhibitants are called small, between 6 and 20 millions medium, above 20 millions big';

-- This text describes the Currency_Zone column ("Euro Zone" / "National Currency"),
-- so it is attached to Currency_Zone rather than Capital_City.
COMMENT ON COLUMN dim_countries.Currency_Zone IS
'Column distinguishes between countries in "Euro Zone" and the ones with "National Currency"';

COMMENT ON COLUMN dim_countries.EU_Member_Status IS
'Countries that joined EU before 2000 are called "Old Member" whereas the ones that joined after that are "New Member"';


Executing subquery: describe workspace.eurostat.healthcare_expenditure.
Executing subquery: DROP TABLE IF EXISTS workspace.eurostat.population_split.
Executing subquery: USE CATALOG workspace.
Executing subquery: USE SCHEMA eurostat.
Executing subquery: -- own_health_assesement ------------------------------------------------------------------

-- -- table ------------------------------------------------------------------------------
COMMENT ON TABLE own_health_assesement IS
'This table contains the self-reported subjective 5-level health assesment of the population with additional split by age group, sex and year. Main numeric column for calculations is Number_of_People'.
Executing subquery: -- -- columns ---------------------------------------------------------------------------
COMMENT ON COLUMN own_health_assesement.Age_Group IS
'Segements of the population by age group, defined within 10 year buckets, starting from 16 years old. Unique values are Y16-24, Y25-34, Y35-44, Y45-54, Y5