---

##  Pipeline Executive Summary

### Complete Transformation

| Metric | Before | After | Change |
|---------|-------|---------|--------|
| **Total records** | 5,955,075 | 2,082,071 | Filtered to Higher Education, 2017-2024 |
| **Duplicates** | 9,165 (0.15%) | 0 | Removed |
| **Source files** | 11 CSVs (2014-2024) | 1 unified dataset | Consolidated |
| **Columns** | 19-21 (inconsistent) | 26 (standardized) | Normalized |
| **Date format** | Mixed | ISO 8601 | Standardized |
| **Country names** | Multiple variants | ISO2 + canonical name | Unified |

### Data Quality

- **100% of countries normalized** by ISO2 code
- **0 exact duplicates**
- **621,084 age outliers** converted to NA (implausible ages: <10 or >65)
- **16,069 durations = 0** converted to NA
- **Null tokens** unified ("unknown", "n/a", "???", etc. -> NA)

### Final Dataset for Power BI

- **Focus**: Higher Education (ISCED 6-8)
- **Period**: 2017-2024 (mobility start years)
- **Variables**: 26 analytical columns
- **Formats**: Parquet (compressed) + CSV (universal)

---

For technical details of the cleaning process, see [`PIPELINE.md`](docs/en/PIPELINE.md)

In [1]:
#Import necessary libraries for the process
import numpy as np
import pandas as pd
import os
import re
import unicodedata

In [None]:
# Goal of this cell:
# - Check whether the annual CSVs (2014–2024) share the same column structure.
# - Detect differences between years before performing the full load.
# - Column names are normalized (lowercase + underscores) so the comparison
#   is not affected by capitalization or spacing.

# Folder that contains one CSV per year (path is relative to the working directory).
folder_path = "bases_datos_erasmus"
years = range(2014, 2025)  # includes 2024


def normalize_columns(columns):
    """
    Return the column names in a form that can be compared between years.

    Each name is trimmed, lowercased, and every internal run of whitespace
    becomes a single underscore. No other characters are touched.
    """
    def _clean(name):
        # Collapse whitespace first so "a   b" and "a\tb" both give "a_b".
        collapsed = re.sub(r"\s+", " ", name.strip().lower())
        return collapsed.replace(" ", "_")

    return [_clean(name) for name in columns]


def read_header_with_fallback(file_path, sep=";"):
    """
    Load just the header of a CSV (nrows=0), attempting several encodings.

    The encodings are tried in order ("utf-8", "utf-8-sig", "latin1") and
    the first one that reads without error wins.

    Returns (df_header, encoding_used). If every attempt fails, the
    exception raised by the last attempt is re-raised.
    """
    failure = None

    for candidate in ("utf-8", "utf-8-sig", "latin1"):
        try:
            header = pd.read_csv(file_path, sep=sep, nrows=0, encoding=candidate)
        except Exception as exc:
            # Remember the error and move on to the next encoding.
            failure = exc
        else:
            return header, candidate

    raise failure


# Results of the header check, keyed by year.
column_sets = {}   # year -> normalized column names
errors = {}        # year -> text of the exception raised while reading

print("Verifying file structure...")

for year in years:
    file_name = f"Erasmus-KA1-Mobility-Data-{year}.csv"
    file_path = os.path.join(folder_path, file_name)

    # Any failure (missing file, bad encoding, bad header) is recorded for
    # that year and the loop continues with the next file.
    try:
        df_header, used_enc = read_header_with_fallback(file_path, sep=";")
        column_sets[year] = normalize_columns(df_header.columns)
        print(f"  {year}: {len(df_header.columns)} columns (encoding={used_enc})")

    except Exception as e:
        errors[year] = str(e)
        print(f"  {year}: Error reading {file_name}: {e}")


print("\nComparing columns across years...")

if column_sets:
    # The earliest year that could be read acts as the reference.
    base_year = min(column_sets)
    base_cols = set(column_sets[base_year])

    for year in sorted(column_sets):
        if year == base_year:
            continue

        current_cols = set(column_sets[year])

        if current_cols == base_cols:
            print(f"  {year} has the same columns as {base_year}.")
            continue

        print(f"\n  Differences detected in {year} (base={base_year}):")
        print("    - Missing:", base_cols - current_cols)
        print("    - New:", current_cols - base_cols)
else:
    print("Could not read any file. Check paths, names and separator.")


print("\nSummary:")
print("  Years OK:", sorted(column_sets))
if errors:
    print("  Years with error:", sorted(errors))


In [None]:
# In the previous block I verified that small differences exist between years in column names.
# In this block I now load the complete CSVs and apply a unification strategy to be able to concatenate them:
# - Normalize column names (lowercase, underscores, etc.)
# - Rename equivalent columns to canonical names (RENAME_MAP)
# - Remove residual "Unnamed" type columns
# - Add source_file_year for traceability
# - Align all columns between years and concatenate into df_total


folder_path = "bases_datos_erasmus"
years = range(2014, 2025)

def normalize_columns(cols):
    """
    Bring column names from every year to one common format.

    Steps per name: cast to str, trim, lowercase, collapse whitespace,
    turn spaces and "/" into "_", drop parentheses, and finally squeeze
    repeated underscores into one.
    """
    def _canonical(raw):
        name = re.sub(r"\s+", " ", str(raw).strip().lower())
        name = name.replace(" ", "_").replace("/", "_")
        name = re.sub(r"[()]", "", name)
        # "a_/_b" became "a___b" above; leave a single underscore.
        return re.sub(r"__+", "_", name)

    return [_canonical(raw) for raw in cols]

# Mapping to canonical names (equivalent columns between years).
# Keys are column names AFTER normalize_columns() has run; values are the
# single name used in the unified dataset. Two different duration headers
# collapse into "mobility_duration".
RENAME_MAP = {
    "sending_organisation": "sending_organization",
    "receiving_organisation": "receiving_organization",
    "mobility_duration_-_calendar_days": "mobility_duration",
    "mobility_duration_in_days": "mobility_duration",
    "mobility_start_year_month": "mobility_start_month",
    "actual_participants_contracted_projects": "actual_participants",
}

def read_csv_with_fallback(path, sep=";"):
    """
    Load a whole CSV, attempting several encodings until one succeeds.

    The encodings are tried in order ("utf-8", "utf-8-sig", "latin1").
    Returns (df, encoding_used). If all attempts fail, the exception from
    the last attempt is re-raised.
    """
    failure = None

    for candidate in ("utf-8", "utf-8-sig", "latin1"):
        try:
            frame = pd.read_csv(path, sep=sep, low_memory=False, encoding=candidate)
        except Exception as exc:
            # Keep the error and fall through to the next encoding.
            failure = exc
        else:
            return frame, candidate

    raise failure

# Accumulators filled by the loop below:
# - df_list: one cleaned DataFrame per year that loaded successfully
# - all_cols: union of every column name seen in any year
df_list = []
all_cols = set()

print("Loading and unifying files by year...")

for year in years:
    file_name = f"Erasmus-KA1-Mobility-Data-{year}.csv"
    file_path = os.path.join(folder_path, file_name)

    # A failure in any step for one year is printed and that year is skipped;
    # the remaining years are still loaded.
    try:
        # 1) Load the CSV for the year
        df, used_enc = read_csv_with_fallback(file_path, sep=";")

        # 2) Normalize column names and apply canonical renaming
        # NOTE(review): if a single file contained two headers that map to the
        # same canonical name, this would create duplicate column labels —
        # confirm against the header check output.
        df.columns = normalize_columns(df.columns)
        df = df.rename(columns=RENAME_MAP)

        # 3) Remove residual "Unnamed" type columns
        # (names are already lowercased at this point, hence "unnamed")
        unnamed_cols = [c for c in df.columns if c.startswith("unnamed")]
        if unnamed_cols:
            df = df.drop(columns=unnamed_cols)

        # 4) Add traceability of source year
        df["source_file_year"] = year

        # 5) Accumulate DF and columns to align at the end
        df_list.append(df)
        all_cols |= set(df.columns)

        print(f" {year} loaded: {df.shape[0]:,} rows, {df.shape[1]} columns (encoding={used_enc})")

    except Exception as e:
        print(f" Error in {year}: {e}")

# Stop here if nothing could be loaded: the concat below needs at least one frame.
if not df_list:
    raise ValueError("Could not load any file. Check path, names and separator.")

# Fixed column order so the final dataset is stable
# (all_cols changes from a set to an alphabetically sorted list here)
all_cols = sorted(all_cols)

# Align all years to the same set of columns
# (columns a year does not have are created and filled with NaN)
df_list_aligned = [d.reindex(columns=all_cols) for d in df_list]

# Concatenate everything into df_total
df_total = pd.concat(df_list_aligned, ignore_index=True)

print(f"\n{'='*60}\nFILES MERGED\n{'='*60}")
print(f"Total records: {df_total.shape[0]:,}")
print(f"Total columns:  {df_total.shape[1]}")

In [None]:
df_total.info()

In [None]:
# Before normalizing, I review what distinct values appear in the "academic_year" column.
# This helps me detect different formats (for example: "2020-2021" vs "2020-21").
print(df_total["academic_year"].unique())


In [None]:
# Define a function to leave all values of "academic_year" with the same format.
# Goal: for it to always remain as "YYYY-YY" (for example, "2020-21").
def norm_academic_year(x):
    """
    Bring an academic year value to the short "YYYY-YY" form.

    - Null input (NaN/None/NA) is returned as pd.NA.
    - "YYYY-YYYY" (e.g. "2020-2021") becomes "YYYY-YY" (e.g. "2020-21").
    - Anything else, including values already written as "YYYY-YY", is
      returned as the trimmed string so odd formats can be reviewed later.
    """
    if pd.isna(x):
        return pd.NA

    # Values may arrive padded, e.g. " 2020-2021 "
    text = str(x).strip()

    long_form = re.fullmatch(r"(\d{4})-(\d{4})", text)
    if long_form is None:
        return text

    start, end = long_form.groups()
    return f"{start}-{end[-2:]}"


# Apply the normalization row by row and overwrite the column with the unified values
df_total["academic_year"] = df_total["academic_year"].apply(norm_academic_year)

# Quick check: list the unique values again; anything not shaped like "YYYY-YY"
# was left untouched by norm_academic_year and needs a manual look
print(df_total["academic_year"].unique())


In [None]:
# I use this block to review duplicates in the final dataset.
# I distinguish two cases:
# 1) Exact duplicates: identical rows in all columns (including academic_year).
# 2) Repeated rows ignoring academic_year: same characteristics but in different academic years,
#    which could happen if a record repeats between years or if there is some form of "overlap".
# Column that is ignored in the second comparison ("everything except the year")
col_year = "academic_year"

# Columns I show as example when I find duplicates
# (just for quick inspection and so the output is not huge)
cols_show = ["academic_year", "sending_country", "receiving_city", "participant_age", "mobility_duration"]


# Assign df_total to a variable for convenience.
# I don't use copy() to avoid wasting extra memory, since I don't modify the DataFrame here.
# (df is therefore the same object as df_total, not a snapshot)
df = df_total


# 1) Exact duplicates (all columns equal)
# duplicated() with default keep="first" flags every repeat after the first occurrence
n = len(df)
n_dup = df.duplicated().sum()
print(f"Exact duplicates: {n_dup:,} ({n_dup / n * 100:.4f}%)")

# If they exist, I show an example of some rows to see the pattern
# (keep=False flags ALL members of a duplicated group, including the first)
if n_dup:
    print("\nExample exact duplicates (5 rows):")
    print(df.loc[df.duplicated(keep=False), cols_show].head(5))


# 2) Repeated rows ignoring academic_year
# First I convert academic_year to a numeric year (I keep the first year: "2015-16" -> 2015)
# This is just to analyze if the same "group" appears in more than one year.
# Values whose first four characters are not a number become <NA>.
year_temp = pd.to_numeric(df[col_year].astype(str).str[:4], errors="coerce").astype("Int64")

# Create a list with all columns except academic_year to compare "everything except the year"
# NOTE(review): this list still contains source_file_year, so rows coming from
# different yearly files never compare as equal here — confirm that is intended.
cols_compare = [c for c in df.columns if c != col_year]

# To efficiently detect repeats, I create a hash per row using those columns
# (index=False: the row label is not part of the hash, only the values)
h = pd.util.hash_pandas_object(df[cols_compare], index=False)

# Mark as repeated the rows whose hash appears more than once
mask_rep = h.duplicated(keep=False)
print(f"\nRepeated rows ignoring '{col_year}': {mask_rep.sum():,}")

# Now I look at how many of those groups appear in more than one year
# (nunique() does not count <NA> years)
tmp = pd.DataFrame({"h": h[mask_rep], "y": year_temp[mask_rep]})
multi = tmp.groupby("h")["y"].nunique()

# Count how many hashes have more than one associated year (multi-year)
n_multi_groups = (multi > 1).sum()
print(f"Groups repeated in multiple years: {n_multi_groups:,}")

# If there is any multi-year case, I show an example to better understand it
if n_multi_groups:
    ex_hash = multi[multi > 1].index[0]
    print("\nExample multi-year (5 rows):")
    print(df.loc[h == ex_hash, cols_show].head(5))

In [None]:
# In the previous analysis I saw that there were exact duplicates (identical rows in all columns).
# Since they are 100% repeated records, I decide to remove them to avoid counting the same observation twice.
# I keep the first occurrence of each row and remove the rest.

# Flag every repeat of an already-seen row (the first occurrence is not flagged)
is_repeat = df_total.duplicated(keep="first")

n_before = len(df_total)
n_dup = is_repeat.sum()

print(f"Records before: {n_before:,}")
print(f"Exact duplicates detected: {n_dup:,} ({n_dup / n_before * 100:.4f}%)")

# Keep only the non-flagged rows and renumber the index from 0
df_total = df_total.loc[~is_repeat].reset_index(drop=True)

n_after = len(df_total)
print(f"Records after: {n_after:,}")
print(f"Removed: {n_before - n_after:,}")

# Final check: this must print 0
print("Remaining duplicates:", df_total.duplicated().sum())


In [None]:
# I review the most frequent values of "mobility_start_month" to understand
# why the column is of type object (usually happens due to mixed formats,
# strange values, or because it comes as text "YYYY-MM").
df_total["mobility_start_month"].value_counts(dropna=False).head(20)


In [None]:
# The original "mobility_start_month" column is left untouched so the raw text
# stays available. The parsed date goes into a new column, "mobility_start_ym".
# With errors="coerce", any value that is not "YYYY-MM" ends up as NaT.
start_month_text = df_total["mobility_start_month"].astype(str)
df_total["mobility_start_ym"] = pd.to_datetime(
    start_month_text, format="%Y-%m", errors="coerce"
)

# Split the parsed date into year and month.
# Int64 (nullable) keeps NaT rows as <NA> instead of forcing floats.
start_parts = df_total["mobility_start_ym"].dt
df_total["mobility_start_year"] = start_parts.year.astype("Int64")
df_total["mobility_start_month_num"] = start_parts.month.astype("Int64")

# Quick checks:
# 1) Number of values that failed to parse (NaT)
print("Nulls mobility_start_ym:", df_total["mobility_start_ym"].isna().sum())

# 2) Month distribution, nulls included, to see if it looks reasonable
month_counts = df_total["mobility_start_month_num"].value_counts(dropna=False)
print(month_counts.sort_index())


In [None]:
# 1) Quick check: how many nulls there are and if there are values with decimals
# In theory "actual_participants" should be an integer number and > 0.
# Nulls are dropped before the modulo so they are not counted as decimals.
print("NaN actual_participants:", df_total["actual_participants"].isna().sum())
print("Strange decimals:", (df_total["actual_participants"].dropna() % 1 != 0).sum())


In [None]:
# 2) Inspection: if there are decimals, I locate the rows to see where they come from
# (I'm especially interested in the source file year and the academic_year)
# Nulls must be excluded explicitly: NaN % 1 is NaN and NaN != 0 evaluates to
# True, so without the notna() guard every null row would be listed as a
# "decimal". This matches the guard used in the cleaning step.
mask_dec = df_total["actual_participants"].notna() & (df_total["actual_participants"] % 1 != 0)
df_total.loc[mask_dec, ["actual_participants", "source_file_year", "academic_year"]].head(10)


In [13]:
# 3) Cleaning: null out impossible values and store the column as nullable integer
# - Fractional values are invalid (there is no such thing as 4.7 participants)
# - Zero is invalid too (a record with 0 participants makes no sense here)
col = "actual_participants"
participants = df_total[col]

mask_zero = participants == 0
mask_nonint = participants.notna() & ((participants % 1) != 0)

# mask() blanks the flagged rows; Int64 keeps the nulls without falling back to float
df_total[col] = participants.mask(mask_nonint | mask_zero).astype("Int64")


In [None]:
# Before cleaning age, I do a quick review of extreme values.
# The idea is to convert "participant_age" to numeric in an auxiliary variable (age_num)
# to be able to detect impossible ages (<= 0) or too high (>= 65).
# This cell only inspects: df_total is not modified here.
# NOTE(review): the thresholds used here (<= 0 and >= 65) are not the same as
# the cleaning rule applied afterwards (< 10 or > 65), so these counts will
# not match the number of values later set to NA.

# 1) Convert to numeric
# ("-" and "" are treated as missing; anything else non-numeric becomes NaN via coerce)
age_num = pd.to_numeric(
    df_total["participant_age"].replace({"-": pd.NA, "": pd.NA, "0": 0}),
    errors="coerce"
)

# 2) Review values <= 0 (negative ages or zero don't make sense)
print("<= 0 (count):", (age_num <= 0).sum())
print(df_total.loc[age_num <= 0, ["participant_age", "academic_year", "source_file_year"]].head(20))

# 3) Review values >= 65 (in this analysis I consider them extreme / unrealistic values)
print("\n>= 65 (count):", (age_num >= 65).sum())
print(df_total.loc[age_num >= 65, ["participant_age", "academic_year", "source_file_year"]].head(20))

# 4) Quick distribution of those extremes to see what values appear most frequently
# (the raw, unconverted values are shown so odd spellings are visible)
print("\nValue counts (<=0):")
print(df_total.loc[age_num <= 0, "participant_age"].value_counts(dropna=False).head(20))

print("\nValue counts (>=65):")
print(df_total.loc[age_num >= 65, "participant_age"].value_counts(dropna=False).head(20))


In [None]:
# Final cleaning of "participant_age":
# 1) Parse to numeric; "-" and "" count as missing, other non-numbers become NaN.
# 2) Keep only ages inside the range considered reasonable for the analysis.
# 3) Store as nullable integer (Int64) so nulls do not force a float column.

age_text = df_total["participant_age"].replace({"-": pd.NA, "": pd.NA})
age = pd.to_numeric(age_text, errors="coerce")

# Accepted range: 10 to 65 years, both ends included.
# Everything else is set to NA rather than guessed.
age = age.where(age.between(10, 65))

df_total["participant_age"] = age.astype("Int64")

# Quick check: how many nulls remain and which ages are most common
cleaned_age = df_total["participant_age"]
print("NaN in participant_age:", cleaned_age.isna().sum())
print(cleaned_age.value_counts(dropna=False).head(15))


In [None]:
# Basic cleaning of "mobility_duration" (length of the mobility in days).
# Only one clear error is handled here: a duration of 0 days, which cannot be
# a real mobility. The zeros are counted, replaced by NA, and then summarized.

col = "mobility_duration"

# 1) Locate and count the zeros
is_zero = df_total[col] == 0
n0 = is_zero.sum()
print(f"Duration = 0 days: {n0:,} ({(n0 / len(df_total)) * 100:.4f}%) -> converted to NA")

# 2) Blank them out
df_total.loc[is_zero, col] = pd.NA

# 3) Summary of the column after the replacement
duration = df_total[col]
print("Nulls after cleaning:", duration.isna().sum())
print("Min/Median/Max:", duration.min(), duration.median(), duration.max())


In [None]:
# Additional review: very long durations.
# I don't remove them automatically, but I quantify them to know if there are extreme values
# (for example, mobilities of more than one year or more than two years).
# Nothing is modified in this cell; the second count is a subset of the first.

col = "mobility_duration"

print(">365 days:", (df_total[col] > 365).sum())
print(">730 days:", (df_total[col] > 730).sum())


In [None]:
# In several text columns there are values that actually mean "unknown" or "not applicable",
# but come written as strings (for example "unknown", "n/a", "???", "-").
# To be able to work better with the data, I unify all those cases and convert them to NA.
# This way, null analyses and subsequent cleanups are more consistent.

# Typical "hidden null" tokens (I don't include "0" by default because in some columns it can be a valid value)
missing_tokens = {
    "unknown", "not specified", "none", "n/a", "na",
    "?", "??", "???", "????",
    "-", "--", "---",
    "_", "__",
    "? unknown ?", "??? - ? unknown ?"
}

# Select text type columns (object or category)
text_cols = df_total.select_dtypes(include=["object", "category"]).columns

# General replacement in text columns:
# - remove spaces
# - convert to lowercase to compare with tokens
# - convert to NA when it's empty string or matches a missing token
for c in text_cols:
    s = df_total[c].astype("string").str.strip()
    s_low = s.str.lower()
    df_total[c] = s.mask((s == "") | (s_low.isin(missing_tokens)), pd.NA)

# Specific case: in "receiving_city" the string "0" appears as invalid value in some records.
# In this dataset I treat it as null to not confuse it with a real city.
if "receiving_city" in df_total.columns:
    df_total.loc[
        df_total["receiving_city"].astype("string").str.strip() == "0",
        "receiving_city"
    ] = pd.NA

# Quick summary: I show the columns with most null values to have an overview
na_counts = df_total.isna().sum().sort_values(ascending=False)
print("Top 10 columns with most NA:")
print(na_counts.head(10))

In [None]:
# "participant_profile" contains the same category written in different ways
# (for example fully uppercase). The uppercase spellings are mapped to the
# regular labels so "LEARNERS" and "Learner" are not counted separately, and
# the column is then stored as category (tidier and lighter in memory).

profile_labels = {
    "LEARNERS": "Learner",
    "STAFF": "Staff"
}

unified_profile = df_total["participant_profile"].replace(profile_labels)
df_total["participant_profile"] = unified_profile.astype("category")

# Quick check: number of records per profile, nulls included
print(df_total["participant_profile"].value_counts(dropna=False))


In [None]:
# "fewer_opportunities" is a 0/1 flag that may arrive as text or as a number.
# Both spellings are translated to the labels "No" / "Yes" so the column is
# easier to read, and it is then stored as category because it only holds a
# handful of distinct values.

flag_labels = {
    "0": "No", 0: "No",
    "1": "Yes", 1: "Yes"
}

relabelled_flag = df_total["fewer_opportunities"].replace(flag_labels)
df_total["fewer_opportunities"] = relabelled_flag.astype("category")

# Quick check: value distribution, nulls included
print(df_total["fewer_opportunities"].value_counts(dropna=False))


In [None]:
# "education_level" usually contains the ISCED level inside a longer text,
# for example "ISCED-6 - First cycle / Bachelor's ...".
# Only the single digit after "ISCED-" is kept, so levels can be grouped and
# filtered easily. Rows without that pattern get <NA>.

# 1) Pull out the digit and store it as nullable integer
education_text = df_total["education_level"].astype("string")
df_total["isced_level"] = (
    education_text.str.extract(r"ISCED-(\d)", expand=False).astype("Int64")
)

# 2) Simplified group built from the level:
# - HE (6-8): higher education (Bachelor/Master/Doctorate)
# - Pre-tertiary (1-5): previous levels
# - ISCED-9 / Other: other cases
# Levels outside these ranges (including 0) and rows with no level stay as NA.
df_total["isced_group"] = pd.Series(pd.NA, index=df_total.index, dtype="string")

isced_level_values = df_total["isced_level"]
isced_group_rules = (
    (isced_level_values.between(6, 8), "HE (6-8)"),
    (isced_level_values.between(1, 5), "Pre-tertiary (1-5)"),
    (isced_level_values == 9, "ISCED-9 / Other"),
)
# The three conditions never overlap, so each row receives at most one label.
for group_mask, group_label in isced_group_rules:
    df_total.loc[group_mask, "isced_group"] = group_label

df_total["isced_group"] = df_total["isced_group"].astype("category")

# Quick check: distribution of levels and of the derived groups
print(df_total["isced_level"].value_counts(dropna=False).sort_index())
print(df_total["isced_group"].value_counts(dropna=False))

In [None]:
# The column "activity_mob" mixes long text with a code at the beginning (in many cases).
# To analyze it better, I separate:
# 1) An "activity_code" (if it exists) extracted from the beginning of the text.
# 2) An "activity_group" with more general categories (HE, VET, Youth, etc.).
# This allows me to filter and summarize without depending on hundreds of different labels.

# Clean spaces to avoid problems when extracting the code
s = df_total["activity_mob"].astype("string").str.strip()

# 1) Extract the code from the format "CODE - description"
# Typical examples: "HE-SMS - ...", "LM-VET - ...", etc.
# The pattern requires 1-4 uppercase letters followed by at least one
# "-SEGMENT" of uppercase letters/digits, then the " - " separator.
# Rows that do not start that way get <NA> as code.
code = s.str.extract(r"^([A-Z]{1,4}(?:-[A-Z0-9]{1,10})+)\s*-\s*", expand=False)
df_total["activity_code"] = code.astype("category")


# Lookup tables for group_from_activity. In both tables the FIRST entry with a
# matching marker wins, so the order of the entries is part of the logic.

# Markers searched inside codes that start with "LM-".
_LM_CODE_GROUPS = (
    (("VET",), "VET"),
    (("EXCH", "YOU"), "Youth/Volunteering"),
    (("PUPIL",), "School"),
    (("ADULT",), "Adult"),
)

# Phrases searched inside the lowercased activity text.
_TEXT_GROUPS = (
    (
        (
            "student mobility for studies",
            "student mobility for traineeships",
            "higher education",
        ),
        "HE",
    ),
    (
        (
            "youth exchanges",
            "mobility of youth workers",
            "european voluntary service",
            "volunteering",
        ),
        "Youth/Volunteering",
    ),
    (
        (
            "vet learners",
            "mobility of vet learners",
            "erasmuspro",
            "vet",
        ),
        "VET",
    ),
    (
        (
            "staff mobility",
            "staff training",
            "structured courses/training events",
            "training/teaching assignments abroad",
            "teaching/training assignments abroad",
            "job shadowing",
        ),
        "Staff/Training",
    ),
    (("school pupils", "pupil"), "School"),
    (("adult learners",), "Adult"),
    # Visits/preparation/hosting (includes Advance Planning Visit)
    (
        (
            "advance planning visit",
            "preparatory visit",
            "invited experts",
            "hosting teachers",
        ),
        "Staff/Training",
    ),
)


def group_from_activity(text, code):
    """
    Map one activity_mob record to a general category.

    The extracted code is checked first. When there is no code, or the code
    is not one of the recognized prefixes/markers, the decision falls back to
    keywords found in the lowercased text.

    Returns pd.NA when the text is null, one of the category labels when a
    rule matches, and "Other" when nothing matches.
    """
    if pd.isna(text):
        return pd.NA

    lowered = str(text).lower()
    code_str = "" if pd.isna(code) else str(code)

    # 1) Decide by code prefix (an empty code matches none of these)
    if code_str.startswith("HE-"):
        return "HE"
    if code_str.startswith("LM-"):
        for markers, label in _LM_CODE_GROUPS:
            if any(marker in code_str for marker in markers):
                return label
    elif code_str.startswith(("SM-", "OA-")):
        return "Staff/Training"

    # 2) Decide by text (old records or records without a usable code)
    for phrases, label in _TEXT_GROUPS:
        if any(phrase in lowered for phrase in phrases):
            return label

    # Nothing matched
    return "Other"


# Apply the function to create the final group.
# Use zip to pass both the text and the extracted code at once.
# The result is a plain Python list with one label (or NA) per row, in row order.
df_total["activity_group"] = [
    group_from_activity(txt, c)
    for txt, c in zip(df_total["activity_mob"], df_total["activity_code"])
]

# Few distinct labels, so category dtype is a good fit
df_total["activity_group"] = df_total["activity_group"].astype("category")

# Quick check: distribution of groups
print(df_total["activity_group"].value_counts(dropna=False))


In [None]:
# This block serves to group "field_of_education" into broad categories (ISCED-F broad fields).
# In the dataset, this column can come with:
# - a code at the beginning (for example "0410 - Business and administration")
# - or directly text (sometimes without code)
# What I do is:
# 1) Try to extract a code if it exists at the beginning (4 digits / 2 digits / 1 digit)
# 2) If there's no code, apply rules by keywords to assign a macro-field
# 3) If it doesn't match anything, I mark it as "99 - Not classified"
#
# Note: many records may come with NA in field_of_education, that's why a high number of NaN appears.

COL = "field_of_education"  # adjust if your column has spaces

# Broad labels (2 digits) for ISCED broad fields
broad_labels = {
    "00": "00 - Generic programmes and qualifications",
    "01": "01 - Education",
    "02": "02 - Arts and humanities",
    "03": "03 - Social sciences, journalism and information",
    "04": "04 - Business, administration and law",
    "05": "05 - Natural sciences, mathematics and statistics",
    "06": "06 - Information and Communication Technologies",
    "07": "07 - Engineering, manufacturing and construction",
    "08": "08 - Agriculture, forestry, fisheries and veterinary",
    "09": "09 - Health and welfare",
    "10": "10 - Services",
    "99": "99 - Not classified",
}

# Rules by text: if the description contains certain keywords, I assign a macro-code.
# The rules are evaluated top to bottom against the lowercased text and the
# first match wins, so an over-broad pattern shadows every rule below it.
# "arts?" is anchored with \b on both sides: unanchored, it matched the "art"
# inside "earth", which sent "earth sciences" to 02 instead of 05.
rules = [
    (r"\beducation\b|teacher training|pre-?school", "01"),
    (r"\barts?\b|fine arts|handicrafts|music|performing|audio-visual|fashion|design|humanities|"
     r"history|archaeology|philosophy|ethics|religion|theology|languages?|linguistics|literature", "02"),
    (r"social and behavioural|economics|political|psychology|sociology|cultural studies|"
     r"journalism|reporting|library|archival", "03"),
    (r"accounting|taxation|finance|banking|insurance|management|administration|marketing|advertising|"
     r"secretarial|office work|wholesale|retail|business\b|law\b|legal|juris", "04"),
    (r"biolog|biochem|chemistry|earth sciences|physics|physical sciences|mathematics|statistics|"
     r"natural sciences|environment", "05"),
    (r"\bict\b|information and communication technologies|computer use|database|network|software|applications", "06"),
    (r"engineering|electricity|energy\b|electronics|automation|mechanics|metal trades|motor vehicles|ships|aircraft|"
     r"manufacturing|processing|materials|textiles|mining|extraction|architecture|building|civil engineering|construction", "07"),
    (r"agricult|crop|livestock|horticult|forestry|fisheries|veterinar", "08"),
    (r"\bhealth\b|medicine|medical|nursing|midwifery|therapy|rehabilitation|pharmacy|dental|"
     r"welfare|care of the elderly|disabled adults|child care|youth services|social work|counselling", "09"),
    (r"personal services|domestic services|hair|beauty|hotel|restaurants|catering|"
     r"sports|travel|tourism|leisure|hygiene|occupational health and safety|community sanitation|"
     r"security services|military|defence|transport|\bservices\b", "10"),
    (r"generic programmes|eqf-\d", "00"),
    (r"^not-?classified\b", "99"),
]

def to_macro_code(val):
    """
    Map one field_of_education value to a 2-digit ISCED-F broad-field code.

    Resolution order:
    1) A numeric prefix of 1 to 4 digits followed by "-" ("0410 - ...",
       "041 - ...", "04 - ...", "4 - ..."). A single digit is left-padded
       with "0"; otherwise the first two digits are used. The prefix is only
       accepted when the resulting code is a key of broad_labels.
    2) The keyword rules, first match wins.
    3) "99" when nothing matches.

    Returns pd.NA for null input, otherwise the code as a 2-character string.
    """
    # If the value is null, I keep it as NA
    if pd.isna(val):
        return pd.NA

    s = str(val).strip().lower()

    # 1) If it starts with a code, I try to convert it to a 2-digit macro-code.
    # 3-digit prefixes (ISCED-F narrow fields such as "102") are accepted too;
    # their first two digits are the broad field, exactly as for 4-digit codes.
    m = re.match(r"^(\d{1,4})\s*-\s*", s)
    if m:
        raw = m.group(1)
        code2 = f"0{raw}" if len(raw) == 1 else raw[:2]

        # Unknown prefixes are not trusted: fall through to the keyword rules
        if code2 in broad_labels:
            return code2

    # 2) If there's no usable code, I apply keyword rules
    for pat, code in rules:
        if re.search(pat, s):
            return code

    # 3) If it doesn't match, I mark it as not classified
    return "99"


# Apply the function to generate the macro-code and its label
# (rows with a null field keep NA in both new columns)
df_total["isced_macro_code"] = df_total[COL].apply(to_macro_code)
df_total["isced_macro"] = df_total["isced_macro_code"].map(broad_labels).astype("category")

# Quick summary to see if the classification makes sense
print(df_total["isced_macro"].value_counts(dropna=False))
print("Not classified (99):", (df_total["isced_macro_code"] == "99").sum())


In [None]:
# In the dataset there are three columns related to countries:
# - participant_country: usually comes only as name (without code in front).
# - sending_country / receiving_country: usually come as "XX - Country name" (XX = ISO2).
#
# Problem:
# - The same country can appear with different names (for example "Turkiye" vs "Turkey"),
#   or with long variants ("Iran (Islamic Republic of)").
# - In sending/receiving, for the same ISO2 code there may be small differences in the name
#   (spaces, capitalization, variants), which breaks counts if not unified.
#
# Strategy:
# 1) participant_country: clean spaces and apply an equivalence dictionary (name_map).
# 2) sending_country and receiving_country: normalize by ISO2 code:
#    - extract the code and name,
#    - choose the most frequent name as "canonical" per code,
#    - create auxiliary columns <col>_code and <col>_name,
#    - rebuild the original column in format "XX - CanonicalName".
# 3) Global standard: force sending and receiving to use the same canonical name by code,
#    so that a code doesn't have a different name depending on the column.
# 4) Final check: verify that no codes remain with more than one name.



# 1) Normalization of participant_country (name only, without ISO2)
# Keys are the variants found in the data; values are the single name to keep.
# The replacement below matches whole cell values exactly (after whitespace
# cleanup), so spelling or accent variants not listed here stay unchanged.
name_map = {
    "Turkiye": "Turkey",
    "Czechia": "Czech Republic",
    "Russian Federation": "Russia",
    "Moldova (Republic of)": "Moldova",
    "Republic of Moldova": "Moldova",
    "China (People's Republic of)": "China",
    "Iran (Islamic Republic of)": "Iran",
    "Syrian Arab Republic": "Syria",
    "Viet Nam": "Vietnam",
    "Tanzania (United Republic of)": "Tanzania",
    "Korea (Republic of)": "South Korea",
    "Korea (Democratic People's Republic of)": "North Korea",
    "Cabo Verde": "Cape Verde",
    "Aland islands": "Aland Islands",
    "United States of America": "United States",
    "United States Minor outlying islands": "United States Minor Outlying Islands",
    "St Lucia": "Saint Lucia",
    "St Kitts and Nevis": "Saint Kitts and Nevis",
    "St Vincent and the Grenadines": "Saint Vincent and the Grenadines",
    "Isle Of Man": "Isle of Man",
    "The Republic of North Macedonia": "North Macedonia",
    "Kosovo * UN resolution": "Kosovo",
    "Congo (Democratic Republic of)": "Democratic Republic of the Congo",
    "Lao (People's Democratic Republic)": "Laos",
    "Lao People's Democratic Republic": "Laos",
    "Brunei Darussalam": "Brunei",
    "French Southern and Antarctic Territories": "French Southern Territories",
}

# Trim, collapse internal whitespace, apply the equivalences, store as category
df_total["participant_country"] = (
    df_total["participant_country"]
      .astype("string")
      .str.strip()
      .str.replace(r"\s+", " ", regex=True)
      .replace(name_map)
      .astype("category")
)

# Number of distinct countries left after unification (nulls not counted)
print("participant_country unique:", df_total["participant_country"].nunique(dropna=True))


# 2) Normalization by ISO2 for sending_country and receiving_country
def normalize_country_by_code(df, col):
    """
    Standardize a country column whose values look like 'XX - Name'.

    'XX' is a two-letter uppercase code at the start of the value; the
    separator may be a hyphen or an en dash, with optional spaces around it.
    Each code is paired with the name that occurs most often for it in this
    column, and the column is rewritten as 'XX - CanonicalName'.

    Parameters
    ----------
    df : DataFrame holding the column. It is modified in place.
    col : name of the column to normalize.

    Returns
    -------
    The same DataFrame object, with `col` rewritten and two categorical
    columns added: `<col>_code` (the extracted code) and `<col>_name`
    (the canonical name for that code).

    Raises
    ------
    ValueError if not a single value starts with the 'XX - ' pattern.

    Values that do not start with the pattern get NA in all three columns.
    """
    prefix_pattern = r"^[A-Z]{2}\s*[-\u2013]\s*"
    raw = df[col].astype("string").str.strip()

    # Fail early when the column clearly is not in the expected layout.
    if not raw.str.match(prefix_pattern, na=False).any():
        raise ValueError(f"{col} does not seem to come as 'XX - Name'. Cannot extract ISO2.")

    # Split every value into its code and the text that follows the separator.
    iso2 = raw.str.extract(r"^([A-Z]{2})\s*[-\u2013]\s*", expand=False)
    label = raw.str.replace(prefix_pattern, "", regex=True).str.strip()
    label = label.str.replace(r"\s+", " ", regex=True)

    # For each code keep the spelling seen most often (rows lacking a code or
    # a name do not take part in the vote).
    pairs = pd.DataFrame({"code": iso2, "name": label}).dropna()
    most_common_name = pairs.groupby("code")["name"].agg(
        lambda names: names.value_counts().idxmax()
    )

    # Write the helper columns, then rebuild the original column from them.
    code_col = col + "_code"
    name_col = col + "_name"
    df[code_col] = iso2.astype("category")
    df[name_col] = iso2.map(most_common_name).astype("category")
    rebuilt = df[code_col].astype("string") + " - " + df[name_col].astype("string")
    df[col] = rebuilt.astype("category")

    return df


# Per-column pass first: it rewrites each column and adds its _code / _name helpers.
country_cols = ("sending_country", "receiving_country")
for country_col in country_cols:
    df_total = normalize_country_by_code(df_total, country_col)


# 3) Global standard: same name by code in sending and receiving
# Stack the (code, name) pairs of both columns into one long table so the
# most frequent name is chosen across the two columns together.
both = pd.concat(
    [
        df_total[[c + "_code", c + "_name"]].rename(
            columns={c + "_code": "code", c + "_name": "name"}
        )
        for c in country_cols
    ],
    ignore_index=True,
).dropna()

global_name = both.groupby("code")["name"].agg(lambda names: names.value_counts().idxmax())

# Re-derive the name from the code with the shared lookup and rebuild "XX - Name".
for country_col in country_cols:
    code_col = country_col + "_code"
    name_col = country_col + "_name"
    df_total[name_col] = df_total[code_col].map(global_name).astype("category")
    df_total[country_col] = (
        df_total[code_col].astype("string") + " - " + df_total[name_col].astype("string")
    ).astype("category")


# 4) Final check: confirm there are no codes with more than one canonical name
for base in country_cols:
    code_name_pairs = df_total[[base + "_code", base + "_name"]].dropna()
    names_per_code = code_name_pairs.groupby(base + "_code")[base + "_name"].nunique()
    print("FINAL", base, "codes with >1 name:", int((names_per_code > 1).sum()))

In [None]:
# - Reduce spelling variations in city names (extra spaces, suffixes like CEDEX,
#   punctuation, districts like "Paris 16", accents, etc.).
# - Unify some frequent variants/exonyms to a standard English form.
# Note on encoding:
# - In some records the character "�" may appear. This usually indicates a problem
#   with encoding already present in the original data. It's not corrected here but quantified as a quality check.

# Letters that Unicode treats as standalone characters: they have no
# "base letter + combining mark" decomposition, so NFKD leaves them untouched
# and they need an explicit ASCII replacement.
_ASCII_FALLBACKS = str.maketrans({
    "ß": "ss",
    "ø": "o", "Ø": "O",
    "ł": "l", "Ł": "L",
    "đ": "d", "Đ": "D",
    "æ": "ae", "Æ": "AE",
    "œ": "oe", "Œ": "OE",
    "ı": "i",
})


def strip_accents(s: str) -> str:
    """
    Remove accents/diacritics to reduce variants of the same name.

    Two passes are needed:
    1. NFKD normalization splits accented letters into base letter + combining
       mark, and the combining marks are dropped ("Málaga" -> "Malaga").
    2. Letters that do not decompose (ø, ł, đ, æ, œ, ß, dotless ı) are
       transliterated explicitly ("København" -> "Kobenhavn",
       "Łódź" -> "Lodz", "Straße" -> "Strasse").

    Parameters: s, the text to simplify.
    Returns: the text with diacritics removed; other characters are unchanged.
    """
    decomposed = unicodedata.normalize("NFKD", s)
    without_marks = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    return without_marks.translate(_ASCII_FALLBACKS)


# Map of frequent exonyms/variants to a standard English form
# The lookup in clean_city_series runs as its last step, after accents have been
# removed and title case applied, and it matches whole values exactly. Keys must
# therefore be written accent-free and in title case.
english_map = {
    "Wien": "Vienna",
    "Praha": "Prague",
    "Bruxelles": "Brussels",
    "Roma": "Rome",
    "Milano": "Milan",
    "Lisboa": "Lisbon",
    "Warszawa": "Warsaw",
    "Oporto": "Porto",
    "Bologne": "Bologna",
    "Bolonia": "Bologna",
    "Firenze": "Florence",
    "Torino": "Turin",
    "Kobenhavn": "Copenhagen",
    "Sevilla": "Seville",
}


def clean_city_series(series: pd.Series, english_map: dict) -> pd.Series:
    """
    Homogenize city names with a fixed sequence of generic text rules.

    No city is treated individually except through `english_map`, which is
    applied last as an exact, whole-value replacement.

    Parameters: series, the raw city names; english_map, a mapping from a
    cleaned variant to its preferred English form.
    Returns: a new Series with the cleaned names; missing values stay missing.
    """
    # Regex passes, applied in this order: (pattern, replacement, extra options).
    regex_passes = (
        (r"\s+", " ", {}),                              # collapse whitespace runs
        (r"\s+cedex\s*\d*\s*$", "", {"case": False}),   # French postal suffix "CEDEX [n]"
        (r"[.,;:?]+$", "", {}),                         # trailing punctuation
        (r"\s+\d{1,3}\s*$", "", {}),                    # trailing district number (Paris 16, Dublin 2)
    )

    cities = series.astype("string").str.strip()
    for pattern, replacement, options in regex_passes:
        cities = cities.str.replace(pattern, replacement, regex=True, **options)

    def _without_accents(value):
        # Keep missing values missing instead of turning them into text.
        if pd.isna(value):
            return pd.NA
        return strip_accents(str(value))

    # Drop diacritics, then put every name in title case.
    cities = cities.apply(_without_accents).str.title()

    # Finally map known exonyms/variants to their English form.
    return cities.replace(english_map)


# Clean whichever of the two city columns is present in the DataFrame
city_cols = ["receiving_city", "sending_city"]

for col in city_cols:
    if col not in df_total.columns:
        continue

    df_total[col] = clean_city_series(df_total[col], english_map).astype("category")

    # Light check: distinct values left and rows carrying the "�" character
    print(f"\n{col}")
    print("Unique:", df_total[col].nunique(dropna=True))

    # Build the row mask once and reuse it for the count and for the examples.
    has_bad_char = df_total[col].astype("string").str.contains("�", na=False)
    bad = has_bad_char.sum()
    print("Rows with mojibake (�):", bad)

    # Show a handful of affected values only when there is something to inspect.
    if bad > 0:
        print("Examples with mojibake:")
        print(df_total.loc[has_bad_char, col].head(5))


In [None]:
# I'm going to prepare a filtered dataset for analysis (which I'll then export to Power BI).
# Steps:
# 1) Create year_start from academic_year (for example "2019-20" -> 2019).
# 2) Filter the period 2017–2024 by start year.
# 3) Compare criteria to define HE (ISCED, activity_group, field) as evidence.
# 4) Define the final HE dataset by ISCED 6–8 and add an he_strict flag.

# Work on a copy so df_total stays as it was after cleaning.
df = df_total.copy()

# year_start = first four-digit number found in academic_year ("2019-20" -> 2019);
# values with no such number become <NA> in the nullable integer column.
first_year_text = df["academic_year"].astype("string").str.extract(r"(\d{4})")[0]
df["year_start"] = pd.to_numeric(first_year_text, errors="coerce").astype("Int64")

# Keep only academic years starting in 2017..2024 (both ends included).
in_period = df["year_start"].between(2017, 2024)
df_17_24 = df[in_period].copy()

rows_per_academic_year = df_17_24["academic_year"].value_counts().sort_index()
print(rows_per_academic_year.tail(10))
print("Rows:", len(df_17_24))


In [None]:
# Evidence table: how many rows each candidate definition of HE would select
mask_isced = df_17_24["isced_level"].isin([6, 7, 8])
mask_act = df_17_24["activity_group"].eq("HE")
mask_field = df_17_24["field"].eq("Higher Education")

# (label, row mask) pairs, listed in the order they appear in the table.
criteria = [
    ("ISCED 6-8 (base criterion)", mask_isced),
    ("activity_group == HE", mask_act),
    ("field == Higher Education", mask_field),
    ("ISCED 6-8 AND activity_group HE (HE strict)", mask_isced & mask_act),
    ("ISCED 6-8 AND field HE", mask_isced & mask_field),
    ("field HE AND activity_group HE", mask_field & mask_act),
    ("ISCED 6-8 AND field HE AND activity_group HE", mask_isced & mask_field & mask_act),
]

summary = pd.DataFrame({
    "Filter": [label for label, _ in criteria],
    "Rows": [mask.sum() for _, mask in criteria],
})

# Share of the 2017-2024 subset selected by each criterion, in percent.
summary["% over df_17_24"] = (summary["Rows"] / len(df_17_24) * 100).round(2)
print(summary)

In [None]:
# Final HE dataset for Power BI: rows are selected by ISCED 6-8 alone, and
# he_strict additionally marks the rows whose activity_group is also "HE".
is_he_activity = df_17_24["activity_group"] == "HE"

df_17_24["he_isced"] = mask_isced
df_17_24["he_strict"] = mask_isced & is_he_activity

# Independent copy of the HE rows, so later edits do not touch df_17_24.
selected_rows = df_17_24["he_isced"]
df_he = df_17_24[selected_rows].copy()

print("HE (ISCED 6-8):", len(df_he))
print("HE strict (within HE):", df_he["he_strict"].sum())

rows_per_year_he = df_he["academic_year"].value_counts().sort_index()
print(rows_per_year_he.tail(10))


In [None]:
# Print column dtypes, non-null counts and memory usage of the HE subset as a
# sanity check before choosing the columns to export.
df_he.info()

In [30]:
# Column selection for the Power BI dataset: only what the analysis uses.
# Countries are kept as ISO2 code + name, which is more robust for maps and
# for a star model.
_pbi_column_groups = {
    "time": [
        "academic_year", "year_start",
        "mobility_start_ym", "mobility_start_year", "mobility_start_month_num",
    ],
    # Countries (ISO2 + name)
    "countries": [
        "sending_country_code", "sending_country_name",
        "receiving_country_code", "receiving_country_name",
    ],
    # participant_country comes as name (not ISO2 in this pipeline)
    "participant_origin": ["participant_country"],
    # Demographic variables / flags
    "demographics": ["participant_gender", "participant_profile", "fewer_opportunities"],
    # Numeric metrics
    "metrics": ["participant_age", "mobility_duration", "actual_participants"],
    # Education and activity
    "education": ["isced_level", "isced_group", "activity_group", "isced_macro"],
    # Flag for conservative analysis within HE
    "flags": ["he_strict"],
}

# Flatten the groups in declaration order; this is the column order of the export.
cols_keep = [name for group in _pbi_column_groups.values() for name in group]

df_he_pbi = df_he[cols_keep].copy()


In [None]:
# Export the final dataset in two formats:
# - Parquet (recommended): compressed, fast to load, preserves data types.
# - CSV (universal): for compatibility with tools that don't support Parquet.
# The output folder is created if it doesn't exist.
# Both files are written without the DataFrame index (index=False).

# NOTE(review): os is also imported in the first cell; presumably repeated here
# so this cell can be read on its own. Confirm before removing.
import os

output_dir = "data/processed"
# exist_ok=True: no error when the folder is already there (safe to re-run).
os.makedirs(output_dir, exist_ok=True)

# 1) Export to Parquet (preferred for Power BI)
# to_parquet needs a Parquet engine (pyarrow or fastparquet) installed.
parquet_path = os.path.join(output_dir, "erasmus_he_2017_2024.parquet")
df_he_pbi.to_parquet(parquet_path, index=False)

# 2) Export to CSV (fallback)
csv_path = os.path.join(output_dir, "erasmus_he_2017_2024.csv")
df_he_pbi.to_csv(csv_path, index=False)

# Quick check: confirm export
# File sizes are read back from disk and reported in decimal megabytes (bytes / 1e6).
print(f"Exported {len(df_he_pbi):,} records to {output_dir}/")
print(f"  - erasmus_he_2017_2024.parquet ({os.path.getsize(parquet_path) / 1e6:.1f} MB)")
print(f"  - erasmus_he_2017_2024.csv ({os.path.getsize(csv_path) / 1e6:.1f} MB)")