# Estabelecimentos

## Read files and create the df

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import os

In [2]:
# Build the Spark session used by every cell below (getOrCreate reuses an
# already-running session instead of starting a second one).
spark = (
    SparkSession.builder
    .appName("DE - Final challenge XP-E")
    .getOrCreate()
)

# Show which Spark version this notebook is running against.
spark.version

'3.5.1'

In [3]:
# Presumably the download location of the challenge data set (verify):
# https://homepages.dcc.ufmg.br/~pcalais/XPE/engenharia-dados/big-data-spark/desafio/

# Base directory of the local data. Later cells build paths by plain string
# concatenation (file_path + '...'), so the trailing '/' is required.
file_path = './data/de_challenge_xpe/'

In [4]:
# List the raw CSV part files of the establishments data set.
# os.listdir returns entries in arbitrary (filesystem-dependent) order, so the
# listing is sorted to make the displayed output reproducible across runs.
sorted(os.listdir(file_path + 'estabelecimentos/estabelecimentos/'))

['estabelecimentos-1.csv', 'estabelecimentos-2.csv', 'estabelecimentos-3.csv']

In [5]:
# Load every CSV part file in the directory into a single DataFrame.
# The files are ';'-separated and carry a header row; column types are
# inferred from the data rather than declared up front.
df_estabelecimento = spark.read.csv(
    file_path + 'estabelecimentos/estabelecimentos',
    sep=";",
    header=True,
    inferSchema=True,
)

## Exploring the data

In [33]:
# Print the column names and the types Spark inferred for them.
df_estabelecimento.printSchema()

root
 |-- CNPJ_BASICO: integer (nullable = true)
 |-- CNPJ_ORDEM: integer (nullable = true)
 |-- CNPJ_DV: integer (nullable = true)
 |-- IDENTIFICADOR_MATRIZ_FILIAL: integer (nullable = true)
 |-- NOME_FANTASIA: string (nullable = true)
 |-- SITUACAO_CADASTRAL: integer (nullable = true)
 |-- DATA_SITUACAO_CADASTRAL: integer (nullable = true)
 |-- MOTIVO_SITUACAO_CADASTRAL: integer (nullable = true)
 |-- NOME_DA_CIDADE_NO_EXTERIOR: string (nullable = true)
 |-- PAIS: integer (nullable = true)
 |-- DATA_INICIO_ATIVIDADE: integer (nullable = true)
 |-- CNAE_PRINCIPAL: integer (nullable = true)
 |-- CNAE_SECUNDARIA: string (nullable = true)
 |-- TIPO_LOGRADOURO: string (nullable = true)
 |-- LOGRADOURO: string (nullable = true)
 |-- NUMERO: string (nullable = true)
 |-- COMPLEMENTO: string (nullable = true)
 |-- BAIRRO: string (nullable = true)
 |-- CEP: string (nullable = true)
 |-- UF: string (nullable = true)
 |-- MUNICIPIO: integer (nullable = true)
 |-- DDD_1: string (nullable = true)

In [34]:
# How many columns (one entry in .columns per DataFrame column)

len(df_estabelecimento.columns)

30

In [35]:
# How many int columns
# .dtypes yields (column_name, type_name) pairs; keep the pairs typed 'int'.
# len() is used on purpose: the wildcard import of pyspark.sql.functions
# replaces the builtin sum() with Spark's column aggregate.

total_int_cols = len(
    [type_name for _name, type_name in df_estabelecimento.dtypes if type_name == 'int']
)

total_int_cols

12

In [36]:
# How many companies
# (this is the number of rows in the establishments DataFrame; selecting a
# single column does not change how many rows count() sees)

df_estabelecimento.select('CNPJ_BASICO').count()

20996744

In [37]:
# How many distinct companies
# Distinct full CNPJs, i.e. distinct (CNPJ_BASICO, CNPJ_ORDEM, CNPJ_DV) triples.
# The three columns are compared directly instead of being glued into one
# string: concat_ws silently skips null inputs, so two different triples with a
# missing part could collapse into the same string and be under-counted.

(
    df_estabelecimento
    .select('CNPJ_BASICO', 'CNPJ_ORDEM', 'CNPJ_DV')
    .distinct()
    .count()
)

20996744

In [38]:
# Defining the view estabelecimentos
# Exposes the DataFrame to spark.sql() under the table name 'estabelecimentos';
# re-running the cell simply replaces the existing view.

df_estabelecimento.createOrReplaceTempView('estabelecimentos')

In [39]:
# How many companies have a null LOGRADOURO (street name) - SQL version

spark.sql('SELECT COUNT(*) FROM estabelecimentos WHERE LOGRADOURO IS NULL').show()

+--------+
|count(1)|
+--------+
|     828|
+--------+



In [40]:
# How many companies have a null logradouro (street name) - PySpark DataFrame API version

total_log_null = (
    df_estabelecimento
    .filter(col('logradouro').isNull())
    .count()
)

total_log_null

828

In [41]:
# Creating the UDF is_avenida
def is_avenida(logradouro):
    """Tell whether a street name starts with 'AVENIDA', ignoring letter case.

    Args:
        logradouro: street name as a string, or None when it is missing.

    Returns:
        True or False for a string input; None when ``logradouro`` is None.
    """
    if logradouro is None:
        # Missing street name: give no answer rather than False.
        return None
    street_upper = logradouro.upper()
    return street_upper.startswith('AVENIDA')

# Register the Python function under the SQL name 'is_avenida' with a boolean
# return type, so it can be called from spark.sql() queries.
spark.udf.register('is_avenida', is_avenida, returnType=BooleanType())

<function __main__.is_avenida(logradouro)>

In [42]:
# How many companies have a logradouro (street name) starting with AVENIDA - SQL + UDF
# Rows where the UDF returns NULL (null LOGRADOURO) do not satisfy the WHERE clause.

spark.sql('SELECT COUNT(*) FROM estabelecimentos WHERE is_avenida(LOGRADOURO)').show()

+--------+
|count(1)|
+--------+
|   52587|
+--------+



In [6]:
# How many distinct ZIP codes (CEP column)

total_zip_code = df_estabelecimento.select('CEP').distinct().count()

total_zip_code

889886

In [62]:
# How many companies are annex
# NOTE(review): presumably IDENTIFICADOR_MATRIZ_FILIAL == 2 marks a branch
# (filial) as opposed to the head office — confirm against the data dictionary.

total_annex = (
    df_estabelecimento
    .filter(col('IDENTIFICADOR_MATRIZ_FILIAL') == 2)
    .count()
)

total_annex

1093082

## Writing a parquet file

In [44]:
# Persist the establishments DataFrame as Parquet under the data directory.
# mode('overwrite') replaces whatever a previous run left at this path.
df_estabelecimento.write.mode('overwrite').parquet(file_path+'estabelecimentos_parquet/')

# CNAES

## Read files and create the df

In [45]:
# Load the CNAE lookup files (';'-separated, with a header row) and let
# Spark infer the column types from the data.
df_cnaes = spark.read.csv(
    file_path + 'cnaes/',
    sep=";",
    header=True,
    inferSchema=True,
)

## Exploring the data

In [46]:
# Print the column names and the types Spark inferred for the CNAE lookup.
df_cnaes.printSchema()

root
 |-- CNAE: integer (nullable = true)
 |-- DESCRICAO_CNAE: string (nullable = true)



In [48]:
# How many CNAES (distinct values of the CNAE code column)

total_cnaes = (
    df_cnaes
    .select('CNAE')
    .distinct()
    .count()
)

total_cnaes

1359

In [53]:
# Creating estabelecimento_cnaes view
# Left join: every establishment row is kept, and the CNAE columns are null
# when its CNAE_PRINCIPAL has no match in the lookup.

(
    df_estabelecimento
    .join(
        df_cnaes,
        on=df_estabelecimento['CNAE_PRINCIPAL'] == df_cnaes['CNAE'],
        how='left',
    )
    .createOrReplaceTempView('estabelecimento_cnaes')
)

In [59]:
# Creating is_cnae_cultivo UDF

def is_cnae_cultivo(descricao_cnae):
    """Tell whether a CNAE description starts with 'CULTIVO', ignoring letter case.

    Args:
        descricao_cnae: CNAE description as a string, or None when it is missing.

    Returns:
        True or False for a string input; None when ``descricao_cnae`` is None.
    """
    if descricao_cnae is None:
        # Missing description: give no answer rather than False.
        return None
    description_upper = descricao_cnae.upper()
    return description_upper.startswith('CULTIVO')
    
# Register the Python function under the SQL name 'is_cnae_cultivo' with a
# boolean return type, so it can be called from spark.sql() queries.
spark.udf.register('is_cnae_cultivo', is_cnae_cultivo, returnType=BooleanType())

<function __main__.is_cnae_cultivo(descricao_cnae)>

In [60]:
# How many establishments have a farming CNAE
# COUNT(*) counts rows of the estabelecimento_cnaes view (establishments joined
# to their CNAE_PRINCIPAL description), not distinct CNAE codes: a row counts
# when its DESCRICAO_CNAE starts with 'CULTIVO'.

spark.sql(
    """
        SELECT 
            COUNT(*) 
        from 
            estabelecimento_cnaes 
        WHERE 
            is_cnae_cultivo(DESCRICAO_CNAE)
    """
).show()

+--------+
|count(1)|
+--------+
|  200243|
+--------+

