<a href="https://colab.research.google.com/github/AldenisFranca/Tech_Challenge_Bees/blob/develop/breweries_case_aldenis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Tech Challenge - Data Engineer - AB-InBev | Bees
#Candidato: Aldenis França

####Objective:
The goal of this test is to assess your skills in consuming data from an API, transforming and
persisting it into a data lake following the medallion architecture with three layers: raw data,
curated data partitioned by location, and an analytical aggregated layer.


### Instalando, importando e iniciando uma sessão no PySpark

In [None]:
# Instalando o PySpark
!pip install pyspark



In [None]:
# Importando a Biblioteca
from pyspark.sql import SparkSession

# Criando a sessão do Spark
spark = (
    SparkSession
    .builder
    .appName("Tech Challenge")
    .getOrCreate()
)

# Verificando
spark

###Importando as bibliotecas

In [52]:
import requests
from pyspark.sql.types import *
from pyspark.sql.functions import *

###Implementando paginação para obter os dados da API

In [44]:
# Criação da função para realizar as requisições na API
def get_all_data_from_api(base_url: str, page_param: str, per_page_param: str, page_start: int = 1, items_per_page: int = 50):

    """
    Função que faz requisições paginadas para uma API e retorna uma lista com
    todos os dados das páginas.

    Args:
        base_url: URL base da API.
        page_param: Nome do parâmetro de página na URL.
        per_page_param: Nome do parâmetro para itens por página.
        page_start: Número da primeira página (padrão: 1).
        items_per_page: Número de itens por página (padrão: 50).
    """

    all_data = []
    page = page_start

    while True:
        url = f'{base_url}?{page_param}={page}&{per_page_param}={items_per_page}'
        response = requests.get(url)

        if response.status_code == 200:
            data = response.json()
            if not data:
                break  # sai do loop se não houver mais dados

            all_data.extend(data)
            page += 1
        elif response.status_code == 429:
            all_data = []
            page = page_start
        else:
            print(f'Erro na requisição (página {page}): {response.status_code}')
            break  # sai do loop em caso de erro

    return all_data

In [49]:
# Informando a URL da API
base_url = 'https://api.openbrewerydb.org/v1/breweries'

# Chamada da função
complet_data = get_all_data_from_api(base_url, page_param = 'page', per_page_param = 'per_page')

# Quantidade de linhas da base completa
print(f'Quantidade de linhas da base completa: {len(complet_data)}')

Quantidade de linhas da base completa: 8408


##Implementando a Arquitetura do Data Lake

###Bronze Layer

Persist the raw data from the API in its native format or
any format you find suitable.

In [61]:
# Criando um DataFrame com schema específico
schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("brewery_type", StringType(), True),
    StructField("address_1", StringType(), True),
    StructField("address_2", StringType(), True),
    StructField("address_3", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state_province", StringType(), True),
    StructField("postal_code", StringType(), True),
    StructField("country", StringType(), True),
    StructField("longitude", StringType(), True),
    StructField("latitude", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("website_url", StringType(), True),
    StructField("state", StringType(), True),
    StructField("street", StringType(), True)
])

df_raw = spark.createDataFrame(complet_data, schema)
df_raw.show(truncate=False)

+------------------------------------+-------------------------------+------------+---------------------------+---------+---------+--------------+--------------+-----------+-------------+----------------+---------------+------------+----------------------------------+-------------+---------------------------+
|id                                  |name                           |brewery_type|address_1                  |address_2|address_3|city          |state_province|postal_code|country      |longitude       |latitude       |phone       |website_url                       |state        |street                     |
+------------------------------------+-------------------------------+------------+---------------------------+---------+---------+--------------+--------------+-----------+-------------+----------------+---------------+------------+----------------------------------+-------------+---------------------------+
|5128df48-79fc-4f0f-8b52-d06be54d0cec|(405) Brewing Co             

In [79]:
# Criando a pasta da camada raw
!mkdir raw

# Salvando a base de dados em formato CSV na camada raw
df_raw.write.format('csv') \
      .mode('overwrite') \
      .option('header', 'true') \
      .option('sep', '\t') \
      .save('raw/breweries_data')

### Silver Layer

In [66]:
# Criando a pasta da camada curated
!mkdir curated

1

###Gold Layer