In [1]:
import sys
import os
import shutil
from pathlib import Path
import polars as pl

import random
import numpy as np

In [2]:
SEED = 291200

In [3]:
data_path = Path(".", "data", "modena")
original_tables_path = data_path.joinpath("tables")
data_lake_path = data_path.joinpath("data-lake")
queries_path = data_path.joinpath("queries")
milano_tables_path = data_path.joinpath("from_milan")

data_path.absolute(), data_path.exists()

(PosixPath('/home/nanni/projects/bdm/lab/blend-duckdb/data/modena'), True)

In [4]:
if data_lake_path.exists():
    shutil.rmtree(data_lake_path)

# (re-)create the data lake folder
data_lake_path.mkdir(parents=True, exist_ok=True)

In [5]:
# sys.path.remove('/home/nanni/projects/general-data-science/ULOD')

In [6]:
modules_path = Path("..", "modules", "ULOD")

sys.path.append(str(modules_path.resolve()))
sys.path

['/home/nanni/projects/general-data-science/correlation',
 '/home/nanni/projects/general-data-science/ULOD',
 '/home/nanni/projects/general-data-science/sloth',
 '/home/nanni/projects/general-data-science/JOSIE',
 '/home/nanni/projects/general-data-science/BLEND',
 '/home/nanni/mystuff/py_datafusion/src',
 '/home/nanni/projects/bdm/lab/blend-duckdb',
 '/home/nanni/miniconda3/lib/python313.zip',
 '/home/nanni/miniconda3/lib/python3.13',
 '/home/nanni/miniconda3/lib/python3.13/lib-dynload',
 '',
 '/home/nanni/projects/bdm/lab/blend-duckdb/.venv/lib/python3.13/site-packages',
 '/home/nanni/projects/bdm/lab/modules/ULOD']

In [7]:
from ulod.ckan import ModenaCKAN

# Data Preparation

For this lab we will use some datasets from Modena (and Milan) Open Data. Before using them for the lab, however, we need to preprocess them, to create
a "data-lake"-like context.

We want to normalize all the cell values before the ingestion, in order to make data discovery and post-processing easier

First, we observe that the district names are heterogeneous among datasets; we can normalize them with a simple function

### Normalize district names

In [8]:
import re
import difflib

# official districts
DISTRICTS = ["Crocetta", "San Faustino", "Centro", "Buon Pastore"]

def normalize_district(district: str) -> str:
    district_lw = district.lower()

    # split the lowercase district name in some parts, given possible separators
    splits = [p.strip() for p in re.split(r"[-/,]", district_lw) if p.strip()]

    migliori = []
    for split in splits:
        # identify the name that is closer to the target (if any)
        match = difflib.get_close_matches(split, [q.lower() for q in DISTRICTS], n=1, cutoff=0.5)
        if match:
            # map to the canonical form
            idx = [q.lower() for q in DISTRICTS].index(match[0])
            migliori.append(DISTRICTS[idx])

    # if many candidates are identified, returns only the first one
    if migliori:
        return list(dict.fromkeys(migliori))[0] 
    
    # otherwise, return the original one
    return district

In [9]:
print("S.Lazzaro -> ", normalize_district("S.Lazzaro"))
print("B.Pastore -> ", normalize_district("B.Pastore"))
print("BUON PASTORE-S.AGNESE-S.DAMASO -> ", normalize_district("BUON PASTORE-S.AGNESE-S.DAMASO"))
print("Centro storico -> ", normalize_district("Centro storico"))
print('S.FAUSTINO-MADONNINA -> ', normalize_district('S.FAUSTINO-MADONNINA'))

S.Lazzaro ->  S.Lazzaro
B.Pastore ->  Buon Pastore
BUON PASTORE-S.AGNESE-S.DAMASO ->  Buon Pastore
Centro storico ->  Centro
S.FAUSTINO-MADONNINA ->  San Faustino


## Clean and Merge Milan Open Data tables

We add some datasets from Milan Open Data. 
We need to:
- clean the float values;
- clean the column names;
- clean the dataset names;
- map Milan districts to Modena districts;

In [12]:
def clean_float(s: str):
    cnt = s.count(',')
    match cnt:
        case 0: # if no comma, return the string
            return s
        case 1: 
            # if one comma, replace it with a dot, if
            # not in the string yet
            if '.' not in s:
                return s.replace(',', '.')
            else:
                return s.replace(',', '')
        case 2:
            # with more than one comma, replace the first
            return s.replace(',', '', 1)

In [13]:
clean_float('1929'), clean_float('19,29'), clean_float('1,90.2'), clean_float("1,999,333.293")

('1929', '19.29', '190.2', '1999,333.293')

Here we cast string values to float, and next rename the datasets column names to english.

In [14]:
for table in os.listdir(milano_tables_path):
    df = pl.read_csv(milano_tables_path.joinpath(table), separator=';', encoding='utf8-lossy')
    
    df = df.with_columns(
        pl.col('Spesa media mensile familiare (in euro)')
        .map_elements(clean_float, pl.String)
        .cast(pl.Float32)
        .alias('Spesa media mensile familiare (in euro)')
    )

    df = df.rename(
        {
            'Anno': 'YEAR',
            'Anello territoriale': 'DISTRICT',
            'Tipologia di beni': 'TYPE',
            'Categoria': 'CATEGORY',
            'Spesa media mensile familiare (in euro)': 'AVG MONTH FAMILIAR EXPENSE (€)',
            'Reddito del nucleo familiare': 'HOUSEHOLD INCOME',
            'Numero di figli': 'NUMBER OF CHILDREN',
            'Numero componenti del nucleo familiare': 'NUMBER OF FAMILY MEMBERS'
        },
        strict=False
    )

    df.write_csv(original_tables_path.joinpath(table), separator=',')

Then, we map Milan to Modena district names, and add a slight random change to the avg month familiar expense column
(just for testing later)

In [15]:
def rename_district(s: str):
    match s:
        case 'Centro': return 'Centro'
        case 'Semicentro': return 'Crocetta'
        case 'Periferia': return 'San Faustino'
        case _: return s

In [16]:
random.seed(SEED)
np.random.seed(SEED)

for table in os.listdir(milano_tables_path):
    df = pl.read_csv(milano_tables_path.joinpath(table), separator=";", encoding="latin-1")

    df = df.rename(
        {
            "Anno": "YEAR",
            "Categoria": "CATEGORY",
            "Tipologia di beni": "TYPE",
            "Anello territoriale": "DISTRICT", 
            "Numero di figli": "NUMBER OF CHILDREN",
            "Numero componenti del nucleo familiare": "NUMBER OF FAMILY MEMBERS",
            "Reddito del nucleo familiare": "HOUSEHOLD INCOME",
            "Spesa media mensile familiare (in euro)": "AVG MONTH FAMILIAR EXPENSE (€)"
        }, strict=False
    ).with_columns(
        # cast the expense column to float
        pl.col("AVG MONTH FAMILIAR EXPENSE (€)").map_elements(clean_float, pl.String).cast(pl.Float64)
    )

    if "DISTRICT" in df.columns:
        # add a section to the dataset wit the same number of records of that 
        # from "Periferia", and change the expense
        sub = df.filter(pl.col('DISTRICT') == 'Periferia') \
            .with_columns(
                pl.lit('Buon Pastore').alias('DISTRICT'),
                pl.col('AVG MONTH FAMILIAR EXPENSE (€)') + pl.col('AVG MONTH FAMILIAR EXPENSE (€)') * random.random()
            )
        
        df = pl.concat([df, sub]).with_columns(pl.col('DISTRICT').map_elements(rename_district, pl.String))
        
    epsilon_per_year = {
        2014: 0.01,
        2015: 0.01,
        2016: 0.02,
        2017: 0.01,
        2018: 0.02,
        2019: 0.01,
        2020: 0.1,
        2021: 0.12,
        2022: 0.06,
        2023: 0.02
    }

    for year, epsilon in epsilon_per_year.items():
        last = df.filter(pl.col('YEAR') == pl.col('YEAR').max())
        e = pl.Series(values=[epsilon + np.random.uniform(-0.015, 0.03) for _ in range(last.shape[0])])

        new = last.with_columns(
            pl.lit(year).cast(pl.Int64).alias('YEAR'),
            (pl.col('AVG MONTH FAMILIAR EXPENSE (€)') + pl.col('AVG MONTH FAMILIAR EXPENSE (€)') * e).round(2).alias('AVG MONTH FAMILIAR EXPENSE (€)')
        )

        df = pl.concat([df, new])
        df.write_csv(original_tables_path.joinpath(table))

## Create the data-lake datasets

From previous step, we have partially prepared the datasets. 
To add some noise, like duplicate tables, with partial overlaps, potential joins, etc., we can create these situations.

Also, there are many datasets about local elections: we can add useful columns about them, like the election type and the year.

In [17]:
import fake_useragent

ua = fake_useragent.UserAgent()
client = ModenaCKAN(headers={"User-Agent": ua.firefox})

def refine_name(name: str):
    _, rsc_id = name.removesuffix('.csv').split('::')
    
    result = client.resource_show(id=rsc_id)

    pkg_id = result['result']['package_id']

    result = client.package_show(id=pkg_id)
    if len([r for r in result['result']['resources'] if r['format'].upper() == 'CSV']) != 1:
        print(f'Problem: {name}, {pkg_id}, {len(result['result']['resources'])}')
        return name
    
    title = result['result']['title']

    title = '-'.join(title.split())
    title = f'{title}::{rsc_id}.csv'
    return title

In [21]:
from collections import defaultdict
import shutil

# remove the data-lake folder and re-create it
shutil.rmtree(data_lake_path)
data_lake_path.mkdir(exist_ok=True, parents=True)

for table in os.listdir(original_tables_path): 
    original_table = table
    if not table.endswith('.csv'):
        continue

    if any(x in table.lower() for x in {'senato', 'camera', 'sindaco', 'dei-candidati'}):
        continue
    
    try:
        df = pl.read_csv(original_tables_path.joinpath(table), infer_schema_length=10_000, truncate_ragged_lines=True)
        # print('ELEZIONE' in df.columns, table)
        if 'ELEZIONE' in df.columns:            
            df = df.with_columns(
                ELEZIONE=pl.col('ELEZIONE').str.split(' ').list.filter(pl.element().str.starts_with('20').not_()).list.join(' ').str.strip_chars(), 
                ANNO=pl.col('ELEZIONE').str.split(' ').list.filter(pl.element().str.starts_with('20')).list.join(' ').cast(pl.Int64)
            )

            # add the circoscrizione, and relative code, columns
            df = df.with_columns(
                CIRCOSCRIZIONE_CD=pl.col('CIRCOSCRIZIONE').str.extract(r"(\d?)", 0).cast(pl.Int64),
                CIRCOSCRIZIONE=pl.col('CIRCOSCRIZIONE').str.extract(r"([A-Za-z\. ]+)", 1)
            )

            df = df.drop('CIRCOSCRIZIONE_CD').insert_column(df.get_column_index('CIRCOSCRIZIONE') + 1, df.get_column('CIRCOSCRIZIONE_CD'))
            
            # place the year column next to the election type column
            df = df.drop('ANNO').insert_column(df.get_column_index('ELEZIONE') + 1, df.get_column('ANNO'))

            # clean some column names about political parties
            for column in df.columns:
                try:
                    if 'LEGA_' in column:
                        df = df.rename({column: 'LEGA'})
                    elif 'FORZA_ITALIA' in column:
                        df = df.rename({column: 'FORZA_ITALIA'})
                    elif 'FDI' in column or 'FRATELLI_D' in column or 'GIORGIA_MELONI' in column:
                        df = df.rename({column: 'FRATELLI_D_ITALIA'})
                    elif 'BONACCINI' in column: 
                        df = df.rename({column: "STEFANO BONACCINI"})
                    elif 'MOVIMENTO' in column and ('BEPPE' in column or '5' in column or 'STELLE' in column):
                        df = df.rename({column: 'MOVIMENTO_5_STELLE'})
                except Exception as e:
                    print(df.columns)
                    raise e
                
        # normalize district/quarter names
        for c in ['CIRCOSCRIZIONE', 'QUARTIERE', 'QUA_DES_D', 'QUA_DES_P']:
            if c in df.columns and not df.get_column(c).dtype.is_numeric():
                df = df.with_columns(
                    pl.col(c).map_elements(normalize_district, pl.String).alias(c)
                )
        
        # select columns that to drop
        to_drop = ['FID', 'SHAPE', 'NOMEFILE', 'OBJECTID', 'ID_CIRCOSCRIZIONE', 'NOME_CIRCOSCRIZIONE', 'the_geom']
        if ('SEZIONE' in df.columns and 'SEZ_ELETT' in df.columns) or ('SEZ' in df.columns and 'SEZ_ELETT' in df.columns):
            to_drop.append('SEZ_ELETT')
        df = df.drop(to_drop, strict=False)

        # at this time, this part is not used
        if 'Risultati' in table:
            base_headers = [
                'SEZIONE', 'SEZ_ELETT', 'SEZ', 
                'CIRCOSCRIZIONE', 'CIRCOSCRIZIONE_CD', 
                'ID_CIRCOSCRIZIONE', 'NOME_CIRCOSCRIZIONE',
                'TIPODATI', 'TIPO_DATI', 
                'ELEZIONE', 'ANNO', 'SCHEDA', 'ISCRITTI']
            partiti_candidati = [x for x in df.columns if x not in base_headers and 'TOTALE' not in x]

        # remove this prefix to make the dataset name consistent with other cases
        table = table.removeprefix('Accesso-al-dataset-')

        # some datasets have unclear names: we can get from the Open Data portal
        # a better versior of it 
        if 'Risorsa-in-formato' in table:
            table = refine_name(table)

        table = table.split('::')[0].removesuffix(".csv") + ".csv"

        # finally, rename the column names to english
        df = df.rename(
            {
                'SEZIONE': 'SECTION',
                'SEZ_ELETT': 'SECTION', 
                'SEZ': 'SECTION', 
                'CIRCOSCRIZIONE': 'DISTRICT',
                'CIRCOSCRIZIONE_CD': 'DISTRICT_CD',
                'TIPODATI': 'DATA_TYPE',
                'TIPO_DATI': 'DATA_TYPE',
                'ANNO': 'YEAR',
                'ELEZIONE': 'ELECTION',
                'COD_ELEZIONE': 'ELECTION_CD',
                'SCHEDA': 'FORM',
                'TOTALE_VOTANTI': '#VOTERS',
                'TOTALE_SCHEDE_BIANCHE': '#BLANK',
                'TOTALE_VOTI_VALIDI': '#VALID',
                'ISCRITTI': '#REGISTERED',
                'ISCRITTI_MASCHI': '#MALE_REGISTERED',
                'ISCRITTI_FEMMINE': '#FEMALE_REGISTERED',
                'VOTANTI_MASCHI': '#MALE_VOTERS',
                'VOTANTI_FEMMINE': '#FEMALE_VOTERS',
                'TOTALE_SCHEDE_CONTESTATE': '#DISPUTED',
                'TOTALE_VOTI_DI_CUI': '#OF WHICH',
                'TOTALE_VOTI_NON_VALIDI': '#NOT VALID',
                'TOTALE_SCHEDE_NULLE': '#NULL'
                
            }, strict=False
        )

        df.write_csv(data_lake_path.joinpath(table))
    except Exception as e:
        print(f"\nerror: {e}, table: {original_table}\n")

In [22]:
multi_years = defaultdict(list)

# make aggregated table over multiple years
for table in os.listdir(data_lake_path):
    df = pl.read_csv(data_lake_path.joinpath(table), infer_schema_length=10_000, truncate_ragged_lines=True)
    try:
        if 'ELEZIONE' in df.columns and 'generale' not in table:            
            year = re.search(r"(\d+)", table).group(0)
            base_str = table.replace('.csv', '').replace(year, '')
            multi_years[base_str].append(year)

            for base_str, years in multi_years.items():
                if len(years) == 1:
                    continue
                formatted = re.sub(r"(del|delle)-$", "", base_str) + 'generale.csv'
                dfs = [pl.scan_csv(data_lake_path.joinpath(f"{base_str}{year}.csv")) for year in sorted(years)]
                general = pl.concat(dfs, how='diagonal_relaxed').collect()
                general.write_csv(data_lake_path.joinpath(formatted))
    except Exception as e:
        print(e, table)

In [23]:
# Create a fake single-column-unique-key
for table in os.listdir(data_lake_path.joinpath()):
    df = pl.read_csv(data_lake_path.joinpath(table), infer_schema_length=10_000, truncate_ragged_lines=True)
    
    if 'ELECTION' in df.columns:
        df = df.drop('THE_PK_KEY', strict=False).with_columns(
            THE_PK_KEY=pl.concat_str(['SECTION', 'DISTRICT', 'ELECTION'], separator='_').str.strip_chars()
        )

        assert df.get_column('THE_PK_KEY').is_unique().all()

        df = df.drop('THE_PK_KEY').insert_column(0, df.get_column('THE_PK_KEY'))
        
        df.write_csv(data_lake_path.joinpath(table))

In [24]:
# Make unpivoted tables based on party names
for table in os.listdir(data_lake_path):
    if 'unpivot' in table:
        os.remove(data_lake_path.joinpath(table))


for table in os.listdir(data_lake_path):
    df = pl.read_csv(data_lake_path.joinpath(table), infer_schema_length=10_000, truncate_ragged_lines=True).drop('THE_PK_KEY', strict=False)
    try:
        if 'ELECTION' in df.columns and 'Risultati' in table:
            base_headers = ['SECTION', 'DISTRICT', 'DATA_TYPE', 'ELECTION', 'YEAR', 'FORM']

            on = [x for x in df.columns if x not in base_headers and '#' not in x]

            u_df = df.unpivot(on, index=base_headers).rename({'variable': 'CANDIDATE/PARTY', 'value': 'VOTES'})

            u_df = u_df.with_columns(VOTES=pl.col('VOTES').cast(pl.Int64))
            unpivot_name = table.removesuffix('.csv') + '.unpivot.csv'

            u_df.write_csv(data_lake_path.joinpath(unpivot_name))
    except Exception as e:
        print(table, e)

### Create random copies with modified overlap

We then add to the datalake some random copies with missing values

In [None]:
random.seed(SEED)

for table in os.listdir(data_lake_path):
    if 'cpy' in table:
        os.remove(data_lake_path.joinpath(table))

distinct_names = set()

base_headers = ['SECTION', 'DISTRICT', 'DISTRICT', 'DATA_TYPE', 'ELECTION', 'YEAR', 'FORM']

number_of_copies = 2

for table in os.listdir(data_lake_path):
    df = pl.read_csv(data_lake_path.joinpath(table), infer_schema_length=10_000, truncate_ragged_lines=True)
    if 'ELECTION' in df.columns:
        for cpy_number in range(number_of_copies):
            # first random: the table will be duplicated?
            if (create_copy := random.random()) < 0.6:
                cpy_name = table.removesuffix('.csv') + f'.cpy-{cpy_number}.csv'

                # second random: sampling rows
                cpy = df.with_row_index().sample(fraction=max(0.8, random.random()), shuffle=False, seed=SEED).sort('index').drop('index')

                for column in df.columns:
                    if column not in base_headers and '#' not in column:
                        # for numerical columns only, put random Null values inside them
                        cpy = cpy.with_columns(
                            pl.col(column).map_elements(
                                # third random: place null values instead of original ones
                                lambda e: None if random.random() < 0.2 else e, 
                                df.get_column(column).dtype
                            ).alias(column)
                        )
                        
                cpy.write_csv(data_lake_path.joinpath(cpy_name))
                null_cnt = cpy.null_count().sum_horizontal()
                print(f"Created {cpy_number + 1} copy of {table} - {null_cnt[0]}")

        if 'Affluenze' not in table:
            distinct_names.update([x for x in df.columns if x not in base_headers and '#' not in x])

Created 1 copy of Risultati-di-lista-delle-elezioni-amministrative-del-2024.csv - 529
Created 1 copy of Risultati-delle-elezioni-europee-2019.unpivot.csv - 1223
Created 2 copy of Risultati-delle-elezioni-europee-2019.unpivot.csv - 1078
Created 1 copy of Risultati-di-lista-delle-elezioni-amministrative-del-2009.unpivot.csv - 1027
Created 2 copy of Affluenze-delle-elezioni-regionali-2014.csv - 98
Created 1 copy of Affluenze-e-risultati-elettorali-delle-elezioni-amministrative-2024.csv - 84
Created 2 copy of Affluenze-e-risultati-elettorali-delle-elezioni-amministrative-2024.csv - 90
Created 1 copy of Affluenzedelle-elezioni-regionali-2020.csv - 98
Created 1 copy of Risultati-di-lista-delle-elezioni-regionali-del-2014.csv - 349
Created 2 copy of Risultati-di-lista-delle-elezioni-regionali-del-2014.csv - 383
Created 1 copy of Risultati-delle-elezioni-europee-anno-2014.csv - 492
Created 2 copy of Risultati-delle-elezioni-europee-anno-2014.csv - 473
Created 1 copy of Affluenze-delle-elezioni

In [26]:
df = pl.read_csv(data_lake_path.joinpath('Risultati-delle-elezioni-europee-2019.csv'))

df.head()

THE_PK_KEY,SECTION,DISTRICT,DISTRICT_CD,DATA_TYPE,ELECTION,YEAR,EUROPA_VERDE,LEGA,FRATELLI_D_ITALIA,PARTITO_PIRATA,IL_POPOLO_DELLA_FAMIGLIA,PARTITO_ANIMALISTA,PARTITO_DEMOCRATICO,LA_SINISTRA,PPA_POPOLO_PARTITE_IVA,PI__EUROPA,MOVIMENTO_5_STELLE,POPOLARI_PER_L_ITALIA,FORZA_NUOVA,FORZA_ITALIA,SVP,PARTITO_COMUNISTA,CASAPOUND___DESTRE_UNITE,FORM,#VOTERS,#BLANK,#VALID,#REGISTERED,#DISPUTED,#OF WHICH,#NOT VALID,#NULL
str,i64,str,i64,str,str,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,str,i64,i64,i64,i64,i64,str,i64,i64
"""1_Centro_Europee""",1,"""Centro""",1,"""Liste""","""Europee""",2019,21,181,39,0,4,3,197,14,0,32,46,0,0,59,0,3,0,"""E19""",606,0,599,847,1,,7,6
"""2_Centro_Europee""",2,"""Centro""",1,"""Liste""","""Europee""",2019,19,123,22,1,3,0,155,10,0,29,34,0,0,46,0,5,2,"""E19""",461,3,449,704,0,,12,9
"""3_Centro_Europee""",3,"""Centro""",1,"""Liste""","""Europee""",2019,26,167,57,1,0,3,170,11,0,36,38,2,0,72,1,6,1,"""E19""",601,5,591,826,0,,10,5
"""4_Centro_Europee""",4,"""Centro""",1,"""Liste""","""Europee""",2019,27,144,57,1,3,0,184,17,1,35,46,0,0,56,0,4,0,"""E19""",587,6,575,841,0,,12,6
"""5_Centro_Europee""",5,"""Centro""",1,"""Liste""","""Europee""",2019,19,147,48,2,0,6,179,12,1,28,37,1,0,58,0,0,3,"""E19""",560,14,541,818,0,,19,5


In [31]:
# save a particulare dataset just for testing
codes_only = df.select("SECTION", "DISTRICT_CD")
codes_only

SECTION,DISTRICT_CD
i64,i64
1,1
2,1
3,1
4,1
5,1
…,…
186,4
187,3
188,3
189,3


In [32]:
codes_only.write_csv(queries_path.joinpath("section_district_codes.csv"))