# **Exploração de dados com Apache Spark**

## **Importação da configuração inicial e bibliotecas**

In [None]:
import pandas as pd
import findspark
findspark.init('/home/user/Desktop/Tcc/spark-test/spark')

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions 
from pyspark.sql.functions import lit

In [None]:
class SettingsSpark:
    """Build a SparkSession and configure its S3A client for a MinIO server.

    Creating an instance builds (or reuses, via ``getOrCreate``) the session,
    keeps its SparkContext in ``self.sc`` and sets the Spark log level to
    "OFF". Call :meth:`init_minio` afterwards to set the S3A credentials.
    """

    # Environment variables consulted by init_minio when an argument is omitted.
    ENV_ACCESS_KEY = "MINIO_ACCESS_KEY"
    ENV_SECRET_KEY = "MINIO_SECRET_KEY"
    ENV_ENDPOINT = "MINIO_ENDPOINT"

    def __init__(self):
        # builder_conf() creates the SparkConf it needs itself, so no separate
        # __init_conf__() call (whose result would be discarded) is made here.
        self.sc = self.builder_conf().sparkContext
        self.log_level("OFF")

    def __init_conf__(self):
        """Return the SparkConf used for the session (application name "spark")."""
        return SparkConf().setAppName("spark")

    def builder_conf(self):
        """Return the active SparkSession, creating it from __init_conf__() if needed."""
        return SparkSession.builder.config(conf=self.__init_conf__()).getOrCreate()

    def init_minio(self, acess_key=None, secret_key=None, endpoint=None):
        """Point the S3A filesystem of ``self.sc`` at a MinIO server.

        Each value comes from the explicit argument, else from the matching
        environment variable (MINIO_ACCESS_KEY, MINIO_SECRET_KEY,
        MINIO_ENDPOINT), else from the built-in default. The resolved values
        are stored on the instance as ``acess_key``, ``secret_key`` and
        ``endpoint``, then written into the SparkContext's Hadoop configuration.
        """
        import os

        self.acess_key = acess_key or os.environ.get(self.ENV_ACCESS_KEY) or "admin"
        # NOTE(security): the hard-coded secret is kept only as a backward-compatible
        # fallback. Prefer setting MINIO_SECRET_KEY and rotate this credential,
        # since it is committed together with the notebook.
        self.secret_key = (
            secret_key
            or os.environ.get(self.ENV_SECRET_KEY)
            or "e7bc4dc8-3abf-4187-bcc8-5a4bc8dc32e1"
        )
        self.endpoint = (
            endpoint
            or os.environ.get(self.ENV_ENDPOINT)
            or "https://api.minio.magaiver.dev"
        )

        s3a_settings = {
            "fs.s3a.access.key": self.acess_key,
            "fs.s3a.secret.key": self.secret_key,
            "fs.s3a.endpoint": self.endpoint,
            # Path-style URLs (host/bucket/key), as is usual for MinIO.
            "fs.s3a.path.style.access": "true",
            "fs.s3a.fast.upload": "true",
            # Short timeouts (ms) and few attempts: fail fast on an unreachable endpoint.
            "fs.s3a.connection.establish.timeout": "2000",
            "fs.s3a.connection.timeout": "2000",
            "fs.s3a.attempts.maximum": "2",
            # NOTE(review): SSL is disabled while the default endpoint is https:// —
            # confirm which of the two the S3A connector honours in this setup.
            "fs.s3a.connection.ssl.enabled": "false",
        }
        # Fetch the Hadoop configuration once instead of once per setting.
        hadoop_conf = self.sc._jsc.hadoopConfiguration()
        for key, value in s3a_settings.items():
            hadoop_conf.set(key, value)

    def stop(self):
        """Stop the SparkContext held by this instance."""
        self.sc.stop()

    def log_level(self, level=None):
        """Set the Spark log level; ``None`` or an empty value means "OFF"."""
        self.sc.setLogLevel(level or "OFF")

In [None]:
def drop_columns_matches(dataframe=None, patterns=None, regex=True):
    """Return a pandas DataFrame without the columns whose name matches any pattern.

    Parameters
    ----------
    dataframe : Spark DataFrame or pandas.DataFrame
        Objects exposing ``toPandas()`` (Spark DataFrames) are collected to the
        driver first; pandas DataFrames are used as they are.
    patterns : iterable of str, optional
        A column is dropped when any pattern is found anywhere in its name.
        ``None`` or an empty iterable keeps every column.
    regex : bool, default True
        When True, patterns are regular expressions searched with ``re.search``
        (the same semantics as pandas ``str.contains``); when False they are
        matched as plain substrings.

    Returns
    -------
    pandas.DataFrame

    Raises
    ------
    ValueError
        If *dataframe* is None.
    """
    import re

    if dataframe is None:
        raise ValueError("drop_columns_matches: 'dataframe' is required")

    frame = dataframe.toPandas() if hasattr(dataframe, "toPandas") else dataframe
    # Materialise once so a one-shot iterable is not exhausted after the first column.
    pattern_list = list(patterns) if patterns is not None else []
    if not pattern_list:
        return frame

    if regex:
        compiled = [re.compile(p) for p in pattern_list]

        def is_match(name):
            return any(rx.search(name) for rx in compiled)
    else:
        def is_match(name):
            return any(p in name for p in pattern_list)

    # str() guards against non-string column labels, which break pandas' .str accessor.
    keep = [not is_match(str(col)) for col in frame.columns]
    return frame.loc[:, keep]

## **Configuração spark**

In [None]:
# Start (or reuse) the Spark session, then point its S3A client at MinIO.
spark_settings = SettingsSpark()
spark_settings.init_minio()
# For verbose Spark logs while debugging, uncomment:
# spark_settings.log_level("DEBUG")
spark = spark_settings.builder_conf()

## **Carregamento dos arquivos para exploração e limpeza**

In [None]:
# Every bronze reference table is a ';'-separated CSV with a header row,
# encoded as ISO-8859-1; the reader options are kept in one place.
BRONZE_CSV_OPTIONS = {
    "sep": ';',
    "inferSchema": True,
    "header": True,
    "encoding": "ISO-8859-1",
}


def read_bronze_csv(path):
    """Load one bronze CSV from *path* using the shared BRONZE_CSV_OPTIONS."""
    return spark.read.csv(path, **BRONZE_CSV_OPTIONS)


NCM = read_bronze_csv("s3a://bronze/NCM.csv")
NCM_SH = read_bronze_csv("s3a://bronze/NCM_SH.csv")
NBM_NCM = read_bronze_csv("s3a://bronze/NBM_NCM.csv")
NCM_UNIDADE = read_bronze_csv("s3a://bronze/NCM_UNIDADE.csv")
NCM_CGCE = read_bronze_csv("s3a://bronze/NCM_CGCE.csv")
NCM_CUCI = read_bronze_csv("s3a://bronze/NCM_CUCI.csv")
NCM_FAT_AGREG = read_bronze_csv("s3a://bronze/NCM_FAT_AGREG.csv")
NCM_ISIC = read_bronze_csv("s3a://bronze/NCM_ISIC.csv")
NCM_PPE = read_bronze_csv("s3a://bronze/NCM_PPE.csv")
NCM_PPI = read_bronze_csv("s3a://bronze/NCM_PPI.csv")
ISIC_CUCI = read_bronze_csv("s3a://bronze/ISIC_CUCI.csv")
PAIS_BLOCO = read_bronze_csv("s3a://bronze/PAIS_BLOCO.csv")
PAIS = read_bronze_csv("s3a://bronze/PAIS.csv")
UF_MUN = read_bronze_csv("s3a://bronze/UF_MUN.csv")
UF = read_bronze_csv("s3a://bronze/UF.csv")
URF = read_bronze_csv("s3a://bronze/URF.csv")
VIA = read_bronze_csv("s3a://bronze/VIA.csv")


## **Visualização dos dados brutos**

In [None]:
"""

#NCM.printSchema()
#NCM.describe().show()
print('NCM Dataset')
display(NCM.toPandas())

#NCM_SH.printSchema()
#NCM_SH.describe().show()
print('NCM_SH Dataset')
display(NCM_SH.toPandas())


#NCM_CGCE.printSchema()
#NCM_CGCE.describe().show()
print('NCM_CGCE Dataset')
display(NCM_CGCE.toPandas())#show(truncate=False)

#NCM_CUCI.printSchema()
#NCM_CUCI.describe().show()
print('NCM_CUCI Dataset')
display(NCM_CUCI.toPandas())#show(truncate=False)

#NCM_ISIC.printSchema()
#NCM_ISIC.describe().show()
print('NCM_ISIC Dataset')
display(NCM_ISIC.toPandas())#show(truncate=False)

#NCM_FAT_AGREG.printSchema()
#NCM_FAT_AGREG.describe().show()
print('NCM_FAT_AGREG Dataset')
NCM_FAT_AGREG.show(truncate=False)

#NBM_NCM.printSchema()
#NBM_NCM.describe().show()
print('NCM_NBM Dataset')
NBM_NCM.show(truncate=False)

#NCM_UNIDADE.printSchema()
#NCM_UNIDADE.describe().show()
print('NCM_UNIDADE Dataset')
NCM_UNIDADE.show(truncate=False)

#NCM_PPE.printSchema()
#NCM_PPE.describe().show()
print('NCM_PPE Dataset')
NCM_PPE.show(truncate=False)

#NCM_PPI.printSchema()
#NCM_PPI.describe().show()
print('NCM_PPI Dataset')
NCM_PPI.show(truncate=False)

#ISIC_CUCI.printSchema()
#ISIC_CUCI.describe().show()
print('ISIC_CUCI Dataset')
ISIC_CUCI.show(truncate=False)

print('PAIS_BLOCO Dataset')
PAIS_BLOCO.printSchema()
PAIS_BLOCO.describe().show()
PAIS_BLOCO.show(truncate=False)

print('PAIS Dataset')
PAIS.printSchema()
PAIS.describe().show()
PAIS.show(truncate=False)

UF_MUN.printSchema()
UF_MUN.describe().show()
UF_MUN.show(truncate=False)

URF.printSchema()
URF.describe().show()
URF.show(truncate=False)

VIA.printSchema()
VIA.describe().show()
VIA.show(truncate=False)

#spark_settings.stop()
"""

## **Normalização**

### **Normalização, exclusão de atributos de internacionalização**

In [None]:
# Column layout noted for reference (appears to match the trade files plus TYPE):
# CO_ANO|CO_MES|  CO_NCM|CO_UNID|CO_PAIS|SG_UF_NCM|CO_VIA| CO_URF|QT_ESTAT|KG_LIQUIDO|VL_FOB|TYPE|

# Print the column names of every loaded table, in a fixed order.
datasets_to_inspect = [
    ("NCM", NCM),
    ("NCM_CUCI", NCM_CUCI),
    ("NCM_SH", NCM_SH),
    ("NCM_CGCE", NCM_CGCE),
    ("NCM_ISIC", NCM_ISIC),
    ("NCM_FAT_AGREG", NCM_FAT_AGREG),
    ("NBM_NCM", NBM_NCM),
    ("NCM_UNIDADE", NCM_UNIDADE),
    ("NCM_PPE", NCM_PPE),
    ("NCM_PPI", NCM_PPI),
    ("ISIC_CUCI", ISIC_CUCI),
    ("UF_MUN", UF_MUN),
    ("URF", URF),
    ("VIA", VIA),
    ("PAIS", PAIS),
    ("PAIS_BLOCO", PAIS_BLOCO),
]
for label, frame in datasets_to_inspect:
    print(label, frame.columns)

In [None]:
def drop_translation_columns(frame, patterns=("_ESP", "_ING")):
    """Return the Spark DataFrame *frame* without columns whose name contains a pattern.

    Used to drop the internationalised description columns (names containing
    "_ESP" or "_ING"). Unlike a round-trip through drop_columns_matches, this
    stays inside Spark: the data is not collected to the driver, and column
    types and nulls are preserved (a pandas round-trip can turn nullable
    integer columns into float NaN and can fail schema inference on
    all-null columns).
    """
    unwanted = [name for name in frame.columns if any(p in name for p in patterns)]
    return frame.drop(*unwanted) if unwanted else frame


NCM = drop_translation_columns(NCM)
NCM.show()

NCM_CUCI = drop_translation_columns(NCM_CUCI)
NCM_CUCI.show()

NCM_SH = drop_translation_columns(NCM_SH)
NCM_SH.show()

NCM_CGCE = drop_translation_columns(NCM_CGCE)
NCM_CGCE.show()

NCM_ISIC = drop_translation_columns(NCM_ISIC)
NCM_ISIC.show()

NCM_FAT_AGREG = drop_translation_columns(NCM_FAT_AGREG)
NCM_FAT_AGREG.show()

NCM_UNIDADE = drop_translation_columns(NCM_UNIDADE)
NCM_UNIDADE.show()

NCM_PPE = drop_translation_columns(NCM_PPE)
NCM_PPE.show()

NCM_PPI = drop_translation_columns(NCM_PPI)
NCM_PPI.show()

ISIC_CUCI = drop_translation_columns(ISIC_CUCI)
ISIC_CUCI.show()

UF_MUN = drop_translation_columns(UF_MUN)
UF_MUN.show()

URF = drop_translation_columns(URF)
URF.show()

VIA = drop_translation_columns(VIA)
VIA.show()

PAIS = drop_translation_columns(PAIS)
PAIS.show()

PAIS_BLOCO = drop_translation_columns(PAIS_BLOCO)
PAIS_BLOCO.show()

# NOTE(review): NBM_NCM is not normalized here — confirm it has no _ESP/_ING columns.


### **Junção dos arquivos NCM**

In [None]:
# Register every table as a temporary SQL view under a short alias.
sql_views = {
    "NCM": NCM,
    "SH": NCM_SH,
    "CUCI": NCM_CUCI,
    "CGCE": NCM_CGCE,
    "ISIC": NCM_ISIC,
    "FAT_AGREG": NCM_FAT_AGREG,
    "NBM": NBM_NCM,
    "UNIDADE": NCM_UNIDADE,
    "PPE": NCM_PPE,
    "PPI": NCM_PPI,
    "ISIC_CUCI": ISIC_CUCI,
    "UF_MUN": UF_MUN,
    "URF": URF,
    "PAIS": PAIS,
    "VIA": VIA,
    "PAIS_BLOCO": PAIS_BLOCO,
}
for view_name, frame in sql_views.items():
    frame.createOrReplaceTempView(view_name)

# NCM codes enriched with descriptions from SH, FAT_AGREG, CUCI, PPE, PPI and
# ISIC. Inner joins keep only NCM rows that have a match in every joined view.
NCM_FULL_JOIN = spark.sql(
        """ 
            SELECT N.CO_NCM,
                   N.NO_NCM_POR as NO_NCM,
                   S.CO_SH6,
                   S.NO_SH2_POR,
                   S.NO_SH4_POR,
                   S.NO_SH6_POR AS NO_SH6,
                   lower(F.NO_FAT_AGREG) as NO_FAT_AGREG,
                   C.NO_CUCI_ITEM,
                   P.CO_PPE,
                   P.NO_PPE_MIN,
                   PI.CO_PPI,
                   PI.NO_PPI_MIN,
                   I.NO_ISIC_CLASSE
                   
              FROM NCM N
             INNER JOIN SH S
                ON N.CO_SH6 = S.CO_SH6
             INNER JOIN FAT_AGREG F
                ON N.CO_FAT_AGREG = F.CO_FAT_AGREG
             INNER JOIN CUCI C
                ON N.CO_CUCI_ITEM = C.CO_CUCI_ITEM
             INNER JOIN PPE P 
                ON N.CO_PPE = P.CO_PPE
             INNER JOIN PPI PI
                ON N.CO_PPI = PI.CO_PPI
             INNER JOIN ISIC I 
                ON N.CO_ISIC_CLASSE = I.CO_ISIC_CLASSE
         """)

# Cell output: the joined table as pandas (collects the whole result to the driver).
NCM_FULL_JOIN.toPandas()

### **Gravação dos arquivos na camada silver**
---
Arquivos convertidos para o formato Parquet e gravados, com compressão e consequente redução do tamanho dos arquivos.

In [None]:
NCM_FULL_JOIN.write.mode("overwrite").parquet("s3a://silver/NCM_FULL_JOIN.parquet")

In [None]:


# Persist every normalized reference table to the silver layer as Parquet.
# Each table appears exactly once (no duplicate NCM_CUCI write; URF included).
silver_tables = {
    "NCM": NCM,
    "NCM_SH": NCM_SH,
    "NCM_CUCI": NCM_CUCI,
    "NCM_CGCE": NCM_CGCE,
    "NCM_ISIC": NCM_ISIC,
    "NCM_FAT_AGREG": NCM_FAT_AGREG,
    "NBM_NCM": NBM_NCM,
    "NCM_UNIDADE": NCM_UNIDADE,
    "NCM_PPE": NCM_PPE,
    "NCM_PPI": NCM_PPI,
    "ISIC_CUCI": ISIC_CUCI,
    "UF_MUN": UF_MUN,
    "URF": URF,
    "PAIS": PAIS,
    "VIA": VIA,
    "PAIS_BLOCO": PAIS_BLOCO,
}
for table_name, frame in silver_tables.items():
    frame.write.parquet(f"s3a://silver/{table_name}.parquet", mode="overwrite")


In [16]:
# Persist every normalized reference table to the silver layer as CSV.
# Each table appears exactly once, and every CSV goes to a ".csv" path: writing
# PAIS_BLOCO to "PAIS_BLOCO.parquet" would overwrite its Parquet dataset.
# NOTE(review): no header row is written (Spark's default) — confirm consumers
# of these CSVs do not expect column names.
silver_tables = {
    "NCM": NCM,
    "NCM_SH": NCM_SH,
    "NCM_CUCI": NCM_CUCI,
    "NCM_CGCE": NCM_CGCE,
    "NCM_ISIC": NCM_ISIC,
    "NCM_FAT_AGREG": NCM_FAT_AGREG,
    "NBM_NCM": NBM_NCM,
    "NCM_UNIDADE": NCM_UNIDADE,
    "NCM_PPE": NCM_PPE,
    "NCM_PPI": NCM_PPI,
    "ISIC_CUCI": ISIC_CUCI,
    "UF_MUN": UF_MUN,
    "URF": URF,
    "PAIS": PAIS,
    "VIA": VIA,
    "PAIS_BLOCO": PAIS_BLOCO,
}
for table_name, frame in silver_tables.items():
    frame.write.csv(f"s3a://silver/{table_name}.csv", mode="overwrite")



### **Finalização das instâncias do Spark e liberação da memória**

In [None]:
# Stop the SparkContext to release its resources.
# NOTE: later cells still read through `spark`; they need a new session first.
spark_settings.sc.stop()

## **Carregamento dos datasets completos da balança comercial: dados de importação e exportação**

### **Carregamento**

In [None]:
# The previous cell stops the SparkContext, after which `spark` can no longer
# read. Build a fresh session and re-apply the MinIO/S3A settings (they live on
# the SparkContext). If a session is still running, getOrCreate() reuses it.
spark_settings = SettingsSpark()
spark_settings.init_minio()
spark = spark_settings.builder_conf()

EXP_COMPLETA = spark.read.csv("s3a://bronze/EXP_COMPLETA.csv", sep=';', inferSchema=True, header=True)
IMP_COMPLETA = spark.read.csv("s3a://bronze/IMP_COMPLETA.csv", sep=';', inferSchema=True, header=True)

EXP_COMPLETA.printSchema()


### **Harmonização das colunas**

In [None]:
# EXP_COMPLETA lacks the freight/insurance columns that IMP_COMPLETA has; add
# them as numeric zeros so both schemas line up for the later union. A string
# literal "0" would make Spark widen IMP's VL_FRETE/VL_SEGURO to strings there.
# (assumes IMP's VL_FRETE/VL_SEGURO are inferred as numeric — TODO confirm)
EXP_COMPLETA = (
    EXP_COMPLETA
    .withColumn("VL_FRETE", lit(0))
    .withColumn("VL_SEGURO", lit(0))
    .withColumn("TYPE", lit("EXP"))
)

IMP_COMPLETA = IMP_COMPLETA.withColumn("TYPE", lit("IMP"))

EXP_COMPLETA.show(10)
IMP_COMPLETA.show(10)

IMP_COMPLETA.printSchema()


### **Reescrita dos dados harmonizados e ajustados no formato CSV**

In [None]:
# Harmonized trade datasets -> silver layer, CSV format.
csv_targets = (
    (EXP_COMPLETA, "s3a://silver/EXP_COMPLETA.csv"),
    (IMP_COMPLETA, "s3a://silver/IMP_COMPLETA.csv"),
)
for frame, target in csv_targets:
    frame.write.mode("overwrite").csv(target)

In [None]:
# Local Parquet copies of the harmonized datasets; both are Parquet, so both
# use a ".parquet" path (IMP_COMPLETA previously went to a ".csv"-named path).
EXP_COMPLETA.write.parquet("file:///home/user/Desktop/Tcc-data/silver/EXP_COMPLETA.parquet", mode="overwrite")
IMP_COMPLETA.write.parquet("file:///home/user/Desktop/Tcc-data/silver/IMP_COMPLETA.parquet", mode="overwrite")

In [None]:
# Parquet copies in the silver bucket. IMP_COMPLETA must not be written to
# "s3a://silver/IMP_COMPLETA.csv": with mode="overwrite" that would replace the
# CSV export of the same dataset.
EXP_COMPLETA.write.parquet("s3a://silver/EXP_COMPLETA.parquet", mode="overwrite")
IMP_COMPLETA.write.parquet("s3a://silver/IMP_COMPLETA.parquet", mode="overwrite")

### **Junção dos datasets de exportação e importação**

In [None]:
# unionByName aligns columns by name instead of position, so a difference in
# column order between the EXP and IMP files cannot silently shift values
# (and a mismatch in column sets raises instead of producing garbage).
# distinct() then removes fully identical rows.
df_exp_imp_full = EXP_COMPLETA.unionByName(IMP_COMPLETA).distinct()
df_exp_imp_full.show()

In [None]:
# Do not stop Spark here: the following cells still write df_exp_imp_full to
# S3 and read it back, which fails on a stopped SparkContext.
# Call spark_settings.stop() only after the last cell has run.

In [None]:
df_exp_imp_full.write.mode("overwrite").csv("s3a://silver/EXP_IMP_FULL.csv")

### **Escrita dos dados de exportação e importação após a junção no formato Parquet**

In [None]:
df_exp_imp_full.write.mode("overwrite").parquet("s3a://silver/EXP_IMP_FULL.parquet")

In [None]:
# Parquet files carry their own schema; the CSV options (sep, inferSchema,
# header) that were passed here have no meaning for the Parquet reader.
EXP_IMP_FULL = spark.read.parquet("s3a://silver/EXP_IMP_FULL.parquet")
EXP_IMP_FULL.show()

In [None]:
summary_stats = EXP_IMP_FULL.summary()
summary_stats.show()