### Instalação do Spark

In [None]:
!wget -q http://archive.apache.org/dist/spark/spark-3.2.2/spark-3.2.2-bin-hadoop2.7.tgz
!tar -zxf spark-3.2.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.2-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()

### Criação do Spark

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

#### Somente necessário para o Colab já que não é possível acessar o localhost criado pelo Spark

In [None]:
!wget -q https://bin.equinox.io/c/bNyj1mQVY4c/ngrok-v3-stable-linux-amd64.zip
!unzip ngrok-v3-stable-linux-amd64.zip

Archive:  ngrok-v3-stable-linux-amd64.zip
  inflating: ngrok                   


In [None]:
get_ipython().system_raw('./ngrok authtoken {}')
get_ipython().system_raw('./ngrok http 4050 &')


KeyboardInterrupt



In [None]:
!curl -s http://localhost:4040/api/tunnels

{"tunnels":[],"uri":"/api/tunnels"}


### Exemplo - Criação de DataFrame

In [None]:
data = [('Lucio', 44), ('Maria', 22)] # [{'Nome': 'Zeca', 'Idade': '35'}, {'Nome': 'Eva', 'Idade': '29'}]
col = ['Nome', 'Idade']
df = spark.createDataFrame(data, col)
df

DataFrame[Nome: string, Idade: bigint]

In [None]:
df.show()

+-----+-----+
| Nome|Idade|
+-----+-----+
|Lucio|   44|
|Maria|   22|
+-----+-----+



In [None]:
df.toPandas()

Unnamed: 0,Nome,Idade
0,Lucio,44
1,Maria,22


### Carregamento dos dados

In [None]:
# !wget http://200.152.38.155/CNPJ/Empresas0.zip
!gdown https://drive.google.com/uc?id=1jkLy51Q26vjN8j0wWnXYASI_0HdvWjjC
!gdown https://drive.google.com/uc?id=1lHyKFbbM-NI6SoUuNxy_LifT6kho09oi
!gdown https://drive.google.com/uc?id=1CED5m2q9P3NNyL36MuE16_eiZ02cteOW

Downloading...
From: https://drive.google.com/uc?id=1jkLy51Q26vjN8j0wWnXYASI_0HdvWjjC
To: /content/Socios0.zip
100% 87.1M/87.1M [00:00<00:00, 154MB/s]
Downloading...
From: https://drive.google.com/uc?id=1lHyKFbbM-NI6SoUuNxy_LifT6kho09oi
To: /content/Empresas0.zip
100% 198M/198M [00:01<00:00, 125MB/s]
Downloading...
From: https://drive.google.com/uc?id=1CED5m2q9P3NNyL36MuE16_eiZ02cteOW
To: /content/Estabelecimentos0.zip
100% 763M/763M [00:05<00:00, 137MB/s]


In [None]:
!unzip -qo Empresas0.zip -d "/content/empresas/"
!unzip -qo Socios0.zip -d "/content/socios/"
!unzip -qo Estabelecimentos0.zip -d "/content/estabelecimentos/"

### Criação do DataFrame

In [None]:
path = '/content/empresas'
empresas = spark.read.csv(path, sep=';', inferSchema=True)

In [None]:
empresas.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6
0,41273600,AVANILSON BRUNO MATIAS DA SILVA 08778601495,2135,50,5000000,1,
1,41273601,GABRIELA HELENA FACINI DA SILVA 47022415838,2135,50,200000,1,
2,41273602,FABIO SOUZA DO ROSARIO 80392440210,2135,50,1500000,1,
3,41273603,GRAFLINE ACESSORIOS GRAFICOS LTDA,2062,49,1000000,1,
4,41273604,RUMO - ESTUDIO DE DANCA LTDA,2062,49,1000000,1,


### Mudança dos nomes das colunas e seus tipos

In [None]:
cols = ["CNPJ", "NOME_EMPRESARIAL", "NATUREZA_JURIDICA", "QUALIFICACAO_RESPONSAVEL", "CAPITAL_SOCIAL", "PORTE_EMPRESA", "ENTE_FEDERATIVO"]
"""
for id, col in enumerate(cols):
  empresas = empresas.withColumnRenamed(f"_c{id}", col)
empresas.columns
"""

# tambem funciona
empresas = empresas.toDF(*cols)
empresas.columns

['CNPJ',
 'NOME_EMPRESARIAL',
 'NATUREZA_JURIDICA',
 'QUALIFICACAO_RESPONSAVEL',
 'CAPITAL_SOCIAL',
 'PORTE_EMPRESA',
 'ENTE_FEDERATIVO']

In [None]:
empresas.printSchema()

root
 |-- CNPJ: integer (nullable = true)
 |-- NOME_EMPRESARIAL: string (nullable = true)
 |-- NATUREZA_JURIDICA: integer (nullable = true)
 |-- QUALIFICACAO_RESPONSAVEL: integer (nullable = true)
 |-- CAPITAL_SOCIAL: string (nullable = true)
 |-- PORTE_EMPRESA: integer (nullable = true)
 |-- ENTE_FEDERATIVO: string (nullable = true)



In [None]:
from pyspark.sql.types import DoubleType
from pyspark.sql import functions as f

Alteração da representação da casa decimal de ',' para '.' na coluna 'CAPITAL_SOCIAL':

In [None]:
empresas = empresas.withColumn("CAPITAL_SOCIAL", f.regexp_replace("CAPITAL_SOCIAL", ",", "."))
empresas.printSchema()

root
 |-- CNPJ: integer (nullable = true)
 |-- NOME_EMPRESARIAL: string (nullable = true)
 |-- NATUREZA_JURIDICA: integer (nullable = true)
 |-- QUALIFICACAO_RESPONSAVEL: integer (nullable = true)
 |-- CAPITAL_SOCIAL: string (nullable = true)
 |-- PORTE_EMPRESA: integer (nullable = true)
 |-- ENTE_FEDERATIVO: string (nullable = true)



Como por conta da vírgula, a coluna 'CAPITAL_SOCIAL' possuía o tipo _string_, após a alteração para ponto, faz-se a modificação do tipo da coluna para _double_:

In [None]:
empresas = empresas.withColumn("CAPITAL_SOCIAL", empresas["CAPITAL_SOCIAL"].cast(DoubleType()))

In [None]:
empresas.printSchema()

root
 |-- CNPJ: integer (nullable = true)
 |-- NOME_EMPRESARIAL: string (nullable = true)
 |-- NATUREZA_JURIDICA: integer (nullable = true)
 |-- QUALIFICACAO_RESPONSAVEL: integer (nullable = true)
 |-- CAPITAL_SOCIAL: double (nullable = true)
 |-- PORTE_EMPRESA: integer (nullable = true)
 |-- ENTE_FEDERATIVO: string (nullable = true)



### Consultas no _dataset_

Contagem de valores nulos por coluna:

In [None]:
empresas.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in empresas.columns]).show()

+----+----------------+-----------------+------------------------+--------------+-------------+---------------+
|CNPJ|NOME_EMPRESARIAL|NATUREZA_JURIDICA|QUALIFICACAO_RESPONSAVEL|CAPITAL_SOCIAL|PORTE_EMPRESA|ENTE_FEDERATIVO|
+----+----------------+-----------------+------------------------+--------------+-------------+---------------+
|   0|               0|                0|                       0|             0|         7547|       10003709|
+----+----------------+-----------------+------------------------+--------------+-------------+---------------+



Busca por determinado CNPJ:

In [None]:
empresas.filter(empresas["CNPJ"]==41274130)\
  .show(truncate=False)

+--------+----------------------------------------+-----------------+------------------------+--------------+-------------+---------------+
|CNPJ    |NOME_EMPRESARIAL                        |NATUREZA_JURIDICA|QUALIFICACAO_RESPONSAVEL|CAPITAL_SOCIAL|PORTE_EMPRESA|ENTE_FEDERATIVO|
+--------+----------------------------------------+-----------------+------------------------+--------------+-------------+---------------+
|41274130|DANIELE MARIA COSTA DA SILVA 10295891432|2135             |50                      |2000.0        |1            |null           |
+--------+----------------------------------------+-----------------+------------------------+--------------+-------------+---------------+



Contagem da quantidade de empresas por porte da empresa:

In [None]:
empresas.where(empresas.PORTE_EMPRESA=="null")\
  .show(truncate=False)

+----+----------------+-----------------+------------------------+--------------+-------------+---------------+
|CNPJ|NOME_EMPRESARIAL|NATUREZA_JURIDICA|QUALIFICACAO_RESPONSAVEL|CAPITAL_SOCIAL|PORTE_EMPRESA|ENTE_FEDERATIVO|
+----+----------------+-----------------+------------------------+--------------+-------------+---------------+
+----+----------------+-----------------+------------------------+--------------+-------------+---------------+



In [None]:
empresas\
  .select("PORTE_EMPRESA")\
  .groupBy("PORTE_EMPRESA")\
  .count()\
  .orderBy("PORTE_EMPRESA", ascending=True)\
  .show()

+-------------+-------+
|PORTE_EMPRESA|  count|
+-------------+-------+
|         null|   7547|
|            1|6312640|
|            3| 232354|
|            5|3458623|
+-------------+-------+



Média do capital social:

In [None]:
empresas\
  .select(f.avg("CAPITAL_SOCIAL"))\
  .show()

+-------------------+
|avg(CAPITAL_SOCIAL)|
+-------------------+
|  7699269.441710852|
+-------------------+



Quantidade de linhas do _dataset_:

In [None]:
empresas\
  .count()

10011164