In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# Caminho para os drivers JDBC dentro do contêiner Docker
path_postgresql = "/opt/spark/jars/postgresql-42.2.5.jar"
path_mysql = "/opt/spark/jars/mysql-connector-java-8.0.29.jar"
path_sqlserver = "/opt/spark/jars/sqljdbc42.jar"

# Configurando a SparkSession para incluir os drivers JDBC
spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.jars", f"{path_postgresql},{path_mysql},{path_sqlserver}") \
    .getOrCreate()

## Criar schema se precisar criar a tabela

In [14]:
# Conexão com o PostgreSQL em outro contêiner Docker
postgres_url = "jdbc:postgresql://postgres:5432/exemploDB"
postgres_properties = {"user": "exemploUsuario", "password": "exemploSenha", "driver": "org.postgresql.Driver"}

# Criação do schema para o DataFrame
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("nome", StringType(), True),
    StructField("valor", DoubleType(), True)
])

# Dados para inserir no banco de dados
data = [(2, 'Cliente2', 100.0)]
df = spark.createDataFrame(data, schema)

# Escrevendo o DataFrame no banco de dados, criando a tabela
df.write.jdbc(url=postgres_url, table="public.clientes", mode="append", properties=postgres_properties)

# Lendo e exibindo os dados
df_postgres = spark.read.jdbc(url=postgres_url, table="public.clientes", properties=postgres_properties)
df_postgres.show()


+---+----------+-----+
| id|      nome|valor|
+---+----------+-----+
|  1|  Cliente1|100.0|
|  1|  Cliente1|100.0|
|  1|  Cliente1|100.0|
|  1|  Cliente1|100.0|
|  1|'Cliente1'|100.0|
|  2|  Cliente2|100.0|
+---+----------+-----+



In [19]:
# Conexão com o PostgreSQL em outro contêiner Docker
postgres_url = "jdbc:postgresql://postgres:5432/exemploDB"
postgres_properties = {"user": "exemploUsuario", "password": "exemploSenha", "driver": "org.postgresql.Driver"}

# Definir o caminho do arquivo CSV
csv_file_path = 'arquivos/teste.csv'

# Ler os dados do CSV para um DataFrame. Aqui, assumimos que o arquivo CSV possui um cabeçalho.
# Também inferimos automaticamente o esquema dos dados. Adicionalmente, como o delimitador
# padrão de um arquivo CSV é uma vírgula e o arquivo que estamos usando pode ter um delimitador diferente
# (como ponto e vírgula), especificamos isso na opção 'sep'. Também indicamos o caractere de aspas
# com a opção 'quote'. Para lidar com questões de codificação de caracteres que podem surgir ao trabalhar
# com arquivos em formatos diferentes de UTF-8, usamos a opção 'encoding' para definir a codificação
# correta, que neste caso é 'ISO-8859-1'.
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("encoding", "ISO-8859-1") \
    .option("sep", ",") \
    .option("quote", "\"") \
    .csv(csv_file_path)

# Escrevendo o DataFrame no banco de dados, criando a tabela
df.write.jdbc(url=postgres_url, table="public.clientes", mode="append", properties=postgres_properties)

# Lendo e exibindo os dados
df_postgres = spark.read.jdbc(url=postgres_url, table="public.clientes", properties=postgres_properties)
df_postgres.show()

+---+--------+-----+
| id|    nome|valor|
+---+--------+-----+
|  1|Cliente1|100.0|
|  2|Cliente2|100.0|
|  3|Cliente3|100.0|
|  4|Cliente4|100.0|
+---+--------+-----+



In [18]:
import psycopg2

# Estabeleça a conexão com o banco de dados PostgreSQL
try:
    # Conecte-se ao banco de dados PostgreSQL
    connection = psycopg2.connect(
        user="exemploUsuario",
        password="exemploSenha",
        host="postgres",
        port="5432",  # A porta mapeada para o contêiner Docker
        database="exemploDB"
    )

    # Crie um cursor para executar comandos SQL
    cursor = connection.cursor()

    # Comando SQL para deletar todos os registros da tabela
    delete_query = "DELETE FROM public.clientes"

    # Execute o comando SQL
    cursor.execute(delete_query)
    connection.commit()
    print("Todos os registros foram deletados com sucesso da tabela clientes.")

except (Exception, psycopg2.Error) as error:
    print("Erro ao conectar ao PostgreSQL", error)

finally:
    # Fechar a conexão e o cursor
    if connection:
        cursor.close()
        connection.close()
        print("Conexão com PostgreSQL fechada")


Todos os registros foram deletados com sucesso da tabela clientes.
Conexão com PostgreSQL fechada


In [23]:
import psycopg2
from pyspark.sql import SparkSession

# Inicialização do SparkSession
spark = SparkSession.builder.appName("MyApp").getOrCreate()

# Carregar o DataFrame a partir de um arquivo CSV
df = spark.read.option("header", "true").option("inferSchema", "true").csv("arquivos/teste_update.csv")

# Converter o DataFrame em um RDD e coletar os dados para a memória local
records = df.rdd.map(lambda row: (row['id'], row['nome'], row['valor'])).collect()

# Conectar ao banco de dados PostgreSQL e executar o update linha por linha
try:
    connection = psycopg2.connect(user="exemploUsuario",
                                  password="exemploSenha",
                                  host="postgres",
                                  port="5432",
                                  database="exemploDB")
    cursor = connection.cursor()
    
    update_query = "UPDATE public.clientes SET nome = %s, valor = %s WHERE id = %s"
    
    for record in records:
        cursor.execute(update_query, (record[1], record[2], record[0]))
    
    connection.commit()
    print("Os registros foram atualizados com sucesso.")
    
except (Exception, psycopg2.Error) as error:
    print("Erro ao atualizar os registros: ", error)
finally:
    if connection:
        cursor.close()
        connection.close()
        print("A conexão com o PostgreSQL foi fechada.")


Os registros foram atualizados com sucesso.
A conexão com o PostgreSQL foi fechada.


In [24]:
# Lendo e exibindo os dados
df_postgres = spark.read.jdbc(url=postgres_url, table="public.clientes", properties=postgres_properties)
df_postgres.show()

+---+--------+-------+
| id|    nome|  valor|
+---+--------+-------+
|  1|Cliente1| 200.78|
|  2|Cliente2| 500.09|
|  3|Cliente3| 1100.1|
|  4|Cliente4|1900.56|
+---+--------+-------+



## SQL Server

In [3]:
from pyspark.sql import SparkSession

# Caminho para os drivers JDBC dentro do contêiner Docker
path_sqlserver = "/opt/spark/jars/sqljdbc42.jar"

# Caminho para o arquivo CSV
csv_file_path = 'arquivos/teste.csv'

# Configurando a SparkSession para incluir os drivers JDBC
spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.jars", f"{path_sqlserver}") \
    .getOrCreate()

# Ler os dados do CSV para um DataFrame
df = spark.read.option("header", "true").option("inferSchema", "true").csv(csv_file_path)

# Propriedades de conexão com o SQL Server
sqlserver_url = "jdbc:sqlserver://sqlserver:1433;databaseName=tempdb"
sqlserver_properties = {
    "user": "SA",
    "password": "exemploSenha1!",
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# Gravar o DataFrame no SQL Server
df.write.jdbc(url=sqlserver_url, table="dbo.clientes", mode="append", properties=sqlserver_properties)

# Lendo e exibindo os dados para verificar a inserção
df_sqlserver = spark.read.jdbc(url=sqlserver_url, table="dbo.clientes", properties=sqlserver_properties)
df_sqlserver.show()


+---+--------+-----+
| id|    nome|valor|
+---+--------+-----+
|  1|Cliente1|100.0|
|  2|Cliente2|100.0|
|  3|Cliente3|100.0|
|  4|Cliente4|100.0|
+---+--------+-----+



## MySQL

In [2]:
from pyspark.sql import SparkSession

# Caminho para os drivers JDBC dentro do contêiner Docker
path_mysql = "/opt/spark/jars/mysql-connector-java-8.0.29.jar"

# Caminho para o arquivo CSV
csv_file_path = 'arquivos/teste.csv'

# Configurando a SparkSession para incluir os drivers JDBC
spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.jars", f"{path_mysql}") \
    .getOrCreate()

# Ler os dados do CSV para um DataFrame
df = spark.read.option("header", "true").option("inferSchema", "true").csv(csv_file_path)

# Propriedades de conexão com o MySQL
mysql_url = "jdbc:mysql://mysql:3306/exemploDB"
mysql_properties = {
    "user": "exemploUsuario",
    "password": "exemploSenhaUsuario",
    "driver": "com.mysql.cj.jdbc.Driver"
}

# Gravar o DataFrame no MySQL
df.write.jdbc(url=mysql_url, table="clientes", mode="append", properties=mysql_properties)

# Lendo e exibindo os dados para verificar a inserção
df_mysql = spark.read.jdbc(url=mysql_url, table="clientes", properties=mysql_properties)
df_mysql.show()


+---+--------+-----+
| id|    nome|valor|
+---+--------+-----+
|  1|Cliente1|100.0|
|  2|Cliente2|100.0|
|  3|Cliente3|100.0|
|  4|Cliente4|100.0|
+---+--------+-----+



## Delta Lake

In [1]:
import pyspark
from importlib.metadata import version, PackageNotFoundError

# Imprime a versão do Spark
print("Versão do Spark:", pyspark.__version__)

# Tenta imprimir a versão do Delta Lake
try:
    print("Versão do Delta Lake:", version("delta-spark"))
except PackageNotFoundError:
    print("Delta Lake não está instalado.")

Versão do Spark: 3.5.0
Versão do Delta Lake: 3.0.0


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DeltaLakeExample") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.0.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

df = spark.read.csv('arquivos/teste.csv', header=True, inferSchema=True)

df.write.format("delta").mode("overwrite").save("arquivos/tmp/delta/teste")

# Ler dados no formato Delta
delta_df = spark.read.format("delta").load("arquivos/tmp/delta/teste")
delta_df.show()


+---+--------+-----+
| id|    nome|valor|
+---+--------+-----+
|  1|Cliente1|100.0|
|  2|Cliente2|100.0|
|  3|Cliente3|100.0|
|  4|Cliente4|100.0|
+---+--------+-----+



In [10]:
# Acessando uma versão específica de um conjunto de dados
df = spark.read.format("delta").option("versionAsOf", 0).load("arquivos/tmp/delta/teste")
print("versionAsOf: 0")
df.show()

# Acessando uma versão específica de um conjunto de dados
df = spark.read.format("delta").option("versionAsOf", 1).load("arquivos/tmp/delta/teste")
print("versionAsOf: 1")
df.show()

versionAsOf: 0
+---+--------+-----+
| id|    nome|valor|
+---+--------+-----+
|  1|Cliente1|100.0|
|  2|Cliente2|100.0|
|  3|Cliente3|100.0|
|  4|Cliente4|100.0|
+---+--------+-----+

versionAsOf: 1
+---+--------+-----+
| id|    nome|valor|
+---+--------+-----+
|  1|Cliente1|150.0|
|  2|Cliente2|100.0|
|  3|Cliente3|200.0|
|  4|Cliente4|100.0|
|  5|Cliente5|100.0|
+---+--------+-----+



In [4]:
df_time = spark.read.format("delta").option("timestampAsOf", "2024-01-01 14:10:54").load("arquivos/tmp/delta/teste")
df_time.show()

+---+--------+-----+
| id|    nome|valor|
+---+--------+-----+
|  1|Cliente1|100.0|
|  2|Cliente2|100.0|
|  3|Cliente3|100.0|
|  4|Cliente4|100.0|
+---+--------+-----+



In [6]:
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "arquivos/tmp/delta/teste")
deltaTable.history().show()

+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|     12|2024-01-01 17:17:...|  NULL|    NULL|    WRITE|{mode -> Overwrit...|NULL|    NULL|     NULL|         11|  Serializable|        false|{numFiles -> 1, n...|        NULL|Apache-Spark/3.5....|
|     11|2024-01-01 15:46:...|  NULL|    NULL|    MERGE|{predicate -> (CA...|NULL|    NULL|     NULL|         10|  Serializable|        false|{numTargetRowsCop...|        NULL|Apache-Spark/3.3....|
|     10|2

In [7]:
from delta.tables import *

# Carregar a tabela Delta
deltaTable = DeltaTable.forPath(spark, "arquivos/tmp/delta/teste")

# Definir o período de retenção para 6 meses (em horas)
RETENTION_HOURS = 6 * 30 * 24  # 6 meses em horas

# Executar o comando VACUUM para remover versões antigas dos dados
# ATENÇÃO: O VACUUM é uma operação destrutiva e não pode ser desfeita.
deltaTable.vacuum(RETENTION_HOURS)


DataFrame[]

In [9]:
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
from pyspark.sql.functions import expr

# Inicialize sua SparkSession aqui
spark = SparkSession.builder.appName("DeltaLakeExample").getOrCreate()

# Operações com a tabela Delta
deltaTable = DeltaTable.forPath(spark, "arquivos/tmp/delta/teste")

# Update
deltaTable.update(
    condition=expr("id == 1"),
    set={"valor": expr("150.0")}
)

# Leitura para exibição dos resultados
delta_df = spark.read.format("delta").load("arquivos/tmp/delta/teste")
delta_df.show()

# Dados para merge
updatesDF = spark.createDataFrame([(3, "Cliente3", 200.0), (5, "Cliente5", 100.0)], ["id", "nome", "valor"])

# Merge
deltaTable.alias("tabela").merge(
    updatesDF.alias("updates"),
    "tabela.id = updates.id"
).whenMatchedUpdate(set={"valor": "updates.valor"}) \
 .whenNotMatchedInsert(values={"id": "updates.id", "nome": "updates.nome", "valor": "updates.valor"}) \
 .execute()

# Leitura para exibição dos resultados
delta_df = spark.read.format("delta").load("arquivos/tmp/delta/teste")
delta_df.show()

# Delete
deltaTable.delete(condition=expr("valor < 150.0"))

# Leitura final para exibição dos resultados
delta_df = spark.read.format("delta").load("arquivos/tmp/delta/teste")
delta_df.show()

+---+--------+-----+
| id|    nome|valor|
+---+--------+-----+
|  1|Cliente1|150.0|
|  2|Cliente2|100.0|
|  3|Cliente3|100.0|
|  4|Cliente4|100.0|
+---+--------+-----+

+---+--------+-----+
| id|    nome|valor|
+---+--------+-----+
|  1|Cliente1|150.0|
|  2|Cliente2|100.0|
|  3|Cliente3|200.0|
|  4|Cliente4|100.0|
|  5|Cliente5|100.0|
+---+--------+-----+

+---+--------+-----+
| id|    nome|valor|
+---+--------+-----+
|  1|Cliente1|150.0|
|  3|Cliente3|200.0|
+---+--------+-----+

