In [61]:
import pyspark
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import os

# Build (or reuse) a Spark session with Hive support enabled so data can be
# stored in the Spark warehouse. The PostgreSQL JDBC driver jar is attached
# through spark.jars so the session can read from the database.
spark = (
    SparkSession.builder
    .appName("projeto")
    .config('spark.master', 'local')
    .config("spark.jars", "/home/hadoop/Desktop/projeto1/code/BigData/part_1/postgresql-42.7.3.jar")
    .enableHiveSupport()
    .getOrCreate()
)

In [62]:
# Load the municipal GDP (PIB) table from the PostgreSQL server over JDBC.
# The credentials are read from the standard PostgreSQL environment variables
# PGUSER / PGPASSWORD when they are set, so they do not have to live in the
# notebook; the previous hardcoded values remain only as local-development
# fallbacks.
dfpib = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5432/projeto")
    .option("dbtable", "pib_municipios")
    .option("user", os.environ.get("PGUSER", "hadoop"))
    .option("password", os.environ.get("PGPASSWORD", "bigdata"))
    .option("driver", "org.postgresql.Driver")
    .load()
)

dfpib.printSchema()

root
 |-- ano: integer (nullable = true)
 |-- codigo_regiao: integer (nullable = true)
 |-- nome_regiao: string (nullable = true)
 |-- codigo_uf: integer (nullable = true)
 |-- sigla_uf: string (nullable = true)
 |-- nome_uf: string (nullable = true)
 |-- codigo_municipio: integer (nullable = true)
 |-- nome_municipio: string (nullable = true)
 |-- nome_mesoregiao: string (nullable = true)
 |-- nome_microregiao: string (nullable = true)
 |-- tipologia_rural_urbana: string (nullable = true)
 |-- hierarquia_urbana: string (nullable = true)
 |-- valor_adicionado_agro: integer (nullable = true)
 |-- valor_adicionado_industria: integer (nullable = true)
 |-- valor_adicionado_servico: integer (nullable = true)
 |-- valor_adicionado_adm: integer (nullable = true)
 |-- valor_adicionado_total: integer (nullable = true)
 |-- pib_concorrentes: integer (nullable = true)
 |-- pop: integer (nullable = true)
 |-- pib: string (nullable = true)



In [63]:
# Load the invoice (notas fiscais) data from a CSV file. The first line is
# treated as the header and ',' is the field separator.
dfnf = (
    spark.read
    .option("header", True)
    .option("sep", ',')
    .csv("../../../datasets/notas_fiscais.csv")
)
dfnf.printSchema()

root
 |-- Numero: string (nullable = true)
 |-- Data_de_emissao: string (nullable = true)
 |-- Situacao: string (nullable = true)
 |-- Valor_total_da_nota: string (nullable = true)
 |-- Nota_referenciada: string (nullable = true)
 |-- Nome_razao_social_emit: string (nullable = true)
 |-- CPF_CNPJ_emit: string (nullable = true)
 |-- Endereco_emit: string (nullable = true)
 |-- Bairro_distrito_emit: string (nullable = true)
 |-- CEP_emit: string (nullable = true)
 |-- Municipio_emit: string (nullable = true)
 |-- Nome_razao_social_dest: string (nullable = true)
 |-- CPF_CNPJ_dest: string (nullable = true)
 |-- Endereco_dest: string (nullable = true)
 |-- Bairro_distrito_dest: string (nullable = true)
 |-- CEP_dest: string (nullable = true)
 |-- Municipio_dest: string (nullable = true)
 |-- Base_de_Calculo_do_ICMS: string (nullable = true)
 |-- Valor_do_ICMS: string (nullable = true)
 |-- Base_de_calculo_do_ICMS_substituicao: string (nullable = true)
 |-- Valor_do_ICMS_substituicao: strin

In [64]:
# Helper that derives the list of columns to drop from the list of columns to keep
def get_columns_to_drop(all_columns, columns_to_keep):
    """Return the entries of ``all_columns`` that are not in ``columns_to_keep``.

    The relative order of ``all_columns`` is preserved in the result.
    """
    to_drop = []
    for name in all_columns:
        if name in columns_to_keep:
            continue
        to_drop.append(name)
    return to_drop

In [65]:
# Columns of the invoice DataFrame (dfnf) that are kept for the analysis
dfnf_columns_to_keep = [
    'Numero',
    'Data_de_emissao',
    'Valor_total_da_nota',
    'CPF_CNPJ_emit',
    'CEP_emit',
    'Municipio_emit',
    'CPF_CNPJ_dest',
    'CEP_dest',
    'Municipio_dest',
    'Nr_item',
    'Cod_prod',
    'Descricao_do_Produto_ou_servicos',
    'NCM_prod',
    'Quant_prod',
    'Valor_unit_prod',
    'Valor_total_prod',
    'Unid_prod',
]

# Every other column of dfnf is scheduled to be dropped
dfnf_all_columns = dfnf.columns
dfnf_columns_to_drop = get_columns_to_drop(
    dfnf_all_columns,
    dfnf_columns_to_keep,
)
dfnf_columns_to_drop

['Situacao',
 'Nota_referenciada',
 'Nome_razao_social_emit',
 'Endereco_emit',
 'Bairro_distrito_emit',
 'Nome_razao_social_dest',
 'Endereco_dest',
 'Bairro_distrito_dest',
 'Base_de_Calculo_do_ICMS',
 'Valor_do_ICMS',
 'Base_de_calculo_do_ICMS_substituicao',
 'Valor_do_ICMS_substituicao',
 'Valor_total_dos_produtos',
 'Valor_do_frete',
 'Valor_do_seguro',
 'Valor_desconto',
 'Valor_outras_despesas_acessorias',
 'Valor_do_IPI',
 'Valor_total_ICMS_UF_dest',
 'Valor_total_ICMS_UF_remet',
 'Valor_BC_ICMS_UF_dest',
 'Aliquota_interna_UF_dest',
 'Aliquota_interestadual_UF_env',
 'Perc_prov_partilha_UF',
 'Perc_ICMS_FCP_UF_dest',
 'Valor_ICMS_FCP_UF_dest',
 'Valor_ICMS_partilha_UF_dest',
 'Valor_ICMS_partilha_UF_remet',
 'CST_prod',
 'CFOP_prod',
 'Valor_desconto_item',
 'BC_ICMS_prod',
 'Valor_ICMS_prod',
 'Aliq_ICMS_prod',
 'BC_ICMS_ST_prod',
 'Valor_ICMS_ST_prod',
 'Aliq_ICMS_ST_prod',
 'Valor_IPI_prod',
 'Aliq_IPI_prod',
 'Valor_PMC_prod',
 'Cod_EAN',
 'Info_Adicional_Item',
 'Informac

In [66]:
# Columns of the GDP DataFrame (dfpib) that are kept for the analysis
dfpib_columns_to_keep = [
    'ano',
    'nome_regiao',
    'sigla_uf',
    'nome_municipio',
    'nome_mesoregiao',
    'nome_microregiao',
    'tipologia_rural_urbana',
    'hierarquia_urbana',
    'pop',
    'pib',
]

# Every other column of dfpib is scheduled to be dropped
dfpib_all_columns = dfpib.columns
dfpib_columns_to_drop = get_columns_to_drop(
    dfpib_all_columns,
    dfpib_columns_to_keep,
)
dfpib_columns_to_drop

['codigo_regiao',
 'codigo_uf',
 'nome_uf',
 'codigo_municipio',
 'valor_adicionado_agro',
 'valor_adicionado_industria',
 'valor_adicionado_servico',
 'valor_adicionado_adm',
 'valor_adicionado_total',
 'pib_concorrentes']

In [67]:
# Drop the unneeded columns from the invoice DataFrame and display the remaining ones
dfnf = dfnf.drop(*dfnf_columns_to_drop)
dfnf.columns

['Numero',
 'Data_de_emissao',
 'Valor_total_da_nota',
 'CPF_CNPJ_emit',
 'CEP_emit',
 'Municipio_emit',
 'CPF_CNPJ_dest',
 'CEP_dest',
 'Municipio_dest',
 'Nr_item',
 'Cod_prod',
 'Descricao_do_Produto_ou_servicos',
 'NCM_prod',
 'Quant_prod',
 'Valor_unit_prod',
 'Valor_total_prod',
 'Unid_prod']

In [68]:
# Drop the unneeded columns from the GDP DataFrame and display the remaining ones
dfpib = dfpib.drop(*dfpib_columns_to_drop)
dfpib.columns

['ano',
 'nome_regiao',
 'sigla_uf',
 'nome_municipio',
 'nome_mesoregiao',
 'nome_microregiao',
 'tipologia_rural_urbana',
 'hierarquia_urbana',
 'pop',
 'pib']

In [69]:
# Explicit mapping from the original CSV column names to the names used in the
# rest of the notebook. Renaming by name (instead of by position with toDF)
# means a different column order in the CSV can never attach a name to the
# wrong column, and re-running the cell is harmless because withColumnRenamed
# does nothing when the old name is no longer present.
dfnf_rename_map = {
    'Numero': 'nf_numero',
    'Data_de_emissao': 'nf_data_emissao',
    'Valor_total_da_nota': 'nf_valor_total',
    'CPF_CNPJ_emit': 'emit_cnpj',
    'CEP_emit': 'emit_cep',
    'Municipio_emit': 'emit_municipio',
    'CPF_CNPJ_dest': 'dest_cnpj',
    'CEP_dest': 'dest_cep',
    'Municipio_dest': 'dest_municipio',
    'Nr_item': 'prod_nr_item',
    'Cod_prod': 'prod_cod',
    'Descricao_do_Produto_ou_servicos': 'prod_desc',
    'NCM_prod': 'prod_ncm',
    'Quant_prod': 'prod_quant',
    'Valor_unit_prod': 'prod_valor_unit',
    'Valor_total_prod': 'prod_valor_total',
    'Unid_prod': 'prod_unid',
}

# List of the new column names (kept under its original variable name)
dfnf_columns_to_rename = list(dfnf_rename_map.values())

# Rename the columns of the DataFrame
for old_name, new_name in dfnf_rename_map.items():
    dfnf = dfnf.withColumnRenamed(old_name, new_name)
dfnf.columns

['nf_numero',
 'nf_data_emissao',
 'nf_valor_total',
 'emit_cnpj',
 'emit_cep',
 'emit_municipio',
 'dest_cnpj',
 'dest_cep',
 'dest_municipio',
 'prod_nr_item',
 'prod_cod',
 'prod_desc',
 'prod_ncm',
 'prod_quant',
 'prod_valor_unit',
 'prod_valor_total',
 'prod_unid']

In [70]:
# Register both DataFrames as temporary views so they can be queried through the Spark SQL engine
dfnf.createOrReplaceTempView("dfnf")
dfpib.createOrReplaceTempView("dfpib")

In [71]:
# Prepare dfnf for the join between the tables: both municipality columns are
# lower-cased, stripped of the listed accents, have '-' and "'" turned into
# spaces, and are trimmed.

# (pattern, replacement) pairs applied in order to each municipality name.
# NOTE(review): only the accented characters listed here are handled; other
# accents (e.g. 'â', 'ô') are left untouched — confirm this matches the data.
municipio_replacements = [
    (r"ã", "a"),
    (r"á", "a"),
    (r"à", "a"),
    (r"ê", "e"),
    (r"é", "e"),
    (r"í", "i"),
    (r"ó", "o"),
    (r"õ", "o"),
    (r"ú", "u"),
    (r"ç", "c"),
    (r"-", " "),
    (r"'", " "),
]

for municipio_col in ("emit_municipio", "dest_municipio"):
    # Lower-case first, then apply every replacement
    cleaned = F.lower(F.col(municipio_col))
    for pattern, replacement in municipio_replacements:
        cleaned = F.regexp_replace(cleaned, pattern, replacement)
    # Trim surrounding whitespace. The original cell trimmed emit_municipio
    # twice and never trimmed dest_municipio; each column is now trimmed once.
    dfnf = dfnf.withColumn(municipio_col, F.trim(cleaned))

# Replace the temporary view with the cleaned data
dfnf.createOrReplaceTempView("dfnf")

In [72]:
# Keep only the rows of the GDP table whose year (ano) is 2016, then refresh the temporary view
dfpib = spark.sql("SELECT * FROM dfpib WHERE ano=2016")
dfpib.createOrReplaceTempView("dfpib")

In [73]:
# Prepare dfpib for the join between the tables: the municipality name is
# lower-cased, stripped of the listed accents, has '-' and "'" turned into
# spaces, and is trimmed.

# (pattern, replacement) pairs applied in order to the municipality name
pib_municipio_replacements = [
    (r"ã", "a"),
    (r"á", "a"),
    (r"à", "a"),
    (r"ê", "e"),
    (r"é", "e"),
    (r"í", "i"),
    (r"ó", "o"),
    (r"õ", "o"),
    (r"ú", "u"),
    (r"ç", "c"),
    (r"-", " "),
    (r"'", " "),
]

nome_municipio_clean = F.lower(dfpib["nome_municipio"])
for pattern, replacement in pib_municipio_replacements:
    nome_municipio_clean = F.regexp_replace(nome_municipio_clean, pattern, replacement)

# Trim surrounding whitespace and store the cleaned name
dfpib = dfpib.withColumn("nome_municipio", F.trim(nome_municipio_clean))

# Replace the temporary view with the cleaned data
dfpib.createOrReplaceTempView("dfpib")

In [74]:
# Left-join the invoices with the GDP table, matching the issuer's municipality
# (emit_municipio) against the GDP table's municipality name (nome_municipio).
# The merged data is stored in a new DataFrame.
# NOTE(review): the only join key is the municipality name, so a name that
# occurs in more than one dfpib row would match each of those rows — confirm.
df = dfnf.join(dfpib, dfnf["emit_municipio"] == dfpib["nome_municipio"], how="left")
df.printSchema()

root
 |-- nf_numero: string (nullable = true)
 |-- nf_data_emissao: string (nullable = true)
 |-- nf_valor_total: string (nullable = true)
 |-- emit_cnpj: string (nullable = true)
 |-- emit_cep: string (nullable = true)
 |-- emit_municipio: string (nullable = true)
 |-- dest_cnpj: string (nullable = true)
 |-- dest_cep: string (nullable = true)
 |-- dest_municipio: string (nullable = true)
 |-- prod_nr_item: string (nullable = true)
 |-- prod_cod: string (nullable = true)
 |-- prod_desc: string (nullable = true)
 |-- prod_ncm: string (nullable = true)
 |-- prod_quant: string (nullable = true)
 |-- prod_valor_unit: string (nullable = true)
 |-- prod_valor_total: string (nullable = true)
 |-- prod_unid: string (nullable = true)
 |-- ano: integer (nullable = true)
 |-- nome_regiao: string (nullable = true)
 |-- sigla_uf: string (nullable = true)
 |-- nome_municipio: string (nullable = true)
 |-- nome_mesoregiao: string (nullable = true)
 |-- nome_microregiao: string (nullable = true)
 |--

In [75]:
# List the issuer municipalities that are present in the invoices but found no
# match in the GDP table, with the number of joined rows for each of them
df.createOrReplaceTempView("df")
df_select = spark.sql("SELECT emit_municipio, COUNT(*) FROM df WHERE nome_municipio IS NULL AND emit_municipio IS NOT NULL GROUP BY emit_municipio")

In [76]:
# Save the list of unmatched municipalities as CSV (with a header row).
# mode("overwrite") replaces any output left by a previous run; with the
# default save mode the cell raises as soon as the path already exists, which
# breaks every re-execution of the notebook.
df_select.write.format("csv").option("header", True).mode("overwrite").save("missing_cities.csv")

                                                                                

In [77]:
# Keep only the rows where the municipality name is present on both sides of
# the join (i.e. the invoice found a match in the GDP table), then refresh the view
df.createOrReplaceTempView("df")
df = spark.sql("SELECT * FROM df WHERE nome_municipio IS NOT NULL AND emit_municipio IS NOT NULL")
df.createOrReplaceTempView("df")

In [78]:
# Cast the columns to their proper types: the issue date becomes a date and
# the numeric columns listed below become decimal(15,2) or integer.
cast_targets = {
    "nf_valor_total": DecimalType(15,2),
    "prod_nr_item": IntegerType(),
    "prod_quant": DecimalType(15,2),
    "prod_valor_unit": DecimalType(15,2),
    "prod_valor_total": DecimalType(15,2),
    "pib": DecimalType(15,2),
}

df = df.withColumn("nf_data_emissao", F.to_date(df["nf_data_emissao"]))
for column_name, target_type in cast_targets.items():
    df = df.withColumn(column_name, F.col(column_name).cast(target_type))

df.createOrReplaceTempView("df")

In [79]:
# Print the schema to confirm that the casts were applied
df.printSchema()

root
 |-- nf_numero: string (nullable = true)
 |-- nf_data_emissao: date (nullable = true)
 |-- nf_valor_total: decimal(15,2) (nullable = true)
 |-- emit_cnpj: string (nullable = true)
 |-- emit_cep: string (nullable = true)
 |-- emit_municipio: string (nullable = true)
 |-- dest_cnpj: string (nullable = true)
 |-- dest_cep: string (nullable = true)
 |-- dest_municipio: string (nullable = true)
 |-- prod_nr_item: integer (nullable = true)
 |-- prod_cod: string (nullable = true)
 |-- prod_desc: string (nullable = true)
 |-- prod_ncm: string (nullable = true)
 |-- prod_quant: decimal(15,2) (nullable = true)
 |-- prod_valor_unit: decimal(15,2) (nullable = true)
 |-- prod_valor_total: decimal(15,2) (nullable = true)
 |-- prod_unid: string (nullable = true)
 |-- ano: integer (nullable = true)
 |-- nome_regiao: string (nullable = true)
 |-- sigla_uf: string (nullable = true)
 |-- nome_municipio: string (nullable = true)
 |-- nome_mesoregiao: string (nullable = true)
 |-- nome_microregiao: st

In [80]:
# Remove duplicates by keeping only the distinct rows, then refresh the view
df = spark.sql("SELECT DISTINCT * FROM df")
df.createOrReplaceTempView("df")

In [81]:
# Show the number of rows per region.
# Known problem: a city name can belong to two states, and the invoices carry
# no easily accessible information about the state to tell them apart.
df_select = spark.sql("SELECT nome_regiao, COUNT(*) FROM df GROUP BY nome_regiao")
df_select.show()

24/05/27 16:32:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:32:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:32:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:32:26 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:32:26 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:32:26 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:32:26 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:32:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:32:35 WARN RowBasedKeyValueBatch: Calling spill() on

+------------+--------+
| nome_regiao|count(1)|
+------------+--------+
|    Nordeste| 2156357|
|         Sul|   96363|
|     Sudeste|  113977|
|Centro-oeste|    6994|
|       Norte|   22915|
+------------+--------+



                                                                                

In [82]:
# Keep only the rows of the Nordeste region, then refresh the view
df = spark.sql("SELECT * FROM df WHERE nome_regiao='Nordeste'")
df.createOrReplaceTempView("df")

In [83]:
# Attach to every row the total population (pop_meso) and total GDP (pib_meso)
# of the mesoregion where the invoice was issued.
#
# The totals are aggregated from the dfpib view, one row per municipality, and
# then joined back. A window SUM over df itself runs over the joined invoice
# rows, so it adds a municipality's population and GDP once per invoice line
# instead of once per municipality.
# pib is still a string in the dfpib view, so it is cast the same way as in df.
# The state (sigla_uf) is part of the key so that equally named mesoregions of
# different states are not merged.
df = spark.sql("""
    SELECT d.*, m.pop_meso, m.pib_meso
    FROM df d
    LEFT JOIN (
        SELECT sigla_uf,
               nome_mesoregiao,
               SUM(pop) AS pop_meso,
               SUM(CAST(pib AS DECIMAL(15,2))) AS pib_meso
        FROM (
            SELECT DISTINCT sigla_uf, nome_mesoregiao, nome_municipio, pop, pib
            FROM dfpib
        ) municipios
        GROUP BY sigla_uf, nome_mesoregiao
    ) m
    ON d.sigla_uf = m.sigla_uf AND d.nome_mesoregiao = m.nome_mesoregiao
""")
df.createOrReplaceTempView("df")

In [84]:
# Count, per column, how many values look missing
string_columns = [
    'nf_numero',
    'nf_data_emissao',
    'emit_cnpj',
    'emit_cep',
    'emit_municipio',
    'dest_cnpj',
    'dest_cep',
    'dest_municipio',
    'prod_cod',
    'prod_desc',
    'prod_ncm',
    'prod_unid',
    'nome_regiao',
    'sigla_uf',
    'nome_municipio',
    'nome_mesoregiao',
    'nome_microregiao',
    'tipologia_rural_urbana',
    'hierarquia_urbana',
]

number_columns = [
    'nf_valor_total',
    'prod_nr_item',
    'prod_quant',
    'prod_valor_unit',
    'prod_valor_total',
    'ano',
    'pop',
    'pib',
]


def _missing_flag(name, check_nan=False):
    """Build the boolean column marking a value of column ``name`` as missing.

    A value is flagged when it contains 'None' or 'NULL', equals the empty
    string, or is null; with ``check_nan`` it is also flagged when it is NaN.
    """
    column = F.col(name)
    flag = (
        column.contains('None')
        | column.contains('NULL')
        | (column == '')
        | column.isNull()
    )
    if check_nan:
        flag = flag | F.isnan(name)
    return flag


# One output column per input column, holding the number of flagged rows
df_strings_null = df.select(
    [F.count(F.when(_missing_flag(c), c)).alias(c) for c in string_columns]
)

df_numbers_null = df.select(
    [F.count(F.when(_missing_flag(c, check_nan=True), c)).alias(c) for c in number_columns]
)

In [85]:
# Display the per-column missing-value counts for the string columns
df_strings_null.show()

24/05/27 16:36:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:36:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:36:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:36:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:36:24 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:36:24 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:36:38 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:36:38 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:36:38 WARN RowBasedKeyValueBatch: Calling spill() on

+---------+---------------+---------+--------+--------------+---------+--------+--------------+--------+---------+--------+---------+-----------+--------+--------------+---------------+----------------+----------------------+-----------------+
|nf_numero|nf_data_emissao|emit_cnpj|emit_cep|emit_municipio|dest_cnpj|dest_cep|dest_municipio|prod_cod|prod_desc|prod_ncm|prod_unid|nome_regiao|sigla_uf|nome_municipio|nome_mesoregiao|nome_microregiao|tipologia_rural_urbana|hierarquia_urbana|
+---------+---------------+---------+--------+--------------+---------+--------+--------------+--------+---------+--------+---------+-----------+--------+--------------+---------------+----------------+----------------------+-----------------+
|        0|              0|        0|       0|             0|        0|       0|             0|       0|        3|       0|       13|          0|       0|             0|              0|               0|                     0|                0|
+---------+-------------

                                                                                

In [86]:
# Display the per-column missing-value counts for the numeric columns
df_numbers_null.show()

24/05/27 16:40:11 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:40:11 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:40:11 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:40:11 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:40:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:40:18 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:40:30 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:40:30 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:40:30 WARN RowBasedKeyValueBatch: Calling spill() on

+--------------+------------+----------+---------------+----------------+---+---+---+
|nf_valor_total|prod_nr_item|prod_quant|prod_valor_unit|prod_valor_total|ano|pop|pib|
+--------------+------------+----------+---------------+----------------+---+---+---+
|             0|           0|     31855|          31853|           31853|  0|  0|  0|
+--------------+------------+----------+---------------+----------------+---+---+---+



                                                                                

In [87]:
# Find the mode of the prod_unid column: count the rows per unit, most frequent first
df_select = spark.sql("SELECT prod_unid, COUNT(*) FROM df GROUP BY prod_unid ORDER BY COUNT(*) DESC")
df_select.show()

24/05/27 16:43:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:43:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:43:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:43:22 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:43:22 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:43:22 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:43:22 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:43:29 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:43:29 WARN RowBasedKeyValueBatch: Calling spill() on

+-------------------+--------+
|          prod_unid|count(1)|
+-------------------+--------+
|                UND| 1221427|
|                 KG|  218170|
|             Pacote|  149916|
|                 CX|  130198|
|                  L|   85233|
|                 NA|   32673|
|                 FR|   29079|
|                AMP|   28973|
|                  m|   23702|
| DE ISS E DE IPI ."|   20292|
|                CPR|   19585|
|                 m3|   14818|
|                 FD|   14583|
|                 CP|   14553|
|                  1|    9382|
|                 RL|    8252|
|                 AP|    7296|
|               COMP|    6280|
|                 SC|    5176|
| DE ISS E DE IPI?."|    5110|
+-------------------+--------+
only showing top 20 rows



                                                                                

In [88]:
# Create prod_unid_nn: prod_unid with NULL or empty values replaced by the mode ('UND')
# NOTE(review): the frequency table printed by the previous cell also lists an
# 'NA' value, which this CASE leaves unchanged — confirm whether it should be
# treated as missing as well.
df = spark.sql("SELECT *, CASE WHEN prod_unid IS NULL OR prod_unid='' THEN 'UND' ELSE prod_unid END prod_unid_nn FROM df")
df.createOrReplaceTempView("df")

In [89]:
# Create prod_quant_nn: prod_quant where it is not NULL, otherwise the average
# prod_quant of the rows that share the same NCM code (prod_ncm)
df = spark.sql("SELECT *, CASE WHEN prod_quant IS NOT NULL THEN prod_quant ELSE (SELECT AVG(prod_quant) FROM df df2 WHERE df2.prod_ncm = df1.prod_ncm) END prod_quant_nn FROM df df1")
df.createOrReplaceTempView("df")

In [90]:
# Create prod_valor_unit_nn: prod_valor_unit where it is not NULL, otherwise the
# average prod_valor_unit of the rows that share the same NCM code (prod_ncm)
df = spark.sql("SELECT *, CASE WHEN prod_valor_unit IS NOT NULL THEN prod_valor_unit ELSE (SELECT AVG(prod_valor_unit ) FROM df df2 WHERE df2.prod_ncm = df1.prod_ncm) END prod_valor_unit_nn FROM df df1")
df.createOrReplaceTempView("df")

In [91]:
# Create prod_valor_total_nn as the product of the imputed quantity
# (prod_quant_nn) and the imputed unit value (prod_valor_unit_nn)
df = spark.sql("SELECT *, prod_quant_nn * prod_valor_unit_nn as prod_valor_total_nn FROM df")
df.createOrReplaceTempView("df")
df.printSchema()

root
 |-- nf_numero: string (nullable = true)
 |-- nf_data_emissao: date (nullable = true)
 |-- nf_valor_total: decimal(15,2) (nullable = true)
 |-- emit_cnpj: string (nullable = true)
 |-- emit_cep: string (nullable = true)
 |-- emit_municipio: string (nullable = true)
 |-- dest_cnpj: string (nullable = true)
 |-- dest_cep: string (nullable = true)
 |-- dest_municipio: string (nullable = true)
 |-- prod_nr_item: integer (nullable = true)
 |-- prod_cod: string (nullable = true)
 |-- prod_desc: string (nullable = true)
 |-- prod_ncm: string (nullable = true)
 |-- prod_quant: decimal(15,2) (nullable = true)
 |-- prod_valor_unit: decimal(15,2) (nullable = true)
 |-- prod_valor_total: decimal(15,2) (nullable = true)
 |-- prod_unid: string (nullable = true)
 |-- ano: integer (nullable = true)
 |-- nome_regiao: string (nullable = true)
 |-- sigla_uf: string (nullable = true)
 |-- nome_municipio: string (nullable = true)
 |-- nome_mesoregiao: string (nullable = true)
 |-- nome_microregiao: st

In [92]:
# Lower-case the free-text string columns.
# prod_unid_nn is included so the imputed unit column is cleaned as well.
columns_to_lower = [
    "prod_desc",
    "prod_unid",
    "prod_unid_nn",
    "nome_regiao",
    "nome_mesoregiao",
    "nome_microregiao",
    "tipologia_rural_urbana",
    "hierarquia_urbana",
]
for column_name in columns_to_lower:
    df = df.withColumn(column_name, F.lower(F.col(column_name)))

# Remove surrounding whitespace from the string columns.
# prod_unid_nn is trimmed from itself: the original cell assigned
# trim(prod_unid) to prod_unid_nn, which overwrote the imputed values and
# brought the nulls back. The repeated trims of dest_cnpj / dest_cep were
# no-ops and are applied once here.
columns_to_trim = [
    "prod_desc",
    "prod_unid_nn",
    "nome_regiao",
    "nome_mesoregiao",
    "nome_microregiao",
    "tipologia_rural_urbana",
    "hierarquia_urbana",
    "nf_numero",
    "emit_cnpj",
    "emit_cep",
    "dest_cnpj",
    "dest_cep",
    "prod_cod",
    "prod_ncm",
    "sigla_uf",
]
for column_name in columns_to_trim:
    df = df.withColumn(column_name, F.trim(F.col(column_name)))

# Apply a log(1 + x) transform to reduce the impact of right-skewed data
df = df.withColumn("log_prod_quant", F.log1p(F.col("prod_quant_nn")))
df = df.withColumn("log_prod_valor_unit", F.log1p(F.col("prod_valor_unit_nn")))

# Standardise the relevant numeric columns: (value - mean) / stddev.
# All means and standard deviations are collected in a single Spark job
# instead of one job per column.
# NOTE(review): the statistics are computed here, before the later cell that
# filters rows by NCM, so they describe the unfiltered data — confirm intent.
columns_to_scale = ["log_prod_quant", "log_prod_valor_unit", "pop", "pib"]
scale_stats = df.select(
    *[
        aggregate
        for column_name in columns_to_scale
        for aggregate in (
            F.mean(column_name).alias("mean_" + column_name),
            F.stddev(column_name).alias("stddev_" + column_name),
        )
    ]
).first()

for column_name in columns_to_scale:
    mean = scale_stats["mean_" + column_name]
    sttdev = scale_stats["stddev_" + column_name]
    df = df.withColumn("scaled_" + column_name, (F.col(column_name) - mean) / sttdev)

df.createOrReplaceTempView("df")

24/05/27 16:47:52 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:47:52 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:47:52 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:47:52 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:47:57 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:47:57 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:48:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:48:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 16:48:05 WARN RowBasedKeyValueBatch: Calling spill() on

In [93]:
# Many rows with null quantity and unit value have no NCM code, so rows whose
# NCM is missing, empty or equal to 'NA' are removed, together with rows whose
# log-transformed quantity or unit value is NULL
df = spark.sql("SELECT * FROM df WHERE prod_ncm!='NA' AND prod_ncm IS NOT NULL AND prod_ncm!='' AND log_prod_quant IS NOT NULL AND log_prod_valor_unit IS NOT NULL")
df.createOrReplaceTempView("df")

In [94]:
# Drop the original product columns listed below, then refresh the view
df_columns_to_drop = [
    'prod_unid',
    'prod_quant',
    'prod_valor_unit',
    'prod_valor_total'
]

df = df.drop(*df_columns_to_drop)
df.createOrReplaceTempView("df")

In [95]:
# Print the schema after dropping the columns
df.printSchema()

root
 |-- nf_numero: string (nullable = true)
 |-- nf_data_emissao: date (nullable = true)
 |-- nf_valor_total: decimal(15,2) (nullable = true)
 |-- emit_cnpj: string (nullable = true)
 |-- emit_cep: string (nullable = true)
 |-- emit_municipio: string (nullable = true)
 |-- dest_cnpj: string (nullable = true)
 |-- dest_cep: string (nullable = true)
 |-- dest_municipio: string (nullable = true)
 |-- prod_nr_item: integer (nullable = true)
 |-- prod_cod: string (nullable = true)
 |-- prod_desc: string (nullable = true)
 |-- prod_ncm: string (nullable = true)
 |-- ano: integer (nullable = true)
 |-- nome_regiao: string (nullable = true)
 |-- sigla_uf: string (nullable = true)
 |-- nome_municipio: string (nullable = true)
 |-- nome_mesoregiao: string (nullable = true)
 |-- nome_microregiao: string (nullable = true)
 |-- tipologia_rural_urbana: string (nullable = true)
 |-- hierarquia_urbana: string (nullable = true)
 |-- pop: integer (nullable = true)
 |-- pib: decimal(15,2) (nullable = t

In [96]:
# Count the null-like values in every column.
# Text columns are checked for NULL, the empty string and the textual markers
# 'None' / 'NULL'; numeric columns are checked for NULL and NaN only.
string_columns = [
    'nf_numero',
    'nf_data_emissao',  # date-typed in the schema; still checked with the text predicates
    'emit_cnpj',
    'emit_cep',
    'emit_municipio',
    'dest_cnpj',
    'dest_cep',
    'dest_municipio',
    'prod_cod',
    'prod_desc',
    'prod_ncm',
    'nome_regiao',
    'sigla_uf',
    'nome_municipio',
    'nome_mesoregiao',
    'nome_microregiao',
    'tipologia_rural_urbana',
    'hierarquia_urbana',
    'prod_unid_nn',
]

number_columns = [
    'nf_valor_total',
    'prod_nr_item',
    'ano',
    'pop',
    'pib',
    'pop_meso',
    'pib_meso',
    'log_prod_quant',
    'log_prod_valor_unit',
    'prod_quant_nn',
    'prod_valor_unit_nn',
    'prod_valor_total_nn',
    'scaled_log_prod_quant',
    'scaled_log_prod_valor_unit',
    'scaled_pop',
    'scaled_pib',
]


def _is_missing_text(name):
    """Return a boolean Column that is true when text column `name` is null-like."""
    value = F.col(name)
    # NOTE(review): contains() is a substring match, so a value that merely
    # contains 'None' or 'NULL' is counted as missing too - confirm this is intended.
    return (
        value.contains('None')
        | value.contains('NULL')
        | (value == '')
        | value.isNull()
    )


def _is_missing_number(name):
    """Return a boolean Column that is true when numeric column `name` is NULL or NaN."""
    value = F.col(name)
    # The text markers used for string columns can never match a numeric value
    # and comparing a number with '' only forces an implicit cast, so numeric
    # columns are tested for NULL and NaN alone.
    return value.isNull() | F.isnan(value)


def _count_missing(frame, columns, is_missing):
    """Return a one-row DataFrame holding, per column, the number of rows flagged by `is_missing`.

    `is_missing` maps a column name to a boolean Column; rows where it is true
    are counted under an output column of the same name.
    """
    return frame.select([
        # when() yields a non-null literal only for flagged rows, so count()
        # tallies exactly those rows.
        F.count(F.when(is_missing(name), name)).alias(name)
        for name in columns
    ])


df_strings_null = _count_missing(df, string_columns, _is_missing_text)

df_numbers_null = _count_missing(df, number_columns, _is_missing_number)

In [97]:
# Display the null-like value count computed for each string column.
df_strings_null.show()

24/05/27 17:04:54 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:04:54 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:04:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:04:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:05:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:05:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:05:09 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:05:09 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:05:09 WARN RowBasedKeyValueBatch: Calling spill() on

+---------+---------------+---------+--------+--------------+---------+--------+--------------+--------+---------+--------+-----------+--------+--------------+---------------+----------------+----------------------+-----------------+------------+
|nf_numero|nf_data_emissao|emit_cnpj|emit_cep|emit_municipio|dest_cnpj|dest_cep|dest_municipio|prod_cod|prod_desc|prod_ncm|nome_regiao|sigla_uf|nome_municipio|nome_mesoregiao|nome_microregiao|tipologia_rural_urbana|hierarquia_urbana|prod_unid_nn|
+---------+---------------+---------+--------+--------------+---------+--------+--------------+--------+---------+--------+-----------+--------+--------------+---------------+----------------+----------------------+-----------------+------------+
|        0|              0|        0|       0|             0|        0|       0|             0|       0|        0|       0|          0|       0|             0|              0|               0|                     0|                0|           0|
+---------+-

                                                                                

In [98]:
# Display the null-like value count computed for each numeric column.
df_numbers_null.show()

24/05/27 17:11:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:11:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:11:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:11:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:11:34 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:11:34 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:11:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:11:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:11:41 WARN RowBasedKeyValueBatch: Calling spill() on

+--------------+------------+---+---+---+--------+--------+--------------+-------------------+-------------+------------------+-------------------+---------------------+--------------------------+----------+----------+
|nf_valor_total|prod_nr_item|ano|pop|pib|pop_meso|pib_meso|log_prod_quant|log_prod_valor_unit|prod_quant_nn|prod_valor_unit_nn|prod_valor_total_nn|scaled_log_prod_quant|scaled_log_prod_valor_unit|scaled_pop|scaled_pib|
+--------------+------------+---+---+---+--------+--------+--------------+-------------------+-------------+------------------+-------------------+---------------------+--------------------------+----------+----------+
|             0|           0|  0|  0|  0|       0|       0|             0|                  0|            0|                 0|                  0|                    0|                         0|         0|         0|
+--------------+------------+---+---+---+--------+--------+--------------+-------------------+-------------+----------------

                                                                                

In [100]:
# Create the 'projeto' database if it does not exist yet, make it the current
# database, and persist the DataFrame there as the table 'notas_fiscais' in
# the Spark Warehouse (replacing any existing table of that name).
for statement in ("CREATE DATABASE IF NOT EXISTS projeto", "USE projeto"):
    spark.sql(statement)
df.write.saveAsTable("notas_fiscais", mode="overwrite")

24/05/27 17:38:16 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:38:16 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:38:16 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:38:16 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:38:21 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:38:21 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:38:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:38:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:38:28 WARN RowBasedKeyValueBatch: Calling spill() on

AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Couldnt obtain a new sequence (unique id) : Container 384 not found.)

In [101]:
# Preview the first rows of the DataFrame.
df.show()

24/05/27 17:50:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:50:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:50:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:50:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:50:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:50:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:50:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:50:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/27 17:50:44 WARN RowBasedKeyValueBatch: Calling spill() on

+---------+---------------+--------------+--------------+--------+--------------+--------------+--------+-------------------+------------+------------+--------------------+--------+----+-----------+--------+--------------+-----------------+----------------+----------------------+-----------------+-----+-------+---------+------------+------------+-------------+------------------+-------------------+------------------+-------------------+---------------------+--------------------------+-------------------+-------------------+
|nf_numero|nf_data_emissao|nf_valor_total|     emit_cnpj|emit_cep|emit_municipio|     dest_cnpj|dest_cep|     dest_municipio|prod_nr_item|    prod_cod|           prod_desc|prod_ncm| ano|nome_regiao|sigla_uf|nome_municipio|  nome_mesoregiao|nome_microregiao|tipologia_rural_urbana|hierarquia_urbana|  pop|    pib| pop_meso|    pib_meso|prod_unid_nn|prod_quant_nn|prod_valor_unit_nn|prod_valor_total_nn|    log_prod_quant|log_prod_valor_unit|scaled_log_prod_quant|scaled_l

In [102]:
# Stop the Spark session now that the notebook is finished.
spark.stop()