<a href="https://colab.research.google.com/github/claudioalvesmonteiro/machinelearning-CNPJ/blob/master/pre-processamento/Extracao_e_Pre_Processamento.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

   # Projeto de Análise de Cŕedito CESAR School
   
   ## Extração de Dados e Pré-processamento
   
   Claudio Alves Monteiro
   
   Marcos Antonio Almeida Souto Júnior
   
   Virgínia Heimann
   
   Kayo Renato da Silva Nascimento
   
   Rosely Cabral

## Configurar SPARK

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://mirror.nbtelecom.com.br/apache/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
!tar xf spark-2.4.3-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
# import modules
import os
import pandas as pd
from pyspark.sql import functions as SF
import pyspark.sql.types as ST

In [0]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [0]:
# import data
path = 'drive/My Drive/Colab Notebooks/ProjetoCESARSchool/codes/data/'

df = spark.read.csv(path+'jur.csv',
                    sep='\t',
                    encoding='utf-8',
                    header=True,
                    inferSchema=False)


df_cursor = spark.read.csv(path+'Base_Des-TRN',
                    sep='\t',
                    encoding='utf-8',
                    header=True,
                    inferSchema=False)


In [0]:
# funcao para contagem de valores categoricos
def countGenTab(df, column):
    # groupby, count and sort pyspark
    tab = df.select(column).groupby(column).count().alias('count').sort('count', ascending=False)
    # transform to pandas df
    tab = tab.toPandas()
    # calculate proportion
    tab['prop'] = round(tab['count']/sum(tab['count'])*100, 2)
    return tab
  
  
countGenTab(df_cursor, 'CS_ALVO')

Unnamed: 0,CS_ALVO,count,prop
0,0,57856,88.64
1,1,7418,11.36


In [0]:
clientes = df_cursor.count()
clientes

65274

In [0]:
taxa_inad = df_cursor.where(SF.col('CS_ALVO')==1).count()/clientes*100
taxa_inad

11.364402365413488

## Tratamento de Dados

In [0]:
#------ tratar inicio do CNPJ sem o 0

@SF.udf('string')
def fill_cnpj(value):
    aux = '00000000000000'+value
    return aux[len(value):]

#------ tratar inicio do CEP sem o 0

@SF.udf('string')
def fill_cep(value):
    aux = '00000000'+value
    return aux[len(value):]

In [0]:
df_cursor = df_cursor.withColumn('CNPJ',fill_cnpj('CNPJ'))
df = df.withColumn('CNPJ',fill_cnpj('CNPJ'))
df = df.withColumn('CEP',fill_cep('CEP'))


In [0]:
#------ Convertendo REF_DATE do df_cursor para timestamp

df_cursor = df_cursor.withColumn('REF_DATE',SF.from_unixtime(
            SF.unix_timestamp('DATA_REF','yyyy-MM')).cast('timestamp'))

#------ Convertendo REF_DATE do df para timestamp

df = df.withColumn('REF_DATE',SF.col('REF_DATE').cast('timestamp'))

In [0]:
# renomear colunas

df = df.withColumnRenamed('CNPJ','CNPJ_DF').withColumnRenamed('REF_DATE','REF_DATE_DF')

In [0]:
#------ Criando o df_base. O df_base é construido a partir do cursor. Todas as entradas do cursor sao preservadas 
# e e feito um join com as entradas do df que correspondem à condição dupla: mesmo "CNPJ" e "Data de Referencia" do df
# anterior a "Data de Referencia" do cursor  

df_base = df_cursor.join(df,
                 (df_cursor['CNPJ']==df['CNPJ_DF']) & 
                 (df_cursor['REF_DATE'] >= df['REF_DATE_DF']),
                 'left')

## 1. Quantidade de Sócios

In [0]:
df_aux = df_base.select('CNPJ','REF_DATE','NOME_SOCIO').dropDuplicates().\
            groupBy('CNPJ','REF_DATE').agg(SF.count('NOME_SOCIO').alias('QTD_SOCIOS')).sort(SF.desc('QTD_SOCIOS'))

In [0]:
df_final = df_cursor.join(df_aux, ['CNPJ','REF_DATE'], 'left')

## 2. Natureza Jurídica

In [0]:
df_aux = df_base.select('CNPJ','REF_DATE','NATUREZA_JURIDICA').dropDuplicates()

In [0]:
df_final = df_final.join(df_aux,['CNPJ','REF_DATE'], 'left' )

## 3. Idade da Empresa

In [0]:
# aqui estamos incluindo a idade em segundos (como no final vamos precisar normalizar, não faz diferença)
df_aux = df_base.select('CNPJ','REF_DATE','ABERTURA').dropDuplicates()
df_aux = df_aux.withColumn('ABERTURA',SF.col('ABERTURA').cast('timestamp'))
df_aux = df_aux.withColumn('IDADE',SF.col('REF_DATE').cast("long")-SF.col('ABERTURA').cast("long")).drop('ABERTURA')

In [0]:
df_final = df_final.join(df_aux,['CNPJ','REF_DATE'], 'left' )

## 4. Capital Social

In [0]:
df_aux = df_base.select('CNPJ','REF_DATE','CAPITAL_SOCIAL').dropDuplicates()

In [0]:
df_final = df_final.join(df_aux,['CNPJ','REF_DATE'], 'left' )

## 5. Tipo (Matriz ou Filial)

In [0]:
df_aux = df_base.select('CNPJ','REF_DATE','TIPO').dropDuplicates()
df_final = df_final.join(df_aux,['CNPJ','REF_DATE'], 'left' )

## 6. Situação 

In [0]:
df_aux = df_base.select('CNPJ','REF_DATE','SITUACAO').dropDuplicates().\
            groupBy('CNPJ','SITUACAO').agg(SF.max('REF_DATE').alias('REF_DATE'))



In [0]:
df_final = df_final.join(df_aux,['CNPJ','REF_DATE'], 'left' )

## 7. Atividade Principal

In [0]:
df_aux = df_base.select('CNPJ','REF_DATE','ATIVIDADE_PRINCIPAL').dropDuplicates()
df_final = df_final.join(df_aux,['CNPJ','REF_DATE'], 'left' )

## 8 e 9. Unidade Federativa e Município

In [0]:
df_final = df_final.drop('UF','MUNICIPIO')

In [0]:
# import data

df_ddd = spark.read.csv(path+'externos/DDD-Cidade-Estado.csv',
                    sep=';',
                    encoding='latin1',
                    header=True,
                    inferSchema=False)

In [0]:
#------ remove_acentos
from unicodedata import normalize
  
def remover_acentos(txt):
    if txt is None: 
      return ""
    else:
      return normalize('NFKD', txt).encode('ASCII', 'ignore').decode('ASCII')

if __name__ == '__main__':
    from doctest import testmod
    testmod()
    
#------ tratar nomes de cidades

@SF.udf('string')
def trata_municipio(value, UF):
    if value == 'AMPARO DA SERRA':
      return 'AMPARO DO SERRA'
    if value == 'ASSU':
      return 'ACU'
    if value == 'BALNEARIO DE PICARRAS':
      return 'BALNEARIO PICARRAS'
    if value == 'BELEM DE SAO FRANCISCO':
      return 'BELEM DO SAO FRANCISCO'
    if (value == 'BOM JESUS')&(UF=='GO'):
      return 'BOM JESUS DE GOIAS'
    if value == 'BRASOPOLIS':
      return 'BRAZOPOLIS'
    if (value == 'CAMPO GRANDE')&(UF=='RN'):
      return 'AUGUSTO SEVERO'
    if value == 'DIAS D:AVILA':
      return "DIAS D'AVILA"
    if value == 'DONA EUZEBIA':
      return "DONA EUSEBIA"
    if value == 'ELDORADO DOS CARAJAS':
      return "ELDORADO DO CARAJAS"
    if value == 'FLORINEA':
      return "FLORINIA"
    if value == 'MOGI-GUACU':
      return "MOGI GUACU"
    if value == 'OLHO D:AGUA DAS CUNHAS':
      return "OLHO D'AGUA DAS CUNHAS"
    if value == 'OLHO D:AGUA DAS FLORES':
      return "OLHO D'AGUA DAS FLORES"
    if value == 'SAO TOME DAS LETRAS':
      return "SAO THOME DAS LETRAS"
    if value == 'SANTA ISABEL DO PARA':
      return "SANTA IZABEL DO PARA"
    if value == 'SAO LUIZ DO PARAITINGA':
      return "SAO LUIS DO PARAITINGA"
    if value == 'SANTANA DO LIVRAMENTO':
      return "SANT'ANA DO LIVRAMENTO"
    if value == "PINGO D'AGUA":
      return "PINGO-D'AGUA"
    if value == "POXOREO":
      return "POXOREU"
    if value == "SANTA BARBARA D:OESTE":
      return "SANTA BARBARA D'OESTE"
    return value 
  
@SF.udf('string')
def codigo_area(TELEFONE):
  if TELEFONE is None: 
    return ""
  else:
    return (TELEFONE[1]+TELEFONE[2])

In [0]:
df_aux = df_base.select('CNPJ','REF_DATE','UF','MUNICIPIO','TELEFONE_1').dropDuplicates()

In [0]:
df_aux = df_aux.withColumn('UF_TEL', codigo_area(SF.col('TELEFONE_1')))

In [0]:
df_aux = df_aux.join(df_ddd, df_ddd['CODIGO']==df_aux['UF_TEL'], 'left')

In [0]:
df_aux = df_aux.toPandas()
df_aux['CIDADE'] = df_aux['CIDADE'].apply(lambda x: remover_acentos(x).upper())
df_aux = spark.createDataFrame(df_aux)


In [0]:
df_aux.show(10, False)

+--------------+-------------------+---+------------------+--------------+------+------+---------+------+
|CNPJ          |REF_DATE           |UF |MUNICIPIO         |TELEFONE_1    |UF_TEL|CODIGO|CIDADE   |ESTADO|
+--------------+-------------------+---+------------------+--------------+------+------+---------+------+
|00231887000129|2017-05-01 00:00:00|BA |FEIRA DE SANTANA  |null          |      |null  |         |null  |
|00256837000104|2017-01-01 00:00:00|SP |FRANCA            |null          |      |null  |         |null  |
|00501368000133|2016-09-01 00:00:00|SP |JUNDIAI           |null          |      |null  |         |null  |
|00560191000146|2017-01-01 00:00:00|SP |SAO PAULO         |(11) 2288-1105|11    |11    |SAO PAULO|SP    |
|00642972000180|2017-06-01 00:00:00|RJ |RIO DE JANEIRO    |null          |      |null  |         |null  |
|00663069000103|2016-07-01 00:00:00|SP |SAO CAETANO DO SUL|null          |      |null  |         |null  |
|00691020000156|2017-05-01 00:00:00|PE |RECIFE

In [0]:
df_aux = df_aux.withColumn('UF', SF.when(SF.col("UF").isNull(), SF.col('ESTADO')).otherwise(SF.col('UF')))
df_aux = df_aux.withColumn('MUNICIPIO', SF.when(SF.col("MUNICIPIO").isNull(), SF.col('CIDADE')).otherwise(SF.col('MUNICIPIO')))

In [0]:
df_final = df_final.join(df_aux,['CNPJ','REF_DATE'], 'left' ).drop('CIDADE','ESTADO','TELEFONE_1','UF_TEL','CODIGO')

In [0]:
df_final = df_final.withColumn('MUNICIPIO', trata_municipio(SF.col('MUNICIPIO'), SF.col('UF')))

## 10. População do Município

In [0]:
# import data

df_municipio = spark.read.csv(path+'externos/municipio_minilake.csv',
                    encoding='UTF-8',
                    header=True,
                    inferSchema=True)

df_municipio.show(5)

+----------+---+---------+------------+--------------+-------------------------+----------------------+----------------------------+--------------------+-----+----------+------------------+--------------+-----------------------+--------------------+---------+
|code_muni2| UF|code_muni|dist_capital|area_municipio|taxa_atividade_18anosmais|porcent_pop_saneamento|porcent_mulheres10a17_filhos|mortalidade_infantil| IDHM|IDHM_renda|taxa_analfabetismo|porcent_pobres|municipio_metropolitano|      municipio_nome|populacao|
+----------+---+---------+------------+--------------+-------------------------+----------------------+----------------------------+--------------------+-----+----------+------------------+--------------+-----------------------+--------------------+---------+
|   1100015| RO|   110001|      409.34|      7067.025|                    60.43|                  80.2|                         2.6|                23.8|0.641|     0.657|              1.22|         26.04|                

In [0]:
df_aux = df_municipio

df_final.show(5)

+--------------+-------------------+--------+-------+----------+--------------------+---------+--------------+------+--------+--------------------+---+--------------+
|          CNPJ|           REF_DATE|DATA_REF|CS_ALVO|QTD_SOCIOS|   NATUREZA_JURIDICA|    IDADE|CAPITAL_SOCIAL|  TIPO|SITUACAO| ATIVIDADE_PRINCIPAL| UF|     MUNICIPIO|
+--------------+-------------------+--------+-------+----------+--------------------+---------+--------------+------+--------+--------------------+---+--------------+
|00135548000149|2017-03-01 00:00:00| 2017-03|      0|         2|SOCIEDADE EMPRESA...|712281600|           0.0|MATRIZ|   ATIVA|LANCHONETES, CASA...| PR|      CURITIBA|
|00144781000198|2016-12-01 00:00:00| 2016-12|      0|         2|SOCIEDADE EMPRESA...|705369600|         500.0|MATRIZ|   ATIVA|RESTAURANTES E SI...| SP|         BAURU|
|00234446000180|2016-11-01 00:00:00| 2016-11|      0|         2|SOCIEDADE EMPRESA...|696643200|        3000.0|MATRIZ|   ATIVA|LANCHONETES, CASA...| SP|     SAO PAULO

In [0]:
df_aux = df_aux.toPandas()
df_aux['municipio_nome'] = df_aux['municipio_nome'].apply(lambda x: remover_acentos(x).upper())
df_aux = spark.createDataFrame(df_aux)
df_aux.show(10, False)

+----------+---+---------+------------+--------------+-------------------------+----------------------+----------------------------+--------------------+-----+----------+------------------+--------------+-----------------------+---------------------+---------+
|code_muni2|UF |code_muni|dist_capital|area_municipio|taxa_atividade_18anosmais|porcent_pop_saneamento|porcent_mulheres10a17_filhos|mortalidade_infantil|IDHM |IDHM_renda|taxa_analfabetismo|porcent_pobres|municipio_metropolitano|municipio_nome       |populacao|
+----------+---+---------+------------+--------------+-------------------------+----------------------+----------------------------+--------------------+-----+----------+------------------+--------------+-----------------------+---------------------+---------+
|1100015   |RO |110001   |409.34      |7067.025      |60.43                    |80.2                  |2.6                         |23.8                |0.641|0.657     |1.22              |26.04         |0            

In [0]:
df_final = df_final.withColumn('MUNICIPIO_UF', SF.concat(SF.col('MUNICIPIO'), SF.lit(" - "), SF.col('UF')))
df_aux = df_aux.withColumn('municipio_nome_uf', SF.concat(SF.col('municipio_nome'), SF.lit(" - "), SF.col('UF'))).drop('UF')

df_aux.show(20, False)

+----------+---------+------------+--------------+-------------------------+----------------------+----------------------------+--------------------+-----+----------+------------------+--------------+-----------------------+------------------------+----------+-----------------------------+
|code_muni2|code_muni|dist_capital|area_municipio|taxa_atividade_18anosmais|porcent_pop_saneamento|porcent_mulheres10a17_filhos|mortalidade_infantil|IDHM |IDHM_renda|taxa_analfabetismo|porcent_pobres|municipio_metropolitano|municipio_nome          |populacao |municipio_nome_uf            |
+----------+---------+------------+--------------+-------------------------+----------------------+----------------------------+--------------------+-----+----------+------------------+--------------+-----------------------+------------------------+----------+-----------------------------+
|1100015   |110001   |409.34      |7067.025      |60.43                    |80.2                  |2.6                         

In [0]:
df_aux.where(df_aux.municipio_nome_uf == "ARAGARCAS - GO").show()

+----------+---------+------------+--------------+-------------------------+----------------------+----------------------------+--------------------+-----+----------+------------------+--------------+-----------------------+--------------+---------+-----------------+
|code_muni2|code_muni|dist_capital|area_municipio|taxa_atividade_18anosmais|porcent_pop_saneamento|porcent_mulheres10a17_filhos|mortalidade_infantil| IDHM|IDHM_renda|taxa_analfabetismo|porcent_pobres|municipio_metropolitano|municipio_nome|populacao|municipio_nome_uf|
+----------+---------+------------+--------------+-------------------------+----------------------+----------------------------+--------------------+-----+----------+------------------+--------------+-----------------------+--------------+---------+-----------------+
|   5201702|   520170|      331.45|        662.89|                    64.86|                 91.95|                        4.58|                13.4|0.732|     0.694|              1.51|           

In [0]:
df_finalx = df_final.join(df_aux,
                 df_final['MUNICIPIO_UF']==df_aux['municipio_nome_uf'], 
                 'left')

df_finalx.show(5)

+--------------+-------------------+--------+-------+----------+--------------------+---------+--------------+------+--------+--------------------+---+-----------+----------------+----------+---------+------------+--------------+-------------------------+----------------------+----------------------------+--------------------+-----+----------+------------------+--------------+-----------------------+--------------+---------+-----------------+
|          CNPJ|           REF_DATE|DATA_REF|CS_ALVO|QTD_SOCIOS|   NATUREZA_JURIDICA|    IDADE|CAPITAL_SOCIAL|  TIPO|SITUACAO| ATIVIDADE_PRINCIPAL| UF|  MUNICIPIO|    MUNICIPIO_UF|code_muni2|code_muni|dist_capital|area_municipio|taxa_atividade_18anosmais|porcent_pop_saneamento|porcent_mulheres10a17_filhos|mortalidade_infantil| IDHM|IDHM_renda|taxa_analfabetismo|porcent_pobres|municipio_metropolitano|municipio_nome|populacao|municipio_nome_uf|
+--------------+-------------------+--------+-------+----------+--------------------+---------+-----------

In [0]:
df_finalx = df_finalx.filter(df_finalx.porcent_pop_saneamento.isNotNull())
df_finalx = df_finalx.filter(df_finalx.populacao.isNotNull())

df_finalx.count()

64912

In [0]:
df_output = df_finalx.toPandas()
df_output.to_csv(path+'output5.csv')