In [1]:
"""Define imports to work with Spark and Columns"""

import os
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [2]:
"""Configure environment"""

os.environ["SPARK_HOME"] = "/usr/local/bin/spark-3.5.1-bin-hadoop3"
spark_session = SparkSession.builder.appName("Casting").getOrCreate()
input_file_path = "/Users/fernandoferreira/Code/pyspark-course/data/"

24/08/12 13:53:01 WARN Utils: Your hostname, Fernandos-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.5 instead (on interface en0)
24/08/12 13:53:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/12 13:53:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
----------------------------------------

In [3]:
"""Load data to spark data frame through 'generic' read method"""

empresa_database = f"{input_file_path}/empresas"
empresas_df = spark_session.read.format('csv').load(empresa_database, sep=";", inferSchema=True)

                                                                                

In [4]:
"""Verify record quantity e data schema"""

empresas_df.count()
empresas_df.printSchema()
empresas_df.limit(5).toPandas()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: integer (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: integer (nullable = true)
 |-- _c6: string (nullable = true)



Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6
0,403,ALUPE REPRESENTACOES LTDA,2240,49,0,1,
1,2498,JCA COMERCIO E REPRESENTACOES LTDA,2062,49,0,5,
2,4599,ROSA MARIA DE PAIVA MENEZES,2135,50,0,5,
3,13090,ADELINA ZANETI PARDINI,2135,50,0,1,
4,16903,PERSONNALITE SPR CURTO PRAZO - FUNDO DE APLICA...,2224,43,0,5,


In [5]:
"""Rename data frame columns"""

empresas_column_name = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

for index, column_name in enumerate(empresas_column_name):
    empresas_df = empresas_df.withColumnRenamed(f"_c{index}", column_name)

empresas_df.printSchema()
empresas_df.limit(5).toPandas()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,403,ALUPE REPRESENTACOES LTDA,2240,49,0,1,
1,2498,JCA COMERCIO E REPRESENTACOES LTDA,2062,49,0,5,
2,4599,ROSA MARIA DE PAIVA MENEZES,2135,50,0,5,
3,13090,ADELINA ZANETI PARDINI,2135,50,0,1,
4,16903,PERSONNALITE SPR CURTO PRAZO - FUNDO DE APLICA...,2224,43,0,5,


24/08/12 13:53:21 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [6]:
"""Apply regular expression to replace values"""

empresas_df = empresas_df.withColumn('capital_social_da_empresa', f.regexp_replace('capital_social_da_empresa', ',', '.'))

empresas_df.limit(2).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,403,ALUPE REPRESENTACOES LTDA,2240,49,0.0,1,
1,2498,JCA COMERCIO E REPRESENTACOES LTDA,2062,49,0.0,5,


In [7]:
"""Apply casting to the capital social da empresa"""

empresas_df = empresas_df.withColumn('capital_social_da_empresa', empresas_df['capital_social_da_empresa'].cast(DoubleType()))

empresas_df.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [8]:
"""Sample data frame to demonstrate casting to date type"""

df = spark_session.createDataFrame([(20200924,), (20201022,), (20210215,)], ['data'])

In [9]:
df.printSchema()
df.toPandas()

root
 |-- data: long (nullable = true)



                                                                                

Unnamed: 0,data
0,20200924
1,20201022
2,20210215


In [10]:
df = df.withColumn('data', f.to_date(df.data.cast(StringType()), 'yyyyMMdd'))

df.printSchema()

root
 |-- data: date (nullable = true)



In [11]:
"""Apply casting to date fields at estabelecimentos database"""

""" It is possible to use .withColumn('column_name', f.Col('column_name').cast(IntegerType()))"""

estabelecimentos_df = spark_session.read.format('csv').load(f"{input_file_path}/estabelecimentos", sep=";", inferSchema=True)

estabelecimentos_df.limit(5).toPandas()

24/08/12 13:53:47 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,...,_c20,_c21,_c22,_c23,_c24,_c25,_c26,_c27,_c28,_c29
0,1879,1,96,1,PIRAMIDE M. C.,8,20011029,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,20081231,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,19971231,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,19980429,1,,,...,7075,,,,,,,,,


In [13]:
""" Change Column Name for Estabelecimentos"""

estabelecimento_column_names = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']
for index, column_name in enumerate(estabelecimento_column_names):
    estabelecimentos_df = estabelecimentos_df.withColumnRenamed(f"_c{index}", column_name)

estabelecimentos_df.columns

['cnpj_basico',
 'cnpj_ordem',
 'cnpj_dv',
 'identificador_matriz_filial',
 'nome_fantasia',
 'situacao_cadastral',
 'data_situacao_cadastral',
 'motivo_situacao_cadastral',
 'nome_da_cidade_no_exterior',
 'pais',
 'data_de_inicio_atividade',
 'cnae_fiscal_principal',
 'cnae_fiscal_secundaria',
 'tipo_de_logradouro',
 'logradouro',
 'numero',
 'complemento',
 'bairro',
 'cep',
 'uf',
 'municipio',
 'ddd_1',
 'telefone_1',
 'ddd_2',
 'telefone_2',
 'ddd_do_fax',
 'fax',
 'correio_eletronico',
 'situacao_especial',
 'data_da_situacao_especial']

In [14]:
"""Apply casting to date fields at estabelecimentos database"""

estabelecimentos_df = estabelecimentos_df\
                            .withColumn("data_situacao_cadastral", f.to_date(estabelecimentos_df.data_situacao_cadastral.cast(StringType()), "yyyyMMdd"))\
                            .withColumn("data_de_inicio_atividade", f.to_date(estabelecimentos_df.data_de_inicio_atividade.cast(StringType()), "yyyyMMdd"))\
                            .withColumn("data_da_situacao_especial", f.to_date(estabelecimentos_df.data_da_situacao_especial.cast(StringType()), "yyyyMMdd"))

In [15]:
estabelecimentos_df.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [16]:
estabelecimentos_df.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,2001-10-29,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,1997-12-31,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,1998-04-29,1,,,...,7075,,,,,,,,,
