# Libraries

In [None]:
import sys, os
# Put the parent of the current working directory on the import path.
# Presumably this is the project root, so `src.path_controller` can be imported
# when the notebook runs from its own folder (confirm against repo layout).
_project_root = os.path.abspath('..')
sys.path.append(_project_root)

In [None]:
from pyspark.sql import functions as F, types as T, SparkSession
from pyspark.sql.utils import AnalysisException
from delta import DeltaTable, configure_spark_with_delta_pip
from src.path_controller import PathController

# Extracting

In [None]:
# Delta Lake wiring for the Spark session: register the Delta SQL extension
# and make the default catalog Delta-aware. Applied in insertion order.
_delta_session_conf = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}

builder = SparkSession.builder.appName("MyApp")
for _conf_key, _conf_value in _delta_session_conf.items():
    builder = builder.config(_conf_key, _conf_value)

# configure_spark_with_delta_pip prepares the builder for the pip-installed
# delta-spark package before the session is created (or reused).
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Resolve the location of the silver-layer breweries table.
controller = PathController()
input_file_path = controller.get_path_for_silver_layer(
    'breweries_database', 'master_data_context', 'breweries'
)

In [None]:
# Load the silver-layer breweries Delta table as a DataFrame.
silver_breweries = spark.read.load(input_file_path, format='delta')

# Transforming

In [None]:
# Number of breweries per (state, brewery_type) pair; the aggregate column is
# named `count`, which the gold-layer merge below relies on.
_category_keys = ['state', 'brewery_type']
breweries_categories_count = silver_breweries.groupBy(*_category_keys).count()

In [None]:
# Quick notebook preview: first 10 aggregated rows as a list of Row objects.
breweries_categories_count.take(10)

# Loading

In [None]:
# Target gold-layer Delta table for the per-(state, brewery_type) counts.
output_file_path = controller.get_path_for_gold_layer('data_warehouse', 'breweries_views', 'breweries_categories_count')

try:
    # Only this call decides "table exists or not". DeltaTable.forPath is
    # expected to raise AnalysisException when the path holds no Delta table.
    # The merge is kept OUT of this try so a failing merge (e.g. a schema
    # mismatch) surfaces instead of silently overwriting the gold table.
    gold_table = DeltaTable.forPath(spark, output_file_path)
except AnalysisException:
    # First run (or the path is not a Delta table yet): create it outright.
    (
        breweries_categories_count
        .write.mode('overwrite').format('delta')
        .save(output_file_path)
    )
else:
    # Upsert: refresh `count` for existing (state, brewery_type) pairs and
    # insert pairs not seen before.
    # `<=>` is null-safe equality. With plain `=`, a group whose state or
    # brewery_type is NULL never matches and would be re-inserted every run.
    # NOTE(review): pairs that disappear from silver are left untouched in
    # gold; consider whenNotMatchedBySourceDelete() if the installed Delta
    # version supports it.
    (
        gold_table.alias("old")
        .merge(
            breweries_categories_count.alias("new"),
            "old.state <=> new.state and old.brewery_type <=> new.brewery_type",
        )
        .whenMatchedUpdate(set={"count": F.col("new.count")})
        .whenNotMatchedInsertAll()
        .execute()
    )