#Breweries Project Pipeline Silver

This project aims to create a pipeline for breweries of the AB InBev Group.


**Responsible Engineer: Ozeas Gomes <p>
Created on: 02/12/2025 <p>
Last updated: 02/13/2025 <p>**

####Installing Required Dependencies

####Importing Dependencies

In [0]:
%run /Users/ozeasjgomes@gmail.com/brewery_data_pipeline/pipeline_functions


In [0]:
from pyspark.sql.functions import col, trim, when, lit, regexp_replace
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType
from datetime import datetime


####Reading data

In [0]:
# # Reading data from the bronze layer
silver_breweries_df = spark.read.format("delta").load("dbfs:/dbfs/FileStore/project_breweries/bronze/breweries_202502121744")


In [0]:
# First transformation: Removing columns 'address_1', 'address_2', and 'address_3' 
# After analysis, this was determined to be the best decision.
# Reordered the columns to follow a more standardized format.
silver_breweries_df = silver_breweries_df.select(['id', 'name', 'brewery_type','phone', 'street', 'city', 'state', 'state_province',
                                                            'country', 'postal_code', 'latitude', 'longitude','website_url'])

In [0]:
# Ensuring there are no spaces within the columns.
colunas_texto = silver_breweries_df.columns
for coluna in colunas_texto:
    silver_breweries_df = silver_breweries_df.withColumn(coluna, trim(col(coluna)))

In [0]:
silver_breweries_df.where(col('state') != col('state_province')).display() # Here, I checked if there were any differences between the 'state' and 'state_province' columns. 
# Since no differences were found, the 'state_province' column will be deleted.

id,name,brewery_type,phone,street,city,state,state_province,country,postal_code,latitude,longitude,website_url


In [0]:
# Dropping the state_province column
silver_breweries_df = silver_breweries_df.drop(col('state_province'))

####Checking and Removing Duplicate Data

In [0]:
silver_breweries_df.count()

Out[8]: 8356

In [0]:
# I checked the possibility of duplicate data by first using the distinct() method.
silver_breweries_df = silver_breweries_df.distinct()


In [0]:
silver_breweries_df.count()

Out[10]: 8356

In [0]:
# Since duplicates were still present, I thought the ID might be hiding them, so I decided to check without the ID.

silver_breweries_df.select(['name',
 'brewery_type',
 'phone',
 'street',
 'city',
 'state',
 'country',
 'postal_code',
 'latitude',
 'longitude',
 'website_url']).distinct().count()

# To my surprise, there are indeed duplicate records. I will identify and handle them.

Out[11]: 8353

In [0]:
# With the code below, I can display the duplicated data along with the count column
silver_breweries_df.groupBy(['name',
    'brewery_type',
    'phone',
    'street',
    'city',
    'state',
    'country',
    'postal_code',
    'latitude',
    'longitude',
    'website_url']).count().filter("count > 1").display()

name,brewery_type,phone,street,city,state,country,postal_code,latitude,longitude,website_url,count
BRLO Brwhouse,large,493055577606,Schöneberger Str. 16,Berlin,Berlin,Germany,10963,13.4168836,52.4995161,https://www.brlo.de/gastronomien/brlo-brwhouse,2
Berlin Craft Beer Experience,bar,4915779216971,Reichenberger Str. 176,Berlin,Berlin,Germany,10999,13.4168836,52.4995161,http://www.berlincraftbeerexperience.com/,2
BrewDog Berlin Mitte,bar,493048477770,Ackerstraße 29,Berlin,Berlin,Germany,10115,13.4132225,52.5170043,https://www.brewdog.com/eu_de/brewdog-berlin-mitte,2


In [0]:
# I will apply a WHERE clause to the following data using the names displayed above and select 3 IDs to be deleted
silver_breweries_df.where((col('name') == 'BrewDog Berlin Mitte') | (col('name') == 'BRLO Brwhouse') | (col('name') == 'Berlin Craft Beer Experience')).display()

id,name,brewery_type,phone,street,city,state,country,postal_code,latitude,longitude,website_url
1fe01316-a2ee-428b-8200-ba3f054eda6d,BRLO Brwhouse,large,493055577606,Schöneberger Str. 16,Berlin,Berlin,Germany,10963,13.4168836,52.4995161,https://www.brlo.de/gastronomien/brlo-brwhouse
7db0fe62-fb6c-4949-9a25-d0f318959a1b,BrewDog Berlin Mitte,bar,493048477770,Ackerstraße 29,Berlin,Berlin,Germany,10115,13.4132225,52.5170043,https://www.brewdog.com/eu_de/brewdog-berlin-mitte
307847a0-c60b-43e8-a42d-b8f6b5fb3092,BRLO Brwhouse,large,493055577606,Schöneberger Str. 16,Berlin,Berlin,Germany,10963,13.4168836,52.4995161,https://www.brlo.de/gastronomien/brlo-brwhouse
e58d60d7-92f7-4f8d-8a1a-6d02c25a32ee,BrewDog Berlin Mitte,bar,493048477770,Ackerstraße 29,Berlin,Berlin,Germany,10115,13.4132225,52.5170043,https://www.brewdog.com/eu_de/brewdog-berlin-mitte
c75eb363-ba15-4f96-a3cf-d6462867a4e3,Berlin Craft Beer Experience,bar,4915779216971,Reichenberger Str. 176,Berlin,Berlin,Germany,10999,13.4168836,52.4995161,http://www.berlincraftbeerexperience.com/
5e69d342-1efc-469e-bf8b-22d276c6d863,Berlin Craft Beer Experience,bar,4915779216971,Reichenberger Str. 176,Berlin,Berlin,Germany,10999,13.4168836,52.4995161,http://www.berlincraftbeerexperience.com/


In [0]:
## Deleting duplicate data based on ID order
valores_remover = ["e58d60d7-92f7-4f8d-8a1a-6d02c25a32ee", "307847a0-c60b-43e8-a42d-b8f6b5fb3092", "c75eb363-ba15-4f96-a3cf-d6462867a4e3"]
silver_breweries_df = silver_breweries_df.filter(~silver_breweries_df.id.isin(valores_remover))

#### Missing Latitude and Longitude Data
The missing latitude and longitude data is extensive, making it impossible to process with free tools like Geopy or Geocoding. Processing 2,325 rows would take a very long time. The ideal solution would be to use paid APIs such as Google Maps or OpenCage.

In [0]:
# Checking all missing latitude and longitude data
silver_breweries_df.where("latitude is null").display()

id,name,brewery_type,phone,street,city,state,country,postal_code,latitude,longitude,website_url
b4bad111-e8f4-45ac-bb74-c3dbac3cdceb,"The Levee Brewing Company, Inc",planning,8383901041,,Valdese,North Carolina,United States,28690,,,http://www.theleveepub.com
1c9765e4-b86d-4127-8403-ba0eaa4a6e68,Virant Family Winery / Black Angus Brewery,micro,4404666279,541 Atkins Rd,Geneva,Ohio,United States,44041,,,http://www.virantfamilywinery.com
a828db31-d32a-4fb3-a96b-46ba8c1cb9f8,Weasel Boy Brewing Co LLC,brewpub,7404553767,126 Muskingum Ave Unit E,Zanesville,Ohio,United States,43701-4921,,,http://www.weaselboybrewing.com
6230d54c-2de5-41c3-b5b3-60dc70d6b2ac,Trinity Brewing,brewpub,7196340029,1466 Garden of The Gods Rd Ste 184,Colorado Springs,Colorado,United States,80907-9464,,,http://www.trinitybrew.com
972dc278-a3a6-4dbd-acd0-4f6c8fc22343,Wooden Skiff Beer Company,micro,8432904364,141 Island Dr Ste 16,Hilton Head Island,South Carolina,United States,29926-4500,,,http://www.woodenskiffbrewing.com
a06d2559-f43d-4818-9c51-2b6ed78ded6a,Woodland Farms Brewery,micro,2079943911,"306 Route 1, Suite C",Kittery,Maine,United States,03904,,,http://www.wfbrewery.com
72466771-4af4-4da5-a120-06081eda08d8,Timberyard Brewing Co.,planning,4135526048,,East Brookfield,Massachusetts,United States,01515,,,http://timberyardbrewing.com
a65c58fa-7082-4317-bbc3-a723049bb2a5,White Marsh Brewing Co/Red Brick Station,brewpub,4109317827,8149 Honeygo Blvd Ste A,Baltimore,Maryland,United States,21236-8209,,,http://www.redbrickstation.com
89edb92f-550b-4b68-a503-b583d3dcba8f,The Austin Beer Garden Brewing Co,brewpub,,1305 W Oltorf St Ste B,Austin,Texas,United States,78704-5362,,,
4292bd71-a7f6-4cb8-88af-f1f1b60f2272,Une Annee Brewery,micro,8476350655,9082 W Golf Rd,Niles,Illinois,United States,60714-5805,,,http://uneannee.com


In [0]:
#creating a df with the missing latidute data
ltlg_df = silver_breweries_df.where("latitude is null")
#Just selecting a sample
ltlg_df = ltlg_df.sample(0.01, 123)

In [0]:
# Using a function from the notebook pipeline_functions to check the possibility of filling in the missing data
df_corrigido = fill_missing_lat_long(ltlg_df)


#### Evaluating the possibility for streets and phone numbers
We could filter out the missing data and automate the collection of this data from the URLs present in the DataFrame

In [0]:
silver_breweries_df.sample(0.02, 123).display()

id,name,brewery_type,phone,street,city,state,country,postal_code,latitude,longitude,website_url
f3e387ba-a709-4788-8be7-40544b60d002,THAT Brewery - Cottonwood,micro,9282023013,300 E Cherry St Unit B,Cottonwood,Arizona,United States,86326-6178,,,http://www.thatbrewery.com
dbe868ec-d1f5-4624-9661-c14d150d6071,미스터리 브루잉 (MysterLee Brewing Co.),brewpub,02-3272-6337,"311, Dongmak-ro",Mapo-gu,Seoul,South Korea,04156,37.5438955,126.9474136,http://www.mysterleebrewing.com/
46b43b36-9668-4b37-9a23-5f4784ed47b8,TripEnd Brewing,micro,6073467472,3072 Prutsman Rd,Troupsburg,New York,United States,14885-9613,42.09121034,-77.55268946,http://www.tripendbrewing.com
0e22f154-5c03-4bee-9b37-657afaca8ecd,Wind River Brewing Co - WY,brewpub,3073672337,402 Pine St,Pinedale,Wyoming,United States,82941,42.86643365266134,-109.86603873089386,http://www.windriverbrewingco.com
7d0c32f8-09a9-4363-944f-3a836c4d8b17,Water Street Brewing Co,brewpub,6072174546,168 Water St Fl 1,Binghamton,New York,United States,13901-2736,,,http://www.waterstreetbrewingco.com
5edd07f2-5db8-4527-b234-227b23129c43,Two Monks Brewing Company,micro,2347382337,352 Massillon Rd,Akron,Ohio,United States,44312-2021,41.05205025,-81.46292368,http://www.2monksbru.com
5c68b706-804c-4554-b37e-09732562d708,대도양조장(daedo brewing),brewpub,0507-1446-2345,"47, Dongdeok-ro 14-gil",Jung-gu,Deagu,South Korea,41951,35.86077096,128.6064699,https://www.instagram.com/daedo_brewing
f26c65be-a08b-4bb5-a053-5a0c63ae4ff0,Thin Man Brewery,brewpub,7169234100,486 Elmwood Ave,Buffalo,New York,United States,14222-2014,42.91037815,-78.87720285,http://www.thinmanbrewery.com
6275d4e7-2531-4855-870f-25616819d7ae,Three Spirits Brewery,micro,9802074881,5046 Old Pineville Rd Ste 200,Charlotte,North Carolina,United States,28217-3033,35.1718311,-80.87842302,http://www.threespiritsbrewery.com
fa21605f-18f8-48da-9302-97dcf2d3ac64,Two Frogs Brewing Company,micro,7279406077,151 E Tarpon Ave,Tarpon Springs,Florida,United States,34689-3451,28.14632482,-82.75503088,


In [0]:
silver_breweries_df.where("phone is null").display()

id,name,brewery_type,phone,street,city,state,country,postal_code,latitude,longitude,website_url
4d0ab1cf-bb3a-4432-8108-3bd91cb900d4,The Mitten Brewing Company Production Facility,micro,,540 Leonard St NW,Grand Rapids,Michigan,United States,49504-4260,42.98491266,-85.68100276,
109e48c5-be0e-48f6-a017-4269bf6f51c2,The Federal Brewing Company,micro,,102 S Main St,Federalsburg,Maryland,United States,21632-1215,38.69360488,-75.77333325,http://www.fedbrew.com
89edb92f-550b-4b68-a503-b583d3dcba8f,The Austin Beer Garden Brewing Co,brewpub,,1305 W Oltorf St Ste B,Austin,Texas,United States,78704-5362,,,
dfa5640f-a31b-4bad-b1d9-3143e94c22ea,The Hold By Revelry Brewing,micro,,36 Romney St,Charleston,South Carolina,United States,29403-3825,32.80761336,-79.94569372,http://www.revelrybrewingco.com
615a73ce-eb1a-44bb-8a8d-0cc1b5573e72,Vashon Brewing Co,micro,,,Vashon,Washington,United States,98070-5900,,,http://www.vashonbrewingco.com
020aa383-378b-43d0-b91a-f8b4e5641be1,TTs Old Iron Brewery,micro,,154 S Madison St,Spokane,Washington,United States,99201-4542,47.655248,-117.4281651,http://www.ttsoldironbrewery.com
e16cb5ff-1d95-4cae-9719-ff9951b746a3,Urge Gastropub & Common House/Mason Ale Works,brewpub,,255 Redel Rd,San Marcos,California,United States,92078-4347,33.135679,-117.158627,
6e9dcd2b-fc79-49e3-8a6c-009a726fd771,Woodstacker Beer Company,micro,,850 Elm St,Manchester,New Hampshire,United States,03101,42.99003,-71.46309,https://woodstackerbeerco.com/
c4101816-9cb4-4327-afd1-5ec710378b76,Three Notch'd Brewing Company,micro,,520 2nd St SE,Charlottesville,Virginia,United States,22902-5794,38.0261159,-78.4820248,
5c8d8884-ab03-4de2-9370-2cd781eb7b07,Three Blondes Brewing,brewpub,,1875 Phoenix St,South Haven,Michigan,United States,49090-7151,42.4033183,-86.2642391,http://www.threeblondesbrewing.com


#### Standardizing Null Data

In [0]:
silver_breweries_df.display()

id,name,brewery_type,phone,street,city,state,country,postal_code,latitude,longitude,website_url
5d3e644a-e243-4bfb-ba06-f62b21bb94e7,The Mason Jar Brewing Company,micro,9512445277,29683 New Hub Dr Ste A,Menifee,California,United States,92586-6545,33.68931288,-117.1772064,http://www.masonjarbrewing.com
4d0ab1cf-bb3a-4432-8108-3bd91cb900d4,The Mitten Brewing Company Production Facility,micro,,540 Leonard St NW,Grand Rapids,Michigan,United States,49504-4260,42.98491266,-85.68100276,
f8d441be-c395-485d-bc40-97fb4182e1e4,Tin Roof Brewing Co,micro,2253777022,1624 Wyoming St,Baton Rouge,Louisiana,United States,70802-8514,30.419556,-91.1884332,http://www.tinroofbeer.com
cfee19cf-73ea-4a7e-9037-1105b4dbecd8,Top Frog Brewery,micro,5096712884,221 Vista Dr,Newport,Washington,United States,99156-7014,48.3131068419406,-117.16077576379269,
85fca49d-7715-4238-b3ab-4252a5299795,Union Pizza & Brewing Co,brewpub,2189988888,114 S Union Ave,Fergus Falls,Minnesota,United States,56537-,46.28264407,-96.07792093,http://www.unionpizzaandbrewing.com
109e48c5-be0e-48f6-a017-4269bf6f51c2,The Federal Brewing Company,micro,,102 S Main St,Federalsburg,Maryland,United States,21632-1215,38.69360488,-75.77333325,http://www.fedbrew.com
b4bad111-e8f4-45ac-bb74-c3dbac3cdceb,"The Levee Brewing Company, Inc",planning,8383901041,,Valdese,North Carolina,United States,28690,,,http://www.theleveepub.com
e7757500-d0bf-4060-8da7-be9734e4397d,The Red Baron,brewpub,8107441310,2495 S Center Rd,Burton,Michigan,United States,48519-1145,42.9924788,-83.63373728,http://www.baronburger.com
e2829cb7-d5e8-4401-8fb8-6f7af1150193,Tim's Pumpkin Patch,micro,3156739209,2901 Rose Hill Rd,Marietta,New York,United States,13110-3236,42.911678,-76.34398,http://www.timspumpkinpatch.com
594a7d7b-1885-4697-863e-5d60203b8a8a,Towerhill Brewery,brewpub,2158228788,237 W Butler Ave,Chalfont,Pennsylvania,United States,18914-3020,40.27984378,-75.21378231,http://www.towerhillbrewery.com


In [0]:
#Handle Missing Data (Example for a few columns, extend as needed)
silver_breweries_df = silver_breweries_df.fillna({
    'brewery_type': 'Unknown',  # Unknown category
    'phone': 'N/A', # Unknown category
    'street': 'Unknown',  # Unknown category
    'city': 'Unknown',  # Unknown category
    'postal_code': 'N/A',  # 'Not Applicable' or default value for postal code
    'website_url': 'N/A',  # 'Not Applicable' or 'no website'
    'latitude': 0.0,  # Default value for latitude (can be adjusted depending on context)
    'longitude': 0.0  # Default value for longitude (can be adjusted depending on context)
})



#### Removing Strings from the Phone Number

In [0]:
# Remove the '+' symbol; chose not to remove '-'
silver_breweries_df = silver_breweries_df.withColumn("phone", regexp_replace(col("phone"), "[+\\-\\s]", ""))

In [0]:
silver_breweries_df.columns

Out[23]: ['id',
 'name',
 'brewery_type',
 'phone',
 'street',
 'city',
 'state',
 'country',
 'postal_code',
 'latitude',
 'longitude',
 'website_url']

######## Adding Load Date

In [0]:
silver_breweries_df = silver_breweries_df.withColumn("_loadDate", lit(datetime.now()))

#### Standardizing Data Types

In [0]:
# Converting data to a specific data format
silver_breweries_df = (silver_breweries_df \
    .withColumn("id", col("id").cast(StringType()))\
    .withColumn("name", col("name").cast(StringType()))\
    .withColumn("brewery_type", col("brewery_type").cast(StringType()))\
    .withColumn("phone", col("phone").cast(StringType()))\
    .withColumn("street", col("street").cast(StringType()))\
    .withColumn("city", col("city").cast(StringType()))\
    .withColumn("state", col("state").cast(StringType()))\
    .withColumn("country", col("country").cast(StringType()))\
    .withColumn("postal_code", col("postal_code").cast(StringType()))\
    .withColumn("latitude", col("latitude").cast(DoubleType()))\
    .withColumn("longitude", col("longitude").cast(DoubleType()))\
    .withColumn("website_url", col("website_url").cast(StringType()))
    .withColumn("_loadDate", col("_loadDate").cast(DateType()))
)


In [0]:
#printing the Schema
silver_breweries_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- brewery_type: string (nullable = false)
 |-- phone: string (nullable = false)
 |-- street: string (nullable = false)
 |-- city: string (nullable = false)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- postal_code: string (nullable = false)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- website_url: string (nullable = false)
 |-- _loadDate: date (nullable = false)



#### Writing the DataFrame as Delta in the Silver Layer

In [0]:
caminho_silver_delta = "/dbfs/FileStore/project_breweries/silver/breweries_delta" # Define the Path in Your Databricks Environment

silver_breweries_df.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("state") \
    .save(caminho_silver_delta)

print(f"DataFrame Silver Layer criado e salvo em Delta Lake: {caminho_silver_delta}")

DataFrame Silver Layer criado e salvo em Delta Lake: /dbfs/FileStore/project_breweries/silver/breweries_delta


In [0]:
# Materialize the Delta table as a table
#spark.sql("CREATE TABLE IF NOT EXISTS default.silver_breweries USING delta LOCATION '{0}'".format(caminho_silver_delta))