In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Atividade Modular Módulo 3") \
    .getOrCreate()

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!ls "/content/drive/MyDrive/Colab Notebooks"

atividade_modular_3.ipynb  estabelecimentos-2.csv
cnaes.csv		   estabelecimentos-3.csv
estabelecimentos-1.csv	   NOVOLAYOUTDOSDADOSABERTOSDOCNPJ.pdf


In [None]:
# Path
base_path = "/content/drive/MyDrive/Colab Notebooks"

# Lê os três arquivos de estabelecimentos
df1 = spark.read.option("header", True).option("inferSchema", True).option("sep", ";").csv(f"{base_path}/estabelecimentos-1.csv")
df2 = spark.read.option("header", True).option("inferSchema", True).option("sep", ";").csv(f"{base_path}/estabelecimentos-2.csv")
df3 = spark.read.option("header", True).option("inferSchema", True).option("sep", ";").csv(f"{base_path}/estabelecimentos-3.csv")

# Une os três arquivos em um único df
df_estabelecimentos = df1.unionByName(df2).unionByName(df3)

# Lê o arquivo de CNAEs
df_cnaes = spark.read.option("header", True).option("inferSchema", True).option("sep", ";").csv(f"{base_path}/cnaes.csv")



In [None]:
# Cria as views
df_estabelecimentos.createOrReplaceTempView("estabelecimentos")
df_cnaes.createOrReplaceTempView("cnaes")

In [None]:
# Teste para verificar os dados
spark.sql("SELECT * FROM estabelecimentos LIMIT 5").show()
spark.sql("SELECT * FROM cnaes LIMIT 5").show()

+-----------+----------+-------+---------------------------+-------------+------------------+-----------------------+-------------------------+--------------------------+----+---------------------+--------------+---------------+---------------+-----------------+------+--------------------+---------+--------+---+---------+-----+--------+-----+--------+-------+--------+--------------------+-----------------+----------------------+
|CNPJ_BASICO|CNPJ_ORDEM|CNPJ_DV|IDENTIFICADOR_MATRIZ_FILIAL|NOME_FANTASIA|SITUACAO_CADASTRAL|DATA_SITUACAO_CADASTRAL|MOTIVO_SITUACAO_CADASTRAL|NOME_DA_CIDADE_NO_EXTERIOR|PAIS|DATA_INICIO_ATIVIDADE|CNAE_PRINCIPAL|CNAE_SECUNDARIA|TIPO_LOGRADOURO|       LOGRADOURO|NUMERO|         COMPLEMENTO|   BAIRRO|     CEP| UF|MUNICIPIO|DDD_1|   TEL_1|DDD_2|   TEL_2|DDD_FAX|     FAX|  CORREIO_ELETRONICO|SITUACAO_ESPECIAL|DATA_SITUACAO_ESPECIAL|
+-----------+----------+-------+---------------------------+-------------+------------------+-----------------------+-----------------

In [None]:
# 1 - Quantos estabelecimentos existem?
total_estabelecimentos = spark.sql("SELECT COUNT(*) AS total FROM estabelecimentos")
total_estabelecimentos.show()


+--------+
|   total|
+--------+
|20996744|
+--------+



In [None]:
# 2 - Quantas colunas existem e quantas são identificadas como numéricas?

df_estabelecimentos.printSchema()


root
 |-- CNPJ_BASICO: integer (nullable = true)
 |-- CNPJ_ORDEM: integer (nullable = true)
 |-- CNPJ_DV: integer (nullable = true)
 |-- IDENTIFICADOR_MATRIZ_FILIAL: integer (nullable = true)
 |-- NOME_FANTASIA: string (nullable = true)
 |-- SITUACAO_CADASTRAL: integer (nullable = true)
 |-- DATA_SITUACAO_CADASTRAL: integer (nullable = true)
 |-- MOTIVO_SITUACAO_CADASTRAL: integer (nullable = true)
 |-- NOME_DA_CIDADE_NO_EXTERIOR: string (nullable = true)
 |-- PAIS: integer (nullable = true)
 |-- DATA_INICIO_ATIVIDADE: integer (nullable = true)
 |-- CNAE_PRINCIPAL: integer (nullable = true)
 |-- CNAE_SECUNDARIA: string (nullable = true)
 |-- TIPO_LOGRADOURO: string (nullable = true)
 |-- LOGRADOURO: string (nullable = true)
 |-- NUMERO: string (nullable = true)
 |-- COMPLEMENTO: string (nullable = true)
 |-- BAIRRO: string (nullable = true)
 |-- CEP: string (nullable = true)
 |-- UF: string (nullable = true)
 |-- MUNICIPIO: integer (nullable = true)
 |-- DDD_1: string (nullable = true)

In [None]:
# 3 - Comparar tamanho do arquivo Parquet com o CSV original

df_estabelecimentos.write.mode("overwrite").parquet("estabelecimentos.parquet")

!du -sh estabelecimentos.parquet
!du -sh "/content/drive/MyDrive/Colab Notebooks/estabelecimentos-1.csv" "/content/drive/MyDrive/Colab Notebooks/estabelecimentos-2.csv" "/content/drive/MyDrive/Colab Notebooks/estabelecimentos-3.csv"



1.5G	estabelecimentos.parquet
836M	/content/drive/MyDrive/Colab Notebooks/estabelecimentos-1.csv
2.1G	/content/drive/MyDrive/Colab Notebooks/estabelecimentos-2.csv
845M	/content/drive/MyDrive/Colab Notebooks/estabelecimentos-3.csv


In [None]:
# 4 - Quantos estabelecimentos não têm logradouro cadastrado?
spark.sql("SELECT COUNT(*) AS total_sem_logradouro FROM estabelecimentos WHERE LOGRADOURO IS NULL OR LOGRADOURO = ''").show()


+--------------------+
|total_sem_logradouro|
+--------------------+
|                 828|
+--------------------+



In [None]:
# 5 - Quantos estabelecimentos têm logradouro começando com "AVENIDA"?
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

def is_avenida(logradouro):
    if logradouro is None:
        return False
    return logradouro.strip().upper().startswith("AVENIDA")

is_avenida_udf = udf(is_avenida, BooleanType())

df_estabelecimentos = df_estabelecimentos.withColumn("is_avenida", is_avenida_udf("LOGRADOURO"))
df_estabelecimentos.createOrReplaceTempView("estabelecimentos")

spark.sql("SELECT COUNT(*) AS total_avenida FROM estabelecimentos WHERE is_avenida = true").show()


+-------------+
|total_avenida|
+-------------+
|        52587|
+-------------+



In [None]:
# 6 - Quantos CEPs distintos existem?
from pyspark.sql.functions import col

# Cria um nova view filtrando apenas CEPs de 8 dígitos
df_ceps_validos = df_estabelecimentos.filter(
    (col("CEP").isNotNull()) & (col("CEP").rlike("^[0-9]{8}$"))
)

df_ceps_validos.createOrReplaceTempView("ceps_validos")

# Conta os distintos
spark.sql("SELECT COUNT(DISTINCT CEP) AS ceps_distintos FROM ceps_validos").show()




+--------------+
|ceps_distintos|
+--------------+
|        871523|
+--------------+



In [None]:
# 7 - Quantos CNAEs existem na tabela de CNAEs?
spark.sql("SELECT COUNT(DISTINCT CNAE) AS total_cnaes FROM cnaes").show()


+-----------+
|total_cnaes|
+-----------+
|       1359|
+-----------+



In [None]:
# 8 - Quantos estabelecimentos têm CNAE relacionado a cultivo?
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

# UDF para identificar CNAEs de cultivo
def is_cnae_cultivo(descricao):
    if descricao is None:
        return False
    return "cultivo" in descricao.lower()

is_cnae_cultivo_udf = udf(is_cnae_cultivo, BooleanType())

# Faz o join entre estabelecimentos e cnaes usando as colunas corretas
df_est_cnae = df_estabelecimentos.join(
    df_cnaes,
    df_estabelecimentos["CNAE_PRINCIPAL"] == df_cnaes["CNAE"],
    "inner"
)

# Adiciona coluna booleana is_cultivo
df_est_cnae = df_est_cnae.withColumn("is_cultivo", is_cnae_cultivo_udf("DESCRICAO_CNAE"))

# Cria uma view temporária
df_est_cnae.createOrReplaceTempView("estabelecimentos_with_cnae")

# Consulta SQL para contar quantos têm CNAE de cultivo
spark.sql("""
    SELECT COUNT(*) AS total_cultivo
    FROM estabelecimentos_with_cnae
    WHERE is_cultivo = true
""").show()



+-------------+
|total_cultivo|
+-------------+
|       223500|
+-------------+



In [None]:
# 9 - Quantos estabelecimentos são filiais?
spark.sql("""
    SELECT COUNT(*) AS total_filiais
    FROM estabelecimentos
    WHERE IDENTIFICADOR_MATRIZ_FILIAL = 2
""").show()



+-------------+
|total_filiais|
+-------------+
|      1093082|
+-------------+

