In [2]:
import os
from pathlib import Path
import pandas as pd
from collections import Counter, defaultdict

# Spalten Checken

In [19]:
sf_raw = Path("/workspaces/projekt-datascience/data/raw/san_francisco")
csv_files = sorted(sf_raw.glob("**/*.csv*"))
print(f"gefundene CSVs: {len(csv_files)}")

gefundene CSVs: 93


In [16]:
def read_columns_fastish(p):
    cols = pd.read_csv(p, nrows=0).columns
    raw_cols = list(cols)
    norm_cols = tuple(col.strip().lower() for col in cols)
    return raw_cols, norm_cols

In [17]:
schema_groups = defaultdict(list)
col_counter = Counter()
per_file_cols = {}

for f in csv_files:
    raw_cols, norm_cols = read_columns_fastish(f)
    schema_groups[norm_cols].append(f)
    col_counter.update(norm_cols)

In [18]:
print("Einzigartige Schemas")
for i, (schema, files) in enumerate(schema_groups.items(), start=1):
    print(f"\nSchema #{i} — {len(files)} Datei(en)")
    for ex in files:
        print(f"  • {ex.relative_to(sf_raw)}")
    print("Spalten:")
    for c in schema:
        print(f"  - {c}")

Einzigartige Schemas

Schema #1 — 7 Datei(en)
  • 2014/Stations_2014.csv
  • 2015/Stations_2015.csv
  • 2016/Stations_2016.csv
  • 2017/Stations_2017.csv
  • 2018/Stations_2018.csv
  • 2019/Stations_2019.csv
  • 2020/stations.csv
Spalten:
  - code
  - name
  - latitude
  - longitude

Schema #2 — 48 Datei(en)
  • 2014/monthly/OD_2014-04.csv
  • 2014/monthly/OD_2014-05.csv
  • 2014/monthly/OD_2014-06.csv
  • 2014/monthly/OD_2014-07.csv
  • 2014/monthly/OD_2014-08.csv
  • 2014/monthly/OD_2014-09.csv
  • 2014/monthly/OD_2014-10.csv
  • 2014/monthly/OD_2014-11.csv
  • 2015/monthly/OD_2015-04.csv
  • 2015/monthly/OD_2015-05.csv
  • 2015/monthly/OD_2015-06.csv
  • 2015/monthly/OD_2015-07.csv
  • 2015/monthly/OD_2015-08.csv
  • 2015/monthly/OD_2015-09.csv
  • 2015/monthly/OD_2015-10.csv
  • 2015/monthly/OD_2015-11.csv
  • 2016/monthly/OD_2016-04.csv
  • 2016/monthly/OD_2016-05.csv
  • 2016/monthly/OD_2016-06.csv
  • 2016/monthly/OD_2016-07.csv
  • 2016/monthly/OD_2016-08.csv
  • 2016/monthly/O

# San Fran in Parquet

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
print("master:", spark.sparkContext.master)
print("driver mem:", spark.conf.get("spark.driver.memory"))
print("shuffle.parts:", spark.conf.get("spark.sql.shuffle.partitions"))

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/23 20:37:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


master: local[64]
driver mem: 180g
shuffle.parts: 256


In [4]:
from pathlib import Path
import re
from pyspark.sql import SparkSession, functions as F

RAW_CITY = Path("/workspaces/projekt-datascience/data/raw/montreal")
OUT_BASE = "/workspaces/projekt-datascience/data/parquet/bronze/usage"
CITY = "montreal"

spark = (SparkSession.builder
         .appName("bronze-parquet-bikes")
         .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
         .config("spark.sql.parquet.compression.codec", "snappy")
         # .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
         .getOrCreate())

def read_csv_as_strings(path: str):
    return (spark.read.format("csv")
            .option("header", True)
            .option("multiLine", False)
            .option("mode", "PERMISSIVE")
            .option("inferSchema", False)
            .load(path))

monthly_files = sorted(RAW_CITY.glob("**/monthly/*.csv*"))
yearly_files  = sorted(RAW_CITY.glob("**/yearly/*.csv*"))

print(f"Monatliche Dateien: {len(monthly_files)}")
print(f"Jährliche Dateien:  {len(yearly_files)}")

RE_YM = re.compile(r"(?P<y>20\d{2})(?P<m>0[1-9]|1[0-2])")

def parse_year_month_from_name(p: Path):
    m = RE_YM.search(p.name)
    if m:
        return int(m["y"]), int(m["m"])
    try:
        y = int([part for part in p.parts if part.isdigit() and len(part) == 4][0])
        m2 = re.search(r"-(0[1-9]|1[0-2])", p.name)
        if m2:
            return y, int(m2.group(1))
    except Exception:
        pass
    return None, None

CANDIDATE_TS_STR_COLS = [
    "started_at","start_time","starttime","start date","start_date",
    "start date time","start_date_time","start time","start_date_local",
    "starttimeutc","starttime_utc","start_time_utc"
]
MS_TS_COL = "starttimems"

def build_ts_expr(df):
    lower = {c.lower(): c for c in df.columns}

    if MS_TS_COL in lower:
        col_ms = lower[MS_TS_COL]
        ts_expr = F.from_unixtime((F.col(col_ms).cast("double") / F.lit(1000.0))).cast("timestamp")
        return ts_expr, col_ms, "millis"

    for key in CANDIDATE_TS_STR_COLS:
        if key in lower:
            col_ts = lower[key]
            ts_expr = F.to_timestamp(F.col(col_ts))
            return ts_expr, col_ts, "string"

    for c in df.columns:
        if "start" in c.lower():
            ts_expr = F.to_timestamp(F.col(c))
            return ts_expr, c, "heuristic"

    return None, None, None

def write_month_partition(df, year: int, month: int):
    (df.withColumn("city", F.lit(CITY))
       .withColumn("year", F.lit(int(year)))
       .withColumn("month", F.lit(int(month)))
       .write.mode("overwrite")
       .partitionBy("city","year","month")
       .parquet(OUT_BASE))

for f in monthly_files:
    y, m = parse_year_month_from_name(f)
    if not y or not m:
        print(f"[SKIP] YYYYMM nicht erkannt: {f.name}")
        continue
    df = read_csv_as_strings(str(f))
    write_month_partition(df, y, m)
    print(f"[OK] monthly {f.name} year={y} month={m}")

for f in yearly_files:
    df = read_csv_as_strings(str(f))
    ts_expr, ts_src, ts_kind = build_ts_expr(df)
    if ts_expr is None:
        print(f"[WARN] Keine Zeitspalte gefunden: {f.name} übersprungen.")
        continue

    df_ts = df.withColumn("_ts", ts_expr)
    ym = (df_ts
          .select(F.year("_ts").alias("y"), F.month("_ts").alias("m"))
          .where(F.col("y").isNotNull() & F.col("m").isNotNull())
          .distinct()
          .collect())

    if not ym:
        print(f"[WARN] Keine gültigen Timestamps in {f.name} (Quelle: {ts_src}/{ts_kind}) übersprungen.")
        continue

    for row in ym:
        y, m = int(row["y"]), int(row["m"])
        part_df = df_ts.where((F.year("_ts") == y) & (F.month("_ts") == m)).drop("_ts")
        write_month_partition(part_df, y, m)
        print(f"[OK] yearly {f.name} ({ts_src}/{ts_kind}) year={y} month={m}")

spark.stop()
print("Bronze-Parquet fertig:", OUT_BASE)

Monatliche Dateien: 47
Jährliche Dateien:  6
[OK] monthly OD_2014-04.csv year=2014 month=4
[OK] monthly OD_2014-05.csv year=2014 month=5
[OK] monthly OD_2014-06.csv year=2014 month=6
[OK] monthly OD_2014-07.csv year=2014 month=7
[OK] monthly OD_2014-08.csv year=2014 month=8
[OK] monthly OD_2014-09.csv year=2014 month=9
[OK] monthly OD_2014-10.csv year=2014 month=10
[OK] monthly OD_2014-11.csv year=2014 month=11
[OK] monthly OD_2015-04.csv year=2015 month=4
[OK] monthly OD_2015-05.csv year=2015 month=5
[OK] monthly OD_2015-06.csv year=2015 month=6
[OK] monthly OD_2015-07.csv year=2015 month=7
[OK] monthly OD_2015-08.csv year=2015 month=8
[OK] monthly OD_2015-09.csv year=2015 month=9
[OK] monthly OD_2015-10.csv year=2015 month=10
[OK] monthly OD_2015-11.csv year=2015 month=11
[OK] monthly OD_2016-04.csv year=2016 month=4
[OK] monthly OD_2016-05.csv year=2016 month=5
[OK] monthly OD_2016-06.csv year=2016 month=6
[OK] monthly OD_2016-07.csv year=2016 month=7
[OK] monthly OD_2016-08.csv yea

                                                                                

[OK] monthly OD_2018-07.csv year=2018 month=7
[OK] monthly OD_2018-08.csv year=2018 month=8
[OK] monthly OD_2018-09.csv year=2018 month=9
[OK] monthly OD_2018-10.csv year=2018 month=10
[OK] monthly OD_2018-11.csv year=2018 month=11
[OK] monthly OD_2019-04.csv year=2019 month=4
[OK] monthly OD_2019-05.csv year=2019 month=5
[OK] monthly OD_2019-06.csv year=2019 month=6
[OK] monthly OD_2019-07.csv year=2019 month=7
[OK] monthly OD_2019-08.csv year=2019 month=8
[OK] monthly OD_2019-09.csv year=2019 month=9
[OK] monthly OD_2019-10.csv year=2019 month=10


                                                                                

[OK] yearly OD_2020.csv (start_date/string) year=2020 month=5


                                                                                

[OK] yearly OD_2020.csv (start_date/string) year=2020 month=4


                                                                                

[OK] yearly OD_2020.csv (start_date/string) year=2020 month=6


                                                                                

[OK] yearly OD_2020.csv (start_date/string) year=2020 month=7


                                                                                

[OK] yearly OD_2020.csv (start_date/string) year=2020 month=8


                                                                                

[OK] yearly OD_2020.csv (start_date/string) year=2020 month=9


                                                                                

[OK] yearly OD_2020.csv (start_date/string) year=2020 month=10


                                                                                

[OK] yearly OD_2020.csv (start_date/string) year=2020 month=11


                                                                                

[OK] yearly 2021_donnees_ouvertes.csv (start_date/string) year=2021 month=10


                                                                                

[OK] yearly 2021_donnees_ouvertes.csv (start_date/string) year=2021 month=9


                                                                                

[OK] yearly 2021_donnees_ouvertes.csv (start_date/string) year=2021 month=11


                                                                                

[OK] yearly 2021_donnees_ouvertes.csv (start_date/string) year=2021 month=5


                                                                                

[OK] yearly 2021_donnees_ouvertes.csv (start_date/string) year=2021 month=6


                                                                                

[OK] yearly 2021_donnees_ouvertes.csv (start_date/string) year=2021 month=7


                                                                                

[OK] yearly 2021_donnees_ouvertes.csv (start_date/string) year=2021 month=4


                                                                                

[OK] yearly 2021_donnees_ouvertes.csv (start_date/string) year=2021 month=8


                                                                                

[OK] yearly DonneesOuverte2022.csv (STARTTIMEMS/millis) year=2022 month=10


                                                                                

[OK] yearly DonneesOuverte2022.csv (STARTTIMEMS/millis) year=2022 month=7


                                                                                

[OK] yearly DonneesOuverte2022.csv (STARTTIMEMS/millis) year=2022 month=4


                                                                                

[OK] yearly DonneesOuverte2022.csv (STARTTIMEMS/millis) year=2022 month=5


                                                                                

[OK] yearly DonneesOuverte2022.csv (STARTTIMEMS/millis) year=2022 month=6


                                                                                

[OK] yearly DonneesOuverte2022.csv (STARTTIMEMS/millis) year=2022 month=8


                                                                                

[OK] yearly DonneesOuverte2022.csv (STARTTIMEMS/millis) year=2022 month=9


                                                                                

[OK] yearly DonneesOuverte2022.csv (STARTTIMEMS/millis) year=2022 month=11


                                                                                

[OK] yearly DonneesOuverte2022.csv (STARTTIMEMS/millis) year=2022 month=12


                                                                                

[OK] yearly DonneesOuverte2022.csv (STARTTIMEMS/millis) year=2022 month=3


                                                                                

[OK] yearly DonneesOuvertes (1).csv (STARTTIMEMS/millis) year=2023 month=6


                                                                                

[OK] yearly DonneesOuvertes (1).csv (STARTTIMEMS/millis) year=2023 month=8


                                                                                

[OK] yearly DonneesOuvertes (1).csv (STARTTIMEMS/millis) year=2023 month=12


                                                                                

[OK] yearly DonneesOuvertes (1).csv (STARTTIMEMS/millis) year=2023 month=9


                                                                                

[OK] yearly DonneesOuvertes (1).csv (STARTTIMEMS/millis) year=2023 month=10


                                                                                

[OK] yearly DonneesOuvertes (1).csv (STARTTIMEMS/millis) year=2023 month=7


                                                                                

[OK] yearly DonneesOuvertes (1).csv (STARTTIMEMS/millis) year=2023 month=4


                                                                                

[OK] yearly DonneesOuvertes (1).csv (STARTTIMEMS/millis) year=2023 month=5


                                                                                

[OK] yearly DonneesOuvertes (1).csv (STARTTIMEMS/millis) year=2023 month=11


                                                                                

[OK] yearly DonneesOuvertes (1).csv (STARTTIMEMS/millis) year=2024 month=1


                                                                                

[OK] yearly DonneesOuvertes (2).csv (STARTTIMEMS/millis) year=2024 month=5


                                                                                

[OK] yearly DonneesOuvertes (2).csv (STARTTIMEMS/millis) year=2024 month=3


                                                                                

[OK] yearly DonneesOuvertes (2).csv (STARTTIMEMS/millis) year=2024 month=2


                                                                                

[OK] yearly DonneesOuvertes (2).csv (STARTTIMEMS/millis) year=2024 month=1


                                                                                

[OK] yearly DonneesOuvertes (2).csv (STARTTIMEMS/millis) year=2024 month=8


                                                                                

[OK] yearly DonneesOuvertes (2).csv (STARTTIMEMS/millis) year=2024 month=7


                                                                                

[OK] yearly DonneesOuvertes (2).csv (STARTTIMEMS/millis) year=2024 month=6


                                                                                

[OK] yearly DonneesOuvertes (2).csv (STARTTIMEMS/millis) year=2024 month=4


                                                                                

[OK] yearly DonneesOuvertes (2).csv (STARTTIMEMS/millis) year=2024 month=9


                                                                                

[OK] yearly DonneesOuvertes (2).csv (STARTTIMEMS/millis) year=2024 month=10


                                                                                

[OK] yearly DonneesOuvertes (2).csv (STARTTIMEMS/millis) year=2024 month=11


                                                                                

[OK] yearly DonneesOuvertes (2).csv (STARTTIMEMS/millis) year=2024 month=12


                                                                                

[OK] yearly DonneesOuvertes (2).csv (STARTTIMEMS/millis) year=2025 month=1


                                                                                

[OK] yearly DonneesOuvertes2025_0102030405060708.csv (STARTTIMEMS/millis) year=2025 month=5


                                                                                

[OK] yearly DonneesOuvertes2025_0102030405060708.csv (STARTTIMEMS/millis) year=2025 month=6


                                                                                

[OK] yearly DonneesOuvertes2025_0102030405060708.csv (STARTTIMEMS/millis) year=2025 month=1


                                                                                

[OK] yearly DonneesOuvertes2025_0102030405060708.csv (STARTTIMEMS/millis) year=2025 month=7


                                                                                

[OK] yearly DonneesOuvertes2025_0102030405060708.csv (STARTTIMEMS/millis) year=2025 month=4


                                                                                

[OK] yearly DonneesOuvertes2025_0102030405060708.csv (STARTTIMEMS/millis) year=2025 month=3


                                                                                

[OK] yearly DonneesOuvertes2025_0102030405060708.csv (STARTTIMEMS/millis) year=2025 month=8


                                                                                

[OK] yearly DonneesOuvertes2025_0102030405060708.csv (STARTTIMEMS/millis) year=2025 month=2


                                                                                

[OK] yearly DonneesOuvertes2025_0102030405060708.csv (STARTTIMEMS/millis) year=2025 month=9
Bronze-Parquet fertig: /workspaces/projekt-datascience/data/parquet/bronze/usage
