## PySpark Tutorial in VS Code

In [1]:
# Import necessary libraries
import os
import shutil

from pyspark.sql import SparkSession
from pyspark.sql.functions import desc
# pyspark type of data 
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
# pyspark functions
from pyspark.sql.functions import col, desc, round

from pyspark.sql.window import Window
from pyspark.sql import functions as F

import pandas as pd

from delta import configure_spark_with_delta_pip

In [2]:
# Spark environment for running this notebook from VS Code.
# SPARK_HOME: a value already exported in the environment wins; the path below
# is only a fallback for the original author's machine - change it to the
# directory where Spark is installed on yours.
os.environ.setdefault('SPARK_HOME', "/Users/maxrogowski/PycharmProjects/Spark")
os.environ['PYSPARK_DRIVER_PYTHON'] = 'code'
os.environ['PYSPARK_PYTHON'] = 'python'

# Path to the downloaded spark-avro jar (relative to the working directory)
avro_jar_path = "./spark-avro_2.12-3.5.1.jar"

# Hand the Avro jar to the JVM that PySpark launches; set this before the first
# SparkSession is created. Only the Avro jar is added here - Delta Lake is not
# configured by this cell.
os.environ["PYSPARK_SUBMIT_ARGS"] = f"--jars {avro_jar_path} pyspark-shell"


In [3]:
# Start (or reuse) the Spark session used for the first smoke test
spark = (
    SparkSession.builder
    .appName("PySpark-Get-Started")
    .getOrCreate()
)


24/03/26 21:11:15 WARN Utils: Your hostname, Maxs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.0.57 instead (on interface en0)
24/03/26 21:11:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/03/26 21:11:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
# Smoke test: build a tiny DataFrame and display it to confirm Spark runs
column_names = ["Name", "Body Age"]
data = [("Max", 40), ("Aga", 14), ("Tomas", 18)]
df = spark.createDataFrame(data, schema=column_names)
df.show()

                                                                                

+-----+--------+
| Name|Body Age|
+-----+--------+
|  Max|      40|
|  Aga|      14|
|Tomas|      18|
+-----+--------+



In [5]:
# Stop Spark session 
spark.stop()

In [6]:
# Open a session for the RDD demo
spark = (
    SparkSession.builder
    .appName("RDD-Demo")
    .getOrCreate()
)
# Leave the session as the last expression so the notebook displays it
spark

In [7]:
# Distribute a local Python list as an RDD
numbers = list(range(1, 6))
rdd = spark.sparkContext.parallelize(numbers)

# collect() is an action: it returns every element of the RDD to the driver
rdd.collect()

[1, 2, 3, 4, 5]

In [8]:
# Build an RDD of (name, age) tuples
data = [("Max", 25), ("Tomas", 30), ("Aga", 35), ("Max", 40)]
rdd = spark.sparkContext.parallelize(data)

# collect(): every element, returned to the driver as a list
all_elements = rdd.collect()
print("All elements of the rdd: ", all_elements)

# count(): number of elements in the RDD
count = rdd.count()
print("The total number of elements in rdd: ", count)

# first(): the first element only
first_element = rdd.first()
print("The first element of the rdd: ", first_element)

# take(n): the first n elements
taken_elements = rdd.take(2)
print("The first two elements of the rdd: ", taken_elements)

All elements of the rdd:  [('Max', 25), ('Tomas', 30), ('Aga', 35), ('Max', 40)]


                                                                                

The total number of elements in rdd:  4
The first element of the rdd:  ('Max', 25)
The first two elements of the rdd:  [('Max', 25), ('Tomas', 30)]


In [9]:
# Print each element with foreach(); the output order is not guaranteed
rdd.foreach(print)


('Tomas', 30)
('Max', 25)
('Aga', 35)
('Max', 40)


In [10]:
# map(): transform every record - upper-case the name, keep the age
mapped_rdd = rdd.map(lambda record: (record[0].upper(), record[1]))
result = mapped_rdd.collect()
print("RDD with uppercase name: ", result)

# filter(): keep only records whose age is strictly greater than 30
filtered_rdd = rdd.filter(lambda record: record[1] > 30)
older_than_30 = filtered_rdd.collect()
print("Filter records where age is greater than 30: ", older_than_30)

RDD with uppercase name:  [('MAX', 25), ('TOMAS', 30), ('AGA', 35), ('MAX', 40)]
Filter records where age is greater than 30:  [('Aga', 35), ('Max', 40)]


In [11]:
# reduceByKey(): sum the ages of all records that share the same name
reduced_rdd = rdd.reduceByKey(lambda x, y: x + y)
print("Calculate the total age for each name: ",  reduced_rdd.collect())

# sortBy(): order the records by age, highest first
sorted_rdd = rdd.sortBy(lambda x: x[1], ascending=False)
print("SortBy transformation: Sort the RDD by age in descending order: ",  sorted_rdd.collect())

# Save action: write the RDD as text.
# saveAsTextFile() produces a directory and refuses to write into an existing
# one, so clear the output of a previous run first. Delete only when it is
# present: an unconditional rmtree(..., ignore_errors=False) raises
# FileNotFoundError on a first run, before the directory has ever been written.
output_path = "data/output.txt"
if os.path.exists(output_path):
    shutil.rmtree(output_path)

rdd.saveAsTextFile(output_path)


Calculate the total age for each name:  [('Max', 65), ('Tomas', 30), ('Aga', 35)]
SortBy transformation: Sort the RDD by age in descending order:  [('Max', 40), ('Aga', 35), ('Tomas', 30), ('Max', 25)]


In [12]:
# Shut down Spark Session
spark.stop()


## Create-DataFrame

In [13]:
# Start a session for the DataFrame-creation examples
spark = (
    SparkSession.builder
    .appName("Create-DataFrame")
    .getOrCreate()
)

In [14]:
# Load the text file; each line becomes one row in a column named "value"
df = spark.read.text("./data/data.txt")

# Word frequencies: split every line on spaces, explode to one word per row,
# count the occurrences of each word and list the most frequent first
words_df = df.selectExpr("explode(split(value, ' ')) as word")
word_counts_df = words_df.groupBy("word").count()
result_df = word_counts_df.orderBy(desc("count"))

In [15]:
# Print first 10 most popular word 
result_df.take(10)

[Row(word='the', count=12),
 Row(word='of', count=7),
 Row(word='a', count=7),
 Row(word='in', count=5),
 Row(word='distributed', count=5),
 Row(word='Spark', count=4),
 Row(word='API', count=3),
 Row(word='RDD', count=3),
 Row(word='is', count=3),
 Row(word='on', count=3)]

## Read CSV file into DataFrame


In [16]:
# to open file using bash

In [17]:
%%bash
head -n 6 ./data/products.csv

id,name,category,quantity,price
1,iPhone 12,Electronics,10,899.99
2,Nike Air Max 90,Clothing,25,119.99
3,KitchenAid Stand Mixer,Home Appliances,5,299.99
4,The Great Gatsby,Books,50,12.99
5,L'Oreal Paris Mascara,Beauty,100,9.99


## Read CSV with header


In [18]:
# Read the CSV, taking column names from its first line
csv_file_path = "./data/products.csv"
df = spark.read.option("header", True).csv(csv_file_path)

In [19]:
# Column names and types as Spark read them
df.printSchema()

# Preview of the first five rows
df.show(n=5)

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- price: string (nullable = true)

+---+--------------------+---------------+--------+------+
| id|                name|       category|quantity| price|
+---+--------------------+---------------+--------+------+
|  1|           iPhone 12|    Electronics|      10|899.99|
|  2|     Nike Air Max 90|       Clothing|      25|119.99|
|  3|KitchenAid Stand ...|Home Appliances|       5|299.99|
|  4|    The Great Gatsby|          Books|      50| 12.99|
|  5|L'Oreal Paris Mas...|         Beauty|     100|  9.99|
+---+--------------------+---------------+--------+------+
only showing top 5 rows



In [20]:
# Read without a schema, every column (including price) comes back as a string.
# To get proper types, declare the schema explicitly, much like a table definition in a database.

# Define the schema: one StructField per CSV column, in file order
schema = StructType([
    StructField(name="id", dataType=IntegerType(), nullable=True),
    StructField(name="name", dataType=StringType(), nullable=True),
    StructField(name="category", dataType=StringType(), nullable=True),
    # quantity is an integer, not a string
    StructField(name="quantity", dataType=IntegerType(), nullable=True),
    # price is a double, not a string
    StructField(name="price", dataType=DoubleType(), nullable=True)
])

In [21]:
# Re-read the CSV, this time applying the explicitly declared schema
csv_file_path = "./data/products.csv"
df = spark.read.schema(schema).option("header", True).csv(csv_file_path)

In [22]:
# Column types now follow the declared schema
df.printSchema()

# Preview of the first ten rows
df.show(n=10)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)

+---+--------------------+---------------+--------+------+
| id|                name|       category|quantity| price|
+---+--------------------+---------------+--------+------+
|  1|           iPhone 12|    Electronics|      10|899.99|
|  2|     Nike Air Max 90|       Clothing|      25|119.99|
|  3|KitchenAid Stand ...|Home Appliances|       5|299.99|
|  4|    The Great Gatsby|          Books|      50| 12.99|
|  5|L'Oreal Paris Mas...|         Beauty|     100|  9.99|
|  6|            Yoga Mat|         Sports|      30| 29.99|
|  7| Samsung 4K Smart TV|    Electronics|       8|799.99|
|  8|        Levi's Jeans|       Clothing|      15| 49.99|
|  9|Dyson Vacuum Cleaner|Home Appliances|       3|399.99|
| 10| Harry Potter Series|          Books|      20| 15.99|
+---+--------------------+------------

## Read CSV with inferSchema - Spark infers the data types itself


In [23]:
# Read the CSV and let Spark work out the column types from the data
csv_file_path = "./data/products.csv"
df = spark.read.options(header=True, inferSchema=True).csv(csv_file_path)

# Inferred column types
df.printSchema()

# Preview of the first five rows
df.show(n=5)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)

+---+--------------------+---------------+--------+------+
| id|                name|       category|quantity| price|
+---+--------------------+---------------+--------+------+
|  1|           iPhone 12|    Electronics|      10|899.99|
|  2|     Nike Air Max 90|       Clothing|      25|119.99|
|  3|KitchenAid Stand ...|Home Appliances|       5|299.99|
|  4|    The Great Gatsby|          Books|      50| 12.99|
|  5|L'Oreal Paris Mas...|         Beauty|     100|  9.99|
+---+--------------------+---------------+--------+------+
only showing top 5 rows



## Read JSON file into DataFrame

### Single line json file 

In [24]:
%%bash
head -n 5 ./data/products_singleline.json


{"id":1,"name":"iPhone 12","category":"Electronics","quantity":10,"price":899.99}
{"id":2,"name":"Nike Air Max 90","category":"Clothing","quantity":25,"price":119.99}
{"id":3,"name":"KitchenAid Stand Mixer","category":"Home Appliances","quantity":5,"price":299.99}
{"id":4,"name":"The Great Gatsby","category":"Books","quantity":50,"price":12.99}
{"id":5,"name":"L'Oreal Paris Mascara","category":"Beauty","quantity":100,"price":9.99}


In [25]:
# Single-line JSON (JSON Lines): one complete JSON record per line of the file
json_file_path = "./data/products_singleline.json"
df = spark.read.format("json").load(json_file_path)

# Column types Spark derived from the records
df.printSchema()

# Preview of the first five rows
df.show(n=5)

root
 |-- category: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: long (nullable = true)

+---------------+---+--------------------+------+--------+
|       category| id|                name| price|quantity|
+---------------+---+--------------------+------+--------+
|    Electronics|  1|           iPhone 12|899.99|      10|
|       Clothing|  2|     Nike Air Max 90|119.99|      25|
|Home Appliances|  3|KitchenAid Stand ...|299.99|       5|
|          Books|  4|    The Great Gatsby| 12.99|      50|
|         Beauty|  5|L'Oreal Paris Mas...|  9.99|     100|
+---------------+---+--------------------+------+--------+
only showing top 5 rows



cat ./data/products_singleline.json


In [26]:
%%bash
head -n 22 ./data/products_multiline.json


[
  {
    "id": 1,
    "name": "iPhone 12",
    "category": "Electronics",
    "quantity": 10,
    "price": 899.99
  },
  {
    "id": 2,
    "name": "Nike Air Max 90",
    "category": "Clothing",
    "quantity": 25,
    "price": 119.99
  },
  {
    "id": 3,
    "name": "KitchenAid Stand Mixer",
    "category": "Home Appliances",
    "quantity": 5,
    "price": 299.99
  },


In [27]:
# Multi-line JSON: the file is one JSON array whose comma-separated records
# each span several lines, so the reader needs multiLine enabled
json_file_path = "./data/products_multiline.json"
df = spark.read.option("multiLine", True).json(json_file_path)

# Column types Spark derived from the records
df.printSchema()

# Preview of the first five rows
df.show(n=5)

root
 |-- category: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: long (nullable = true)

+---------------+---+--------------------+------+--------+
|       category| id|                name| price|quantity|
+---------------+---+--------------------+------+--------+
|    Electronics|  1|           iPhone 12|899.99|      10|
|       Clothing|  2|     Nike Air Max 90|119.99|      25|
|Home Appliances|  3|KitchenAid Stand ...|299.99|       5|
|          Books|  4|    The Great Gatsby| 12.99|      50|
|         Beauty|  5|L'Oreal Paris Mas...|  9.99|     100|
+---------------+---+--------------------+------+--------+
only showing top 5 rows



## Write and Read parquet file

In [28]:
# Persist the DataFrame as Parquet, replacing the output of any earlier run
parquet_file_path = "./data/products.parquet"
df.write.parquet(parquet_file_path, mode='overwrite')

In [29]:
# Load the Parquet data that was just written
df = spark.read.format("parquet").load(parquet_file_path)

# Schema as read back from the Parquet files
df.printSchema()

# Preview of the first five rows
df.show(n=5)

root
 |-- category: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: long (nullable = true)

+---------------+---+--------------------+------+--------+
|       category| id|                name| price|quantity|
+---------------+---+--------------------+------+--------+
|    Electronics|  1|           iPhone 12|899.99|      10|
|       Clothing|  2|     Nike Air Max 90|119.99|      25|
|Home Appliances|  3|KitchenAid Stand ...|299.99|       5|
|          Books|  4|    The Great Gatsby| 12.99|      50|
|         Beauty|  5|L'Oreal Paris Mas...|  9.99|     100|
+---------------+---+--------------------+------+--------+
only showing top 5 rows



In [30]:
spark.stop()


## DataFrame Operations: Select, Filter, GroupBy, Join, Sort, Distinct, Drop, WithColumn, Alias

In [31]:
%%bash
head -5 data/stocks.txt

id,name,category,quantity,price
1,iPhone,Electronics,10,899.99
2,Macbook,Electronics,5,1299.99
3,iPad,Electronics,15,499.99
4,Samsung TV,Electronics,8,799.99


In [32]:
# Session for the DataFrame-operations examples
spark = (
    SparkSession.builder
    .appName("DataFrame-Operations")
    .getOrCreate()
)

# Load the sample stock data (CSV with a header row; types are inferred)
data_file_path = "./data/stocks.txt"
df = spark.read.options(header=True, inferSchema=True).csv(data_file_path)

# Inferred column types
df.printSchema()

# First ten rows of the loaded data
print("Initial DataFrame:")
df.show(n=10)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)

Initial DataFrame:
+---+----------------+-----------+--------+-------+
| id|            name|   category|quantity|  price|
+---+----------------+-----------+--------+-------+
|  1|          iPhone|Electronics|      10| 899.99|
|  2|         Macbook|Electronics|       5|1299.99|
|  3|            iPad|Electronics|      15| 499.99|
|  4|      Samsung TV|Electronics|       8| 799.99|
|  5|           LG TV|Electronics|      10| 699.99|
|  6|      Nike Shoes|   Clothing|      30|  99.99|
|  7|    Adidas Shoes|   Clothing|      25|  89.99|
|  8| Sony Headphones|Electronics|      12| 149.99|
|  9|Beats Headphones|Electronics|      20| 199.99|
| 10|    Dining Table|  Furniture|      10| 249.99|
+---+----------------+-----------+--------+-------+
only showing top 10 rows



In [33]:
# Projection: keep only the id, name and price columns
wanted_columns = ["id", "name", "price"]
selected_columns = df.select(wanted_columns)
print("Selected Columns:")
selected_columns.show(n=10)

Selected Columns:
+---+----------------+-------+
| id|            name|  price|
+---+----------------+-------+
|  1|          iPhone| 899.99|
|  2|         Macbook|1299.99|
|  3|            iPad| 499.99|
|  4|      Samsung TV| 799.99|
|  5|           LG TV| 699.99|
|  6|      Nike Shoes|  99.99|
|  7|    Adidas Shoes|  89.99|
|  8| Sony Headphones| 149.99|
|  9|Beats Headphones| 199.99|
| 10|    Dining Table| 249.99|
+---+----------------+-------+
only showing top 10 rows



In [34]:
# Filter: keep rows whose quantity is strictly greater than 20
filtered_data = df.where(col("quantity") > 20)
print("Filtered Data:", filtered_data.count())
filtered_data.show()

# GroupBy + aggregate: per category, the total quantity and the mean price
aggregations = {"quantity": "sum", "price": "avg"}
grouped_data = df.groupBy("category").agg(aggregations)
print("Grouped and Aggregated Data:")
grouped_data.show()

# Join: match rows with a second frame on "id". The second frame's category
# column is renamed first so the joined result has no duplicate column name.
df2 = (
    df.select("id", "category")
    .limit(10)
    .withColumnRenamed("category", "secondary category")
)
joined_data = df.join(df2, on="id", how="inner")
print("Joined Data:")
joined_data.show()

# Sort: price, highest first
sorted_data = df.orderBy(col("price").desc())
print("Data Sorted DESC:")
sorted_data.show(n=10)

# Sort: price, lowest first
sorted_data = df.orderBy(col("price").asc())
print("Data Sorted ASC:")
sorted_data.show(n=10)

# Sort on two columns, both descending: price first, then id as tie-breaker
sorted_data = df.orderBy(desc("price"), desc("id"))
print("Sorted Data Descending:")
sorted_data.show(n=10)


Filtered Data: 12
+---+--------------+-----------+--------+-----+
| id|          name|   category|quantity|price|
+---+--------------+-----------+--------+-----+
|  6|    Nike Shoes|   Clothing|      30|99.99|
|  7|  Adidas Shoes|   Clothing|      25|89.99|
| 12|        Apples|       Food|     100|  0.5|
| 13|       Bananas|       Food|     150| 0.25|
| 14|       Oranges|       Food|     120| 0.75|
| 15|Chicken Breast|       Food|      50| 3.99|
| 16| Salmon Fillet|       Food|      30| 5.99|
| 24|    Laptop Bag|Accessories|      25|29.99|
| 25|      Backpack|Accessories|      30|24.99|
| 28|         Jeans|   Clothing|      30|59.99|
| 29|       T-shirt|   Clothing|      50|14.99|
| 30|      Sneakers|   Clothing|      40|79.99|
+---+--------------+-----------+--------+-----+

Grouped and Aggregated Data:
+-----------+-------------+------------------+
|   category|sum(quantity)|        avg(price)|
+-----------+-------------+------------------+
|       Food|          450|2.29600000000000

In [35]:
# Distinct: the unique values of the category column
category_column = df.select("category")
distinct_rows = category_column.distinct()
print("Distinct Product Categories:")
distinct_rows.show()

# Drop: return a frame without the listed columns
columns_to_drop = ("quantity", "category")
dropped_columns = df.drop(*columns_to_drop)
print("Dropped Columns:")
dropped_columns.show(n=10)

# WithColumn: add a calculated column (revenue = quantity * price)
revenue = col("quantity") * col("price")
df_with_new_column = df.withColumn("revenue", revenue)
print("DataFrame with New 'revenue' Column:")
df_with_new_column.show(n=10)


# Rename: give the price column a more descriptive name
df_with_alias = df.withColumnRenamed(existing="price", new="product_price")
print("DataFrame with Aliased Column:")
df_with_alias.show(n=10)

Distinct Product Categories:
+-----------+
|   category|
+-----------+
|       Food|
|     Sports|
|Electronics|
|   Clothing|
|  Furniture|
|Accessories|
+-----------+

Dropped Columns:
+---+----------------+-------+
| id|            name|  price|
+---+----------------+-------+
|  1|          iPhone| 899.99|
|  2|         Macbook|1299.99|
|  3|            iPad| 499.99|
|  4|      Samsung TV| 799.99|
|  5|           LG TV| 699.99|
|  6|      Nike Shoes|  99.99|
|  7|    Adidas Shoes|  89.99|
|  8| Sony Headphones| 149.99|
|  9|Beats Headphones| 199.99|
| 10|    Dining Table| 249.99|
+---+----------------+-------+
only showing top 10 rows

DataFrame with New 'revenue' Column:
+---+----------------+-----------+--------+-------+-------+
| id|            name|   category|quantity|  price|revenue|
+---+----------------+-----------+--------+-------+-------+
|  1|          iPhone|Electronics|      10| 899.99| 8999.9|
|  2|         Macbook|Electronics|       5|1299.99|6499.95|
|  3|           

In [36]:
# Stop the SparkSession
spark.stop()

## Spark-SQL

In [37]:
# Session for the Spark SQL examples
spark = (
    SparkSession.builder
    .appName("DataFrameSQL")
    .getOrCreate()
)

# Load the sample persons data (CSV with a header row; types are inferred)
data_file_path = "./data/persons.csv"
df = spark.read.options(header=True, inferSchema=True).csv(data_file_path)

# Inferred column types
df.printSchema()

# First ten rows of the loaded data
print("Initial DataFrame:")
df.show(n=10)

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

Initial DataFrame:
+------------------+---+------+------+
|              name|age|gender|salary|
+------------------+---+------+------+
|          John Doe| 30|  Male| 50000|
|        Jane Smith| 25|Female| 45000|
|     David Johnson| 35|  Male| 60000|
|       Emily Davis| 28|Female| 52000|
|    Michael Wilson| 40|  Male| 75000|
|       Sarah Brown| 32|Female| 58000|
|        Robert Lee| 29|  Male| 51000|
|       Lisa Garcia| 27|Female| 49000|
|    James Martinez| 38|  Male| 70000|
|Jennifer Rodriguez| 26|Female| 47000|
+------------------+---+------+------+
only showing top 10 rows



In [38]:
# Register the DataFrame as a Temporary Table (dump DataFrame to table)
df.createOrReplaceTempView("my_table")

In [39]:
# Rows of my_table whose age is strictly greater than 25
age_filter_query = "SELECT * FROM my_table WHERE age > 25"
result = spark.sql(age_filter_query)

result.show()

+------------------+---+------+------+
|              name|age|gender|salary|
+------------------+---+------+------+
|          John Doe| 30|  Male| 50000|
|     David Johnson| 35|  Male| 60000|
|       Emily Davis| 28|Female| 52000|
|    Michael Wilson| 40|  Male| 75000|
|       Sarah Brown| 32|Female| 58000|
|        Robert Lee| 29|  Male| 51000|
|       Lisa Garcia| 27|Female| 49000|
|    James Martinez| 38|  Male| 70000|
|Jennifer Rodriguez| 26|Female| 47000|
|  William Anderson| 33|  Male| 62000|
|   Karen Hernandez| 31|Female| 55000|
|Christopher Taylor| 37|  Male| 69000|
|     Matthew Davis| 36|  Male| 67000|
|    Patricia White| 29|Female| 50000|
|     Daniel Miller| 34|  Male| 64000|
| Elizabeth Jackson| 30|Female| 52000|
|     Joseph Harris| 28|  Male| 53000|
|      Linda Martin| 39|Female| 71000|
+------------------+---+------+------+



In [40]:
# Mean salary for each gender
avg_salary_query = "SELECT gender, AVG(salary) as avg_salary FROM my_table GROUP BY gender"
avg_salary_by_gender = spark.sql(avg_salary_query)
avg_salary_by_gender.show()

+------+----------+
|gender|avg_salary|
+------+----------+
|Female|   52300.0|
|  Male|   62100.0|
+------+----------+



In [41]:
# Mean salary per gender and age decade: FLOOR(age/10)*10 buckets ages into
# 20, 30, 40, ...; the average is then rounded to two decimals
decade_query = "SELECT gender, FLOOR(age/10)*10 AS age_group, AVG(salary) AS avg_salary FROM my_table GROUP BY gender, age_group ORDER BY age_group DESC, gender ASC"
avg_salary_by_gender_age = (
    spark.sql(decade_query)
    .withColumn("avg_salary", F.round(F.col("avg_salary"), 2))
)
avg_salary_by_gender_age.show()

+------+---------+----------+
|gender|age_group|avg_salary|
+------+---------+----------+
|  Male|       40|   75000.0|
|Female|       30|   59000.0|
|  Male|       30|  63142.86|
|Female|       20|  47833.33|
|  Male|       20|   52000.0|
+------+---------+----------+



## Creating and managing temporary views.


In [42]:
# Expose the DataFrame to SQL under the name "people"
df.createOrReplaceTempView("people")

# Query the view like a table
people_query = "SELECT * FROM people WHERE age > 25"
result = spark.sql(people_query)

result.show()

+------------------+---+------+------+
|              name|age|gender|salary|
+------------------+---+------+------+
|          John Doe| 30|  Male| 50000|
|     David Johnson| 35|  Male| 60000|
|       Emily Davis| 28|Female| 52000|
|    Michael Wilson| 40|  Male| 75000|
|       Sarah Brown| 32|Female| 58000|
|        Robert Lee| 29|  Male| 51000|
|       Lisa Garcia| 27|Female| 49000|
|    James Martinez| 38|  Male| 70000|
|Jennifer Rodriguez| 26|Female| 47000|
|  William Anderson| 33|  Male| 62000|
|   Karen Hernandez| 31|Female| 55000|
|Christopher Taylor| 37|  Male| 69000|
|     Matthew Davis| 36|  Male| 67000|
|    Patricia White| 29|Female| 50000|
|     Daniel Miller| 34|  Male| 64000|
| Elizabeth Jackson| 30|Female| 52000|
|     Joseph Harris| 28|  Male| 53000|
|      Linda Martin| 39|Female| 71000|
+------------------+---+------+------+



In [43]:
catalog = spark.catalog

# Does the "people" view exist before dropping it?
view_exists_1 = catalog.tableExists("people")
print(f'If view exist nr.1: {view_exists_1}')

# Remove the temporary view
catalog.dropTempView("people")

# ...and check again after the drop
view_exists_2 = catalog.tableExists("people")

print(f'If view exist nr.2: {view_exists_2}')

If view exist nr.1: True
If view exist nr.2: False


## Subqueries

In [44]:
# Employees: (id, name) with ids numbered from 1 in list order
employee_names = [
    "John", "Alice", "Bob", "Emily",
    "David", "Sarah", "Michael", "Lisa",
    "William",
]
employee_data = list(enumerate(employee_names, start=1))
employees = spark.createDataFrame(employee_data, schema=["id", "name"])

# Salaries: (department, employee id, salary)
salary_data = [
    ("HR", 1, 66000), ("HR", 2, 55000), ("HR", 3, 58000),
    ("IT", 4, 70000), ("IT", 5, 72000), ("IT", 6, 68000),
    ("Sales", 7, 75000), ("Sales", 8, 78000), ("Sales", 9, 77000),
]
salaries = spark.createDataFrame(salary_data, schema=["department", "id", "salary"])

employees.show()

salaries.show()

# Make both frames queryable from SQL under their own names
for view_name, frame in (("employees", employees), ("salaries", salaries)):
    frame.createOrReplaceTempView(view_name)

+---+-------+
| id|   name|
+---+-------+
|  1|   John|
|  2|  Alice|
|  3|    Bob|
|  4|  Emily|
|  5|  David|
|  6|  Sarah|
|  7|Michael|
|  8|   Lisa|
|  9|William|
+---+-------+

+----------+---+------+
|department| id|salary|
+----------+---+------+
|        HR|  1| 66000|
|        HR|  2| 55000|
|        HR|  3| 58000|
|        IT|  4| 70000|
|        IT|  5| 72000|
|        IT|  6| 68000|
|     Sales|  7| 75000|
|     Sales|  8| 78000|
|     Sales|  9| 77000|
+----------+---+------+



In [45]:
# Overall average salary, shown for reference
average_salary_query = """
    SELECT AVG(salary) FROM salaries
"""
result = spark.sql(average_salary_query)
result.show()

# Employees earning more than that average: the scalar subquery in the WHERE
# clause computes the average, the join attaches each employee's salary
above_average_query = """
    SELECT employees.name, salaries.salary
    FROM employees
    JOIN salaries ON employees.id = salaries.id
    WHERE salaries.salary > (
        SELECT AVG(salary)
        FROM salaries
    );
"""
result = spark.sql(above_average_query)

result.show()

+-----------------+
|      avg(salary)|
+-----------------+
|68777.77777777778|
+-----------------+

+-------+------+
|   name|salary|
+-------+------+
|  Emily| 70000|
|  David| 72000|
|Michael| 75000|
|   Lisa| 78000|
|William| 77000|
+-------+------+



## Window Function


In [46]:
# One row per salary record, with the employee's name attached via a left join
employee_salary_query = """
    select  salaries.*, employees.name
    from salaries 
    left join employees on salaries.id = employees.id
"""
employee_salary = spark.sql(employee_salary_query)

employee_salary.show()

+----------+---+------+-------+
|department| id|salary|   name|
+----------+---+------+-------+
|        HR|  1| 66000|   John|
|        HR|  2| 55000|  Alice|
|        HR|  3| 58000|    Bob|
|        IT|  4| 70000|  Emily|
|        IT|  5| 72000|  David|
|        IT|  6| 68000|  Sarah|
|     Sales|  7| 75000|Michael|
|     Sales|  9| 77000|William|
|     Sales|  8| 78000|   Lisa|
+----------+---+------+-------+



In [47]:
# Window: one partition per department, rows ordered by salary, highest first
window_spec = (
    Window
    .partitionBy("department")
    .orderBy(F.col("salary").desc())
)

# Rank every employee inside their department by salary
salary_rank = F.rank().over(window_spec)
employee_salary_with_rank = employee_salary.withColumn("rank", salary_rank)
employee_salary_with_rank.show()

# Rank 1 = the best-paid employee of each department
top_earners = employee_salary_with_rank.filter(F.col("rank") == 1)
top_earners.show()


+----------+---+------+-------+----+
|department| id|salary|   name|rank|
+----------+---+------+-------+----+
|        HR|  1| 66000|   John|   1|
|        HR|  3| 58000|    Bob|   2|
|        HR|  2| 55000|  Alice|   3|
|        IT|  5| 72000|  David|   1|
|        IT|  4| 70000|  Emily|   2|
|        IT|  6| 68000|  Sarah|   3|
|     Sales|  8| 78000|   Lisa|   1|
|     Sales|  9| 77000|William|   2|
|     Sales|  7| 75000|Michael|   3|
+----------+---+------+-------+----+

+----------+---+------+-----+----+
|department| id|salary| name|rank|
+----------+---+------+-----+----+
|        HR|  1| 66000| John|   1|
|        IT|  5| 72000|David|   1|
|     Sales|  8| 78000| Lisa|   1|
+----------+---+------+-----+----+



In [48]:
# Helper that prints a Spark DataFrame using pandas' table formatting
def pretty_print(df):
    """Print ``df`` as a pandas-formatted table under a "Table Preview:" header.

    Args:
        df: Any object exposing ``toPandas()`` (here, a Spark DataFrame).
            The conversion happens after the header line is printed.

    Returns:
        None. The table is written to standard output.
    """
    print("Table Preview:")
    print(df.toPandas())
    
pretty_print(df)

# Print just using pandas 
df_pandas = df.toPandas()
df_pandas

Table Preview:
                  name  age  gender  salary
0             John Doe   30    Male   50000
1           Jane Smith   25  Female   45000
2        David Johnson   35    Male   60000
3          Emily Davis   28  Female   52000
4       Michael Wilson   40    Male   75000
5          Sarah Brown   32  Female   58000
6           Robert Lee   29    Male   51000
7          Lisa Garcia   27  Female   49000
8       James Martinez   38    Male   70000
9   Jennifer Rodriguez   26  Female   47000
10    William Anderson   33    Male   62000
11     Karen Hernandez   31  Female   55000
12  Christopher Taylor   37    Male   69000
13       Mary Gonzalez   24  Female   44000
14       Matthew Davis   36    Male   67000
15      Patricia White   29  Female   50000
16       Daniel Miller   34    Male   64000
17   Elizabeth Jackson   30  Female   52000
18       Joseph Harris   28    Male   53000
19        Linda Martin   39  Female   71000


Unnamed: 0,name,age,gender,salary
0,John Doe,30,Male,50000
1,Jane Smith,25,Female,45000
2,David Johnson,35,Male,60000
3,Emily Davis,28,Female,52000
4,Michael Wilson,40,Male,75000
5,Sarah Brown,32,Female,58000
6,Robert Lee,29,Male,51000
7,Lisa Garcia,27,Female,49000
8,James Martinez,38,Male,70000
9,Jennifer Rodriguez,26,Female,47000


In [49]:
# Stop the SparkSession
spark.stop()


## Avro file format Example

In [50]:
# Create a SparkSession with Avro support.
# spark-avro is an external module whose version has to match the Spark build
# in use, so request the same 3.5.1 / Scala 2.12 artifact as the
# spark-avro_2.12-3.5.1 jar this notebook passes through PYSPARK_SUBMIT_ARGS
# (the previous 3.2.0 coordinate did not match it).
spark = (
    SparkSession.builder
    .appName("AvroExample")
    .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.5.1")
    .getOrCreate()
)
# Load the synthetic data into a DataFrame
data_file_path = "./data/stocks.txt"
df = spark.read.csv(data_file_path, header=True, inferSchema=True)

# Display schema of DataFrame
df.printSchema()

# Show the initial DataFrame
print("Initial DataFrame:")
df.show(10)

# Write the DataFrame in Avro format, replacing the output of any earlier run
df.write.format("avro").mode("overwrite").save("data/operations.avro")

# Read the Avro data back to confirm the round trip
df_avro = spark.read.format("avro").load("data/operations.avro")

# Display loaded data
print("Display loaded data:")
df_avro.show()

# Stop the SparkSession
spark.stop()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)

Initial DataFrame:
+---+----------------+-----------+--------+-------+
| id|            name|   category|quantity|  price|
+---+----------------+-----------+--------+-------+
|  1|          iPhone|Electronics|      10| 899.99|
|  2|         Macbook|Electronics|       5|1299.99|
|  3|            iPad|Electronics|      15| 499.99|
|  4|      Samsung TV|Electronics|       8| 799.99|
|  5|           LG TV|Electronics|      10| 699.99|
|  6|      Nike Shoes|   Clothing|      30|  99.99|
|  7|    Adidas Shoes|   Clothing|      25|  89.99|
|  8| Sony Headphones|Electronics|      12| 149.99|
|  9|Beats Headphones|Electronics|      20| 199.99|
| 10|    Dining Table|  Furniture|      10| 249.99|
+---+----------------+-----------+--------+-------+
only showing top 10 rows

Display loaded data:
+---+------