# Import Libraries

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

# Initialize a Spark session

In [2]:
# Create the SparkSession for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("BreweryDataPipeline")
    .getOrCreate()
)

25/03/15 16:15:03 WARN Utils: Your hostname, Ubuntu24 resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/03/15 16:15:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/03/15 16:15:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Bare expression: the notebook renders the SparkSession object as this cell's output.
spark

## Data Base Insights

In [4]:
# Load the raw (bronze) JSON file into a DataFrame; Spark infers the schema.
bronze_path = "data_architecture/bronze_layer_raw_data.json"
df = spark.read.json(bronze_path)

                                                                                

In [32]:
# Number of rows loaded from the bronze JSON file.
df.count()

50

In [5]:
# Print the column names, types and nullability of the loaded DataFrame.
df.printSchema()

root
 |-- address_1: string (nullable = true)
 |-- address_2: string (nullable = true)
 |-- address_3: string (nullable = true)
 |-- brewery_type: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- id: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- name: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- state: string (nullable = true)
 |-- state_province: string (nullable = true)
 |-- street: string (nullable = true)
 |-- website_url: string (nullable = true)



# Check whether some columns are equal

### street and address_1

In [23]:
# Flag each row with 1 when `street` and `address_1` hold the same value
# (two NULLs count as equal), otherwise 0.
# eqNullSafe is Spark's null-safe equality (SQL `<=>`): it yields true for
# NULL/NULL and false for NULL/value, so it replaces the hand-written
# `(a == b) | (a.isNull() & b.isNull())` expression with the same result.
(
    df.select('id', 'name', 'street', 'address_1')
    .withColumn(
        'street_adress_1',
        when(col('street').eqNullSafe(col('address_1')), 1).otherwise(0),
    )
    .show(200)
)

+--------------------+--------------------+--------------------+--------------------+---------------+
|                  id|                name|              street|           address_1|street_adress_1|
+--------------------+--------------------+--------------------+--------------------+---------------+
|5128df48-79fc-4f0...|    (405) Brewing Co|      1716 Topeka St|      1716 Topeka St|              1|
|9c5a66c8-cc13-416...|    (512) Brewing Co|407 Radam Ln Ste ...|407 Radam Ln Ste ...|              1|
|34e8c68b-6146-453...|1 of Us Brewing C...| 8100 Washington Ave| 8100 Washington Ave|              1|
|6d14b220-8926-452...|10 Barrel Brewing Co|       62970 18th St|       62970 18th St|              1|
|e2e78bd8-80ff-4a6...|10 Barrel Brewing Co|1135 NW Galveston...|1135 NW Galveston...|              1|
|e432899b-7f58-455...|10 Barrel Brewing Co| 1411 NW Flanders St| 1411 NW Flanders St|              1|
|ef970757-fe42-416...|10 Barrel Brewing Co|           1501 E St|           1501 E 

### state and state_province

In [34]:
# Flag each row with 1 when `state` and `state_province` hold the same value
# (two NULLs count as equal), otherwise 0.
# eqNullSafe is Spark's null-safe equality (SQL `<=>`): it yields true for
# NULL/NULL and false for NULL/value, so it replaces the hand-written
# `(a == b) | (a.isNull() & b.isNull())` expression with the same result.
(
    df.select('id', 'name', 'state', 'state_province')
    .withColumn(
        'state_state_province',
        when(col('state').eqNullSafe(col('state_province')), 1).otherwise(0),
    )
    .show(200)
)

+--------------------+--------------------+--------------+--------------+--------------------+
|                  id|                name|         state|state_province|state_state_province|
+--------------------+--------------------+--------------+--------------+--------------------+
|5128df48-79fc-4f0...|    (405) Brewing Co|      Oklahoma|      Oklahoma|                   1|
|9c5a66c8-cc13-416...|    (512) Brewing Co|         Texas|         Texas|                   1|
|34e8c68b-6146-453...|1 of Us Brewing C...|     Wisconsin|     Wisconsin|                   1|
|6d14b220-8926-452...|10 Barrel Brewing Co|        Oregon|        Oregon|                   1|
|e2e78bd8-80ff-4a6...|10 Barrel Brewing Co|        Oregon|        Oregon|                   1|
|e432899b-7f58-455...|10 Barrel Brewing Co|        Oregon|        Oregon|                   1|
|ef970757-fe42-416...|10 Barrel Brewing Co|    California|    California|                   1|
|9f1852da-c312-42d...|10 Barrel Brewing...|       

# New DataFrame with only the useful columns, in a more intuitive column order

In [37]:
# Project the columns to keep, in the desired order; `address_2` is exposed
# under the name `village`.
selected_columns = [
    'id', 'name', 'country', 'state', 'city',
    col("address_2").alias("village"),
    'postal_code', 'street', 'latitude', 'longitude',
    'phone', 'brewery_type', 'website_url',
]
df_new = df.select(*selected_columns)

# What is the best location criterion for partitioning the data?

In [42]:
# Row count per country, printed without truncating cell values.
country_counts = df_new.groupBy('country').count()
country_counts.show(200, truncate=False)

+-------------+-----+
|country      |count|
+-------------+-----+
|United States|49   |
|Ireland      |1    |
+-------------+-----+



In [43]:
# Row count per state, printed without truncating cell values.
state_counts = df_new.groupBy('state').count()
state_counts.show(200, truncate=False)

+--------------+-----+
|state         |count|
+--------------+-----+
|Minnesota     |3    |
|Ohio          |2    |
|Oregon        |6    |
|Texas         |2    |
|Pennsylvania  |1    |
|Vermont       |1    |
|Nevada        |1    |
|Washington    |2    |
|Illinois      |1    |
|Oklahoma      |1    |
|Delaware      |1    |
|Michigan      |1    |
|Virginia      |1    |
|North Carolina|2    |
|Maryland      |1    |
|Arizona       |3    |
|Iowa          |1    |
|Massachusetts |1    |
|Mississippi   |1    |
|Indiana       |3    |
|Idaho         |1    |
|South Carolina|1    |
|California    |4    |
|New York      |2    |
|Wisconsin     |2    |
|Laois         |1    |
|Colorado      |4    |
+--------------+-----+



In [47]:
# Persist the selected columns as Parquet, partitioned by the `state` column.
# mode("overwrite") makes this cell re-runnable: with the default save mode
# ("errorifexists") a second run fails because the output path already exists.
# NOTE(review): overwrite replaces whatever is already at this path — confirm
# that is acceptable for this dataset.
(
    df_new.write
    .mode("overwrite")
    .partitionBy("state")
    .parquet('data_architecture/silver_data.parquet')
)

                                                                                

In [48]:
#

In [53]:
# Read the partitioned Parquet data back from disk.
silver_df = spark.read.parquet('data_architecture/silver_data.parquet')

# Count rows for every (state, brewery_type) pair.
aggregated_df = (
    silver_df
    .groupBy("state", "brewery_type")
    .count()
)

# Give the generated `count` column a descriptive name.
golden_df = aggregated_df.withColumnRenamed('count', 'brewery_quantity')

In [55]:
# Preview the aggregated result (show() prints at most 20 rows by default).
golden_df.show()

+--------------+------------+----------------+
|         state|brewery_type|brewery_quantity|
+--------------+------------+----------------+
|          Iowa|       micro|               1|
|    California|       micro|               2|
|     Minnesota|       micro|               3|
|        Oregon|       large|               4|
|      Virginia|       micro|               1|
|    California|       large|               1|
|        Nevada|       micro|               1|
|      Colorado|     brewpub|               1|
|      Michigan|       micro|               1|
|       Vermont|       micro|               1|
|         Texas|       micro|               2|
|      Delaware|       micro|               1|
|         Laois|       micro|               1|
|    Washington|      closed|               1|
| Massachusetts|       micro|               1|
|North Carolina|       micro|               1|
|      Colorado|  proprietor|               1|
|          Ohio|     brewpub|               1|
|       India