In [3]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import Row
from pyspark.sql.types import *

# Creating Spark Session

In [4]:
spark = SparkSession.builder \
    .appName("Doc Learning") \
    .getOrCreate()

# 2 - agg() Exercises

## Aggregate on the entire DataFrame without groups (shorthand for df.groupBy().agg()).

In [4]:
agg_sample = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])

In [5]:
# example 1 - simple operations
agg_df_1 = agg_sample.agg({"age": "max"})
agg_df_1.show()

+--------+
|max(age)|
+--------+
|       5|
+--------+



In [6]:
# example 2 - aggregating multiple columns
agg_df_2 = agg_sample.agg(
    {
        "age": "max",
        "name": "min"
    }
)
agg_df_2.show()

+---------+--------+
|min(name)|max(age)|
+---------+--------+
|    Alice|       5|
+---------+--------+



In [7]:
# exempla 3 - using sql functions for aggregations
agg_df_3 = agg_sample.agg(
    F.max("age").alias("MAX_AGE"),
    F.count("name").alias("NAME_COUNT")
).select("MAX_AGE", "NAME_COUNT").show()

+-------+----------+
|MAX_AGE|NAME_COUNT|
+-------+----------+
|      5|         2|
+-------+----------+



In [8]:
# example 4 - using with groupBy
agg_df_4 = agg_sample.groupBy(F.col("age")).agg(F.count("*"))
agg_df_4.show()

+---+--------+
|age|count(1)|
+---+--------+
|  2|       1|
|  5|       1|
+---+--------+



In [9]:
# example 5 - Aggregating with Multiple Functions on the Same Column
agg_df_5 = agg_sample.agg(
    F.min("age").alias("min_age"),
    F.max("age").alias("max_age"),
    F.avg("age").alias("avg_age"),
    F.count("age").alias("count_age")
)
agg_df_5.show()

+-------+-------+-------+---------+
|min_age|max_age|avg_age|count_age|
+-------+-------+-------+---------+
|      2|      5|    3.5|        2|
+-------+-------+-------+---------+



In [10]:
# example 6 - using custom aggregations
from pyspark.sql import types as T

# Define a UDF for custom aggregation
@F.udf(T.DoubleType())
def custom_agg(values):
    return sum(values) / len(values)  # Custom example: mean calculation

# Apply custom aggregation
agg_df_6 = agg_sample.groupBy("name").agg(
    custom_agg(F.collect_list("age")).alias("custom_age_mean")
)
agg_df_6.show()


+-----+---------------+
| name|custom_age_mean|
+-----+---------------+
|Alice|            2.0|
|  Bob|            5.0|
+-----+---------------+



## Exercises "agg" function

In [11]:
# sample data
data = [(1, "Alice", "Sales", 34, 70000, 5),
        (2, "Bob", "HR", 45, 80000, 10),
        (3, "Catherine", "IT", 29, 90000, 3),
        (4, "David", "IT", 39, 85000, 7),
        (5, "Eve", "Sales", 41, 75000, 8),
        (6, "Frank", "HR", 30, 60000, 2),
        (7, "Grace", "IT", 35, 95000, 6),
        (8, "Hannah", "Sales", 50, 65000, 12),
        (9, "Ivy", "IT", 38, 87000, 9),
        (10, "Jack", "HR", 28, 72000, 4)]

columns = ["id", "name", "department", "age", "salary", "experience"]

agg_sample = spark.createDataFrame(data, columns)

# Show the sample data
agg_sample.show()

+---+---------+----------+---+------+----------+
| id|     name|department|age|salary|experience|
+---+---------+----------+---+------+----------+
|  1|    Alice|     Sales| 34| 70000|         5|
|  2|      Bob|        HR| 45| 80000|        10|
|  3|Catherine|        IT| 29| 90000|         3|
|  4|    David|        IT| 39| 85000|         7|
|  5|      Eve|     Sales| 41| 75000|         8|
|  6|    Frank|        HR| 30| 60000|         2|
|  7|    Grace|        IT| 35| 95000|         6|
|  8|   Hannah|     Sales| 50| 65000|        12|
|  9|      Ivy|        IT| 38| 87000|         9|
| 10|     Jack|        HR| 28| 72000|         4|
+---+---------+----------+---+------+----------+



In [12]:
# question 1: Find the maximum age, average salary, and total experience for all employees.
q1 = agg_sample.agg(
    F.max("age").alias("max_age"),
    F.avg("salary").alias("avg_salary"),
    F.sum("experience").alias("total_experience")
).show()

+-------+----------+----------------+
|max_age|avg_salary|total_experience|
+-------+----------+----------------+
|     50|   77900.0|              66|
+-------+----------+----------------+



In [13]:
# question 2: Group the employees by department and find the maximum age, average salary, and total experience for each department.
q2 = agg_sample.groupBy("department").agg(
    F.max("age").alias("max_age"),
    F.avg("salary").alias("avg_salary"),
    F.sum("experience").alias("total_experience")
).select(
    "department",
    "max_age",
    "avg_salary",
    "total_experience"
).show()

+----------+-------+-----------------+----------------+
|department|max_age|       avg_salary|total_experience|
+----------+-------+-----------------+----------------+
|     Sales|     50|          70000.0|              25|
|        HR|     45|70666.66666666667|              16|
|        IT|     39|          89250.0|              25|
+----------+-------+-----------------+----------------+



In [14]:
# question 3: Find the minimum age, maximum salary, and count of employees.
q3 = agg_sample.agg(
    F.min("age").alias("min_age"),
    F.max("salary").alias("max_salary"),
    F.count("*").alias("employee_count")
).show()

+-------+----------+--------------+
|min_age|max_salary|employee_count|
+-------+----------+--------------+
|     28|     95000|            10|
+-------+----------+--------------+



In [15]:
# question 4 - Group the employees by department and find the minimum age,
# maximum salary, and count of employees for each department.
q4 = agg_sample.groupBy("department").agg(
    F.min("age").alias("min_age"),
    F.max("salary").alias("max_salary"),
    F.count("*").alias("employee_count")
).show()

+----------+-------+----------+--------------+
|department|min_age|max_salary|employee_count|
+----------+-------+----------+--------------+
|     Sales|     34|     75000|             3|
|        HR|     28|     80000|             3|
|        IT|     29|     95000|             4|
+----------+-------+----------+--------------+



In [16]:
# question 5 - Find the average age and total salary of employees.
q5 = agg_sample.agg({
    "age": "avg",
    "salary": "sum"
}).show()

+-----------+--------+
|sum(salary)|avg(age)|
+-----------+--------+
|     779000|    36.9|
+-----------+--------+



In [17]:
# 6 -  Calculate the average salary for each department.
q6 = agg_sample.groupBy("department").agg(
    F.avg(F.col("salary")).alias("average_salary")
).show()

+----------+-----------------+
|department|   average_salary|
+----------+-----------------+
|     Sales|          70000.0|
|        HR|70666.66666666667|
|        IT|          89250.0|
+----------+-----------------+



In [18]:
# 7 - Find the total number of employees in each department.
q7 = agg_sample.groupBy("department").agg(
    F.count(F.col("*")).alias("count_employees")
).show()

+----------+---------------+
|department|count_employees|
+----------+---------------+
|     Sales|              3|
|        HR|              3|
|        IT|              4|
+----------+---------------+



In [19]:
# 8 - Find the average salary for employees with more than 5 years of experience.
q8 = agg_sample.agg(
    F.avg(
        F.when(F.col("experience") > 5,
        F.col("salary"))
        ).alias("average_salary")
).show()

+-----------------+
|   average_salary|
+-----------------+
|81166.66666666667|
+-----------------+



In [20]:
# 9 - Find the maximum and minimum experience for the Sales department.
q9 = agg_sample.groupBy("department").agg(
    F.max(F.col("experience")).alias("max_experience"),
    F.min(F.col("experience")).alias("min_experience")
).where(F.col("department") == "Sales").show()

+----------+--------------+--------------+
|department|max_experience|min_experience|
+----------+--------------+--------------+
|     Sales|            12|             5|
+----------+--------------+--------------+



# 3 - withColumn function

A função withColumn no PySpark é usada para adicionar uma nova coluna a um DataFrame ou para substituir uma coluna existente com base em uma expressão especificada

Syntax:

**DataFrame.withColumn(colName, col)**

colName: O nome da nova coluna ou da coluna existente a ser substituída.
col: Uma expressão que define os valores da coluna, que pode ser uma instância de Column, uma expressão SQL, ou uma função do módulo pyspark.sql.functions.


## 3.1 - Examples

**Adicionar uma nova coluna**

Vamos adicionar uma nova coluna chamada "idade_5_anos" que será a idade atual acrescida de 5 anos.

**Substituir uma coluna existente**

Vamos substituir a coluna "Idade" com a idade acrescida de 10 anos.

In [21]:
data = [(1, "Alice", "Sales", 34, 70000, 5),
        (2, "Bob", "HR", 45, 80000, 10),
        (3, "Catherine", "IT", 29, 90000, 3),
        (4, "David", "IT", 39, 85000, 7),
        (5, "Eve", "Sales", 41, 75000, 8),
        (6, "Frank", "HR", 30, 60000, 2),
        (7, "Grace", "IT", 35, 95000, 6),
        (8, "Hannah", "Sales", 50, 65000, 12),
        (9, "Ivy", "IT", 38, 87000, 9),
        (10, "Jack", "HR", 28, 72000, 4)]

columns = ["id", "name", "department", "age", "salary", "experience"]

df = spark.createDataFrame(data, columns)

In [22]:
df = df.withColumn("age", F.col("age") + 10)
df.show()

+---+---------+----------+---+------+----------+
| id|     name|department|age|salary|experience|
+---+---------+----------+---+------+----------+
|  1|    Alice|     Sales| 44| 70000|         5|
|  2|      Bob|        HR| 55| 80000|        10|
|  3|Catherine|        IT| 39| 90000|         3|
|  4|    David|        IT| 49| 85000|         7|
|  5|      Eve|     Sales| 51| 75000|         8|
|  6|    Frank|        HR| 40| 60000|         2|
|  7|    Grace|        IT| 45| 95000|         6|
|  8|   Hannah|     Sales| 60| 65000|        12|
|  9|      Ivy|        IT| 48| 87000|         9|
| 10|     Jack|        HR| 38| 72000|         4|
+---+---------+----------+---+------+----------+



**Converter tipos de dados**

In [23]:
# Converter uma coluna de string para inteiro
df = df.withColumn("age", F.col("age").cast("int"))
df.show()

+---+---------+----------+---+------+----------+
| id|     name|department|age|salary|experience|
+---+---------+----------+---+------+----------+
|  1|    Alice|     Sales| 44| 70000|         5|
|  2|      Bob|        HR| 55| 80000|        10|
|  3|Catherine|        IT| 39| 90000|         3|
|  4|    David|        IT| 49| 85000|         7|
|  5|      Eve|     Sales| 51| 75000|         8|
|  6|    Frank|        HR| 40| 60000|         2|
|  7|    Grace|        IT| 45| 95000|         6|
|  8|   Hannah|     Sales| 60| 65000|        12|
|  9|      Ivy|        IT| 48| 87000|         9|
| 10|     Jack|        HR| 38| 72000|         4|
+---+---------+----------+---+------+----------+



**Aplicar funções SQL integradas**

In [24]:
# Adicionar uma nova coluna "AnoAtual" usando a função current_year
df = df.withColumn("AnoAtual", F.year(F.current_date()))
df.show()

+---+---------+----------+---+------+----------+--------+
| id|     name|department|age|salary|experience|AnoAtual|
+---+---------+----------+---+------+----------+--------+
|  1|    Alice|     Sales| 44| 70000|         5|    2024|
|  2|      Bob|        HR| 55| 80000|        10|    2024|
|  3|Catherine|        IT| 39| 90000|         3|    2024|
|  4|    David|        IT| 49| 85000|         7|    2024|
|  5|      Eve|     Sales| 51| 75000|         8|    2024|
|  6|    Frank|        HR| 40| 60000|         2|    2024|
|  7|    Grace|        IT| 45| 95000|         6|    2024|
|  8|   Hannah|     Sales| 60| 65000|        12|    2024|
|  9|      Ivy|        IT| 48| 87000|         9|    2024|
| 10|     Jack|        HR| 38| 72000|         4|    2024|
+---+---------+----------+---+------+----------+--------+



**Criar uma coluna calculada com base em outras colunas**

In [25]:
# Adicionar uma nova coluna "SalarioAnual" que é "SalarioMensal" vezes 12
df = df.withColumn("SalarioAnual", F.col("SalarioMensal") * 12)

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `SalarioMensal` cannot be resolved. Did you mean one of the following? [`department`, `experience`, `salary`, `AnoAtual`, `age`].;
'Project [id#492L, name#493, department#494, age#536, salary#496L, experience#497L, AnoAtual#568, ('SalarioMensal * 12) AS SalarioAnual#605]
+- Project [id#492L, name#493, department#494, age#536, salary#496L, experience#497L, year(current_date(Some(America/Sao_Paulo))) AS AnoAtual#568]
   +- Project [id#492L, name#493, department#494, cast(age#504L as int) AS age#536, salary#496L, experience#497L]
      +- Project [id#492L, name#493, department#494, (age#495L + cast(10 as bigint)) AS age#504L, salary#496L, experience#497L]
         +- LogicalRDD [id#492L, name#493, department#494, age#495L, salary#496L, experience#497L], false


## 2.2 - Exercises

In [None]:
# data
data = [
    ("John", "Doe", 28),
    ("Jane", "Smith", 32),
    ("Mike", "Johnson", 25),
    ("Emily", "Brown", 34),
    ("Kevin", "Davis", 30)
]

columns = ["first_name", "last_name", "age"]

# Create a DataFrame
df = spark.createDataFrame(data, columns)

In [None]:
# 1 - Add a new column full_name by concatenating first_name and last_name with a space in between.
full_name_df = df.withColumn(
    "full_name",
    F.concat(F.col("first_name"), F.lit(" "), F.col("last_name"))
    )
full_name_df.show()

+----------+---------+---+------------+
|first_name|last_name|age|   full_name|
+----------+---------+---+------------+
|      John|      Doe| 28|    John Doe|
|      Jane|    Smith| 32|  Jane Smith|
|      Mike|  Johnson| 25|Mike Johnson|
|     Emily|    Brown| 34| Emily Brown|
|     Kevin|    Davis| 30| Kevin Davis|
+----------+---------+---+------------+



In [None]:
# 2 - Create a new column age_after_5_years that shows each person's age 5 years from now.
age_df = df.withColumn(
    "age_after_5_years",
    F.col("age") + 5
)

age_df.show()

+----------+---------+---+-----------------+
|first_name|last_name|age|age_after_5_years|
+----------+---------+---+-----------------+
|      John|      Doe| 28|               33|
|      Jane|    Smith| 32|               37|
|      Mike|  Johnson| 25|               30|
|     Emily|    Brown| 34|               39|
|     Kevin|    Davis| 30|               35|
+----------+---------+---+-----------------+



In [None]:
# 3 - Add a column name_length that calculates the length of the first_name.
name_df = df.withColumn(
    "name_length",
    F.length("first_name")
)
name_df.show()

+----------+---------+---+-----------+
|first_name|last_name|age|name_length|
+----------+---------+---+-----------+
|      John|      Doe| 28|          4|
|      Jane|    Smith| 32|          4|
|      Mike|  Johnson| 25|          4|
|     Emily|    Brown| 34|          5|
|     Kevin|    Davis| 30|          5|
+----------+---------+---+-----------+



In [None]:
# 4 - Convert the age column to a string type and store it in a new column age_str.
str_df = df.withColumn(
    "age_str",
    F.col("age").cast("int")
)

str_df.show()

+----------+---------+---+-------+
|first_name|last_name|age|age_str|
+----------+---------+---+-------+
|      John|      Doe| 28|     28|
|      Jane|    Smith| 32|     32|
|      Mike|  Johnson| 25|     25|
|     Emily|    Brown| 34|     34|
|     Kevin|    Davis| 30|     30|
+----------+---------+---+-------+



In [None]:
# 5 - Create a column is_adult that indicates whether a person is an adult (age >= 18).
adult_df = df.withColumn(
    "is_adult",
    F.col("age") >= 18
)

adult_df.show()

+----------+---------+---+--------+
|first_name|last_name|age|is_adult|
+----------+---------+---+--------+
|      John|      Doe| 28|    true|
|      Jane|    Smith| 32|    true|
|      Mike|  Johnson| 25|    true|
|     Emily|    Brown| 34|    true|
|     Kevin|    Davis| 30|    true|
+----------+---------+---+--------+



In [None]:
# 6 - Add a column name_in_uppercase that converts the first_name to uppercase.
upper_df = df.withColumn(
    "name_in_upper",
    F.upper("first_name")
)

upper_df.show()

+----------+---------+---+-------------+
|first_name|last_name|age|name_in_upper|
+----------+---------+---+-------------+
|      John|      Doe| 28|         JOHN|
|      Jane|    Smith| 32|         JANE|
|      Mike|  Johnson| 25|         MIKE|
|     Emily|    Brown| 34|        EMILY|
|     Kevin|    Davis| 30|        KEVIN|
+----------+---------+---+-------------+



In [None]:
# 7 - Create a column age_category that categorizes age into "Young" (age < 30) and "Old" (age >= 30).
category_df = df.withColumn(
    "age_category",
    F.when(F.col("age") < 30, "Young")
    .when(F.col("age") >= 30, "Old")
)

category_df.show()

+----------+---------+---+------------+
|first_name|last_name|age|age_category|
+----------+---------+---+------------+
|      John|      Doe| 28|       Young|
|      Jane|    Smith| 32|         Old|
|      Mike|  Johnson| 25|       Young|
|     Emily|    Brown| 34|         Old|
|     Kevin|    Davis| 30|         Old|
+----------+---------+---+------------+



In [None]:
# 8 - Add a column last_name_length that calculates the length of the last_name.
last_name_df = df.withColumn(
    "last_name_length",
    F.length("last_name")
)
last_name_df.show()

+----------+---------+---+----------------+
|first_name|last_name|age|last_name_length|
+----------+---------+---+----------------+
|      John|      Doe| 28|               3|
|      Jane|    Smith| 32|               5|
|      Mike|  Johnson| 25|               7|
|     Emily|    Brown| 34|               5|
|     Kevin|    Davis| 30|               5|
+----------+---------+---+----------------+



In [None]:
# 9 - Create a column name_with_initial that combines first_name and only the first letter of last_name.
initial_df = df.withColumn(
    "name_with_initial",
    F.concat(F.col("first_name"), F.lit(" "), F.col("last_name").substr(1, 1))
)
initial_df.show()

+----------+---------+---+-----------------+
|first_name|last_name|age|name_with_initial|
+----------+---------+---+-----------------+
|      John|      Doe| 28|           John D|
|      Jane|    Smith| 32|           Jane S|
|      Mike|  Johnson| 25|           Mike J|
|     Emily|    Brown| 34|          Emily B|
|     Kevin|    Davis| 30|          Kevin D|
+----------+---------+---+-----------------+



In [None]:
# 10 - Add a column even_or_odd_age that labels each person's age as "Even" or "Odd".
even_odd = df.withColumn(
    "even_or_odd_age",
    F.when(F.col("age") % 2 == 0, "Even")     
    .when(F.col("age") % 2 != 0, "Odd")
)

even_odd.show()

+----------+---------+---+----------+------+---------------+
|first_name|last_name|age|department|salary|even_or_odd_age|
+----------+---------+---+----------+------+---------------+
|      John|      Doe| 28|     Sales|  5000|           Even|
|      Jane|    Smith| 32|   Finance|  6000|           Even|
|      Mike|  Johnson| 25| Marketing|  4500|            Odd|
|     Emily|    Brown| 34|     Sales|  5200|           Even|
|     Kevin|    Davis| 30|   Finance|  5800|           Even|
|     Laura|   Wilson| 27| Marketing|  4800|            Odd|
+----------+---------+---+----------+------+---------------+



In [None]:
# 11 - Create a column seniority that labels employees as "Junior" (age < 30) and "Senior" (age >= 30).
level = df.withColumn(
    "seniority",
    F.when(
        F.col("age") < 30,
        "Junior"
    )
    .when(
        F.col("age") >= 30,
        "Senior"
    )
).show()

+----------+---------+---+----------+------+---------+
|first_name|last_name|age|department|salary|seniority|
+----------+---------+---+----------+------+---------+
|      John|      Doe| 28|     Sales|  5000|   Junior|
|      Jane|    Smith| 32|   Finance|  6000|   Senior|
|      Mike|  Johnson| 25| Marketing|  4500|   Junior|
|     Emily|    Brown| 34|     Sales|  5200|   Senior|
|     Kevin|    Davis| 30|   Finance|  5800|   Senior|
|     Laura|   Wilson| 27| Marketing|  4800|   Junior|
+----------+---------+---+----------+------+---------+



In [None]:
# 12 - Add a column age_category that categorizes age into groups: "20-29", "30-39".
category_df = df.withColumn(
    "age_category",
    F.when(
        F.col("age").between(20, 30), "20-29"
    )
    .when(
        F.col("age").between(30, 40), "30-39"
    )
).show()

+----------+---------+---+----------+------+------------+
|first_name|last_name|age|department|salary|age_category|
+----------+---------+---+----------+------+------------+
|      John|      Doe| 28|     Sales|  5000|       20-29|
|      Jane|    Smith| 32|   Finance|  6000|       30-39|
|      Mike|  Johnson| 25| Marketing|  4500|       20-29|
|     Emily|    Brown| 34|     Sales|  5200|       30-39|
|     Kevin|    Davis| 30|   Finance|  5800|       20-29|
|     Laura|   Wilson| 27| Marketing|  4800|       20-29|
+----------+---------+---+----------+------+------------+



### Another exercises - 2 day

In [None]:
#sample
data = [
    (1, "Alice", 30, 5000.0),
    (2, "Bob", 25, 6000.0),
    (3, "Cathy", 30, 7000.0),
    (4, "David", 35, 8000.0),
    (5, "Eve", 40, 9000.0)
]

columns = ["id", "name", "age", "salary"]

df = spark.createDataFrame(data, columns)

In [None]:
# Create a new column called age_group that categorizes people into "Young" (age < 30),
# "Middle-aged" (30 <= age < 40), and "Senior" (age >= 40).
df_1 = df.withColumn(
    "age_group",
    F.when(F.col("age") < 30, "Young")
    .when(F.col("age").between(30, 40), "Middle-aged")
    .otherwise("Senior")
)

df_1.show()

+---+-----+---+------+-----------+
| id| name|age|salary|  age_group|
+---+-----+---+------+-----------+
|  1|Alice| 30|5000.0|Middle-aged|
|  2|  Bob| 25|6000.0|      Young|
|  3|Cathy| 30|7000.0|Middle-aged|
|  4|David| 35|8000.0|Middle-aged|
|  5|  Eve| 40|9000.0|Middle-aged|
+---+-----+---+------+-----------+



In [None]:
# Create a new column called salary_increase that adds 10% to the salary
# if the person is younger than 30, and adds 5% if they are 30 or older.
df_2 = df.withColumn(
    "salary_increase",
    F.when(F.col("age") < 30, (F.col("salary") + F.col("salary") * 0.1))
    .otherwise(F.col("salary") + F.col("salary") * 0.05)
)

df_2.show()

+---+-----+---+------+---------------+
| id| name|age|salary|salary_increase|
+---+-----+---+------+---------------+
|  1|Alice| 30|5000.0|         5250.0|
|  2|  Bob| 25|6000.0|         6600.0|
|  3|Cathy| 30|7000.0|         7350.0|
|  4|David| 35|8000.0|         8400.0|
|  5|  Eve| 40|9000.0|         9450.0|
+---+-----+---+------+---------------+



# 4 - Object Row Exercises

In [None]:
# Create Row Objects
# Convert each row of the DataFrame into a Row object and print them
rows = df.collect()
for row in rows:
    print(row)

In [None]:
# Access Individual Columns
# From each Row object, extract and print the "first_name" and "last_name" fields.
rows = df.collect()
for row in rows:
    name: str = row.name
    print(name)

Alice
Bob
Catherine
David
Eve
Frank
Grace
Hannah
Ivy
Jack


In [None]:
# Calculate Age Sum
# Calculate the sum of the ages of all individuals using Row objects.
rows = df.collect()
sum_age = 0
for row in rows:
    sum_age += row.age
print(sum_age)

469


In [None]:
# Filter by Age
# Filter the Row objects to include only those individuals whose age is greater than 30.
rows = df.collect()

rows = [row for row in rows if row.age > 30]
print(rows)

[Row(id=1, name='Alice', department='Sales', age=44, salary=70000, experience=5), Row(id=2, name='Bob', department='HR', age=55, salary=80000, experience=10), Row(id=3, name='Catherine', department='IT', age=39, salary=90000, experience=3), Row(id=4, name='David', department='IT', age=49, salary=85000, experience=7), Row(id=5, name='Eve', department='Sales', age=51, salary=75000, experience=8), Row(id=6, name='Frank', department='HR', age=40, salary=60000, experience=2), Row(id=7, name='Grace', department='IT', age=45, salary=95000, experience=6), Row(id=8, name='Hannah', department='Sales', age=60, salary=65000, experience=12), Row(id=9, name='Ivy', department='IT', age=48, salary=87000, experience=9), Row(id=10, name='Jack', department='HR', age=38, salary=72000, experience=4)]


In [None]:
# Create New DataFrame
# Create a new DataFrame using Row objects, but include only individuals whose name starts with the letter "D".
rows = df.collect()

# Filter the rows where name starts with "D"
filtered_rows = [Row(*row) for row in rows if row.name.startswith("D")]

# Create a new DataFrame from the filtered rows
new_df = spark.createDataFrame(filtered_rows)

new_df.show()

+---+-----+---+---+-----+---+
| _1|   _2| _3| _4|   _5| _6|
+---+-----+---+---+-----+---+
|  4|David| IT| 49|85000|  7|
+---+-----+---+---+-----+---+



In [None]:
# Modify Age Field
# Increase the age of each individual by 5 years using the Row objects.
rows = df.collect()

for row in rows:
    age_normal: int = row.age
    age_added: int = row.age + 5
    print("Age normal:", age_normal)
    print("Age added:", age_added)

Age normal: 44
Age added: 49
Age normal: 55
Age added: 60
Age normal: 39
Age added: 44
Age normal: 49
Age added: 54
Age normal: 51
Age added: 56
Age normal: 40
Age added: 45
Age normal: 45
Age added: 50
Age normal: 60
Age added: 65
Age normal: 48
Age added: 53
Age normal: 38
Age added: 43


In [None]:
# Sort by First Name
# Sort the Row objects in ascending order by the "first_name" field.
rows = df.collect()

# Sort the rows by the "first_name" field
sorted_rows = sorted(rows, key=lambda row: row.name)

# Print the sorted rows
for row in sorted_rows:
    print(row)

Row(id=1, name='Alice', department='Sales', age=44, salary=70000, experience=5)
Row(id=2, name='Bob', department='HR', age=55, salary=80000, experience=10)
Row(id=3, name='Catherine', department='IT', age=39, salary=90000, experience=3)
Row(id=4, name='David', department='IT', age=49, salary=85000, experience=7)
Row(id=5, name='Eve', department='Sales', age=51, salary=75000, experience=8)
Row(id=6, name='Frank', department='HR', age=40, salary=60000, experience=2)
Row(id=7, name='Grace', department='IT', age=45, salary=95000, experience=6)
Row(id=8, name='Hannah', department='Sales', age=60, salary=65000, experience=12)
Row(id=9, name='Ivy', department='IT', age=48, salary=87000, experience=9)
Row(id=10, name='Jack', department='HR', age=38, salary=72000, experience=4)


In [None]:
# Custom Field Extraction
# Extract the "name" and "age" fields from each Row object and create a new list of dictionaries with the extracted data.
rows = df.collect()

# Extract "name" and "age" fields and create a new list of dictionaries
list_of_dicts = [{"name": row.name, "age": row.age} for row in rows]

# Print the list of dictionaries
print(list_of_dicts)

[{'name': 'Alice', 'age': 44}, {'name': 'Bob', 'age': 55}, {'name': 'Catherine', 'age': 39}, {'name': 'David', 'age': 49}, {'name': 'Eve', 'age': 51}, {'name': 'Frank', 'age': 40}, {'name': 'Grace', 'age': 45}, {'name': 'Hannah', 'age': 60}, {'name': 'Ivy', 'age': 48}, {'name': 'Jack', 'age': 38}]


# 5 - drop() Exercises

In [None]:
#data
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", DoubleType(), True),
    StructField("transaction_id", IntegerType(), True),
    StructField("amount", DoubleType(), True),
    StructField("email", StringType(), True),
    StructField("phone_number", StringType(), True),
    StructField("status", StringType(), True),
    StructField("date", StringType(), True),
    StructField("grade", StringType(), True),
    StructField("details", StructType([
        StructField("first_name", StringType(), True),
        StructField("last_name", StringType(), True),
        StructField("age", IntegerType(), True)
    ]), True)
])

# Create data
data = [
    (1, "Alice", 23, "F", 50000.0, 101, 250.0, "alice@example.com", "123-456-7890", "active", "2024-01-01", "A", {"first_name": "Alice", "last_name": "Smith", "age": 23}),
    (2, "Bob", 19, "M", 60000.0, 102, 150.0, "bob@example.com", "234-567-8901", "inactive", "2024-02-01", "B", {"first_name": "Bob", "last_name": "Jones", "age": 19}),
    (3, "Cathy", 22, "F", 70000.0, 103, 300.0, "cathy@example.com", "345-678-9012", "active", "2024-03-01", "C", {"first_name": "Cathy", "last_name": "Johnson", "age": 22}),
    (4, "David", 25, "M", 80000.0, 104, 200.0, "david@example.com", "456-789-0123", "inactive", "2024-04-01", "D", {"first_name": "David", "last_name": "Williams", "age": 25}),
    (5, "Eve", 18, "F", 90000.0, 105, 350.0, "eve@example.com", "567-890-1234", "active", "2024-05-01", "A", {"first_name": "Eve", "last_name": "Brown", "age": 18})
]

# Create DataFrame
df = spark.createDataFrame(data, schema)

In [None]:
df.show()

+---+-----+---+------+-------+--------------+------+-----------------+------------+--------+----------+-----+--------------------+
| id| name|age|gender| salary|transaction_id|amount|            email|phone_number|  status|      date|grade|             details|
+---+-----+---+------+-------+--------------+------+-----------------+------------+--------+----------+-----+--------------------+
|  1|Alice| 23|     F|50000.0|           101| 250.0|alice@example.com|123-456-7890|  active|2024-01-01|    A|  {Alice, Smith, 23}|
|  2|  Bob| 19|     M|60000.0|           102| 150.0|  bob@example.com|234-567-8901|inactive|2024-02-01|    B|    {Bob, Jones, 19}|
|  3|Cathy| 22|     F|70000.0|           103| 300.0|cathy@example.com|345-678-9012|  active|2024-03-01|    C|{Cathy, Johnson, 22}|
|  4|David| 25|     M|80000.0|           104| 200.0|david@example.com|456-789-0123|inactive|2024-04-01|    D|{David, Williams,...|
|  5|  Eve| 18|     F|90000.0|           105| 350.0|  eve@example.com|567-890-1234|

In [None]:
# 1. Given a DataFrame with columns ["name", "age", "gender"], drop the column age.
df_1 = df.drop("age")
df_1.show()

+---+-----+------+-------+--------------+------+-----------------+------------+--------+----------+-----+--------------------+
| id| name|gender| salary|transaction_id|amount|            email|phone_number|  status|      date|grade|             details|
+---+-----+------+-------+--------------+------+-----------------+------------+--------+----------+-----+--------------------+
|  1|Alice|     F|50000.0|           101| 250.0|alice@example.com|123-456-7890|  active|2024-01-01|    A|  {Alice, Smith, 23}|
|  2|  Bob|     M|60000.0|           102| 150.0|  bob@example.com|234-567-8901|inactive|2024-02-01|    B|    {Bob, Jones, 19}|
|  3|Cathy|     F|70000.0|           103| 300.0|cathy@example.com|345-678-9012|  active|2024-03-01|    C|{Cathy, Johnson, 22}|
|  4|David|     M|80000.0|           104| 200.0|david@example.com|456-789-0123|inactive|2024-04-01|    D|{David, Williams,...|
|  5|  Eve|     F|90000.0|           105| 350.0|  eve@example.com|567-890-1234|  active|2024-05-01|    A|    {E

In [None]:
# 2. Given a DataFrame with columns ["id", "name", "age", "salary"], drop the columns name and salary.
df_2 = df.drop("name", "salary")
df_2.show()

+---+---+------+--------------+------+-----------------+------------+--------+----------+-----+--------------------+
| id|age|gender|transaction_id|amount|            email|phone_number|  status|      date|grade|             details|
+---+---+------+--------------+------+-----------------+------------+--------+----------+-----+--------------------+
|  1| 23|     F|           101| 250.0|alice@example.com|123-456-7890|  active|2024-01-01|    A|  {Alice, Smith, 23}|
|  2| 19|     M|           102| 150.0|  bob@example.com|234-567-8901|inactive|2024-02-01|    B|    {Bob, Jones, 19}|
|  3| 22|     F|           103| 300.0|cathy@example.com|345-678-9012|  active|2024-03-01|    C|{Cathy, Johnson, 22}|
|  4| 25|     M|           104| 200.0|david@example.com|456-789-0123|inactive|2024-04-01|    D|{David, Williams,...|
|  5| 18|     F|           105| 350.0|  eve@example.com|567-890-1234|  active|2024-05-01|    A|    {Eve, Brown, 18}|
+---+---+------+--------------+------+-----------------+--------

In [None]:
# 3. Given a DataFrame with columns ["product_id", "price", "quantity"], drop the column quantity using a Column object.
columns = [column for column in df.columns if column != "quantity"]
df_3 = df.select(*columns)
df_3.show()

+---+-----+---+------+-------+--------------+------+-----------------+------------+--------+----------+-----+--------------------+
| id| name|age|gender| salary|transaction_id|amount|            email|phone_number|  status|      date|grade|             details|
+---+-----+---+------+-------+--------------+------+-----------------+------------+--------+----------+-----+--------------------+
|  1|Alice| 23|     F|50000.0|           101| 250.0|alice@example.com|123-456-7890|  active|2024-01-01|    A|  {Alice, Smith, 23}|
|  2|  Bob| 19|     M|60000.0|           102| 150.0|  bob@example.com|234-567-8901|inactive|2024-02-01|    B|    {Bob, Jones, 19}|
|  3|Cathy| 22|     F|70000.0|           103| 300.0|cathy@example.com|345-678-9012|  active|2024-03-01|    C|{Cathy, Johnson, 22}|
|  4|David| 25|     M|80000.0|           104| 200.0|david@example.com|456-789-0123|inactive|2024-04-01|    D|{David, Williams,...|
|  5|  Eve| 18|     F|90000.0|           105| 350.0|  eve@example.com|567-890-1234|

In [None]:
# 4. Given a DataFrame with columns ["id", "status", "date"],
# attempt to drop a column named address that does not exist in the DataFrame.
df_4 = df.drop("address")
df_4.show()

+---+-----+---+------+-------+--------------+------+-----------------+------------+--------+----------+-----+--------------------+
| id| name|age|gender| salary|transaction_id|amount|            email|phone_number|  status|      date|grade|             details|
+---+-----+---+------+-------+--------------+------+-----------------+------------+--------+----------+-----+--------------------+
|  1|Alice| 23|     F|50000.0|           101| 250.0|alice@example.com|123-456-7890|  active|2024-01-01|    A|  {Alice, Smith, 23}|
|  2|  Bob| 19|     M|60000.0|           102| 150.0|  bob@example.com|234-567-8901|inactive|2024-02-01|    B|    {Bob, Jones, 19}|
|  3|Cathy| 22|     F|70000.0|           103| 300.0|cathy@example.com|345-678-9012|  active|2024-03-01|    C|{Cathy, Johnson, 22}|
|  4|David| 25|     M|80000.0|           104| 200.0|david@example.com|456-789-0123|inactive|2024-04-01|    D|{David, Williams,...|
|  5|  Eve| 18|     F|90000.0|           105| 350.0|  eve@example.com|567-890-1234|

In [None]:
# 5. Given a DataFrame with columns ["transaction_id", "amount", "amount"],
# first drop duplicate columns and then drop the column amount.

# Remove duplicate columns while preserving order
unique_columns = list(dict.fromkeys(df.columns))
df_5 = df.select(unique_columns)
df_5.show()

+---+-----+---+------+-------+--------------+------+-----------------+------------+--------+----------+-----+--------------------+
| id| name|age|gender| salary|transaction_id|amount|            email|phone_number|  status|      date|grade|             details|
+---+-----+---+------+-------+--------------+------+-----------------+------------+--------+----------+-----+--------------------+
|  1|Alice| 23|     F|50000.0|           101| 250.0|alice@example.com|123-456-7890|  active|2024-01-01|    A|  {Alice, Smith, 23}|
|  2|  Bob| 19|     M|60000.0|           102| 150.0|  bob@example.com|234-567-8901|inactive|2024-02-01|    B|    {Bob, Jones, 19}|
|  3|Cathy| 22|     F|70000.0|           103| 300.0|cathy@example.com|345-678-9012|  active|2024-03-01|    C|{Cathy, Johnson, 22}|
|  4|David| 25|     M|80000.0|           104| 200.0|david@example.com|456-789-0123|inactive|2024-04-01|    D|{David, Williams,...|
|  5|  Eve| 18|     F|90000.0|           105| 350.0|  eve@example.com|567-890-1234|

In [None]:
# 6. Given a DataFrame with columns ["student_id", "name", "age", "grade"],
# drop the grade column if the average age of students is greater than 18.
df_6 = df
avg_age = df.agg(F.avg("age")).collect()[0][0]

if avg_age > 18:
    df_6 = df_6.drop("grade")
    
df_6.show()

+---+-----+---+------+-------+--------------+------+-----------------+------------+--------+----------+--------------------+
| id| name|age|gender| salary|transaction_id|amount|            email|phone_number|  status|      date|             details|
+---+-----+---+------+-------+--------------+------+-----------------+------------+--------+----------+--------------------+
|  1|Alice| 23|     F|50000.0|           101| 250.0|alice@example.com|123-456-7890|  active|2024-01-01|  {Alice, Smith, 23}|
|  2|  Bob| 19|     M|60000.0|           102| 150.0|  bob@example.com|234-567-8901|inactive|2024-02-01|    {Bob, Jones, 19}|
|  3|Cathy| 22|     F|70000.0|           103| 300.0|cathy@example.com|345-678-9012|  active|2024-03-01|{Cathy, Johnson, 22}|
|  4|David| 25|     M|80000.0|           104| 200.0|david@example.com|456-789-0123|inactive|2024-04-01|{David, Williams,...|
|  5|  Eve| 18|     F|90000.0|           105| 350.0|  eve@example.com|567-890-1234|  active|2024-05-01|    {Eve, Brown, 18}|


In [None]:
# 7. Given a DataFrame with a column ["details"] where details is a struct
# containing fields first_name, last_name, and age, drop the field last_name from the struct.

df_7 = df.withColumn(
    "details",
    F.struct(
        F.col("details.first_name").alias("first_name"),
        F.col("details.age").alias("age")
    )
)
df_7.show()

+---+-----+---+------+-------+--------------+------+-----------------+------------+--------+----------+-----+-----------+
| id| name|age|gender| salary|transaction_id|amount|            email|phone_number|  status|      date|grade|    details|
+---+-----+---+------+-------+--------------+------+-----------------+------------+--------+----------+-----+-----------+
|  1|Alice| 23|     F|50000.0|           101| 250.0|alice@example.com|123-456-7890|  active|2024-01-01|    A|{Alice, 23}|
|  2|  Bob| 19|     M|60000.0|           102| 150.0|  bob@example.com|234-567-8901|inactive|2024-02-01|    B|  {Bob, 19}|
|  3|Cathy| 22|     F|70000.0|           103| 300.0|cathy@example.com|345-678-9012|  active|2024-03-01|    C|{Cathy, 22}|
|  4|David| 25|     M|80000.0|           104| 200.0|david@example.com|456-789-0123|inactive|2024-04-01|    D|{David, 25}|
|  5|  Eve| 18|     F|90000.0|           105| 350.0|  eve@example.com|567-890-1234|  active|2024-05-01|    A|  {Eve, 18}|
+---+-----+---+------+--

In [None]:
# 8. Given a DataFrame with columns ["a", "b", "c", "d", "e"],
# drop the columns c and e, and then reorder the remaining columns in descending order.
col = df.drop("id", "salary")
columns = col.columns

ordered_columns = sorted(columns, reverse=True)
df_8 = df.select(ordered_columns)

df_8.show()

+--------------+--------+------------+-----+-----+------+-----------------+--------------------+----------+------+---+
|transaction_id|  status|phone_number| name|grade|gender|            email|             details|      date|amount|age|
+--------------+--------+------------+-----+-----+------+-----------------+--------------------+----------+------+---+
|           101|  active|123-456-7890|Alice|    A|     F|alice@example.com|  {Alice, Smith, 23}|2024-01-01| 250.0| 23|
|           102|inactive|234-567-8901|  Bob|    B|     M|  bob@example.com|    {Bob, Jones, 19}|2024-02-01| 150.0| 19|
|           103|  active|345-678-9012|Cathy|    C|     F|cathy@example.com|{Cathy, Johnson, 22}|2024-03-01| 300.0| 22|
|           104|inactive|456-789-0123|David|    D|     M|david@example.com|{David, Williams,...|2024-04-01| 200.0| 25|
|           105|  active|567-890-1234|  Eve|    A|     F|  eve@example.com|    {Eve, Brown, 18}|2024-05-01| 350.0| 18|
+--------------+--------+------------+-----+----

In [None]:
# 9 - Given a DataFrame with columns ["col1", "col2", "col3", "col4"]
# of types Integer, String, Integer, and String respectively, drop all columns of type String.
df_9 = df
dtype_dict = dict(df.dtypes)
for col in df_9.columns:
    if dtype_dict[col] == "string":
        df_9 = df_9.drop(col)
df_9.show()

+---+---+-------+--------------+------+--------------------+
| id|age| salary|transaction_id|amount|             details|
+---+---+-------+--------------+------+--------------------+
|  1| 23|50000.0|           101| 250.0|  {Alice, Smith, 23}|
|  2| 19|60000.0|           102| 150.0|    {Bob, Jones, 19}|
|  3| 22|70000.0|           103| 300.0|{Cathy, Johnson, 22}|
|  4| 25|80000.0|           104| 200.0|{David, Williams,...|
|  5| 18|90000.0|           105| 350.0|    {Eve, Brown, 18}|
+---+---+-------+--------------+------+--------------------+



# 6 - dropna() Exercises

In [None]:
# creating dataframe 
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("salary", DoubleType(), True),
    StructField("details", StructType([
        StructField("first_name", StringType(), True),
        StructField("last_name", StringType(), True),
        StructField("age", IntegerType(), True)
    ]), True)
])

# Create data with nulls
data = [
    (1, "Alice", 23, 50000.0, {"first_name": "Alice", "last_name": None, "age": 23}),
    (2, "Bob", None, None, {"first_name": "Bob", "last_name": "Jones", "age": None}),
    (3, None, None, 70000.0, {"first_name": None, "last_name": "Smith", "age": 22}),
    (4, "David", 25, None, {"first_name": "David", "last_name": "Williams", "age": None}),
    (5, "Eve", 18, 90000.0, None)
]

# Create DataFrame
df = spark.createDataFrame(data, schema)

In [None]:
# 1 - Given a DataFrame, drop all rows that contain any null values.
df_1 = df.dropna()
df_1.show()

+---+-----+---+-------+-----------------+
| id| name|age| salary|          details|
+---+-----+---+-------+-----------------+
|  1|Alice| 23|50000.0|{Alice, NULL, 23}|
+---+-----+---+-------+-----------------+



In [None]:
# 2 - Given a DataFrame with columns ["id", "name", "age"], drop rows where the age column has null values.
df_2 = df.na.drop(subset=["age"])
df_2.show()

+---+-----+---+-------+--------------------+
| id| name|age| salary|             details|
+---+-----+---+-------+--------------------+
|  1|Alice| 23|50000.0|   {Alice, NULL, 23}|
|  4|David| 25|   NULL|{David, Williams,...|
|  5|  Eve| 18|90000.0|                NULL|
+---+-----+---+-------+--------------------+



In [None]:
# 3 -Given a DataFrame with columns ["id", "name", "age", "salary"],
# drop rows where either the age or salary column contains null values.
df_3 = df.dropna(how="any", subset=["age", "salary"])
df_3.show()

+---+-----+----+-------+--------------------+
| id| name| age| salary|             details|
+---+-----+----+-------+--------------------+
|  1|Alice|  23|50000.0|   {Alice, NULL, 23}|
|  3| NULL|NULL|70000.0|   {NULL, Smith, 22}|
|  4|David|  25|   NULL|{David, Williams,...|
|  5|  Eve|  18|90000.0|                NULL|
+---+-----+----+-------+--------------------+



In [None]:
# 4 - Given a DataFrame with columns ["id", "name", "age", "salary"], drop rows where all columns are null.
df_4 = df.dropna(how="all")
df_4.show()

+---+-----+----+-------+--------------------+
| id| name| age| salary|             details|
+---+-----+----+-------+--------------------+
|  1|Alice|  23|50000.0|   {Alice, NULL, 23}|
|  2|  Bob|NULL|   NULL|  {Bob, Jones, NULL}|
|  3| NULL|NULL|70000.0|   {NULL, Smith, 22}|
|  4|David|  25|   NULL|{David, Williams,...|
|  5|  Eve|  18|90000.0|                NULL|
+---+-----+----+-------+--------------------+



In [None]:
# 5 - Given a DataFrame with columns ["id", "name", "age", "salary"], drop rows if more than one column has null values.
df_5 = df.filter(
    F.expr("size(filter(array(id, name, age, salary), x -> x is null)) <= 1")
)

df_5.show()

+---+-----+---+-------+--------------------+
| id| name|age| salary|             details|
+---+-----+---+-------+--------------------+
|  1|Alice| 23|50000.0|   {Alice, NULL, 23}|
|  4|David| 25|   NULL|{David, Williams,...|
|  5|  Eve| 18|90000.0|                NULL|
+---+-----+---+-------+--------------------+



In [None]:
# 6 - Given a DataFrame, drop all columns that contain any null values.
df_6 = df.dropna(how="any")
df_6.show()

+---+-----+---+-------+-----------------+
| id| name|age| salary|          details|
+---+-----+---+-------+-----------------+
|  1|Alice| 23|50000.0|{Alice, NULL, 23}|
+---+-----+---+-------+-----------------+



In [None]:
# 7 - Given a DataFrame with columns ["id", "name", "age", "salary"], drop columns where more than one row contains null values.
# Count nulls per column
null_counts = df.select([
    F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df.columns
])


# Collect null counts and identify columns with more than one null value
null_counts_row = null_counts.collect()[0].asDict()
columns_to_drop = [c for c in null_counts_row if null_counts_row[c] > 1]

# Drop identified columns
df_7 = df.drop(*columns_to_drop)
df_7.show()

+---+-----+--------------------+
| id| name|             details|
+---+-----+--------------------+
|  1|Alice|   {Alice, NULL, 23}|
|  2|  Bob|  {Bob, Jones, NULL}|
|  3| NULL|   {NULL, Smith, 22}|
|  4|David|{David, Williams,...|
|  5|  Eve|                NULL|
+---+-----+--------------------+



In [None]:
# 8 - Given a DataFrame with a details column that is a struct containing fields
# first_name, last_name, and age, drop rows where any field within the struct is null.
df_8 = df.filter(
    F.col("details.first_name").isNotNull() &
    F.col("details.last_name").isNotNull() &
    F.col("details.age").isNotNull()
)

df_8.show()

+---+----+---+------+-------+
| id|name|age|salary|details|
+---+----+---+------+-------+
+---+----+---+------+-------+



In [None]:
# 9 - Given a DataFrame with columns ["id", "name", "age", "salary"],
# first filter rows where age is greater than 20, then drop rows with null values in the remaining rows.
df_9 = df.filter(F.col("age") > 20)

df_9 = df_9.dropna()
df_9.show()

+---+-----+---+-------+-----------------+
| id| name|age| salary|          details|
+---+-----+---+-------+-----------------+
|  1|Alice| 23|50000.0|{Alice, NULL, 23}|
+---+-----+---+-------+-----------------+



In [None]:
# 10 - Given a DataFrame with columns ["id", "name", "age"], drop rows with null values and then reset the DataFrame index.
df_10 = df.dropna()

df_10 = df_10.withColumn(
    "id",
    F.monotonically_increasing_id() + 1
)
df_10.show()

+---+-----+---+-------+-----------------+
| id| name|age| salary|          details|
+---+-----+---+-------+-----------------+
|  1|Alice| 23|50000.0|{Alice, NULL, 23}|
+---+-----+---+-------+-----------------+



# 7 - dropDuplicates() Exercises

In [None]:
data = [
    ("Alice", 34, "F", "New York"),
    ("Bob", 45, "M", "California"),
    ("Alice", 34, "F", "New York"),  # Duplicate
    ("Charlie", 30, "M", "New York"),
    ("Dave", 40, "M", "Texas"),
    ("Eve", 28, "F", "California"),
    ("Charlie", 30, "M", "New York"),  # Duplicate
]

columns = ["Name", "Age", "Gender", "State"]

df = spark.createDataFrame(data, columns)
df.show()

+-------+---+------+----------+
|   Name|Age|Gender|     State|
+-------+---+------+----------+
|  Alice| 34|     F|  New York|
|    Bob| 45|     M|California|
|  Alice| 34|     F|  New York|
|Charlie| 30|     M|  New York|
|   Dave| 40|     M|     Texas|
|    Eve| 28|     F|California|
|Charlie| 30|     M|  New York|
+-------+---+------+----------+



In [None]:
# 1 - Use dropDuplicates() to remove all duplicate rows based on all columns.
df_1 = df.dropDuplicates()
df_1.show()

+-------+---+------+----------+
|   Name|Age|Gender|     State|
+-------+---+------+----------+
|  Alice| 34|     F|  New York|
|    Bob| 45|     M|California|
|   Dave| 40|     M|     Texas|
|Charlie| 30|     M|  New York|
|    Eve| 28|     F|California|
+-------+---+------+----------+



In [None]:
# 2 - Remove duplicate rows based on the "Name" column only.
df_2 = df.dropDuplicates(subset=["Name"])
df_2.show()

+-------+---+------+----------+
|   Name|Age|Gender|     State|
+-------+---+------+----------+
|  Alice| 34|     F|  New York|
|    Bob| 45|     M|California|
|Charlie| 30|     M|  New York|
|   Dave| 40|     M|     Texas|
|    Eve| 28|     F|California|
+-------+---+------+----------+



In [None]:
# 3 - Remove duplicates based on the combination of "Name" and "State" columns.
df_3 = df.dropDuplicates(subset=["Name", "State"])
df_3.show()

+-------+---+------+----------+
|   Name|Age|Gender|     State|
+-------+---+------+----------+
|  Alice| 34|     F|  New York|
|    Bob| 45|     M|California|
|Charlie| 30|     M|  New York|
|   Dave| 40|     M|     Texas|
|    Eve| 28|     F|California|
+-------+---+------+----------+



In [None]:
# 4 - Remove duplicates and keep the first occurrence of each row.
#  Alice| 34|     F|  New York
#  Charlie| 30|     M|  New York
df_4 = df.dropDuplicates()
df_4.show()

+-------+---+------+----------+
|   Name|Age|Gender|     State|
+-------+---+------+----------+
|  Alice| 34|     F|  New York|
|    Bob| 45|     M|California|
|   Dave| 40|     M|     Texas|
|Charlie| 30|     M|  New York|
|    Eve| 28|     F|California|
+-------+---+------+----------+



In [None]:
# 5 - Remove duplicates and keep the last occurrence of each row.
df_5 = df.dropDuplicates()
df_5.show()

+-------+---+------+----------+
|   Name|Age|Gender|     State|
+-------+---+------+----------+
|  Alice| 34|     F|  New York|
|    Bob| 45|     M|California|
|   Dave| 40|     M|     Texas|
|Charlie| 30|     M|  New York|
|    Eve| 28|     F|California|
+-------+---+------+----------+



In [None]:
# 6 - Count the number of rows in the DataFrame after removing duplicates based on all columns.
df_6 = df.dropDuplicates()
df_6.count()

5

In [None]:
# 7 - Compare the count of rows before and after using dropDuplicates() on the "Name" column.
df_7 = df.dropDuplicates(subset=["Name"])
print(f"Df 7: {df_7.count()}")
print(f"Original DF: {df.count()}")

Df 7: 5
Original DF: 7


In [None]:
# 8 - Remove duplicates based on the "State" column and show only the "Name" and "State" columns in the result.
df_8 = df.dropDuplicates(subset=["State"]).select(
    "Name",
    "State"
)

df_8.show()

+-----+----------+
| Name|     State|
+-----+----------+
|  Bob|California|
|Alice|  New York|
| Dave|     Texas|
+-----+----------+



In [None]:
# 9 - Remove duplicates based on "Age" and verify if the resulting DataFrame still contains any duplicate rows.
df_9 = df.dropDuplicates(subset=["Age"])
original_df_count = df.count()
df_9_count = df_9.count()

if original_df_count == df_9_count:
    print("Same")
else:
    print("Not same")

Not same


In [None]:
# 10 - Remove duplicates based on "State" and then apply a transformation (e.g., converting "Name" to uppercase)
# to the resulting DataFrame.
df_10 = df.dropDuplicates(subset=["State"])
df_10 = df_10.withColumn(
    "Name",
    F.upper(F.col("Name"))
)
df_10.show()

+-----+---+------+----------+
| Name|Age|Gender|     State|
+-----+---+------+----------+
|  BOB| 45|     M|California|
|ALICE| 34|     F|  New York|
| DAVE| 40|     M|     Texas|
+-----+---+------+----------+



# 8 - selectExpr() Exercises

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SelectExprExample").getOrCreate()

data = [
    ("Alice", 34, "F", 7000),
    ("Bob", 45, "M", 8000),
    ("Charlie", 30, "M", 12000),
    ("Dave", 40, "M", 15000),
    ("Eve", 28, "F", 5000),
]

columns = ["Name", "Age", "Gender", "Salary"]

df = spark.createDataFrame(data, columns)
df.show()

+-------+---+------+------+
|   Name|Age|Gender|Salary|
+-------+---+------+------+
|  Alice| 34|     F|  7000|
|    Bob| 45|     M|  8000|
|Charlie| 30|     M| 12000|
|   Dave| 40|     M| 15000|
|    Eve| 28|     F|  5000|
+-------+---+------+------+



In [None]:
# 1 - Use selectExpr() to select only the "Name" and "Salary" columns.
df_1 = df.selectExpr("Name", "Salary")
df_1.show() 

+-------+------+
|   Name|Salary|
+-------+------+
|  Alice|  7000|
|    Bob|  8000|
|Charlie| 12000|
|   Dave| 15000|
|    Eve|  5000|
+-------+------+



In [None]:
# 2 - Rename the "Salary" column to "Income" using selectExpr().
df_2 = df.selectExpr(
    "Salary AS Income"
)

df_2.show()

+------+
|Income|
+------+
|  7000|
|  8000|
| 12000|
| 15000|
|  5000|
+------+



In [None]:
# 3 - Add a new column "Tax" which is 10% of "Salary" using selectExpr().
df_3 = df.selectExpr(
    "*",
    "Salary * 0.1 AS Tax"
)
df_3.show()

+-------+---+------+------+------+
|   Name|Age|Gender|Salary|   Tax|
+-------+---+------+------+------+
|  Alice| 34|     F|  7000| 700.0|
|    Bob| 45|     M|  8000| 800.0|
|Charlie| 30|     M| 12000|1200.0|
|   Dave| 40|     M| 15000|1500.0|
|    Eve| 28|     F|  5000| 500.0|
+-------+---+------+------+------+



In [None]:
# 4 - Select rows where "Salary" is greater than 7000 using selectExpr().
df_4 = df.filter(F.col("Salary") > 7000)
df_4 = df_4.selectExpr("*")
df_4.show()

+-------+---+------+------+
|   Name|Age|Gender|Salary|
+-------+---+------+------+
|    Bob| 45|     M|  8000|
|Charlie| 30|     M| 12000|
|   Dave| 40|     M| 15000|
+-------+---+------+------+



In [None]:
# 5 - Add a new column "AdjustedSalary" which is the "Salary" plus 500 using selectExpr().
df_5 = df.selectExpr(
    "*",
    "Salary + 500 AS AdjustedSalary"
)
df_5.show()

+-------+---+------+------+--------------+
|   Name|Age|Gender|Salary|AdjustedSalary|
+-------+---+------+------+--------------+
|  Alice| 34|     F|  7000|          7500|
|    Bob| 45|     M|  8000|          8500|
|Charlie| 30|     M| 12000|         12500|
|   Dave| 40|     M| 15000|         15500|
|    Eve| 28|     F|  5000|          5500|
+-------+---+------+------+--------------+



In [None]:
# 6 - Select the "Name" and a calculated "SalaryWithBonus" column, where the bonus is 1000 using selectExpr().
df_6 = df.selectExpr(
    "Name",
    "Salary + 1000 AS SalaryWithBonus"
)

df_6.show()

+-------+---------------+
|   Name|SalaryWithBonus|
+-------+---------------+
|  Alice|           8000|
|    Bob|           9000|
|Charlie|          13000|
|   Dave|          16000|
|    Eve|           6000|
+-------+---------------+



In [None]:
# 7 - Create a new column "NameWithPrefix" that adds the prefix "Employee: " to the "Name" column using selectExpr().
df_7 = df.selectExpr(
    "*",
    "CONCAT('Employee: ', Name) AS NameWithPrefix"
)
df_7.show()

+-------+---+------+------+-----------------+
|   Name|Age|Gender|Salary|   NameWithPrefix|
+-------+---+------+------+-----------------+
|  Alice| 34|     F|  7000|  Employee: Alice|
|    Bob| 45|     M|  8000|    Employee: Bob|
|Charlie| 30|     M| 12000|Employee: Charlie|
|   Dave| 40|     M| 15000|   Employee: Dave|
|    Eve| 28|     F|  5000|    Employee: Eve|
+-------+---+------+------+-----------------+



In [None]:
# 8 - Use selectExpr() to calculate the average salary of all employees.
df_8 = df.selectExpr(
    "AVG(salary) AS AverageSalary"
)
df_8.show()

+-------------+
|AverageSalary|
+-------------+
|       9400.0|
+-------------+



In [None]:
# 9 - Select the "Name" and two new columns:
# "SalaryInK" (Salary divided by 1000) and "Senior" (if Age is greater than 40, 'Yes', otherwise 'No') using selectExpr().
df_9 = df.selectExpr(
    "*",
    "Salary / 1000 AS SalaryInk",
    """
    CASE
        WHEN Age >= 40 THEN 'Yes'
        ELSE 'No'
    END AS Senior
    """
)
df_9.show()

+-------+---+------+------+---------+------+
|   Name|Age|Gender|Salary|SalaryInk|Senior|
+-------+---+------+------+---------+------+
|  Alice| 34|     F|  7000|      7.0|    No|
|    Bob| 45|     M|  8000|      8.0|   Yes|
|Charlie| 30|     M| 12000|     12.0|    No|
|   Dave| 40|     M| 15000|     15.0|   Yes|
|    Eve| 28|     F|  5000|      5.0|    No|
+-------+---+------+------+---------+------+



In [None]:
# 10 - Create a new column "Compensation" which is the product of "Salary"
# and a factor of 1.2, and format it as currency (e.g., "$12,000") using selectExpr().
df_10 = df.selectExpr(
    "*",
    "CONCAT('$', FORMAT_NUMBER(Salary * 1.2, 0)) AS Compensation"
)
df_10.show()

+-------+---+------+------+------------+
|   Name|Age|Gender|Salary|Compensation|
+-------+---+------+------+------------+
|  Alice| 34|     F|  7000|      $8,400|
|    Bob| 45|     M|  8000|      $9,600|
|Charlie| 30|     M| 12000|     $14,400|
|   Dave| 40|     M| 15000|     $18,000|
|    Eve| 28|     F|  5000|      $6,000|
+-------+---+------+------+------------+



# 9 - cache() quiz

## Question 1: What is the primary purpose of the cache() method in PySpark?
The primary purpose of the cache is to store the intermediate results of a DF or RDD in memory (RAM). It helps to optimize the performance or multiple actions on the same DF/RDD by avoiding recomputation.

## Question 2: How does the cache() method improve the performance of Spark applications?
The cache improve the performance by avoiding recomputations on a DF/OS, one case is when the same DF/RDD is getting multiple actions

## Question 3: What is the difference between cache() and persist() in PySpark?
The difference between both are that cache() is a shortcut for persist and you cannot set the StorageLevel, so it work only on the default LevelStorage (MEMORY_AND_DISK_DESER).
With persist() you can set the StorageLevel based on what you want, the default StorageLevel also is MEMORY_AND_DISK_DESER.

## Question 4: What is the default storage level used by the cache() method?
MEMORY_AND_DISK_DESER

## Question 5: Name at least three different storage levels available in PySpark.
1 - MEMORY_ONLY,
2 - MEMORY_AND_DISK_SER_2, 
3 - DISK_ONLY

## Question 6: How can you check if an RDD or DataFrame is cached?
df.is_cached()

## Question 7: How does caching affect the memory usage in a Spark cluster?
Caching increases memory usage by storing intermediate results in memory to improve performance. If memory is insufficient, Spark may spill data to disk, and inefficient management of cached data can lead to memory pressure and potential performance issues.

## Question 8: What happens if you call cache() on a DataFrame that is already cached?
No, calling cache() on a DataFrame that is already cached will not store the computation again. When you call cache(), Spark marks the DataFrame for caching, but it does not duplicate the cached data or re-compute the DataFrame. The DataFrame will continue to use the already cached data.

## Question 9: Can you use cache() on both DataFrames and RDDs in PySpark?
Yes

## Question 10: How do you remove a cached DataFrame or RDD from memory?
df.unpersist()

# 10 - persist() quiz

## Question 1 - What is the purpose of the persist() method in PySpark?
The persist() method in PySpark is used to store (persist) partitions of the DataFrame or RDD in memory and/or disk with a specific storage level. This helps avoid recomputation of the DataFrame or RDD when it is reused in multiple actions or transformations, leading to improved performance.

## Question 2 - How does persist() differ from cache() in terms of functionality?
While cache() you cannot set storage level, with persist() you can set storage level with a lot of options, but if you do not set storage level, they will have the same functionality, both have storage level default MEMORY_AND_DISK_DESER, so cache() is a shortcut for persist()

## Question 3 - Name the default storage level used by persist() when no level is specified.
MEMORY_AND_DISK_DESER (Spark 3.0)

## Question 4 - What are the different storage levels available when using persist() in PySpark?
1 - StorageLevel.MEMORY_ONLY: Stores DFs/RDDs partitions on memory only \
2 - StorageLevel.MEMORY_ONLY_2: Stores DFs/RDDs partitions on memory replicating each partitions to two clusters nodes \
3 - StorageLevel.MEMORY_AND_DISK: Stores DFs/RDDs partitions on memory, if memory is not enough Disk will store \
4 - StorageLevel.MEMORY_AND_DISK_2: Stores DFs/RDDs partitions on memory, if memory is not enough Disk will store, replicating each partitions to two clusters nodes \
5 - StorageLevel.DISK_ONLY: Stores DFs/RDDs partitions on disk only \
6 - StorageLevel.DISK_ONLY_2: Stores DFs/RDDs partitions on disk only, replicating each partitions to two cluster nodes \
7 - StorageLevel.MEMORY_AND_DISK_DESER: Stores DFs/RDDs as serialized objects in JVM memory and disk is space is not enough. \
8 - StorageLevel.MEMORY_ONLY_DESER: Stores DFs/RDDs as serialized objects in JVM memory. Takes lesser memory but requires additional CPU cycles for deserialization. 

## Question 5 - Explain the MEMORY_AND_DISK storage level in the context of persist().
Stores the DataFrame or RDD partitions in memory. If the memory is insufficient, it spills the excess partitions to disk, ensuring that data is still persisted and accessible without recomputation. 

## Question 6 - How do you specify a custom storage level when persisting a DataFrame or RDD?
df.persist(StorageLevel.MEMORY_AND_DISK_2)

## Question 7 - What impact does using the persist() method have on the resources of a Spark cluster?
Using persist() increases memory and potentially disk usage depending on the storage level specified. It can also lead to increased network and I/O usage if replication is involved. While persist() can improve performance by avoiding recomputation, it requires careful management of resources to balance memory, disk usage, and performance.

## Question 8 - Why might you choose to use persist() over cache() in certain scenarios?
Choosing persist() over cache() is beneficial when you need flexibility in specifying the storage level to suit specific needs or optimize resource usage. persist() allows for tailored data storage strategies, which can improve performance and efficiency based on the characteristics of your data and application.

## Question 9 - How can you verify which storage level is being used by a persisted DataFrame or RDD?
df: df.storagelevel \
rdd: rdd.getStorageLevel()

## Question 10 - How does the persist() method contribute to fault tolerance in Spark?
The persist() method enhances fault tolerance by allowing data to be stored with replication across multiple nodes, ensuring that data is not lost if a node fails. Additionally, Spark’s lineage information provides a fallback mechanism for recomputing lost data, contributing to overall reliability and robustness in the face of failures.

In [5]:
data = [
    ("Alice", 34, "F", 7000),
    ("Bob", 45, "M", 8000),
    ("Charlie", 30, "M", 12000),
    ("Dave", 40, "M", 15000),
    ("Eve", 28, "F", 5000),
]

columns = ["name", "age", "gender", "salary"]

df_test = spark.createDataFrame(data=data, schema=columns)

In [6]:
cols = ["salary"] + [col for col in df_test.columns if col != "salary"]
df_2 = df_test.select(cols)

In [31]:
print(cols)

['salary', 'name', 'age', 'gender']


In [30]:
df_test.show()
df_2.show()

+-------+---+------+------+
|   name|age|gender|salary|
+-------+---+------+------+
|  Alice| 34|     F|  7000|
|    Bob| 45|     M|  8000|
|Charlie| 30|     M| 12000|
|   Dave| 40|     M| 15000|
|    Eve| 28|     F|  5000|
+-------+---+------+------+

+------+-------+---+------+
|salary|   name|age|gender|
+------+-------+---+------+
|  7000|  Alice| 34|     F|
|  8000|    Bob| 45|     M|
| 12000|Charlie| 30|     M|
| 15000|   Dave| 40|     M|
|  5000|    Eve| 28|     F|
+------+-------+---+------+



In [10]:
from pyspark.sql.functions import rand, round, count

num_rows = 1_000 # quantidade de linhas

df_origin = spark.range(0, num_rows) \
    .withColumn("id", (round(rand() * 100)).cast("int")) \
    .withColumn("transactions", round(rand() * 1000, 2))
    
df_origin.show()

+---+------------+
| id|transactions|
+---+------------+
| 44|      972.26|
| 35|       530.3|
| 63|       93.22|
| 58|      289.05|
| 71|      867.56|
| 18|      999.02|
| 71|      875.13|
| 21|      550.16|
| 14|      146.77|
| 41|       637.4|
| 46|      282.64|
| 78|      970.33|
| 21|      284.64|
| 88|      710.23|
| 82|      485.83|
|  1|      769.36|
| 47|      431.35|
| 27|      760.55|
| 78|      170.72|
| 12|       986.6|
+---+------------+
only showing top 20 rows



In [11]:
from pyspark.sql import Window
from pyspark.sql.functions import col, count, when, lit, avg, row_number

window_spec = Window.partitionBy("id").orderBy("transactions")

df_with_row_num = df_origin.withColumn("row_num", row_number().over(window_spec)) \
                    .withColumn("total_count", count("transactions").over(Window.partitionBy("id")))

df_median = df_with_row_num.filter(
    when(
        col("total_count") % 2 == 0,
        (col("row_num") == col("total_count") / 2) | (col("row_num") == col("total_count") / 2 + 1)
    ).otherwise(col("row_num") == col("total_count") / 2 + 0.5)
).groupBy("id").agg(avg("transactions").alias("median_transactions"))


df_median.show(10)

+---+-------------------+
| id|median_transactions|
+---+-------------------+
|  0|            307.185|
|  1|             506.08|
|  2|             304.94|
|  3|             283.48|
|  4|            190.385|
|  5|             623.65|
|  6|            486.985|
|  7| 450.33500000000004|
|  8| 439.34000000000003|
|  9|             526.21|
+---+-------------------+
only showing top 10 rows



In [12]:
result_df = df_origin.join(df_median, on="id", how="inner") \
    .select(df_origin["*"], df_median["median_transactions"]) \
    .orderBy("id")
    
result_df.show(100, truncate = False)

+---+------------+-------------------+
|id |transactions|median_transactions|
+---+------------+-------------------+
|0  |471.07      |307.185            |
|0  |143.3       |307.185            |
|1  |542.17      |506.08             |
|1  |967.54      |506.08             |
|1  |211.4       |506.08             |
|1  |229.46      |506.08             |
|1  |367.43      |506.08             |
|1  |715.07      |506.08             |
|1  |648.83      |506.08             |
|1  |216.59      |506.08             |
|1  |506.08      |506.08             |
|1  |983.58      |506.08             |
|1  |255.17      |506.08             |
|1  |13.1        |506.08             |
|1  |506.56      |506.08             |
|1  |29.27       |506.08             |
|1  |769.36      |506.08             |
|2  |173.74      |304.94             |
|2  |578.77      |304.94             |
|2  |336.72      |304.94             |
|2  |103.76      |304.94             |
|2  |318.94      |304.94             |
|2  |3.19        |304.94 