# Explorando as Tabelas Delta

Este notebook permite explorar e manipular as tabelas Delta do projeto de diário de bordo.

## 1. Configuração do SparkSession

Configurando a sessão Spark com suporte ao Delta Lake.

In [1]:
from pyspark.sql import SparkSession
import os

# Configuração do SparkSession com Delta Lake
builder = (
    SparkSession.builder
    .appName("ExplorarTabelas")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
    .config("spark.sql.warehouse.dir", "/app/spark-warehouse")
    .config("spark.sql.catalogImplementation", "hive")
    .config("javax.jdo.option.ConnectionURL", "jdbc:derby:/app/derby/metastore_db;create=true")
    .config("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver")
    .config("javax.jdo.option.ConnectionUserName", "APP")
    .config("javax.jdo.option.ConnectionPassword", "mine")
    .enableHiveSupport()
)

spark = builder.getOrCreate()
print("SparkSession inicializada com sucesso!")

# Verificar se o Delta Lake está disponível
try:
    spark.sql("CREATE TABLE IF NOT EXISTS test_delta USING delta AS SELECT 1 as id")
    spark.sql("DROP TABLE test_delta")
    print("Delta Lake configurado com sucesso!")
except Exception as e:
    print(f"Erro ao verificar Delta Lake: {e}")



:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-15ae6718-4396-4669-9dce-8296795d59df;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.3.0 in central
	found io.delta#delta-storage;2.3.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
downloading https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.3.0/delta-core_2.12-2.3.0.jar ...
	[SUCCESSFUL ] io.delta#delta-core_2.12;2.3.0!delta-core_2.12.jar (1188ms)
downloading https://repo1.maven.org/maven2/io/delta/delta-storage/2.3.0/delta-storage-2.3.0.jar ...
	[SUCCESSFUL ] io.delta#delta-storage;2.3.0!delta-storage.jar (260ms)
downloading https://repo1.maven.org/maven2/org/antlr/antlr4-runtime/4.8/antlr4-runtime-4.8.jar ...
	[SUCCESSFUL ] org.antlr#antlr4-runtime;4.8!antlr4-runtime.jar (2716ms)
:: resolution report :: resolve 4156ms :: artifacts dl 4174ms
	:: m

25/06/11 13:03:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


SparkSession inicializada com sucesso!
25/06/11 13:03:19 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
25/06/11 13:03:19 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
25/06/11 13:04:06 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
25/06/11 13:04:06 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore UNKNOWN@172.18.0.2


                                                                                

25/06/11 13:04:13 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

25/06/11 13:04:17 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `default`.`test_delta` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
25/06/11 13:04:18 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
25/06/11 13:04:18 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
25/06/11 13:04:18 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
25/06/11 13:04:18 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
25/06/11 13:04:19 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Delta Lake configurado com sucesso!


## 2. Carregando a Tabela Bronze

Lendo os dados da tabela bronze diretamente do formato Delta.

In [None]:
# Carregando a tabela Bronze e registrando no catálogo
bronze_df = spark.read.format("delta").load("/app/data/bronze/b_info_transportes")
bronze_df.createOrReplaceTempView("b_info_transportes")
print("Schema da tabela Bronze:")
bronze_df.printSchema()

print("\nAmostra dos dados:")
bronze_df.show(5)

Schema da tabela Bronze:
root
 |-- DATA_INICIO: string (nullable = true)
 |-- DATA_FIM: string (nullable = true)
 |-- CATEGORIA: string (nullable = true)
 |-- LOCAL_INICIO: string (nullable = true)
 |-- LOCAL_FIM: string (nullable = true)
 |-- DISTANCIA: string (nullable = true)
 |-- PROPOSITO: string (nullable = true)


Amostra dos dados:
+----------------+----------------+---------+------------+---------------+---------+-----------------+
|     DATA_INICIO|        DATA_FIM|CATEGORIA|LOCAL_INICIO|      LOCAL_FIM|DISTANCIA|        PROPOSITO|
+----------------+----------------+---------+------------+---------------+---------+-----------------+
|01-01-2016 21:11|01-01-2016 21:17|  Negocio| Fort Pierce|    Fort Pierce|       51|      Alimentação|
|01-02-2016 01:25|01-02-2016 01:37|  Negocio| Fort Pierce|    Fort Pierce|        5|             null|
|01-02-2016 20:25|01-02-2016 20:38|  Negocio| Fort Pierce|    Fort Pierce|       48|         Entregas|
|01-05-2016 17:31|01-05-2016 17:45|  Neg

## 3. Carregando a Tabela Silver

Lendo os dados da tabela silver processada.

In [15]:
# Carregando a tabela Silver e registrando no catálogo
silver_df = spark.read.format("delta").load("/app/data/silver/s_info_transportes")
silver_df.createOrReplaceTempView("s_info_transportes")
print("Schema da tabela Silver:")
silver_df.printSchema()

print("\nAmostra dos dados:")
silver_df.show(5)

Schema da tabela Silver:
root
 |-- DATA_INICIO: string (nullable = true)
 |-- DATA_FIM: string (nullable = true)
 |-- CATEGORIA: string (nullable = true)
 |-- LOCAL_INICIO: string (nullable = true)
 |-- LOCAL_FIM: string (nullable = true)
 |-- DISTANCIA: string (nullable = true)
 |-- PROPOSITO: string (nullable = true)
 |-- DT_REFE: date (nullable = true)


Amostra dos dados:
+----------------+----------------+---------+----------------+----------+---------+-----------------+----------+
|     DATA_INICIO|        DATA_FIM|CATEGORIA|    LOCAL_INICIO| LOCAL_FIM|DISTANCIA|        PROPOSITO|   DT_REFE|
+----------------+----------------+---------+----------------+----------+---------+-----------------+----------+
|04-01-2016 13:43|04-01-2016 14:01|  negocio|       Kissimmee| Kissimmee|       11|          reunião|2016-01-04|
|04-01-2016 14:36|04-01-2016 15:24|  negocio|       Kissimmee|   Orlando|      155|visita ao cliente|2016-01-04|
|04-01-2016 16:01|04-01-2016 16:49|  negocio|         Or

In [None]:
#selecione os valores distintos da coluna categoria da tabela Silver sem usar expressions
distinct_categories = silver_df.select("categoria").distinct().show()

## 4. Carregando a Tabela Gold

Lendo os dados agregados da tabela gold.

In [16]:
# Carregando a tabela Gold e registrando no catálogo
gold_df = spark.read.format("delta").load("/app/data/gold/info_corridas_do_dia")
gold_df.createOrReplaceTempView("info_corridas_do_dia")
print("Schema da tabela Gold:")
gold_df.printSchema()

print("\nAmostra dos dados:")
gold_df.show(5)

Schema da tabela Gold:
root
 |-- DT_REFE: date (nullable = true)
 |-- QT_CORR: long (nullable = true)
 |-- QT_CORR_NEG: long (nullable = true)
 |-- QT_CORR_PESS: long (nullable = true)
 |-- VL_MAX_DIST: string (nullable = true)
 |-- VL_MIN_DIST: string (nullable = true)
 |-- VL_AVG_DIST: double (nullable = true)
 |-- QT_CORR_REUNI: long (nullable = true)
 |-- QT_CORR_NAO_REUNI: long (nullable = true)


Amostra dos dados:
+----------+-------+-----------+------------+-----------+-----------+-----------+-------------+-----------------+
|   DT_REFE|QT_CORR|QT_CORR_NEG|QT_CORR_PESS|VL_MAX_DIST|VL_MIN_DIST|VL_AVG_DIST|QT_CORR_REUNI|QT_CORR_NAO_REUNI|
+----------+-------+-----------+------------+-----------+-----------+-----------+-------------+-----------------+
|2016-06-09|      1|          0|           0|        691|        691|      691.0|            0|                0|
|2016-05-05|      3|          0|           0|         29|        129|      100.0|            1|                2|
|2016

In [19]:
# Filtrar uma data na tabela gold_df usando PySpark DataFrame API
data_especifica = "2016-06-09"

filtered_gold_df = gold_df.filter(gold_df["QT_CORR_PESS"] > 0)

# Exibir os resultados
filtered_gold_df.show()

+----------+-------+-----------+------------+-----------+-----------+------------------+-------------+-----------------+
|   DT_REFE|QT_CORR|QT_CORR_NEG|QT_CORR_PESS|VL_MAX_DIST|VL_MIN_DIST|       VL_AVG_DIST|QT_CORR_REUNI|QT_CORR_NAO_REUNI|
+----------+-------+-----------+------------+-----------+-----------+------------------+-------------+-----------------+
|2016-02-04|      6|          0|           2|        805|        144|             595.0|            1|                3|
|2016-12-07|      3|          0|           2|         87|        123| 74.66666666666667|            0|                0|
|2016-03-03|      5|          0|           1|         76|        173|              69.2|            1|                3|
|2016-09-02|      6|          0|           4|         61|         15|40.666666666666664|            0|                1|
|2016-12-03|      2|          0|           1|         22|         19|              20.5|            0|                0|
|2016-08-03|      3|          0|

## 5. Manipulação dos Dados

Exemplos de operações que você pode fazer com os dados:

In [17]:
# Exemplo 1: Contagem de registros por tabela
print("Quantidade de registros:")
print(f"Bronze: {bronze_df.count():,}")
print(f"Silver: {silver_df.count():,}")
print(f"Gold: {gold_df.count():,}")

Quantidade de registros:
Bronze: 1,153
Silver: 420
Gold: 114


## Análise das Partições

Vamos explorar as partições da tabela Gold por data de referência:

In [None]:
# Método 1: Usando os.listdir para ver as partições no sistema de arquivos
import os

partitions = os.listdir('/app/data/gold/info_corridas_do_dia')
partitions = [p for p in partitions if p.startswith('DT_REFE=')]
print(f'Total de partições: {len(partitions)}')
print('\nPrimeiras 10 partições:')
for p in sorted(partitions)[:10]:
    print(p)

In [None]:
# Método 2: Usando Spark SQL para analisar as partições
print('\nAnálise das partições via Spark SQL:')
gold_df.select('DT_REFE').distinct().orderBy('DT_REFE').show(10)

# Estatísticas por partição
print('\nEstatísticas por partição:')
gold_df.groupBy('DT_REFE') \
    .agg({'QTD_CORRIDAS': 'sum'}) \
    .orderBy('DT_REFE') \
    .withColumnRenamed('sum(QTD_CORRIDAS)', 'TOTAL_CORRIDAS') \
    .show(10)

In [None]:
# Método 3: Usando SHOW PARTITIONS do Spark SQL
print('\nUsando SHOW PARTITIONS na tabela Gold:')

# Agora podemos usar SHOW PARTITIONS na tabela registrada
spark.sql("SHOW PARTITIONS info_corridas_do_dia").show(20, False)

In [19]:
# Método 4: Usando spark.table() na tabela registrada
print('\nUsando spark.table() diretamente:')

# Agora podemos usar spark.table() porque a tabela está registrada
table_df = spark.table("info_corridas_do_dia")

# Mostrando as partições
print("\nPartições da tabela:")
table_df.select('DT_REFE').distinct().orderBy('DT_REFE').show(20)


Usando spark.table() diretamente:

Partições da tabela:



[Stage 100:===>                                                   (1 + 14) / 15]

+----------+
|   DT_REFE|
+----------+
|2016-01-01|
|2016-01-02|
|2016-01-03|
|2016-01-04|
|2016-01-05|
|2016-01-06|
|2016-01-07|
|2016-01-08|
|2016-01-09|
|2016-01-11|
|2016-01-12|
|2016-02-01|
|2016-02-02|
|2016-02-04|
|2016-02-05|
|2016-02-07|
|2016-02-08|
|2016-02-09|
|2016-02-11|
|2016-02-12|
+----------+
only showing top 20 rows




                                                                                

In [20]:
table_df.display()

AttributeError: 'DataFrame' object has no attribute 'display'

## 6. Consultas Personalizadas

Espaço para suas próprias consultas:

In [10]:
#Checar os máximos e minimos da tabela Gold usando pyspark
print("\nMáximo, mínimo e média de QTD_CORRIDAS na tabela Gold:")
stats_df = spark.sql("""
    SELECT 
        MAX(QTD_CORRIDAS) AS MAX_CORRIDAS,
        MIN(QTD_CORRIDAS) AS MIN_CORRIDAS,
        AVG(QTD_CORRIDAS) AS MEDIA_CORRIDAS
    FROM info_corridas_do_dia
""")
stats_df.show()


Máximo, mínimo e média de QTD_CORRIDAS na tabela Gold:


AnalysisException: Column 'QTD_CORRIDAS' does not exist. Did you mean one of the following? [info_corridas_do_dia.DT_REFE, info_corridas_do_dia.QT_CORR, info_corridas_do_dia.QT_CORR_NEG, info_corridas_do_dia.QT_CORR_PESS, info_corridas_do_dia.QT_CORR_REUNI, info_corridas_do_dia.VL_MIN_DIST, info_corridas_do_dia.VL_AVG_DIST, info_corridas_do_dia.VL_MAX_DIST, info_corridas_do_dia.QT_CORR_NAO_REUNI]; line 3 pos 12;
'Project ['MAX('QTD_CORRIDAS) AS MAX_CORRIDAS#3295, 'MIN('QTD_CORRIDAS) AS MIN_CORRIDAS#3296, 'AVG('QTD_CORRIDAS) AS MEDIA_CORRIDAS#3297]
+- SubqueryAlias info_corridas_do_dia
   +- View (`info_corridas_do_dia`, [DT_REFE#2434,QT_CORR#2435L,QT_CORR_NEG#2436L,QT_CORR_PESS#2437L,VL_MAX_DIST#2438,VL_MIN_DIST#2439,VL_AVG_DIST#2440,QT_CORR_REUNI#2441L,QT_CORR_NAO_REUNI#2442L])
      +- Relation [DT_REFE#2434,QT_CORR#2435L,QT_CORR_NEG#2436L,QT_CORR_PESS#2437L,VL_MAX_DIST#2438,VL_MIN_DIST#2439,VL_AVG_DIST#2440,QT_CORR_REUNI#2441L,QT_CORR_NAO_REUNI#2442L] parquet


In [2]:
# Listar todos os bancos de dados disponíveis
print("Databases disponíveis:")
spark.sql("SHOW DATABASES").show()

Databases disponíveis:
+---------+
|namespace|
+---------+
|  default|
+---------+



In [2]:
# Listar todas as tabelas no banco de dados atual
print("\nTabelas no banco de dados atual:")
spark.sql("SHOW TABLES").show()


Tabelas no banco de dados atual:
+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
|  default|  b_info_transportes|      false|
|  default|info_corridas_do_dia|      false|
|  default|  s_info_transportes|      false|
+---------+--------------------+-----------+



In [3]:
# Mostrar informações detalhadas sobre uma tabela específica (exemplo com a tabela gold)
print("\nDetalhes da tabela Gold:")
spark.sql("DESCRIBE EXTENDED info_corridas_do_dia").show(truncate=False)


Detalhes da tabela Gold:
+----------------------------+---------------------------------------------------+-------+
|col_name                    |data_type                                          |comment|
+----------------------------+---------------------------------------------------+-------+
|DT_REFE                     |date                                               |       |
|QT_CORR                     |bigint                                             |       |
|QT_CORR_NEG                 |bigint                                             |       |
|QT_CORR_PESS                |bigint                                             |       |
|VL_MAX_DIST                 |string                                             |       |
|VL_MIN_DIST                 |string                                             |       |
|VL_AVG_DIST                 |double                                             |       |
|QT_CORR_REUNI               |bigint                            

In [4]:
spark.sql("select * from info_corridas_do_dia where QT_CORR_NEG > 0").show(truncate=False)

[Stage 15:>                                                         (0 + 3) / 3]

+----------+-------+-----------+------------+-----------+-----------+------------------+-------------+-----------------+
|DT_REFE   |QT_CORR|QT_CORR_NEG|QT_CORR_PESS|VL_MAX_DIST|VL_MIN_DIST|VL_AVG_DIST       |QT_CORR_REUNI|QT_CORR_NAO_REUNI|
+----------+-------+-----------+------------+-----------+-----------+------------------+-------------+-----------------+
|2016-03-04|1      |1          |0           |1593       |1593       |1593.0            |1            |0                |
|2016-06-10|4      |4          |0           |184        |1126       |406.75            |0            |0                |
|2016-04-10|2      |2          |0           |286        |151        |218.5             |0            |0                |
|2016-07-11|2      |2          |0           |132        |118        |125.0             |0            |2                |
|2016-12-10|1      |1          |0           |184        |184        |184.0             |0            |0                |
|2016-02-04|6      |4          |

                                                                                