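Silver Layer: Transform the data into a columnar storage format such as
Parquet or Delta, and partition it by brewery location. Please explain any
other transformations you perform.

Approach taken below: the Bronze table is read incrementally as a Delta
stream, and `longitude` and `latitude` are cast to double. The result is
written as a Delta table partitioned by `country` (used as the
brewery-location key). Finally, the table is compacted with
`OPTIMIZE ... ZORDER BY (city, state)`.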

In [0]:
# Enable Delta optimized writes for this Spark session so that Delta writes
# (including the partitioned Silver write below) produce fewer, larger files.
spark.conf.set(
    'spark.databricks.delta.optimizeWrite.enabled',
    'true',
)

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

In [0]:
# --- Bronze (source) ---------------------------------------------------------
bronze_table = 'ws_bees_dev_.db_bronze_dev.tb_openbrewery_api'
# Streaming checkpoint passed as `checkpointLocation` to the Silver write.
# NOTE(review): the variable name and the bronze container (ctr-bz-dev) are
# misleading for a Silver-stream checkpoint, and "_checkopint" is a typo.
# Do not fix either in place. A streaming query given a new checkpoint
# location restarts from scratch and would likely re-append the entire
# Bronze table to Silver. Also confirm that no other streaming query (e.g. a
# Bronze ingestion job) uses this same path; a checkpoint must belong to
# exactly one query.
bronze_checkpoint = 'abfss://ctr-bz-dev@stgbeesdev.dfs.core.windows.net/_checkopint/openbrewery/api/'

# --- Silver (target) ---------------------------------------------------------
silver_path = 'abfss://ctr-sl-dev@stgbeesdev.dfs.core.windows.net/openbrewery/api/'
silver_table = 'ws_bees_dev_.db_silver_dev.tb_openbrewery_api'

In [0]:
# Open the Bronze Delta table as a streaming source. Combined with the
# checkpoint used by the write below, each run only processes data that
# has not been processed before.
openbrewery_df = spark.readStream.format('delta').table(bronze_table)

# Cast the coordinates to double so Silver stores them as numeric columns.
# Presumably they arrive as strings from the API; confirm against the
# Bronze schema.
# NOTE(review): with ANSI mode off, unparsable values become null. With
# ANSI mode on, the cast raises instead.
for coordinate in ('longitude', 'latitude'):
    openbrewery_df = openbrewery_df.withColumn(
        coordinate, col(coordinate).cast(DoubleType())
    )

In [0]:
# Configure the streaming write into the Silver Delta table:
#   - partitioned by `country`, used as the brewery-location key;
#   - mergeSchema lets new columns coming from Bronze evolve the Silver schema;
#   - the 'path' option places the table's data at silver_path while
#     toTable registers it under silver_table;
#   - availableNow processes everything available at start, then stops.
# NOTE(review): the checkpoint is the bronze-container path held in
# `bronze_checkpoint`. Keep it as-is; changing it restarts the stream and
# would likely duplicate rows in Silver.
silver_writer = (
    openbrewery_df.writeStream
    .format('delta')
    .partitionBy('country')
    .option('path', silver_path)
    .option('checkpointLocation', bronze_checkpoint)
    .option('mergeSchema', 'true')
    .trigger(availableNow=True)
)
query = silver_writer.toTable(silver_table)

# Block until the available-now run finishes. If the query fails, the
# error is raised here.
query.awaitTermination()

In [0]:
# Compact the Silver table's small files and cluster rows with similar
# city/state values into the same files, to speed up location filters.
# `country` is not listed because Delta cannot Z-order on partition columns.
# The metrics DataFrame returned by OPTIMIZE is intentionally discarded.
_ = spark.sql(f'OPTIMIZE {silver_table} ZORDER BY (city, state)')