In [5]:
# CASE 1: Kopieren von A nach B – Manuell gesteuerter Airflow DAG
# ===============================================================
# Ziel: Die Datei `TaxiZone.csv` soll per Airflow-DAG von `data/raw/` nach `data/processed/` kopiert werden.

# 📁 Ordnerstruktur vorausgesetzt:
# └── data/
#     ├── raw/zones.csv
#     └── processed/ (leer)

# Schritt 1 – Überprüfung: Ist die Quelldatei vorhanden?
import os

raw_path = "/Users/tsimsek/Workshop-DE-Masterclass/Data-Pipelines/Airflow/data/raw/zones.csv"
assert os.path.exists(raw_path), f"❌ Datei nicht gefunden: {raw_path}"
print(f"✅ Datei gefunden: {raw_path}")

✅ Datei gefunden: /Users/tsimsek/Workshop-DE-Masterclass/Data-Pipelines/Airflow/data/raw/zones.csv


In [6]:
# Schritt 2 – Manuelles Laden der Quelldaten
import pandas as pd

df = pd.read_csv(raw_path)
df.head()

Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone


In [10]:
# Schritt 3 – Zielpfad definieren und schreiben
processed_dir = "/Users/tsimsek/Workshop-DE-Masterclass/Data-Pipelines/Airflow/data/processed/"
processed_file = os.path.join(processed_dir, "zones_copied.csv")
os.makedirs(processed_dir, exist_ok=True)

df.to_csv(processed_file, index=False)
print(f"✅ Datei kopiert nach: {processed_file}")

✅ Datei kopiert nach: /Users/tsimsek/Workshop-DE-Masterclass/Data-Pipelines/Airflow/data/processed/zones_copied.csv


In [11]:
# Schritt 4 – Prüfung der Ziel-Datei
df_copied = pd.read_csv(processed_file)
df_copied.head()

Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone


In [None]:
# Wie funktioniert das in Airflow?
# In Airflow wird dieser Kopiervorgang durch einen DAG orchestriert.
# Der DAG importiert eine Hilfsfunktion (z. B. aus utils.py), die exakt das macht, was du hier manuell getan hast.

# Danach kannst du in der Web UI den Status einsehen und die Logs analysieren.

In [None]:
## ✅ Lernziele von Case 1

# Airflow DAGs manuell triggern (`schedule_interval=None`)
# Eine Hilfsfunktion in `utils.py` verwenden
# Logs einsehen (Web UI → DAG → Task → Logs)

### 🔄 Nächste Schritte (für Case 2)

# Sensor verwenden, um DAG automatisch starten zu lassen
# Dateiänderung oder -erzeugung triggern DAG-Start