#**Instalação do Spark** na Instância do *Google Colab*

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

In [None]:
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

In [None]:
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()

#So let's get started!!

###Teste de DataFrame

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .getOrCreate()

In [None]:
data = [('Felipe','Data Engineer'),('John','Data Scientist')]
Cols = ['Name','Profession']

df = spark.createDataFrame(data,Cols)
df.show()

+------+--------------+
|  Name|    Profession|
+------+--------------+
|Felipe| Data Engineer|
|  John|Data Scientist|
+------+--------------+



In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


###Importando Conteúdos

In [None]:
path_empresa = '/content/drive/MyDrive/Curso Sparkao/empresas'
empresas = spark.read.csv(path_empresa,sep=';',inferSchema=True)
empresas.count()

4585679

In [None]:
path_socio = '/content/drive/MyDrive/Curso Sparkao/socios'
socios = spark.read.csv(path_socio,sep=';',inferSchema=True)
socios.count()

2046430

In [None]:
path_estabelecimento = '/content/drive/MyDrive/Curso Sparkao/estabelecimentos'
estabelecimentos = spark.read.csv(path_estabelecimento,sep=';',inferSchema=True)
estabelecimentos.count()

4836219

###Manipulando Headers

In [None]:
#Renomeando colunas _C# de empresas
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

for index, ColName in enumerate(empresasColNames):
  empresas = empresas.withColumnRenamed(f"_C{index}",ColName)

empresas.limit(5).show()

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|        306|         FRANCAMAR REFRIGE...|             2240|                         49|                     0,00|               1|                       null|
|       1355|         BRASILEIRO & OLIV...|             2062|                         49|                     0,00|               5|                       null|
|       4820|         REGISTRO DE IMOVE...|             3034|                         32|                     0,00|               5|                       null|
|       5347|         ROSELY APARE

In [None]:
#Renomeando colunas _C# de estabelecimentos
estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

for index, ColName in enumerate(estabsColNames):
  estabelecimentos = estabelecimentos.withColumnRenamed(f"_C{index}",ColName)

estabelecimentos.limit(5).show()

+-----------+----------+-------+---------------------------+-----------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------------+--------------------+------+-----------+------------------+-------+---+---------+-----+----------+-----+----------+----------+----+------------------+-----------------+-------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|    nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|          logradouro|numero|complemento|            bairro|    cep| uf|municipio|ddd_1|telefone_1|ddd_2|telefone_2|ddd_do_fax| fax|correio_eletronico|situacao_especial|data_da_situacao_especial|
+-----------+----------+-------+---------------------------+--------------

In [None]:
#Renomeando colunas _C# de socios
sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

for index, ColName in enumerate(sociosColNames):
  socios = socios.withColumnRenamed(f"_c{index}",ColName)

socios.limit(5).show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|        411|                     2|         LILIANA PATRICIA ...|         ***678188**|                   22|                 19940725|null|        ***000000**|                 null|                                  0|           7|
|        411|                     2|         CRISTINA HUNDERTMARK|      

###Manipulando Data types

In [None]:
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [None]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [None]:
empresas.limit(5).show()

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|        306|         FRANCAMAR REFRIGE...|             2240|                         49|                     0,00|               1|                       null|
|       1355|         BRASILEIRO & OLIV...|             2062|                         49|                     0,00|               5|                       null|
|       4820|         REGISTRO DE IMOVE...|             3034|                         32|                     0,00|               5|                       null|
|       5347|         ROSELY APARE

In [None]:
empresas = empresas.withColumn('capital_social_da_empresa',f.regexp_replace('capital_social_da_empresa',',','.'))
empresas = empresas.withColumn('capital_social_da_empresa', empresas['capital_social_da_empresa'].cast(DoubleType()))

In [None]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [None]:
socios = socios\
  .withColumn('data_de_entrada_sociedade',f\
  .to_date(socios['data_de_entrada_sociedade']\
  .cast(StringType()),'yyyyMMdd'))

In [None]:
socios.limit(5).show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|        411|                     2|         LILIANA PATRICIA ...|         ***678188**|                   22|               1994-07-25|null|        ***000000**|                 null|                                  0|           7|
|        411|                     2|         CRISTINA HUNDERTMARK|      

### Alguns selects

In [None]:
empresas\
  .select('natureza_juridica','porte_da_empresa','capital_social_da_empresa')\
  .show(5)

+-----------------+----------------+-------------------------+
|natureza_juridica|porte_da_empresa|capital_social_da_empresa|
+-----------------+----------------+-------------------------+
|             2240|               1|                      0.0|
|             2062|               5|                      0.0|
|             3034|               5|                      0.0|
|             2135|               5|                      0.0|
|             2062|               1|                   4000.0|
+-----------------+----------------+-------------------------+
only showing top 5 rows



In [None]:
socios\
  .select('nome_do_socio_ou_razao_social','faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
  .orderBy('faixa_etaria',ascending=False)\
  .show(5, False)

+--------------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social         |faixa_etaria|ano_de_entrada|
+--------------------------------------+------------+--------------+
|NOSKE DO REGO CASTELO BRANCO          |9           |2000          |
|JOSE BATISTA SOARES LEITE             |9           |2002          |
|RICARDO DE MELLO OLIVEIRA GASPARIAN   |9           |1995          |
|EUZA THEREZINHA DE PAULA E SILVA COSTA|9           |1995          |
|MARIA LUIZA DA SILVA                  |9           |2002          |
+--------------------------------------+------------+--------------+
only showing top 5 rows



In [None]:
empresas\
  .select('*')\
  .where('capital_social_da_empresa>=1000000')\
  .count()

25548

In [None]:
empresas\
  .select('*')\
  .where('capital_social_da_empresa>=10000000')\
  .count()

5304

In [None]:
socios\
  .select("nome_do_socio_ou_razao_social")\
  .filter(socios.nome_do_socio_ou_razao_social.contains("SILVA"))\
  .count()

11

###Trabalhando com view e SparkSQL

In [None]:
empresas.createOrReplaceTempView("EmpresasView")
spark.sql("SELECT * FROM EmpresasView").show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|        306|         FRANCAMAR REFRIGE...|             2240|                         49|                      0.0|               1|                       null|
|       1355|         BRASILEIRO & OLIV...|             2062|                         49|                      0.0|               5|                       null|
|       4820|         REGISTRO DE IMOVE...|             3034|                         32|                      0.0|               5|                       null|
|       5347|         ROSELY APARE

In [None]:
spark.sql("""
SELECT * FROM EmpresasView
WHERE razao_social_nome_empresarial LIKE '%PADARIA%'
""").show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|     133578|          L A MAGRINI PADARIA|             2135|                         50|                      0.0|               5|                       null|
|    2876672|         J BATISTA DE SOUZ...|             2135|                         50|                      0.0|               1|                       null|
|    5010164|         JOSE SIMON - PADARIA|             2135|                         50|                      0.0|               5|                       null|
|    5065715|         PADARIA COPA