# Iniciar sessão Spark com suporte a Hive e Delta

In [1]:
import findspark
from pyspark.sql import SparkSession
findspark.init()

spark = SparkSession.builder.getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
print("Spark versão:", spark.version)
spark.stop()

Spark versão: 4.0.0


In [3]:
print("scala:", spark.sparkContext._jvm.scala.util.Properties.versionNumberString())

scala: 2.13.16


In [4]:
!python3 --version

Python 3.10.12


In [5]:
!pip show delta-spark

Name: delta-spark
Version: 4.0.0
Summary: Python APIs for using Delta Lake with Apache Spark
Home-page: https://github.com/delta-io/delta/
Author: The Delta Lake Project Authors
Author-email: delta-users@googlegroups.com
License: Apache-2.0
Location: /usr/local/lib/python3.10/dist-packages
Requires: importlib-metadata, pyspark
Required-by: 


In [None]:
# 1.1. Importar tudo o que precisamos
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# 1.2. Construir o SparkSession com Hive e Delta ativados
spark = (
    SparkSession.builder
    .appName("TesteSparkHiveDelta")
    .config("spark.sql.catalogImplementation", "hive")
    .config("spark.hadoop.hive.metastore.uris", "thrift://hive:9083")
    # .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    # .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # .config("spark.hadoop.hive.server2.thrift.max.message.size", "104857600")
    # .config("spark.jars.packages", "io.delta:delta-core_2.13:4.0.0rc1")
    .enableHiveSupport()
    .getOrCreate()
)

# 1.3. Verificar versão do Spark e se o Hive está disponível
print("Spark versão:", spark.version)
print("Suporte a Hive ativado?:", spark.conf.get("spark.sql.catalogImplementation"))  # deve imprimir "hive"

Spark versão: 4.0.0
Suporte a Hive ativado?: hive


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 40172)
Traceback (most recent call last):
  File "/usr/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/usr/local/lib/python3.10/dist-packages/pyspark/accumulators.py", line 299, in handle
    poll(accum_updates)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/accumulators.py", line 271, in poll
    if self.rfile in r and func():
  File "/usr/local/lib/python3.10/dist-packages/pyspark/accumulators.py", line 275, in accum_updates
    num_updates = read_int(

# 2.Teste básico de leitura/escrita de DataFrame local


In [7]:
# 2.1. Criar um DataFrame simples
data = [
    {"id": 1, "nome": "Alice", "idade": 30},
    {"id": 2, "nome": "Bob",   "idade": 25},
    {"id": 3, "nome": "Carol", "idade": 40}
]
df = spark.createDataFrame(data)

# 2.2. Mostrar esquema e conteúdo
df.printSchema()
df.show()

root
 |-- id: long (nullable = true)
 |-- idade: long (nullable = true)
 |-- nome: string (nullable = true)



                                                                                

+---+-----+-----+
| id|idade| nome|
+---+-----+-----+
|  1|   30|Alice|
|  2|   25|  Bob|
|  3|   40|Carol|
+---+-----+-----+



# 3. Conectar ao Hive e criar banco/tabela

In [8]:
spark.sparkContext._jsc.hadoopConfiguration().get("hive.metastore.uris")

'thrift://hive:9083'

In [9]:
spark.conf.get("spark.sql.catalogImplementation")

'hive'

In [10]:
spark.conf.get("spark.sql.warehouse.dir")

'hdfs://namenode:9000/user/hive/warehouse'

## 3.1. Cria um database no Hive (caso não exista)

In [11]:
spark.sql("SHOW DATABASES").show()

25/06/14 23:51:21 WARN HiveConf: HiveConf of name hive.server2.thrift.jvm.args does not exist
25/06/14 23:51:21 WARN HiveConf: HiveConf of name hive.server2.thrift.java.port does not exist
Hive Session ID = 1df1b4a2-3bf6-4679-8e5f-8fd57ff3fcfe


+---------+
|namespace|
+---------+
|  default|
+---------+



In [12]:
# propriedade do Hive Metastore (java api)
warehouse_dir = spark.conf.get("spark.sql.warehouse.dir")
hadoop_conf = spark._jsc.hadoopConfiguration()
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)
path = spark._jvm.org.apache.hadoop.fs.Path(warehouse_dir)

if fs.exists(path):
    status = fs.listStatus(path)
    for file_status in status:
        print(file_status.getPath())
else:
    print(f"{warehouse_dir}")

In [13]:
# hdfs root
root_path = spark._jvm.org.apache.hadoop.fs.Path("/")
status = fs.listStatus(root_path)
for file_status in status:
    print(file_status.getPath())

hdfs://namenode:9000/tmp
hdfs://namenode:9000/user


#### importante: hdfs =! metastore
aqui estamos tentando validar a conexão com o namenode

In [14]:
!HADOOP_USER_NAME=sparkuser hdfs dfs -ls hdfs://namenode:9000/

Found 2 items
drwx-wx-wx   - hdfs supergroup          0 2025-06-14 23:50 hdfs://namenode:9000/tmp
drwxr-xr-x   - hdfs supergroup          0 2025-06-14 23:50 hdfs://namenode:9000/user


In [15]:
!hdfs dfs -ls /

Found 2 items
drwx-wx-wx   - hdfs supergroup          0 2025-06-14 23:50 /tmp
drwxr-xr-x   - hdfs supergroup          0 2025-06-14 23:50 /user


In [16]:
spark.sql("CREATE DATABASE IF NOT EXISTS spark_db")

DataFrame[]

In [17]:
spark.conf.get("spark.hadoop.hive.metastore.uris")

'thrift://hive:9083'

In [18]:
spark.conf.get("spark.sql.warehouse.dir")

'hdfs://namenode:9000/user/hive/warehouse'

In [19]:
spark.conf.get("spark.sql.catalogImplementation")

'hive'

In [20]:
spark.sql("SHOW DATABASES").show()

+---------+
|namespace|
+---------+
|  default|
| spark_db|
+---------+



## 3.2. Cria uma tabela Hive que grava em formato Parquet

In [21]:
spark.conf.get("spark.hadoop.hive.metastore.uris")
spark.conf.get("spark.sql.catalogImplementation")
spark.sql("SHOW DATABASES").show()


+---------+
|namespace|
+---------+
|  default|
| spark_db|
+---------+



In [22]:
spark.sql("USE spark_db")

print("Current database:", spark.catalog.currentDatabase())

Current database: spark_db


In [23]:
import os
print(os.environ.get("HADOOP_USER_NAME"))

hdfs


In [24]:
# !hdfs dfs -mkdir -p /user/hive/warehouse
# !hdfs dfs -chown -R hive:hive /user/hive
# !hdfs dfs -chmod -R 775 /user/hive

!hdfs dfs -ls /user/hive/warehouse

Found 1 items
drwxr-xr-x   - hdfs supergroup          0 2025-06-14 23:51 /user/hive/warehouse/spark_db.db


In [25]:
## TESTE
# spark.sql("drop table if exists spark_db.tabela_hive_parquet PURGE")

## obs, ignore o primeiro warn, ele cria o diretorio e tras o aviso. 

In [26]:
spark.sql("""
CREATE EXTERNAL TABLE IF NOT EXISTS spark_db.tabela_hive_parquet (
    id INT,
    nome STRING,
    idade INT
)
STORED AS PARQUET
LOCATION 'hdfs://namenode:9000/user/hive/warehouse/spark_db.db/tabela_hive_parquet'
""")

DataFrame[]

In [27]:
!hdfs dfs -ls /user/hive/warehouse

Found 1 items
drwxr-xr-x   - hdfs supergroup          0 2025-06-14 23:51 /user/hive/warehouse/spark_db.db


In [28]:
spark.sql("SHOW TABLES IN spark_db").show()

+---------+-------------------+-----------+
|namespace|          tableName|isTemporary|
+---------+-------------------+-----------+
| spark_db|tabela_hive_parquet|      false|
+---------+-------------------+-----------+



## 3.3. Insere alguns dados na tabela Hive

In [29]:
spark.sql("DESCRIBE FORMATTED spark_db.tabela_hive_parquet").show(truncate=False)


+----------------------------+------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                               |comment|
+----------------------------+------------------------------------------------------------------------+-------+
|id                          |int                                                                     |NULL   |
|nome                        |string                                                                  |NULL   |
|idade                       |int                                                                     |NULL   |
|                            |                                                                        |       |
|# Detailed Table Information|                                                                        |       |
|Catalog                     |spark_catalog                                                           | 

### troubleshotting 
1. block de insert -> desativar o safe mode do hdfs

## 3.4. Verifica se os dados foram gravados

# 4. Operações “HDFS” (via API Hadoop) — criar diretórios e mover arquivos

In [30]:
# 4.1. Importar classes do Hadoop FileSystem via o gateway Java (JVM)
hadoop_conf = spark._jsc.hadoopConfiguration()
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)


In [31]:
# 4.2. Definir caminhos para diretório e arquivos de teste no HDFS local

path_dir = spark._jvm.org.apache.hadoop.fs.Path("/tmp/hdfs_exemplo_dir")
path_arquivo_origem = spark._jvm.org.apache.hadoop.fs.Path("/tmp/hdfs_exemplo_dir/original.txt")
path_arquivo_destino = spark._jvm.org.apache.hadoop.fs.Path("/tmp/hdfs_exemplo_dir/movido.txt")

In [32]:
# 4.3. Criar o diretório (se não existir)
if not fs.exists(path_dir):
    fs.mkdirs(path_dir)
    print("Diretório criado:", path_dir)

Diretório criado: /tmp/hdfs_exemplo_dir


In [33]:
# 4.4. Criar um arquivo “dummy” no diretório para testar a movimentação
#      Para “criar” um arquivo, usamos o FileSystem API para abrir um OutputStream
os_stream = fs.create(path_arquivo_origem, True)
os_stream.write(bytes("Conteúdo de teste HDFS\n", "utf-8"))
os_stream.close()
print("Arquivo de origem criado:", path_arquivo_origem)

Arquivo de origem criado: /tmp/hdfs_exemplo_dir/original.txt


In [34]:
# 4.5. Verificar se o arquivo existe
print("Existe original.txt?", fs.exists(path_arquivo_origem))

Existe original.txt? True


In [35]:
# 4.6. Mover (renomear) o arquivo de original.txt → movido.txt
fs.rename(path_arquivo_origem, path_arquivo_destino)
print("Arquivo movido para:", path_arquivo_destino)

Arquivo movido para: /tmp/hdfs_exemplo_dir/movido.txt


In [36]:

# 4.7. Conferir novamente
print("Existe movido.txt?", fs.exists(path_arquivo_destino))
print("Existe original.txt (depois do rename)?", fs.exists(path_arquivo_destino))


Existe movido.txt? True
Existe original.txt (depois do rename)? True


# 5. Criar tabelas Hive apontando para esse “HDFS” (diretório local)


In [37]:
# 5.1. Define um diretório “externo” para a tabela Hive
dir_externo = "hdfs://namenode:9000/hdfs_exemplo_dir/tabela_externa_hive"

# 5.2. Caso não exista, cria o diretório (já criamos antes, mas só para garantir)
path_dir_externo = spark._jvm.org.apache.hadoop.fs.Path(dir_externo)
if not fs.exists(path_dir_externo):
    fs.mkdirs(path_dir_externo)
    print("Diretório para tabela externa criado:", dir_externo)

Diretório para tabela externa criado: hdfs://namenode:9000/hdfs_exemplo_dir/tabela_externa_hive


In [38]:

# 5.3. Criar tabela externa no Hive que aponta para dir_externo
spark.sql(f"""
CREATE TABLE IF NOT EXISTS tabela_hive_externa (
    id INT,
    descricao STRING
)
STORED AS PARQUET
LOCATION '{dir_externo}'
""")

DataFrame[]

In [39]:
# 5.4. Inserir alguns dados nessa tabela externa
spark.sql("INSERT INTO tabela_hive_externa VALUES (100, 'Registro Externo A'), (101, 'Registro Externo B')")

# 5.5. Verificar conteúdo via SELECT
spark.sql("SELECT * FROM tabela_hive_externa").show()

                                                                                

+---+------------------+
| id|         descricao|
+---+------------------+
|100|Registro Externo A|
|101|Registro Externo B|
+---+------------------+



# 6. Ler/Escrever diretamente arquivos CSV/Parquet via Spark (opcional)

In [40]:
# 6.1. Cria um DataFrame e grava como CSV
df_csv = spark.createDataFrame([
    (1,   "Produto A",  9.99),
    (2,   "Produto B", 19.99),
    (3,   "Produto C", 29.99)
], ["produto_id", "produto_nome", "preco"])

caminho_csv = "hdfs://namenode:9000/hdfs_exemplo_dir/produtos.csv"
df_csv.write.mode("overwrite").csv(caminho_csv, header=True)

print("CSV gravado em:", caminho_csv)

CSV gravado em: hdfs://namenode:9000/hdfs_exemplo_dir/produtos.csv


In [41]:

# 6.2. Lê o CSV que acabamos de gravar
df_csv_lido = spark.read.option("header", True).csv(caminho_csv)
df_csv_lido.printSchema()
df_csv_lido.show()

root
 |-- produto_id: string (nullable = true)
 |-- produto_nome: string (nullable = true)
 |-- preco: string (nullable = true)

+----------+------------+-----+
|produto_id|produto_nome|preco|
+----------+------------+-----+
|         2|   Produto B|19.99|
|         3|   Produto C|29.99|
|         1|   Produto A| 9.99|
+----------+------------+-----+



In [42]:

# 6.3. Converte para Parquet e grava em diretório separado
caminho_parquet = "hdfs://namenode:9000/hdfs_exemplo_dir/produtos_parquet"
df_csv_lido.write.mode("overwrite").parquet(caminho_parquet)

print("Parquet gravado em:", caminho_parquet)

# 6.4. Lê o Parquet de volta
df_parquet_lido = spark.read.parquet(caminho_parquet)
df_parquet_lido.printSchema()
df_parquet_lido.show()

Parquet gravado em: hdfs://namenode:9000/hdfs_exemplo_dir/produtos_parquet
root
 |-- produto_id: string (nullable = true)
 |-- produto_nome: string (nullable = true)
 |-- preco: string (nullable = true)

+----------+------------+-----+
|produto_id|produto_nome|preco|
+----------+------------+-----+
|         2|   Produto B|19.99|
|         3|   Produto C|29.99|
|         1|   Produto A| 9.99|
+----------+------------+-----+



In [43]:
# persist? 
!hdfs dfs -ls /

Found 3 items
drwxr-xr-x   - hdfs supergroup          0 2025-06-14 23:51 /hdfs_exemplo_dir
drwx-wx-wx   - hdfs supergroup          0 2025-06-14 23:51 /tmp
drwxr-xr-x   - hdfs supergroup          0 2025-06-14 23:50 /user


In [44]:
# persist? 
!hdfs dfs -ls /user/hive/warehouse/spark_db.db/tabela_hive_parquet

In [45]:
# persist? 
!hdfs dfs -ls /user/hive/warehouse/

Found 1 items
drwxr-xr-x   - hdfs supergroup          0 2025-06-14 23:51 /user/hive/warehouse/spark_db.db


# 7. Testar leitura/escrita de Delta Lake

In [46]:
spark.stop()

In [47]:
from pyspark.sql import SparkSession
from delta import *

builder = (
    SparkSession.builder
    .appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.catalogImplementation", "hive")
    .config("spark.hadoop.hive.metastore.uris", "thrift://hive:9083")
)
# Não uso configure_spark_with_delta_pip(), pois ele adiciona --packages

spark = builder.getOrCreate()


In [48]:
# Acessa o classpath da JVM via SparkContext
classpath = spark.sparkContext._jvm.System.getProperty("java.class.path")

# Divide os caminhos (separados por ':' no Linux/macOS, ou ';' no Windows)
import os
sep = ";" if os.name == "nt" else ":"
jars = [path for path in classpath.split(sep) if path.strip().endswith(".jar")]

# Filtra JARs que contenham "delta" no nome (case-insensitive)
delta_jars = [jar for jar in jars if "delta" in jar.lower()]
antlr_jars = [jar for jar in jars if "antlr" in jar.lower()]

# Exibe os JARs encontrados
print(f"{len(delta_jars)} JARs com 'delta' encontrados no classpath:\n")
for jar in delta_jars:
    print("-", jar)
print(f"\n{len(antlr_jars)} JARs com 'antlr' encontrados no classpath:\n")
for jar in antlr_jars:
    print("-", jar)

3 JARs com 'delta' encontrados no classpath:

- /opt/spark/jars/delta-spark_2.13-4.0.0.jar
- /opt/spark/jars/delta-storage-4.0.0.jar
- /opt/spark/jars/delta-core_2.13-2.4.0.jar

2 JARs com 'antlr' encontrados no classpath:

- /opt/spark/jars/antlr4-runtime-4.13.1.jar
- /opt/spark/jars/antlr-runtime-3.5.2.jar


In [49]:
!pip show pyspark delta-spark

Name: pyspark
Version: 4.0.0
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: /usr/local/lib/python3.10/dist-packages
Requires: py4j
Required-by: delta-spark
---
Name: delta-spark
Version: 4.0.0
Summary: Python APIs for using Delta Lake with Apache Spark
Home-page: https://github.com/delta-io/delta/
Author: The Delta Lake Project Authors
Author-email: delta-users@googlegroups.com
License: Apache-2.0
Location: /usr/local/lib/python3.10/dist-packages
Requires: importlib-metadata, pyspark
Required-by: 


In [50]:

# 7.2. Definir diretório para tabela Delta
delta_path = "hdfs://namenode:9000/tmp/delta_exemplo"
# Se já existir, apagamos para recomeçar do zero
fs_local = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
path_delta_dir = spark._jvm.org.apache.hadoop.fs.Path(delta_path)
if fs_local.exists(path_delta_dir):
    fs_local.delete(path_delta_dir, True)

## builder helper (usado em testes para achar os jars)

In [51]:
# import pyspark
# from delta import *

# builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
#     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
#     .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

# spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [52]:
# 7.1. Criar um DataFrame inicial
data_delta = [
    {"id": 1, "categoria": "A", "valor": 100},
    {"id": 2, "categoria": "B", "valor": 200},
    {"id": 3, "categoria": "C", "valor": 300}
]
df_delta = spark.createDataFrame(data_delta)
df_delta.show(10)

+---------+---+-----+
|categoria| id|valor|
+---------+---+-----+
|        A|  1|  100|
|        B|  2|  200|
|        C|  3|  300|
+---------+---+-----+



In [53]:
# Definir caminho para salvar em Delta (pode sobrescrever delta_path se desejar)
delta_path = "hdfs://namenode:9000/tmp/delta_sample"

# Se já existir, apagar para evitar erro
hadoop_conf = spark._jsc.hadoopConfiguration()
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)
path_sample_delta = spark._jvm.org.apache.hadoop.fs.Path(delta_path)
if fs.exists(path_sample_delta):
    fs.delete(path_sample_delta, True)

# Gravar o DataFrame em formato Delta
df_delta.write.format("delta").mode("overwrite").save(delta_path)
print(f"DataFrame salvo em Delta no caminho: {delta_path}")

                                                                                

DataFrame salvo em Delta no caminho: hdfs://namenode:9000/tmp/delta_sample


In [54]:

# 7.4. Ler o Delta que acabamos de gravar
df_lido = spark.read.format("delta").load(delta_path)
print("Conteúdo inicial do Delta:")
df_lido.show()


Conteúdo inicial do Delta:


25/06/14 23:51:39 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+---------+---+-----+
|categoria| id|valor|
+---------+---+-----+
|        A|  1|  100|
|        B|  2|  200|
|        C|  3|  300|
+---------+---+-----+



In [55]:

# 7.5. Exibe o histórico de versões (Timeline) do Delta
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, delta_path)
print("Histórico de versões (history):")
delta_table.history().show()


Histórico de versões (history):
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      0|2025-06-14 23:51:...|  NULL|    NULL|    WRITE|{mode -> Overwrit...|NULL|    NULL|     NULL|       NULL|  Serializable|        false|{numFiles -> 4, n...|        NULL|Apache-Spark/4.0....|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+

In [56]:
# 7.6. Simular um “upsert” (merge) — exemplo: atualiza valor onde id=2, insere novo registro id=4
from pyspark.sql.functions import col

# Cria um DataFrame com alterações/novos registros
data_upsert = [
    {"id": 2, "categoria": "B", "valor": 250},  # atualizar
    {"id": 4, "categoria": "D", "valor": 400}   # novo
]
df_upsert = spark.createDataFrame(data_upsert)

df_upsert.show()

+---------+---+-----+
|categoria| id|valor|
+---------+---+-----+
|        B|  2|  250|
|        D|  4|  400|
+---------+---+-----+



In [57]:

# Executa o MERGE: se id bate, atualiza; caso contrário, insere
delta_table.alias("tgt").merge(
    source = df_upsert.alias("src"),
    condition = "tgt.id = src.id"
).whenMatchedUpdate(set = {"valor": "src.valor"}) \
 .whenNotMatchedInsert(values = {
     "id": "src.id",
     "categoria": "src.categoria",
     "valor": "src.valor"
 }).execute()

# 7.7. Mostrar o conteúdo final após o merge
print("Conteúdo do Delta após MERGE:")
spark.read.format("delta").load(delta_path).show()

# 7.8. Mostrar histórico atualizado (nova versão)
print("Histórico de versões (history) após MERGE:")
delta_table.history().show()

25/06/14 23:51:43 WARN MapPartitionsRDD: RDD 58 was locally checkpointed, its lineage has been truncated and cannot be recomputed after unpersisting


Conteúdo do Delta após MERGE:
+---------+---+-----+
|categoria| id|valor|
+---------+---+-----+
|        B|  2|  250|
|        D|  4|  400|
|        A|  1|  100|
|        C|  3|  300|
+---------+---+-----+

Histórico de versões (history) após MERGE:
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      1|2025-06-14 23:51:...|  NULL|    NULL|    MERGE|{predicate -> ["(...|NULL|    NULL|     NULL|          0|  Serializable|        false|{numTargetRows

# 8. (spark-sql) Criar tabela Hive com dados Delta

In [58]:
spark.sql("SHOW DATABASES").show()

+---------+
|namespace|
+---------+
|  default|
| spark_db|
+---------+



25/06/14 23:51:44 WARN HiveConf: HiveConf of name hive.server2.thrift.jvm.args does not exist
25/06/14 23:51:44 WARN HiveConf: HiveConf of name hive.server2.thrift.java.port does not exist


In [59]:
spark.sql("CREATE DATABASE IF NOT EXISTS teste_db")

DataFrame[]

In [60]:
spark.sql("SHOW DATABASES").show()

+---------+
|namespace|
+---------+
|  default|
| spark_db|
| teste_db|
+---------+



In [61]:
spark.sql("USE teste_db")  # volta ao banco de dados de teste

spark.sql("DROP TABLE IF EXISTS tabela_hive_delta")

# Cria a tabela Delta no Hive apontando para o delta_path, sem schema explícito
spark.sql(f"""
CREATE TABLE IF NOT EXISTS tabela_hive_delta
USING DELTA
LOCATION '{delta_path}'
""")


25/06/14 23:51:45 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `spark_catalog`.`teste_db`.`tabela_hive_delta` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.


DataFrame[]

In [62]:
# 8.2. Ler a tabela Delta via API e registrar como view temporária
df_delta_sql = spark.read.format("delta").load(delta_path)
df_delta_sql.createOrReplaceTempView("tabela_hive_delta_view")

# Agora pode consultar via SQL sem erro
spark.sql("SELECT * FROM tabela_hive_delta_view").show()

# 8.3. Mostrar histórico a partir do SQL (opcional)
# Note: não há “SHOW HISTORY” em SQL padrão; mas podemos
# consultar o DeltaTable via Python, conforme feito acima.

+---------+---+-----+
|categoria| id|valor|
+---------+---+-----+
|        B|  2|  250|
|        D|  4|  400|
|        A|  1|  100|
|        C|  3|  300|
+---------+---+-----+



In [63]:
# Cria uma view temporária com o histórico
delta_table.history().createOrReplaceTempView("delta_history")

# Agora pode consultar via SQL
spark.sql("SELECT * FROM delta_history").show(truncate=False)

+-------+-----------------------+------+--------+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+------------------------

# 9 teste sas7 package

In [64]:
spark.stop()

## venv 3.10 + scala 2.12 (mude o kernel manualmente)
obs: https://github.com/saurfang/spark-sas7bdat

In [65]:
import pyspark
print(f"Spark version: {pyspark.__version__}")

Spark version: 4.0.0


In [66]:
# 1.1. Importar tudo o que precisamos
from pyspark.sql import SparkSession

# 1.2. Construir o SparkSession com Hive
spark = (
    SparkSession.builder
    .appName("sas lab")
    .config("spark.sql.catalogImplementation", "hive")
    .config("spark.hadoop.hive.metastore.uris", "thrift://hive:9083")
    .config("spark.driver.extraClassPath", "/opt/spark/jars/scala-library-2.12.2.jar:/opt/spark/jars/spark-sas7bdat-3.0.0-s_2.12.jar:/opt/spark/jars/parso-2.0.14.jar") \
    .config("spark.executor.extraClassPath", "/opt/spark/jars/scala-library-2.12.2.jar:/opt/spark/jars/spark-sas7bdat-3.0.0-s_2.12.jar:/opt/spark/jars/parso-2.0.14.jar") \
    # .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    # .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # .config("spark.hadoop.hive.server2.thrift.max.message.size", "104857600")
    # .config("spark.jars.packages", "io.delta:delta-core_2.13:4.0.0rc1")
    .enableHiveSupport()
    .getOrCreate()
)

# 1.3. Verificar versão do Spark e se o Hive está disponível
print("Spark versão:", spark.version)
print("Suporte a Hive ativado?:", spark.conf.get("spark.sql.catalogImplementation"))  # deve imprimir "hive"

Spark versão: 4.0.0
Suporte a Hive ativado?: hive


In [67]:
# Acessa o classpath da JVM via SparkContext
classpath = spark.sparkContext._jvm.System.getProperty("java.class.path")

# Divide os caminhos (separados por ':' no Linux/macOS, ou ';' no Windows)
import os
sep = ";" if os.name == "nt" else ":"
jars = [path for path in classpath.split(sep) if path.strip().endswith(".jar")]

# Filtra JARs que contenham "delta" no nome (case-insensitive)
delta_jars = [jar for jar in jars if "sas" in jar.lower()]
antlr_jars = [jar for jar in jars if "parso" in jar.lower()]
scala_legacy = [jar for jar in jars if "scala-library" in jar.lower()]

# Exibe os JARs encontrados
print(f"{len(delta_jars)} JARs com 'sas' encontrados no classpath:\n")
for jar in delta_jars:
    print("-", jar)
print(f"\n{len(antlr_jars)} JARs com 'parso' encontrados no classpath:\n")
for jar in antlr_jars:
    print("-", jar)
print(f"\n{len(scala_legacy)} JARs com 'scala-library' encontrados no classpath:\n")
for jar in scala_legacy:
    print("-", jar)

1 JARs com 'sas' encontrados no classpath:

- /opt/spark/jars/spark-sas7bdat-3.0.0-s_2.12.jar

1 JARs com 'parso' encontrados no classpath:

- /opt/spark/jars/parso-2.0.14.jar

1 JARs com 'scala-library' encontrados no classpath:

- /opt/spark/jars/scala-library-2.13.16.jar


In [68]:
# --- 10.2. Definir caminho do arquivo SAS e verificar existência ---
sas_path = "/home/sparkuser/app/src/datetime.sas7bdat"

if not os.path.exists(sas_path):
    raise FileNotFoundError(f"O arquivo SAS não foi encontrado em: {sas_path}")
else:
    print(f"Arquivo SAS encontrado em: {sas_path}")


Arquivo SAS encontrado em: /home/sparkuser/app/src/datetime.sas7bdat


In [69]:
# Copia o arquivo local SAS para o HDFS para leitura posterior
hadoop_conf = spark._jsc.hadoopConfiguration()
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)
hdfs_sas_path = "hdfs://namenode:9000/tmp/datetime.sas7bdat"
path_hdfs_sas = spark._jvm.org.apache.hadoop.fs.Path(hdfs_sas_path)

# Copia apenas se ainda não existir no HDFS
if not fs.exists(path_hdfs_sas):
    fs.copyFromLocalFile(False, True, spark._jvm.org.apache.hadoop.fs.Path(sas_path), path_hdfs_sas)
    print(f"Arquivo copiado para o HDFS em: {hdfs_sas_path}")
else:
    print(f"O arquivo já existe no HDFS em: {hdfs_sas_path}")

Arquivo copiado para o HDFS em: hdfs://namenode:9000/tmp/datetime.sas7bdat


In [70]:
!hdfs dfs -ls hdfs://namenode:9000/tmp/datetime.sas7bdat

-rw-r--r--   1 hdfs supergroup       5120 2025-06-14 23:51 hdfs://namenode:9000/tmp/datetime.sas7bdat


In [71]:
sas_path ="hdfs://namenode:9000/tmp/datetime.sas7bdat"

In [72]:
!pip show pyspark

Name: pyspark
Version: 4.0.0
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: /usr/local/lib/python3.10/dist-packages
Requires: py4j
Required-by: delta-spark


In [73]:
!pip show sas7bdat

Name: sas7bdat
Version: 2.2.3
Summary: A sas7bdat file reader for Python
Home-page: https://bitbucket.org/jaredhobbs/sas7bdat
Author: Jared Hobbs
Author-email: jared@pyhacker.com
License: MIT
Location: /usr/local/lib/python3.10/dist-packages
Requires: six
Required-by: 


In [74]:
from sas7bdat import SAS7BDAT
import pandas as pd

with SAS7BDAT('/home/sparkuser/app/src/datetime.sas7bdat') as reader:
    df_sas = reader.to_data_frame()

print("=== Schema inferido do SAS ===")
print(df_sas.dtypes)


=== Schema inferido do SAS ===
VAR1    datetime64[ns]
VAR2            object
VAR3            object
VAR4           float64
VAR5            object
dtype: object


In [75]:
print(df_sas)

                 VAR1        VAR2        VAR3     VAR4      VAR5
0 2015-02-02 14:42:12  2015-02-02  2015-02-02  20121.0  14:42:12
1 2014-01-01 10:14:23  2014-01-01  2014-01-01  19724.0  10:14:23
2 2015-01-15 06:15:22  2015-06-15  2015-06-15  20254.0  06:15:22
3 1948-09-09 21:32:00  1948-09-16  1948-09-16  -4124.0  21:32:00


In [76]:
# Converte colunas datetime.time para string antes de criar o Spark DataFrame
import datetime

for col in df_sas.columns:
	if df_sas[col].apply(lambda x: isinstance(x, datetime.time)).any():
		df_sas[col] = df_sas[col].astype(str)

df_sas_spark = spark.createDataFrame(df_sas)
df_sas_spark.printSchema()
df_sas_spark.show()

root
 |-- VAR1: timestamp (nullable = true)
 |-- VAR2: date (nullable = true)
 |-- VAR3: date (nullable = true)
 |-- VAR4: double (nullable = true)
 |-- VAR5: string (nullable = true)

+-------------------+----------+----------+-------+--------+
|               VAR1|      VAR2|      VAR3|   VAR4|    VAR5|
+-------------------+----------+----------+-------+--------+
|2015-02-02 14:42:12|2015-02-02|2015-02-02|20121.0|14:42:12|
|2014-01-01 10:14:23|2014-01-01|2014-01-01|19724.0|10:14:23|
|2015-01-15 06:15:22|2015-06-15|2015-06-15|20254.0|06:15:22|
|1948-09-09 21:32:00|1948-09-16|1948-09-16|-4124.0|21:32:00|
+-------------------+----------+----------+-------+--------+



In [77]:
# --- 10.4. Criar/usar um database no Hive e gravar os dados SAS em uma tabela Hive (Parquet) ---
spark.sql("CREATE DATABASE IF NOT EXISTS teste_db")
spark.sql("USE teste_db")

25/06/14 23:51:49 WARN HiveConf: HiveConf of name hive.server2.thrift.jvm.args does not exist
25/06/14 23:51:49 WARN HiveConf: HiveConf of name hive.server2.thrift.java.port does not exist


DataFrame[]

In [78]:

# 10.4.1. (Opcional) Verificar se já existe alguma tabela chamada ‘tabela_sas_hive’ e apagar caso exista
spark.sql("DROP TABLE IF EXISTS tabela_sas_hive")

DataFrame[]

In [79]:

# 10.4.2. Gravar o DataFrame SAS em uma tabela Hive gerenciada chamada ‘tabela_sas_hive’ (formato Parquet padrão)
df_sas_spark.write \
      .mode("overwrite") \
      .format("parquet") \
      .saveAsTable("tabela_sas_hive")

# --- 10.5. Consultar o conteúdo da tabela Hive para confirmar gravação ---
print("=== Conteúdo da tabela Hive ‘teste_db.tabela_sas_hive’ ===")

=== Conteúdo da tabela Hive ‘teste_db.tabela_sas_hive’ ===


                                                                                

In [80]:

# --- 10.5. Consultar o conteúdo da tabela Hive para confirmar gravação ---
print("=== Conteúdo da tabela Hive ‘teste_db.tabela_sas_hive’ ===")
spark.sql("SELECT * FROM teste_db.tabela_sas_hive").show(truncate=False)

=== Conteúdo da tabela Hive ‘teste_db.tabela_sas_hive’ ===
+-------------------+----------+----------+-------+--------+
|VAR1               |VAR2      |VAR3      |VAR4   |VAR5    |
+-------------------+----------+----------+-------+--------+
|2015-02-02 14:42:12|2015-02-02|2015-02-02|20121.0|14:42:12|
|2015-01-15 06:15:22|2015-06-15|2015-06-15|20254.0|06:15:22|
|1948-09-09 21:32:00|1948-09-16|1948-09-16|-4124.0|21:32:00|
|2014-01-01 10:14:23|2014-01-01|2014-01-01|19724.0|10:14:23|
+-------------------+----------+----------+-------+--------+



In [81]:

# --- 10.6. Mostrar contagem total de registros para confirmar que todos foram salvos ---
total_registros = spark.sql("SELECT COUNT(*) AS cnt FROM teste_db.tabela_sas_hive").collect()[0]["cnt"]
print(f"Total de registros gravados em ‘tabela_sas_hive’: {total_registros}")

Total de registros gravados em ‘tabela_sas_hive’: 4
