## Initialize Spark Session

In [1]:
# Configure the Spark-related environment variables before the session is built.
# NOTE(review): SPARK_HOME is a machine-specific absolute path — confirm it
# matches the host this notebook runs on.
import os

os.environ.update({
    'SPARK_HOME': "/home/cloud_user/apps/spark/current",
    'PYSPARK_DRIVER_PYTHON': 'jupyter',
    'PYSPARK_DRIVER_PYTHON_OPTS': 'lab',
    'PYSPARK_PYTHON': 'python',
})

# Import PySpark
from pyspark.sql import SparkSession, Window

# Obtain the session used by every cell below, with the application name "sparksql"
spark = (
    SparkSession.builder
    .appName("sparksql")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/26 18:56:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Load data into dataframe

In [2]:
# Read the products CSV with the header and inferSchema reader options enabled
data_file_path = "./data/products.csv"
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(data_file_path)
)

                                                                                

In [3]:
# Inspect the column names and types, then preview the first 10 rows
df.printSchema()
df.show(n=10)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)

+---+--------------------+---------------+--------+------+
| id|                name|       category|quantity| price|
+---+--------------------+---------------+--------+------+
|  1|           iPhone 12|    Electronics|      10|899.99|
|  2|     Nike Air max 90|       Clothing|      25|119.99|
|  3|KitchenAid Stand ...|Home Appliances|       5|299.99|
|  4|    The Great Gatsby|          Books|      50| 12.99|
|  5|L'Oreal Paris Mas...|         Beauty|     100|  9.99|
|  6|            Yoga Mat|         Sports|      30| 29.99|
|  7| Samsung 4k Smart TV|    Electronics|       8|799.99|
|  8|        Levi's Jeans|       Clothing|      15| 49.99|
|  9|Dyson Vacuum Cleaner|Home Appliances|       3|399.99|
| 10|  Google Pixel 8 Pro|    Electronics|       4|699.99|
+---+--------------------+---------------+--------+------+

## Register the DataFrame as a Temporary View

In [4]:
# Make the DataFrame queryable from Spark SQL under the name "products"
df.createOrReplaceTempView(name="products")

## Perform SQL-Like Queries

In [5]:
# Keep only the products priced above 20, report how many rows matched,
# then display them
price_filter_sql = "SELECT * FROM products WHERE price > 20"
result = spark.sql(price_filter_sql)
print(f"Price greater than 20: {result.count()}")
result.show()

Price greater than 20: 10
+---+--------------------+---------------+--------+------+
| id|                name|       category|quantity| price|
+---+--------------------+---------------+--------+------+
|  1|           iPhone 12|    Electronics|      10|899.99|
|  2|     Nike Air max 90|       Clothing|      25|119.99|
|  3|KitchenAid Stand ...|Home Appliances|       5|299.99|
|  6|            Yoga Mat|         Sports|      30| 29.99|
|  7| Samsung 4k Smart TV|    Electronics|       8|799.99|
|  8|        Levi's Jeans|       Clothing|      15| 49.99|
|  9|Dyson Vacuum Cleaner|Home Appliances|       3|399.99|
| 10|  Google Pixel 8 Pro|    Electronics|       4|699.99|
| 11|    Darth Vader Lego|           Toys|      10| 49.99|
| 12|       Colosium Lego|           Toys|       2|500.98|
+---+--------------------+---------------+--------+------+



In [6]:
# Average price per product category
avg_price_sql = "SELECT category, AVG(price) as avg_price FROM products GROUP BY category"
avg_price = spark.sql(avg_price_sql)
avg_price.show()

+---------------+-----------------+
|       category|        avg_price|
+---------------+-----------------+
|         Sports|            29.99|
|    Electronics|799.9900000000001|
|       Clothing|            84.99|
|          Books|            12.99|
|Home Appliances|           349.99|
|         Beauty|             9.99|
|           Toys|          275.485|
+---------------+-----------------+



## Checking if tables exist

In [7]:
# Ask the session catalog whether the name "products" is currently registered
view_exists = spark.catalog.tableExists(tableName="products")
print(f"Does products table exist? {view_exists}")

Does products table exist? True


## Dropping a Temporary View

In [8]:
# Drop the "products" temporary view; the call's return value is the cell output
spark.catalog.dropTempView(viewName="products")

True

## Subqueries
A subquery is a SQL query nested inside another query; the outer query uses the inner query's result.

In [9]:
# Build the employees DataFrame; ids are assigned 1..9 in list order
employee_names = [
    "John", "Alice", "Bob",
    "Emily", "David", "Sarah",
    "Michael", "Lisa", "William",
]
employee_data = list(enumerate(employee_names, start=1))
employees = spark.createDataFrame(employee_data, schema=["id", "name"])

# Check the inferred schema and peek at the first row
employees.printSchema()
employees.show(1)

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



[Stage 10:>                                                         (0 + 1) / 1]

+---+----+
| id|name|
+---+----+
|  1|John|
+---+----+
only showing top 1 row



                                                                                

In [10]:
# Salary rows as (department, employee id, salary)
salary_data = [
    ("HR", 1, 60000),
    ("HR", 2, 55200),
    ("HR", 3, 58000),
    ("IT", 4, 70000),
    ("IT", 5, 72000),
    ("IT", 6, 68000),
    ("Sales", 7, 75000),
    ("Sales", 8, 78000),
    ("Sales", 9, 77000),
]
salary_columns = ["department", "id", "salary"]
salaries = spark.createDataFrame(salary_data, schema=salary_columns)

# Check the inferred schema and display every row
salaries.printSchema()
salaries.show()

root
 |-- department: string (nullable = true)
 |-- id: long (nullable = true)
 |-- salary: long (nullable = true)

+----------+---+------+
|department| id|salary|
+----------+---+------+
|        HR|  1| 60000|
|        HR|  2| 55200|
|        HR|  3| 58000|
|        IT|  4| 70000|
|        IT|  5| 72000|
|        IT|  6| 68000|
|     Sales|  7| 75000|
|     Sales|  8| 78000|
|     Sales|  9| 77000|
+----------+---+------+



In [11]:
# Register both DataFrames as temporary views so SQL can refer to them by name
for view_name, frame in (("employees", employees), ("salaries", salaries)):
    frame.createOrReplaceTempView(view_name)

In [12]:
# Names of employees whose salary exceeds the average over the whole salaries
# view. The innermost subquery computes that average; the middle one collects
# the ids that beat it.
above_avg_sql = """
    SELECT e.name
    FROM employees as e
    WHERE e.id IN (
        SELECT id
        FROM salaries
        WHERE salary > (SELECT AVG(salary) from salaries)
    )
"""
result = spark.sql(above_avg_sql)
result.show()

+-------+
|   name|
+-------+
|  Emily|
|  David|
|Michael|
|   Lisa|
|William|
+-------+



## Window Function

In [13]:
# Join the two views on id: every salaries row is kept, with the matching
# employee name attached
join_sql = """
    select salaries.*, employees.name
    from salaries
    left join employees on salaries.id = employees.id
"""
employee_salary = spark.sql(join_sql)
employee_salary.show()

+----------+---+------+-------+
|department| id|salary|   name|
+----------+---+------+-------+
|        HR|  1| 60000|   John|
|        HR|  3| 58000|    Bob|
|        HR|  2| 55200|  Alice|
|        IT|  4| 70000|  Emily|
|     Sales|  7| 75000|Michael|
|        IT|  6| 68000|  Sarah|
|     Sales|  9| 77000|William|
|        IT|  5| 72000|  David|
|     Sales|  8| 78000|   Lisa|
+----------+---+------+-------+



In [14]:
from pyspark.sql.functions import col, desc, rank
from pyspark.sql.window import Window
from pyspark.sql import functions as F

In [15]:
# Window over each department, ordered from highest salary to lowest
window_spec = (
    Window
    .partitionBy("department")
    .orderBy(F.col("salary").desc())
)

In [18]:
# Add a "rank" column holding each row's rank inside its department window
ranked = employee_salary.withColumn("rank", F.rank().over(window_spec))
ranked.show()

+----------+---+------+-------+----+
|department| id|salary|   name|rank|
+----------+---+------+-------+----+
|        HR|  1| 60000|   John|   1|
|        HR|  3| 58000|    Bob|   2|
|        HR|  2| 55200|  Alice|   3|
|        IT|  5| 72000|  David|   1|
|        IT|  4| 70000|  Emily|   2|
|        IT|  6| 68000|  Sarah|   3|
|     Sales|  8| 78000|   Lisa|   1|
|     Sales|  9| 77000|William|   2|
|     Sales|  7| 75000|Michael|   3|
+----------+---+------+-------+----+



In [19]:
# Stop the Spark session that the first cell created, now that the notebook is done
spark.stop()