# Update Silver Layer
This notebook reads the bronze Delta tables and writes cleaned data to the `silver` schema.

In [None]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, from_json
from pyspark.sql.types import IntegerType, DoubleType, MapType, StringType

spark = SparkSession.builder.getOrCreate()


In [None]:

# Create silver database if not exists
spark.sql("CREATE DATABASE IF NOT EXISTS silver")


In [None]:

# Customers table
df_customers = spark.table('bronze.customers')    .withColumn('became_member_on', to_date(col('became_member_on')))    .withColumn('age', col('age').cast(IntegerType()))    .withColumn('income', col('income').cast(DoubleType()))

df_customers.write.mode('overwrite').format('delta').saveAsTable('silver.customers')


In [None]:

# Offers table
df_offers = spark.table('bronze.offers')    .withColumn('difficulty', col('difficulty').cast(IntegerType()))    .withColumn('reward', col('reward').cast(IntegerType()))    .withColumn('duration', col('duration').cast(IntegerType()))

df_offers.write.mode('overwrite').format('delta').saveAsTable('silver.offers')


In [None]:

# Events table
schema = MapType(StringType(), StringType())
df_events = spark.table('bronze.events')    .withColumn('time', col('time').cast(IntegerType()))    .withColumn('value', from_json(col('value'), schema))

df_events.write.mode('overwrite').format('delta').saveAsTable('silver.events')
