In [0]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.types import StructField, StructType, StringType
from pyspark.sql.functions import col, split, initcap, lit, when, trim, lower
import datetime

In [0]:
# Obtain the Spark session for this notebook (an already-running session is
# reused by getOrCreate), registered under the application name "Breweries".
spark = (
    SparkSession.builder
    .appName("Breweries")
    .getOrCreate()
)

# SparkContext that backs the session above.
sc = spark.sparkContext

In [0]:
# Run date used as a suffix in the bronze/silver HDFS paths built in later cells.
# It must be a string: those cells concatenate it with str, and a datetime
# object there raises TypeError.
# NOTE(review): assumes the bronze job names its files with an ISO date
# (YYYY-MM-DD) - TODO confirm against the bronze writer.
today = datetime.date.today().isoformat()

In [0]:
# Load the bronze-layer Parquet snapshot whose path is suffixed with `today`.
df_bronze = (
    spark.read
    .format("parquet")
    .load("hdfs:///datalake/bronze/breweries-" + today + ".parquet")
)

In [0]:
# Silver-layer cleaning: normalise text columns, split the address into a
# number and a street part, fill missing values, then de-duplicate.

# Split "address_1" on the first space only (limit=2): element 0 is used as the
# number, element 1 as the remainder of the address. This is kept as a column
# expression rather than a DataFrame column so that the source column can be
# dropped after the derived columns have been built.
address_parts = split(col("address_1"), " ", 2)

df_cleaned = (
    df_bronze
    # Drop the unwanted source columns first. Spark resolves column names
    # case-insensitively by default, so dropping "street" after creating the
    # derived "Street" column would remove the derived column too.
    .drop("state", "street")
    .withColumn("Name", trim(lower(col("name"))))
    .withColumn("Brewery_type", trim(lower(col("brewery_type"))))
    .withColumn("Number", address_parts[0])
    .withColumn("Street", address_parts[1])
    # Only now is it safe to drop the raw address column.
    .drop("address_1")
    .withColumn("City", initcap(col("city")))
    .withColumn("State_province", initcap(col("state_province")))
    # isNull must be *called*; the filler is a string literal so the column
    # does not mix integer and string branches.
    .withColumn(
        "Postal_code",
        when(col("postal_code").isNull(), lit("0")).otherwise(col("postal_code")),
    )
    .withColumn("Country", initcap(col("country")))
    # Cast the phone column itself (not country). "long" is used because
    # 10-digit phone numbers do not fit in a 32-bit int.
    .withColumn("phone", col("phone").cast("long"))
    # otherwise() belongs to the when() expression, not to the literal.
    .withColumn(
        "website",
        when(col("website").isNull(), lit(" ")).otherwise(col("website")),
    )
)

# Remove rows that are identical across every column.
df_silver = df_cleaned.dropDuplicates()
                        

In [0]:
# Persist the silver table in Delta format, partitioned by location, replacing
# any data already present at the target path for this run date.
try:
    (
        df_silver.write
        .format("delta")
        .partitionBy("Country", "State_province", "City")
        .mode("overwrite")
        .save("hdfs:///datalake/silver/breweries-" + today)
    )
    print("Data has been written succesfully!")

except Exception as error:
    # f-string so the actual error text is reported instead of the literal
    # "{error}" placeholder.
    print(f"An error has occured while writing data: {error}")
    # Re-raise so a failed write is not mistaken for a successful run.
    raise