<a href="https://colab.research.google.com/github/Vinaydahiya76/SQL-and-Pyspark-Dump/blob/main/pyspark_basics.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

In [None]:
from pyspark.sql.functions import *

In [None]:
df = spark.read.format("csv").option("header", "true").load("original.csv")

In [None]:
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|
|  7|     Masha|    Divers|Female|         Dachun|     

In [None]:
clean_df = df.withColumn("clean_city", when(df.City.isNull(), 'Unknown').otherwise(df.City))

In [None]:
clean_df = clean_df.withColumn("clean_job_title", when(df.JobTitle.isNull(), 'Unknown').otherwise(df.JobTitle))

In [None]:
clean_df = clean_df.filter(clean_df.clean_city.isNotNull())

In [None]:
clean_df = clean_df.withColumn("clean_salary", clean_df.Salary.substr(2,100).cast("float"))

In [23]:
mean = clean_df.groupBy().avg("clean_salary").take(1)[0][0]
print(mean)

55487.95562890625


In [27]:
# Replace empty strings with null values
clean_df = clean_df.replace('', None, subset=["clean_salary"])

# Replace null values in 'clean_salary' with the calculated mean
clean_df = clean_df.fillna(mean, subset=["clean_salary"])

clean_df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+--------------------+------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|     clean_job_title|clean_salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+--------------------+------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda| Assistant Professor|    57438.18|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|       Programmer II|     62846.6|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|        Unknown|             Unknown|    57576.52|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...

In [28]:
from pyspark.sql.window import Window

# Filter for females in sales
sales_females_df = clean_df.filter((clean_df.gender == 'Female') & (clean_df.clean_job_title.like('%Sales%')))

# Define a window specification to order by salary in descending order
window_spec = Window.orderBy(desc("clean_salary"))

# Add a rank column
ranked_sales_females_df = sales_females_df.withColumn("rank", rank().over(window_spec))

# Filter for the second highest salary
second_highest_salary_df = ranked_sales_females_df.filter(ranked_sales_females_df.rank == 2)

# Show the result
second_highest_salary_df.show()

+---+----------+----------+------+-------+-----------------+---------+----------+-----------+----------+-----------------+------------+----+
| id|first_name| last_name|gender|   City|         JobTitle|   Salary|  Latitude|  Longitude|clean_city|  clean_job_title|clean_salary|rank|
+---+----------+----------+------+-------+-----------------+---------+----------+-----------+----------+-----------------+------------+----+
|889|    Janith|Laurenceau|Female|Ganfang|Director of Sales|$93860.65|40.0580479|116.3333034|   Ganfang|Director of Sales|    93860.65|   2|
+---+----------+----------+------+-------+-----------------+---------+----------+-----------+----------+-----------------+------------+----+

