# Chapter 3: SQL Queries in Spark

In [0]:
# Sample employee records, one tuple per employee, in the same order as
# the column names listed in `columns` below.
employee_rows = [
    (1, "John", "Field-eng", 3500, 40),
    (2, "Robert", "Sales", 4000, 38),
    (3, "Maria", "Finance", 3500, 28),
    (4, "Michael", "Sales", 3000, 20),
    (5, "Kelly", "Finance", 3500, 35),
    (6, "Kate", "Finance", 3000, 45),
    (7, "Martin", "Finance", 3500, 26),
    (8, "Kiran", "Sales", 2200, 35),
]
columns = ["ID", "Employee", "Department", "Salary", "Age"]

# Build the Spark DataFrame from the raw rows and print it.
salary_data_with_id = spark.createDataFrame(employee_rows, schema=columns)
salary_data_with_id.show()


+---+--------+----------+------+---+
| ID|Employee|Department|Salary|Age|
+---+--------+----------+------+---+
|  1|    John| Field-eng|  3500| 40|
|  2|  Robert|     Sales|  4000| 38|
|  3|   Maria|   Finance|  3500| 28|
|  4| Michael|     Sales|  3000| 20|
|  5|   Kelly|   Finance|  3500| 35|
|  6|    Kate|   Finance|  3000| 45|
|  7|  Martin|   Finance|  3500| 26|
|  8|   Kiran|     Sales|  2200| 35|
+---+--------+----------+------+---+



In [0]:
# Write the DataFrame out as CSV with a header row, replacing any output
# already present at the target path.
(
    salary_data_with_id.write
    .format("csv")
    .mode("overwrite")
    .option("header", "true")
    .save("salary_data.csv")
)


In [0]:
# Read the CSV back from the same relative path it was written to, so the
# round trip does not depend on the working directory being the filesystem
# root. inferSchema=True makes Spark derive numeric column types from the
# data instead of loading every column as a string.
csv_data = spark.read.csv("salary_data.csv", header=True, inferSchema=True)

In [0]:
# Print the DataFrame that was read back from the CSV file.
csv_data.show()

+---+--------+----------+------+---+
| ID|Employee|Department|Salary|Age|
+---+--------+----------+------+---+
|  1|    John| Field-eng|  3500| 40|
|  2|  Robert|     Sales|  4000| 38|
|  3|   Maria|   Finance|  3500| 28|
|  4| Michael|     Sales|  3000| 20|
|  5|   Kelly|   Finance|  3500| 35|
|  6|    Kate|   Finance|  3000| 45|
|  7|  Martin|   Finance|  3500| 26|
|  8|   Kiran|     Sales|  2200| 35|
+---+--------+----------+------+---+



In [0]:
# Keep only the employees earning more than 3000. Salary is cast explicitly
# so the comparison is numeric even when the CSV reader delivers the column
# as a string.
processed_data = csv_data.filter(csv_data["Salary"].cast("int") > 3000)
# Register the filtered rows as a temporary view (not a persisted table) so
# they can be queried with SQL.
processed_data.createOrReplaceTempView("high_salary_employees")
# Query the temporary view through Spark SQL and print the rows.
results = spark.sql("SELECT * FROM high_salary_employees ")
results.show()


+---+--------+----------+------+---+
| ID|Employee|Department|Salary|Age|
+---+--------+----------+------+---+
|  1|    John| Field-eng|  3500| 40|
|  2|  Robert|     Sales|  4000| 38|
|  3|   Maria|   Finance|  3500| 28|
|  5|   Kelly|   Finance|  3500| 35|
|  7|  Martin|   Finance|  3500| 26|
+---+--------+----------+------+---+



In [0]:
# Register the employee DataFrame as a temporary view so it can be queried with SQL
salary_data_with_id.createOrReplaceTempView("employees")
# Keep only employees older than 30. The column is written as "Age", matching
# the schema, so the query does not rely on case-insensitive name resolution.
filtered_data = spark.sql("SELECT Employee, Department, Salary, Age FROM employees WHERE Age > 30")
# Display the results
filtered_data.show()


+--------+----------+------+---+
|Employee|Department|Salary|Age|
+--------+----------+------+---+
|    John| Field-eng|  3500| 40|
|  Robert|     Sales|  4000| 38|
|   Kelly|   Finance|  3500| 35|
|    Kate|   Finance|  3000| 45|
|   Kiran|     Sales|  2200| 35|
+--------+----------+------+---+



In [0]:
# Average the Salary column over every row of the employees view.
avg_salary_query = "SELECT AVG(Salary) AS average_salary FROM employees"
average_salary = spark.sql(avg_salary_query)

# Print the single-row result.
average_salary.show()


+--------------+
|average_salary|
+--------------+
|        3275.0|
+--------------+



In [0]:
# Return every employee, highest salary first.
sort_by_salary_query = "SELECT * FROM employees ORDER BY Salary DESC"
sorted_data = spark.sql(sort_by_salary_query)

# Print the ordered rows.
sorted_data.show()


+---+--------+----------+------+---+
| ID|Employee|Department|Salary|Age|
+---+--------+----------+------+---+
|  2|  Robert|     Sales|  4000| 38|
|  1|    John| Field-eng|  3500| 40|
|  7|  Martin|   Finance|  3500| 26|
|  3|   Maria|   Finance|  3500| 28|
|  5|   Kelly|   Finance|  3500| 35|
|  4| Michael|     Sales|  3000| 20|
|  6|    Kate|   Finance|  3000| 45|
|  8|   Kiran|     Sales|  2200| 35|
+---+--------+----------+------+---+



In [0]:
# Keep employees older than 30 who earn more than 3000, then sort them by
# salary in descending order. The column is written as "Age", matching the
# schema, so the query does not rely on case-insensitive name resolution.
filtered_data = spark.sql("SELECT Employee, Department, Salary, Age FROM employees WHERE Age > 30 AND Salary > 3000 ORDER BY Salary DESC")
# Display the results
filtered_data.show()


+--------+----------+------+---+
|Employee|Department|Salary|Age|
+--------+----------+------+---+
|  Robert|     Sales|  4000| 38|
|    John| Field-eng|  3500| 40|
|   Kelly|   Finance|  3500| 35|
+--------+----------+------+---+



In [0]:
# One row per department, carrying that department's mean salary.
avg_by_department_query = "SELECT Department, avg(Salary) FROM employees GROUP BY Department"
grouped_data = spark.sql(avg_by_department_query)

# Print the per-department averages.
grouped_data.show()


+----------+------------------+
|Department|       avg(Salary)|
+----------+------------------+
|     Sales|3066.6666666666665|
|   Finance|            3375.0|
| Field-eng|            3500.0|
+----------+------------------+



In [0]:
# Per department, compute both the salary total and the highest salary.
department_stats_query = "SELECT Department, sum(Salary) AS total_salary, max(Salary) AS max_salary FROM employees GROUP BY Department"
aggregated_data = spark.sql(department_stats_query)

# Print the per-department totals and maxima.
aggregated_data.show()


+----------+------------+----------+
|Department|total_salary|max_salary|
+----------+------------+----------+
|     Sales|        9200|      4000|
|   Finance|       13500|      3500|
| Field-eng|        3500|      3500|
+----------+------------+----------+



In [0]:
from pyspark.sql.window import Window
# Spark's sum is imported under an alias so that Python's built-in sum()
# is not shadowed for the rest of the notebook.
from pyspark.sql.functions import col, sum as spark_sum

# Define the window specification: rows are grouped by Department and
# ordered by Age within each group.
window_spec = Window.partitionBy("Department").orderBy("Age")

# Calculate the cumulative sum of Salary over the window, so each row carries
# the running total of its department up to that employee's age.
df_with_cumulative_sum = salary_data_with_id.withColumn("cumulative_sum", spark_sum(col("Salary")).over(window_spec))

# Display the result
df_with_cumulative_sum.show()


+---+--------+----------+------+---+--------------+
| ID|Employee|Department|Salary|Age|cumulative_sum|
+---+--------+----------+------+---+--------------+
|  1|    John| Field-eng|  3500| 40|          3500|
|  7|  Martin|   Finance|  3500| 26|          3500|
|  3|   Maria|   Finance|  3500| 28|          7000|
|  5|   Kelly|   Finance|  3500| 35|         10500|
|  6|    Kate|   Finance|  3000| 45|         13500|
|  4| Michael|     Sales|  3000| 20|          3000|
|  8|   Kiran|     Sales|  2200| 35|          5200|
|  2|  Robert|     Sales|  4000| 38|          9200|
+---+--------+----------+------+---+--------------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define a UDF that upper-cases a string. Null values are passed through
# unchanged, because calling .upper() on None would raise AttributeError.
capitalize_udf = udf(lambda x: x.upper() if x is not None else None, StringType())

# Apply the UDF to the Employee column, storing the result in a new column
df_with_capitalized_names = salary_data_with_id.withColumn("capitalized_name", capitalize_udf("Employee"))

# Display the result
df_with_capitalized_names.show()


+---+--------+----------+------+---+----------------+
| ID|Employee|Department|Salary|Age|capitalized_name|
+---+--------+----------+------+---+----------------+
|  1|    John| Field-eng|  3500| 40|            JOHN|
|  2|  Robert|     Sales|  4000| 38|          ROBERT|
|  3|   Maria|   Finance|  3500| 28|           MARIA|
|  4| Michael|     Sales|  3000| 20|         MICHAEL|
|  5|   Kelly|   Finance|  3500| 35|           KELLY|
|  6|    Kate|   Finance|  3000| 45|            KATE|
|  7|  Martin|   Finance|  3500| 26|          MARTIN|
|  8|   Kiran|     Sales|  2200| 35|           KIRAN|
+---+--------+----------+------+---+----------------+



In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define a UDF that upper-cases a string. Null values are passed through
# unchanged, because calling .upper() on None would raise AttributeError.
capitalize_udf = udf(lambda x: x.upper() if x is not None else None, StringType())

# Apply the UDF to the Employee column, storing the result in a new column
df_with_capitalized_names = salary_data_with_id.withColumn("capitalized_name", capitalize_udf("Employee"))

# Display the result
df_with_capitalized_names.show()

+---+--------+----------+------+---+----------------+
| ID|Employee|Department|Salary|Age|capitalized_name|
+---+--------+----------+------+---+----------------+
|  1|    John| Field-eng|  3500| 40|            JOHN|
|  2|  Robert|     Sales|  4000| 38|          ROBERT|
|  3|   Maria|   Finance|  3500| 28|           MARIA|
|  4| Michael|     Sales|  3000| 20|         MICHAEL|
|  5|   Kelly|   Finance|  3500| 35|           KELLY|
|  6|    Kate|   Finance|  3000| 45|            KATE|
|  7|  Martin|   Finance|  3500| 26|          MARTIN|
|  8|   Kiran|     Sales|  2200| 35|           KIRAN|
+---+--------+----------+------+---+----------------+



In [0]:
import pandas as pd
from pyspark.sql.functions import pandas_udf


@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    """Return the given pandas Series with 1 added to every element."""
    incremented = series + 1
    return incremented


# Apply the pandas UDF to the Salary column and print the resulting column.
salary_plus_one = pandas_plus_one(salary_data_with_id["Salary"])
salary_data_with_id.select(salary_plus_one).show()


+-----------------------+
|pandas_plus_one(Salary)|
+-----------------------+
|                   3501|
|                   4001|
|                   3501|
|                   3001|
|                   3501|
|                   3001|
|                   3501|
|                   2201|
+-----------------------+



In [0]:
# The return type is declared as "long" so the result is not narrowed to
# 32 bits: Salary is built from Python ints, and the sibling pandas_plus_one
# UDF uses 'long' for the same computation.
@pandas_udf("long")
def add_one(s: pd.Series) -> pd.Series:
    """Return the given pandas Series with 1 added to every element."""
    return s + 1

# Register the pandas UDF under the name "add_one" so SQL queries can call it.
spark.udf.register("add_one", add_one)
spark.sql("SELECT add_one(Salary) FROM employees").show()


+---------------+
|add_one(Salary)|
+---------------+
|           3501|
|           4001|
|           3501|
|           3001|
|           3501|
|           3001|
|           3501|
|           2201|
+---------------+

