Instalando as dependências

In [2]:
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)


In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [4]:
import findspark
findspark.init()

Abrindo a sessão do Spark

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
.master('local[*]')\
.appName("Primeiro projeto Spark")\
.config('spark.ui.port', '4050')\
.getOrCreate()

In [6]:
!wget -q https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   


In [7]:
get_ipython().system_raw('./ngrok authtoken 2qAxCok2L28Y6iDPEmGranW6E6Z_3FkB9EdubmzZ2u6YRerD9')
get_ipython().system_raw('./ngrok http 4050 &')

In [8]:
!curl -s http://localhost:4040/api/tunnels

In [9]:
spark

Montando o Drive para poder subir os arquivos

In [10]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Carregando os arquivos

In [31]:
path = '/content/drive/MyDrive/empresas'
# Carregar o arquivo como texto simples
empresas_raw = spark.read.text(path)

# Mostrar as primeiras linhas
empresas_raw.show(truncate=False)


+-------------------------------------------------------------------------------------+
|value                                                                                |
+-------------------------------------------------------------------------------------+
|"cnpj,nome_fantasia,porte,municipio,cnae_principal ","","","","","","","","",""      |
|"00000001,Construtora Alpha,Grande,São Paulo,4100 ","","","","","","","","",""       |
|"00000002,Padaria Central,Pequeno,Rio de Janeiro,1071 ","","","","","","","","",""   |
|"00000003,Ferreira Transportes,Médio,Belo Horizonte,4930 ","","","","","","","","",""|
|"00000004,Loja Econômica,Micro,Curitiba,4712 ","","","","","","","","",""            |
|"00000005,Tech Solutions,Grande,Florianópolis,6201 ","","","","","","","","",""      |
|"00000006,Construtora Beta,Grande,Porto Alegre,4100 ","","","","","","","","",""     |
|"00000007,Supermercado Ideal,Médio,Recife,4711 ","","","","","","","","",""          |
|"00000008,Café Gourmet,Micro,Sa

In [15]:
path2 = '/content/drive/MyDrive/atividades'
atividades = spark.read.option("multiline", "true").json(path2)
atividades.show()


+----+--------------------+
|cnae| descricao_atividade|
+----+--------------------+
|4100|Construção de edi...|
|1071|Fabricação de pro...|
|4930|Transporte rodovi...|
|4712|Comércio varejist...|
|6201|Desenvolvimento d...|
|4711|Comércio varejist...|
|5610|Restaurantes e si...|
|7020|Consultoria em ge...|
|1623|Fabricação de mad...|
+----+--------------------+



Transformação e limpeza dos dados


In [18]:
empresas.printSchema()

root
 |-- cnpj,nome_fantasia,porte,municipio,cnae_principal : string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)



In [32]:
from pyspark.sql.functions import split

# Dividir a coluna "value" com base na vírgula
empresas = empresas_raw.withColumn("split_cols", split(empresas_raw["value"], ","))

# Selecionar e renomear as colunas individuais
empresas = empresas.select(
    empresas["split_cols"].getItem(0).alias("cnpj"),
    empresas["split_cols"].getItem(1).alias("nome_fantasia"),
    empresas["split_cols"].getItem(2).alias("porte"),
    empresas["split_cols"].getItem(3).alias("municipio"),
    empresas["split_cols"].getItem(4).alias("cnae_principal")
)

# Mostrar o resultado
empresas.show()



+---------+--------------------+-------+--------------+----------------+
|     cnpj|       nome_fantasia|  porte|     municipio|  cnae_principal|
+---------+--------------------+-------+--------------+----------------+
|    "cnpj|       nome_fantasia|  porte|     municipio|cnae_principal "|
|"00000001|   Construtora Alpha| Grande|     São Paulo|          4100 "|
|"00000002|     Padaria Central|Pequeno|Rio de Janeiro|          1071 "|
|"00000003|Ferreira Transportes|  Médio|Belo Horizonte|          4930 "|
|"00000004|      Loja Econômica|  Micro|      Curitiba|          4712 "|
|"00000005|      Tech Solutions| Grande| Florianópolis|          6201 "|
|"00000006|    Construtora Beta| Grande|  Porto Alegre|          4100 "|
|"00000007|  Supermercado Ideal|  Médio|        Recife|          4711 "|
|"00000008|        Café Gourmet|  Micro|      Salvador|          5610 "|
|"00000009|   Consultoria Delta|Pequeno|      Brasília|          7020 "|
+---------+--------------------+-------+-----------

In [33]:
from pyspark.sql.functions import col, regexp_replace

# Filtrar as linhas onde "cnpj" não contém o texto "cnpj"
empresas = empresas.filter(~col("cnpj").contains("cnpj"))

# Remover as aspas extras das colunas
for coluna in empresas.columns:
    empresas = empresas.withColumn(coluna, regexp_replace(col(coluna), '"', ''))

# Mostrar o resultado final
empresas.show()


+--------+--------------------+-------+--------------+--------------+
|    cnpj|       nome_fantasia|  porte|     municipio|cnae_principal|
+--------+--------------------+-------+--------------+--------------+
|00000001|   Construtora Alpha| Grande|     São Paulo|         4100 |
|00000002|     Padaria Central|Pequeno|Rio de Janeiro|         1071 |
|00000003|Ferreira Transportes|  Médio|Belo Horizonte|         4930 |
|00000004|      Loja Econômica|  Micro|      Curitiba|         4712 |
|00000005|      Tech Solutions| Grande| Florianópolis|         6201 |
|00000006|    Construtora Beta| Grande|  Porto Alegre|         4100 |
|00000007|  Supermercado Ideal|  Médio|        Recife|         4711 |
|00000008|        Café Gourmet|  Micro|      Salvador|         5610 |
|00000009|   Consultoria Delta|Pequeno|      Brasília|         7020 |
+--------+--------------------+-------+--------------+--------------+



Juntando os dois arquivos

In [36]:
# Mostrar os valores únicos de cnae no DataFrame empresas
empresas.select("cnae").distinct().show()

# Mostrar os valores únicos de cnae no DataFrame atividades
atividades.select("cnae").distinct().show()


+-----+
| cnae|
+-----+
|4100 |
|7020 |
|4930 |
|6201 |
|4711 |
|5610 |
|4712 |
|1071 |
+-----+

+----+
|cnae|
+----+
|1623|
|4930|
|1071|
|4712|
|7020|
|4711|
|6201|
|4100|
|5610|
+----+



In [37]:
from pyspark.sql.functions import col, trim

# Remover espaços em branco e garantir que cnae seja string no DataFrame empresas
empresas = empresas.withColumn("cnae", trim(col("cnae")).cast("string"))

# Remover espaços em branco e garantir que cnae seja string no DataFrame atividades
atividades = atividades.withColumn("cnae", trim(col("cnae")).cast("string"))

# Realizar o join entre os dois DataFrames na coluna "cnae"
resultado = empresas.join(atividades, on="cnae", how="inner")

# Mostrar o resultado do join
resultado.show()

# Salvar o resultado em um único arquivo CSV
output_path = "/content/drive/MyDrive/resultado/resultado_final.csv"
resultado.coalesce(1).write.csv(output_path, header=True, mode="overwrite")


+----+--------+--------------------+-------+--------------+--------------------+
|cnae|    cnpj|       nome_fantasia|  porte|     municipio| descricao_atividade|
+----+--------+--------------------+-------+--------------+--------------------+
|4100|00000001|   Construtora Alpha| Grande|     São Paulo|Construção de edi...|
|1071|00000002|     Padaria Central|Pequeno|Rio de Janeiro|Fabricação de pro...|
|4930|00000003|Ferreira Transportes|  Médio|Belo Horizonte|Transporte rodovi...|
|4712|00000004|      Loja Econômica|  Micro|      Curitiba|Comércio varejist...|
|6201|00000005|      Tech Solutions| Grande| Florianópolis|Desenvolvimento d...|
|4100|00000006|    Construtora Beta| Grande|  Porto Alegre|Construção de edi...|
|4711|00000007|  Supermercado Ideal|  Médio|        Recife|Comércio varejist...|
|5610|00000008|        Café Gourmet|  Micro|      Salvador|Restaurantes e si...|
|7020|00000009|   Consultoria Delta|Pequeno|      Brasília|Consultoria em ge...|
+----+--------+-------------

criando o View

In [38]:
resultado.createOrReplaceTempView("resultadoview")

In [39]:
spark.sql("select * from resultadoview").show()

+----+--------+--------------------+-------+--------------+--------------------+
|cnae|    cnpj|       nome_fantasia|  porte|     municipio| descricao_atividade|
+----+--------+--------------------+-------+--------------+--------------------+
|4100|00000001|   Construtora Alpha| Grande|     São Paulo|Construção de edi...|
|1071|00000002|     Padaria Central|Pequeno|Rio de Janeiro|Fabricação de pro...|
|4930|00000003|Ferreira Transportes|  Médio|Belo Horizonte|Transporte rodovi...|
|4712|00000004|      Loja Econômica|  Micro|      Curitiba|Comércio varejist...|
|6201|00000005|      Tech Solutions| Grande| Florianópolis|Desenvolvimento d...|
|4100|00000006|    Construtora Beta| Grande|  Porto Alegre|Construção de edi...|
|4711|00000007|  Supermercado Ideal|  Médio|        Recife|Comércio varejist...|
|5610|00000008|        Café Gourmet|  Micro|      Salvador|Restaurantes e si...|
|7020|00000009|   Consultoria Delta|Pequeno|      Brasília|Consultoria em ge...|
+----+--------+-------------

Consultas SQL

In [40]:
#Contando o número de empresas por porte

spark.sql("""
select porte, count(*) as total_empresas
from resultadoview
group by porte
order by  total_empresas DESC""").show()

+-------+--------------+
|  porte|total_empresas|
+-------+--------------+
| Grande|             3|
|  Micro|             2|
|Pequeno|             2|
|  Médio|             2|
+-------+--------------+



In [42]:
#Listando os municípios com mais empresas cadastradas
spark.sql("""select municipio, count(*) as total_empresas
from resultadoview
group by municipio
order by total_empresas DESC """).show()


+--------------+--------------+
|     municipio|total_empresas|
+--------------+--------------+
|      Curitiba|             1|
| Florianópolis|             1|
|Belo Horizonte|             1|
|      Salvador|             1|
|        Recife|             1|
|     São Paulo|             1|
|Rio de Janeiro|             1|
|      Brasília|             1|
|  Porto Alegre|             1|
+--------------+--------------+



In [44]:
#buscando empresas do setor de construção

spark.sql(""" select *
from resultadoview
where descricao_atividade LIKE '%Construção%'""").show()

+----+--------+-----------------+------+------------+--------------------+
|cnae|    cnpj|    nome_fantasia| porte|   municipio| descricao_atividade|
+----+--------+-----------------+------+------------+--------------------+
|4100|00000001|Construtora Alpha|Grande|   São Paulo|Construção de edi...|
|4100|00000006| Construtora Beta|Grande|Porto Alegre|Construção de edi...|
+----+--------+-----------------+------+------------+--------------------+



In [45]:
#distribuir empresas por atividade economica

spark.sql("""select descricao_atividade,count(*) as total_empresas
from resultadoview
group by descricao_atividade
order by total_empresas DESC""").show()

+--------------------+--------------+
| descricao_atividade|total_empresas|
+--------------------+--------------+
|Construção de edi...|             2|
|Comércio varejist...|             1|
|Transporte rodovi...|             1|
|Restaurantes e si...|             1|
|Consultoria em ge...|             1|
|Fabricação de pro...|             1|
|Desenvolvimento d...|             1|
|Comércio varejist...|             1|
+--------------------+--------------+



In [46]:
#empresas grandes e suas atividades economicas

spark.sql(""" select nome_fantasia, municipio, descricao_atividade
from resultadoview
where porte =='Grande'
order by municipio """).show()

+-----------------+-------------+--------------------+
|    nome_fantasia|    municipio| descricao_atividade|
+-----------------+-------------+--------------------+
|   Tech Solutions|Florianópolis|Desenvolvimento d...|
| Construtora Beta| Porto Alegre|Construção de edi...|
|Construtora Alpha|    São Paulo|Construção de edi...|
+-----------------+-------------+--------------------+



In [47]:
#Atividades com maior concentração de empresas micro e pequeno porte

spark.sql("""select descricao_atividade, count(*) as total_empresas
from resultadoview
where porte in ('Micro', 'Pequeno')
group by descricao_atividade
order by total_empresas DESC
 """).show()

+--------------------+--------------+
| descricao_atividade|total_empresas|
+--------------------+--------------+
|Comércio varejist...|             1|
|Fabricação de pro...|             1|
|Restaurantes e si...|             1|
|Consultoria em ge...|             1|
+--------------------+--------------+



In [48]:
#Empresas de TI em cidades grandes

spark.sql("""SELECT nome_fantasia, municipio, porte
FROM resultadoview
WHERE descricao_atividade LIKE '%Desenvolvimento de programas%'
AND porte = 'Grande' """).show()


+--------------+-------------+------+
| nome_fantasia|    municipio| porte|
+--------------+-------------+------+
|Tech Solutions|Florianópolis|Grande|
+--------------+-------------+------+



In [50]:
spark.stop()