<a href="https://colab.research.google.com/github/elbyvaz/data_engineering/blob/main/spark/dados_abertos_cnpj.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=d07f09a1bd0a5f761cb33fdf98dffa066b89522c09a96d16bc72d56d41177b93
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
# using spark and veritying its version
from pyspark.sql import SparkSession

spark = SparkSession \
                    .builder \
                    .appName("Challenge: open data") \
                    .getOrCreate()

spark.version

'3.5.0'

In [3]:
# dataframe 1
df_1 = spark.read.csv('/content/sample_data/estabelecimentos-1.csv', header=True, inferSchema=True, sep=';')
df_1.count()

115502

In [4]:
df_1.printSchema()

root
 |-- CNPJ_BASICO: integer (nullable = true)
 |-- CNPJ_ORDEM: integer (nullable = true)
 |-- CNPJ_DV: integer (nullable = true)
 |-- IDENTIFICADOR_MATRIZ_FILIAL: integer (nullable = true)
 |-- NOME_FANTASIA: string (nullable = true)
 |-- SITUACAO_CADASTRAL: integer (nullable = true)
 |-- DATA_SITUACAO_CADASTRAL: integer (nullable = true)
 |-- MOTIVO_SITUACAO_CADASTRAL: integer (nullable = true)
 |-- NOME_DA_CIDADE_NO_EXTERIOR: string (nullable = true)
 |-- PAIS: integer (nullable = true)
 |-- DATA_INICIO_ATIVIDADE: integer (nullable = true)
 |-- CNAE_PRINCIPAL: integer (nullable = true)
 |-- CNAE_SECUNDARIA: string (nullable = true)
 |-- TIPO_LOGRADOURO: string (nullable = true)
 |-- LOGRADOURO: string (nullable = true)
 |-- NUMERO: string (nullable = true)
 |-- COMPLEMENTO: string (nullable = true)
 |-- BAIRRO: string (nullable = true)
 |-- CEP: integer (nullable = true)
 |-- UF: string (nullable = true)
 |-- MUNICIPIO: integer (nullable = true)
 |-- DDD_1: integer (nullable = tru

In [5]:
# dataframe 3
df_3 = spark.read.csv('/content/sample_data/estabelecimentos-3.csv', header=True, inferSchema=True, sep=';')
df_3.count()

122015

In [6]:
df_3.printSchema()

root
 |-- CNPJ_BASICO: integer (nullable = true)
 |-- CNPJ_ORDEM: integer (nullable = true)
 |-- CNPJ_DV: integer (nullable = true)
 |-- IDENTIFICADOR_MATRIZ_FILIAL: integer (nullable = true)
 |-- NOME_FANTASIA: string (nullable = true)
 |-- SITUACAO_CADASTRAL: integer (nullable = true)
 |-- DATA_SITUACAO_CADASTRAL: integer (nullable = true)
 |-- MOTIVO_SITUACAO_CADASTRAL: integer (nullable = true)
 |-- NOME_DA_CIDADE_NO_EXTERIOR: string (nullable = true)
 |-- PAIS: integer (nullable = true)
 |-- DATA_INICIO_ATIVIDADE: integer (nullable = true)
 |-- CNAE_PRINCIPAL: integer (nullable = true)
 |-- CNAE_SECUNDARIA: string (nullable = true)
 |-- TIPO_LOGRADOURO: string (nullable = true)
 |-- LOGRADOURO: string (nullable = true)
 |-- NUMERO: string (nullable = true)
 |-- COMPLEMENTO: string (nullable = true)
 |-- BAIRRO: string (nullable = true)
 |-- CEP: integer (nullable = true)
 |-- UF: string (nullable = true)
 |-- MUNICIPIO: integer (nullable = true)
 |-- DDD_1: integer (nullable = tru

In [7]:
# dataframes have identical columns. If no, use withColumn() to rename
df_result = df_1.union(df_3)
df_result.count()

237517

In [12]:
# checking duplicated rows (df_result rows = distinct rows).
# df_result.printSchema()
print('TOTAL OF ESTABLISHMENTS: {}'.format(df_result.select('CNPJ_BASICO').distinct().count()))

TOTAL OF ESTABLISHMENTS: 237098


In [None]:
# exists duplicated cnpj_basico
# checking
# from pyspark.sql.functions import col
# df_result.groupBy('CNPJ_BASICO').count().sort(col('count').desc()).show()

In [19]:
# total of columns
print('TOTAL OF COLUMNS: {}'.format(len(df_result.columns)))
# total of numeric columns (list comprehension)
num_numeric_columns = len([col for col, dtype in df_result.dtypes if dtype in ('integer') or dtype in ('double')])
print("Number of numeric columns:", num_numeric_columns)

TOTAL OF COLUMNS: 30
Number of numeric columns: 15


In [27]:
import os
csv_1 = os.path.getsize('/content/sample_data/estabelecimentos-1.csv')
csv_3 = os.path.getsize('/content/sample_data/estabelecimentos-3.csv')
total_csv_size = csv_1 + csv_3
print('Total csv size: {}'.format(total_csv_size))

Total csv size: 633339904


In [28]:
# writing parquet file
df_result.write.mode("overwrite").parquet("/content/sample_data/estabelecimentos.parquet")

In [29]:
total_parquet_size = os.path.getsize('/content/sample_data/estabelecimentos.parquet')
print('Total parquet size: {}'.format(total_parquet_size))

Total parquet size: 4096


In [37]:
# checking total establishments with LOGRADOURO null
df_result.createOrReplaceTempView("estabelecimentos")
spark.sql("SELECT COUNT(*) FROM estabelecimentos WHERE LOGRADOURO IS NULL").show()

+--------+
|count(1)|
+--------+
|      16|
+--------+



In [41]:
def is_avenida(logradouro):
  """
  verify if logradouro contains AVENIDA in its name

  Args:
    logradouro: name of avenue, street...

  Returns:
    True if contains AVENIDA or False if not
  """
  result = logradouro.startswith("AVENIDA")
  return result

#spark.sql("SELECT COUNT(*) FROM estabelecimentos WHERE is_avenida(logradouro) == True").show()
spark.sql("SELECT COUNT(*) FROM estabelecimentos WHERE logradouro LIKE 'AVENIDA%' ").show()

+--------+
|count(1)|
+--------+
|     421|
+--------+



In [44]:
# distincts ceps
# spark.sql("SELECT COUNT(DISTINCT CEP) FROM estabelecimentos").show()
# or
df_result.select('CEP').distinct().count()

113824

In [46]:
# cnae dataframe
df_cnae = spark.read.csv('/content/sample_data/cnaes.csv', header=True, inferSchema=True, sep=';')
df_cnae.count()

1359

In [47]:
df_cnae.printSchema()

root
 |-- CNAE: integer (nullable = true)
 |-- DESCRICAO_CNAE: string (nullable = true)



In [51]:
# total of CULTIVO
df_cnae.createOrReplaceTempView("cnaes")
spark.sql("SELECT COUNT(e.CNPJ_BASICO) \
            FROM estabelecimentos e \
            INNER JOIN cnaes c ON e.CNAE_PRINCIPAL = c.CNAE \
            WHERE UPPER(c.DESCRICAO_CNAE) LIKE 'CULTIVO%' ").show()

+------------------+
|count(CNPJ_BASICO)|
+------------------+
|               798|
+------------------+



In [52]:
# filial
spark.sql("SELECT COUNT(*) FROM estabelecimentos WHERE IDENTIFICADOR_MATRIZ_FILIAL = 2 ").show()

+--------+
|count(1)|
+--------+
|    4574|
+--------+

