# Data Ingestion Pipeline - Smart Building IoT Sensors

## Casus
We gaan data verwerken van temperatuur- en CO2-sensoren in een universiteitskantoor. De sensoren sturen metingen naar verschillende bronnen:
- **Lokale JSON files** (batch uploads van sensoren)
- **CSV metadata** (sensor configuratie)
- **Publieke API** (weer data voor correlatie)

**Doel**: Inzicht krijgen in kantoorklimaat en energiebesparing mogelijk maken.

In [None]:
# Benodigde libraries
import pandas as pd
import json
import requests
from datetime import datetime, timedelta
from pathlib import Path


## 1. Trigger - Hoe en wanneer start de pipeline?

In productie zou dit een scheduled job zijn (bijv. via cron/Airflow).

**1.1 Cron expression oefening**: 

Wat is de cron expression voor “elke werkdag om 0:00 uur”


```
* * * * *
┬ ┬ ┬ ┬ ┬
│ │ │ │ └─── Day of week (0-6 or SUN-SAT)
│ │ │ └────── Month (1-12 or JAN-DEC)
│ │ └───────── Day of month (1-31)
│ └────────────Hour (0-23)
└─────────────── Minute (0-59)

```

In [None]:
# 1.1 Schrijf hier het antwoord op


**1.2 Op job completion oefening**:

Als je zeker wilt weten dat een job (B) pas begint als de vorige (A) klaar is, kan je dat doen met APScheduler.

Vraag: wat gebeurt er als je de `if` statement verwijdert in `on_done()`:
```
def on_done(event):
    sched.add_job(b, id="b")

In [None]:
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_EXECUTED
import time

def a(): 
    print("Job A started...")
    time.sleep(2)
    print("A done.")

def b(): 
    print("Job B started after A...")
    time.sleep(1)
    print("B done.")

def on_done(event):
    if event.job_id == "a": 
        sched.add_job(b, id="b")

sched = BackgroundScheduler()
sched.add_job(a, id="a")
sched.add_listener(on_done, EVENT_JOB_EXECUTED)

# Start de scheduler
sched.start()
time.sleep(5)
sched.shutdown()

In [None]:
# Vul hier je antwoord in

**1.3 Zoek JSON files** 

Controleer of er .json files bestaan in FOLDER_NAME (zie hieronder), met `Path()` en `glob()`.

In [None]:
# Constantes worden gedefinieerd in hoofdletters
FOLDER_NAME = "sample_data"

In [None]:
# Implementeer hier

**1.4 Maak een functie**

Bij oefening 1.2 heb je ze al gezien: functies. Functies maken het mogelijk een stukje code te hergebruiken. De syntax is: 
```
def functie_naam(parameter_naam: <type> = default_waarde):
    <logica>
    return <object>
```

Bijvoorbeeld:
```
def add(x: int, y: int):
    result = x + y
    return result
```

Stappen:
- Maak een functie `check_for_data()` dat op basis van een folder name de json files returnt.
- Roep de functie aan en sla het return object op in een variabele `files_to_process`

In [None]:
# Implementeer hier


## 2. Connection - Hoe verbindt de source met de target?

We hebben 3 verschillende databronnen voor deze oefening:
1. Lokale JSON files (sensor readings)
2. CSV file (sensor metadata)
3. Publieke API (Open-Meteo voor weer data)

In productie:
- Authentication (API token, OAuth2, service account)
- Credentials in environment variables (NOOIT in code!)
- Retry logic voor netwerkfouten

**2.1 Haal weather data op met publieke API Open-Meteo**

Test eerst of de API werkt, en daarna verpak je het weer in een functie `request_weather(latitude: int, longitude: int, date: str)`.

In [None]:
ROTTERDAM_COORDINATES = (51.92, 4.47) # Latitude, Longitude
TEST_DATE = "2025-10-01"

In [None]:
# API details
base_url = "https://archive-api.open-meteo.com/v1/archive"
params = {
        "latitude": latitude,
        "longitude": longitude,
        "start_date": date,
        "end_date": date,
        "hourly": "temperature_2m,relative_humidity_2m",
        "timezone": "Europe/Amsterdam"
    }

In [None]:
# Implementeer hier

## 3. State Management - Hoe houd je bij wat er is veranderd?

Definieer 2 functies:
- `load_state()`, dat de laatste pipeline status inlaadt, of initialiseert bij de eerste keer
- `save_state()`, voor als de pipeline klaar is, om de status te updaten.

Sla op welke files al verwerkt zijn en de timestamp van de laatste run.

In [None]:
STATE_FILE = "pipeline_state.json"

def load_state():
    """Laad de laatste pipeline state"""
    try:
        with open(STATE_FILE, 'r') as f:
            state = json.load(f)
            print(f"State geladen: laatste run op {state['last_run']}")
            print(f"Verwerkte files: {len(state['processed_files'])}")
            return state
    except FileNotFoundError:
        print("Geen eerdere state gevonden, start vanaf nu")
        return {
            "last_run": None,
            "records_processed": 0,
            "processed_files": []
        }

def save_state(state):
    """Bewaar de huidige pipeline state"""
    with open(STATE_FILE, 'w') as f:
        json.dump(state, f, indent=2)
    print("State opgeslagen")

# Load current state
pipeline_state = load_state()

# Filter nieuwe files (die nog niet verwerkt zijn)
new_files = [f for f in files_to_process if f.name not in pipeline_state['processed_files']]
print(f"\nNieuwe files om te verwerken: {len(new_files)}")

## 4. Data Extraction - Hoe extraheer je de data?

Haal data op uit 3 verschillende bronnen (batch processing).

We hebben 3 verschillende databronnen:
1. JSON files (sensor readings)
2. CSV file (sensor metadata)
3. Publieke API (Open-Meteo voor weer data)

**4.1 Interpreteer onderstaande functie om JSONs in te laden**


In [None]:
# 1. JSON files
def extract_sensor_data(json_files):
    """Lees sensor readings uit JSON files"""
    all_data = []
    
    for file_path in json_files:
        with open(file_path, 'r') as f:
            data = json.load(f)
            all_data.extend(data)
            print(f"Gelezen: {file_path.name} ({len(data)} records)")
    
    return all_data

**4.2 Maak de functie af**

Lees de metadata CSV en return het als een pandas DataFrame

In [None]:
# 2. CSV file
def extract_sensor_metadata(csv_file="sample_data/sensor_metadata.csv"):
    """Lees sensor metadata uit CSV"""
    NotImplemented
    return metadata_df

**BONUS** 

Implementeer de functie `extract_weather_data_for_dates()` die een dictionary returnt, obv dates.


In [None]:
# Publieke API (Open-Meteo voor weer data)
def extract_weather_data_for_dates(dates):
    """Haal uurlijkse weer data op voor alle unieke datums in de sensor data"""
    weather_by_hour = {}
    NotImplemented
    return weather_by_hour

Nu kunnen we alle 3 de databronnen met de bovenstaande functies ophalen:

In [None]:
# Ophalen van alle sensor databronnen
raw_sensor_data = extract_sensor_data(new_files if new_files else files_to_process[:1]) # files_to_process komt uit oefening 1.4
sensor_metadata = extract_sensor_metadata()

# Voorbeelden
print(f"Voorbeeld ruwe data:")
print(json.dumps(raw_sensor_data[:2], indent=2))

Uit de sensor data weten we welke data relevant zijn voor de weer API, en kunnen we specifiek voor die dagen weather data ophalen:

In [None]:

# Bepaal unieke datums uit sensor data
df_temp = pd.DataFrame(raw_sensor_data)
df_temp['timestamp'] = pd.to_datetime(df_temp['timestamp'])
unique_dates = df_temp['timestamp'].dt.date.unique()
print(f"\nUnieke datums in sensor data: {len(unique_dates)}")

# Ophalen van de weer databron
weather_by_hour = extract_weather_data_for_dates(unique_dates)


## 5. Transformation - Moet je filteren, omvormen voordat data naar target gaat?

Transformeer en verrijk de data

**5.1 Combineer de sensormetingen met de sensor metadata**

Selecteer alleen de kolommen die relevant zijn

Hulpmiddelen:
- `df.merge()`
- Selecteer meerdere kolommen met `[[kolommen]]`, bijv. `df[["sensor_id", "sensor_type"]]`, let op dubbele haakjes (`[[`)!



In [None]:
# Implementeer hier

**5.2 Combineer de sensor data (DataFrame) met de weer data (dictionary)**


Hulpmiddelen:
- De weather data was afgerond op het uur, weet je nog? Match daarom de sensor timestamp ook naar het dichtstbijzijnde uur:

  `df['timestamp_hour'] = df['timestamp'].dt.floor('h').dt.strftime('%Y-%m-%dT%H:%M')`

- Gebruik map() om voor elke rij een .get() functie aan te roepen in de weer dictionary

  `df['outdoor_temp'] = df['timestamp_hour'].map(lambda h: weather_by_hour.get(h, {}).get('outdoor_temp'))`


In [None]:
# Implementeer hier

**5.3 Huiswerkopdracht**

1. Voeg afgeleide kolommen toe: hour_of_day, day_of_week, is_working_hours
2. Filter onrealistische waarden voor temperatuur en CO2
3. Geef een waarschuwing als een meting met lage batterij is gevonden
4. Meer interessante verrijkingen die je zelf kunt bedenken!


In [73]:
# Implementeer hier

## 6. Validation and Data Quality - Hoe zorg je ervoor dat je data correct is?

**6 Voer data quality checks uit voordat we data opslaan. Wat is je tolerantieniveau?**

- Check 1: Geen null waarden in kritieke kolommen
- Check 2: Duplicaten
- Check 3: Data coverage - elke sensor heeft data?
- Check 4: Waarde ranges per sensor type
- Check 5: Tijdsgaten (missende metingen)

Tot slot:
- Print rapport
- Sla resultaat van kritische checks op in variabele is_valid (True/False)

In [None]:
# Implementeer hier

## 7. Data Loading - Waar en hoe sla je de data op?

**7. Sla de data op naar CSV en Parquet bestanden als de validatie van eerder was geslaagd**

**Parquet vs CSV:**
- CSV: Row-based, makkelijk leesbaar, groot
- Parquet: Column-based, gecomprimeerd, snel voor analytics

In [None]:
# Implementeer hier

In [None]:
# Implementeer hier

## 8. Archiving and Retention - Hoe lang bewaar je data?

**8. Implementeer een retention policy: verwijder data ouder dan X dagen.**

In [None]:
# Implementeer hier

## 9. Pipeline Afronden - Update State

Sla de nieuwe pipeline state op voor de volgende run.

In [None]:
# Implementeer hier

## 10. Check nieuwe state
Run nu nog een keer de code van het blok "3. State Management" en je ziet dat er geen nieuwe files meer te verwerken zijn.

---

## Extra Opdrachten voor studenten

### Basis opdrachten:
1. **Trigger**: Pas de trigger aan zodat deze alleen JSON files van vandaag verwerkt
2. **Extraction**: Voeg een extra sensor type toe aan de sample data (bijv. luchtvochtigheid)
3. **Transformation**: Bereken het gemiddelde per sensor per uur
4. **Validation**: Voeg een validatie toe die checkt of batterijniveau niet te laag is (<10%)

### Gevorderde opdrachten:
5. **Error handling**: Implementeer retry logic voor de API call (probeer 3x met exponential backoff)
6. **Monitoring**: Voeg logging toe die bijhoudt hoelang elke stap duurt
7. **Alerting**: Stuur een waarschuwing als CO2 > 1000 ppm EN tijdens werktijd

### Uitdagingen:
9. **Data enrichment**: Voeg postcode gegevens toe op basis van locatie
10. **Schaalvergroting**: Wat als je 100+ JSON files per dag krijgt? Hoe optimaliseer je?
11. **Data governance**: Implementeer data lineage - track voor elke record waar het vandaan komt
12. **ETLT**: Welke transformaties zou bovenop het huidige resultaat willen doen? Voeg deze toe.
13. **Real-time**: Simuleer een streaming scenario: verwerk elke 30 seconden nieuwe data
14. **Parquet partitioning**: Sla Parquet data op gepartitioneerd per dag voor betere query performance