## Importações e inicialização do Spark

In [None]:
import os

# coloca as variáveis de ambiente necessárias para o PySpark no windows
os.environ['JAVA_HOME'] = r"C:\Program Files\Eclipse Adoptium\jdk-8.0.452.9-hotspot"
os.environ['SPARK_HOME'] = r"C:\spark\spark-3.5.6-bin-hadoop3"

os.environ['PYSPARK_PYTHON'] = r"C:\Users\diego\Desktop\spark_alura\.venv\Scripts\python.exe"
os.environ['PYSPARK_DRIVER_PYTHON'] = r"C:\Users\diego\Desktop\spark_alura\.venv\Scripts\python.exe"


In [None]:
import findspark # busca o PySpark no sistema
findspark.init()

In [None]:
# importa a classe que inicia uma sessão do Spark para usar o Spark SQL
from pyspark.sql import SparkSession

In [None]:
# inicia uma sessão do Spark, que é o ponto de entrada para usar o Spark SQL
spark = SparkSession.builder.master('local[*]').appName("Iniciando").config("spark.hadoop.io.native.lib", "false").getOrCreate()
spark

## Exemplos de DataFrames

In [None]:
# exemplo de criação de um DataFrame usando tuplas
data = [('zeca', '35'), ('eva', '29')]
col_names = ['nome', 'idade']

df = spark.createDataFrame(data, col_names)

df.show()

In [None]:
# exemplo de criação de um DataFrame usando dicionários
data_dict = [{'nome': 'zeca', 'idade': '35'}, {'nome': 'eva', 'idade': '29'}]

df = spark.createDataFrame(data_dict)

df.show()

In [None]:
# converte o DataFrame do Spark para um DataFrame do Pandas
df.toPandas()

## Extrair conjunto de dados

In [None]:
zip_path = os.path.abspath(r'.\zip_files')
data_path = os.path.abspath(r'.\data')

In [None]:
import zipfile # biblioteca nativa do Python para manipular arquivos zip

zipfile.ZipFile(os.path.join(zip_path, 'empresas.zip'), 'r').extractall(data_path)
zipfile.ZipFile(os.path.join(zip_path, 'estabelecimentos.zip'), 'r').extractall(data_path)
zipfile.ZipFile(os.path.join(zip_path, 'socios.zip'), 'r').extractall(data_path)

In [None]:
# retorna uma lista com os caminhos dos arquivos de um diretório
def get_files_from_directory(directory):
    files = [
        os.path.join(directory, f)
        for f in os.listdir(directory)
    ]
    return files

In [None]:
# lê todos os arquivos CSV do diretório especificado e cria um DataFrame
df_emps = spark.read.csv(
    get_files_from_directory(os.path.join(data_path, 'empresas')), 
    sep=";", header=False, inferSchema=True)

df_emps.count()

In [None]:
df_estabs = spark.read.csv(
    get_files_from_directory(os.path.join(data_path, 'estabelecimentos')), 
    sep=";", header=False, inferSchema=True)

df_estabs.count()

In [None]:
df_socios = spark.read.csv(
    get_files_from_directory(os.path.join(data_path, 'socios')), 
    sep=";", header=False, inferSchema=True)

df_socios.count()

In [None]:
# exibe as primeiras 3 linhas dos DataFrames
df_emps.show(3)
df_estabs.show(3)
df_socios.show(3)

In [None]:
# converte as primeiras 3 linhas do DataFrame do Spark para um DataFrame do Pandas
df_socios.limit(3).toPandas()

## Renomear colunas

In [None]:
# define os nomes das colunas para os DataFrames

emps_col_names = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

estabs_col_names = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

socios_col_names = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

In [None]:
# exibe os nomes das colunas dos DataFrames em formato enumerado para mapeamento das colunas quando renomear
for i, col_name in enumerate(emps_col_names):
    print([i, col_name])

In [None]:
# função para renomear as colunas de um DataFrame utilizando o mapeamento de nomes de colunas
def rename_columns(df, col_names):
    for i, col_name in enumerate(col_names):
        df = df.withColumnRenamed(f'_c{i}', col_name)
    return df

In [None]:
# renomeia as colunas dos DataFrames
df_emps = rename_columns(df_emps, emps_col_names)
df_estabs = rename_columns(df_estabs, estabs_col_names)
df_socios = rename_columns(df_socios, socios_col_names)

In [None]:
# exibe as primeiras 3 linhas dos DataFrames com os nomes das colunas renomeados
print(df_emps.columns)
print(df_estabs.columns)
print(df_socios.columns)

In [None]:
# converte as primeiras 3 linhas do DataFrame do Spark para DataFrame do Pandas com o nomes das colunas renomeados
df_socios.limit(3).toPandas()

# Converter para coluna de dinheiro para tipo Double

In [None]:
# mostra os nomes das colunas e seus tipos de dados
df_emps.printSchema()
df_socios.printSchema()
df_estabs.printSchema()

In [None]:
# importa a classe DoubleType para converter o tipo de dado para Double
# importa a classe functions do para manipulação de colunas
from pyspark.sql.types import DoubleType
from pyspark.sql import functions as fx


In [None]:
df_emps.show(3)

In [None]:
# substitui a vírgula por ponto na coluna 'capital_social_da_empresa' para que o PySpark reconheça como número decimal
df_emps = df_emps.withColumn('capital_social_da_empresa', fx.regexp_replace('capital_social_da_empresa', ',', '.'))
df_emps.select('capital_social_da_empresa').show(3)

In [None]:
df_emps.printSchema()

In [None]:
# converte a coluna 'capital_social_da_empresa' para o tipo Double
df_emps = df_emps.withColumn('capital_social_da_empresa', df_emps['capital_social_da_empresa'].cast(DoubleType()))
df_emps.printSchema()

# Converter coluna para o tipo Date

In [None]:
# importa a classe StringType para converter o tipo de dado para String
from pyspark.sql.types import StringType

In [None]:
# a coluna estava com tipo de dado Integer, mas foi convertida para String para que seja possivel passar to_date
df_socios = df_socios.withColumn('data_de_entrada_sociedade', df_socios['data_de_entrada_sociedade'].cast(StringType()))
df_socios.printSchema()

In [None]:
df_socios.select('data_de_entrada_sociedade').show(3)

In [None]:
# para converter para o tipo Date é necessario converter para String primeiro, depois converte para Date usando o mesmo formato de data na coluna original
df_socios = df_socios.withColumn('data_de_entrada_sociedade', fx.to_date(df_socios['data_de_entrada_sociedade'], 'yyyyMMdd'))

df_socios.printSchema()

In [None]:
df_socios.show(3)

# Converter multiplas colunas em tipo Date

In [None]:
df_estabs.select('data_situacao_cadastral', 'data_de_inicio_atividade', 'data_da_situacao_especial').show(3)

In [None]:
# aplica a conversão de data para várias colunas do DataFrame de estabelecimentos
df_estabs = df_estabs\
    .withColumn('data_situacao_cadastral', fx.to_date(df_estabs.data_situacao_cadastral\
        .cast(StringType()), 'yyyyMMdd'))\
    .withColumn('data_de_inicio_atividade', fx.to_date(df_estabs.data_de_inicio_atividade\
        .cast(StringType()), 'yyyyMMdd'))\
    .withColumn('data_da_situacao_especial', fx.to_date(df_estabs.data_da_situacao_especial\
        .cast(StringType()), 'yyyyMMdd'))

df_estabs.select('data_situacao_cadastral', 'data_de_inicio_atividade', 'data_da_situacao_especial').show(3)

In [None]:
df_estabs.select('data_situacao_cadastral', 'data_de_inicio_atividade', 'data_da_situacao_especial').printSchema()

## Extrair ano de uma coluna de data

In [None]:
# assim como no SQL, podemos aplicar funções de transformação nas colunas
df_socios\
    .select(
        'nome_do_socio_ou_razao_social',
        fx.year('data_de_entrada_sociedade').alias('ano_entrada_sociedade'), # extrai o ano da data e renomeia a coluna
    )\
    .show(5, truncate=False) # 'truncate=False' para mostrar o texto completo das colunas

### Exercício - Extrair parte de string de uma coluna:
- Criar coluna com ultimo sobrenome e primeiro nome, separados por virgula

In [None]:
data = [
    ('GISELLE PAULA GUIMARAES CASTRO', 15),
    ('ELAINE GARCIA DE OLIVEIRA', 22),
    ('JOAO CARLOS ABNER DE LOURDES', 43),
    ('MARTA ZELI FERREIRA', 24),
    ('LAUDENETE WIGGERS ROEDER', 51)
]
col_names = ['nome', 'idade']

df = spark.createDataFrame(data, col_names)
df.show(truncate=False)

In [None]:
# cria uma coluna com o último sobrenome e o primeiro nome, separados por vírgula
df\
    .select(
        'nome',
        fx.concat_ws( # concatena as colunas com um separador
            ', ', 
            fx.substring_index('nome', ' ', -1), # pega o ultimo sobrenome
            fx.substring_index('nome', ' ', 1) # pega o primeiro nome
        ).alias('ident'), 
        'idade'
    )\
    .show(truncate=False)

## Diferença entre NaN, None, Null para o Pandas e o PySpark
- para o PySpark, NaN aparece apenas para o tipo Float, Null aparece para todos os tipos de dados
- para o Pandas, NaN aparece para todos os tipos de dados exceto para o tipo String, que aparece como None

In [None]:
from pyspark.sql.types import FloatType, IntegerType, StringType, StructType, StructField

In [None]:
data = [
    (float('nan'), None, None, None),
    (float('nan'), None, None, None),
    (float('nan'), None, None, None),
]

# determina o nome e o tipo de cada coluna do DataFrame
schema = \
    StructType([
        StructField(name="float_nan", dataType=FloatType(), nullable=True),
        StructField(name="float_none", dataType=FloatType(), nullable=True),
        StructField(name="integer_none", dataType=IntegerType(), nullable=True),
        StructField(name="string_none", dataType=StringType(), nullable=True),
    ])

df = spark.createDataFrame(data=data, schema=schema)

df.show(truncate=False)
df.printSchema()

# para o PySpark, NaN aparece apenas para o tipo Float, Null aparece para todos os tipos de dados

In [None]:
df.toPandas()

# para o Pandas, NaN aparece para todos os tipos de dados exceto para o tipo String, que aparece como None

## Substituir valores Null de tipos diferentes
- ao preencher os nulls com 0 (int), o PySpark troca apenas os valores da coluna com tipo numérico
- ao preencher os nulls com '0' (str), o PySpark troca apenas os valores da coluna com tipo String

In [None]:
# as colunas 'pais' e 'nome_do_representante' possui valores Null e são de tipos diferentes

df_socios.select('pais', 'nome_do_representante').show(5, truncate=False)

df_socios.select('pais', 'nome_do_representante').printSchema()

df_socios.select('pais', 'nome_do_representante').limit(5).toPandas()

In [None]:
# contagem de valores Null para colunas específicas
df_socios\
    .select(
    
    fx.count( # conta o número de linhas quando a condição de ser Null é atendida
        fx.when(
            fx.isnull(col='pais'), 1
        )
    ).alias('pais_null_count'),
    
    fx.count(
        fx.when(
            fx.isnull(col='nome_do_representante'), 1
        )
    ).alias('nome_do_representante_null_count')
    
).show()

In [None]:
# contagem de valores Null em todas as colunas do DataFrame usando lista
df_socios\
    .select(
    [
        fx.count( # conta o número de linhas quando a condição de ser Null é atendida para cada coluna
            fx.when(
                fx.isnull(col_name), 1
            )
        )\
        .alias(col_name[:10]) 
        
        for col_name in df_socios.columns
    ]
).show()


In [None]:
# ao preencher os nulls com 0 (int), o PySpark troca apenas os valores da coluna com tipo numérico
df_socios.na.fill(0).select('pais', 'nome_do_representante').show(5)

# ao preencher os nulls com '0' (str), o PySpark troca apenas os valores da coluna com tipo String
df_socios.na.fill('0').select('pais', 'nome_do_representante').show(5) 

## Ordenar colunas

In [None]:
df_socios\
    .select(
        'nome_do_socio_ou_razao_social', 'data_de_entrada_sociedade'
    )\
    .orderBy(
        'data_de_entrada_sociedade', ascending=True
    )\
    .show(5, truncate=False)

In [None]:
# atribui uma lista de colunas para o método orderBy, permitindo ordenar por múltiplas colunas
df_socios\
    .select(
        'nome_do_socio_ou_razao_social', 'data_de_entrada_sociedade'
    )\
    .orderBy(
        ['data_de_entrada_sociedade', 'nome_do_socio_ou_razao_social'], 
        ascending=[True, True]
    )\
    .show(5, truncate=False)

### Exercício - Ordenar colunas:
- Ordenar o ano e mês dos alunos mais novos para os mais velhos

In [None]:
data = [
    ('CARMINA RABELO', 4, 2010), 
    ('HERONDINA PEREIRA', 6, 2009), 
    ('IRANI DOS SANTOS', 12, 2010), 
    ('JOAO BOSCO DA FONSECA', 3, 2009), 
    ('CARLITO SOUZA', 1, 2010), 
    ('WALTER DIAS', 9, 2009), 
    ('BRENO VENTUROSO', 1, 2009), 
    ('ADELINA TEIXEIRA', 5, 2009), 
    ('ELIO SILVA', 7, 2010), 
    ('DENIS FONSECA', 6, 2010)
]
col_names = ['nome', 'mes', 'ano']

df = spark.createDataFrame(data, col_names)
df.show(truncate=False)

In [None]:
# ordena por ano e mês, ambos em ordem decrescente, para que os meses mais recentes apareçam primeiro
df\
    .orderBy(['ano', 'mes'], ascending=[False, False])\
    .show(truncate=False)