In [35]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark: SparkSession = (
    SparkSession
    .builder
    .config("spark.sql.warehouse.dir", "./tmp") # type: ignore
    .master('local[*]')
    .getOrCreate()
)


25/04/02 17:43:09 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


# Geração de DataFrames de exemplo

In [36]:
def criar_dataframe_funcionarios():
    data = [
        (1, "Maria Silva", 101, 75000, "2019-05-15", "São Paulo", 4.2),
        (2, "João Santos", 101, 85000, "2018-03-12", "Rio de Janeiro", 4.7),
        (3, "Ana Oliveira", 102, 65000, "2020-01-10", "São Paulo", 3.9),
        (4, "Carlos Souza", 103, 72000, "2019-08-22", "Belo Horizonte", 4.5),
        (5, "Juliana Lima", 104, 60000, "2021-02-18", "São Paulo", 4.0),
        (6, "Pedro Costa", 101, 90000, "2017-11-05", "Rio de Janeiro", 4.8),
        (7, "Fernanda Alves", 102, 67000, "2020-03-20", "São Paulo", 4.1),
        (8, "Rafael Pereira", 103, 78000, "2018-07-14", "Belo Horizonte", 4.6),
        (9, "Mariana Santos", 104, 62000, "2021-01-05", "São Paulo", 3.8),
        (10, "Lucas Ferreira", 101, 82000, "2019-04-30", "Rio de Janeiro", 4.3),
        (11, "Bianca Lima", 105, 70000, "2020-06-15", "Curitiba", 4.0),
        (12, "Thiago Almeida", 106, 68000, "2019-07-20", "Brasília", 4.2)
    ]

    schema = ["id", "nome", "id_departamento", "salario", "data_contratacao", "localidade", "avaliacao"]
    
    df = spark.createDataFrame(data, schema)
    
    return df

def criar_dataframe_departamentos():
    data = [
        (101, "Engenharia", "Carlos Mendes", 15, "Prédio A"),
        (102, "Marketing", "Amanda Rocha", 8, "Prédio B"),
        (103, "Vendas", "Roberto Alves", 12, "Prédio C"),
        (104, "Recursos Humanos", "Camila Ferreira", 5, "Prédio A"),
        (107, "Financeiro", "Marcelo Santos", 7, "Prédio B"),
        (108, "Jurídico", "Patricia Lima", 3, "Prédio C")
    ]
    
    schema = ["id", "nome", "diretor", "num_funcionarios", "localizacao"]
    
    df = spark.createDataFrame(data, schema)
    
    return df

## Amostra de dados do dataframe de funcionarios

In [37]:
criar_dataframe_funcionarios().show()

+---+--------------+---------------+-------+----------------+--------------+---------+
| id|          nome|id_departamento|salario|data_contratacao|    localidade|avaliacao|
+---+--------------+---------------+-------+----------------+--------------+---------+
|  1|   Maria Silva|            101|  75000|      2019-05-15|     São Paulo|      4.2|
|  2|   João Santos|            101|  85000|      2018-03-12|Rio de Janeiro|      4.7|
|  3|  Ana Oliveira|            102|  65000|      2020-01-10|     São Paulo|      3.9|
|  4|  Carlos Souza|            103|  72000|      2019-08-22|Belo Horizonte|      4.5|
|  5|  Juliana Lima|            104|  60000|      2021-02-18|     São Paulo|      4.0|
|  6|   Pedro Costa|            101|  90000|      2017-11-05|Rio de Janeiro|      4.8|
|  7|Fernanda Alves|            102|  67000|      2020-03-20|     São Paulo|      4.1|
|  8|Rafael Pereira|            103|  78000|      2018-07-14|Belo Horizonte|      4.6|
|  9|Mariana Santos|            104|  62000

## Amostra de dados do dataframe de departamentos

In [38]:
criar_dataframe_departamentos().show()

+---+----------------+---------------+----------------+-----------+
| id|            nome|        diretor|num_funcionarios|localizacao|
+---+----------------+---------------+----------------+-----------+
|101|      Engenharia|  Carlos Mendes|              15|   Prédio A|
|102|       Marketing|   Amanda Rocha|               8|   Prédio B|
|103|          Vendas|  Roberto Alves|              12|   Prédio C|
|104|Recursos Humanos|Camila Ferreira|               5|   Prédio A|
|107|      Financeiro| Marcelo Santos|               7|   Prédio B|
|108|        Jurídico|  Patricia Lima|               3|   Prédio C|
+---+----------------+---------------+----------------+-----------+



# Manipulação de colunas 

Para gerar colunas em um DataFrame temos algumas opções:

In [None]:
# 1. com withColumn - É um método que adiciona uma nova coluna ou substitui uma coluna existente em um DataFrame.
df = criar_dataframe_funcionarios()



# Adicionando uma nova coluna com valor fixo 
df = df.withColumn("bonus", F.lit(5000))

# Adicionando uma nova coluna com valor calculado utilizando a coluna "salario"
df = df.withColumn("salario_anual", df["salario"] * 12)

# Adicionando uma nova coluna definindo se o funcionário está acima da média (> 4) pela avaliação
df = df.withColumn("acima_media", F.when(df["avaliacao"] > 4, 'ACIMA DA MEDIA').otherwise('ABAIXO DA MEDIA'))

# Podemos também criar colunas com expressões mais complexas FORA da função e utilizar o objeto como argumento:
# Calcula se o funcionário é de São Paulo ou não

coluna_sao_paulo = F.when(df["localidade"] == "São Paulo", "SÃO PAULO").otherwise("OUTRO LUGAR")
df = df.withColumn("localidade_sao_paulo", coluna_sao_paulo)

# Podemos utilizar também a função F.expr para criar colunas com expressões SQL (aquelas que geralmente utilizamos no comando SELECT para transformar os dados)
# Exemplo: Criando uma nova coluna "salario_ajustado" que é 10% maior que o salário atual
df = df.withColumn("salario_ajustado", F.expr("salario * 1.1"))

# Exemplo: se o salário for maior que 80000, o funcionário é considerado "ALTO SALÁRIO", caso contrário "BAIXO SALÁRIO"
df = df.withColumn("nivel_salario", F.expr("CASE WHEN salario > 80000 THEN 'ALTO SALARIO' ELSE 'BAIXO SALARIO' END"))

df.show()


+---+--------------+---------------+-------+----------------+--------------+---------+-----+-------------+---------------+--------------------+----------------+-------------+
| id|          nome|id_departamento|salario|data_contratacao|    localidade|avaliacao|bonus|salario_anual|    acima_media|localidade_sao_paulo|salario_ajustado|nivel_salario|
+---+--------------+---------------+-------+----------------+--------------+---------+-----+-------------+---------------+--------------------+----------------+-------------+
|  1|   Maria Silva|            101|  75000|      2019-05-15|     São Paulo|      4.2| 5000|       900000| ACIMA DA MEDIA|           SÃO PAULO|         82500.0|BAIXO SALARIO|
|  2|   João Santos|            101|  85000|      2018-03-12|Rio de Janeiro|      4.7| 5000|      1020000| ACIMA DA MEDIA|         OUTRO LUGAR|         93500.0| ALTO SALARIO|
|  3|  Ana Oliveira|            102|  65000|      2020-01-10|     São Paulo|      3.9| 5000|       780000|ABAIXO DA MEDIA|   

In [None]:
# 2. com select - O método select é usado para selecionar colunas específicas de um DataFrame. Ele pode ser usado para renomear colunas, aplicar funções e criar novas colunas.
# Para exemplificar seu uso, iremos fazer as mesmas transformações que fizemos com o withColumn, mas agora utilizando o select. 
# Obs: APENAS COLUNAS QUE ESTÃO NO SELECT SÃO RETORNADAS, diferente do withColumn que retorna o DataFrame inteiro. Portanto, sempre que usar esse comando, lembre-se de colocar as colunas que você quer retornar.

df = criar_dataframe_funcionarios()

# Assim como no withColumn, podemos também criar colunas com expressões mais complexas FORA da função e utilizar o objeto como argumento:

coluna_sao_paulo = F.when(df["localidade"] == "São Paulo", "SÃO PAULO").otherwise("OUTRO LUGAR")

df = df.select(
    "*", # Seleciona todas as colunas existentes
    F.lit(5000).alias("bonus"), # Adiciona uma nova coluna com valor fixo
    (df["salario"] * 12).alias("salario_anual"), # Adiciona uma nova coluna com valor calculado
    F.when(df["avaliacao"] > 4, 'ACIMA DA MEDIA').otherwise('ABAIXO DA MEDIA').alias("acima_media"), # Adiciona uma nova coluna definindo se o funcionário está acima da média (> 4) pela avaliação
    coluna_sao_paulo.alias("localidade_sao_paulo"), # Calcula se o funcionário é de São Paulo ou não
    (df["salario"] * 1.1).alias("salario_ajustado"), # Criando uma nova coluna "salario_ajustado" que é 10% maior que o salário atual
    F.expr("CASE WHEN salario > 80000 THEN 'ALTO SALARIO' ELSE 'BAIXO SALARIO' END").alias("nivel_salario") # Exemplo: se o salário for maior que 80000, o funcionário é considerado "ALTO SALÁRIO", caso contrário "BAIXO SALÁRIO"
)

df.show()

+-----+-------------+---------------+--------------------+-----------------+-------------+
|bonus|salario_anual|    acima_media|localidade_sao_paulo| salario_ajustado|nivel_salario|
+-----+-------------+---------------+--------------------+-----------------+-------------+
| 5000|       900000| ACIMA DA MEDIA|           SÃO PAULO|          82500.0|BAIXO SALARIO|
| 5000|      1020000| ACIMA DA MEDIA|         OUTRO LUGAR|93500.00000000001| ALTO SALARIO|
| 5000|       780000|ABAIXO DA MEDIA|           SÃO PAULO|          71500.0|BAIXO SALARIO|
| 5000|       864000| ACIMA DA MEDIA|         OUTRO LUGAR|          79200.0|BAIXO SALARIO|
| 5000|       720000|ABAIXO DA MEDIA|           SÃO PAULO|          66000.0|BAIXO SALARIO|
| 5000|      1080000| ACIMA DA MEDIA|         OUTRO LUGAR|99000.00000000001| ALTO SALARIO|
| 5000|       804000| ACIMA DA MEDIA|           SÃO PAULO|          73700.0|BAIXO SALARIO|
| 5000|       936000| ACIMA DA MEDIA|         OUTRO LUGAR|          85800.0|BAIXO SALARIO|

In [None]:
# 3. com selectExpr - O método selectExpr é semelhante ao select, mas permite usar expressões SQL diretamente para selecionar e transformar colunas. Isso pode ser útil quando você deseja aplicar operações mais complexas ou usar funções SQL específicas.
df = criar_dataframe_funcionarios()

df = df.selectExpr(
    "*", # Seleciona todas as colunas existentes
    "5000 as bonus", # Adiciona uma nova coluna com valor fixo
    "(salario * 12) as salario_anual", # Adiciona uma nova coluna com valor calculado
    "CASE WHEN avaliacao > 4 THEN 'ACIMA DA MEDIA' ELSE 'ABAIXO DA MEDIA' END as acima_media", # Adiciona uma nova coluna definindo se o funcionário está acima da média (> 4) pela avaliação
    "CASE WHEN localidade = 'São Paulo' THEN 'SÃO PAULO' ELSE 'OUTRO LUGAR' END as localidade_sao_paulo", # Calcula se o funcionário é de São Paulo ou não
    "(salario * 1.1) as salario_ajustado", # Criando uma nova coluna "salario_ajustado" que é 10% maior que o salário atual
    "CASE WHEN salario > 80000 THEN 'ALTO SALARIO' ELSE 'BAIXO SALARIO' END as nivel_salario" # Exemplo: se o salário for maior que 80000, o funcionário é considerado "ALTO SALÁRIO", caso contrário "BAIXO SALÁRIO"
)

df.show()

== Physical Plan ==
*(1) Project [id#3702L, nome#3703, id_departamento#3704L, salario#3705L, data_contratacao#3706, localidade#3707, avaliacao#3708, 5000 AS bonus#3716, (salario#3705L * 12) AS salario_anual#3717L, CASE WHEN (avaliacao#3708 > 4.0) THEN ACIMA DA MEDIA ELSE ABAIXO DA MEDIA END AS acima_media#3718, CASE WHEN (localidade#3707 = São Paulo) THEN SÃO PAULO ELSE OUTRO LUGAR END AS localidade_sao_paulo#3719, (cast(salario#3705L as decimal(20,0)) * 1.1) AS salario_ajustado#3720, CASE WHEN (salario#3705L > 80000) THEN ALTO SALARIO ELSE BAIXO SALARIO END AS nivel_salario#3721]
+- *(1) Scan ExistingRDD[id#3702L,nome#3703,id_departamento#3704L,salario#3705L,data_contratacao#3706,localidade#3707,avaliacao#3708]




# Uso da função "col" e sua utilidade em method chaining

In [None]:
# A função "col" serve para referenciar colunas de forma mais legível, especialmente quando o nome da coluna contém espaços ou caracteres especiais.
# Exemplo: df.select(F.col("nome da coluna")) é equivalente a df.select("nome da coluna")
# Como recomendação pessoal, utilize sempre o col, pois ele ajuda a evitar erros de digitação e torna o código mais legível.
# Como exemplo, iremos fazer a mesma transformação que fizemos anteriormente, mas agora utilizando o col + method chaining

df = criar_dataframe_funcionarios()

# Nota importante:
# No PySpark, quando você transforma um DataFrame, o DataFrame original permanece inalterado.
# Em vez disso, o PySpark gera um novo DataFrame com as transformações aplicadas.
# Por isso, sempre que você aplicar uma transformação, é necessário atribuir o resultado
# a uma nova variável ou atualizar a variável existente.
# Quando utilizamos o encadeamento de métodos (method chaining), o resultado de cada
# transformação é automaticamente passado para a próxima operação, permitindo que você
# combine várias transformações em uma única linha de código.
# Como cada transformação gera um novo DataFrame, se você precisar referenciar uma coluna
# criada em uma transformação anterior, use a função col() para acessá-la. Isso acontece
# porque o PySpark não reconhece as novas colunas se você tentar acessar usando df["nome_coluna"]
# ou df.nome_coluna.

df = (
    df
    .withColumn("bonus", F.lit(5000)) # Adiciona uma nova coluna com valor fixo
    .withColumn("salario_anual", F.col('salario') * 12) # Adiciona uma nova coluna com valor calculado
    .withColumn("acima_media", F.when(F.col('avaliacao') > 4, 'ACIMA DA MEDIA').otherwise('ABAIXO DA MEDIA')) # Adiciona uma nova coluna definindo se o funcionário está acima da média (> 4) pela avaliação
    .withColumn("localidade_sao_paulo", F.when(F.col('localidade') == "São Paulo", "SÃO PAULO").otherwise("OUTRO LUGAR")) # Calcula se o funcionário é de São Paulo ou não
    .withColumn("salario_ajustado", F.expr("salario * 1.1")) # Criando uma nova coluna "salario_ajustado" que é 10% maior que o salário atual
    .withColumn("nivel_salario", F.expr("CASE WHEN salario > 80000 THEN 'ALTO SALARIO' ELSE 'BAIXO SALARIO' END")) # Exemplo: se o salário for maior que 80000, o funcionário é considerado "ALTO SALÁRIO", caso contrário "BAIXO SALÁRIO"
)

df.show()

+---+--------------+---------------+-------+----------------+--------------+---------+-----+-------------+---------------+--------------------+----------------+-------------+
| id|          nome|id_departamento|salario|data_contratacao|    localidade|avaliacao|bonus|salario_anual|    acima_media|localidade_sao_paulo|salario_ajustado|nivel_salario|
+---+--------------+---------------+-------+----------------+--------------+---------+-----+-------------+---------------+--------------------+----------------+-------------+
|  1|   Maria Silva|            101|  75000|      2019-05-15|     São Paulo|      4.2| 5000|       905000| ACIMA DA MEDIA|           SÃO PAULO|         82500.0|BAIXO SALARIO|
|  2|   João Santos|            101|  85000|      2018-03-12|Rio de Janeiro|      4.7| 5000|      1025000| ACIMA DA MEDIA|         OUTRO LUGAR|         93500.0| ALTO SALARIO|
|  3|  Ana Oliveira|            102|  65000|      2020-01-10|     São Paulo|      3.9| 5000|       785000|ABAIXO DA MEDIA|   

# Filtros

In [43]:
# Para filtrar os dados, podemos usar o método filter ou where. Ambos são equivalentes e podem ser usados de forma intercambiável.

# 1. com filter - O método filter é usado para filtrar linhas em um DataFrame com base em uma condição específica. Ele retorna um novo DataFrame contendo apenas as linhas que atendem à condição especificada.
df = criar_dataframe_funcionarios()

# Filtrando os funcionários com salário maior que 80000
df = df.filter(F.col("salario") > 80000)

df.show()

+---+--------------+---------------+-------+----------------+--------------+---------+
| id|          nome|id_departamento|salario|data_contratacao|    localidade|avaliacao|
+---+--------------+---------------+-------+----------------+--------------+---------+
|  2|   João Santos|            101|  85000|      2018-03-12|Rio de Janeiro|      4.7|
|  6|   Pedro Costa|            101|  90000|      2017-11-05|Rio de Janeiro|      4.8|
| 10|Lucas Ferreira|            101|  82000|      2019-04-30|Rio de Janeiro|      4.3|
+---+--------------+---------------+-------+----------------+--------------+---------+



In [44]:
# 2. com where - O método where é semelhante ao filter e pode ser usado para filtrar linhas em um DataFrame com base em uma condição específica. 
# A principal diferença é que o where é mais comumente usado em consultas SQL, enquanto o filter é mais comum em operações de DataFrame.

df = criar_dataframe_funcionarios()

# Filtrando os funcionários com salário maior que 80000
df = df.where(F.col("salario") > 80000)
df.show()


+---+--------------+---------------+-------+----------------+--------------+---------+
| id|          nome|id_departamento|salario|data_contratacao|    localidade|avaliacao|
+---+--------------+---------------+-------+----------------+--------------+---------+
|  2|   João Santos|            101|  85000|      2018-03-12|Rio de Janeiro|      4.7|
|  6|   Pedro Costa|            101|  90000|      2017-11-05|Rio de Janeiro|      4.8|
| 10|Lucas Ferreira|            101|  82000|      2019-04-30|Rio de Janeiro|      4.3|
+---+--------------+---------------+-------+----------------+--------------+---------+



In [45]:
# 3. Se deseja uma condição mais complexa, você pode usar o método filter com expressões compostas.

df = criar_dataframe_funcionarios()

# Filtrando os funcionários com salário maior que 80000 E avaliação maior que 4
df = df.filter((F.col("salario") > 80000) & (F.col("avaliacao") > 4))
df.show()



+---+--------------+---------------+-------+----------------+--------------+---------+
| id|          nome|id_departamento|salario|data_contratacao|    localidade|avaliacao|
+---+--------------+---------------+-------+----------------+--------------+---------+
|  2|   João Santos|            101|  85000|      2018-03-12|Rio de Janeiro|      4.7|
|  6|   Pedro Costa|            101|  90000|      2017-11-05|Rio de Janeiro|      4.8|
| 10|Lucas Ferreira|            101|  82000|      2019-04-30|Rio de Janeiro|      4.3|
+---+--------------+---------------+-------+----------------+--------------+---------+



In [46]:
df = criar_dataframe_funcionarios()

# Filtrando os funcionários com salário maior que 80000 OU localidade seja são paulo

df = df.filter((F.col("salario") > 80000) | (F.col("localidade") == "São Paulo"))
df.show()


+---+--------------+---------------+-------+----------------+--------------+---------+
| id|          nome|id_departamento|salario|data_contratacao|    localidade|avaliacao|
+---+--------------+---------------+-------+----------------+--------------+---------+
|  1|   Maria Silva|            101|  75000|      2019-05-15|     São Paulo|      4.2|
|  2|   João Santos|            101|  85000|      2018-03-12|Rio de Janeiro|      4.7|
|  3|  Ana Oliveira|            102|  65000|      2020-01-10|     São Paulo|      3.9|
|  5|  Juliana Lima|            104|  60000|      2021-02-18|     São Paulo|      4.0|
|  6|   Pedro Costa|            101|  90000|      2017-11-05|Rio de Janeiro|      4.8|
|  7|Fernanda Alves|            102|  67000|      2020-03-20|     São Paulo|      4.1|
|  9|Mariana Santos|            104|  62000|      2021-01-05|     São Paulo|      3.8|
| 10|Lucas Ferreira|            101|  82000|      2019-04-30|Rio de Janeiro|      4.3|
+---+--------------+---------------+-------

In [None]:
# Se prefere uma abordagem parecida com SQL, voce pode usar o método where com expressões compostas em string.

df = criar_dataframe_funcionarios()

# Fitlrando os funcionários com salário maior que 80000 E avaliação maior que 4
df = df.filter('salario > 80000 AND avaliacao > 4')
df.show()

+---+--------------+---------------+-------+----------------+--------------+---------+
| id|          nome|id_departamento|salario|data_contratacao|    localidade|avaliacao|
+---+--------------+---------------+-------+----------------+--------------+---------+
|  2|   João Santos|            101|  85000|      2018-03-12|Rio de Janeiro|      4.7|
|  6|   Pedro Costa|            101|  90000|      2017-11-05|Rio de Janeiro|      4.8|
| 10|Lucas Ferreira|            101|  82000|      2019-04-30|Rio de Janeiro|      4.3|
+---+--------------+---------------+-------+----------------+--------------+---------+



# Group By

In [48]:
# Para fazer agrupamentos e agregações, utilizamos o método groupBy() seguido de funções de agregação como count(), sum(), avg(), min(), max(), etc.
df = criar_dataframe_funcionarios()

# Exemplo: Contar o número de funcionários por departamento

df = (
    df.groupBy("id_departamento")
    .agg(
        F.count("*").alias("num_funcionarios"), # Conta o número de funcionários por departamento
        F.avg("salario").alias("salario_medio"), # Calcula o salário médio por departamento
        F.max("avaliacao").alias("avaliacao_maxima"), # Encontra a avaliação máxima por departamento
        F.min("avaliacao").alias("avaliacao_minima") # Encontra a avaliação mínima por departamento
    )
)

df.show()



+---------------+----------------+-------------+----------------+----------------+
|id_departamento|num_funcionarios|salario_medio|avaliacao_maxima|avaliacao_minima|
+---------------+----------------+-------------+----------------+----------------+
|            101|               4|      83000.0|             4.8|             4.2|
|            102|               2|      66000.0|             4.1|             3.9|
|            103|               2|      75000.0|             4.6|             4.5|
|            104|               2|      61000.0|             4.0|             3.8|
|            105|               1|      70000.0|             4.0|             4.0|
|            106|               1|      68000.0|             4.2|             4.2|
+---------------+----------------+-------------+----------------+----------------+



                                                                                

In [49]:
# Podemos agrupar por mais de uma coluna, por exemplo, agrupando por departamento e localidade
df = criar_dataframe_funcionarios()

df = (
    df.groupBy("id_departamento", "localidade")
    .agg(
        F.count("*").alias("num_funcionarios"), # Conta o número de funcionários por departamento e localidade
        F.avg("salario").alias("salario_medio"), # Calcula o salário médio por departamento e localidade
        F.max("avaliacao").alias("avaliacao_maxima"), # Encontra a avaliação máxima por departamento e localidade
        F.min("avaliacao").alias("avaliacao_minima") # Encontra a avaliação mínima por departamento e localidade
    )
)

df.show()



+---------------+--------------+----------------+-----------------+----------------+----------------+
|id_departamento|    localidade|num_funcionarios|    salario_medio|avaliacao_maxima|avaliacao_minima|
+---------------+--------------+----------------+-----------------+----------------+----------------+
|            101|     São Paulo|               1|          75000.0|             4.2|             4.2|
|            101|Rio de Janeiro|               3|85666.66666666667|             4.8|             4.3|
|            102|     São Paulo|               2|          66000.0|             4.1|             3.9|
|            103|Belo Horizonte|               2|          75000.0|             4.6|             4.5|
|            104|     São Paulo|               2|          61000.0|             4.0|             3.8|
|            105|      Curitiba|               1|          70000.0|             4.0|             4.0|
|            106|      Brasília|               1|          68000.0|             4.

                                                                                

In [50]:
# Prefere usar expressões SQL? Você pode usar a função expr() para criar expressões SQL diretamente no PySpark.

df = criar_dataframe_funcionarios()

df = (
    df
    .groupBy("id_departamento")
    .agg(
        F.expr('COUNT(*) as num_funcionarios'),
        F.expr('AVG(salario) as salario_medio'),
        F.expr('MAX(avaliacao) as avaliacao_maxima'),
        F.expr('MIN(avaliacao) as avaliacao_minima')
    )
)

df.show()



+---------------+----------------+-------------+----------------+----------------+
|id_departamento|num_funcionarios|salario_medio|avaliacao_maxima|avaliacao_minima|
+---------------+----------------+-------------+----------------+----------------+
|            101|               4|      83000.0|             4.8|             4.2|
|            102|               2|      66000.0|             4.1|             3.9|
|            103|               2|      75000.0|             4.6|             4.5|
|            104|               2|      61000.0|             4.0|             3.8|
|            105|               1|      70000.0|             4.0|             4.0|
|            106|               1|      68000.0|             4.2|             4.2|
+---------------+----------------+-------------+----------------+----------------+



                                                                                

# Joins

O Spark possui nativamente diversos tipos de joins, a seguir um exemplo de cada um.

In [51]:
df_funcionarios = criar_dataframe_funcionarios()
df_departamentos = criar_dataframe_departamentos()

In [None]:
# Inner Join: retorna um dataframe com as linhas que têm correspondência em ambos os dataframes.
# Obs: como o join é feito entre dois dataframes, o uso da função "col" não é recomendado.

df_join = df_funcionarios.join(df_departamentos, df_funcionarios['id_departamento'] == df_departamentos['id'], "inner")
df_join.show()



+---+--------------+---------------+-------+----------------+--------------+---------+---+----------------+---------------+----------------+-----------+
| id|          nome|id_departamento|salario|data_contratacao|    localidade|avaliacao| id|            nome|        diretor|num_funcionarios|localizacao|
+---+--------------+---------------+-------+----------------+--------------+---------+---+----------------+---------------+----------------+-----------+
|  1|   Maria Silva|            101|  75000|      2019-05-15|     São Paulo|      4.2|101|      Engenharia|  Carlos Mendes|              15|   Prédio A|
|  2|   João Santos|            101|  85000|      2018-03-12|Rio de Janeiro|      4.7|101|      Engenharia|  Carlos Mendes|              15|   Prédio A|
|  6|   Pedro Costa|            101|  90000|      2017-11-05|Rio de Janeiro|      4.8|101|      Engenharia|  Carlos Mendes|              15|   Prédio A|
| 10|Lucas Ferreira|            101|  82000|      2019-04-30|Rio de Janeiro|      

                                                                                

In [53]:
# Left Join: retorna todas as linhas do dataframe da esquerda e as linhas correspondentes do dataframe da direita. 
# Se não houver correspondência, os valores do dataframe da direita serão nulos.

df_join = df_funcionarios.join(df_departamentos, df_funcionarios['id_departamento'] == df_departamentos['id'], "left")
df_join.show()

                                                                                

+---+--------------+---------------+-------+----------------+--------------+---------+----+----------------+---------------+----------------+-----------+
| id|          nome|id_departamento|salario|data_contratacao|    localidade|avaliacao|  id|            nome|        diretor|num_funcionarios|localizacao|
+---+--------------+---------------+-------+----------------+--------------+---------+----+----------------+---------------+----------------+-----------+
|  1|   Maria Silva|            101|  75000|      2019-05-15|     São Paulo|      4.2| 101|      Engenharia|  Carlos Mendes|              15|   Prédio A|
|  2|   João Santos|            101|  85000|      2018-03-12|Rio de Janeiro|      4.7| 101|      Engenharia|  Carlos Mendes|              15|   Prédio A|
|  3|  Ana Oliveira|            102|  65000|      2020-01-10|     São Paulo|      3.9| 102|       Marketing|   Amanda Rocha|               8|   Prédio B|
|  4|  Carlos Souza|            103|  72000|      2019-08-22|Belo Horizonte|

In [54]:
# Right Join: retorna todas as linhas do dataframe da direita e as linhas correspondentes do dataframe da esquerda. 
# Se não houver correspondência, os valores do dataframe da esquerda serão nulos.

df_join = df_funcionarios.join(df_departamentos, df_funcionarios['id_departamento'] == df_departamentos['id'], "right")
df_join.show()

                                                                                

+----+--------------+---------------+-------+----------------+--------------+---------+---+----------------+---------------+----------------+-----------+
|  id|          nome|id_departamento|salario|data_contratacao|    localidade|avaliacao| id|            nome|        diretor|num_funcionarios|localizacao|
+----+--------------+---------------+-------+----------------+--------------+---------+---+----------------+---------------+----------------+-----------+
|  10|Lucas Ferreira|            101|  82000|      2019-04-30|Rio de Janeiro|      4.3|101|      Engenharia|  Carlos Mendes|              15|   Prédio A|
|   6|   Pedro Costa|            101|  90000|      2017-11-05|Rio de Janeiro|      4.8|101|      Engenharia|  Carlos Mendes|              15|   Prédio A|
|   2|   João Santos|            101|  85000|      2018-03-12|Rio de Janeiro|      4.7|101|      Engenharia|  Carlos Mendes|              15|   Prédio A|
|   1|   Maria Silva|            101|  75000|      2019-05-15|     São Paulo

In [55]:
# Full Outer Join: retorna todas as linhas de ambos os dataframes.
# Se não houver correspondência, os valores do dataframe correspondente serão nulos.

df_join = df_funcionarios.join(df_departamentos, df_funcionarios['id_departamento'] == df_departamentos['id'], "full")
df_join.show()



+----+--------------+---------------+-------+----------------+--------------+---------+----+----------------+---------------+----------------+-----------+
|  id|          nome|id_departamento|salario|data_contratacao|    localidade|avaliacao|  id|            nome|        diretor|num_funcionarios|localizacao|
+----+--------------+---------------+-------+----------------+--------------+---------+----+----------------+---------------+----------------+-----------+
|   1|   Maria Silva|            101|  75000|      2019-05-15|     São Paulo|      4.2| 101|      Engenharia|  Carlos Mendes|              15|   Prédio A|
|   2|   João Santos|            101|  85000|      2018-03-12|Rio de Janeiro|      4.7| 101|      Engenharia|  Carlos Mendes|              15|   Prédio A|
|   6|   Pedro Costa|            101|  90000|      2017-11-05|Rio de Janeiro|      4.8| 101|      Engenharia|  Carlos Mendes|              15|   Prédio A|
|  10|Lucas Ferreira|            101|  82000|      2019-04-30|Rio de J

                                                                                

In [56]:
# Cross Join: retorna o produto cartesiano dos dois dataframes, ou seja, combina todas as linhas do dataframe da esquerda com todas as linhas do dataframe da direita.
df_join = df_funcionarios.crossJoin(df_departamentos)
df_join.show()



+---+------------+---------------+-------+----------------+--------------+---------+---+----------------+---------------+----------------+-----------+
| id|        nome|id_departamento|salario|data_contratacao|    localidade|avaliacao| id|            nome|        diretor|num_funcionarios|localizacao|
+---+------------+---------------+-------+----------------+--------------+---------+---+----------------+---------------+----------------+-----------+
|  1| Maria Silva|            101|  75000|      2019-05-15|     São Paulo|      4.2|101|      Engenharia|  Carlos Mendes|              15|   Prédio A|
|  1| Maria Silva|            101|  75000|      2019-05-15|     São Paulo|      4.2|102|       Marketing|   Amanda Rocha|               8|   Prédio B|
|  1| Maria Silva|            101|  75000|      2019-05-15|     São Paulo|      4.2|103|          Vendas|  Roberto Alves|              12|   Prédio C|
|  1| Maria Silva|            101|  75000|      2019-05-15|     São Paulo|      4.2|104|Recurs

                                                                                

In [57]:
# Semi Join: retorna todas as linhas do dataframe da esquerda que têm correspondência no dataframe da direita, mas não retorna colunas do dataframe da direita.

df_join = df_funcionarios.join(df_departamentos, df_funcionarios['id_departamento'] == df_departamentos['id'], "left_semi")
df_join.show()



+---+--------------+---------------+-------+----------------+--------------+---------+
| id|          nome|id_departamento|salario|data_contratacao|    localidade|avaliacao|
+---+--------------+---------------+-------+----------------+--------------+---------+
|  1|   Maria Silva|            101|  75000|      2019-05-15|     São Paulo|      4.2|
|  2|   João Santos|            101|  85000|      2018-03-12|Rio de Janeiro|      4.7|
|  6|   Pedro Costa|            101|  90000|      2017-11-05|Rio de Janeiro|      4.8|
| 10|Lucas Ferreira|            101|  82000|      2019-04-30|Rio de Janeiro|      4.3|
|  3|  Ana Oliveira|            102|  65000|      2020-01-10|     São Paulo|      3.9|
|  7|Fernanda Alves|            102|  67000|      2020-03-20|     São Paulo|      4.1|
|  4|  Carlos Souza|            103|  72000|      2019-08-22|Belo Horizonte|      4.5|
|  8|Rafael Pereira|            103|  78000|      2018-07-14|Belo Horizonte|      4.6|
|  5|  Juliana Lima|            104|  60000

                                                                                

In [58]:
# Anti Join: retorna todas as linhas do dataframe da esquerda que não têm correspondência no dataframe da direita.
df_join = df_funcionarios.join(df_departamentos, df_funcionarios['id_departamento'] == df_departamentos['id'], "left_anti")
df_join.show()

                                                                                

+---+--------------+---------------+-------+----------------+----------+---------+
| id|          nome|id_departamento|salario|data_contratacao|localidade|avaliacao|
+---+--------------+---------------+-------+----------------+----------+---------+
| 11|   Bianca Lima|            105|  70000|      2020-06-15|  Curitiba|      4.0|
| 12|Thiago Almeida|            106|  68000|      2019-07-20|  Brasília|      4.2|
+---+--------------+---------------+-------+----------------+----------+---------+



# Window Function

In [None]:
# As funções de JANELA (window functions) permitem realizar cálculos em um conjunto de linhas relacionadas a uma linha atual, sem a necessidade de agrupar os dados.
# Funciona da mesma que no SQL e seus casos de uso são os mesmos: usamos quando queremos calcular valores sem perder a granularidade dos dados.

# Exemplo: calcular o salário médio de todos os funcionários, mas mantendo a granularidade dos dados

from pyspark.sql import Window

df = criar_dataframe_funcionarios()

# Definindo a janela
window = Window.partitionBy() # Sem partição, ou seja, considerando todos os dados
# Aplicando a função de janela
df = df.withColumn("salario_medio", F.avg("salario").over(window))
df.show()




25/04/02 17:43:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/02 17:43:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/02 17:43:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+---+--------------+---------------+-------+----------------+--------------+---------+-----------------+
| id|          nome|id_departamento|salario|data_contratacao|    localidade|avaliacao|    salario_medio|
+---+--------------+---------------+-------+----------------+--------------+---------+-----------------+
|  1|   Maria Silva|            101|  75000|      2019-05-15|     São Paulo|      4.2|72833.33333333333|
|  2|   João Santos|            101|  85000|      2018-03-12|Rio de Janeiro|      4.7|72833.33333333333|
|  3|  Ana Oliveira|            102|  65000|      2020-01-10|     São Paulo|      3.9|72833.33333333333|
|  4|  Carlos Souza|            103|  72000|      2019-08-22|Belo Horizonte|      4.5|72833.33333333333|
|  5|  Juliana Lima|            104|  60000|      2021-02-18|     São Paulo|      4.0|72833.33333333333|
|  6|   Pedro Costa|            101|  90000|      2017-11-05|Rio de Janeiro|      4.8|72833.33333333333|
|  7|Fernanda Alves|            102|  67000|      2020-

25/04/02 17:43:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/02 17:43:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [60]:
# Usando função de janela com partição
df = criar_dataframe_funcionarios()

# Definindo a janela
window = Window.partitionBy("id_departamento") # Particionando por departamento
# Aplicando a função de janela
df = df.withColumn("salario_medio_do_departamento", F.avg("salario").over(window))
df.show()




+---+--------------+---------------+-------+----------------+--------------+---------+-----------------------------+
| id|          nome|id_departamento|salario|data_contratacao|    localidade|avaliacao|salario_medio_do_departamento|
+---+--------------+---------------+-------+----------------+--------------+---------+-----------------------------+
|  1|   Maria Silva|            101|  75000|      2019-05-15|     São Paulo|      4.2|                      83000.0|
|  2|   João Santos|            101|  85000|      2018-03-12|Rio de Janeiro|      4.7|                      83000.0|
|  6|   Pedro Costa|            101|  90000|      2017-11-05|Rio de Janeiro|      4.8|                      83000.0|
| 10|Lucas Ferreira|            101|  82000|      2019-04-30|Rio de Janeiro|      4.3|                      83000.0|
|  3|  Ana Oliveira|            102|  65000|      2020-01-10|     São Paulo|      3.9|                      66000.0|
|  7|Fernanda Alves|            102|  67000|      2020-03-20|   

                                                                                

In [61]:
# Também podemos utilizar SQL para aplicar as funcões de janela

df = criar_dataframe_funcionarios()

df = (
    df
    .selectExpr(
        "*",
        "AVG(salario) OVER (PARTITION BY id_departamento) as salario_medio_do_departamento" # Particionando por departamento
    )
)

df.show()

+---+--------------+---------------+-------+----------------+--------------+---------+-----------------------------+
| id|          nome|id_departamento|salario|data_contratacao|    localidade|avaliacao|salario_medio_do_departamento|
+---+--------------+---------------+-------+----------------+--------------+---------+-----------------------------+
|  1|   Maria Silva|            101|  75000|      2019-05-15|     São Paulo|      4.2|                      83000.0|
|  2|   João Santos|            101|  85000|      2018-03-12|Rio de Janeiro|      4.7|                      83000.0|
|  6|   Pedro Costa|            101|  90000|      2017-11-05|Rio de Janeiro|      4.8|                      83000.0|
| 10|Lucas Ferreira|            101|  82000|      2019-04-30|Rio de Janeiro|      4.3|                      83000.0|
|  3|  Ana Oliveira|            102|  65000|      2020-01-10|     São Paulo|      3.9|                      66000.0|
|  7|Fernanda Alves|            102|  67000|      2020-03-20|   

In [62]:
# Também funciona com withColumn

df = criar_dataframe_funcionarios()

df = df.withColumn(
    'salario_medio_do_departamento', 
    F.expr("AVG(salario) OVER (PARTITION BY id_departamento)")
) # Particionando por departamento

df.show()

+---+--------------+---------------+-------+----------------+--------------+---------+-----------------------------+
| id|          nome|id_departamento|salario|data_contratacao|    localidade|avaliacao|salario_medio_do_departamento|
+---+--------------+---------------+-------+----------------+--------------+---------+-----------------------------+
|  1|   Maria Silva|            101|  75000|      2019-05-15|     São Paulo|      4.2|                      83000.0|
|  2|   João Santos|            101|  85000|      2018-03-12|Rio de Janeiro|      4.7|                      83000.0|
|  6|   Pedro Costa|            101|  90000|      2017-11-05|Rio de Janeiro|      4.8|                      83000.0|
| 10|Lucas Ferreira|            101|  82000|      2019-04-30|Rio de Janeiro|      4.3|                      83000.0|
|  3|  Ana Oliveira|            102|  65000|      2020-01-10|     São Paulo|      3.9|                      66000.0|
|  7|Fernanda Alves|            102|  67000|      2020-03-20|   

                                                                                