In [None]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.
        builder.
        appName("curso_pyspark").
        getOrCreate()
)

spark

# Básico de Pyspark

## Dataframes

<!-- ###
### create table (
###    nome VARCHAR(100) not null,
###    sobrenome VARCHAR(100) not null,
###    idade INT not null
###)

#  nome sobrenome idade
# (N1      N2       I1) -->

create table ( <br />
   nome VARCHAR(100) not null, <br />
   sobrenome VARCHAR(100) not null, <br />
   idade INT not null <br />
) <br />
<br /><br />
nome sobrenome idade
(N1      N2       I1)

In [None]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

data = [
    ##  C1          C2        C3
    ("Matheus", "Cantarutti", 31),
    ("Ana", "Cláudia", 18),
    ("Brunno", "Oliveira", 25)
]

schema = StructType([
    StructField("Nome", StringType(), True),
    StructField("Sobre_Nome", StringType(), True),
    StructField("Idade", IntegerType(), True)
])

df = spark.createDataFrame(data, schema)
df.printSchema()

df.show()

In [None]:
df.createOrReplaceTempView("pessoas")

In [None]:
### SQL --> Pyspark

spark.sql( 
'''    
    select
        *
    from pessoas
    where Idade < 20
'''
).show()

In [None]:
from pyspark.sql import functions as F

# df.filter('Idade < 20').show()
df.filter(
    F.col('Idade') < 20
).show()

## Tipo de Dados

- TIPO TEXTO/STRING >> abrangendo apenas as funções que tratam texto
- TIPO DATA (DATA ESTÁ COM O TIPO DE STRING) >> Converter o seu texto para Data

- FLOAT/DECIMAL e INTERGER

In [None]:
spark.sql('''
    select
        *,
        cast(Idade * 5 as string) as Idade_2
    from pessoas
''').printSchema()

In [None]:
(   
    # nome da coluna, expressões/funcao
    df.withColumn('Idade_2', F.col('Idade') * 5)
      .withColumn('data', F.lit('2025-01-01')) # current date
      .withColumn('data2', F.to_date(F.col('data'), 'yyyy-MM-dd'))
      .withColumn('Idade_3', F.expr('cast(Idade * 5 as string) as Idade_3'))

).show()

## Cardinalidade

- Aula teórica explicativa sobre cardinalidade

## Dataframe de exemplo

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType

spark = (
    SparkSession.
        builder.
        getOrCreate()
)
# CLOUD --> spark.sql('select * from bd.aulas.tabelas')
# df <- na leitura dos arquivos (.csv ou xlsx)
## spark.read

data = [
    ("1", "PAGO"),
    ("2", "APROVADO"),
    ("3", "RECUSADO"),
    ("4", "ENTREGUE"),
    ("5", "CANCELADO"),
    ("6", "NÃO ENTREGUE")
]

schema = StructType([
    StructField("cd_identificacao", StringType(), True),
    StructField("status", StringType(), True)
])

df = spark.createDataFrame(data, schema)
df.printSchema()
df.show()

# Lendo arquivos

## Command Separated Value (.csv) 

In [None]:
caminho = './dados/csv/'
cliente = 'clientes.csv'
status = 'status.csv'
pedidos = 'pedidos.csv'

clientes = (
    spark.read.csv(
        f'{caminho}{cliente}', 
        sep=';',
        header=True
    )
)

status = (
    spark.read.csv(
        f'{caminho}{status}', 
        sep=';',
        header=True
    )
)

pedidos = (
    spark.read.csv(
        f'{caminho}{pedidos}', 
        sep=';',
        header=True
    )
)

pedidos.show()

## Excel

In [None]:
import pandas as pd

caminho = './dados/xlsx/'
cliente = 'clientes.xlsx'
aba = 'clientes'

def ler_excel(file_path, aba):
    try:
        df = pd.read_excel(file_path, sheet_name=aba, engine='openpyxl')
    except FileNotFoundError:
        print(f"Arquivo {file_path} não encontrado.")
    except ValueError as e:
        print(f"Erro ao ler a aba de nome {aba}: {e}")
    except Exception as e:
        print(f"Ocorreu um erro inesperado: {e}")
    return df

df = ler_excel(f'{caminho}{cliente}', aba)
df = spark.createDataFrame(df)
print(type(df))

# Intermediário

## Filtros

### Filter

In [None]:
from pyspark.sql import functions as F

In [None]:
caminho = './dados/csv/'
cliente = 'clientes.csv'

clientes = (
    spark.read.csv(
        f'{caminho}{cliente}', 
        sep=';',
        header=True
    )
)

clientes.show()
clientes.createOrReplaceTempView("clientes")

In [None]:
spark.sql('''
    select 
        * 
    from clientes
    where sexo = 'F'
''').show()

In [None]:
clientes2 = (
    # para filtrar dados, o filter é a função utilizada
    clientes
        .filter(
            F.col('sexo') == 'F'
        )
)
clientes2.show()

In [None]:
clientes3 = (
    # para filtrar dados, o filter é a função utilizada
    clientes
        .filter(
            F.col('sexo') == 'M'
        )
)
clientes3.show()

In [None]:
clientes4 = (
    # para filtrar dados, o filter é a função utilizada
    clientes
        .filter(
            ~ (F.col('sexo') == 'F')
        )
)
clientes4.show()

### Isin

In [None]:
spark.sql('''
    select
        *
    from clientes
    where cd_cliente in ('2', '3')
''').show()

In [None]:
spark.sql('''
    select
        *
    from clientes
    where cd_cliente not in ('2', '3')
''').show()

In [None]:
clientes.filter(
    F.col('cd_cliente').isin(['2', '3'])
).show()

In [None]:
clientes.filter(
    ~(F.col('cd_cliente').isin(['2', '3']))
).show()

## Tratamentos

### Strings para Números

### regex com Pyspark

In [None]:
caminho = './dados/csv/'
pedidos = 'pedidos.csv'

pedidos = (
    spark.read.csv(
        f'{caminho}{pedidos}', 
        sep=';',
        header=True
    )
)

pedidos.show()
pedidos.createOrReplaceTempView("pedidos")

In [None]:
pedidos.printSchema()

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType, DoubleType

pedidos2 = (
    # MacOS é diferente do Windows
    pedidos.withColumn('valor_limpo', F.regexp_replace(F.col('valor'),  r'R\$\s*', ''))
           .withColumn('valor_limpo', F.regexp_replace(F.col('valor_limpo'), r'\.', ''))
           .withColumn('valor_limpo', F.regexp_replace(F.col('valor_limpo'), r',', '.'))
           .withColumn('valor_limpo', F.col('valor_limpo').cast(DoubleType()))
)

pedidos2.show()

In [None]:
pedidos2.printSchema()

In [None]:
pedidos2.groupBy('cd_cliente').agg(
    F.mean('valor_limpo').alias('valor_mean_limpo'),
    F.sum('valor_limpo').alias('valor_sum_limpo'),
    F.max('valor_limpo').alias('valor_max_limpo'),
    F.min('valor_limpo').alias('valor_min_limpo'),
    F.avg('valor_limpo').alias('valor_avg_limpo'),
    F.median('valor_limpo').alias('valor_median_limpo'),
    F.mode('valor_limpo').alias('valor_mode_limpo')
).show()

### Trabalhando com Datas

In [None]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.
        builder.
        appName("curso_pyspark").
        getOrCreate()
)

spark

In [None]:
caminho = './dados/csv/'
clientes = 'clientes.csv'

clientes = (
    spark.read.csv(
        f'{caminho}{clientes}', 
        sep=';',
        header=True
    )
)

clientes.show()
clientes.createOrReplaceTempView("clientes")

In [None]:
clientes3 = (
    clientes
        .select("cd_cliente", "data_nascimento")
        .withColumn("data_nascimento2", F.to_date(
            F.col("data_nascimento"), "yyyy-MM-dd"
        ))
        .withColumn("ano", F.year(F.col("data_nascimento2")))
        .withColumn("mes", F.month(F.col("data_nascimento2")))
        .withColumn("dia", F.day(F.col("data_nascimento2")))
        
        .withColumn(
            "data_BR", 
                F.concat(
                    F.col("dia"), # dia
                        F.lit("/"), 
                    F.col("mes"), # mes
                        F.lit("/"), 
                    F.col("ano") # ano
                )
        )
        .withColumn("data_BR", F.try_to_timestamp(
            F.col("data_BR"), "dd/MM/yyyy"
        ))

)

clientes3.show()

In [None]:
# pode substituir a forma pela qual a conversão de data ocorre.
        .withColumn(
            "data_BR_str",
            F.concat(
                F.lpad(F.col("dia"), 2, "0"), F.lit("/"),
                F.lpad(F.col("mes"), 2, "0"), F.lit("/"),
                F.col("ano")
            )
        )
        .withColumn("data_BR", F.to_date(F.col("data_BR_str"), "dd/MM/yyyy"))

## Um pouco mais de Modelagem

In [None]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DoubleType

caminho = './dados/csv/'
pedidos = 'pedidos.csv'
clientes = 'clientes.csv'
status = 'status.csv'

pedidos = (
    spark.read.csv(
        f'{caminho}{pedidos}', 
        sep=';',
        header=True
    )
)

clientes = (
    spark.read.csv(
        f'{caminho}{clientes}', 
        sep=';',
        header=True
    )
)

status = (
    spark.read.csv(
        f'{caminho}{status}', 
        sep=';',
        header=True
    )
)


In [None]:
pedidos.groupBy('cd_cliente').agg(
    F.count('cd_cliente').alias('count')
).orderBy(
    F.col('count').desc()
).show()

In [None]:
pedidos.groupBy('cd_cliente').count().orderBy('count', ascending=False).show()

# Avançado

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DoubleType

spark = (
    SparkSession.
        builder.
        appName("curso_pyspark").
        getOrCreate()
)

spark

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/10 22:00:21 WARN Utils: Your hostname, Matheuss-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 192.168.0.135 instead (on interface en0)
25/08/10 22:00:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/10 22:00:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [20]:
def ler_arquivo(caminho, nome_arquivo):
    arquivo = (
        spark.read.csv(
            f'{caminho}{nome_arquivo}',
            sep=';',
            header=True
        )
    )
    return arquivo

clientes = ler_arquivo('./dados/csv/', 'clientes.csv')
status = ler_arquivo('./dados/csv/', 'status.csv')
pedidos = ler_arquivo('./dados/csv/', 'pedidos.csv')


pedidos = (    
    
    pedidos.withColumn('valor_limpo', F.regexp_replace(F.col('valor'),  r'R\$\s*', ''))
           .withColumn('valor_limpo', F.regexp_replace(F.col('valor_limpo'), r'\.', ''))
           .withColumn('valor_limpo', F.regexp_replace(F.col('valor_limpo'), r',', '.'))
           .withColumn('valor_limpo', F.col('valor_limpo').cast(DoubleType()))

            # Renomear o nome de uma coluna
           .withColumnRenamed("valor_limpo", "valor2")
).drop("valor")

pedidos.show()

+---------+----------+--------------------+-------------+-------+
|cd_pedido|cd_cliente|             produto|status pedido| valor2|
+---------+----------+--------------------+-------------+-------+
|       10|         2|TV LED 55''- SANSUNG|            2|5500.48|
|       12|         3|MÁQUINA DE LAVAR ...|            4| 4850.0|
|       13|         5|NOTEBOOK LENOVO - i5|            3| 3585.5|
|       15|         7|CELULAR POCO M4 -...|            1| 1285.0|
|       12|         2|MÁQUINA DE LAVAR ...|            4| 4850.0|
+---------+----------+--------------------+-------------+-------+



## Join - Movimentando informações entre tabelas

### Busca Horizontal

#### Left

In [21]:
status = status.withColumnRenamed("cd_identificacao", "status_pedido")
clientes = clientes.select("cd_cliente", "nome_cliente", "data_nascimento")

pedidos = (
    pedidos.withColumnRenamed("valor2", "valor")
           .withColumnRenamed("status pedido", "status_pedido")
            # renomeia a coluna na tabela de pedidos ou renomeamos a coluna na tabela de status
)
            # Tabela, coluna(s), como = (left, right, inner)
pedidos = (
    pedidos.join(status, on="status_pedido", how="left")
           .join(clientes, on="cd_cliente", how="left")
)

pedidos.show()

+----------+-------------+---------+--------------------+-------+--------+--------------------+---------------+
|cd_cliente|status_pedido|cd_pedido|             produto|  valor|  status|        nome_cliente|data_nascimento|
+----------+-------------+---------+--------------------+-------+--------+--------------------+---------------+
|         2|            2|       10|TV LED 55''- SANSUNG|5500.48|APROVADO|      LUCAS DA SILVA|     1998-07-25|
|         3|            4|       12|MÁQUINA DE LAVAR ...| 4850.0|ENTREGUE| SILVÉRIO DA FONSECA|     1975-01-15|
|         5|            3|       13|NOTEBOOK LENOVO - i5| 3585.5|RECUSADO|FERNANDA DO NASCI...|     1994-09-12|
|         7|            1|       15|CELULAR POCO M4 -...| 1285.0|    PAGO|  VITÓRIA DE ALMEIDA|     1994-06-12|
|         2|            4|       12|MÁQUINA DE LAVAR ...| 4850.0|ENTREGUE|      LUCAS DA SILVA|     1998-07-25|
+----------+-------------+---------+--------------------+-------+--------+--------------------+---------

#### Right

In [22]:
def ler_arquivo(caminho, nome_arquivo):
    arquivo = (
        spark.read.csv(
            f'{caminho}{nome_arquivo}',
            sep=';',
            header=True
        )
    )
    return arquivo

clientes = ler_arquivo('./dados/csv/', 'clientes.csv')
pedidos = ler_arquivo('./dados/csv/', 'pedidos.csv')


In [None]:
(
    pedidos.join(clientes, on="cd_cliente", how="right")
        .filter(
            F.col("cd_pedido").isNull()
        )
).show()

+----------+---------+-------+-------------+-----+------------------+----+---------------+
|cd_cliente|cd_pedido|produto|status pedido|valor|      nome_cliente|sexo|data_nascimento|
+----------+---------+-------+-------------+-----+------------------+----+---------------+
|         1|     NULL|   NULL|         NULL| NULL|    MARIA DO CARMO|   F|     1963-02-10|
|         4|     NULL|   NULL|         NULL| NULL|  BRUNNO FERNANDES|   M|     2004-04-05|
|         6|     NULL|   NULL|         NULL| NULL|CARLOS DE OLIVEIRA|   M|     1998-04-15|
|         8|     NULL|   NULL|         NULL| NULL| GABRIELA DE SILVA|   F|     1994-03-27|
|         9|     NULL|   NULL|         NULL| NULL|    MARCOS PACHECO|   M|     1989-02-02|
|        10|     NULL|   NULL|         NULL| NULL| DANIEL WANDERGAST|   M|     1980-01-24|
+----------+---------+-------+-------------+-----+------------------+----+---------------+



In [27]:
clientes.join(pedidos, on="cd_cliente", how="left").filter(F.col("cd_pedido").isNull()).show()

+----------+------------------+----+---------------+---------+-------+-------------+-----+
|cd_cliente|      nome_cliente|sexo|data_nascimento|cd_pedido|produto|status pedido|valor|
+----------+------------------+----+---------------+---------+-------+-------------+-----+
|         1|    MARIA DO CARMO|   F|     1963-02-10|     NULL|   NULL|         NULL| NULL|
|         4|  BRUNNO FERNANDES|   M|     2004-04-05|     NULL|   NULL|         NULL| NULL|
|         6|CARLOS DE OLIVEIRA|   M|     1998-04-15|     NULL|   NULL|         NULL| NULL|
|         8| GABRIELA DE SILVA|   F|     1994-03-27|     NULL|   NULL|         NULL| NULL|
|         9|    MARCOS PACHECO|   M|     1989-02-02|     NULL|   NULL|         NULL| NULL|
|        10| DANIEL WANDERGAST|   M|     1980-01-24|     NULL|   NULL|         NULL| NULL|
+----------+------------------+----+---------------+---------+-------+-------------+-----+



#### Inner

In [28]:
pedidos.join(clientes, on="cd_cliente", how="inner").show()

+----------+---------+--------------------+-------------+-----------+--------------------+----+---------------+
|cd_cliente|cd_pedido|             produto|status pedido|      valor|        nome_cliente|sexo|data_nascimento|
+----------+---------+--------------------+-------------+-----------+--------------------+----+---------------+
|         2|       12|MÁQUINA DE LAVAR ...|            4|R$ 4.850,00|      LUCAS DA SILVA|   M|     1998-07-25|
|         2|       10|TV LED 55''- SANSUNG|            2|R$ 5.500,48|      LUCAS DA SILVA|   M|     1998-07-25|
|         3|       12|MÁQUINA DE LAVAR ...|            4|R$ 4.850,00| SILVÉRIO DA FONSECA|   M|     1975-01-15|
|         5|       13|NOTEBOOK LENOVO - i5|            3|R$ 3.585,50|FERNANDA DO NASCI...|   F|     1994-09-12|
|         7|       15|CELULAR POCO M4 -...|            1|R$ 1.285,00|  VITÓRIA DE ALMEIDA|   F|     1994-06-12|
+----------+---------+--------------------+-------------+-----------+--------------------+----+---------

### 'Busca' vertical

In [29]:
def ler_arquivo(caminho, nome_arquivo):
    arquivo = (
        spark.read.csv(
            f'{caminho}{nome_arquivo}',
            sep=';',
            header=True
        )
    )
    return arquivo

status = ler_arquivo('./dados/csv/', 'status.csv')

## Condicionais com When

## Pivot tables
### Transformando visões