In [1]:
# %pip install pyspark
# NOTE(review): findspark is installed below, but no visible cell imports it — confirm it is still required.
%pip install findspark

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m24.1.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [2]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import  StringType, IntegerType, ShortType, DateType
from typing import List
import logging
import sys

# Create (or reuse) the SparkSession shared by every cell below.
# The chain is wrapped in parentheses so each builder option sits on its own
# line without backslash continuations.
spark = (
    SparkSession.builder
    .appName("ETL NortWind")
    .master("spark://spark-master:7077")
    # Maven coordinates resolved at start-up (BigQuery connector, GCS connector, Guava).
    .config('spark.jars.packages','com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.22.0,com.google.cloud.bigdataoss:gcs-connector:hadoop3-1.9.5,com.google.guava:guava:r05')
    # Local PostgreSQL JDBC driver jar used by the spark.read.jdbc / write.jdbc calls.
    .config("spark.jars", "/home/notebooks/CaseEqualBi/postgresql-42.7.3.jar")
    .config("spark.executor.memory", "4g")
    .config("spark.ui.port", "4040")
    .getOrCreate()
)

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.google.cloud.spark#spark-bigquery-with-dependencies_2.12 added as a dependency
com.google.cloud.bigdataoss#gcs-connector added as a dependency
com.google.guava#guava added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-469dc1a7-a469-46a6-893b-5e0ad620ce1e;1.0
	confs: [default]
	found com.google.cloud.spark#spark-bigquery-with-dependencies_2.12;0.22.0 in central
	found com.google.cloud.bigdataoss#gcs-connector;hadoop3-1.9.5 in central
	found com.google.api-client#google-api-client-java6;1.24.1 in central
	found com.google.api-client#google-api-client;1.24.1 in central
	found com.google.oauth-client#google-oauth-client;1.24.1 in central
	found com.google.http-client#google-http-client;1.24.1 in central
	found com.google.code.findbugs#jsr305;3.0.2 in central
	found org.apache.httpcomponents#httpclient;4.0.1 in central
	found org.apache.httpcomponents#htt

In [3]:
# Notebook-wide logger that writes INFO (and above) records to the console.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# logging.getLogger returns the same object on every call, so re-running this
# cell would otherwise stack one more handler each time and print every
# message multiple times. Drop previously attached handlers first so the cell
# is idempotent.
for _handler_antigo in list(logger.handlers):
    logger.removeHandler(_handler_antigo)

console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)

formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)

logger.addHandler(console_handler)

Extração das tabelas do banco de dados

In [4]:
logger.info("Início do processo ETL")
import json
import os

# Read the connection settings from config.json (looked up relative to the
# notebook's working directory). The encoding is explicit so the file is
# decoded the same way regardless of the host locale.
with open("config.json", "r", encoding="utf-8") as file:
    config = json.load(file)

db_config = config['database']
url_DB = db_config['url_DB']

# JDBC connection properties reused by every spark.read.jdbc / write.jdbc call.
# NOTE(review): the user name is hard-coded here; consider moving it to
# config.json or an environment variable next to the password.
properties = {
    "user": "leotnunesn",
    "password": os.getenv('POSTGRES_PASSWORD'),
    "driver": "org.postgresql.Driver"
}

# os.getenv returns None when the variable is unset; surface that here instead
# of letting it show up later as an opaque JDBC error.
if properties["password"] is None:
    logger.warning("Variável de ambiente POSTGRES_PASSWORD não definida; a conexão com o banco deve falhar.")

def status_conexaoDB(tabela: str = "categories") -> bool:
    """Probe the source database by opening one table over JDBC.

    Args:
        tabela: Name of the table used for the probe. Defaults to
            "categories", the table the notebook has always probed.

    Returns:
        True when ``spark.read.jdbc`` completes without raising, False
        otherwise. Failures are logged at ERROR level and never propagated.
    """
    try:
        spark.read.jdbc(url=url_DB, table=tabela, properties=properties)
    except Exception as e:
        # Deliberately broad: any failure means "not connected" for the caller.
        # Logged once at ERROR level (previously INFO plus a duplicate print).
        logger.error(f"Falha na conexão: {e}")
        return False
    logger.info("Conexão com DB bem sucedida!")
    return True


2024-07-16 22:56:28,759 - __main__ - INFO - Início do processo ETL


In [5]:
class TabelaSpark:
    """Pairs a table name with the Spark DataFrame loaded for it.

    Defined at cell level (not inside the connection check) so that later
    cells can always reference the class.
    """

    def __init__(self, nome: str, dataframe: DataFrame):
        self.nome = nome            # table name, reused later as the target table name
        self.dataframe = dataframe  # Spark DataFrame holding the table rows


# Stop right away when the source database cannot be reached: log the problem
# at ERROR level and exit with a non-zero status.
if not status_conexaoDB():
    logger.error("Erro na conexão. Verifique suas configurações e tente novamente.")
    sys.exit(1)

# Source tables to extract over JDBC.
lista_tabelas = ["categories", "customer_customer_demo",
                 "customer_demographics", "customers",
                 "employee_territories",
                 "employees", "orders",
                 "products", "region",
                 "shippers", "suppliers",
                 "territories", "us_states"]

tabelasspark: List[TabelaSpark] = []

for nometabela in lista_tabelas:
    df = spark.read.jdbc(url=url_DB, table=nometabela, properties=properties)
    if df.isEmpty():
        # Empty tables are skipped so the later validations only see real data.
        logger.info(f"{nometabela} # NÃO CARREGADA POR SER TABELA VAZIA #")
        continue
    tabelasspark.append(TabelaSpark(nometabela, dataframe=df))
    # The value logged is a table name, so it is labelled as a table (was "COLUNA").
    logger.info(f"TABELA: {nometabela} # CARREGADA COM SUCESSO #")
logger.info("Tabelas carregadas com sucesso!")

2024-07-16 22:56:35,712 - __main__ - INFO - Conexão com DB bem sucedida!
2024-07-16 22:56:38,192 - __main__ - INFO - COLUNA: categories # CARREGADA COM SUCESSO #
2024-07-16 22:56:38,520 - __main__ - INFO - customer_customer_demo # NÃO CARREGADA POR SER TABELA VAZIA #
2024-07-16 22:56:38,823 - __main__ - INFO - customer_demographics # NÃO CARREGADA POR SER TABELA VAZIA #
2024-07-16 22:56:39,094 - __main__ - INFO - COLUNA: customers # CARREGADA COM SUCESSO #
2024-07-16 22:56:39,361 - __main__ - INFO - COLUNA: employee_territories # CARREGADA COM SUCESSO #
2024-07-16 22:56:39,626 - __main__ - INFO - COLUNA: employees # CARREGADA COM SUCESSO #
2024-07-16 22:56:39,921 - __main__ - INFO - COLUNA: orders # CARREGADA COM SUCESSO #
2024-07-16 22:56:40,207 - __main__ - INFO - COLUNA: products # CARREGADA COM SUCESSO #
2024-07-16 22:56:40,449 - __main__ - INFO - COLUNA: region # CARREGADA COM SUCESSO #
2024-07-16 22:56:40,693 - __main__ - INFO - COLUNA: shippers # CARREGADA COM SUCESSO #
2024-07-

Extração da tabela CSV

In [9]:
from pyspark import SparkFiles
import requests
import io

# Read the CSV source settings from config.json.
with open("config.json", "r", encoding="utf-8") as file:
    config = json.load(file)

file_config = config['files']
fileid = file_config['fileid']
url_csv = file_config['url_csv']
localfile = file_config['localfile']

# requests has no default timeout, so an unresponsive server would hang the
# notebook indefinitely. Use (connect, read) timeouts and handle network
# failures the same way as a non-200 answer.
try:
    response = requests.get(url_csv, timeout=(10, 60))
except requests.RequestException as erro:
    logger.error(f"Erro ao baixar o arquivo: {erro}")
    sys.exit(1)

if response.status_code == 200:
    # Keep the payload in memory; the next cell parses it with pandas.
    df = io.BytesIO(response.content)
    # Alternative if the SparkContext ran inside this notebook:
    # with open("data.csv", 'wb') as f:
    #     f.write(response.content)
    logger.info("Conexão com drive bem sucedida!")
    logger.info("Dados do csv extraidos para variavel: df!")
else:
    # A failed download is an error: log it as such and exit with a non-zero status.
    logger.error(f"Erro ao baixar o arquivo: {response.status_code} - {response.reason}")
    sys.exit(1)

2024-07-16 22:57:12,991 - __main__ - INFO - Conexão com drive bem sucedida!
2024-07-16 22:57:12,992 - __main__ - INFO - Dados do csv extraidos para variavel: df!


In [10]:
import pandas as pd

# `df` is the in-memory buffer created by the download cell. Rewind it so this
# cell can be re-run: after a first read the buffer position is at the end and
# pandas would see no data.
df.seek(0)
try:
    df_pandas = pd.read_csv(df)
except pd.errors.EmptyDataError:
    # A payload with no header and no rows counts as an empty table.
    df_pandas = pd.DataFrame()

# Alternative if the SparkContext ran inside this notebook:
# spark.sparkContext.addFile(localfile)
# path = SparkFiles.get("data.csv")
# df_spark = spark.read.csv(path, header=True, inferSchema=True, sep=",")

if df_pandas.empty:
    # Checked on the pandas side: spark.createDataFrame cannot infer a schema
    # from an empty pandas frame, so a Spark-side emptiness check would never run.
    logger.info("CSV # NÃO CARREGADO POR SER TABELA VAZIA #")
else:
    df_spark = spark.createDataFrame(df_pandas)
    tabelacsv = TabelaSpark(nome="order_details", dataframe=df_spark)
    # Replace any entry left by a previous run of this cell instead of
    # appending a second "order_details" table.
    tabelasspark[:] = [tabela for tabela in tabelasspark if tabela.nome != tabelacsv.nome]
    tabelasspark.append(tabelacsv)

Transformação e limpeza

In [11]:
def preencher_nulos(tabelas: List[TabelaSpark]) -> List[TabelaSpark]:
    """Replace nulls with a type-specific placeholder in every table.

    Placeholders by column type:
      * ShortType / IntegerType -> 0
      * StringType              -> "missing"
      * DateType                -> "1900-01-01"
    Columns of any other type are left untouched.

    The replacement map is built once per table and applied with a single
    ``fillna`` call, instead of one call per column, so the query plan does
    not grow by one projection for every column.

    Args:
        tabelas: Tables to clean. Each ``dataframe`` attribute is reassigned
            in place.

    Returns:
        The same list object that was passed in.
    """
    for tabela in tabelas:
        substitutos = {}
        for campo in tabela.dataframe.schema.fields:
            if campo.dataType in [ShortType(), IntegerType()]:
                substitutos[campo.name] = 0
            elif campo.dataType == StringType():
                substitutos[campo.name] = "missing"
            elif campo.dataType == DateType():
                substitutos[campo.name] = "1900-01-01"
        # Skip the call when no column of a handled type exists.
        if substitutos:
            tabela.dataframe = tabela.dataframe.fillna(substitutos)
    return tabelas


# Same list object as `tabelasspark`, because the function returns its argument.
tabelasspark_1 = preencher_nulos(tabelasspark)

In [14]:
# Geographic tables mapped to the description column on which duplicate rows
# are dropped.
geo_dict = {
    "region": "region_description",
    "us_states": "state_name",
    "territories": "territory_description"
}


def retirar_duplicatasgeo(tabelas: List[TabelaSpark]) -> List[TabelaSpark]:
    """Drop duplicate rows from the geographic tables listed in ``geo_dict``.

    For each table whose name is a key of ``geo_dict``, rows are de-duplicated
    on the mapped column; all other tables are left as they are. Each
    ``dataframe`` attribute is reassigned in place and the same list is
    returned.
    """
    for item in tabelas:
        coluna_chave = geo_dict.get(item.nome)
        if coluna_chave is None:
            # Not a geographic table: nothing to de-duplicate.
            continue
        item.dataframe = item.dataframe.dropDuplicates([coluna_chave])
    return tabelas


tabelasspark_2 = retirar_duplicatasgeo(tabelasspark_1)

In [10]:
#####Caso onde o poder computacional é insuficiente para o processamento dos dados ##############

# listadfgrande = ["customers", "orders", "region", "territories"] 
# tabelasspark_3 = [tabela for tabela in tabelasspark_2 if tabela.nome not in listadfgrande] 
# tabelasspark_b = [tabela for tabela in tabelasspark_2 if tabela.nome in listadfgrande] 
# # tabelasspark_metade: List[TabelaSpark] = []
# tabelasspark_div: List[TabelaSpark] = []



# # def separacao(tabela, nome, cond): 
# #     tabelametade = tabela.limit(tabela.count() // 2)
# #     if cond:
# #         return TabelaSpark(nome, tabela) 
# #     else:
# #         tabela.subtract(tabelametade)
# #         return TabelaSpark(nome, tabela)


# # def dfmetade(tabelas: List[TabelaSpark]) -> List[TabelaSpark]: 
# #     for tabela in tabelas: 
# #         if tabela.nome != "region" and tabela.nome != "territories":
# #             print(tabela.nome)
# #             tabelasspark_metade.append(separacao(tabela.dataframe, f"{tabela.nome}1", True))
# #             tabelasspark_metade.append(separacao(tabela.dataframe, f"{tabela.nome}2", False))


# def dfdividir(tabelas: List[TabelaSpark]) -> List[TabelaSpark]: 
#     for tabela in tabelas: 
#         print("entrou", tabela.nome)
#         partes = [tabela.dataframe.sample(False, 1.0/4).cache() for _ in range(4)]
#         partes_exclusivas = [
#             partes[0].subtract(partes[1]).subtract(partes[2]).subtract(partes[3]),
#             partes[1].subtract(partes[0]).subtract(partes[2]).subtract(partes[3]),
#             partes[2].subtract(partes[0]).subtract(partes[1]).subtract(partes[3]),
#             partes[3].subtract(partes[0]).subtract(partes[1]).subtract(partes[2])
#         ] 
#         tabelasspark_div.append(TabelaSpark(f"{tabela.nome}1", partes_exclusivas[0]))
#         tabelasspark_div.append(TabelaSpark(f"{tabela.nome}2", partes_exclusivas[1]))
#         tabelasspark_div.append(TabelaSpark(f"{tabela.nome}3", partes_exclusivas[2]))
#         tabelasspark_div.append(TabelaSpark(f"{tabela.nome}4", partes_exclusivas[3]))



# # dfmetade(tabelasspark_b)
# dfdividir(tabelasspark_b) 


entrou customers
entrou orders
entrou region
entrou territories


In [15]:
# Possible check: report "id"-like columns whose values are not unique.
def verificar_duplicatasid(tabelas: List[TabelaSpark]) -> List[tuple[str, str, bool]]:
    """Report, for every column whose name contains "id", whether it has repeated values.

    A column is flagged when its distinct-value count differs from the table's
    row count. The name match is a plain substring test, so it also selects
    columns that reference other tables; repeated values there can be expected.

    Args:
        tabelas: Tables to inspect (not modified).

    Returns:
        One ``(table name, column name, has_duplicates)`` tuple per matching column.
    """
    duplicatasID = []
    for tabela in tabelas:
        tabelarepar = tabela.dataframe.repartition(4)
        colunas_id = [colum for colum in tabelarepar.columns if 'id' in colum.lower()]
        if not colunas_id:
            continue
        # The row count does not depend on the column, so compute it once per
        # table instead of once per id column.
        total_linhas = tabelarepar.count()
        for colum in colunas_id:
            valores_distintos = tabelarepar.select(colum).distinct().count()
            duplicatasID.append((tabela.nome, colum, valores_distintos != total_linhas))
    return duplicatasID


duplicatasID = verificar_duplicatasid(tabelasspark_2)

# Show the report as a small Spark DataFrame.
schemaduplicadas = ["DataFrame", "nome_coluna", "duplicata"]
dfduplicatasID = spark.createDataFrame(duplicatasID, schemaduplicadas)

dfduplicatasID.show()

+--------------------+------------+---------+
|           DataFrame| nome_coluna|duplicata|
+--------------------+------------+---------+
|          categories| category_id|    false|
|           customers| customer_id|    false|
|employee_territories| employee_id|     true|
|employee_territories|territory_id|    false|
|           employees| employee_id|    false|
|              orders|    order_id|    false|
|              orders| customer_id|     true|
|              orders| employee_id|     true|
|            products|  product_id|    false|
|            products| supplier_id|     true|
|            products| category_id|     true|
|              region|   region_id|    false|
|            shippers|  shipper_id|    false|
|           suppliers| supplier_id|    false|
|         territories|territory_id|    false|
|         territories|   region_id|     true|
|           us_states|    state_id|    false|
|       order_details|    order_id|     true|
|       order_details|  product_id

In [16]:
# Possible validation: no column may still contain nulls after cleaning.
def validacao_nulos(tabelas: List[TabelaSpark]):
    """Assert that no column of any table contains null values.

    The null counts of all columns of a table are computed in one aggregation
    (a single Spark job per table) rather than one filtered count per column.

    Args:
        tabelas: Tables to validate (not modified).

    Raises:
        AssertionError: for the first column, in column order, that holds at
            least one null; the message names the column and the table.
    """
    for tabela in tabelas:
        colunas = tabela.dataframe.columns
        if not colunas:
            continue
        # count() ignores nulls, so counting a literal emitted only for null
        # cells yields the number of nulls in each column.
        contagem = tabela.dataframe.select(
            [F.count(F.when(F.col(colum).isNull(), F.lit(1))).alias(colum) for colum in colunas]
        ).first()
        for colum, nulos in zip(colunas, contagem):
            assert nulos == 0, f"Valores nulos em {colum} da tabela {tabela.nome}"


validacao_nulos(tabelasspark_2)
# validacao_nulos(tabelasspark_div)

In [17]:
def validacao_integridade(tabelas: List[TabelaSpark]):
    """Assert that every table still has at least one row.

    Uses ``isEmpty()``, the same emptiness check applied when the tables were
    loaded, instead of counting every row just to compare the total with zero.

    Args:
        tabelas: Tables to validate (not modified).

    Raises:
        AssertionError: for the first table that has no rows.
    """
    for tabela in tabelas:
        assert not tabela.dataframe.isEmpty(), f"DataFrame {tabela.nome} está vazio"


validacao_integridade(tabelasspark_2)
# validacao_integridade(tabelasspark_div)

In [14]:
# Conferir tabelas particionadas quando não há poder computacional
# for tabela in tabelasspark_div: 
#     print(tabela.nome, tabela.dataframe)

customers1 DataFrame[customer_id: string, company_name: string, contact_name: string, contact_title: string, address: string, city: string, region: string, postal_code: string, country: string, phone: string, fax: string]
customers2 DataFrame[customer_id: string, company_name: string, contact_name: string, contact_title: string, address: string, city: string, region: string, postal_code: string, country: string, phone: string, fax: string]
customers3 DataFrame[customer_id: string, company_name: string, contact_name: string, contact_title: string, address: string, city: string, region: string, postal_code: string, country: string, phone: string, fax: string]
customers4 DataFrame[customer_id: string, company_name: string, contact_name: string, contact_title: string, address: string, city: string, region: string, postal_code: string, country: string, phone: string, fax: string]
orders1 DataFrame[order_id: smallint, customer_id: string, employee_id: smallint, order_date: date, required_dat

Carregamento para o banco de dados local

In [15]:
# Load every cleaned table into the local target database over JDBC.
url_DB2 = db_config['url_DB2']
# Iterate over `tabelasspark_2`, the output of the cleaning cells.
# `tabelasspark_3` is only assigned inside a commented-out cell, so using it
# raises NameError on a fresh kernel.
for tabela in tabelasspark_2:
    # mode="overwrite" replaces the target table on every run.
    tabela.dataframe.write.jdbc(url_DB2, table=tabela.nome, mode="overwrite", properties=properties)

24/06/26 18:18:58 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [16]:
# Caso o poder computacional não fosse suficiente: carga das tabelas em partições
# for tabela in tabelasspark_div: 
#     print(tabela.nome)
#     if "1" in tabela.nome:
#         print("entra")
#         tabela.dataframe.write.jdbc(url_DB2, table=tabela.nome, mode="overwrite", properties=properties) 
#     elif "2" in tabela.nome: 
#         tabela.dataframe.write.jdbc(url=url_DB2, table=tabela.nome, mode="append", properties=properties)
#     elif "3" in tabela.nome: 
#         tabela.dataframe.write.jdbc(url=url_DB2, table=tabela.nome, mode="append", properties=properties)
#     elif "4" in tabela.nome: 
#         tabela.dataframe.write.jdbc(url=url_DB2, table=tabela.nome, mode="append", properties=properties)

Conexão com o GCP BigQuery

In [17]:
# %pip install google-cloud-bigquery

In [18]:
# project_id = os.getenv('GOOGLE_PROJECT_ID')
# dataset_id = os.getenv('BIGQUERY_DATASET_ID')

# for tabela in tabelasspark: 
#      tabela.dataframe.write.format("bigquery") \
#     .option("table", f"{project_id}.{dataset_id}.{tabela.nome}") \
#     .save(mode="overwrite")

# logger.info("Fim do processo ETL")
# spark.stop()