<a href="https://colab.research.google.com/github/etgcrog/glue_aws_docker/blob/main/spark_revisao.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

In [5]:
data = [('Zeca','35'), ('Eva', '29')]
colNames = ['Nome', 'Idade']

In [6]:
df = spark.createDataFrame(data, colNames)

In [7]:
df.show()

+----+-----+
|Nome|Idade|
+----+-----+
|Zeca|   35|
| Eva|   29|
+----+-----+



In [8]:
df.filter(df.Idade == "35").show(truncate=False)

+----+-----+
|Nome|Idade|
+----+-----+
|Zeca|35   |
+----+-----+



In [9]:
from zipfile import ZipFile

prefix_zip = "/content/drive/MyDrive/Colab Notebooks/data"
prefix_unzip = "/content/data"

names_archives = ['empresas','estabelecimentos','socios']

for archive in names_archives:
  with ZipFile(f"{prefix_zip}/{archive}.zip", 'r') as zip_ref:
    zip_ref.extractall(f"{prefix_unzip}")


In [10]:
def read_parquet(directory):
  return spark.read.options(delimiter=";", inferSchema='True').csv(directory)

def sub_comman_dot(df, col_name):
  return df.withColumn(col_name, f.regexp_replace(col_name, ",", ".").cast(DoubleType()))

def rename_column(df, list_names):
  for index, colName in enumerate(list_names):
    df = df.withColumnRenamed(f"_c{index}", colName)
  return df

def sub_string_date(df, column_name, pattern, many_columns=False):
  if many_columns:
    for column in column_name:
      df = df.withColumn(column, f.to_date(df[column].cast(StringType()), pattern))
    return df
  else:
    df = df.withColumn(column_name, f.to_date(df[column_name].cast(StringType()), pattern))
    return df

In [11]:
df_empresas = read_parquet(f"{prefix_unzip}/empresas")
df_estabelecimentos = read_parquet(f"{prefix_unzip}/estabelecimentos")
df_socios = read_parquet(f"{prefix_unzip}/socios")

In [12]:
#Lista para renomear
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']
estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']
sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

df_empresas = rename_column(df_empresas, empresasColNames)
df_estabelecimentos = rename_column(df_estabelecimentos, estabsColNames)
df_socios = rename_column(df_socios, sociosColNames)


In [13]:
# TRATAMENTO DE DADOS
df_empresas = sub_comman_dot(df_empresas, "capital_social_da_empresa")
df_estabelecimentos = sub_string_date(df_estabelecimentos, ["data_de_inicio_atividade", "data_da_situacao_especial", "data_situacao_cadastral"], "yyyyMMdd", many_columns=True)
# df_socios = sub_string_date(df_socios, "data_de_entrada_sociedade", "yyyyMMdd")
# df_estabelecimentos = rename_column()

In [14]:
df_socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,19940725,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,19940725,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,19940516,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,19940516,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,19940609,,***000000**,,0,8


In [15]:
df_estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,403,1,30,1,ALUPE,2,2005-11-03,0,,,...,7107,,,,,,,,,
1,2498,1,21,1,,8,2002-06-27,1,,,...,6789,,,,,,,,,
2,4599,1,31,1,,8,2008-12-31,71,,,...,7121,,,,,,,,,
3,13090,1,55,1,KI PAO PADARIA,3,2008-08-27,21,,,...,6229,,,,,,,,,
4,16903,1,60,1,,8,1999-09-09,2,,,...,7107,,,,,,,,,


In [16]:
df_empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)

