# Olympics & Countries ETL Pipeline

This notebook reads the CSV files from `datasets/olympics` and `datasets/countries`, normalizes columns, creates an artificial `country_id` to align records across datasets, upserts results to Parquet files under `outputs/`, and writes a denormalized artifact suitable for programmatic queries.

Outputs: `outputs/countries.parquet`, `outputs/olympics.parquet`, `outputs/olympics_denormalized.parquet`

In [36]:
# Imports and path setup
import re
import hashlib
from pathlib import Path
import pandas as pd
import numpy as np

print('pandas', pd.__version__)

# Paths (detect repository root by looking for a 'datasets' folder or README.md)
def find_repo_root(marker_names=('datasets', 'README.md')):
    p = Path.cwd()
    while True:
        if any((p / m).exists() for m in marker_names):
            return p
        if p.parent == p:
            break
        p = p.parent
    # fallback to current working directory
    return Path.cwd()

REPO_ROOT = find_repo_root()
DATA_DIR = REPO_ROOT / 'datasets'
OLY_DIR = DATA_DIR / 'olympics'
COUNTRIES_CSV = DATA_DIR / 'countries' / 'countries of the world.csv'
OUT_DIR = REPO_ROOT / 'outputs'
OUT_DIR.mkdir(exist_ok=True)

print('DATA_DIR', DATA_DIR)

pandas 2.3.3
DATA_DIR /workspaces/data-engineer-test/datasets


In [37]:
# Utility functions
def country_id_from_name(name: str) -> str:
    """Create a stable artificial key from a country name.
    Uses a normalized lowercase form and MD5 to keep IDs compact and deterministic.
    Handles NaN gracefully.
    """
    if pd.isna(name):
        name = ''
    key = re.sub(r'\s+', ' ', str(name)).strip().lower()
    return hashlib.md5(key.encode('utf-8')).hexdigest()

def normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Normalize column names: strip, lowercase, replace spaces with underscores.
    """
    df = df.rename(columns=lambda c: re.sub(r'\s+', '_', str(c).strip().lower()))
    return df

In [38]:
# Read and normalize all olympics CSV files into a single DataFrame
def read_olympics_files() -> pd.DataFrame:
    frames = []
    for f in sorted(OLY_DIR.glob('*.csv')):
        # try to extract a year from filename (e.g., Tokyo 2020 ...)
        m = re.search(r'(\d{4})', f.name)
        year = int(m.group(1)) if m else None
        # attempt to read with a forgiving configuration
        try:
            # prefer the default engine; avoid engine='python' together with low_memory
            df = pd.read_csv(f, low_memory=False, encoding='utf-8')
        except Exception:
            df = pd.read_csv(f, low_memory=False, encoding='latin1')
        df = normalize_columns(df)
        df['source_file'] = f.name
        df['year'] = year
        # unify country-like column names
        for c in ('nation', 'country', 'team'):
            if c in df.columns:
                df = df.rename(columns={c: 'country'})
                break
        if 'country' not in df.columns:
            df['country'] = None
        # coerce numeric medal columns when present
        for medal in ('gold', 'silver', 'bronze', 'total', 'rank'):
            if medal in df.columns:
                df[medal] = pd.to_numeric(df[medal].astype(str).str.replace(r'[^0-9.-]', '', regex=True), errors='coerce')
        # create artificial country id
        df['country_id'] = df['country'].astype(str).apply(country_id_from_name)
        frames.append(df)
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True, sort=False)

# Build olympics table (DataFrame)
def build_olympics_table() -> pd.DataFrame:
    df = read_olympics_files()
    return df

In [39]:
# Read and normalize the countries CSV into a DataFrame
def build_countries_table() -> pd.DataFrame:
    # robust read (some versions of this CSV use commas and quoted fields)
    try:
        # avoid specifying engine='python' when using low_memory
        df = pd.read_csv(COUNTRIES_CSV, low_memory=False, encoding='utf-8')
    except Exception:
        df = pd.read_csv(COUNTRIES_CSV, low_memory=False, encoding='latin1')
    df = normalize_columns(df)
    # find a sensible name column
    for c in ('country', 'name', 'country_name'):
        if c in df.columns:
            df = df.rename(columns={c: 'country'})
            break
    if 'country' not in df.columns:
        df['country'] = None
    df['country_id'] = df['country'].astype(str).apply(country_id_from_name)
    # attempt to coerce common numeric-like columns
    numeric_like = []
    for col in df.columns:
        if re.search(r'(population|area|density|gdp|percapita|index|%|rate)', col):
            numeric_like.append(col)
            df[col] = pd.to_numeric(df[col].astype(str).str.replace(r'[^0-9.-]', '', regex=True), errors='coerce')
    return df

In [40]:
# Upsert helper for parquet files
def upsert_parquet(df: pd.DataFrame, out_path: Path, key_cols):
    """Upsert rows from df into out_path using key_cols as the natural key.
    If the parquet exists, read it, concatenate, and drop duplicates keeping the latest (by index of concatenation).
    """
    # ensure out_path is a Path object
    out_path = Path(out_path)
    if out_path.exists():
        existing = pd.read_parquet(out_path)
        combined = pd.concat([existing, df], ignore_index=True, sort=False)
        # keep the last occurrence for each key (assuming later rows in df should win)
        combined = combined.drop_duplicates(subset=key_cols, keep='last')
    else:
        combined = df.copy()
    combined.to_parquet(out_path, index=False)
    return combined

In [41]:
# Run the full pipeline and write artifacts
def run_pipeline():
    try:
        print('Reading countries CSV...')
        countries = build_countries_table()
        countries_path = "/workspaces/data-engineer-test/outputs/countries.parquet"
        print(f'Upserting countries -> {countries_path}')
        if countries is None or (hasattr(countries, 'empty') and countries.empty):
            # If no new countries data, prefer existing parquet if present
            if countries_path.exists():
                countries_written = pd.read_parquet(countries_path)
                print(f'No new countries found; using existing file with {len(countries_written)} rows')
            else:
                # write an empty parquet with expected structure
                countries_written = countries.copy() if countries is not None else pd.DataFrame(columns=['country','country_id'])
                countries_written.to_parquet(countries_path, index=False)
                print('Wrote empty countries parquet')
        else:
            countries_written = upsert_parquet(countries, countries_path, key_cols=['country_id'])
            print(f'wrote {len(countries_written)} country rows')

        print('Reading olympics CSVs...')
        olympics = build_olympics_table()
        olympics_path = OUT_DIR / 'olympics.parquet'
        print(f'Upserting olympics -> {olympics_path}')
        if olympics is None or (hasattr(olympics, 'empty') and olympics.empty):
            if olympics_path.exists():
                olympics_written = pd.read_parquet(olympics_path)
                print(f'No new olympics data; using existing file with {len(olympics_written)} rows')
            else:
                olympics_written = olympics.copy() if olympics is not None else pd.DataFrame(columns=['country','country_id','year'])
                olympics_written.to_parquet(olympics_path, index=False)
                print('Wrote empty olympics parquet')
        else:
            olympics_written = upsert_parquet(olympics, olympics_path, key_cols=['country_id', 'year'])
            print(f'wrote {len(olympics_written)} olympics rows')

        # Denormalize (left join olympics -> countries using country_id)
        print('Creating denormalized artifact...')
        merged = olympics_written.merge(countries_written, on='country_id', how='left', suffixes=('_olymp', '_country'))
        merged_path = OUT_DIR / 'olympics_denormalized.parquet'
        merged.to_parquet(merged_path, index=False)
        print(f'wrote denormalized -> {merged_path} ({len(merged)} rows)')
        return {'countries': countries_written, 'olympics': olympics_written, 'merged': merged}
    except Exception as e:
        print('Pipeline failed with exception:', e)
        raise

# Execute pipeline
artifacts = run_pipeline()

Reading countries CSV...
Upserting countries -> /workspaces/data-engineer-test/outputs/countries.parquet
wrote 227 country rows
Reading olympics CSVs...
Upserting olympics -> /workspaces/data-engineer-test/outputs/olympics.parquet
wrote 15 olympics rows
Creating denormalized artifact...
wrote denormalized -> /workspaces/data-engineer-test/outputs/olympics_denormalized.parquet (15 rows)
Upserting olympics -> /workspaces/data-engineer-test/outputs/olympics.parquet
wrote 15 olympics rows
Creating denormalized artifact...
wrote denormalized -> /workspaces/data-engineer-test/outputs/olympics_denormalized.parquet (15 rows)


In [42]:
# Quick checks and example queries
countries_df = artifacts['countries']
olympics_df = artifacts['olympics']
merged_df = artifacts['merged']

print('Countries sample:')
display(countries_df.head(5))

print('Olympics sample:')
display(olympics_df.head(5))

print('Denormalized sample:')
display(merged_df.head(5))

# Example: Top 10 countries by total medals (if 'total' exists)
if 'total' in olympics_df.columns:
    top = olympics_df.groupby('country_id', dropna=False)['total'].sum().reset_index().sort_values('total', ascending=False).head(10)
    # join back to get country name where available
    top = top.merge(countries_df[['country_id','country']], on='country_id', how='left')
    print('Top 10 by total medals:')
    display(top)
else:
    print('No "total" medal column present in olympics dataset to aggregate.')

Countries sample:


Unnamed: 0,country,region,population,area_(sq._mi.),pop._density_(per_sq._mi.),coastline_(coast/area_ratio),net_migration,infant_mortality_(per_1000_births),gdp_($_per_capita),literacy_(%),...,arable_(%),crops_(%),other_(%),climate,birthrate,deathrate,agriculture,industry,service,country_id
227,Afghanistan,ASIA (EX. NEAR EAST),31056997,647500,480,0,2306,16307,700.0,360.0,...,1213.0,22.0,8765.0,1,466.0,2034.0,38.0,24.0,38.0,a8424b7baae263918301406bdc884e28
228,Albania,EASTERN EUROPE,3581655,28748,1246,126,-493,2152,4500.0,865.0,...,2109.0,442.0,7449.0,3,1511.0,522.0,232.0,188.0,579.0,3303daf806aebcd0cfd114f7d267f109
229,Algeria,NORTHERN AFRICA,32930091,2381740,138,4,-39,31,6000.0,700.0,...,322.0,25.0,9653.0,1,1714.0,461.0,101.0,6.0,298.0,49f4fdc64da08e0cbac132fa06062d2d
230,American Samoa,OCEANIA,57794,199,2904,5829,-2071,927,8000.0,970.0,...,10.0,15.0,75.0,2,2246.0,327.0,,,,4941fb51fed4066ebea2f0a198cb531e
231,Andorra,WESTERN EUROPE,71201,468,1521,0,66,405,19000.0,1000.0,...,222.0,0.0,9778.0,3,871.0,625.0,,,,449f508e9fad3b1bd4debc02f0d2194b


Olympics sample:


Unnamed: 0,noc,gold,silver,bronze,total,source_file,year,country,country_id
88,MGL,0,0,1,1,Athens 2004 Olympics Nations Medals.csv,2004,,334c4a4c42fdb79d7ebc3e73b517e6f8
166,PUR,0,0,1,1,Atlanta 1996 Olympics Nations Medals.csv,1996,,334c4a4c42fdb79d7ebc3e73b517e6f8
188,AUS,0,0,1,1,Lillehammer 1994 Olympics Nations Medals.csv,1994,,334c4a4c42fdb79d7ebc3e73b517e6f8
274,AFG,0,0,1,1,London 2012 Olympics Nations Medals.csv,2012,,334c4a4c42fdb79d7ebc3e73b517e6f8
298,AUS,0,0,1,1,Nagano 1998 Olympics Nations Medals.csv,1998,,334c4a4c42fdb79d7ebc3e73b517e6f8


Denormalized sample:


Unnamed: 0,noc,gold,silver,bronze,total,source_file,year,country_olymp,country_id,country_country,...,phones_(per_1000),arable_(%),crops_(%),other_(%),climate,birthrate,deathrate,agriculture,industry,service
0,MGL,0,0,1,1,Athens 2004 Olympics Nations Medals.csv,2004,,334c4a4c42fdb79d7ebc3e73b517e6f8,,...,,,,,,,,,,
1,PUR,0,0,1,1,Atlanta 1996 Olympics Nations Medals.csv,1996,,334c4a4c42fdb79d7ebc3e73b517e6f8,,...,,,,,,,,,,
2,AUS,0,0,1,1,Lillehammer 1994 Olympics Nations Medals.csv,1994,,334c4a4c42fdb79d7ebc3e73b517e6f8,,...,,,,,,,,,,
3,AFG,0,0,1,1,London 2012 Olympics Nations Medals.csv,2012,,334c4a4c42fdb79d7ebc3e73b517e6f8,,...,,,,,,,,,,
4,AUS,0,0,1,1,Nagano 1998 Olympics Nations Medals.csv,1998,,334c4a4c42fdb79d7ebc3e73b517e6f8,,...,,,,,,,,,,


Top 10 by total medals:


Unnamed: 0,country_id,total,country
0,334c4a4c42fdb79d7ebc3e73b517e6f8,15,


## Notes and next steps
- The notebook creates an artificial `country_id` using MD5(country_name_normalized). This keeps a deterministic mapping but may not resolve all name variants (e.g., 'USA' vs 'United States'). For improved matching, consider fuzzy matching (Python's `fuzzywuzzy` or `rapidfuzz`) or a canonical mapping table.
- Parquet files are written with the default engine (pyarrow). Install `pyarrow` if not already present.
- To run this notebook: install dependencies from `requirements.txt`, then open in Jupyter or run cells in your environment.