In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession \
    .builder \
    .appName("Atividade Modular, Módulo 3: Soluções de Big Data e Data Lake") \
    .getOrCreate()
spark.version

'3.5.1'

In [3]:
from google.colab import drive

drive.mount('/content/drive')

Mounted at /content/drive


In [22]:
# le o arquivo de cnaes
cnaes_df = spark.read.csv('/content/drive/MyDrive/dados/cnaes/cnaes.csv', header=True, inferSchema=True, sep=";")


In [5]:
# Lê os três arquivos de estabelecimentos
estabelecimento1 = spark.read.csv('/content/drive/MyDrive/dados/estabelecimentos/estabelecimentos-1.csv', header=True, inferSchema=True, sep=";")
estabelecimento2 = spark.read.csv('/content/drive/MyDrive/dados/estabelecimentos/estabelecimentos-2.csv', header=True, inferSchema=True, sep=";")
estabelecimento3 = spark.read.csv('/content/drive/MyDrive/dados/estabelecimentos/estabelecimentos-2.csv', header=True, inferSchema=True, sep=";")


In [6]:
estabelecimento1.dtypes

[('CNPJ_BASICO', 'int'),
 ('CNPJ_ORDEM', 'int'),
 ('CNPJ_DV', 'int'),
 ('IDENTIFICADOR_MATRIZ_FILIAL', 'int'),
 ('NOME_FANTASIA', 'string'),
 ('SITUACAO_CADASTRAL', 'int'),
 ('DATA_SITUACAO_CADASTRAL', 'int'),
 ('MOTIVO_SITUACAO_CADASTRAL', 'int'),
 ('NOME_DA_CIDADE_NO_EXTERIOR', 'string'),
 ('PAIS', 'int'),
 ('DATA_INICIO_ATIVIDADE', 'int'),
 ('CNAE_PRINCIPAL', 'int'),
 ('CNAE_SECUNDARIA', 'string'),
 ('TIPO_LOGRADOURO', 'string'),
 ('LOGRADOURO', 'string'),
 ('NUMERO', 'string'),
 ('COMPLEMENTO', 'string'),
 ('BAIRRO', 'string'),
 ('CEP', 'int'),
 ('UF', 'string'),
 ('MUNICIPIO', 'int'),
 ('DDD_1', 'string'),
 ('TEL_1', 'string'),
 ('DDD_2', 'int'),
 ('TEL_2', 'string'),
 ('DDD_FAX', 'double'),
 ('FAX', 'string'),
 ('CORREIO_ELETRONICO', 'string'),
 ('SITUACAO_ESPECIAL', 'string'),
 ('DATA_SITUACAO_ESPECIAL', 'int')]

padronizando os dados

In [7]:
estabelecimento1 = estabelecimento1.withColumn("CEP", col("CEP").cast("string"))

estabelecimento1 = estabelecimento1.withColumn("DDD_2", col("DDD_2").cast("string"))


In [8]:
# une os 3 arquivos em um unico df
estabelecimentos_df = estabelecimento1.union(estabelecimento2).union(estabelecimento3)

#cria uma tabela temporaria de estabelecimentos
estabelecimentos_df.createOrReplaceTempView("estabelecimentos")

1. Quantos estabelecimentos existem?

In [None]:
estabelecimentos_df.count()


20996740

2. Na tabela de estabelecimentos, quantas colunas existem e quantas são identificadas pelo spark como números? Use o inferSchema ao ler os arquivos.

In [None]:
num_coluna = len(estabelecimentos_df.columns)


tipos_numericos = ['int', 'double']
num_colunas_numericas = sum(
    any(t in tipo.lower() for t in tipos_numericos)
    for _, tipo in estabelecimentos_df.dtypes
)

print(f"A tabela tem {num_coluna} colunas, e {num_colunas_numericas} tipos númericos")



estabelecimentos_df.dtypes

A tabela tem 30 colunas, e 13 tipos númericos


[('CNPJ_BASICO', 'int'),
 ('CNPJ_ORDEM', 'int'),
 ('CNPJ_DV', 'int'),
 ('IDENTIFICADOR_MATRIZ_FILIAL', 'int'),
 ('NOME_FANTASIA', 'string'),
 ('SITUACAO_CADASTRAL', 'int'),
 ('DATA_SITUACAO_CADASTRAL', 'int'),
 ('MOTIVO_SITUACAO_CADASTRAL', 'int'),
 ('NOME_DA_CIDADE_NO_EXTERIOR', 'string'),
 ('PAIS', 'int'),
 ('DATA_INICIO_ATIVIDADE', 'int'),
 ('CNAE_PRINCIPAL', 'int'),
 ('CNAE_SECUNDARIA', 'string'),
 ('TIPO_LOGRADOURO', 'string'),
 ('LOGRADOURO', 'string'),
 ('NUMERO', 'string'),
 ('COMPLEMENTO', 'string'),
 ('BAIRRO', 'string'),
 ('CEP', 'string'),
 ('UF', 'string'),
 ('MUNICIPIO', 'int'),
 ('DDD_1', 'string'),
 ('TEL_1', 'string'),
 ('DDD_2', 'string'),
 ('TEL_2', 'string'),
 ('DDD_FAX', 'double'),
 ('FAX', 'string'),
 ('CORREIO_ELETRONICO', 'string'),
 ('SITUACAO_ESPECIAL', 'string'),
 ('DATA_SITUACAO_ESPECIAL', 'int')]

3. comparar o tamanho do arquivo


In [None]:
estabelecimentos_df.write.mode("overwrite").parquet("estabelecimentos.parquet")

4. quantos estabelecimentos nao tem logradouro cadastrado

In [9]:
spark.sql("SELECT COUNT(*) FROM estabelecimentos WHERE LOGRADOURO IS NULL OR LOGRADOURO = ''").show()


+--------+
|count(1)|
+--------+
|     997|
+--------+



5. Crie uma UDF em Python para identificar logradouros que começam com "AVENIDA" e use Spark SQL para contar quantos registros estão incorretos.

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

#define a função
def is_avenida(logradouro):
  if logradouro is None:
    return False
  return logradouro.upper().startswith("AVENIDA")

#transforma a função em um udf
is_avenida_udf = udf(is_avenida, BooleanType())

estabelecimentos = estabelecimentos_df.withColumn("is_avenida", is_avenida_udf("LOGRADOURO"))
estabelecimentos.createOrReplaceTempView("estabelecimentos")

spark.sql("SELECT COUNT(*) FROM estabelecimentos WHERE is_avenida = True").show()





+--------+
|count(1)|
+--------+
|   73423|
+--------+



6. Quantos CEPs distintos existem entre os estabelecimentos?


In [None]:
ceps = spark.sql("""
    SELECT COUNT(DISTINCT lpad(regexp_replace(CEP, '[^0-9]', ''), 8, '0')) AS total_ceps_distintos
    FROM estabelecimento
    WHERE CEP IS NOT NULL AND TRIM(CEP) != ''
""").show()



+--------------------+
|total_ceps_distintos|
+--------------------+
|              839265|
+--------------------+



7. Quantos CNAEs existem na tabela de CNAES?

In [None]:
# cria a view de cnaes
cnaes_df.createOrReplaceTempView("cnaes")

spark.sql("SELECT COUNT(*) FROM cnaes").show()

+--------+
|count(1)|
+--------+
|    1359|
+--------+



8. Vários CNAEs são de cultivo, como “cultivo de milho”, “cultivo de trigo” e “cultivo de arroz”. Quantos estabelecimentos possuem um CNAE relacionado a cultivo?

In [40]:
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

# faz o join entre as 2 tabelas
estabelecimentos_with_cnae = estabelecimentos_df.join(
    cnaes_df, estabelecimentos_df["CNAE_PRINCIPAL"] == cnaes_df["CNAE"]
)

#cria a udf
def is_cnae_cultivo(descricao):
  if descricao is None:
    return False
  return "cultivo" in descricao.lower()

# Registrando a UDF para uso no SQL
spark.udf.register("is_cnae_cultivo", is_cnae_cultivo, BooleanType())

#cria a view para usar no sql
estabelecimentos_with_cnae.createOrReplaceTempView("estabelecimentos_with_cnae")

resultado = spark.sql("""
    SELECT COUNT(*) AS total_cultivo
    FROM estabelecimentos_with_cnae
    WHERE is_cnae_cultivo(DESCRICAO_CNAE) = true
""")

resultado.show()

+-------------+
|total_cultivo|
+-------------+
|       292825|
+-------------+



9. Quantos estabelecimentos são filiais?

In [41]:
spark.sql("""
    SELECT COUNT(*) AS total_filiais
    FROM estabelecimentos
    WHERE IDENTIFICADOR_MATRIZ_FILIAL = 2
""").show()

+-------------+
|total_filiais|
+-------------+
|      1408801|
+-------------+

