<a href="https://colab.research.google.com/github/Anubhav0311/snowflake-migration/blob/main/pyspark_colab_practice.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark_One_Shot

This notebook sets up PySpark in Google Colab, loads sample data, and lets you practice DataFrame operations.

In [2]:
!apt-get install openjdk-11-jdk-headless -qq
!rm -f spark-3.5.3-bin-hadoop3.tgz
!wget -q https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
!tar -xzf spark-3.5.3-bin-hadoop3.tgz
!pip install -q findspark





In [3]:
import os, findspark
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.3-bin-hadoop3"
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PySparkPractice").getOrCreate()



In [4]:
# ✅ Step 4: Upload sample CSV files
from google.colab import files
uploaded = files.upload()


Saving employees.csv to employees.csv
Saving salaries.csv to salaries.csv
Saving departments.csv to departments.csv


In [5]:
# ✅ Step 5: Load CSVs into DataFrames
employees_df = spark.read.option("header", True).option("inferSchema", True).csv("employees.csv")
departments_df = spark.read.option("header", True).option("inferSchema", True).csv("departments.csv")
salaries_df = spark.read.option("header", True).option("inferSchema", True).csv("salaries.csv")

# Preview
employees_df.show()
departments_df.show()
salaries_df.show()


+------+-------+-------------+------+----------+---+
|emp_id|   name|department_id|salary|manager_id|age|
+------+-------+-------------+------+----------+---+
|   101|  Alice|            1| 50000|       101| 34|
|   102|    Bob|            2| 60000|       102| 41|
|   103|Charlie|            1| 55000|       101| 29|
|   104|  David|            3| 70000|       104| 45|
|   105|    Eva|            2| 62000|       102| 36|
|   106|  Frank|            1| 58000|       101| 31|
+------+-------+-------------+------+----------+---+

+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
|            1|             HR|
|            2|             IT|
|            3|        Finance|
|            4|      Marketing|
+-------------+---------------+

+------+-----+-----------+
|emp_id|month|salary_paid|
+------+-----+-----------+
|   101|  Jan|      50000|
|   102|  Jan|      60000|
|   103|  Jan|      55000|
|   104|  Jan|      70000|
|   105|  Jan|      620

🔹 1. Basic Selection & Filtering
Select all columns.

Select only name and salary.

Get employees with salary > 50,000.

Get female employees older than 30.

Get distinct department IDs.

🔹 2. Sorting and Limiting
List top 5 highest paid employees.

List youngest 3 employees in the company.

Sort employees by department_id and then by salary descending.

🔹 3. Aggregations
Find average salary.

Find max and min salary.

Count total number of employees.

Find total salary expense per department.

🔹 4. Grouping
Group by department and find average salary.

Find number of employees by gender.

Group by age and count employees.

🔹 5. Filtering After Grouping (HAVING Equivalent)
Get departments with average salary > 60,000.

Find genders with more than 5 employees.

🔹 6. Joins (departments table: department_id, department_name)
Join employees with departments to get dept names.

Get employee names and their department names.

Find employees working in the "Sales" department.

🔹 7. Window Functions
Rank employees by salary within each department.

Get top 1 highest paid employee per department.

Calculate running total of salary by department.

🔹 8. Subqueries / Nested logic
Get employees whose salary > average salary.

Find employees older than the average age.

Get employees not in the department with max total salary.

🔹 9. String and Date Functions (assume a hire_date column)
Extract year from hire_date.

Find employees hired after 2020.

Count number of employees hired each year.

Get length of employee names.

Filter employees whose names start with 'A'.

🔹 10. Advanced / Use Case Specific
Find the second highest salary.

Get duplicate salaries (i.e., salaries shared by more than 1 person).

Find departments with male-to-female ratio > 2.

Pivot: Show count of employees by gender for each department.

Unpivot: Convert wide gender-based counts into tall format.



In [6]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window


***1. Basic Selection & Filtering***

**Select all columns.**

**Select only name and salary.**

In [14]:
employees_df.select("name",col("salary")).show()


+-------+------+
|   name|salary|
+-------+------+
|  Alice| 50000|
|    Bob| 60000|
|Charlie| 55000|
|  David| 70000|
|    Eva| 62000|
|  Frank| 58000|
+-------+------+



**Get employees with salary > 50,000.**

In [15]:
employees_df.filter(col("salary")>50000).show()

+------+-------+-------------+------+
|emp_id|   name|department_id|salary|
+------+-------+-------------+------+
|   102|    Bob|            2| 60000|
|   103|Charlie|            1| 55000|
|   104|  David|            3| 70000|
|   105|    Eva|            2| 62000|
|   106|  Frank|            1| 58000|
+------+-------+-------------+------+



**Get female employees older than 30.**

In [17]:
employees_df.show()

# The data has no gender column (and this version of employees.csv has no age column), so this query cannot be run.

+------+-------+-------------+------+
|emp_id|   name|department_id|salary|
+------+-------+-------------+------+
|   101|  Alice|            1| 50000|
|   102|    Bob|            2| 60000|
|   103|Charlie|            1| 55000|
|   104|  David|            3| 70000|
|   105|    Eva|            2| 62000|
|   106|  Frank|            1| 58000|
+------+-------+-------------+------+



**Get distinct department IDs.**

In [20]:
employees_df.select("department_id").distinct().show()
employees_df.printSchema()
  # printSchema() prints the schema, i.e. each column's name and data type.



+-------------+
|department_id|
+-------------+
|            1|
|            3|
|            2|
+-------------+

root
 |-- emp_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- salary: integer (nullable = true)



***2. Sorting and Limiting***

List top 5 highest paid employees.

In [31]:
employees_df.select(col("name"),col("salary"))\
            .orderBy(desc("salary"))\
            .limit(5)\
            .show()


+-------+------+
|   name|salary|
+-------+------+
|  David| 70000|
|    Eva| 62000|
|    Bob| 60000|
|  Frank| 58000|
|Charlie| 55000|
+-------+------+



List youngest 3 employees in the company.

In [9]:
employees_df.select("name","age")\
            .orderBy(asc("age"))\
            .limit(3)\
            .show()

+-------+---+
|   name|age|
+-------+---+
|Charlie| 29|
|  Frank| 31|
|  Alice| 34|
+-------+---+



Find employees who earn more than their manager.

In [34]:
employees_df.alias("e").join(employees_df.alias("m"),col("e.manager_id") == col("m.emp_id"),"inner")\
            .filter(col("e.salary") > col("m.salary"))\
            .select(col("e.emp_id").alias("employee_id"),col("e.name").alias("employee_name"),col("e.salary").alias("employee_salary"),
                    col("m.emp_id").alias("manager_id"),col("m.name").alias("manager_name"),col("m.salary").alias("manager_salary"))\
            .show()

+-----------+-------------+---------------+----------+------------+--------------+
|employee_id|employee_name|employee_salary|manager_id|manager_name|manager_salary|
+-----------+-------------+---------------+----------+------------+--------------+
|        103|      Charlie|          55000|       101|       Alice|         50000|
|        105|          Eva|          62000|       102|         Bob|         60000|
|        106|        Frank|          58000|       101|       Alice|         50000|
+-----------+-------------+---------------+----------+------------+--------------+



In [14]:
employees_df.createOrReplaceTempView("employees")


In [15]:
spark.sql("select * from employees").show()

+------+-------+-------------+------+----------+---+
|emp_id|   name|department_id|salary|manager_id|age|
+------+-------+-------------+------+----------+---+
|   101|  Alice|            1| 50000|       101| 34|
|   102|    Bob|            2| 60000|       102| 41|
|   103|Charlie|            1| 55000|       101| 29|
|   104|  David|            3| 70000|       104| 45|
|   105|    Eva|            2| 62000|       102| 36|
|   106|  Frank|            1| 58000|       101| 31|
+------+-------+-------------+------+----------+---+



In [24]:
spark.sql("SELECT e.emp_id AS employee_id,e.name AS employee_name,e.salary AS employee_salary,m.emp_id AS manager_id,m.name AS manager_name,m.salary AS manager_salary FROM employees AS e JOIN employees AS m ON e.manager_id = m.emp_id WHERE e.salary > m.salary").show()

+-----------+-------------+---------------+----------+------------+--------------+
|employee_id|employee_name|employee_salary|manager_id|manager_name|manager_salary|
+-----------+-------------+---------------+----------+------------+--------------+
|        103|      Charlie|          55000|       101|       Alice|         50000|
|        105|          Eva|          62000|       102|         Bob|         60000|
|        106|        Frank|          58000|       101|       Alice|         50000|
+-----------+-------------+---------------+----------+------------+--------------+



Sort employees by department_id and then by salary descending.

In [35]:
employees_df.select("department_id","salary")\
            .orderBy(asc("department_id"),desc("salary"))\
            .show()

+-------------+------+
|department_id|salary|
+-------------+------+
|            1| 58000|
|            1| 55000|
|            1| 50000|
|            2| 62000|
|            2| 60000|
|            3| 70000|
+-------------+------+



***Aggregations***

Find average salary.

In [13]:
employees_df.select(avg(col("salary")).alias("average_salary")).show()

+------------------+
|    average_salary|
+------------------+
|59166.666666666664|
+------------------+



Find max and min salary.

In [14]:
employees_df.select(max(col("salary")).alias("max_salary"),min(col("salary")).alias("min_salary")).show()

+----------+----------+
|max_salary|min_salary|
+----------+----------+
|     70000|     50000|
+----------+----------+



Count total number of employees.

In [15]:
# A dot in a column name forces backtick quoting later, so the alias avoids one.
employees_df.select(count(col("emp_id")).alias("num_employees")).show()

+-------------+
|num_employees|
+-------------+
|            6|
+-------------+



***Grouping***

Find total salary expense per department.

In [14]:
employees_df.groupBy("department_id")\
            .agg(sum(col("salary")).alias("total_salary"))\
            .orderBy(asc("department_id"))\
            .show()

+-------------+------------+
|department_id|total_salary|
+-------------+------------+
|            1|      163000|
|            2|      122000|
|            3|       70000|
+-------------+------------+




Group by department and find the average salary, including the department name.

In [12]:
employees_df.alias("e").join(departments_df.alias("d"),col("e.department_id")==col("d.department_id"),"inner")\
                        .groupBy("d.department_id","d.department_name")\
                        .agg(avg(col("e.salary")).alias("average_salary"))\
                        .orderBy(asc("d.department_id"))\
                        .show()




+-------------+---------------+------------------+
|department_id|department_name|    average_salary|
+-------------+---------------+------------------+
|            1|             HR|54333.333333333336|
|            2|             IT|           61000.0|
|            3|        Finance|           70000.0|
+-------------+---------------+------------------+
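
The "HAVING equivalent" task from the list above (departments with average salary > 60,000) builds directly on these grouped averages. As a rough plain-Python model of the two-step logic (no Spark involved; the `rows` list is copied by hand from the employees table shown earlier, and the variable names are only illustrative):

```python
from collections import defaultdict

# (department_id, salary) pairs copied from the employees table shown earlier.
rows = [(1, 50000), (2, 60000), (1, 55000), (3, 70000), (2, 62000), (1, 58000)]

# Step 1: group salaries by department, like groupBy("department_id").
groups = defaultdict(list)
for dept, salary in rows:
    groups[dept].append(salary)

# Step 2: aggregate each group, like agg(avg("salary")).
avg_by_dept = {dept: sum(vals) / len(vals) for dept, vals in sorted(groups.items())}

# Step 3: HAVING-style filter, applied to the aggregated value after grouping.
above_60k = {dept: avg for dept, avg in avg_by_dept.items() if avg > 60000}

print(avg_by_dept)
print(above_60k)
```

In PySpark the third step would typically be a `filter()` on the aggregated column placed after `agg()`, since a filter placed before the grouping would act on individual rows instead.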



***Joins***

Join employees with departments to get dept names.\
Get employee names and their department names.

In [42]:
employees_df.alias("e").join(departments_df.alias("d"),col("e.department_id") == col("d.department_id"),"inner")\
                       .select((col("e.name").alias("employee_name")),(col("d.department_name").alias("department_name")))\
                       .show()



+-------------+---------------+
|employee_name|department_name|
+-------------+---------------+
|        Alice|             HR|
|          Bob|             IT|
|      Charlie|             HR|
|        David|        Finance|
|          Eva|             IT|
|        Frank|             HR|
+-------------+---------------+



Find employees working in the "IT" department.

In [44]:
employees_df.alias("e").join(departments_df.alias("d"),col("e.department_id") == col("d.department_id"),"inner")\
                       .filter(col("d.department_name")=="IT")\
                       .select(col("e.name").alias("employee_name"),col("d.department_name").alias("department_name"))\
                       .show()



+-------------+---------------+
|employee_name|department_name|
+-------------+---------------+
|          Bob|             IT|
|          Eva|             IT|
+-------------+---------------+




***Window Functions***

Rank employees by salary within each department.

In [10]:
window = Window.partitionBy("department_id").orderBy(desc("salary"))
# Keep the ranked DataFrame in a variable so the cells below can reuse it.
employee_df_1 = employees_df.withColumn("rank",rank().over(window))\
            .withColumn("dense_rank",dense_rank().over(window))\
            .withColumn("row_number",row_number().over(window))
employee_df_1.alias("e").join(departments_df.alias("d"),col("e.department_id")==col("d.department_id"),"inner")\
                       .show()


+------+-------+-------------+------+----------+---+----+----------+----------+-------------+---------------+
|emp_id|   name|department_id|salary|manager_id|age|rank|dense_rank|row_number|department_id|department_name|
+------+-------+-------------+------+----------+---+----+----------+----------+-------------+---------------+
|   106|  Frank|            1| 58000|       101| 31|   1|         1|         1|            1|             HR|
|   103|Charlie|            1| 55000|       101| 29|   2|         2|         2|            1|             HR|
|   101|  Alice|            1| 50000|       101| 34|   3|         3|         3|            1|             HR|
|   105|    Eva|            2| 62000|       102| 36|   1|         1|         1|            2|             IT|
|   102|    Bob|            2| 60000|       102| 41|   2|         2|         2|            2|             IT|
|   104|  David|            3| 70000|       104| 45|   1|         1|         1|            3|        Finance|
+------+-------+-------------+------+----------+---+----+----------+----------+-------------+---------------+

In [8]:
employee_df_1.show()

+------+-------+-------------+------+----------+---+----+----------+----------+
|emp_id|   name|department_id|salary|manager_id|age|rank|dense_rank|row_number|
+------+-------+-------------+------+----------+---+----+----------+----------+
|   106|  Frank|            1| 58000|       101| 31|   1|         1|         1|
|   103|Charlie|            1| 55000|       101| 29|   2|         2|         2|
|   101|  Alice|            1| 50000|       101| 34|   3|         3|         3|
|   105|    Eva|            2| 62000|       102| 36|   1|         1|         1|
|   102|    Bob|            2| 60000|       102| 41|   2|         2|         2|
|   104|  David|            3| 70000|       104| 45|   1|         1|         1|
+------+-------+-------------+------+----------+---+----+----------+----------+
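
In this data no two employees in the same department share a salary, so `rank`, `dense_rank`, and `row_number` come out identical. They only differ when there are ties. The following is a small plain-Python sketch of what the three functions compute within one partition ordered by salary descending. It does not use Spark, `window_ranks` is a hypothetical helper written for illustration, and the salaries with a tie are made up rather than taken from the CSV.

```python
def window_ranks(salaries):
    """Return (salary, rank, dense_rank, row_number) tuples for one partition,
    ordered by salary descending."""
    ordered = sorted(salaries, reverse=True)
    out = []
    rank = dense = 0
    prev = None
    for row_number, salary in enumerate(ordered, start=1):
        if salary != prev:
            rank = row_number  # rank jumps to the row position after a tie
            dense += 1         # dense_rank only ever steps by one
            prev = salary
        out.append((salary, rank, dense, row_number))
    return out

# Hypothetical partition with a tie at 60000 (not from the notebook's data).
for row in window_ranks([70000, 60000, 60000, 55000]):
    print(row)
# (70000, 1, 1, 1)
# (60000, 2, 2, 2)
# (60000, 2, 2, 3)
# (55000, 4, 3, 4)
```

This matters for "top 1 per department" queries: filtering on `rank == 1` keeps every employee tied for the highest salary, while filtering on `row_number == 1` keeps exactly one row per department.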



In [9]:
employee_df_1.alias("e").join(departments_df.alias("d"),col("e.department_id")==col("d.department_id"),"inner")\
                       .show()

+------+-------+-------------+------+----------+---+----+----------+----------+-------------+---------------+
|emp_id|   name|department_id|salary|manager_id|age|rank|dense_rank|row_number|department_id|department_name|
+------+-------+-------------+------+----------+---+----+----------+----------+-------------+---------------+
|   106|  Frank|            1| 58000|       101| 31|   1|         1|         1|            1|             HR|
|   103|Charlie|            1| 55000|       101| 29|   2|         2|         2|            1|             HR|
|   101|  Alice|            1| 50000|       101| 34|   3|         3|         3|            1|             HR|
|   105|    Eva|            2| 62000|       102| 36|   1|         1|         1|            2|             IT|
|   102|    Bob|            2| 60000|       102| 41|   2|         2|         2|            2|             IT|
|   104|  David|            3| 70000|       104| 45|   1|         1|         1|            3|        Finance|
+------+-------+-------------+------+----------+---+----+----------+----------+-------------+---------------+

Get top 1 highest paid employee per department.

In [9]:
window = Window.partitionBy("department_id").orderBy(desc("salary"))
employees_df.withColumn("rank",rank().over(window))\
            .filter(col("rank")==1)\
            .show()

+------+-----+-------------+------+----------+---+----+
|emp_id| name|department_id|salary|manager_id|age|rank|
+------+-----+-------------+------+----------+---+----+
|   106|Frank|            1| 58000|       101| 31|   1|
|   105|  Eva|            2| 62000|       102| 36|   1|
|   104|David|            3| 70000|       104| 45|   1|
+------+-----+-------------+------+----------+---+----+



Rank employees by salary within each department, including the department name.

In [11]:
employees_df.show()

+------+-------+-------------+------+----------+---+
|emp_id|   name|department_id|salary|manager_id|age|
+------+-------+-------------+------+----------+---+
|   101|  Alice|            1| 50000|       101| 34|
|   102|    Bob|            2| 60000|       102| 41|
|   103|Charlie|            1| 55000|       101| 29|
|   104|  David|            3| 70000|       104| 45|
|   105|    Eva|            2| 62000|       102| 36|
|   106|  Frank|            1| 58000|       101| 31|
+------+-------+-------------+------+----------+---+



In [13]:
windows = Window.partitionBy("department_id").orderBy(desc("salary"))

In [15]:
employees_df.alias("e").join(departments_df.alias("d"),col("e.department_id")==col("d.department_id"),"inner")\
                       .select(col("e.emp_id").alias("emp_id"),col("e.name").alias("name"),col("e.salary").alias("salary"),col("d.department_name").alias("department_name"),col("d.department_id").alias("department_id"))\
                       .withColumn("rank",rank().over(windows))\
                       .filter(col("rank")==1)\
                       .show()

# windows = Window.partitionBy("department_id").orderBy(desc("salary"))
# employees_df_joined.withColumn("rank",rank().over(windows))\
#             .show()


+------+-----+------+---------------+-------------+----+
|emp_id| name|salary|department_name|department_id|rank|
+------+-----+------+---------------+-------------+----+
|   106|Frank| 58000|             HR|            1|   1|
|   105|  Eva| 62000|             IT|            2|   1|
|   104|David| 70000|        Finance|            3|   1|
+------+-----+------+---------------+-------------+----+



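The error below came from an earlier version of this cell that renamed the columns to `emp_name` and `emp_salary` in the select. The window still ordered by `salary`, which no longer existed after that rename:

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `salary` cannot be resolved. Did you mean one of the following? [`emp_salary`, `emp_id`, `emp_name`, `department_id`, `department_name`].;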
'Project [emp_id#1097, emp_name#1098, emp_salary#1099, department_name#1100, department_id#1101, rank() windowspecdefinition(department_id#1101, 'salary DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#1115]
+- Project [emp_id#313 AS emp_id#1097, name#314 AS emp_name#1098, salary#316 AS emp_salary#1099, department_name#343 AS department_name#1100, department_id#342 AS department_id#1101]
   +- Join Inner, (department_id#315 = department_id#342)
      :- SubqueryAlias e
      :  +- Relation [emp_id#313,name#314,department_id#315,salary#316,manager_id#317,age#318] csv
      +- SubqueryAlias d
         +- Relation [department_id#342,department_name#343] csv


In [23]:
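# select("e.*", "d.*") keeps department_id from both sides of the join, so the
# window's partitionBy("department_id") cannot tell which one is meant and the cell fails.
# Selecting a single department_id explicitly, as in the previous cell, avoids this.
employees_df_2 = employees_df.alias("e").join(departments_df.alias("d"),col("e.department_id")==col("d.department_id"),"inner")\
                       .select("e.*","d.*")

windows = Window.partitionBy("department_id").orderBy(desc("salary"))
employees_df_2.withColumn("rank",rank().over(windows))\
            .show()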

AnalysisException: [AMBIGUOUS_REFERENCE] Reference `department_id` is ambiguous, could be: [`d`.`department_id`, `e`.`department_id`].

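What will be the output of inner, outer, left, right, and cartesian joins when columns A and B below are each loaded as a separate single-column table and joined on their values?

| A | B |
|---|---|
| 1 | null |
| null | 2 |
| null | 2 |
| 3 | 1 |
| 6 | 5 |
| 4 | 6 |
| 6 | null |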








In [17]:
arr1 = [1,None, None,3,6,4,6]
arr2 = [None,2,2,1,5,6,None]

In [23]:
df_1 = spark.createDataFrame([(i,) for i in arr1], ["value"])
df_2 = spark.createDataFrame([(i,) for i in arr2], ["value"])


df_1.show()
df_2.show()
df_1.printSchema()

+-----+
|value|
+-----+
|    1|
| NULL|
| NULL|
|    3|
|    6|
|    4|
|    6|
+-----+

+-----+
|value|
+-----+
| NULL|
|    2|
|    2|
|    1|
|    5|
|    6|
| NULL|
+-----+

root
 |-- value: long (nullable = true)



In [None]:
# Predicted inner join output (A, B)
1,1
6,6
6,6

In [None]:
# Predicted left join output (A, B)
1,1
null,null
null,null
3,null
6,6
4,null
6,6

In [24]:
df_1.alias("d1").join(df_2.alias("d2"),col("d1.value")==col("d2.value"),"inner").show()

+-----+-----+
|value|value|
+-----+-----+
|    1|    1|
|    6|    6|
|    6|    6|
+-----+-----+



In [25]:
df_1.alias("d1").join(df_2.alias("d2"),col("d1.value")==col("d2.value"),"left").show()

+-----+-----+
|value|value|
+-----+-----+
| NULL| NULL|
| NULL| NULL|
|    1|    1|
|    6|    6|
|    6|    6|
|    3| NULL|
|    4| NULL|
+-----+-----+



In [31]:
df_1.crossJoin(df_2).show(truncate = False)

+-----+-----+
|value|value|
+-----+-----+
|1    |NULL |
|1    |2    |
|1    |2    |
|NULL |NULL |
|NULL |2    |
|NULL |2    |
|NULL |NULL |
|NULL |2    |
|NULL |2    |
|1    |1    |
|1    |5    |
|1    |6    |
|1    |NULL |
|NULL |1    |
|NULL |5    |
|NULL |6    |
|NULL |NULL |
|NULL |1    |
|NULL |5    |
|NULL |6    |
+-----+-----+
only showing top 20 rows



In [27]:
df_1.alias("d1").join(df_2.alias("d2"),col("d1.value")==col("d2.value"),"outer").show()

+-----+-----+
|value|value|
+-----+-----+
| NULL| NULL|
| NULL| NULL|
| NULL| NULL|
| NULL| NULL|
|    1|    1|
| NULL|    2|
| NULL|    2|
|    3| NULL|
|    4| NULL|
| NULL|    5|
|    6|    6|
|    6|    6|
+-----+-----+
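
The outputs above all follow from one rule: in an equality join, NULL never equals anything, not even another NULL. NULL rows therefore never match, and they appear only as unmatched rows in left, right, and outer joins. The sketch below is a plain-Python model of that rule, with `None` standing in for NULL. It is not Spark code, and `join_rows` is a hypothetical helper written only for illustration. It also covers the right join, which the notebook does not run.

```python
def join_rows(left, right, how="inner"):
    """Model an equi-join of two single-column tables; None stands in for NULL."""
    rows = []
    matched_right = set()
    for a in left:
        hit = False
        for j, b in enumerate(right):
            # NULL = anything is never true, so None never matches, even None.
            if a is not None and b is not None and a == b:
                rows.append((a, b))
                matched_right.add(j)
                hit = True
        # Left and outer joins keep unmatched left rows, padded with NULL.
        if not hit and how in ("left", "outer"):
            rows.append((a, None))
    # Right and outer joins keep unmatched right rows, padded with NULL.
    if how in ("right", "outer"):
        rows += [(None, b) for j, b in enumerate(right) if j not in matched_right]
    return rows

arr1 = [1, None, None, 3, 6, 4, 6]
arr2 = [None, 2, 2, 1, 5, 6, None]

for how in ("inner", "left", "right", "outer"):
    print(how, len(join_rows(arr1, arr2, how)))
# inner 3
# left 7
# right 8
# outer 12

# A cartesian (cross) join pairs every left row with every right row.
print("cross", len([(a, b) for a in arr1 for b in arr2]))
# cross 49
```

The row counts for inner (3), left (7), and outer (12) agree with the Spark outputs shown above; row order may differ because Spark does not guarantee it. If NULLs should match each other, Spark provides the null-safe comparison `Column.eqNullSafe` as an alternative to `==`.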

