# Iniciando Sessão Spark

In [1]:
from pyspark.sql import SparkSession

# Iniciar a sessão Spark com a configuração do Delta Lake
spark = (SparkSession.builder
    .appName("DeltaLake")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")  # Extensão do Delta
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")  # Catálogo do Delta
    .config("spark.jars", "/usr/local/spark/jars/delta-spark_2.12-3.0.0.jar,/usr/local/spark/jars/delta-storage-3.0.0.jar")  # Caminho dos JARs no Docker
    .config("spark.executor.cores", "2")  # Limitar o número de núcleos por executor
    .config("spark.driver.cores", "1")    # Limitar o número de núcleos para o driver
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .config("spark.memory.fraction", "0.3")  # Ajuste para controlar a fração de memória usada pelo Spark
    .config("spark.memory.storageFraction", "0.2")  # Fração da memória usada para armazenar dados persistentes
    .getOrCreate())

# Verificar se a sessão Spark foi criada com sucesso
print(spark.version)

3.5.0


# Importando CSV

In [2]:
df_test = spark.read.csv("/home/jovyan/app/data/raw/temas_ambientais.csv", sep=";", header=True)
df_test.show()

+---+--------------------+-----------+--------------+--------------------+-----------------+--------------------+------------+----------------------------------+---------------------------+---------------------------+---------------------+----------------------+-----------------+-----------------+--------------------+--------------------------------+----------------------+----------------------------+-----------------+---------------+-----------------+---------------------------+----------------------------------------+----------------+-----------------------+
| uf|           municipio|codigo_ibge|area_do_imovel|        registro_car|situacao_cadastro|   condicao_cadastro|area_liquida|area_remanescente_vegetacao_nativa|area_reserva_legal_proposta|area_preservacao_permanente|area_nao_classificada|solicitacao_adesao_pra|         latitude|        longitude|      data_inscricao|data_alteracao_condicao_cadastro|area_rural_consolidada|area_servidao_administrativa|tipo_imovel_rural|modulos_fis

# Salvando Bronze (Distribuída)

In [3]:
df_test = df_test.dropDuplicates()

In [None]:
df_test.write.mode("overwrite").format("delta").save("/home/jovyan/app/data/bronze/bronze_distribuida")

# Limpeza e Salvando Silver (Distribuída)

In [27]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [45]:
df_test = spark.read.format("delta").load("/home/jovyan/app/data/bronze/bronze_distribuida")
df_test.show(10)

+---+-----------------+-----------+--------------+--------------------+-----------------+--------------------+------------+----------------------------------+---------------------------+---------------------------+---------------------+----------------------+-----------------+-----------------+--------------------+--------------------------------+----------------------+----------------------------+-----------------+---------------+-----------------+---------------------------+----------------------------------------+-----------+-----------------------+
| uf|        municipio|codigo_ibge|area_do_imovel|        registro_car|situacao_cadastro|   condicao_cadastro|area_liquida|area_remanescente_vegetacao_nativa|area_reserva_legal_proposta|area_preservacao_permanente|area_nao_classificada|solicitacao_adesao_pra|         latitude|        longitude|      data_inscricao|data_alteracao_condicao_cadastro|area_rural_consolidada|area_servidao_administrativa|tipo_imovel_rural|modulos_fiscais|area_u

In [25]:
df_test.printSchema()

root
 |-- uf: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- codigo_ibge: string (nullable = true)
 |-- area_do_imovel: string (nullable = true)
 |-- registro_car: string (nullable = true)
 |-- situacao_cadastro: string (nullable = true)
 |-- condicao_cadastro: string (nullable = true)
 |-- area_liquida: string (nullable = true)
 |-- area_remanescente_vegetacao_nativa: string (nullable = true)
 |-- area_reserva_legal_proposta: string (nullable = true)
 |-- area_preservacao_permanente: string (nullable = true)
 |-- area_nao_classificada: string (nullable = true)
 |-- solicitacao_adesao_pra: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- data_inscricao: string (nullable = true)
 |-- data_alteracao_condicao_cadastro: string (nullable = true)
 |-- area_rural_consolidada: string (nullable = true)
 |-- area_servidao_administrativa: string (nullable = true)
 |-- tipo_imovel_rural: string (nullable = true)


In [30]:
df_test.columns

['uf',
 'municipio',
 'codigo_ibge',
 'area_do_imovel',
 'registro_car',
 'situacao_cadastro',
 'condicao_cadastro',
 'area_liquida',
 'area_remanescente_vegetacao_nativa',
 'area_reserva_legal_proposta',
 'area_preservacao_permanente',
 'area_nao_classificada',
 'solicitacao_adesao_pra',
 'latitude',
 'longitude',
 'data_inscricao',
 'data_alteracao_condicao_cadastro',
 'area_rural_consolidada',
 'area_servidao_administrativa',
 'tipo_imovel_rural',
 'modulos_fiscais',
 'area_uso_restrito',
 'area_reserva_legal_averbada',
 'area_reserva_legal_aprovada_nao_averbada',
 'area_pousio',
 'data_ultima_retificacao']

In [33]:
int_columns = ["codigo_ibge"]
float_columns = ["area_do_imovel", "area_liquida", "area_remanescente_vegetacao_nativa", 
                 "area_reserva_legal_proposta", "area_preservacao_permanente", "area_nao_classificada",
                 "latitude", "longitude", "area_rural_consolidada", "area_servidao_administrativa",
                 "modulos_fiscais", "area_uso_restrito", "area_reserva_legal_averbada",
                 "area_reserva_legal_aprovada_nao_averbada", "area_pousio"]
bool_columns = ["solicitacao_adesao_pra"]
date_columns = ["data_inscricao", "data_alteracao_condicao_cadastro", "data_ultima_retificacao"]
text_column = [x for x in df_test.columns if x not in int_columns+float_columns+bool_columns+date_columns]

In [46]:
# NULL para None
for i in df_test.columns:
    df_test = df_test.withColumn(i, 
                                 when(col(i) == "NULL", None).
                                 otherwise(col(i)))


# Dados Para Integer
df_test = df_test.withColumn("codigo_ibge", col("codigo_ibge").cast(IntegerType()))

# Dados Para Float
for i in float_columns:
    df_test = df_test.withColumn(i, col(i).cast(FloatType()))

# Dados para Bool
df_test = df_test.withColumn("solicitacao_adesao_pra", 
                             when(col("solicitacao_adesao_pra") == "Sim", True).
                             when(col("solicitacao_adesao_pra") == "Não", False).
                             otherwise(None))

# Dados para Date
for i in date_columns:
    df_test = df_test.withColumn(i, to_date(i, "yyyy-MM-dd"))

df_test.printSchema()

root
 |-- uf: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- codigo_ibge: integer (nullable = true)
 |-- area_do_imovel: float (nullable = true)
 |-- registro_car: string (nullable = true)
 |-- situacao_cadastro: string (nullable = true)
 |-- condicao_cadastro: string (nullable = true)
 |-- area_liquida: float (nullable = true)
 |-- area_remanescente_vegetacao_nativa: float (nullable = true)
 |-- area_reserva_legal_proposta: float (nullable = true)
 |-- area_preservacao_permanente: float (nullable = true)
 |-- area_nao_classificada: float (nullable = true)
 |-- solicitacao_adesao_pra: boolean (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- data_inscricao: date (nullable = true)
 |-- data_alteracao_condicao_cadastro: date (nullable = true)
 |-- area_rural_consolidada: float (nullable = true)
 |-- area_servidao_administrativa: float (nullable = true)
 |-- tipo_imovel_rural: string (nullable = true)
 |-- modulos

In [47]:
df_test.show(10)

Py4JJavaError: An error occurred while calling o2040.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 101.0 failed 1 times, most recent failure: Lost task 0.0 in stage 101.0 (TID 1323) (bb61fc89c10a executor driver): org.apache.spark.SparkUpgradeException: [INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER] You may get a different result due to the upgrading to Spark >= 3.0:
Fail to parse '2014-06-03 21:07:56.251' in the new parser. You can set "spark.sql.legacy.timeParserPolicy" to "LEGACY" to restore the behavior before Spark 3.0, or set to "CORRECTED" and treat it as an invalid datetime string.
	at org.apache.spark.sql.errors.ExecutionErrors.failToParseDateTimeInNewParserError(ExecutionErrors.scala:54)
	at org.apache.spark.sql.errors.ExecutionErrors.failToParseDateTimeInNewParserError$(ExecutionErrors.scala:48)
	at org.apache.spark.sql.errors.ExecutionErrors$.failToParseDateTimeInNewParserError(ExecutionErrors.scala:218)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkParsedDiff$1.applyOrElse(DateTimeFormatterHelper.scala:142)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkParsedDiff$1.applyOrElse(DateTimeFormatterHelper.scala:135)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.parse(TimestampFormatter.scala:194)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.time.format.DateTimeParseException: Text '2014-06-03 21:07:56.251' could not be parsed, unparsed text found at index 10
	at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1952)
	at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.parse(TimestampFormatter.scala:192)
	... 21 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4344)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3549)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at sun.reflect.GeneratedMethodAccessor149.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkUpgradeException: [INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER] You may get a different result due to the upgrading to Spark >= 3.0:
Fail to parse '2014-06-03 21:07:56.251' in the new parser. You can set "spark.sql.legacy.timeParserPolicy" to "LEGACY" to restore the behavior before Spark 3.0, or set to "CORRECTED" and treat it as an invalid datetime string.
	at org.apache.spark.sql.errors.ExecutionErrors.failToParseDateTimeInNewParserError(ExecutionErrors.scala:54)
	at org.apache.spark.sql.errors.ExecutionErrors.failToParseDateTimeInNewParserError$(ExecutionErrors.scala:48)
	at org.apache.spark.sql.errors.ExecutionErrors$.failToParseDateTimeInNewParserError(ExecutionErrors.scala:218)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkParsedDiff$1.applyOrElse(DateTimeFormatterHelper.scala:142)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkParsedDiff$1.applyOrElse(DateTimeFormatterHelper.scala:135)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.parse(TimestampFormatter.scala:194)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.time.format.DateTimeParseException: Text '2014-06-03 21:07:56.251' could not be parsed, unparsed text found at index 10
	at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1952)
	at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.parse(TimestampFormatter.scala:192)
	... 21 more
