In [0]:
# Importing the necessary libraries
from pyspark.sql import SparkSession

# Initializing the Spark session
spark = SparkSession.builder \
    .appName("Creating Folders and Loading CSVs") \
    .getOrCreate()

# Creating the folders in DBFS
dbutils.fs.mkdirs("dbfs:/FileStore/Companies")
dbutils.fs.mkdirs("dbfs:/FileStore/Partners")
dbutils.fs.mkdirs("dbfs:/FileStore/Establishments")

Out[1]: True


### Loading the database of companies, partners and establishments

In [0]:
path = 'dbfs:/FileStore/Companies'
path_p = 'dbfs:/FileStore/Partners'
path_e = 'dbfs:/FileStore/Establishments'
Companies = spark.read.csv(path, sep=';', inferSchema=True)
Partners = spark.read.csv(path_p, sep=';', inferSchema=True)
Establishments = spark.read.csv(path_e, sep=';', inferSchema=True)
# Imprimindo o número de linhas de cada DataFrame
print("The number of companies is:", Companies.count())
print("The number of Partners is:", Partners.count())
print("The number of Establishments is:", Establishments.count())



The number of companies is: 4585679
The number of Partners is: 2046430
The number of Establishments is: 4836219


In [0]:
from pyspark.sql import functions as f
Companies.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in Companies.columns]).show()


+---+---+---+---+---+----+-------+
|_c0|_c1|_c2|_c3|_c4| _c5|    _c6|
+---+---+---+---+---+----+-------+
|  0|  0|  0|  0|  0|5985|4579678|
+---+---+---+---+---+----+-------+



In [0]:
#Loading data referring to the nature description column
df_legal_nature = spark.read.csv("dbfs:/FileStore/tables/F_K03200_Z_D40210.NATJUCSV", sep=';', inferSchema=True, encoding="latin1")

#Renaming the column names
naturezaColNames = ['natureza', 'Descrição_Natureza']

for index, colName in enumerate(naturezaColNames):
    df_legal_nature = df_legal_nature.withColumnRenamed(f"_c{index}", colName)

df_legal_nature.columns


Out[5]: ['natureza', 'Descrição_Natureza']

In [0]:
#Loading data referring to the responsible partner's qualification column
df_qualificacao_responsavel = spark.read.csv("dbfs:/FileStore/tables/F_K03200_Z_D40210.QUALSCSV", sep=';', inferSchema=True, encoding="latin1")
df_qualificacao_responsavel.printSchema()

#Renaming the column names
qualiColNames = ['id', 'Descrição_qualificacao']

for index, colName in enumerate(qualiColNames):
    df_qualificacao_responsavel = df_qualificacao_responsavel.withColumnRenamed(f"_c{index}", colName)

df_qualificacao_responsavel.columns

root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)

Out[6]: ['id', 'Descrição_qualificacao']


## Analyzing
 - Schema
 - Column names
 - Column type

 

In [0]:
Companies.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: integer (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: integer (nullable = true)
 |-- _c6: string (nullable = true)



In [0]:
Companies.show()

+-----+--------------------+----+---+-------+---+----+
|  _c0|                 _c1| _c2|_c3|    _c4|_c5| _c6|
+-----+--------------------+----+---+-------+---+----+
|  306|FRANCAMAR REFRIGE...|2240| 49|   0,00|  1|null|
| 1355|BRASILEIRO & OLIV...|2062| 49|   0,00|  5|null|
| 4820|REGISTRO DE IMOVE...|3034| 32|   0,00|  5|null|
| 5347|ROSELY APARECIDA ...|2135| 50|   0,00|  5|null|
| 6846|BADU E FILHOS TEC...|2062| 49|4000,00|  1|null|
| 8416|  ELETRICA RUBI LTDA|2062| 49|   0,00|  5|null|
| 8992|SHIROMA VEICULOS ...|2062| 49|   0,00|  5|null|
| 9091|CONTATOS BAR E LA...|2062| 49|   0,00|  5|null|
| 9614|ANTONIA APARECIDA...|2135| 50|   0,00|  5|null|
| 9896|DORACY CORAT DA C...|2135| 50|   0,00|  5|null|
|12112|LANCHONETE RIO VE...|2062| 49|   0,00|  5|null|
|12605|VALMAR JACAREI CO...|2062| 49|   0,00|  5|null|
|13407|ROSANA CRISTINA D...|2135| 50|   0,00|  5|null|
|13408|CELIO RODRIGUES D...|2135| 50|   0,00|  5|null|
|13721|MAQFRAN COMERCIO ...|2062| 49|   0,00|  1|null|
|21181|MOU

In [0]:
Companies.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [0]:
# Handling column names and data types
CompaniesColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

for index, colName in enumerate(CompaniesColNames):
    Companies = Companies.withColumnRenamed(f"_c{index}", colName)

Companies.columns

Out[10]: ['cnpj_basico',
 'razao_social_nome_empresarial',
 'natureza_juridica',
 'qualificacao_do_responsavel',
 'capital_social_da_empresa',
 'porte_da_empresa',
 'ente_federativo_responsavel']

In [0]:

estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']


for index, colName in enumerate(estabsColNames):
    Establishments = Establishments.withColumnRenamed(f"_c{index}", colName)
    
Establishments.columns

Out[11]: ['cnpj_basico',
 'cnpj_ordem',
 'cnpj_dv',
 'identificador_matriz_filial',
 'nome_fantasia',
 'situacao_cadastral',
 'data_situacao_cadastral',
 'motivo_situacao_cadastral',
 'nome_da_cidade_no_exterior',
 'pais',
 'data_de_inicio_atividade',
 'cnae_fiscal_principal',
 'cnae_fiscal_secundaria',
 'tipo_de_logradouro',
 'logradouro',
 'numero',
 'complemento',
 'bairro',
 'cep',
 'uf',
 'municipio',
 'ddd_1',
 'telefone_1',
 'ddd_2',
 'telefone_2',
 'ddd_do_fax',
 'fax',
 'correio_eletronico',
 'situacao_especial',
 'data_da_situacao_especial']

In [0]:
PartnersColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']


for index, colName in enumerate(PartnersColNames):
    Partners = Partners.withColumnRenamed(f"_c{index}", colName)

Partners.columns

Out[12]: ['cnpj_basico',
 'identificador_de_socio',
 'nome_do_socio_ou_razao_social',
 'cnpj_ou_cpf_do_socio',
 'qualificacao_do_socio',
 'data_de_entrada_sociedade',
 'pais',
 'representante_legal',
 'nome_do_representante',
 'qualificacao_do_representante_legal',
 'faixa_etaria']

## Converting String ➔ Double

### `StringType ➔ DoubleType`

In [0]:
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [0]:
# Using regex function to replace "," with "."
Companies = Companies.withColumn('capital_social_da_empresa', f.regexp_replace('capital_social_da_empresa', ',', '.'))
Companies.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,


In [0]:
# Changing the column type
Companies = Companies.withColumn('capital_social_da_empresa', Companies['capital_social_da_empresa'].cast(DoubleType()))
Companies.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,


## Converting String ➔ Date

### `StringType ➔ DateType`


In [0]:
Establishments = Establishments\
    .withColumn(
        "data_situacao_cadastral", 
        f.to_date(Establishments.data_situacao_cadastral.cast(StringType()), 'yyyyMMdd')
    )\
    .withColumn(
        "data_de_inicio_atividade", 
        f.to_date(Establishments.data_de_inicio_atividade.cast(StringType()), 'yyyyMMdd')
    )\
    .withColumn(
        "data_da_situacao_especial", 
        f.to_date(Establishments.data_da_situacao_especial.cast(StringType()), 'yyyyMMdd')
    )

Establishments.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [0]:
Partners = Partners\
    .withColumn(
        "data_de_entrada_sociedade", 
        f.to_date(Partners.data_de_entrada_sociedade.cast(StringType()), 'yyyyMMdd')
    )

Partners.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)




## Checking for null values

In [0]:
#Checking null values in the Companies dataframe

Companies.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in Companies.columns]).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,0,0,0,0,0,5985,4579678


In [0]:
#Checking null values in the Partners dataframe

Partners.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in Partners.columns]).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,0,0,208,1234,0,1,2038255,0,1995432,0,0


In [0]:
#Checking null values in the Establishments dataframe

Establishments.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in Establishments.columns]).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,0,0,0,0,2014706,0,16195,0,4833519,4823786,...,0,1251627,1251623,4424413,4425166,4132454,4131990,2304672,4836208,4836208


In [0]:
#Filling all null values with 0, but you can choose another strategy

Companies.na.fill(0).limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,


In [0]:
#Filling all null values with 0, but you can choose another strategy

Partners.na.fill(0).limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,0,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,0,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,0,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,0,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,0,***000000**,,0,8


In [0]:
#Filling all null values with - in string columns 

Partners.na.fill('-').limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,-,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,-,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,-,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,-,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,-,0,8



## Inserting the column with the description of the codes

In [0]:
df_legal_nature.limit(5).toPandas()

Unnamed: 0,natureza,Descrição_Natureza
0,0,Natureza Jurídica não informada
1,3271,Órgão de Direção Local de Partido Político
2,3280,Comitê Financeiro de Partido Político
3,3298,Frente Plebiscitária ou Referendária
4,3301,Organização Social (OS)


In [0]:
df_qualificacao_responsavel.limit(5).toPandas()

Unnamed: 0,id,Descrição_qualificacao
0,0,Não informada
1,5,Administrador
2,8,Conselheiro de Administração
3,9,Curador
4,10,Diretor



### Manipulating cols

In [0]:
# Joining the original "Companies" DataFrame with the DataFrames with the description codes for Legal nature and Partner qualification
df_Companies = Companies.join(df_legal_nature, Companies['natureza_juridica'] == df_legal_nature['natureza'], "left") \
                                     .join(df_qualificacao_responsavel, Companies['qualificacao_do_responsavel'] == df_qualificacao_responsavel['id'], "left")

# Visualizing the resulting DataFrame
df_Companies.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel,natureza,Descrição_Natureza,id,Descrição_qualificacao
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,,2240,Sociedade Simples Limitada,49,Sócio-Administrador
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,,2062,Sociedade Empresária Limitada,49,Sócio-Administrador
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,,3034,Serviço Notarial e Registral (Cartório),32,Tabelião
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,,2135,Empresário (Individual),50,Empresário
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,,2062,Sociedade Empresária Limitada,49,Sócio-Administrador


In [0]:
df_Companies.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)
 |-- natureza: integer (nullable = true)
 |-- Descrição_Natureza: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- Descrição_qualificacao: string (nullable = true)



In [0]:
from pyspark.sql.functions import when

# Adding a new column 'company_size' based on the company size code
df_Companies = df_Companies.withColumn("company_size", 
                                                when(df_Companies["porte_da_empresa"] == 0, "NOT INFORMED")
                                                .when(df_Companies["porte_da_empresa"] == 1, "MICRO COMPANY")
                                                .when(df_Companies["porte_da_empresa"] == 3, "SMALL COMPANY")
                                                .when(df_Companies["porte_da_empresa"] == 5, "OTHER")
                                                .otherwise("Unknown"))

# Showing the resulting DataFrame
df_Companies.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel,natureza,Descrição_Natureza,id,Descrição_qualificacao,company_size
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,,2240,Sociedade Simples Limitada,49,Sócio-Administrador,MICRO COMPANY
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,,2062,Sociedade Empresária Limitada,49,Sócio-Administrador,OTHER
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,,3034,Serviço Notarial e Registral (Cartório),32,Tabelião,OTHER
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,,2135,Empresário (Individual),50,Empresário,OTHER
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,,2062,Sociedade Empresária Limitada,49,Sócio-Administrador,MICRO COMPANY


In [0]:
# Removing the duplicate columns 'legal_nature' and 'id' from the resulting DataFrame
df_Companies = df_Companies.drop('natureza', 'id', 'ente_federativo_responsavel', 'qualificacao_do_responsavel', 'natureza_juridica','porte_da_empresa')

# Viewing the resulting DataFrame without the duplicate columns
df_Companies.limit(5).toPandas()


Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,capital_social_da_empresa,Descrição_Natureza,Descrição_qualificacao,company_size
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,0.0,Sociedade Simples Limitada,Sócio-Administrador,MICRO COMPANY
1,1355,BRASILEIRO & OLIVEIRA LTDA,0.0,Sociedade Empresária Limitada,Sócio-Administrador,OTHER
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",0.0,Serviço Notarial e Registral (Cartório),Tabelião,OTHER
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,0.0,Empresário (Individual),Empresário,OTHER
4,6846,BADU E FILHOS TECIDOS LTDA,4000.0,Sociedade Empresária Limitada,Sócio-Administrador,MICRO COMPANY


In [0]:
# Performing the join with the original DataFrame "Partners"
df_Partners = Partners.join(df_qualificacao_responsavel, Partners['qualificacao_do_socio'] == df_qualificacao_responsavel['id'], "left")

# Showing the resulting DataFrame
df_Partners.limit(5).toPandas()


Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria,id,Descrição_qualificacao
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7,22,Sócio
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7,28,Sócio-Gerente
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,,0,8,49,Sócio-Administrador
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,,0,5,49,Sócio-Administrador
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,,0,8,49,Sócio-Administrador


In [0]:
df_Partners = df_Partners.withColumn("Partner_description", 
                                            when(df_Partners["identificador_de_socio"] == 1, "LEGAL ENTITY")
                                            .when(df_Partners["identificador_de_socio"] == 2, "INDIVIDUAL")
                                            .when(df_Partners["identificador_de_socio"] == 3, "FOREIGN")
                                            .otherwise("Unknown"))

# Showing the resulting DataFrame
df_Partners.columns


Out[32]: ['cnpj_basico',
 'identificador_de_socio',
 'nome_do_socio_ou_razao_social',
 'cnpj_ou_cpf_do_socio',
 'qualificacao_do_socio',
 'data_de_entrada_sociedade',
 'pais',
 'representante_legal',
 'nome_do_representante',
 'qualificacao_do_representante_legal',
 'faixa_etaria',
 'id',
 'Descrição_qualificacao',
 'Partner_description']

In [0]:
df_Partners.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria,id,Descrição_qualificacao,Partner_description
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7,22,Sócio,INDIVIDUAL
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7,28,Sócio-Gerente,INDIVIDUAL
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,,0,8,49,Sócio-Administrador,INDIVIDUAL
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,,0,5,49,Sócio-Administrador,INDIVIDUAL
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,,0,8,49,Sócio-Administrador,INDIVIDUAL


In [0]:
# Removendo as colunas duplicadas 'natureza_juridica' e 'id' do DataFrame resultante
df_Partners =df_Partners.drop('representante_legal', 'identificador_de_socio','qualificacao_do_socio', 'nome_do_representante', 'cnpj_ou_cpf_do_socio', 'pais','qualificacao_do_representante_legal', 'id', 'ente_federativo_responsavel')

# Visualizando o DataFrame resultante sem as colunas duplicadas
df_Partners.limit(5).toPandas()

Unnamed: 0,cnpj_basico,nome_do_socio_ou_razao_social,data_de_entrada_sociedade,faixa_etaria,Descrição_qualificacao,Partner_description
0,411,LILIANA PATRICIA GUASTAVINO,1994-07-25,7,Sócio,INDIVIDUAL
1,411,CRISTINA HUNDERTMARK,1994-07-25,7,Sócio-Gerente,INDIVIDUAL
2,5813,CELSO EDUARDO DE CASTRO STEPHAN,1994-05-16,8,Sócio-Administrador,INDIVIDUAL
3,5813,EDUARDO BERRINGER STEPHAN,1994-05-16,5,Sócio-Administrador,INDIVIDUAL
4,14798,HANNE MAHFOUD FADEL,1994-06-09,8,Sócio-Administrador,INDIVIDUAL


In [0]:
Establishments = Establishments.withColumn("branch_type", 
                                            when(Establishments["identificador_matriz_filial"] == 1, "HEADQUARTER")
                                            .when(Establishments["identificador_matriz_filial"] == 2, "BRANCH")
                                            .otherwise("Unknown"))

Establishments = Establishments.withColumn("registration_status", 
                                            when(Establishments["situacao_cadastral"] == '01', "NULL")
                                            .when(Establishments["situacao_cadastral"] == '2', "ACTIVE")
                                            .when(Establishments["situacao_cadastral"] == '3', "SUSPENDED")
                                            .when(Establishments["situacao_cadastral"] == '4', "INACTIVE")
                                            .when(Establishments["situacao_cadastral"] == '08', "CLOSED")
                                            .otherwise("Unknown"))

# Showing the resulting DataFrame
Establishments.limit(5).toPandas()


Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial,branch_type,registration_status
0,1879,1,96,1,PIRAMIDE M. C.,8,2001-10-29,1,,,...,,,,,,,,,HEADQUARTER,CLOSED
1,2818,1,43,1,,8,2008-12-31,71,,,...,,,,,,,,,HEADQUARTER,CLOSED
2,3110,1,7,1,,8,1997-12-31,1,,,...,,,,,,,,,HEADQUARTER,CLOSED
3,3733,1,80,1,,8,2008-12-31,71,,,...,,,,,,,,,HEADQUARTER,CLOSED
4,4628,3,27,2,EMBROIDERY & GIFT,8,1998-04-29,1,,,...,,,,,,,,,BRANCH,CLOSED


In [0]:
# List of columns to be removed
columns_to_drop = [
    'cnpj_ordem',
    'cnpj_dv',
    'motivo_situacao_cadastral',
    'nome_da_cidade_no_exterior',
    'pais',
    'cnae_fiscal_principal',
    'cnae_fiscal_secundaria',
    'tipo_de_logradouro',
    'logradouro',
    'numero',
    'complemento',
    'bairro',
    'cep',
    'uf',
    'municipio',
    'ddd_1',
    'telefone_1',
    'ddd_2',
    'telefone_2',
    'ddd_do_fax',
    'fax',
    'correio_eletronico',
    'situacao_especial',
    'data_da_situacao_especial',
    'situacao_cadastral',
    'identificador_matriz_filial',
    'matriz_filial'

]

# Dropping the specified columns
df_Establishments = Establishments.drop(*columns_to_drop)

# Showing the resulting DataFrame
df_Establishments.limit(5).toPandas()


Unnamed: 0,cnpj_basico,nome_fantasia,data_situacao_cadastral,data_de_inicio_atividade,branch_type,registration_status
0,1879,PIRAMIDE M. C.,2001-10-29,1994-05-09,HEADQUARTER,CLOSED
1,2818,,2008-12-31,1994-05-12,HEADQUARTER,CLOSED
2,3110,,1997-12-31,1994-05-12,HEADQUARTER,CLOSED
3,3733,,2008-12-31,1994-05-13,HEADQUARTER,CLOSED
4,4628,EMBROIDERY & GIFT,1998-04-29,1995-05-09,BRANCH,CLOSED



# Analyzing the 3 dataframes

In [0]:
df_Companies.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,capital_social_da_empresa,Descrição_Natureza,Descrição_qualificacao,company_size
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,0.0,Sociedade Simples Limitada,Sócio-Administrador,MICRO COMPANY
1,1355,BRASILEIRO & OLIVEIRA LTDA,0.0,Sociedade Empresária Limitada,Sócio-Administrador,OTHER
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",0.0,Serviço Notarial e Registral (Cartório),Tabelião,OTHER
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,0.0,Empresário (Individual),Empresário,OTHER
4,6846,BADU E FILHOS TECIDOS LTDA,4000.0,Sociedade Empresária Limitada,Sócio-Administrador,MICRO COMPANY


In [0]:
df_Partners.limit(5).toPandas()

Unnamed: 0,cnpj_basico,nome_do_socio_ou_razao_social,data_de_entrada_sociedade,faixa_etaria,Descrição_qualificacao,Partner_description
0,411,LILIANA PATRICIA GUASTAVINO,1994-07-25,7,Sócio,INDIVIDUAL
1,411,CRISTINA HUNDERTMARK,1994-07-25,7,Sócio-Gerente,INDIVIDUAL
2,5813,CELSO EDUARDO DE CASTRO STEPHAN,1994-05-16,8,Sócio-Administrador,INDIVIDUAL
3,5813,EDUARDO BERRINGER STEPHAN,1994-05-16,5,Sócio-Administrador,INDIVIDUAL
4,14798,HANNE MAHFOUD FADEL,1994-06-09,8,Sócio-Administrador,INDIVIDUAL


In [0]:
df_Establishments.limit(5).toPandas()

Unnamed: 0,cnpj_basico,nome_fantasia,data_situacao_cadastral,data_de_inicio_atividade,branch_type,registration_status
0,1879,PIRAMIDE M. C.,2001-10-29,1994-05-09,HEADQUARTER,CLOSED
1,2818,,2008-12-31,1994-05-12,HEADQUARTER,CLOSED
2,3110,,1997-12-31,1994-05-12,HEADQUARTER,CLOSED
3,3733,,2008-12-31,1994-05-13,HEADQUARTER,CLOSED
4,4628,EMBROIDERY & GIFT,1998-04-29,1995-05-09,BRANCH,CLOSED


In [0]:
df_Companies.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- Descrição_Natureza: string (nullable = true)
 |-- Descrição_qualificacao: string (nullable = true)
 |-- company_size: string (nullable = false)



In [0]:
df_Establishments.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- branch_type: string (nullable = false)
 |-- registration_status: string (nullable = false)



In [0]:
df_Partners.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- faixa_etaria: integer (nullable = true)
 |-- Descrição_qualificacao: string (nullable = true)
 |-- Partner_description: string (nullable = false)



#### Total companies by size:

In [0]:
from pyspark.sql.functions import count
df_Companies.groupBy("company_size").agg(count("*").alias("total_companies_by_size")).show()


+-------------+-----------------------+
| company_size|total_companies_by_size|
+-------------+-----------------------+
|SMALL COMPANY|                 115151|
|MICRO COMPANY|                3129043|
|      Unknown|                   5985|
|        OTHER|                1335500|
+-------------+-----------------------+



#### Total Headquarters and Subsidiaries:

In [0]:
df_Establishments.groupBy("branch_type").agg(count("*").alias("total_establishments_by_type")).show()


+-----------+----------------------------+
|branch_type|total_establishments_by_type|
+-----------+----------------------------+
|HEADQUARTER|                     4585679|
|     BRANCH|                      250540|
+-----------+----------------------------+




#### Total establishments by registration status:

In [0]:
Establishments.groupBy("registration_status").agg(count("*").alias("total_establishments_by_status")).show()


+-------------------+------------------------------+
|registration_status|total_establishments_by_status|
+-------------------+------------------------------+
|           INACTIVE|                        737328|
|          SUSPENDED|                         17145|
|             CLOSED|                       2127895|
|             ACTIVE|                       1945695|
|               NULL|                          8156|
+-------------------+------------------------------+




#### Total establishments opened per year:

In [0]:
from pyspark.sql.functions import year
Establishments.groupBy(year("data_de_inicio_atividade")).agg(count("*").alias("total_establishments_opened_per_year")).show()


+------------------------------+------------------------------------+
|year(data_de_inicio_atividade)|total_establishments_opened_per_year|
+------------------------------+------------------------------------+
|                          1959|                                  17|
|                          1990|                               63580|
|                          1903|                                   1|
|                          1975|                               14058|
|                          1977|                               28535|
|                          2003|                               59344|
|                          2007|                               74924|
|                          2018|                              275435|
|                          1974|                               11896|
|                          2015|                              212523|
|                          1955|                                  21|
|                   

#### Establishments opened per year since 2011:

In [0]:
from pyspark.sql.functions import year

# Filtering the data to include only years from 2014 onwards
Establishments_filtered = df_Establishments.filter(year("data_de_inicio_atividade") >= 2011)

# Creating a temporary view with the filtered data
Establishments_filtered.createOrReplaceTempView("establishments_filtered_view")

# Performing the analysis using the view
result = spark.sql("""
    SELECT YEAR(data_de_inicio_atividade) AS year,
           COUNT(*) AS total_establishments_opened_per_year
    FROM establishments_filtered_view
    GROUP BY YEAR(data_de_inicio_atividade)
    ORDER BY YEAR(data_de_inicio_atividade)
""")

# Showing the result
result.show()


+----+------------------------------------+
|year|total_establishments_opened_per_year|
+----+------------------------------------+
|2011|                              172677|
|2012|                              232480|
|2013|                              198424|
|2014|                              202276|
|2015|                              212523|
|2016|                              265415|
|2017|                              237292|
|2018|                              275435|
|2019|                              325922|
|2020|                              400654|
|2021|                              153275|
+----+------------------------------------+




#### Summary statistics for the "Share capital" column:

In [0]:
df_Companies\
    .select("capital_social_da_empresa")\
    .summary()\
    .show()

+-------+-------------------------+
|summary|capital_social_da_empresa|
+-------+-------------------------+
|  count|                  4585679|
|   mean|        503694.5478542674|
| stddev|     2.1118691490537727E8|
|    min|                      0.0|
|    25%|                      0.0|
|    50%|                   1000.0|
|    75%|                   7000.0|
|    max|         3.22014670262E11|
+-------+-------------------------+




#### Frequency and average share capital by company size:

In [0]:
df_Companies\
    .select('cnpj_basico', 'company_size', 'capital_social_da_empresa')\
    .groupBy('company_size')\
    .agg(
        f.avg("capital_social_da_empresa").alias("average_social_capital"),
        f.count("cnpj_basico").alias("freq")
    )\
    .orderBy('company_size', ascending=True)\
    .toPandas()    

Unnamed: 0,company_size,average_social_capital,freq
0,MICRO COMPANY,339994.5,3129043
1,OTHER,708660.4,1335500
2,SMALL COMPANY,2601002.0,115151
3,Unknown,8.354219,5985



#### Joining Companies and Establishments:

In [0]:
 Establishments_join = df_Establishments.join(df_Companies, 'cnpj_basico', how='inner')
 Establishments_join.limit(5).toPandas()

Unnamed: 0,cnpj_basico,nome_fantasia,data_situacao_cadastral,data_de_inicio_atividade,branch_type,registration_status,razao_social_nome_empresarial,capital_social_da_empresa,Descrição_Natureza,Descrição_qualificacao,company_size
0,243,,2005-08-27,1994-07-29,HEADQUARTER,ACTIVE,IN FOCO PRODUCOES FOTOGRAFICAS S/S LTDA,500.0,Sociedade Simples Limitada,Sócio-Administrador,MICRO COMPANY
1,336,,1995-07-07,1994-08-23,HEADQUARTER,CLOSED,CLINICA ROMA S/C LTDA,0.0,Sociedade Simples Limitada,Sócio-Gerente,OTHER
2,362,,2008-12-31,1994-08-30,HEADQUARTER,CLOSED,SOFT TRAINNING TREINAMENTO EM INFORMATICA S/C ...,0.0,Sociedade Simples Limitada,Sócio-Administrador,OTHER
3,451,CASA DO PASTOR,2008-12-31,1994-10-04,HEADQUARTER,CLOSED,CONSELHO PASTORAL BATISTA FUNDAMENTALISTA DO B...,0.0,Associação Privada,Presidente,OTHER
4,458,,2005-11-03,1994-10-10,HEADQUARTER,ACTIVE,AMERICO REGATIERI NETO REPRESENTACOES,5000.0,Empresário (Individual),Empresário,MICRO COMPANY


#### Filtering only company names that have "RESTAURANT" in their trade name:

In [0]:
from pyspark.sql.functions import col, upper
Establishments_join\
    .select('razao_social_nome_empresarial', 'Descrição_Natureza', 'company_size', 'registration_status', 'capital_social_da_empresa')\
    .filter(
        (upper(Establishments_join['razao_social_nome_empresarial']).like('%RESTAURANTE%')) & 
        (col('registration_status') == 'ACTIVE')
    )\
    .show(15, False)

+----------------------------------------+------------------------------------------------------------------------+-------------+-------------------+-------------------------+
|razao_social_nome_empresarial           |Descrição_Natureza                                                      |company_size |registration_status|capital_social_da_empresa|
+----------------------------------------+------------------------------------------------------------------------+-------------+-------------------+-------------------------+
|LANCHONETE E RESTAURANTE PARACATU LTDA  |Sociedade Empresária Limitada                                           |MICRO COMPANY|ACTIVE             |0.0                      |
|M NUNES DE OLIVEIRA RESTAURANTE         |Empresário (Individual)                                                 |MICRO COMPANY|ACTIVE             |30000.0                  |
|D JAPA RESTAURANTE LTDA                 |Sociedade Empresária Limitada                                           |SMALL