# Spark SQL

This notebook demonstrates how to use Spark SQL to perform data analysis using SQL queries on DataFrames.


In [None]:
import os
import sys

# Adjust this path if your JDK is installed elsewhere
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-11-openjdk-amd64'

import pyspark
os.environ['SPARK_HOME'] = pyspark.__path__[0]

os.environ['PATH'] = f"{os.environ['JAVA_HOME']}/bin:{os.environ['SPARK_HOME']}/bin:{os.environ.get('PATH', '')}"

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

print(f"JAVA_HOME: {os.environ['JAVA_HOME']}")
print(f"SPARK_HOME: {os.environ['SPARK_HOME']}")
print("Environment configured.")

In [None]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, col, explode, udf
from pyspark.sql.types import BooleanType


In [None]:
# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Spark SQL Exercises") \
    .master("local[*]") \
    .getOrCreate()

print("Spark Session Created Successfully!")
print(f"Spark Version: {spark.version}")


## Load and familiarize yourself with the dataset

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Re-create the students DataFrame from the previous session for SQL examples
data = [
    ("Alice", 20, 85.5, "Math"),
    ("Bob", 21, 78.0, "Math"),
    ("Charlie", 19, 92.0, "Science"),
    ("David", 22, 88.5, "Science"),
    ("Eve", 20, 95.0, "Math")
]

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("grade", DoubleType(), True),
    StructField("subject", StringType(), True)
])

df_students = spark.createDataFrame(data, schema=schema)
df_students.createOrReplaceTempView("students")

In [None]:
# Count students by age and display the results
query = """
SELECT age, count(*) as count
FROM students
GROUP BY age
ORDER BY age
"""
spark.sql(query).show()

In [None]:
# Create the datasets directory that will hold our CSV file
!mkdir -p ../datasets

Let's prepare the dataset.

To make the dataset available in our "datasets" directory, you have two options:
1. **Mount the local data/tickets.csv file** from the repository into your notebook environment through the docker-compose setup.
2. **Upload the CSV file manually** in this Jupyter environment.

You can find the CSV file in the repository or download it directly from [this link](https://www.kaggle.com/datasets/tobiasbueck/multilingual-customer-support-tickets?select=dataset-tickets-multi-lang3-4k.csv).


In [16]:
# Check that the CSV file was uploaded successfully.
!ls -lh ../datasets

total 4,0K
drwxr-xr-x 2 dalpenyes dalpenyes 4,0K ene 24 11:48 data


In [21]:
# Print the first 5 lines of the CSV file (a record may span several lines)
!head -n 5 ../datasets/data/tickets.csv

subject,body,answer,type,queue,priority,language,business_type,tag_1,tag_2,tag_3,tag_4,tag_5,tag_6,tag_7,tag_8,tag_9
Problema crítico del servidor requiere atención inmediata,Es necesaria una investigación inmediata sobre la interrupción en el servicio de gestión de AWS que está impactando funciones comerciales esenciales.,Estamos investigando urgentemente el problema con el servicio de gestión de AWS. Proporcionaremos actualizaciones sobre nuestro progreso.,Incident,Technical Support,high,es,IT Services,Urgent Issue,Service Disruption,Incident Report,Service Recovery,System Maintenance,,,,
Anfrage zur Verfügbarkeit des Dell XPS 13 9310,"Sehr geehrter Kundenservice,

ich hoffe, diese E-Mail erreicht Sie wohl. Ich schreibe, um mich nach der Verfügbarkeit und den Lieferoptionen für das Dell XPS 13 9310 Ultrabook zu erkundigen. Könnten Sie mir bitte mitteilen, ob dieses Modell derzeit auf Lager ist und die geschätzte Lieferzeit zu meinem Standort? Ich bin sehr daran interessiert, es so sc
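
The truncated `head` output above shows why line-oriented tools are a poor fit for this file: a quoted `body` field may contain newlines, so one record can span several physical lines. The sketch below uses Python's standard `csv` module on a small made-up sample (not taken from `tickets.csv`) to illustrate the difference between physical lines and logical records, and the doubled-quote (`""`) convention for quotes inside a field. It only illustrates the file format; it is not how Spark parses the file.

```python
import csv
import io

# Made-up sample: the second record has a quoted body containing
# two newlines and a doubled quote ("") that stands for one literal quote.
raw = (
    "subject,body,priority\n"
    'Laptop inquiry,"Dear support,\n\nis the ""Pro"" model in stock?",low\n'
    "Server down,Please investigate,high\n"
)

# What a line-oriented tool such as `head` sees
physical_lines = raw.splitlines()

# What a CSV parser sees: one list of fields per logical record
records = list(csv.reader(io.StringIO(raw)))

print(f"physical lines: {len(physical_lines)}")
print(f"logical records (including header): {len(records)}")
print(repr(records[1][1]))
```

Spark's CSV reader has to be told about both conventions, which is what the `multiLine` and `escape` options in the next section are for.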

## Read CSV with Spark

In [24]:
# Create a DataFrame from the CSV file
ticketsDF = (
    spark.read
        .option("header", True)
        .option("inferSchema", True)
        .option("multiLine", True)
        .option("escape", "\"")                # quotes inside a quoted field are escaped by doubling them
        .csv("../datasets/data/tickets.csv")
)


# Print schema and first 5 rows
ticketsDF.printSchema()
ticketsDF.show(5)

root
 |-- subject: string (nullable = true)
 |-- body: string (nullable = true)
 |-- answer: string (nullable = true)
 |-- type: string (nullable = true)
 |-- queue: string (nullable = true)
 |-- priority: string (nullable = true)
 |-- language: string (nullable = true)
 |-- business_type: string (nullable = true)
 |-- tag_1: string (nullable = true)
 |-- tag_2: string (nullable = true)
 |-- tag_3: string (nullable = true)
 |-- tag_4: string (nullable = true)
 |-- tag_5: string (nullable = true)
 |-- tag_6: string (nullable = true)
 |-- tag_7: string (nullable = true)
 |-- tag_8: string (nullable = true)
 |-- tag_9: string (nullable = true)

+--------------------+--------------------+--------------------+--------+-----------------+--------+--------+-----------------+-----------------+------------------+--------------------+----------------+--------------------+------------------+------------------+-----+-----+
|             subject|                body|              answer|    type|   

## Recap - Basic Filtering, Grouping & Aggregation

In [25]:
# Task 1: Show only high-priority tickets
highPriorityDF = ticketsDF.filter(col("priority") == "high")

highPriorityDF.show(truncate=False)

+------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [26]:
# Task 2: Count the tickets per type (Incident, Request…), ordered from most to fewest
ticketsByTypeDF = ticketsDF.groupBy("type").count().orderBy(col("count").desc())

ticketsByTypeDF.show()


+--------+-----+
|    type|count|
+--------+-----+
|Incident| 1608|
| Request| 1097|
| Problem|  853|
|  Change|  442|
+--------+-----+



In [27]:
# Task 3: Count tickets by language
ticketsByLangDF = ticketsDF.groupBy("language").count().orderBy(col("count").desc())

ticketsByLangDF.show()


+--------+-----+
|language|count|
+--------+-----+
|      en| 1391|
|      de|  848|
|      es|  812|
|      fr|  476|
|      pt|  473|
+--------+-----+



## SQL Exercises

To query our DataFrame with SQL, we first need to register it as a temporary view.

In [28]:
ticketsDF.createOrReplaceTempView("tickets")

In [30]:
# Show first 5 rows
spark.sql("""
SELECT * FROM tickets
LIMIT 5
""").show()

+--------------------+--------------------+--------------------+--------+-----------------+--------+--------+-----------------+-----------------+------------------+--------------------+----------------+--------------------+------------------+------------------+-----+-----+
|             subject|                body|              answer|    type|            queue|priority|language|    business_type|            tag_1|             tag_2|               tag_3|           tag_4|               tag_5|             tag_6|             tag_7|tag_8|tag_9|
+--------------------+--------------------+--------------------+--------+-----------------+--------+--------+-----------------+-----------------+------------------+--------------------+----------------+--------------------+------------------+------------------+-----+-----+
|Problema crítico ...|Es necesaria una ...|Estamos investiga...|Incident|Technical Support|    high|      es|      IT Services|     Urgent Issue|Service Disruption|     Incident 

In [31]:
# Task 4: Count tickets by priority (SQL version)
spark.sql("""
SELECT priority, COUNT(*) as count
FROM tickets
GROUP BY priority
ORDER BY count DESC
""").show()

+--------+-----+
|priority|count|
+--------+-----+
|    high| 1649|
|  medium| 1603|
|     low|  748|
+--------+-----+



In [32]:
# Task 5: Find the tickets whose subject contains the keyword “Account” (SQL version; LIKE is case-sensitive)
spark.sql("""
SELECT * FROM tickets
WHERE subject LIKE '%Account%'
""").show(truncate=False)

+------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## UDFs (User-Defined Functions)

UDFs (User-Defined Functions) let you apply your own Python functions to DataFrame columns, both through the DataFrame API and inside SQL queries. They are useful when an operation is not available among Spark's built-in functions. Because Python UDFs are generally slower than built-in functions, prefer a built-in whenever one does the job.

In [41]:
# Task 6: Create a UDF to detect whether a ticket is security-related

security_keywords = ["security", "cyber", "breach", "attack", "incident", "risk"]

# Return True if any security keyword appears (case-insensitively) in the ticket's subject or body
def is_security_ticket(subject, body):
    # Treat None as an empty string so that the concatenation does not raise a TypeError
    full_text = (str(subject or "") + " " + str(body or "")).lower()
    return any(keyword.lower() in full_text for keyword in security_keywords)

# Wrap the function as a UDF for use with the DataFrame API
isSecurityUDF = udf(is_security_ticket, BooleanType())
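
Because a UDF wraps an ordinary Python function, its logic can be checked without Spark before it is applied to the whole dataset. The sketch below repeats the notebook's function and calls it on a few made-up inputs. It also shows a side effect of substring matching: the keyword "risk" matches inside "Asterisk". The whole-word variant `is_security_ticket_strict` is a hypothetical alternative, not part of the original exercise.

```python
import re

security_keywords = ["security", "cyber", "breach", "attack", "incident", "risk"]


def is_security_ticket(subject, body):
    # Same logic as the notebook: substring match on the lower-cased text
    full_text = (str(subject or "") + " " + str(body or "")).lower()
    return any(keyword.lower() in full_text for keyword in security_keywords)


# Hypothetical stricter variant: keywords must appear as whole words
_keyword_pattern = re.compile(
    r"\b(?:" + "|".join(re.escape(k) for k in security_keywords) + r")\b",
    re.IGNORECASE,
)


def is_security_ticket_strict(subject, body):
    full_text = f"{subject or ''} {body or ''}"
    return _keyword_pattern.search(full_text) is not None


# Made-up inputs, including None values as they occur in nullable columns
checks = [
    (None, None),
    ("Data BREACH reported", None),
    ("Asterisk PBX setup", "Call routing question"),
]
for subject, body in checks:
    print(subject, is_security_ticket(subject, body), is_security_ticket_strict(subject, body))
```

Whole-word matching removes false positives such as "Asterisk" but also stops matching compounds such as "cyberattack", so which variant fits better depends on the data.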


In [42]:

# Register for SQL use
spark.udf.register("is_security_ticket", is_security_ticket, BooleanType())

<function __main__.is_security_ticket(subject, body)>

In [43]:
# Apply the UDF to our tickets DataFrame
ticketsSecurityDF = ticketsDF.withColumn(
    "is_security_ticket",
    isSecurityUDF(col("subject"), col("body"))
)

# Print the results, showing the subject and whether it's a security ticket
ticketsSecurityDF.select("subject", "is_security_ticket").show(truncate=False)

+---------------------------------------------------------------------+------------------+
|subject                                                              |is_security_ticket|
+---------------------------------------------------------------------+------------------+
|Problema crítico del servidor requiere atención inmediata            |false             |
|Anfrage zur Verfügbarkeit des Dell XPS 13 9310                       |false             |
|Erro na Autocompletação de Código do IntelliJ IDEA                   |false             |
|Urgent Assistance Required: AWS Service                              |false             |
|Problème d'affichage de MacBook Air                                  |false             |
|Urgent: Issue with Zoom Screen Sharing Feature                       |true              |
|Discrepancia de facturación en Google Workspace                      |false             |
|Service outage resolution requested for ongoing issues               |false             |

In [44]:
# Count the security tickets and print the result
securityTicketCount = ticketsSecurityDF.filter(col("is_security_ticket")).count()
print(f"Security tickets: {securityTicketCount}")

In [45]:
# Use the same UDF in a SQL query
# Make sure the view is registered
ticketsDF.createOrReplaceTempView("tickets")

spark.sql("""
SELECT subject, is_security_ticket(subject, body) as is_security
FROM tickets
LIMIT 5
""").show(truncate=False)

+---------------------------------------------------------+-----------+
|subject                                                  |is_security|
+---------------------------------------------------------+-----------+
|Problema crítico del servidor requiere atención inmediata|false      |
|Anfrage zur Verfügbarkeit des Dell XPS 13 9310           |false      |
|Erro na Autocompletação de Código do IntelliJ IDEA       |false      |
|Urgent Assistance Required: AWS Service                  |false      |
|Problème d'affichage de MacBook Air                      |false      |
+---------------------------------------------------------+-----------+



In [46]:
# Count the security tickets with SQL
spark.sql("""
SELECT count(*) as security_ticket_count
FROM tickets
WHERE is_security_ticket(subject, body) = true
""").show()

+---------------------+
|security_ticket_count|
+---------------------+
|                  165|
+---------------------+



## Working with arrays

As you might have noticed, the tags are spread across multiple columns. Let's combine these tag columns into a single array column and perform operations on it.

In [49]:
# Task 7: Add a new array column "tags" to our DataFrame, containing all the tags (tag_1 to tag_8)
tagsDF = ticketsDF.withColumn("tags", array("tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8"))


In [50]:
# Task 8: Explode tags to find most common tags
explodedTagsDF = tagsDF.select(explode("tags").alias("tag"))
explodedTagsDF.show()

+--------------------+
|                 tag|
+--------------------+
|        Urgent Issue|
|  Service Disruption|
|     Incident Report|
|    Service Recovery|
|  System Maintenance|
|                NULL|
|                NULL|
|                NULL|
|       Sales Inquiry|
|     Product Support|
|    Customer Service|
|         Order Issue|
|Returns and Excha...|
|                NULL|
|                NULL|
|                NULL|
|   Technical Support|
|        Software Bug|
|  Problem Resolution|
|        Urgent Issue|
+--------------------+
only showing top 20 rows



In [53]:
# Count how often each tag occurs and print the results from most used to least used, ignoring NULL values
tagCountsDF = (
    explodedTagsDF.filter(col("tag").isNotNull())
    .groupBy("tag")
    .count()
    .orderBy(col("count").desc())
)

tagCountsDF.show()

+--------------------+-----+
|                 tag|count|
+--------------------+-----+
|   Technical Support| 3164|
|  Problem Resolution| 2835|
|     Product Support| 1850|
|        Urgent Issue| 1766|
|    Customer Service| 1077|
|        Software Bug| 1041|
|          IT Support| 1026|
|  Technical Guidance| 1022|
|  Service Disruption|  892|
|    Hardware Failure|  814|
|  Performance Tuning|  735|
|    Service Recovery|  656|
|     General Inquiry|  594|
|  Account Assistance|  566|
|       Network Issue|  547|
|  System Maintenance|  435|
|     Incident Report|  398|
|       Billing Issue|  380|
|Returns and Excha...|  318|
|Service Notification|  305|
+--------------------+-----+
only showing top 20 rows



In [54]:
# Stop the Spark Session
spark.stop()