# # Configuracao inicial necessarias para o projeto:

## - Instalando dependências necessarias.

In [None]:
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -qq https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -qq findspark
!pip install -qq pyspark

[K     |████████████████████████████████| 281.4 MB 44 kB/s 
[K     |████████████████████████████████| 199 kB 60.1 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


## - Definindo de variaveis de ambiente necessarias para o spark.

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

## - Localizando o binario do spark definido acima atraves do findspark.

In [None]:
import findspark
findspark.init()

## - Definindo uma sessao spark.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .getOrCreate()

## - Testando o spark.

In [None]:
data = [('Zeca', '35'), ('Eva', '29')]
colNames = ['Nome', 'Idade']

df = spark.createDataFrame(data, colNames)

df.toPandas()

# # Importacao dos arquivos a serem processados:

## - Definindo acesso aos arquivos do Google Drive.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## - Extraindo os arquivos.

In [None]:
import zipfile

In [None]:
zipfile.ZipFile('/content/drive/MyDrive/Colab Notebooks/alura-curso-spark/zips/empresas.zip','r') \
  .extractall('/content/drive/MyDrive/Colab Notebooks/alura-curso-spark')

In [None]:
zipfile.ZipFile('/content/drive/MyDrive/Colab Notebooks/alura-curso-spark/zips/estabelecimentos.zip','r') \
  .extractall('/content/drive/MyDrive/Colab Notebooks/alura-curso-spark')

In [None]:
zipfile.ZipFile('/content/drive/MyDrive/Colab Notebooks/alura-curso-spark/zips/socios.zip','r') \
  .extractall('/content/drive/MyDrive/Colab Notebooks/alura-curso-spark')

## - Importando arquivos.

In [None]:
path_empresas = '/content/drive/MyDrive/Colab Notebooks/alura-curso-spark/empresas'

empresas = spark.read.csv(path_empresas, sep=';', inferSchema=True)

In [None]:
path_estabelecimentos = '/content/drive/MyDrive/Colab Notebooks/alura-curso-spark/estabelecimentos'

estabelecimentos = spark.read.csv(path_estabelecimentos, sep=';', inferSchema=True)

In [None]:
path_socios = '/content/drive/MyDrive/Colab Notebooks/alura-curso-spark/socios'

socios = spark.read.csv(path_socios, sep=';', inferSchema=True)

## - Testando arquivos no spark.

In [None]:
empresas.count()

4585679

In [None]:
estabelecimentos.count()

4836219

In [None]:
socios.count()

2046430

# # Inicializacao do processamento:

## - Renomeando colunas do DataFrame.

In [None]:
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

In [None]:
for index, colName in enumerate(empresasColNames):
  empresas = empresas.withColumnRenamed(f"_c{index}", colName)

In [None]:
empresas.limit(5).toPandas()

In [None]:
empresas.printSchema()

In [None]:
estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

In [None]:
for index, colName in enumerate(estabsColNames):
  estabelecimentos = estabelecimentos.withColumnRenamed(f"_c{index}", colName)

In [None]:
estabelecimentos.limit(5).toPandas()

In [None]:
estabelecimentos.printSchema()

In [None]:
sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

In [None]:
for index, colName in enumerate(sociosColNames):
  socios = socios.withColumnRenamed(f"_c{index}", colName)

In [None]:
socios.limit(5).toPandas()

In [None]:
socios.printSchema()

## - Transformando virgula em ponto na coluna ***capital_social_da_empresa***.

In [None]:
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [None]:
empresas = empresas.withColumn('capital_social_da_empresa', f.regexp_replace('capital_social_da_empresa', ',', '.'))

## - Transformando string em double na coluna ***capital_social_da_empresa***.

In [None]:
empresas = empresas.withColumn('capital_social_da_empresa', empresas['capital_social_da_empresa'].cast(DoubleType()))

## - Transformando string em date.

In [None]:
estabelecimentos = estabelecimentos \
  .withColumn(
      'data_situacao_cadastral',
      f.to_date(estabelecimentos.data_situacao_cadastral.cast(StringType()), 'yyyyMMdd')
  ) \
  .withColumn(
      'data_de_inicio_atividade',
      f.to_date(estabelecimentos.data_de_inicio_atividade.cast(StringType()), 'yyyyMMdd')
  ) \
  .withColumn(
      'data_da_situacao_especial',
      f.to_date(estabelecimentos.data_da_situacao_especial.cast(StringType()), 'yyyyMMdd')
  )

In [None]:
empresas \
  .select('natureza_juridica','porte_da_empresa','capital_social_da_empresa') \
  .show(5)

In [None]:
socios = socios \
  .withColumn(
    'data_de_entrada_sociedade',
    f.to_date(socios.data_de_entrada_sociedade.cast(StringType()), 'yyyyMMdd')
  )

In [None]:
socios \
  .select('nome_do_socio_ou_razao_social', 'faixa_etaria',  f.year('data_de_entrada_sociedade').alias('ano_de_entrada')) \
  .orderBy(['ano_de_entrada', 'faixa_etaria'], ascending=[False, False]) \
  .show(5, False)

## - Contagem de nulls nas colunas.

In [None]:
socios.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in socios.columns]).show()

### - - Observacao:

In [None]:
socios.fillna(0).limit(5).show() # -> remove os nulls de colunas que nao sao string type
socios.fillna('-').limit(5).show() # -> remove os nulls de colunas que sao string type

In [None]:
# df = spark.createDataFrame([(1, ), (2, ), (3, ), (None, )], ['data'])
# df.show() # ----> retorna null no ultimo elemento.
# df.toPandas()# -> retorna 3 primeiros elementos como float e o ultimo elemento como NaN.

# df = spark.createDataFrame([(1., ), (2., ), (3., ), (float('nan'), )], ['data'])
# df.show() # ----
               # |--> retornan NaN no ultimo elemento.
# df.toPandas()# -

# conclusao: NaN e um elemento do tipo double e/ou float.
"""
Dependendo da natureza dos dados, uma boa pratica seria substituir os NaNs
por nulls, depois trata-los de maneira adequada, removendo-os ou preenchen-
do-os
"""

## - - Filatrando dados.

In [None]:
  # .where(empresas.capital_social_da_empresa == 50) \
empresas \
  .where('capital_social_da_empresa==50') \
  .toPandas()

In [None]:
socios \
  .select('nome_do_socio_ou_razao_social') \
  .filter(socios.nome_do_socio_ou_razao_social.startswith("RODRIGO")) \
  .filter(socios.nome_do_socio_ou_razao_social.endswith("DIAS")) \
  .limit(10) \
  .toPandas()

In [None]:
df = spark.createDataFrame([('RESTAURANTE DO RUI',), ('Juca restaurantes ltda',), ('Joca Restaurante',)], ['data'])

df \
  .where(f.upper(df.data).like('%RESTAURANTE%')) \
  .show(truncate=False)


In [None]:
empresas\
    .select('razao_social_nome_empresarial', 'natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa')\
    .filter(f.upper(empresas['razao_social_nome_empresarial']).like('%RESTAURANTE%'))\
    .show(15, False)

# - Sumarizacao dos dados.

## - - Contando e ordenando dados.

In [None]:
socios \
  .select(f.year('data_de_entrada_sociedade').alias('ano_de_entrada')) \
  .where('ano_de_entrada >= 2010') \
  .groupBy('ano_de_entrada') \
  .count() \
  .orderBy('ano_de_entrada', ascending=True) \
  .show()

## - - Agregacao de dados.

In [None]:
empresas \
  .select('cnpj_basico', 'porte_da_empresa', 'capital_social_da_empresa') \
  .groupBy('porte_da_empresa') \
  .agg(\
      f.avg('capital_social_da_empresa').alias('capital_social_medio'),
      f.count('cnpj_basico').alias('frequencia')
  ) \
  .show()

In [None]:
empresas \
  .select('capital_social_da_empresa') \
  .summary() \
  .show()

## - - Entendendo Joins.

In [None]:
empresas_join = estabelecimentos.join(empresas, 'cnpj_basico', how='inner')

## - - Calculando a frequencia.

In [None]:
freq = empresas_join \
    .select(
        'cnpj_basico', 
        f.year('data_de_inicio_atividade').alias('data_de_inicio')
    )\
    .where('data_de_inicio >= 2010')\
    .groupBy('data_de_inicio')\
    .agg(f.count("cnpj_basico").alias("frequencia"))\
    .orderBy('data_de_inicio', ascending=True)

In [None]:
freq.toPandas()

In [None]:
freq.union(
    freq.select(
        f.lit('Total').alias('kkkk'), # .lit() -> cria um valor literal logo abaixo da ultima linha
        f.sum(freq.frequencia).alias('frequencia')
    )
).show()