Importação de bibliotecas

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

import pandas as pd
from sqlalchemy import create_engine

In [2]:


DB_USERNAME = 'postgres'
DB_PASSWORD = 'admin'
DB_HOST = 'localhost'
DB_PORT = '5432'
DB_DATABASE = 'desafio'

# Conexão usando SQLAlchemy
string_conexao = f"postgresql+psycopg2://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_DATABASE}"

# Tente imprimir a string de conexão para verificar se ela está correta
print(string_conexao)

# Crie uma engine usando a string de conexão
engine = create_engine(string_conexao)


postgresql+psycopg2://postgres:admin@localhost:5432/desafio


In [3]:

# Teste de leitura
consulta_sql = "SELECT * FROM associado"
dados_tabela = pd.read_sql(consulta_sql, engine)
#caminho_arquivo_csv = 'C:/Users/elton/Documents/projeto/sicooperative/sicooperative/Code/Development'
#dados_tabela.to_csv(caminho_arquivo_csv, index=False)
# Exiba os dados
print(dados_tabela.head())

   id     nome sobrenome  idade                       email
0   1   Carlos  Ferreira     28   carlos.ferreira@email.com
1   2      Ana  Oliveira     35      ana.oliveira@email.com
2   3    Pedro   Almeida     40     pedro.almeida@email.com
3   4  Mariana     Costa     22     mariana.costa@email.com
4   5  Ricardo  Silveira     33  ricardo.silveira@email.com


In [4]:
#pip install py4j

In [5]:
# Configuração do Spark com o driver JDBC do PostgreSQL
spark = SparkSession.builder \
    .appName("ETL do Banco de Dados") \
    .config("spark.driver.extraClassPath", "C:/Users/elton/Documents/projeto/postgresql-42.7.0.jar") \
    .config("spark.hadoop.home.dir", "C:/Users/elton/Documents/spark-3.5.0-bin-hadoop3") \
    .getOrCreate()


In [6]:
# Imprime informações sobre a sessão do Spark
print("Sessão do Spark criada com sucesso!")
print(f"Versão do Spark: {spark.version}")
print("Classpath do Spark:", spark._jsc.sc().getConf().get("spark.driver.extraClassPath"))
print(spark.sparkContext.getConf().toDebugString())

Sessão do Spark criada com sucesso!
Versão do Spark: 3.5.0
Classpath do Spark: C:/Users/elton/Documents/projeto/postgresql-42.7.0.jar
spark.app.id=local-1701122926953
spark.app.name=ETL do Banco de Dados
spark.app.startTime=1701122918896
spark.app.submitTime=1701122915417
spark.driver.extraClassPath=C:/Users/elton/Documents/projeto/postgresql-42.7.0.jar
spark.driver.extraJavaOptions=-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.b

In [7]:
# URL de conexão JDBC
url = f"jdbc:postgresql://{DB_HOST}:{DB_PORT}/{DB_DATABASE}"

# Propriedades para a conexão JDBC
properties = {
    "user": DB_USERNAME,
    "password": DB_PASSWORD,
    "driver": "org.postgresql.Driver"
}


In [8]:
## Leitura da tabela associado como um DataFrame (Zona Bruta)
df_associado_raw = spark.read.jdbc(url=url, table="associado", properties=properties)


In [9]:
df_associado_raw.show()

+---+--------+---------+-----+--------------------+
| id|    nome|sobrenome|idade|               email|
+---+--------+---------+-----+--------------------+
|  1|  Carlos| Ferreira|   28|carlos.ferreira@e...|
|  2|     Ana| Oliveira|   35|ana.oliveira@emai...|
|  3|   Pedro|  Almeida|   40|pedro.almeida@ema...|
|  4| Mariana|    Costa|   22|mariana.costa@ema...|
|  5| Ricardo| Silveira|   33|ricardo.silveira@...|
|  6|  Camila|  Santana|   29|camila.santana@em...|
|  7|   Lucas|  Pereira|   31|lucas.pereira@ema...|
|  8|   Aline|  Martins|   26|aline.martins@ema...|
|  9|Fernando|  Ribeiro|   37|fernando.ribeiro@...|
| 10| Juliana|     Lima|   24|juliana.lima@emai...|
+---+--------+---------+-----+--------------------+



In [10]:
# Salvar o DataFrame como Delta Lake
#df_associado_raw.write.format("delta").mode("overwrite").save(caminho_raw_zone + "/associado")

# Converte o DataFrame do Spark para um DataFrame do Pandas
stg_associado = df_associado_raw.toPandas()

# Escreva o DataFrame do Pandas em formato Parquet
caminho_raw_zone = "stg_associado.parquet"
stg_associado.to_parquet(caminho_raw_zone, index=False)


In [11]:
# Leitura da tabela conta como um DataFrame (Zona Bruta)
df_conta_raw = spark.read.jdbc(url=url, table="conta", properties=properties)

In [12]:
# Converte o DataFrame do Spark para um DataFrame do Pandas
stg_conta = df_conta_raw.toPandas()

# Escreva o DataFrame do Pandas em formato Parquet
caminho_raw_zone = "stg_conta.parquet"
stg_conta.to_parquet(caminho_raw_zone, index=False)

In [13]:
# Leitura da tabela cartao como um DataFrame (Zona Bruta)
df_cartao_raw = spark.read.jdbc(url=url, table="cartao", properties=properties)

In [14]:
# Converte o DataFrame do Spark para um DataFrame do Pandas
stg_cartao = df_cartao_raw.toPandas()

# Escreva o DataFrame do Pandas em formato Parquet
caminho_raw_zone = "stg_cartao.parquet"
stg_cartao.to_parquet(caminho_raw_zone, index=False)

In [15]:
# Leitura da tabela movimento como um DataFrame (Zona Bruta)
df_movimento_raw = spark.read.jdbc(url=url, table="movimento", properties=properties)

In [16]:
# Converte o DataFrame do Spark para um DataFrame do Pandas
stg_movimento = df_movimento_raw.toPandas()

# Escreva o DataFrame do Pandas em formato Parquet
caminho_raw_zone = "stg_movimento.parquet"
stg_movimento.to_parquet(caminho_raw_zone, index=False)

ETL

In [17]:
# ETL para criação da tabela movimento_flat)
df_dim_associado = spark.read.parquet("stg_associado.parquet")
df_dim_conta = spark.read.parquet("stg_conta.parquet")
df_dim_cartao = spark.read.parquet("stg_cartao.parquet")
df_dim_movimento = spark.read.parquet("stg_movimento.parquet")

In [18]:
df_movimento_flat = df_dim_associado.alias("associado") \
    .join(df_dim_conta.alias("conta"), col("associado.id") == col("conta.id_associado")) \
    .join(df_dim_cartao.alias("cartao"), col("conta.id") == col("cartao.id_conta")) \
    .join(df_dim_movimento.alias("movimento"), col("cartao.id") == col("movimento.id_cartao")) \
    .select(
        col("associado.nome").alias("nome_associado"),
        col("associado.sobrenome").alias("sobrenome_associado"),
        col("associado.idade").alias("idade_associado"),
        col("movimento.vls_transacao").alias("vlr_transacao_movimento"),
        col("movimento.des_transacao").alias("des_transacao_movimento"),
        col("movimento.data_movimento"),
        col("cartao.num_cartao").alias("numero_cartao"),
        col("cartao.nom_impresso").alias("nome_impresso_cartao"),
        col("conta.tipo"),
        col("conta.data_criacao").alias("data_criacao_conta")
    )

In [19]:
df_movimento_flat.show()

+--------------+-------------------+---------------+-----------------------+-----------------------+-------------------+-------------+--------------------+------------+-------------------+
|nome_associado|sobrenome_associado|idade_associado|vlr_transacao_movimento|des_transacao_movimento|     data_movimento|numero_cartao|nome_impresso_cartao|        tipo| data_criacao_conta|
+--------------+-------------------+---------------+-----------------------+-----------------------+-------------------+-------------+--------------------+------------+-------------------+
|         Pedro|            Almeida|             40|                 150.75|           Supermercado|2023-04-10 00:00:00|        11112|       Pedro Almeida|Investimento|2023-02-10 00:00:00|
|       Mariana|              Costa|             22|                  45.80|            Restaurante|2023-04-15 00:00:00|        55556|       Mariana Costa|    Corrente|2023-02-15 00:00:00|
|       Ricardo|           Silveira|             33|   

In [21]:
# Converta o DataFrame do Spark para um DataFrame do Pandas
fato_movimento_flat = df_movimento_flat.toPandas()
# Escreva o DataFrame do Pandas em formato Parquet
caminho_raw_zone = "movimento_flat.parquet"
fato_movimento_flat.to_parquet(caminho_raw_zone, index=False)

In [22]:
# Escreva o DataFrame do Pandas em formato CSV
caminho_raw_zone_csv = "movimento_flat.csv"
fato_movimento_flat.to_csv(caminho_raw_zone_csv, index=False)

In [23]:
# Finaliza a sessão do Spark
spark.stop()