In [None]:
# !pip install pyspark

# Índice
*Notebook contendo apenas algumas funções basicas*

1. [Imports](#Import)
2. [Usando dropna](#dropna)
3. [Dropando duplicados](#dropDuplicates)
4. [Função Sample](#sample)
5. [Filtrando com filter ou where](#filter_where)
6. [Joins](#Join)
7. [Agregação(agg)](#agg)
8. [Extras](#Extras)

## Import

In [None]:
# Importação das bibliotecas necessárias
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit, concat, round, substring, udf, length
from pyspark.sql.types import StringType

In [None]:
# Inicializar uma sessão do Spark e impedindo avisos desnecessários
spark = SparkSession.builder \
    .appName("NomeDaApp") \
    .config("spark.logConf", "false") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

## dropna

In [None]:
# Exemplo de DataFrame com 10 registros e 4 campos (incluindo valores nulos/vazios)
data = [
    ("Alice", 34, "São Paulo", "Brasil"),
    ("Bob", 45, "Nova York", "EUA"),
    ("Alice", None, "São Paulo", "Brasil"),  # Idade nula
    ("Carla", 29, "", "Portugal"),          # Cidade vazia
    ("David", 30, "Berlim", None),          # País nulo
    ("Eva", 25, "Paris", "França"),
    ("Bob", 45, "Nova York", "EUA"),
    ("Frank", 40, "Tóquio", "Japão"),
    ("Grace", 35, None, "Reino Unido"),     # Cidade nula
    ("Hank", 50, "Sydney", "Austrália")
]

df = spark.createDataFrame(data, ["Nome", "Idade", "Cidade", "País"])

# Remover linhas com valores nulos ou vazios e salvar na mesma variável `df`
df = df.na.drop()  # Remove linhas com qualquer valor nulo ou vazio

# Mostrar o resultado
df.show()

## dropDuplicates

In [None]:
# Exemplo de DataFrame com 10 registros e 4 campos
data = [
    ("Alice", 34, "São Paulo", "Brasil"),
    ("Bob", 45, "Nova York", "EUA"),
    ("Alice", 34, "São Paulo", "Brasil"),
    ("Carla", 29, "Lisboa", "Portugal"),
    ("David", 30, "Berlim", "Alemanha"),
    ("Eva", 25, "Paris", "França"),
    ("Bob", 45, "Nova York", "EUA"),
    ("Frank", 40, "Tóquio", "Japão"),
    ("Grace", 35, "Londres", "Reino Unido"),
    ("Hank", 50, "Sydney", "Austrália")
]

df = spark.createDataFrame(data, ["Nome", "Idade", "Cidade", "País"])

# Remover linhas duplicadas e salvar na mesma variável `df`
df = df.dropDuplicates()

# Mostrar o resultado
df.show()

## sample

In [None]:
# Dados de exemplo
data = [
    ("Alice", 25, "São Paulo"),
    ("Bob", 30, "Nova York"),
    ("Carla", 22, "Lisboa"),
    ("David", 35, "Berlim"),
    ("Eva", 28, "Paris"),
    ("Frank", 40, "Tóquio"),
    ("Grace", 33, "Londres"),
    ("Hank", 50, "Sydney"),
    ("Ivy", 27, "Toronto"),
    ("Jack", 29, "Cidade do México")
]

# Criar DataFrame
df = spark.createDataFrame(data, ["Nome", "Idade", "Cidade"])

# Aplicar sample para retornar uma amostra aleatória de 30% dos registros
df = df.sample(fraction=0.3, seed=None)

# Exibir a amostra
df.show()

## filter_where

In [None]:
# Dados de exemplo
data = [
    ("Alice", 25, "São Paulo"),
    ("Bob", 30, "Nova York"),
    ("Carla", 22, "Lisboa"),
    ("David", 35, "Berlim"),
    ("Eva", 28, "Paris"),
    ("Frank", 40, "Tóquio"),
    ("Grace", 33, "Londres"),
    ("Hank", 50, "Sydney"),
    ("Ivy", 27, "Toronto"),
    ("Jack", 29, "Cidade do México")
]

# Criar DataFrame
df = spark.createDataFrame(data, ["Nome", "Idade", "Cidade"])

# Filtra linhas com filter onde a coluna "Idade" é maior que 30
print("Uso do filter")
df.filter(df["Idade"] > 30).show()

print("Uso do where")
df.where(df["Idade"] > 30).show()

## Join

In [None]:
# Criando dois dataframes para que possam ser ligados pelo id da cidade
data_pessoas = [
    (1, "Alice", 1),
    (2, "Bob", 2),
    (3, "Carla", 3),
    (4, "David", 5)   
]
df_pessoas = spark.createDataFrame(data_pessoas, ["id_pessoa", "nome", "id_cidade"])
df_pessoas.show()

data_cidades = [
    (1, "São Paulo"),
    (2, "Nova York"),
    (3, "Lisboa"),
    (4, "Berlim")
]
df_cidades = spark.createDataFrame(data_cidades, ["id_cidade", "cidade"])
df_cidades.show()

# Realizando join
df_join = df_pessoas.join(df_cidades, df_pessoas.id_cidade == df_cidades.id_cidade, how="inner")
# Retornando resultado com select
df_join.select("id_pessoa", "nome", "cidade").show()

##

## agg

In [None]:
# Dados de exemplo
data = [
    ("2024-11-26", 654.0, 38.9),
    ("2024-12-03", 602.1, 40.19),
    ("2024-12-18", 649.5, 42.80),
    ("2025-01-26", 636.9, 42.13)
]

# Criar DataFrame
df = spark.createDataFrame(data, ["data_abastecimento", "trip", "gasto_gasolina"])

# Usar agg para calcular o gasto de combustivel e criar uma nova coluna
df = df.withColumn("consumo_total", round(col("trip") / col("gasto_gasolina"), 2))

# Exibir o resultado
df.show()

---

## Extras

### Caracteres Especiais

In [None]:
#### Função para checar caracteres especiais e retornar onde se encontram(util em dataframes grandes)
import pyspark.sql.functions as F
from pyspark.sql.functions import col,lit
from pyspark.sql.types import ArrayType, StringType, StructType, StructField, TimestampType
from pyspark.sql.functions import collect_list, concat_ws

def search_dataframe(df, tabela):

    intermediate_dfs = []

    for coluna in df.columns:

        processing_df = df.filter(F.lower(df[coluna]).rlike('[^a-z0-9\s!`´"#$%&\'()*+,-./\]\[|:;<=>?@\[\\áàâäãéèêëíìîïóòôöõúùûüñç]'))
        processing_df = processing_df.withColumn('COLUMN_ERROR', lit(coluna))\
            .withColumn('TABELA', lit(tabela))
        intermediate_dfs.append(processing_df)

    # Combine todos os DataFrames intermediários em um único DataFrame
    result_df = intermediate_dfs[0]
    for i in range(1, len(intermediate_dfs)):
        result_df = result_df.union(intermediate_dfs[i])

    colunas_referencia = df.columns
    # Exiba o DataFrame result_df
    # result_df = result_df.groupBy(df.columns,"TABELA"]).agg(concat_ws(', ',collect_list("COLUMN_ERROR")).alias('COLUMN_ERROR'))
    result_df = result_df.groupBy(*[col(coluna) for coluna in colunas_referencia] + [col("TABELA")]).agg(concat_ws(', ',collect_list("COLUMN_ERROR")).alias('COLUMN_ERROR'))

    # Retonando dataframe criado dataframe usado + coluna da tabela + coluna(s) com erros
    return result_df


In [None]:
data = [
    ("Alice☺", 25, "São Paulo", "Brasil", "Rua A"),
    ("Bob", 30, "Nova York♠", "EUA", "Rua B"),
    ("Carla", 22, "Lisboa", "Portugal✓", "Rua C"),
    ("David", 35, "Berlim∞", "Alemanha", "Rua D"),
    ("Eva", 28, "Paris", "França", "Rua E"),
    ("Frank", 40, "Tóquio☺", "Japão", "Rua F"),
    ("Grace", 33, "Londres♠", "Reino Unido", "Rua G"),
    ("Hank", 50, "Sydney", "Austrália✓", "Rua H"),
    ("Ivy", 27, "Toronto", "Canadá", "Rua I"),
    ("Jack®", 29, "Cidade do México", "México", "Rua J")
]

# Nomes das colunas
colunas = ["Nome", "Idade", "Cidade", "País", "Rua"]

# Criar DataFrame
df = spark.createDataFrame(data, colunas)

tabela = 'tabela_teste'
resultado = search_dataframe(df,tabela)
if resultado.isEmpty():
    print(f'Sem dados invalidos na tabela {tabela}')
else:
    resultado.show()

### Calculo_CNPJ
*Sabemos que os digitos verificadores do CNPJ, são na verdade um calculo dos numeros anteriores, algumas bases antigas usavam essa estrategia para reduzir custos de armazenamento, deixando que funções proprias calculassem exibido na consulta com os digitos verificadores*

Segue um ex de como deve ser o calculo, Fonte: https://www.cadcobol.com.br/calcula_cpf_cnpj_caepf.htm
```
Veja, abaixo, exemplo de cálculo de DV módulo 11 (o mais usado pelos bancos) e de DV módulo 10 para o CNPJ nº 18781203/0001:

1    8    7    8   1   2   0    3   0   0   0   1               
x    x    x    x   x   x   x    x   x   x   x   x               
6    7    8    9   2   3   4    5   6   7   8   9            
-   --   --   --   -   -   -  ---   -   -   -   -                   
6 + 56 + 56 + 72 + 2 + 6 + 0 + 15 + 0 + 0 + 0 + 9 = 222 ÷ 11 = 20, com resto 2


1    8    7    8   1   2   0    3   0   0   0   1    2
x    x    x    x   x   x   x    x   x   x   x   x    x
5    6    7    8   9   2   3    4   5   6   7   8    9
-   --   --   --   -   -   -   --   -   -   -   -   --
5 + 48 + 49 + 64 + 9 + 4 + 0 + 12 + 0 + 0 + 0 + 8 + 18 = 217 ÷ 11 = 19, com resto 8

Portanto, CNPJ+DV = 18.781.203/0001-28
```

In [91]:
data = [
    ("Empresa A", "123456780001"),
    ("Empresa B", "987654320001"),
    ("Empresa C", "456789120002"),
    ("Empresa D", "321654980001"),
    ("Empresa E", "654123780001"),
    ("Empresa F", "789321450002"),
    ("Empresa G", "852963740001"),
    ("Empresa H", "963852140001"),
    ("Empresa I", "741852960003"),
    ("Empresa J", "187812030001")
]

# Criar DataFrame com a ordem desejada
df = spark.createDataFrame(data, ["nome_empresa", "cnpj"])

tabela="empresas"
df.write.mode("overwrite").saveAsTable(tabela)
spark.sql("SELECT * FROM empresas").show()

+------------+------------+
|nome_empresa|        cnpj|
+------------+------------+
|   Empresa A|123456780001|
|   Empresa F|789321450002|
|   Empresa B|987654320001|
|   Empresa C|456789120002|
|   Empresa G|852963740001|
|   Empresa H|963852140001|
|   Empresa D|321654980001|
|   Empresa E|654123780001|
|   Empresa I|741852960003|
|   Empresa J|187812030001|
+------------+------------+



In [92]:
## Usando função UDF
# Vantagem: Fácil manutenção e implementação
# Desvatagem: Não ser otimizado pelo catalyst, dependendo do volume pode resultar em overhead
def calcular_digito_verificador(cnpj):
    if not cnpj or len(cnpj) != 12 or not cnpj.isdigit():
        return None  # Retorna None se o CNPJ for inválido

    # Pesos para o cálculo dos dígitos
    dv1 = [5, 4, 3, 2, 9, 8, 7, 6, 5, 4, 3, 2]
    dv2 = [6, 5, 4, 3, 2, 9, 8, 7, 6, 5, 4, 3, 2]

    # Cálculo do primeiro dígito verficador
    soma = sum(int(cnpj[i]) * dv1[i] for i in range(12))
    resto = soma % 11
    primeiro_digito = 0 if resto < 2 else 11 - resto

    # Cálculo do segundo dígito
    cnpj_com_primeiro_digito = cnpj + str(primeiro_digito)
    soma = sum(int(cnpj_com_primeiro_digito[i]) * dv2[i] for i in range(13))
    resto = soma % 11
    segundo_digito = 0 if resto < 2 else 11 - resto

    return f"{primeiro_digito}{segundo_digito}"

# Registrando a função como UDF
calcular_digito_verificador_udf = udf(calcular_digito_verificador, StringType())


# Adicionar coluna com o dígito verificador
df = spark.sql("SELECT * FROM empresas")
df = df.withColumn("digito_verificador", calcular_digito_verificador_udf(col("cnpj")))

# Mostrar o resultado
df.show(truncate=False)

+------------+------------+------------------+
|nome_empresa|cnpj        |digito_verificador|
+------------+------------+------------------+
|Empresa A   |123456780001|95                |
|Empresa F   |789321450002|07                |
|Empresa B   |987654320001|98                |
|Empresa C   |456789120002|36                |
|Empresa G   |852963740001|59                |
|Empresa H   |963852140001|00                |
|Empresa D   |321654980001|39                |
|Empresa E   |654123780001|48                |
|Empresa I   |741852960003|60                |
|Empresa J   |187812030001|28                |
+------------+------------+------------------+



In [97]:
# # TODO: Verificar calculo incorreto(script ainda sendo atualizado) - é a melhor alternativa em questão de otimização.
# # Calculando o DV1

# # Vantagem: Otimizado e ideal para grandes volumes de dados
# # Desvantagem: Complexidade

# df = spark.sql("SELECT * FROM empresas")
# df = df.withColumn(
#     "cnpj_valido",
#     when(length(col("cnpj")) == 12, True).otherwise(False)
# )
#     # dv1 = [5, 4, 3, 2, 9, 8, 7, 6, 5, 4, 3, 2]
#     # dv2 = [6, 5, 4, 3, 2, 9, 8, 7, 6, 5, 4, 3, 2]
# # Cálculo do primeiro dígito verificador (DV1)
# df = df.withColumn(
#     "DV1",
#     when(
#         col("cnpj_valido"),
#         (
#             substring(col("cnpj"), 1, 1).cast('int') * 5 +
#             substring(col("cnpj"), 2, 1).cast('int') * 4 +
#             substring(col("cnpj"), 3, 1).cast('int') * 3 +
#             substring(col("cnpj"), 4, 1).cast('int') * 2 +
#             substring(col("cnpj"), 5, 1).cast('int') * 9 +
#             substring(col("cnpj"), 6, 1).cast('int') * 8 +
#             substring(col("cnpj"), 7, 1).cast('int') * 7 +
#             substring(col("cnpj"), 8, 1).cast('int') * 6 +
#             substring(col("cnpj"), 9, 1).cast('int') * 5 +
#             substring(col("cnpj"), 10, 1).cast('int') * 4 +
#             substring(col("cnpj"), 11, 1).cast('int') * 3 +
#             substring(col("cnpj"), 12, 1).cast('int') * 2
#         ) % 11
#     ).otherwise(None)
# )

# df = df.withColumn(
#     "DV1",
#     when(col("DV1") >= 10, 0).otherwise(col("DV1"))
# )

# # Cálculo do segundo dígito verificador (DV2)
# df = df.withColumn(
#     "DV2",
#     when(
#         col("cnpj_valido"),
#         (
#             substring(col("cnpj"), 1, 1).cast('int') * 6 +
#             substring(col("cnpj"), 2, 1).cast('int') * 5 +
#             substring(col("cnpj"), 3, 1).cast('int') * 4 +
#             substring(col("cnpj"), 4, 1).cast('int') * 3 +
#             substring(col("cnpj"), 5, 1).cast('int') * 2 +
#             substring(col("cnpj"), 6, 1).cast('int') * 9 +
#             substring(col("cnpj"), 7, 1).cast('int') * 8 +
#             substring(col("cnpj"), 8, 1).cast('int') * 7 +
#             substring(col("cnpj"), 9, 1).cast('int') * 6 +
#             substring(col("cnpj"), 10, 1).cast('int') * 5 +
#             substring(col("cnpj"), 11, 1).cast('int') * 4 +
#             substring(col("cnpj"), 12, 1).cast('int') * 3 +
#             col("DV1").cast('int') * 2
#         ) % 11
#     ).otherwise(None)
# )

# df = df.withColumn(
#     "DV2",
#     when(col("DV2") >= 10, 0).otherwise(col("DV2"))
# )

# # Concatena os dígitos verificadores em uma nova coluna
# df = df.withColumn(
#     "digito_verificador",
#     when(col("cnpj_valido"), concat(col("DV1"), col("DV2"))).otherwise(None)
# )


# df.show()

# df.printSchema()
# df = df.drop("DV1", "DV2", "cnpj_valido")

# # Mostra o resultado
# df.show(truncate=False)

In [94]:
### Usando enumerate:
# Vantagem: Fácil manutenção e implementação, não há serialização(também sendo uma desvatagem)
# Desvatagem: Dependendo do volume pode resultar em overhead, pois o dataset é processado dentro da JVM
def calcular_digito(cnpj_expr, pesos):
    soma = sum(substring(cnpj_expr, i, 1).cast("int") * peso for i, peso in enumerate(pesos, start=1))
    resto = soma % 11
    return when(resto < 2, 0).otherwise(11 - resto)

# DV1 (pesos: 5,4,3,2,9,8,7,6,5,4,3,2)
df = spark.sql("SELECT * FROM empresas")
df = df.withColumn(
    "DV1",
    calcular_digito(col("cnpj"), [5,4,3,2,9,8,7,6,5,4,3,2])
)

# DV2 (pesos: 6,5,4,3,2,9,8,7,6,5,4,3,2 + DV1 * 2)
df = df.withColumn(
    "DV2",
    calcular_digito(
        concat(col("cnpj"), col("DV1")), 
        [6,5,4,3,2,9,8,7,6,5,4,3,2]
    )
)

# Dígito final
df = df.withColumn("digito_verificador", concat(col("DV1"), col("DV2")))

df.show(truncate=False)

+------------+------------+---+---+------------------+
|nome_empresa|cnpj        |DV1|DV2|digito_verificador|
+------------+------------+---+---+------------------+
|Empresa A   |123456780001|9  |5  |95                |
|Empresa F   |789321450002|0  |7  |07                |
|Empresa B   |987654320001|9  |8  |98                |
|Empresa C   |456789120002|3  |6  |36                |
|Empresa G   |852963740001|5  |9  |59                |
|Empresa H   |963852140001|0  |0  |00                |
|Empresa D   |321654980001|3  |9  |39                |
|Empresa E   |654123780001|4  |8  |48                |
|Empresa I   |741852960003|6  |0  |60                |
|Empresa J   |187812030001|2  |8  |28                |
+------------+------------+---+---+------------------+



In [86]:
print(df.schema)

StructType([StructField('nome_empresa', StringType(), True), StructField('cnpj', StringType(), True), StructField('digito_verificador', StringType(), True)])


In [None]:
df.show()

---