Import the relevant libraries

In [55]:
import os
import glob
# Find Apache Spark instance in the local machine :
import findspark
findspark.init()
# Get the SparkSession class for use :
from pyspark.sql import SparkSession

Create Instance of Spark Session

In [56]:
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

Get details of the instance

In [57]:
spark

Ngrok Configuration commands 

In [58]:

get_ipython().system_raw('./ngrok config add-authtoken 2AU8bGzlLoU3EmA4cVgKhoUiW36_7vbt9oB4ficTvWqbaKYRn')
get_ipython().system_raw('./ngrok authtoken 2AU8bGzlLoU3EmA4cVgKhoUiW36_7vbt9oB4ficTvWqbaKYRn')
get_ipython().system_raw('./ngrok')
get_ipython().system_raw('./ngrok http 4050 &')

In [59]:
!curl -s http://localhost:4040/api/tunnels

{"tunnels":[{"name":"command_line","ID":"403d3c9525d70b722fd049678484194c","uri":"/api/tunnels/command_line","public_url":"https://108c-2804-14c-5bb7-41f6-7ca7-cb8-e8aa-ca22.sa.ngrok.io","proto":"https","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":70,"gauge":0,"rate1":8.971202307467745e-108,"rate5":5.122769922682075e-23,"rate15":3.6445876158572436e-9,"p50":64737000,"p90":546837539.9999998,"p95":32550529745.000015,"p99":51988145500},"http":{"count":96,"rate1":1.183414541732904e-107,"rate5":6.981112213368052e-23,"rate15":4.988119901433911e-9,"p50":11729200,"p90":41749649.99999997,"p95":66371344.99999996,"p99":195053800}}}],"uri":"/api/tunnels"}


DataFrames Spark

In [60]:
# Create refernce data & columns :
data = [('Zeca','35'), ('Eva', '29')]
colNames = ['Nome', 'Idade']

In [61]:
df =  spark.createDataFrame(data,colNames)
df.show()

+----+-----+
|Nome|Idade|
+----+-----+
|Zeca|   35|
| Eva|   29|
+----+-----+



In [62]:
df.toPandas()

Unnamed: 0,Nome,Idade
0,Zeca,35
1,Eva,29


In [63]:
def csv_list(path : str):
    return glob.glob(path + '\*csv')


In [64]:

company_list = csv_list('empresas')
partners_list = csv_list('socios')
establishments_list = csv_list('estabelecimentos')


In [65]:

df_empresa = spark.read.csv(company_list, sep = ';')
df_empresa.count()

4585679

In [66]:
df_socios = spark.read.csv(partners_list, sep = ';')
df_socios.count()

2046430

In [67]:
df_estabelecimentos = spark.read.csv(establishments_list, sep = ';')
df_estabelecimentos.count()

4836219

### Basic Operations

#### Column Renaming

In [68]:
empresasColNames = ['cnpj_basico', 
                    'razao_social_nome_empresarial', 
                    'natureza_juridica', 
                    'qualificacao_do_responsavel', 
                    'capital_social_da_empresa', 
                    'porte_da_empresa', 
                    'ente_federativo_responsavel']

estabsColNames = ['cnpj_basico',
                'cnpj_ordem', 
                'cnpj_dv', 
                'identificador_matriz_filial', 
                'nome_fantasia', 
                'situacao_cadastral', 
                'data_situacao_cadastral', 
                'motivo_situacao_cadastral', 
                'nome_da_cidade_no_exterior', 
                'pais', 'data_de_inicio_atividade', 
                'cnae_fiscal_principal', 
                'cnae_fiscal_secundaria', 
                'tipo_de_logradouro', 
                'logradouro', 
                'numero', 
                'complemento', 
                'bairro', 
                'cep', 
                'uf', 
                'municipio', 
                'ddd_1', 
                'telefone_1', 
                'ddd_2', 
                'telefone_2', 
                'ddd_do_fax', 
                'fax', 
                'correio_eletronico', 
                'situacao_especial', 
                'data_da_situacao_especial']

sociosColNames = ['cnpj_basico',
                 'identificador_de_socio', 
                 'nome_do_socio_ou_razao_social', 
                 'cnpj_ou_cpf_do_socio', 
                 'qualificacao_do_socio', 
                 'data_de_entrada_sociedade', 
                 'pais', 
                 'representante_legal', 
                 'nome_do_representante', 
                 'qualificacao_do_representante_legal', 
                 'faixa_etaria']


In [71]:
df_empresa = df_empresa.toDF(*empresasColNames)
df_estabelecimentos = df_estabelecimentos.toDF(*estabsColNames)
df_socios = df_socios.toDF(*sociosColNames)

Schemas

In [72]:
df_empresa.printSchema()

root
 |-- cnpj_basico: string (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: string (nullable = true)
 |-- qualificacao_do_responsavel: string (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: string (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [73]:
df_socios.printSchema()

root
 |-- cnpj_basico: string (nullable = true)
 |-- identificador_de_socio: string (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: string (nullable = true)
 |-- data_de_entrada_sociedade: string (nullable = true)
 |-- pais: string (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: string (nullable = true)
 |-- faixa_etaria: string (nullable = true)



In [74]:
df_estabelecimentos.printSchema()

root
 |-- cnpj_basico: string (nullable = true)
 |-- cnpj_ordem: string (nullable = true)
 |-- cnpj_dv: string (nullable = true)
 |-- identificador_matriz_filial: string (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: string (nullable = true)
 |-- data_situacao_cadastral: string (nullable = true)
 |-- motivo_situacao_cadastral: string (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: string (nullable = true)
 |-- data_de_inicio_atividade: string (nullable = true)
 |-- cnae_fiscal_principal: string (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: string (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- ddd_1: string (nullabl

Convert Data

In [76]:
from pyspark.sql.types import StringType, DoubleType, DateType
from pyspark.sql import functions as f

Change the decimal separator

In [79]:
df_empresa = df_empresa.withColumn('capital_social_da_empresa', f.regexp_replace('capital_social_da_empresa',',','.'))
df_empresa.limit(10).show()

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|        306|         FRANCAMAR REFRIGE...|             2240|                         49|                     0.00|               1|                       null|
|       1355|         BRASILEIRO & OLIV...|             2062|                         49|                     0.00|               5|                       null|
|       4820|         REGISTRO DE IMOVE...|             3034|                         32|                     0.00|               5|                       null|
|       5347|         ROSELY APARE

Change from String to Double

In [82]:
df_empresa = df_empresa.withColumn('capital_social_da_empresa', df_empresa['capital_social_da_empresa'].cast(DoubleType()))
df_empresa.printSchema()

root
 |-- cnpj_basico: string (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: string (nullable = true)
 |-- qualificacao_do_responsavel: string (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: string (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



Change from String to Date

In [89]:
df = spark.createDataFrame([(20200924,), (20201022,), (20210215,)], ['data'])
df.show()

+--------+
|    data|
+--------+
|20200924|
|20201022|
|20210215|
+--------+



In [90]:
df = df.withColumn('data', f.to_date(df.data.cast(StringType()), 'yyyyMMdd'))
df.show()

+----------+
|      data|
+----------+
|2020-09-24|
|2020-10-22|
|2021-02-15|
+----------+



In [92]:
# Apply in the datasets :
df_socios = df_socios.withColumn('data_de_entrada_sociedade', f.to_date(df_socios.data_de_entrada_sociedade.cast(StringType()), 'yyyyMMdd'))
df_socios.show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|        411|                     2|         LILIANA PATRICIA ...|         ***678188**|                   22|                     null|null|        ***000000**|                 null|                                  0|           7|
|        411|                     2|         CRISTINA HUNDERTMARK|      