## **Bibliotecas**

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime

## **Funções**

In [0]:
%run ./functions

## **Parâmetros**

In [0]:
# Location of the bronze-layer breweries table inside the lake.
# NOTE(review): lake_path is not defined in this notebook — presumably it is
# provided by `%run ./functions`; confirm.
bronze_path = f'{lake_path}/bronze/tb_bronze_breweries'

## **Tratamento**

In [0]:
# Load the bronze breweries table, stored in Delta format, from bronze_path.
df_breweries = spark.read.load(bronze_path, format='delta')

In [0]:
# List the distinct country values as read from bronze, before any cleaning.
display(df_breweries.select('country').distinct())

country
Sweden
Singapore
United States
Ireland
Portugal
Austria
England
Germany
Poland
Scotland


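Nota-se que `United States` aparece duplicado na lista de países porque alguns registros trazem um espaço no início da string (`' United States'`). A célula seguinte aplica `trim` à coluna `country` para eliminar essa duplicidade.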

In [0]:
# Strip leading/trailing spaces from country so that variants of the same
# name that differ only by surrounding spaces collapse into a single value.
df_breweries = df_breweries.withColumn('country', trim('country'))

In [0]:
# Re-list the distinct country values to check the result of the trim.
display(df_breweries.select('country').distinct())

country
Germany
France
United States
Ireland
South Korea
Portugal
Austria
England
Scotland
Sweden


In [0]:
# Key-uniqueness validation on `id`, the column the MERGE below joins on.
# NOTE(review): key_validation is not defined in this notebook (presumably
# loaded by `%run ./functions`); confirm whether it raises on duplicate keys or
# only reports them — its return value is not used here.
key_validation(dataframe=df_breweries, chave='id')
# Plain alias: no further transformation is applied before the upsert.
df_breweries_final = df_breweries

In [0]:
# Preview up to five rows of the dataframe that will be upserted.
display(df_breweries_final.limit(5))

address_1,address_2,address_3,brewery_type,city,country,id,latitude,longitude,name,phone,postal_code,state,state_province,street,website_url,tmstp_ingestion
1716 Topeka St,,,micro,Norman,United States,5128df48-79fc-4f0f-8b52-d06be54d0cec,35.25738891,-97.46818222,(405) Brewing Co,4058160490,73069-8224,Oklahoma,Oklahoma,1716 Topeka St,http://www.405brewing.com,2024-09-26T20:39:35.532+0000
407 Radam Ln Ste F200,,,micro,Austin,United States,9c5a66c8-cc13-416f-a5d9-0a769c87d318,,,(512) Brewing Co,5129211545,78745-1197,Texas,Texas,407 Radam Ln Ste F200,http://www.512brewing.com,2024-09-26T20:39:35.532+0000
8100 Washington Ave,,,micro,Mount Pleasant,United States,34e8c68b-6146-453f-a4b9-1f6cd99a5ada,42.72010826899558,-87.88336350209435,1 of Us Brewing Company,2624847553,53406-3920,Wisconsin,Wisconsin,8100 Washington Ave,https://www.1ofusbrewing.com,2024-09-26T20:39:35.532+0000
1501 E St,,,large,San Diego,United States,ef970757-fe42-416f-931d-722451f1f59c,32.714813,-117.129593,10 Barrel Brewing Co,6195782311,92101-6618,California,California,1501 E St,http://10barrel.com,2024-09-26T20:39:35.532+0000
62970 18th St,,,large,Bend,United States,6d14b220-8926-4521-8d19-b98a2d6ec3db,44.08683531,-121.281706,10 Barrel Brewing Co,5415851007,97701-9847,Oregon,Oregon,62970 18th St,http://www.10barrel.com,2024-09-26T20:39:35.532+0000


## **Upsert na Tabela**

In [0]:
# Register the final dataframe as a temp view so the %sql MERGE cell below can
# reference it as the upsert source.
df_breweries_final.createOrReplaceTempView('source_view_breweries')

In [0]:
%sql
-- Upsert the rows of the temp view source_view_breweries into
-- tb_silver_breweries, matching source and target rows on id.
MERGE INTO tb_silver_breweries AS TARGET
USING source_view_breweries AS SOURCE
  ON TARGET.id = SOURCE.id
-- id already present in the target: overwrite every non-key column.
WHEN MATCHED THEN UPDATE SET
  address_1       = SOURCE.address_1,
  address_2       = SOURCE.address_2,
  address_3       = SOURCE.address_3,
  brewery_type    = SOURCE.brewery_type,
  city            = SOURCE.city,
  country         = SOURCE.country,
  latitude        = SOURCE.latitude,
  longitude       = SOURCE.longitude,
  name            = SOURCE.name,
  phone           = SOURCE.phone,
  postal_code     = SOURCE.postal_code,
  state           = SOURCE.state,
  state_province  = SOURCE.state_province,
  street          = SOURCE.street,
  website_url     = SOURCE.website_url,
  tmstp_ingestion = SOURCE.tmstp_ingestion
-- id not present in the target: insert the full row.
WHEN NOT MATCHED THEN INSERT (
  id, address_1, address_2, address_3, brewery_type, city, country,
  latitude, longitude, name, phone, postal_code, state, state_province,
  street, website_url, tmstp_ingestion
)
VALUES (
  SOURCE.id, SOURCE.address_1, SOURCE.address_2, SOURCE.address_3,
  SOURCE.brewery_type, SOURCE.city, SOURCE.country,
  SOURCE.latitude, SOURCE.longitude, SOURCE.name, SOURCE.phone,
  SOURCE.postal_code, SOURCE.state, SOURCE.state_province,
  SOURCE.street, SOURCE.website_url, SOURCE.tmstp_ingestion
)

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
8318,0,0,8318
