In [3]:
import os
import findspark

# Configure the environment BEFORE calling findspark.init(): findspark looks at
# SPARK_HOME to locate the installation, so assigning it afterwards is too late
# to influence which Spark/pyspark is put on sys.path.
# setdefault() lets an already-exported SPARK_HOME win; the machine-specific
# path below is only a fallback for the original workstation.
os.environ.setdefault(
    'SPARK_HOME',
    '/Users/audioworkstation/Documents/WORKSPACE/LEARNING/pyspark-training/spark',
)
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
os.environ['PYSPARK_PYTHON'] = 'python'

findspark.init()

# Must come after findspark.init(), which is what makes pyspark importable here.
from pyspark.sql import SparkSession

# getOrCreate() hands back the already-running session when there is one, in
# which case Spark logs that only runtime SQL configurations take effect.
spark = SparkSession.builder.appName("SQL DF").getOrCreate()

23/11/10 13:48:04 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
%%bash
# Peek at the raw input before loading it: prints the first 10 lines of the
# file (the header row followed by the first data records).
head -n 10 ./resources/persons.csv

name,age,gender,salary
John Doe,30,Male,50000
Jane Smith,25,Female,45000
David Johnson,35,Male,60000
Emily Davis,28,Female,52000
Michael Wilson,40,Male,75000
Sarah Brown,32,Female,58000
Robert Lee,29,Male,51000
Lisa Garcia,27,Female,49000
James Martinez,38,Male,70000


In [9]:
# Read the persons CSV, treating its first line as column names and letting
# Spark infer each column's type from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('./resources/persons.csv')
)

# Print the inferred schema first, then preview the rows.
df.printSchema()
df.show()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+------------------+---+------+------+
|              name|age|gender|salary|
+------------------+---+------+------+
|          John Doe| 30|  Male| 50000|
|        Jane Smith| 25|Female| 45000|
|     David Johnson| 35|  Male| 60000|
|       Emily Davis| 28|Female| 52000|
|    Michael Wilson| 40|  Male| 75000|
|       Sarah Brown| 32|Female| 58000|
|        Robert Lee| 29|  Male| 51000|
|       Lisa Garcia| 27|Female| 49000|
|    James Martinez| 38|  Male| 70000|
|Jennifer Rodriguez| 26|Female| 47000|
|  William Anderson| 33|  Male| 62000|
|   Karen Hernandez| 31|Female| 55000|
|Christopher Taylor| 37|  Male| 69000|
|     Mary Gonzalez| 24|Female| 44000|
|     Matthew Davis| 36|  Male| 67000|
|    Patricia White| 29|Female| 50000|
|     Daniel Miller| 34|  Male| 64000|
| Elizabeth Jackson| 30|Female| 52000|
|     Joseph Harris| 28|  

### Register a DataFrame as a temporary view

In [17]:
# Expose the DataFrame to Spark SQL under the name 'persons_view'
# (replacing any temporary view previously registered under that name).
df.createOrReplaceTempView(name='persons_view')

In [21]:
# Query the temporary view with plain SQL: keep only people older than 25.
persons_over_25 = spark.sql("SELECT * FROM persons_view WHERE age > 25")
persons_over_25.show()

+------------------+---+------+------+
|              name|age|gender|salary|
+------------------+---+------+------+
|          John Doe| 30|  Male| 50000|
|     David Johnson| 35|  Male| 60000|
|       Emily Davis| 28|Female| 52000|
|    Michael Wilson| 40|  Male| 75000|
|       Sarah Brown| 32|Female| 58000|
|        Robert Lee| 29|  Male| 51000|
|       Lisa Garcia| 27|Female| 49000|
|    James Martinez| 38|  Male| 70000|
|Jennifer Rodriguez| 26|Female| 47000|
|  William Anderson| 33|  Male| 62000|
|   Karen Hernandez| 31|Female| 55000|
|Christopher Taylor| 37|  Male| 69000|
|     Matthew Davis| 36|  Male| 67000|
|    Patricia White| 29|Female| 50000|
|     Daniel Miller| 34|  Male| 64000|
| Elizabeth Jackson| 30|Female| 52000|
|     Joseph Harris| 28|  Male| 53000|
|      Linda Martin| 39|Female| 71000|
+------------------+---+------+------+



In [25]:
# Aggregate through SQL: one row per gender with the mean salary of that group.
avg_salary_by_gender = spark.sql(
    "SELECT gender, AVG(salary) as avg_salary from persons_view GROUP BY gender;"
)
avg_salary_by_gender.show()

+------+----------+
|gender|avg_salary|
+------+----------+
|Female|   52300.0|
|  Male|   62100.0|
+------+----------+



### Check whether a temporary view exists

In [28]:
# Ask the catalog whether the view registered earlier is visible by this name.
spark.catalog.tableExists(tableName='persons_view')


True

In [29]:
# Counter-example: nothing in this notebook is registered as 'persons', so the
# catalog lookup reports that it does not exist.
spark.catalog.tableExists(tableName='persons')


False

### Drop a temporary view

In [30]:
# Attempt to drop a temporary view called 'persons'. No view with that name was
# registered in this notebook, so nothing is removed and 'persons_view' stays
# available.
# NOTE(review): if the intent was to drop the view created earlier, the name
# should be 'persons_view' -- confirm.
spark.catalog.dropTempView(viewName='persons')

False

### Using advanced SQL for data analysis

#### Subqueries

In [31]:
# Employee directory: ids are assigned 1..9 in the order the names are listed.
employee_names = [
    "John", "Alice", "Bob", "Emily", "David",
    "Sarah", "Michael", "Lisa", "William",
]
employee_data = list(enumerate(employee_names, start=1))
employees = spark.createDataFrame(employee_data, ["id", "name"])

# Salaries written per department as (employee id, salary) pairs, then
# flattened into (department, id, salary) rows in the same order.
salaries_by_department = {
    "HR": [(1, 60000), (2, 55000), (3, 58000)],
    "IT": [(4, 70000), (5, 72000), (6, 68000)],
    "Sales": [(7, 75000), (8, 78000), (9, 77000)],
}
salary_data = [
    (department, emp_id, amount)
    for department, pairs in salaries_by_department.items()
    for emp_id, amount in pairs
]

salaries = spark.createDataFrame(salary_data, ['department', 'id', 'salary'])

In [32]:
# Display both source tables, employees first and salaries second.
for frame in (employees, salaries):
    frame.show()

                                                                                

+---+-------+
| id|   name|
+---+-------+
|  1|   John|
|  2|  Alice|
|  3|    Bob|
|  4|  Emily|
|  5|  David|
|  6|  Sarah|
|  7|Michael|
|  8|   Lisa|
|  9|William|
+---+-------+

+----------+---+------+
|department| id|salary|
+----------+---+------+
|        HR|  1| 60000|
|        HR|  2| 55000|
|        HR|  3| 58000|
|        IT|  4| 70000|
|        IT|  5| 72000|
|        IT|  6| 68000|
|     Sales|  7| 75000|
|     Sales|  8| 78000|
|     Sales|  9| 77000|
+----------+---+------+



In [34]:
# DataFrame-API equivalent of a SQL inner join: match rows on the shared 'id'
# column, which then appears once in the result.
employees_with_salaries = employees.join(salaries, on='id', how='inner')
employees_with_salaries.show()



+---+-------+----------+------+
| id|   name|department|salary|
+---+-------+----------+------+
|  1|   John|        HR| 60000|
|  2|  Alice|        HR| 55000|
|  3|    Bob|        HR| 58000|
|  4|  Emily|        IT| 70000|
|  5|  David|        IT| 72000|
|  6|  Sarah|        IT| 68000|
|  7|Michael|     Sales| 75000|
|  8|   Lisa|     Sales| 78000|
|  9|William|     Sales| 77000|
+---+-------+----------+------+



                                                                                

In [35]:
# Make both DataFrames queryable from Spark SQL, each under its own view name.
for view_name, frame in (("employees", employees), ("salaries", salaries)):
    frame.createOrReplaceTempView(view_name)

In [37]:
# Nested subqueries: the innermost SELECT computes the average salary over all
# rows, the middle one collects the ids earning more than that, and the outer
# query turns those ids into employee names.
above_average_query = """
    SELECT name
    FROM employees 
    WHERE id IN (
        SELECT id 
        FROM salaries 
        WHERE salary > (SELECT AVG(salary) FROM salaries)
    )
"""
spark.sql(above_average_query).show()

                                                                                

+-------+
|   name|
+-------+
|  Emily|
|  David|
|Michael|
|   Lisa|
|William|
+-------+



### Window Function

In [38]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F



In [42]:
# One row per salary record, with the employee's name attached where an
# employee with the same id exists (LEFT JOIN keeps every salary row).
emp_sal_query = """
    SELECT sal.*, emp.name
    FROM salaries sal
    LEFT JOIN employees emp on sal.id = emp.id
    """
emp_sal = spark.sql(emp_sal_query)

In [44]:
# Window definition: rows are grouped per department and, within each group,
# ordered from the highest salary to the lowest.
window_spec = (
    Window
    .partitionBy('department')
    .orderBy(F.col('salary').desc())
)


In [45]:
# Add each employee's salary rank inside their department (1 = highest paid),
# evaluated over the window defined in the previous cell.
salary_rank = F.rank().over(window_spec)
ranked_emp_sal = emp_sal.withColumn('rank', salary_rank)
ranked_emp_sal.show()

                                                                                

+----------+---+------+-------+----+
|department| id|salary|   name|rank|
+----------+---+------+-------+----+
|        HR|  1| 60000|   John|   1|
|        HR|  3| 58000|    Bob|   2|
|        HR|  2| 55000|  Alice|   3|
|        IT|  5| 72000|  David|   1|
|        IT|  4| 70000|  Emily|   2|
|        IT|  6| 68000|  Sarah|   3|
|     Sales|  8| 78000|   Lisa|   1|
|     Sales|  9| 77000|William|   2|
|     Sales|  7| 75000|Michael|   3|
+----------+---+------+-------+----+



In [46]:
# Shut the session down now that the notebook is finished; later cells would
# need a new session to run Spark code again.
spark.stop()