#Chained Condition PySpark
*we are using Google Colab

In [None]:
!pip install -q pyspark findspark
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
#bring in Pyspark functions into your session
from pyspark.sql.functions import *
from pyspark.sql.functions import col

In [None]:
#create our data
data = [
    ("Alice", 25, "F"),
    ("Bob", 30, "M"),
    ("Charlie", 35, "M"),
    ("Diana", 40, "F")
]

columns = ["name", "age", "gender"]

df = spark.createDataFrame(data, columns)
df.show()


+-------+---+------+
|   name|age|gender|
+-------+---+------+
|  Alice| 25|     F|
|    Bob| 30|     M|
|Charlie| 35|     M|
|  Diana| 40|     F|
+-------+---+------+



# The basics




# Conditions with & (AND)

In [None]:
#Get people aged over 30 AND female:

df_filtered = df.filter( (col("age") > 30) & (col("gender") == "F") )
df_filtered.show()

+-----+---+------+
| name|age|gender|
+-----+---+------+
|Diana| 40|     F|
+-----+---+------+



# Conditions with | (OR)

In [None]:
#Get people aged over 30 OR female:

df_filtered = df.filter( (col("age") > 30) | (col("gender") == "F") )
df_filtered.show()


+-------+---+------+
|   name|age|gender|
+-------+---+------+
|  Alice| 25|     F|
|Charlie| 35|     M|
|  Diana| 40|     F|
+-------+---+------+



# NOT Condition with ~

In [None]:
df_filtered = df.filter( ~(col("gender") == "F") )
df_filtered.show()

+-------+---+------+
|   name|age|gender|
+-------+---+------+
|    Bob| 30|     M|
|Charlie| 35|     M|
+-------+---+------+



#Chained Conditions

In [None]:
#Create our data
data = [
    ("Alice", 25, "F", 60000),
    ("Bob", 30, "M", 48000),
    ("Charlie", 35, "M", 70000),
    ("Diana", 40, "F", 52000),
    ("Evan", 28, "M", 35000),
    ("Fiona", 32, "F", 75000)
]

columns = ["name", "age", "gender", "salary"]
df = spark.createDataFrame(data, columns)
df.show()

+-------+---+------+------+
|   name|age|gender|salary|
+-------+---+------+------+
|  Alice| 25|     F| 60000|
|    Bob| 30|     M| 48000|
|Charlie| 35|     M| 70000|
|  Diana| 40|     F| 52000|
|   Evan| 28|     M| 35000|
|  Fiona| 32|     F| 75000|
+-------+---+------+------+



#Show me who are over 30 years old and whose Salary > 50,000, also include any females who are under 30.

In [None]:
df_filtered = df.filter(
    ((col("age") > 30) & (col("salary") > 50000)) |
    ((col("gender") == "F") & (col("age") < 30))
)

df_filtered.show()

+-------+---+------+------+
|   name|age|gender|salary|
+-------+---+------+------+
|  Alice| 25|     F| 60000|
|Charlie| 35|     M| 70000|
|  Diana| 40|     F| 52000|
|  Fiona| 32|     F| 75000|
+-------+---+------+------+



# Show me anyhone whos name starts with 'A' or 'F' and  salary above 50,000


In [None]:
df_filtered = df.filter(
    (col("name").startswith("A") | col("name").startswith("M")) &
    (col("salary") > 50000)
)

df_filtered.show()

Filter rows where salary is not < 50,000 OR Age < 30

In [None]:

df_filtered = df.filter(
    ~((col("salary") < 50000) | (col("age") < 30))
)
df_filtered.show()

Add a new column high_earner that is Yes if salary > 60,000 else No, then filter

In [None]:
df2 = df.withColumn(
    "high_earner",
    when(col("salary") > 60000, "Yes").otherwise("No")
)



df_filtered = df2.filter(col("high_earner") == "Yes")
df_filtered.show()

Create a new column that categorizes age group, then fliter

In [None]:
df2 = df.withColumn(
    "age_group",
    when(col("age") < 30, "Young")
    .when((col("age") >= 10) & (col("age") <= 40), "Middle-aged")
    .otherwise("Senior")
)

df_filtered = df2.filter(col("age_group") == "Young")
df_filtered.show()

#Create data for SQL examples

In [None]:
#Create our data
data = [
    ("Alice", 25, "F", 60000),
    ("Bob", 30, "M", 48000),
    ("Charlie", 35, "M", 70000),
    ("Diana", 40, "F", 52000),
    ("Evan", 28, "M", 35000),
    ("Fiona", 32, "F", 75000)
]

columns = ["name", "age", "gender", "salary"]
df = spark.createDataFrame(data, columns)
df.show()

+-------+---+------+------+
|   name|age|gender|salary|
+-------+---+------+------+
|  Alice| 25|     F| 60000|
|    Bob| 30|     M| 48000|
|Charlie| 35|     M| 70000|
|  Diana| 40|     F| 52000|
|   Evan| 28|     M| 35000|
|  Fiona| 32|     F| 75000|
+-------+---+------+------+



#Register as SQL Table

In [None]:

# Register as SQL Table
df.createOrReplaceTempView("employees")


#AND

In [None]:

spark.sql("""SELECT * FROM employees
WHERE age > 30 AND salary > 50000;""").show()

+-------+---+------+------+
|   name|age|gender|salary|
+-------+---+------+------+
|Charlie| 35|     M| 70000|
|  Diana| 40|     F| 52000|
|  Fiona| 32|     F| 75000|
+-------+---+------+------+



#OR

In [None]:
spark.sql("""SELECT *
FROM employees
WHERE salary < 40000 OR age < 28;""").show()

+-----+---+------+------+
| name|age|gender|salary|
+-----+---+------+------+
|Alice| 25|     F| 60000|
| Evan| 28|     M| 35000|
+-----+---+------+------+



# Not

In [None]:
spark.sql("""SELECT *
FROM employees
WHERE NOT (gender = 'M');""").show()

+-----+---+------+------+
| name|age|gender|salary|
+-----+---+------+------+
|Alice| 25|     F| 60000|
|Diana| 40|     F| 52000|
|Fiona| 32|     F| 75000|
+-----+---+------+------+



In [None]:
spark.sql("""SELECT *
FROM employees
WHERE age BETWEEN 30 AND 40;""").show()



+-------+---+------+------+
|   name|age|gender|salary|
+-------+---+------+------+
|    Bob| 30|     M| 48000|
|Charlie| 35|     M| 70000|
|  Diana| 40|     F| 52000|
|  Fiona| 32|     F| 75000|
+-------+---+------+------+



#Males with salary between 45kâ€“75k AND age > 28

In [None]:

spark.sql("""SELECT *
FROM employees
WHERE gender = 'M'
  AND salary BETWEEN 45000 AND 75000
  AND age > 28;""").show()


+-------+---+------+------+
|   name|age|gender|salary|
+-------+---+------+------+
|    Bob| 30|     M| 48000|
|Charlie| 35|     M| 70000|
+-------+---+------+------+



#PySpark SQL aggregation examples

In [None]:

spark.sql("""SELECT COUNT(*) AS total_employees
FROM employees;""").show()

+---------------+
|total_employees|
+---------------+
|              6|
+---------------+



#Count by gender

In [None]:

spark.sql("""SELECT gender, COUNT(*) AS count_by_gender
FROM employees
GROUP BY gender;""").show()

+------+---------------+
|gender|count_by_gender|
+------+---------------+
|     F|              3|
|     M|              3|
+------+---------------+



#Aggregations grouped by gender

In [None]:

spark.sql("""SELECT
    gender,
    COUNT(*) AS total_people,
    AVG(salary) AS avg_salary,
    MIN(age) AS youngest,
    MAX(age) AS oldest
FROM employees
GROUP BY gender;""").show()

+------+------------+------------------+--------+------+
|gender|total_people|        avg_salary|youngest|oldest|
+------+------------+------------------+--------+------+
|     F|           3|62333.333333333336|      25|    40|
|     M|           3|           51000.0|      28|    35|
+------+------------+------------------+--------+------+

