# European Parlement Data in Iceberg

In [6]:
!pip install pyiceberg pynessie python-dotenv

Collecting pyiceberg
  Using cached pyiceberg-0.10.0-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (4.9 kB)
Collecting pynessie
  Using cached pynessie-0.67.0-py2.py3-none-any.whl.metadata (13 kB)
Collecting python-dotenv
  Using cached python_dotenv-1.2.1-py3-none-any.whl.metadata (25 kB)
Collecting cachetools<7.0,>=5.5 (from pyiceberg)
  Using cached cachetools-6.2.4-py3-none-any.whl.metadata (5.6 kB)
Collecting mmh3<6.0.0,>=4.0.0 (from pyiceberg)
  Using cached mmh3-5.2.0-cp311-cp311-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl.metadata (14 kB)
Collecting pydantic!=2.4.0,!=2.4.1,<3.0,>=2.0 (from pyiceberg)
  Using cached pydantic-2.12.5-py3-none-any.whl.metadata (90 kB)
Collecting pyroaring<2.0.0,>=1.0.0 (from pyiceberg)
  Using cached pyroaring-1.0.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (10 kB)
Collecting rich<15.0.0,>=10.11.0 (from pyiceberg)
  Using cached rich-14.2.0-py3-none-any.whl.m

In [7]:
!pip install pyspark==3.5.3

Collecting pyspark==3.5.3
  Using cached pyspark-3.5.3-py2.py3-none-any.whl
Collecting py4j==0.10.9.7 (from pyspark==3.5.3)
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Using cached py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.5.3


## Import Dependencies

In [8]:
import os
import requests
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, BooleanType

print("✓ Dependencies imported")

✓ Dependencies imported


## Create Spark Session with Iceberg + Nessie + S3

In [9]:
# Stop any existing session
try:
    spark.stop()
    print("Stopped existing Spark session")
except:
    pass

# Create Spark session with full lakehouse configuration
spark = SparkSession.builder \
    .appName("TweedeKamer-Lakehouse") \
    .config("spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,"
            "org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.76.0,"
            "org.apache.hadoop:hadoop-aws:3.3.4,"
            "com.amazonaws:aws-java-sdk-bundle:1.12.262,"
            "software.amazon.awssdk:bundle:2.25.11,"
            "software.amazon.awssdk:url-connection-client:2.25.11") \
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,"
            "org.projectnessie.spark.extensions.NessieSparkSessionExtensions") \
    .config("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog") \
    .config("spark.sql.catalog.nessie.uri", os.getenv("NESSIE_URI")) \
    .config("spark.sql.catalog.nessie.ref", "main") \
    .config("spark.sql.catalog.nessie.warehouse", "s3://lakehouse/") \
    .config("spark.sql.catalog.nessie.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.catalog.nessie.s3.endpoint", os.getenv("AWS_S3_ENDPOINT")) \
    .config("spark.sql.catalog.nessie.s3.path-style-access", "true") \
    .config("spark.sql.catalog.nessie.s3.access-key-id", os.getenv("AWS_ACCESS_KEY_ID")) \
    .config("spark.sql.catalog.nessie.s3.secret-access-key", os.getenv("AWS_SECRET_ACCESS_KEY")) \
    .config("spark.sql.catalog.nessie.s3.region", os.getenv("AWS_REGION")) \
    .config("spark.hadoop.fs.s3a.endpoint", os.getenv("AWS_S3_ENDPOINT")) \
    .config("spark.hadoop.fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID")) \
    .config("spark.hadoop.fs.s3a.secret.key", os.getenv("AWS_SECRET_ACCESS_KEY")) \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

print(f"✓ Spark {spark.version} with Iceberg + Nessie + S3 configured")
print(f"  Nessie URI: {os.getenv('NESSIE_URI')}")
print(f"  S3 Endpoint: {os.getenv('AWS_S3_ENDPOINT')}")

✓ Spark 3.5.0 with Iceberg + Nessie + S3 configured
  Nessie URI: http://nessie:19120/api/v1
  S3 Endpoint: https://s3.nl-ams.scw.cloud


## Verify Nessie Connection and Create Namespace

In [10]:
# First, verify the Nessie catalog is available
print("Checking Nessie catalog connection...")
try:
    spark.sql("SHOW NAMESPACES IN nessie").show()
    print("✓ Nessie catalog is accessible")
except Exception as e:
    print(f"✗ Error accessing Nessie catalog: {e}")
    print("\nTroubleshooting steps:")
    print("1. Verify NESSIE_URI is set correctly")
    print("2. Check if Nessie service is running:")
    print("   kubectl get pods -n lakehouse | grep nessie")
    print("3. Verify port-forward if running locally")
    raise


Checking Nessie catalog connection...
+-----------+
|  namespace|
+-----------+
|  analytics|
|tweedekamer|
|   europarl|
|test_schema|
+-----------+

✓ Nessie catalog is accessible


In [12]:
# Create namespace if it doesn't exist
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.europarl")
print("✓ Created namespace: nessie.europarl")

# List existing namespaces
print("\nExisting namespaces in Nessie:")
spark.sql("SHOW NAMESPACES IN nessie").show()

✓ Created namespace: nessie.europarl

Existing namespaces in Nessie:
+-----------+
|  namespace|
+-----------+
|  analytics|
|tweedekamer|
|   europarl|
|test_schema|
+-----------+



## Get MEP data

In [10]:
"""Fetch all political groups (fracties) active in 2025"""
base_url = "https://data.europarl.europa.eu/api/v2"
meps_endpoint = f"{base_url}/meps"

params = {
    "format": "application/ld+json",
    "offset": 0,
    "limit": 50
}

print("Fetching MEP data...")
response = requests.get(meps_endpoint, params=params)
response.raise_for_status()

data = response.json()
meps = data.get('value', [])

print(f"✓ Fetched {len(meps)} meps")


Fetching MEP data...
✓ Fetched 0 meps


In [11]:
import time
import requests

BASE = "https://data.europarl.europa.eu/api/v2/meps"

headers = {
    "Accept": "application/ld+json"
}

offset = 0
limit = 100
all_meps = []

while True:
    print(f"offset: {offset}")
    params = {"offset": offset, "limit": limit}
    r = requests.get(BASE, headers=headers, params=params, timeout=30)

    # hard stop bij lege body
    if not r.text.strip():
        print(f"⛔ Empty body at offset {offset} — stopping.")
        break

    r.raise_for_status()

    try:
        payload = r.json()
    except Exception:
        print(f"⛔ Non-JSON response at offset {offset} — stopping.")
        break

    page = payload.get("data", [])
    if not page:
        print(f"✓ Finished at offset {offset}")
        break
    # print(f"page: {page}")

    all_meps.extend(page)
    offset += limit
    time.sleep(0.2)   # throttle: voorkomt random 503s

print(f"Fetched {len(all_meps)} MEPs")


offset: 0
offset: 100
offset: 200
offset: 300
offset: 400
offset: 500
offset: 600
offset: 700
offset: 800
offset: 900
offset: 1000
offset: 1100
offset: 1200
offset: 1300
offset: 1400
offset: 1500
offset: 1600
offset: 1700
offset: 1800
offset: 1900
offset: 2000
offset: 2100
offset: 2200
offset: 2300
offset: 2400
offset: 2500
offset: 2600
offset: 2700
offset: 2800
offset: 2900
offset: 3000
offset: 3100
offset: 3200
offset: 3300
offset: 3400
offset: 3500
offset: 3600
offset: 3700
offset: 3800
offset: 3900
offset: 4000
offset: 4100
offset: 4200
offset: 4300
offset: 4400
offset: 4500
offset: 4600
offset: 4700
offset: 4800
offset: 4900
offset: 5000
offset: 5100
offset: 5200
offset: 5300
⛔ Empty body at offset 5300 — stopping.
Fetched 5258 MEPs


## Create MEPs table

In [15]:
# Create table meps if it doesn't exist
spark.sql("""CREATE TABLE IF NOT EXISTS nessie.europarl.meps (
   id varchar(255),
   type varchar(255),
   identifier varchar(255),
   label varchar(255),
   familyName varchar(255),
   givenName varchar(255),
   sortLabel varchar(255)
)""")
print("✓ Created table: nessie.europarl.meps")

✓ Created table: nessie.europarl.meps


## Store data in PySpark dataframe

In [16]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

schema = StructType([
    StructField('id', StringType(), True),
    StructField('type', StringType(), True),
    StructField('identifier', StringType(), True),
    StructField('label', StringType(), True),
    StructField('familyName', StringType(), True),
    StructField('givenName', StringType(), True),
    StructField('sortLabel', StringType(), True),
    StructField('load_ts', TimestampType(), True)
])


In [17]:
from datetime import datetime

mep_records = [
    (
        mep.get('id'),
        mep.get('type'),
        mep.get('identifier'),
        mep.get('label'),
        mep.get('familyName'),
        mep.get('givenName'),
        mep.get('sortLabel'),
        datetime.utcnow()      # nu past het schema exact
    )
    for mep in all_meps
]


In [18]:
df_meps = spark.createDataFrame(mep_records, schema)
df_meps.count(), df_meps.show(5, False)

df_meps.writeTo("nessie.europarl.meps") \
    .using("iceberg") \
    .createOrReplace()


+----------+------+----------+-----------------------+------------+-------------+------------+--------------------------+
|id        |type  |identifier|label                  |familyName  |givenName    |sortLabel   |load_ts                   |
+----------+------+----------+-----------------------+------------+-------------+------------+--------------------------+
|person/1  |Person|1         |Georg JARZEMBOWSKI     |Jarzembowski|Georg        |JARZEMBOWSKI|2026-01-12 21:44:33.619603|
|person/10 |Person|10        |Hendrikus VREDELING    |Vredeling   |Hendrikus    |VREDELING   |2026-01-12 21:44:33.619609|
|person/100|Person|100       |J.C. RUTGERS           |Rutgers     |J.C.         |RUTGERS     |2026-01-12 21:44:33.619611|
|person/101|Person|101       |Sijbrandus A. POSTHUMUS|Posthumus   |Sijbrandus A.|POSTHUMUS   |2026-01-12 21:44:33.619613|
|person/11 |Person|11        |Hendrik J.G. WALTMANS  |Waltmans    |Hendrik J.G. |WALTMANS    |2026-01-12 21:44:33.619615|
+----------+------+-----

In [19]:
spark.sql("""SELECT * FROM nessie.europarl.meps""")
print("✓ SELECT: nessie.europarl.meps")

✓ SELECT: nessie.europarl.meps


# Current MEPs

In [37]:
spark.sql("""DROP TABLE nessie.europarl.current_meps""")

DataFrame[]

In [39]:
# Create current_meps table if it doesn't exist
spark.sql("""CREATE TABLE nessie.europarl.current_meps (
   id STRING,
   type STRING,
   identifier STRING,
   label STRING,
   familyName STRING,
   givenName STRING,
   sortLabel STRING,
   country STRING,
   political_group STRING,
   load_ts TIMESTAMP
)""")
print("✓ Created table: nessie.europarl.current_meps")

✓ Created table: nessie.europarl.current_meps


## Get current MEP data

In [None]:
"""Fetch all MEPs currently active (as of today)"""
import time
import requests

BASE = "https://data.europarl.europa.eu/api/v2/meps/show-current"

headers = {
    "Accept": "application/ld+json"
}
offset = 0
limit = 100
all_current = []

while True:
    params = {"offset": offset, "limit": limit}
    print(f"offset: {offset}")
    r = requests.get(BASE, headers=headers, params=params)
    # hard stop bij lege body
    if not r.text.strip():
        print(f"⛔ Empty body at offset {offset} — stopping.")
        break

    r.raise_for_status()

    try:
        payload = r.json()
    except Exception:
        print(f"⛔ Non-JSON response at offset {offset} — stopping.")
        break

    page = payload.get("data", [])
    print(f"page: {page}")
    if not page:
        print(f"✓ Finished at offset {offset}")
        break

    all_current.extend(page)
    offset += limit
    time.sleep(0.2)   # throttle: voorkomt random 503s

print(f"Fetched {len(all_current)} current MEPs")

## Store current MEPs in dataframe.

In [40]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

schema = StructType([
    StructField("id", StringType()),
    StructField("type", StringType()),
    StructField("identifier", StringType()),
    StructField("label", StringType()),
    StructField("familyName", StringType()),
    StructField("givenName", StringType()),
    StructField("sortLabel", StringType()),
    StructField("country", StringType()),
    StructField("political_group", StringType()),
    StructField("load_ts", TimestampType())
])


In [41]:
from datetime import datetime

def normalize(mep):
    return (
        mep.get("id"),
        mep.get("type"),
        mep.get("identifier"),
        mep.get("label"),
        mep.get("familyName"),
        mep.get("givenName"),
        mep.get("sortLabel"),
        mep.get("api:country-of-representation"),
        mep.get("api:political-group"),
        datetime.utcnow()
    )

mep_records = [normalize(mep) for mep in all_current]


In [44]:
spark.sql("""TRUNCATE TABLE nessie.europarl.current_meps""")

DataFrame[]

In [45]:
df_curmeps = spark.createDataFrame(mep_records, schema)
df_curmeps.count(), df_curmeps.show(5, False)

df_curmeps.writeTo("nessie.europarl.current_meps") \
    .using("iceberg") \
    .createOrReplace()


+-------------+------+----------+--------------+----------+---------+---------+-------+---------------+--------------------------+
|id           |type  |identifier|label         |familyName|givenName|sortLabel|country|political_group|load_ts                   |
+-------------+------+----------+--------------+----------+---------+---------+-------+---------------+--------------------------+
|person/101039|Person|101039    |Paolo BORCHIA |Borchia   |Paolo    |BORCHIA  |IT     |PfE            |2026-01-13 11:16:15.509369|
|person/101585|Person|101585    |Niels FUGLSANG|Fuglsang  |Niels    |FUGLSANG |DK     |S&D            |2026-01-13 11:16:15.509386|
|person/103246|Person|103246    |Auke ZIJLSTRA |Zijlstra  |Auke     |ZIJLSTRA |NL     |PfE            |2026-01-13 11:16:15.509394|
|person/103381|Person|103381    |Terry REINTKE |Reintke   |Terry    |REINTKE  |DE     |Verts/ALE      |2026-01-13 11:16:15.509403|
|person/1294  |Person|1294      |Elio DI RUPO  |Di Rupo   |Elio     |DIRUPO   |BE  

## Let's do some analytics

In [7]:
df = spark.table("nessie.europarl.current_meps")

country_counts = (
    df.groupBy("country")
      .count()
      .orderBy("count", ascending=False)
)

country_counts.show()


+-------+-----+
|country|count|
+-------+-----+
|     DE|   96|
|     FR|   81|
|     IT|   76|
|     ES|   60|
|     PL|   53|
|     RO|   33|
|     NL|   31|
|     BE|   22|
|     CZ|   21|
|     PT|   21|
|     GR|   21|
|     SE|   21|
|     HU|   21|
|     AT|   20|
|     BG|   17|
|     FI|   15|
|     SK|   15|
|     DK|   15|
|     IE|   14|
|     HR|   12|
+-------+-----+
only showing top 20 rows



# Meetings and Participants

In [None]:
# Create current_meps table if it doesn't exist
spark.sql("""CREATE TABLE nessie.europarl.meetings (
   id STRING,
   type STRING,
   activity_date STRING,
   activity_end_date STRING,
   activity_id STRING,
   activity_label_en STRING,
   activity_start_date STRING,
   had_activity_type STRING,
   parliamentary_term STRING,
   hasLocality STRING,
   load_ts TIMESTAMP
)""")
print("✓ Created table: nessie.europarl.meetings")


spark.sql("""CREATE TABLE nessie.europarl.meeting_persons (
   id STRING,
   activity_id STRING,
   personid STRING,
   was_participant STRING,
   was_excused STRING,
   load_ts TIMESTAMP
)""")
print("✓ Created table: nessie.europarl.meeting_persons")


In [13]:
import requests, time
from datetime import datetime

BASE = "https://data.europarl.europa.eu/api/v2/meetings"

headers = {"Accept": "application/ld+json"}

def fetch_meetings(year=2025):
    offset, limit = 0, 100
    all_rows = []

    while True:
        print(f"offset: {offset}")
        params = {"year": year, "offset": offset, "limit": limit}
        r = requests.get(BASE, headers=headers, params=params)

        if not r.text.strip():
            break

        payload = r.json()
        page = payload.get("data", [])
        print(f"type(page): {type(page)}")
        # print(f"page[0]: {page[0]}")
        if not page:
            break

        all_rows.extend(page)
        offset += limit
        time.sleep(0.2)

    return all_rows

meetings_raw = fetch_meetings(2025)
print(f"Fetched {len(meetings_raw)} meetings")

offset: 0
type(page): <class 'list'>
offset: 100
Fetched 53 meetings


### Meeting schema and records

In [15]:
def get_label(val):
    if isinstance(val, dict):
        return val.get("label", {}).get("en")
    return None


In [None]:
# spark.sql("""TRUNCATE TABLE nessie.europarl.meetings""")

In [16]:
from pyspark.sql.types import *
from datetime import datetime

meeting_schema = StructType([
    StructField("id", StringType()),
    StructField("type", StringType()),
    StructField("activity_date", StringType()),
    StructField("activity_end_date", StringType()),
    StructField("activity_id", StringType()),
    StructField("activity_label_en", StringType()),
    StructField("activity_start_date", StringType()),
    StructField("had_activity_type", StringType()),
    StructField("parliamentary_term", StringType()),
    StructField("hasLocality", StringType()),
    StructField("load_ts", TimestampType())
])

from datetime import datetime

def normalize_meeting(m):
    return (
        m.get("id"),
        m.get("type"),
        m.get("activity_date"),
        m.get("activity_end_date"),
        m.get("activity_id"),
        m.get("activity_label", {}).get("en"),
        m.get("activity_start_date"),
        m.get("had_activity_type"),
        m.get("parliamentary_term"),
         m.get("hasLocality") or m.get("has_locality"),
        datetime.utcnow()
    )


In [17]:
meeting_rows = [normalize_meeting(m) for m in meetings_raw]

df_meetings = spark.createDataFrame(meeting_rows, meeting_schema)

df_meetings.writeTo("nessie.europarl.meetings").createOrReplace()


In [18]:
df_meetings.count(), df_meetings.show(5, False)

+------------------------------+--------+-------------+-------------------------+-----------------+--------------------------+-------------------------+---------------------------------+------------------+--------------------------------------------------------------+--------------------------+
|id                            |type    |activity_date|activity_end_date        |activity_id      |activity_label_en         |activity_start_date      |had_activity_type                |parliamentary_term|hasLocality                                                   |load_ts                   |
+------------------------------+--------+-------------+-------------------------+-----------------+--------------------------+-------------------------+---------------------------------+------------------+--------------------------------------------------------------+--------------------------+
|eli/dl/event/MTG-PL-2025-01-20|Activity|2025-01-20   |2025-01-20T23:00:00+01:00|MTG-PL-2025-01-20|Monday, 20 Ja

(53, None)

### Meeting participants

In [10]:
def get_identifier(val):
    if isinstance(val, dict):
        return val.get("identifier")
    return None

In [11]:
from pyspark.sql import Row

person_rows = []

for m in meetings_raw:
    meeting_id = m.get("id")
    activity_id = m.get("activity_id")

    # participants
    for p in m.get("had_participant_person", []):
        person_rows.append((
            meeting_id,
            activity_id,
            p,            # p is nu gewoon 'person/197537'
            "true",
            "false",
            datetime.utcnow()
        ))

    # excused
    for p in m.get("had_excused_person", []):
        person_rows.append((
            meeting_id,
            activity_id,
            p,
            "false",
            "true",
            datetime.utcnow()
        ))


In [12]:
person_schema = StructType([
    StructField("id", StringType()),
    StructField("activity_id", StringType()),
    StructField("personid", StringType()),
    StructField("was_participant", StringType()),
    StructField("was_excused", StringType()),
    StructField("load_ts", TimestampType())
])

df_persons = spark.createDataFrame(person_rows, person_schema)

# Write to Iceberg
df_persons.writeTo("nessie.europarl.meeting_persons").createOrReplace()
print("✓ Meeting participants table written")

✓ Meeting participants table written


In [None]:
df_persons.count(), df_persons.show(5, False)