# Getting the data

In [0]:
# List the contents of the bike-sharing sample dataset directory to see which files are available.
display(dbutils.fs.ls("/databricks-datasets/bikeSharing/data-001/"))

In [0]:
# Location of hour.csv inside the bike-sharing sample dataset directory.
file_path = "/databricks-datasets/bikeSharing/data-001/hour.csv"

# Read the CSV with its first line as the header and let Spark infer the
# column types from the data.
df_raw = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv(file_path)
)


In [0]:
# Preview the first 10 rows of the raw data.
df_raw.show(10)

# Data cleaning

In [0]:
# Print the column names and the types Spark inferred when loading the CSV.
df_raw.printSchema()

## Overview

- 'instant', -> id
- 'dteday', -> date
- 'season', -> 1,2,3,4
- 'yr', -> 0,1 (year: 0 = 2011, 1 = 2012)
- 'mnth', -> month
- 'hr', -> hour
- 'holiday', -> 0,1 (no, yes)
- 'weekday', -> 0-6
- 'workingday', -> 0,1 (no, yes)
- 'weathersit', -> 1-4
- 'temp', -> 0-1 (float), temperature normalized
- 'atemp', -> 0-1 (float), feels_like_temperature normalized
- 'hum', -> 0-1 (float), humidity normalized
- 'windspeed', -> 0-1 (float), windspeed normalized
- 'casual', -> int, count of casual users
- 'registered', -> int, count of registered users
- 'cnt' -> int, total count of users (casual + registered)

In [0]:
# Show the distinct weekday codes present in the raw data. The explicit
# ordering keeps the printed result stable between runs; without it the
# row order of distinct() is not guaranteed.
df_raw.select("weekday").distinct().orderBy("weekday").show()

In [0]:
# Column renaming: source name -> descriptive name.
# "season" and "weekday" are not listed here and keep their original names.
column_renames = {
    "instant": "id",
    "dteday": "date",
    "yr": "year",
    "mnth": "month",
    "hr": "hour",
    "holiday": "is_holiday",
    "workingday": "is_workingday",
    "temp": "temperature_normalized",
    "atemp": "feels_like_temperature_normalized",
    "hum": "humidity_normalized",
    "windspeed": "windspeed_normalized",
    "weathersit": "weather_condition",
    "casual": "count_rentals_casual",
    "registered": "count_rentals_registered",
    "cnt": "count_rentals_total",
}

# Apply the renames one by one, starting from the untouched raw frame.
df_raw_renamed = df_raw
for old_name, new_name in column_renames.items():
    df_raw_renamed = df_raw_renamed.withColumnRenamed(old_name, new_name)

# Display the resulting column names as the cell output.
df_raw_renamed.columns

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, DateType

# Target schema of the cleaned ("silver") table. Only the identifying
# fields `id` and `date` are declared non-nullable; StructType.add()
# defaults to nullable=True for every other field.
silver_schema = (
    StructType()
    .add("id", IntegerType(), nullable=False)
    .add("date", DateType(), nullable=False)
    # Calendar / categorical integer codes
    .add("season", IntegerType())
    .add("year", IntegerType())
    .add("month", IntegerType())
    .add("hour", IntegerType())
    .add("is_holiday", IntegerType())
    .add("weekday", IntegerType())
    .add("is_workingday", IntegerType())
    .add("weather_condition", IntegerType())
    # Normalized measurements
    .add("temperature_normalized", FloatType())
    .add("feels_like_temperature_normalized", FloatType())
    .add("humidity_normalized", FloatType())
    .add("windspeed_normalized", FloatType())
    # Rental counts
    .add("count_rentals_casual", IntegerType())
    .add("count_rentals_registered", IntegerType())
    .add("count_rentals_total", IntegerType())
)

In [0]:
from pyspark.sql import functions as F

# Columns that only need a plain cast, grouped by target type.
plain_casts = {
    "int": [
        "id",
        "season",
        "month",
        "hour",
        "is_holiday",
        "weekday",
        "is_workingday",
        "weather_condition",
        "count_rentals_casual",
        "count_rentals_registered",
        "count_rentals_total",
    ],
    "float": [
        "temperature_normalized",
        "feels_like_temperature_normalized",
        "humidity_normalized",
        "windspeed_normalized",
    ],
}

# Decode the 0/1 year code into the calendar year; any other code becomes NULL.
year_decoded = (
    F.when(F.col("year") == 0, F.lit(2011))
    .when(F.col("year") == 1, F.lit(2012))
    .otherwise(None)
    .cast("int")
)

# The two columns that need more than a cast are handled first. Each
# withColumn call replaces a column in place, so the column order of
# df_raw_renamed is preserved regardless of the order of the calls.
df_silver = (
    df_raw_renamed
    .withColumn("date", F.to_date("date", "yyyy-MM-dd"))
    .withColumn("year", year_decoded)
)

for target_type, column_names in plain_casts.items():
    for column_name in column_names:
        df_silver = df_silver.withColumn(column_name, F.col(column_name).cast(target_type))

In [0]:
# Preview the first 10 rows after casting ...
df_silver.show(10)
# ... and confirm the resulting column types.
df_silver.printSchema()

In [0]:
# Add a row-level quality flag `is_valid`.
#
# Spark evaluates boolean expressions with three-valued logic: a NULL in any
# of the checked columns (for example `year`, which is NULL for unknown year
# codes) makes the whole conjunction NULL instead of False. The expression is
# therefore wrapped in coalesce(..., False) so that `is_valid` is always a
# strict True/False and a row that cannot be verified counts as invalid.
df_silver = df_silver.withColumn(
    "is_valid",
    F.coalesce(
        (
            # Value ranges of the categorical / calendar columns
            (F.col("season").between(1, 4)) &
            (F.col("year").isin(2011, 2012)) &
            (F.col("month").between(1, 12)) &
            (F.col("hour").between(0, 23)) &
            (F.col("weekday").between(0, 6)) &
            (F.col("weather_condition").between(1, 4)) &

            # Normalized values must lie in [0, 1]
            (F.col("temperature_normalized").between(0.0, 1.0)) &
            (F.col("feels_like_temperature_normalized").between(0.0, 1.0)) &
            (F.col("humidity_normalized").between(0.0, 1.0)) &
            (F.col("windspeed_normalized").between(0.0, 1.0)) &

            # Rental counts must be non-negative and consistent:
            # total = casual + registered
            (F.col("count_rentals_casual") >= 0) &
            (F.col("count_rentals_registered") >= 0) &
            (F.col("count_rentals_total") >= 0) &
            (F.col("count_rentals_total") ==
                F.col("count_rentals_casual") + F.col("count_rentals_registered")) &

            # Holiday vs. working-day logic: a row must not be flagged as both
            ~((F.col("is_holiday") == 1) & (F.col("is_workingday") == 1)) &

            # Mandatory fields must not be NULL
            (F.col("id").isNotNull()) &
            (F.col("date").isNotNull())
        ),
        F.lit(False),
    ),
)

In [0]:
# Collect every row that is not positively valid. A plain `is_valid == False`
# comparison evaluates to NULL when `is_valid` itself is NULL, and filter()
# drops such rows, so they would never be counted as invalid. The NULL-safe
# comparison keeps both False and NULL flags.
df_invalid = df_silver.filter(~F.col("is_valid").eqNullSafe(True))
df_invalid.count()  # 0 was recorded with the earlier `== False` filter; re-run to confirm

# All data meets the quality criteria