In [1]:
# instalar as dependências do spark
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
# Definir as variaveis de hambiente do Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()

In [4]:
import zipfile
from google.colab import drive
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [5]:
spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

[Site ngrok](https://ngrok.com)

Utilizado para visualizar o spark ui atravé so colab.
Tem que criar uma conta e gerar um token pra utilizar.

In [None]:
!wget -q https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

In [None]:
get_ipython().system_raw('./ngrok authtoken 2XGaLL2gUnl0ZQ6Eyc1TCSV9gnO_46CuomGFTP9ZAcrdJvZbq')
get_ipython().system_raw('./ngrok http 4050 &')

In [None]:
# Abre o link que vai gerar para acessar o sparkUI
!curl -s http://localhost:4040/api/tunnels

In [6]:
# Monta o Drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
zipfile.ZipFile('/content/drive/MyDrive/Colab Notebooks/spark/data/empresas.zip','r').extractall('/content/drive/MyDrive/Colab Notebooks/spark/data')

### Load dados das Empresas
> [Empresas](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/empresas.zip)


In [16]:
path = '/content/drive/MyDrive/Colab Notebooks/spark/data/empresas/'
empresas = spark.read.csv(path, sep=';', inferSchema=True)

### Load dados dos estabelecimento
> [Estabelecimentos](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/estabelecimentos.zip)

In [17]:
path = '/content/drive/MyDrive/Colab Notebooks/spark/data/estabelecimentos/'
estabelecimentos = spark.read.csv(path, sep=';', inferSchema=True)

### Load dados dos sócios
> [Sócios](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/socios.zip)

In [18]:
path = '/content/drive/MyDrive/Colab Notebooks/spark/data/socios/'
socios = spark.read.csv(path, sep=';', inferSchema=True)

###Tratativas nas colunas

In [19]:
## Ajustar o nome das colunas
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']
estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']
sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

In [20]:
for index,colName in enumerate(empresasColNames):
  empresas = empresas.withColumnRenamed(f"_c{index}", colName)

for index,colName in enumerate(estabsColNames):
  estabelecimentos = estabelecimentos.withColumnRenamed(f"_c{index}", colName)

for index,colName in enumerate(sociosColNames):
  socios = socios.withColumnRenamed(f"_c{index}", colName)

In [21]:
empresas = empresas.withColumn('capital_social_da_empresa',f.regexp_replace('capital_social_da_empresa',',','.').cast(DoubleType()))

In [22]:
estabelecimentos = estabelecimentos\
  .withColumn('data_situacao_cadastral',f.to_date(estabelecimentos.data_situacao_cadastral.cast(StringType()),'yyyyMMdd'))\
  .withColumn('data_de_inicio_atividade',f.to_date(estabelecimentos.data_de_inicio_atividade.cast(StringType()),'yyyyMMdd'))\
  .withColumn('data_da_situacao_especial',f.to_date(estabelecimentos.data_da_situacao_especial.cast(StringType()),'yyyyMMdd'))

In [23]:
socios = socios.withColumn('data_de_entrada_sociedade',f.to_date(socios.data_de_entrada_sociedade.cast(StringType()),'yyyyMMdd'))

In [None]:
# Verificando quais colunas tem registros nulos e suas quantidades
socios.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in socios.columns]).show()

In [None]:
estabelecimentos.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in estabelecimentos.columns]).show()

In [None]:
empresas.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in empresas.columns]).show()

### Algumas Análises de dados.

In [None]:
## Novos Socios por ano.
socios\
    .select(f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .where('ano_de_entrada >= 2010')\
    .groupBy('ano_de_entrada').count()\
    .orderBy('ano_de_entrada', ascending=True)\
    .show()


In [None]:
## Porte x Capital Medio x QTD
empresas\
    .select('cnpj_basico', 'porte_da_empresa', 'capital_social_da_empresa')\
    .groupBy('porte_da_empresa')\
    .agg(
        f.avg("capital_social_da_empresa").alias("capital_social_medio"),
        f.count("cnpj_basico").alias("frequencia")
    )\
    .orderBy('porte_da_empresa', ascending=True)\
    .show()


In [None]:
empresas\
    .select("capital_social_da_empresa")\
    .summary()\
    .show()

In [24]:
empresas_join = estabelecimentos.join(empresas,'cnpj_basico',how = 'inner')

In [28]:
freq = empresas_join\
    .select(
        'cnpj_basico',
        f.year('data_de_inicio_atividade').alias('data_de_inicio')
    )\
    .where('data_de_inicio >= 2010')\
    .groupBy('data_de_inicio')\
    .agg(f.count("cnpj_basico").alias("frequencia"))\
    .orderBy('data_de_inicio', ascending=True)

In [None]:
freq.toPandas()

In [None]:
freq.union(
    freq.select(
        f.lit('Total').alias('data_inicio'),
        f.sum('frequencia').alias('frequencia')
    )
).show()

In [18]:
empresas.createOrReplaceTempView("empresasView")

In [33]:
empresas_join.createOrReplaceTempView("empresasJoinView")

In [None]:
freq = spark\
    .sql("""
        SELECT YEAR(data_de_inicio_atividade) AS data_de_inicio, COUNT(cnpj_basico) AS count
            FROM empresasJoinView
            WHERE YEAR(data_de_inicio_atividade) >= 2010
            GROUP BY data_de_inicio
            ORDER BY data_de_inicio
    """)

freq\
    .show()

### Salvando os dados em CSV

In [27]:
empresas.write.csv(
    path='/content/drive/MyDrive/Colab Notebooks/spark/csv/empresas',
    mode='overwrite',
    sep=';',
    header = True
)

In [14]:
estabelecimentos.write.csv(
    path='/content/drive/MyDrive/Colab Notebooks/spark/csv/estabelecimentos',
    mode='overwrite',
    sep=';',
    header = True
)

In [32]:
socios.write.csv(
    path='/content/drive/MyDrive/Colab Notebooks/spark/csv/socios',
    mode='overwrite',
    sep=';',
    header = True
)

###Salvando os dados em Parquet

In [35]:
empresas.write.parquet(
    path='/content/drive/MyDrive/Colab Notebooks/spark/parquet/empresas',
    mode='overwrite'
)

In [None]:
estabelecimentos.write.parquet(
    path='/content/drive/MyDrive/Colab Notebooks/spark/parquet/estabelecimentos',
    mode='overwrite'
)

In [44]:
socios.write.parquet(
    path='/content/drive/MyDrive/Colab Notebooks/spark/parquet/socios',
    mode='overwrite'
)

###Particionamento

In [45]:
empresas.coalesce(1).write.csv(
    path='/content/drive/MyDrive/Colab Notebooks/spark/csv_unico/empresas',
    mode='overwrite',
    sep=';',
    header = True
)

In [47]:
empresas.coalesce(1).write.parquet(
    path='/content/drive/MyDrive/Colab Notebooks/spark/parquet_partitionby/empresas',
    mode='overwrite',
    partitionBy='porte_da_empresa'
)

### END SPARK


In [None]:
spark.stop()