In [0]:

# Start the silver layer from the full bronze breweries table and preview it.
df_silver = spark.table("brewery_bronze.breweries")
df_silver.show()

+--------------------+--------------------+------------+--------------------+---------+---------+--------------+--------------+-----------+-------------+-----------+---------+------------+--------------------+-------------+--------------------+----------+
|                  id|                name|brewery_type|           address_1|address_2|address_3|          city|state_province|postal_code|      country|  longitude| latitude|       phone|         website_url|        state|              street|updatetime|
+--------------------+--------------------+------------+--------------------+---------+---------+--------------+--------------+-----------+-------------+-----------+---------+------------+--------------------+-------------+--------------------+----------+
|e5f3e72a-fee2-481...|12 Acres Brewing ...|       micro|      Unnamed Street| Clonmore|     null|     Killeshin|         Laois|   R93 X3X8|      Ireland|  -6.979344|52.849308|353599107299|https://12acresbr...|        Laois|      Unn

In [0]:


from pyspark.sql.functions import col, count, isnan, when
from pyspark.sql.types import DoubleType, FloatType

# Profile missing values: for each column, count rows that are null (and, for float/double
# columns, NaN). Empty strings are NOT counted as missing by this check.
columns_v = ['name', 'brewery_type', 'address_1', 'address_2', 'address_3', 'city', 'state_province',
             'postal_code', 'country', 'longitude', 'latitude', 'phone', 'website_url', 'state', 'street']

# isnan() is only meaningful for floating-point columns. Applying it to a string column forces an
# implicit string->double cast, which raises an error when ANSI mode is enabled, so restrict it.
float_columns = {
    field.name
    for field in df_silver.schema.fields
    if isinstance(field.dataType, (DoubleType, FloatType))
}

def _is_missing(c):
    """Return a boolean Column: true where column `c` is null, or NaN for float/double columns."""
    missing = col(c).isNull()
    return (missing | isnan(col(c))) if c in float_columns else missing

df_silver.select([count(when(_is_missing(c), c)).alias(c) for c in columns_v]).show()

+----+------------+---------+---------+---------+----+--------------+-----------+-------+---------+--------+-----+-----------+-----+------+
|name|brewery_type|address_1|address_2|address_3|city|state_province|postal_code|country|longitude|latitude|phone|website_url|state|street|
+----+------------+---------+---------+---------+----+--------------+-----------+-------+---------+--------+-----+-----------+-----+------+
|   0|           0|        3|       49|       50|   0|             0|          0|      0|       10|      10|    5|         11|    0|     3|
+----+------------+---------+---------+---------+----+--------------+-----------+-------+---------+--------+-----+-----------+-----+------+



In [0]:

# Keep only rows that carry a `country` value, then report how many rows remain.
df_silver = df_silver.na.drop(subset=["country"])
df_silver.count()

Out[3]: 50

In [0]:
from pyspark.sql.functions import coalesce, col, concat_ws, when

# Consolidate the three address columns into a single `address` column.
# The null-count profile showed address_3 null in every record and address_2 populated in only
# one, but all three parts are folded in so that data is not silently lost if a later load
# populates address_3. Non-null parts are joined with " - " (concat_ws skips null arguments).
address_parts = [col("address_1"), col("address_2"), col("address_3")]

# concat_ws returns "" (not null) when every argument is null, so only build the string when at
# least one part is present; otherwise `when` without `otherwise` yields null.
df_silver = df_silver.withColumn(
    "address",
    when(coalesce(*address_parts).isNotNull(), concat_ws(" - ", *address_parts)),
)
df_silver.selectExpr("address_1", "address_2", "address_3", "address").show()

# The original address columns are now redundant, so only `address` is kept.
df_resultado = df_silver.drop("address_1", "address_2", "address_3")
df_resultado.show(10, truncate=15)

+--------------------+---------+---------+--------------------+
|           address_1|address_2|address_3|             address|
+--------------------+---------+---------+--------------------+
|141 E 4th St Ste LL2|     null|     null|141 E 4th St Ste LL2|
|      7391 Forbes Rd|     null|     null|      7391 Forbes Rd|
|250 Mill St, Suit...|     null|     null|250 Mill St, Suit...|
|6410 SE Milwaukie...|     null|     null|6410 SE Milwaukie...|
|1323 Capital Blvd...|     null|     null|1323 Capital Blvd...|
|31125 Via Colinas...|     null|     null|31125 Via Colinas...|
|      Unnamed Street| Clonmore|     null|Unnamed Street - ...|
|80 Earhart Dr Ste 20|     null|     null|80 Earhart Dr Ste 20|
|3000 E Ray Rd Bldg 6|     null|     null|3000 E Ray Rd Bldg 6|
|                null|     null|     null|                null|
|    2416 Meridian St|     null|     null|    2416 Meridian St|
|     3090 Shirley Dr|     null|     null|     3090 Shirley Dr|
|         820 Main St|     null|     nul

In [0]:
# Register the cleaned DataFrame as a session-scoped temp view so SQL cells can read it.
df_resultado.createOrReplaceTempView(name="vw_breweries_silver")

In [0]:


%sql
-- Upsert the cleaned breweries from vw_breweries_silver into the silver Delta table, keyed on id.
-- Every matched row is updated (no change-detection condition), so updatetime is refreshed on
-- each run; unmatched rows are inserted. updatetime is stamped with the merge time rather than
-- copied from the source view.
MERGE INTO brewery_silver.breweries AS tgt
USING vw_breweries_silver AS upd
  ON tgt.id = upd.id
WHEN MATCHED THEN
  UPDATE SET
    tgt.name           = upd.name,
    tgt.brewery_type   = upd.brewery_type,
    tgt.city           = upd.city,
    tgt.state_province = upd.state_province,
    tgt.postal_code    = upd.postal_code,
    tgt.country        = upd.country,
    tgt.longitude      = upd.longitude,
    tgt.latitude       = upd.latitude,
    tgt.phone          = upd.phone,
    tgt.website_url    = upd.website_url,
    tgt.state          = upd.state,
    tgt.street         = upd.street,
    tgt.updatetime     = current_timestamp(),
    tgt.address        = upd.address
WHEN NOT MATCHED THEN
  INSERT (
    id, name, brewery_type, city, state_province, postal_code, country,
    longitude, latitude, phone, website_url, state, street, updatetime, address
  )
  VALUES (
    upd.id, upd.name, upd.brewery_type, upd.city, upd.state_province, upd.postal_code, upd.country,
    upd.longitude, upd.latitude, upd.phone, upd.website_url, upd.state, upd.street,
    current_timestamp(), upd.address
  )


num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
50,50,0,0


In [0]:
# Persist the cleaned data as a Delta table partitioned by city, overwriting both data and schema.
# NOTE(review): the table name is unqualified, so it resolves against the session's current
# database rather than brewery_silver. Confirm this is the intended destination.
(
    df_resultado.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy("city")
    .saveAsTable("breweries")
)
    



In [0]:

# Optional hand-off to the "3_gold_layer" notebook (second argument is the timeout, 60 seconds).
# Currently disabled. NOTE(review): presumably the gold notebook is triggered separately; confirm.
# dbutils.notebook.run("3_gold_layer",60)
