# Análise de Venda de Medicamentos Controlados e Antimicrobianos - Medicamentos Industrializados

Projeto de análise de vendas de medicamentos controlados e antimicrobianos exclusivamente industrializados na Região Metropolitana da Baixada Santista (RMBS) composta por nove municípios no litoral do estado de São Paulo, através de dados extraídos do Sistema Nacional de Gerenciamento de Produtos Controlados (SNGPC) e disponibilizados no [portal de dados abertos](https://dados.gov.br/dados/conjuntos-dados/venda-de-medicamentos-controlados-e-antimicrobianos---medicamentos-industrializados) da Agência Nacional de Vigilância Sanitária (Anvisa). Através da análise das informações de vendas, dados geográricos, perfil de pacientes e características de medicamentos, o objetivo é extrair insights de negócios, sugerir soluções aos problemas identificados e apresentar propostas de aprimoramento.

## Demanda do negócio

- Compreender as tendências, padrões e características das vendas farmacêuticas.
- Construção de perfis de pacientes com base nos mendicamentos receitados.
- Compreender a demanda de medicamentos com base no perfil dos pacientes, por tempo e por município.
- Obter insights, identificar oportunidades e propor soluções a problemas.
- Elaboração de painel de informações de medicamentos baseado em filtros.

## Compreensão dos dados

Os dados que serão utilizados na análise compreendem o período de uma ano, outubro de 2020 até setembro de 2021, que integram um conjunto de doze arquivos em formato "CSV". Os dados foram extraídos do Sistema Nacional de Gerenciamento de Produtos Controlados (SNGPC), provenientes apenas de farmácias e drogarias privadas que periodicamente devem enviar os dados a respeito de todas as vendas realizadas de medicamentos sujeitos à escrituração no SNGPC. Os dados foram disponibilizados no [portal de dados abertos](https://dados.gov.br/dados/conjuntos-dados/venda-de-medicamentos-controlados-e-antimicrobianos---medicamentos-industrializados) da Agência Nacional de Vigilância Sanitária (Anvisa).


## Tópico da análise

- Construir uma ABT (analytical base table).

### Importação de bibliotecas

In [41]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc, col
import glob


### Criação e iniciação de uma sessão Spark

In [42]:
appName = 'PySpark - ABT de Vendas Farmaceuticas'

spark = SparkSession.builder \
    .appName(appName) \
    .config('spark.driver.memory', '8g') \
    .config('spark.driver.cores', '2') \
    .config('spark.executor.memory', '8g') \
    .config('spark.executor.cores', '4') \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

spark


### Criação do dataset a partir da leitura dos arquivos *.csv

In [43]:
# Lista de caminhos para os arquivos CSV
caminhos_csv = glob.glob('dados/*.csv')

# DataFrame vazio
df = None

# Loop para ler e unir os arquivos CSV
for caminho_csv in caminhos_csv:
    df_temp = spark.read.csv(caminho_csv, sep=';', header=True, encoding='latin1', inferSchema=True)
    
    if df is None:
        df = df_temp
    else:
        df = df.union(df_temp)

# Exibir o esquema do DataFrame combinado
df.printSchema()

root
 |-- ANO_VENDA: integer (nullable = true)
 |-- MES_VENDA: integer (nullable = true)
 |-- UF_VENDA: string (nullable = true)
 |-- MUNICIPIO_VENDA: string (nullable = true)
 |-- PRINCIPIO_ATIVO: string (nullable = true)
 |-- DESCRICAO_APRESENTACAO: string (nullable = true)
 |-- QTD_VENDIDA: integer (nullable = true)
 |-- UNIDADE_MEDIDA: string (nullable = true)
 |-- CONSELHO_PRESCRITOR: string (nullable = true)
 |-- UF_CONSELHO_PRESCRITOR: string (nullable = true)
 |-- TIPO_RECEITUARIO: decimal(1,0) (nullable = true)
 |-- CID10: string (nullable = true)
 |-- SEXO: integer (nullable = true)
 |-- IDADE: double (nullable = true)
 |-- UNIDADE_IDADE: integer (nullable = true)



In [44]:
# Exibir as 5 primeiras linhas do DataFrame combinado
df.show(15, truncate=False)


+---------+---------+--------+---------------+------------------------+---------------------------------------------------------+-----------+--------------+-------------------+----------------------+----------------+-----+----+-----+-------------+
|ANO_VENDA|MES_VENDA|UF_VENDA|MUNICIPIO_VENDA|PRINCIPIO_ATIVO         |DESCRICAO_APRESENTACAO                                   |QTD_VENDIDA|UNIDADE_MEDIDA|CONSELHO_PRESCRITOR|UF_CONSELHO_PRESCRITOR|TIPO_RECEITUARIO|CID10|SEXO|IDADE|UNIDADE_IDADE|
+---------+---------+--------+---------------+------------------------+---------------------------------------------------------+-----------+--------------+-------------------+----------------------+----------------+-----+----+-----+-------------+
|2020     |10       |BA      |JEQUIÉ         |BROMAZEPAM              |6 MG CAP GEL C/ MCGRAN AP CT BL AL PVC ACLAR PL X 30     |3          |FRASCO        |CRM                |BA                    |2               |NULL |NULL|NULL |NULL         |
|2020   

In [45]:
# Contar o número de linhas no DataFrame
qtde_linhas = df.count()
print(f'\nO DataFrame tem {qtde_linhas} linhas.')

# Obter o número de colunas no DataFrame
qtde_colunas = len(df.columns)
print(f'\nO DataFrame tem {qtde_colunas} colunas.')


O DataFrame tem 73950048 linhas.

O DataFrame tem 15 colunas.


### Análise dos dados para construção da ABT

In [46]:
# Criando uma View temporária para uso do SparkSQL
df.createOrReplaceTempView('tb_medicamentos')

In [47]:
# Verificando a volume total de registros dos 9 municípios da RMBS
qtde_linhas_rmbs = spark.sql('''
    SELECT
        COUNT(*) AS QTDE_TOTAL
    FROM 
        tb_medicamentos
    WHERE
        MUNICIPIO_VENDA IN ('BERTIOGA', 'CUBATÃO', 'GUARUJÁ', 'ITANHAÉM', 'MONGAGUÁ', 'PERUÍBE', 'PRAIA GRANDE', 'SANTOS', 'SÃO VICENTE')
''')

qtde_linhas_rmbs.show()


+----------+
|QTDE_TOTAL|
+----------+
|    748950|
+----------+



In [48]:
# Coletar o resultado da primeira consulta
qtde_rmbs = qtde_linhas_rmbs.first()['QTDE_TOTAL']

# Verificando a volume total de registros de cada 1 dos 9 municípios da RMBS
spark.sql('''
    SELECT
        MUNICIPIO_VENDA,
        COUNT(*) AS QTDE_TOTAL,
        ROUND(100*(COUNT(*)/{}), 3) AS QTDE_TOTAL_PERCENT
    FROM 
        tb_medicamentos
    WHERE
        MUNICIPIO_VENDA IN ('BERTIOGA', 'CUBATÃO', 'GUARUJÁ', 'ITANHAÉM', 'MONGAGUÁ', 'PERUÍBE', 'PRAIA GRANDE', 'SANTOS', 'SÃO VICENTE')
    GROUP BY
        MUNICIPIO_VENDA
    ORDER BY 
        QTDE_TOTAL DESC
'''.format(qtde_rmbs)).show()


+---------------+----------+------------------+
|MUNICIPIO_VENDA|QTDE_TOTAL|QTDE_TOTAL_PERCENT|
+---------------+----------+------------------+
|         SANTOS|    214327|            28.617|
|   PRAIA GRANDE|    135360|            18.073|
|    SÃO VICENTE|    108158|            14.441|
|        GUARUJÁ|     94778|            12.655|
|        PERUÍBE|     44902|             5.995|
|       ITANHAÉM|     42737|             5.706|
|        CUBATÃO|     41304|             5.515|
|       BERTIOGA|     34645|             4.626|
|       MONGAGUÁ|     32739|             4.371|
+---------------+----------+------------------+



In [57]:
# Selecionando os dados que irão compor a ABT
abt_rmbs = spark.sql('''
    SELECT
        ANO_VENDA,
        MES_VENDA,
        UF_VENDA,
        MUNICIPIO_VENDA,
        PRINCIPIO_ATIVO,
        DESCRICAO_APRESENTACAO,
        QTD_VENDIDA,
        UNIDADE_MEDIDA,
        CONSELHO_PRESCRITOR,
        UF_CONSELHO_PRESCRITOR,
        TIPO_RECEITUARIO,
        CID10,
        SEXO,
        IDADE,
        UNIDADE_IDADE,
        TO_DATE(CONCAT(ANO_VENDA, MES_VENDA),'yyyyMM') AS DATA_REF,
        CURRENT_DATE AS DATA_PROC 
    FROM
        tb_medicamentos
    WHERE
        MUNICIPIO_VENDA IN ('BERTIOGA', 'CUBATÃO', 'GUARUJÁ', 'ITANHAÉM', 'MONGAGUÁ', 'PERUÍBE', 'PRAIA GRANDE', 'SANTOS', 'SÃO VICENTE')    
''')

abt_rmbs.show(truncate=False)

+---------+---------+--------+---------------+--------------------------------------------------------+-----------------------------------------------------+-----------+--------------+-------------------+----------------------+----------------+-----+----+-----+-------------+----------+----------+
|ANO_VENDA|MES_VENDA|UF_VENDA|MUNICIPIO_VENDA|PRINCIPIO_ATIVO                                         |DESCRICAO_APRESENTACAO                               |QTD_VENDIDA|UNIDADE_MEDIDA|CONSELHO_PRESCRITOR|UF_CONSELHO_PRESCRITOR|TIPO_RECEITUARIO|CID10|SEXO|IDADE|UNIDADE_IDADE|DATA_REF  |DATA_PROC |
+---------+---------+--------+---------------+--------------------------------------------------------+-----------------------------------------------------+-----------+--------------+-------------------+----------------------+----------------+-----+----+-----+-------------+----------+----------+
|2020     |10       |SP      |MONGAGUÁ       |ACETATO DE PREDNISOLONA + GATIFLOXACINO SESQUI-HIDRATADO|3 M

In [70]:
teste = spark.sql('''
    SELECT
        ANO_VENDA,
        right(replicate('0',2) + convert(VARCHAR,MES_VENDA),2),
        TO_DATE(CONCAT(ANO_VENDA, MES_VENDA),'yyyyMM') AS DATA_REF,
        CURRENT_DATE AS DATA_PROC 
    FROM
        tb_medicamentos
    WHERE
        MUNICIPIO_VENDA IN ('BERTIOGA', 'CUBATÃO', 'GUARUJÁ', 'ITANHAÉM', 'MONGAGUÁ', 'PERUÍBE', 'PRAIA GRANDE', 'SANTOS', 'SÃO VICENTE') and
        ANO_VENDA = '2021'   
''')

teste.show()


AnalysisException: [UNRESOLVED_ROUTINE] Cannot resolve function `replicate` on search path [`system`.`builtin`, `system`.`session`, `spark_catalog`.`default`].; line 4 pos 14

In [50]:
# Exibir o esquema da ABT
abt_rmbs.printSchema()

root
 |-- ANO_VENDA: integer (nullable = true)
 |-- MES_VENDA: integer (nullable = true)
 |-- UF_VENDA: string (nullable = true)
 |-- MUNICIPIO_VENDA: string (nullable = true)
 |-- PRINCIPIO_ATIVO: string (nullable = true)
 |-- DESCRICAO_APRESENTACAO: string (nullable = true)
 |-- QTD_VENDIDA: integer (nullable = true)
 |-- UNIDADE_MEDIDA: string (nullable = true)
 |-- CONSELHO_PRESCRITOR: string (nullable = true)
 |-- UF_CONSELHO_PRESCRITOR: string (nullable = true)
 |-- TIPO_RECEITUARIO: decimal(1,0) (nullable = true)
 |-- CID10: string (nullable = true)
 |-- SEXO: integer (nullable = true)
 |-- IDADE: double (nullable = true)
 |-- UNIDADE_IDADE: integer (nullable = true)
 |-- DATA_REF: date (nullable = true)
 |-- DATA_PROC: date (nullable = false)



### Salvando a ABT em formato parquet

In [63]:
# Convertendo a ABT (sql) para o formato 'parquet'
caminho = 'dados/ABT/'
#abt_rmbs.write.parquet(caminho, mode='overwrite')
#abt.write.partitionBy("PK_DATREF").parquet("/content/drive/Shareddrives/PoD Academy/Cursos/Formação em Ciência de Dados/dados/sinteticos/ABT")
abt_rmbs.write.partitionBy('DATA_REF').parquet(caminho, mode='overwrite')

Py4JJavaError: An error occurred while calling o469.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 21 in stage 119.0 failed 1 times, most recent failure: Lost task 21.0 in stage 119.0 (TID 1479) (rootx executor driver): org.apache.spark.SparkUpgradeException: [INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER] You may get a different result due to the upgrading to Spark >= 3.0:
Fail to parse '20211' in the new parser. You can set "spark.sql.legacy.timeParserPolicy" to "LEGACY" to restore the behavior before Spark 3.0, or set to "CORRECTED" and treat it as an invalid datetime string.
	at org.apache.spark.sql.errors.ExecutionErrors.failToParseDateTimeInNewParserError(ExecutionErrors.scala:54)
	at org.apache.spark.sql.errors.ExecutionErrors.failToParseDateTimeInNewParserError$(ExecutionErrors.scala:48)
	at org.apache.spark.sql.errors.ExecutionErrors$.failToParseDateTimeInNewParserError(ExecutionErrors.scala:218)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkParsedDiff$1.applyOrElse(DateTimeFormatterHelper.scala:142)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkParsedDiff$1.applyOrElse(DateTimeFormatterHelper.scala:135)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.parse(TimestampFormatter.scala:194)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage13.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage13.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:385)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.time.format.DateTimeParseException: Text '20211' could not be parsed at index 4
	at java.base/java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:2108)
	at java.base/java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1936)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.parse(TimestampFormatter.scala:192)
	... 25 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:307)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:271)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:792)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.spark.SparkUpgradeException: [INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER] You may get a different result due to the upgrading to Spark >= 3.0:
Fail to parse '20211' in the new parser. You can set "spark.sql.legacy.timeParserPolicy" to "LEGACY" to restore the behavior before Spark 3.0, or set to "CORRECTED" and treat it as an invalid datetime string.
	at org.apache.spark.sql.errors.ExecutionErrors.failToParseDateTimeInNewParserError(ExecutionErrors.scala:54)
	at org.apache.spark.sql.errors.ExecutionErrors.failToParseDateTimeInNewParserError$(ExecutionErrors.scala:48)
	at org.apache.spark.sql.errors.ExecutionErrors$.failToParseDateTimeInNewParserError(ExecutionErrors.scala:218)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkParsedDiff$1.applyOrElse(DateTimeFormatterHelper.scala:142)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkParsedDiff$1.applyOrElse(DateTimeFormatterHelper.scala:135)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.parse(TimestampFormatter.scala:194)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage13.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage13.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:385)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	... 1 more
Caused by: java.time.format.DateTimeParseException: Text '20211' could not be parsed at index 4
	at java.base/java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:2108)
	at java.base/java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1936)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.parse(TimestampFormatter.scala:192)
	... 25 more


In [None]:
# Validando a quantidade de linhas
read_abt_rmbs = spark.read.format('parquet').load(caminho)

print(f'\nA ABT \'parquet\' tem {read_abt_rmbs.count()} linhas.')
print(f'\nA ABT \'sql\' tem {qtde_rmbs} linhas.')