In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import desc
from pyspark.sql.functions import max, avg, min
from pyspark.sql.functions import row_number
from pyspark.sql.functions import when
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.window import Window

In [None]:
!pip install pyspark findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
# Presumably used to make the Spark installation discoverable from this Python process.
# NOTE(review): pyspark is already imported in the first cell, so this step may be redundant — confirm.
import findspark
findspark.init()

create local SparkSession

In [None]:
# Build (or reuse) a session named "PySpark_Basics" that runs against the local[*] master.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("PySpark_Basics")
    .getOrCreate()
)
# Leave the session as the last expression so the notebook renders its summary.
spark

read csv with inferschema

In [None]:
# Load the salaries CSV: the first line is the header and column types are inferred by Spark.
df = spark.read.csv(
    "ds_salaries.csv",
    inferSchema=True,
    header=True,
)
# Preview the first ten rows.
df.show(n=10)

+---+---------+----------------+---------------+--------------------+--------+---------------+-------------+------------------+------------+----------------+------------+
|_c0|work_year|experience_level|employment_type|           job_title|  salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|
+---+---------+----------------+---------------+--------------------+--------+---------------+-------------+------------------+------------+----------------+------------+
|  0|     2020|              MI|             FT|      Data Scientist|   70000|            EUR|        79833|                DE|           0|              DE|           L|
|  1|     2020|              SE|             FT|Machine Learning ...|  260000|            USD|       260000|                JP|           0|              JP|           S|
|  2|     2020|              SE|             FT|   Big Data Engineer|   85000|            GBP|       109024|                GB|          50|     

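Read the CSV one more time with the same code and you will see that it takes almost no time. The SparkSession is already running, and `spark.read` is lazy: with `inferSchema=True` Spark only scans the file to infer the column types, and the rows themselves are not loaded until an action such as `show()` is called.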

In [None]:
%%time
# Time only the read call; this cell triggers no action such as show().
df = spark.read.csv("ds_salaries.csv", header=True, inferSchema=True)

CPU times: user 619 µs, sys: 1.96 ms, total: 2.58 ms
Wall time: 670 ms


In [None]:
# Print the column names, types and nullability of the DataFrame read with inferSchema.
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- work_year: integer (nullable = true)
 |-- experience_level: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- salary_currency: string (nullable = true)
 |-- salary_in_usd: integer (nullable = true)
 |-- employee_residence: string (nullable = true)
 |-- remote_ratio: integer (nullable = true)
 |-- company_location: string (nullable = true)
 |-- company_size: string (nullable = true)



create schema of this csv

In [None]:
# Explicit schema for ds_salaries.csv: (column name, Spark type) pairs in file order.
# Every field is declared nullable.
schema = StructType([
    StructField(field_name, field_type(), True)
    for field_name, field_type in (
        ("_c0", IntegerType),
        ("work_year", IntegerType),
        ("experience_level", StringType),
        ("employment_type", StringType),
        ("job_title", StringType),
        ("salary", IntegerType),
        ("salary_currency", StringType),
        ("salary_in_usd", IntegerType),
        ("employee_residence", StringType),
        ("remote_ratio", IntegerType),
        ("company_location", StringType),
        ("company_size", StringType),
    )
])

In [None]:
# Re-read the file with the explicit schema instead of inferring the column types.
df = (
    spark.read
    .schema(schema)
    .option("header", True)
    .csv("ds_salaries.csv")
)
df.show(n=10)
df.printSchema()

+---+---------+----------------+---------------+--------------------+--------+---------------+-------------+------------------+------------+----------------+------------+
|_c0|work_year|experience_level|employment_type|           job_title|  salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|
+---+---------+----------------+---------------+--------------------+--------+---------------+-------------+------------------+------------+----------------+------------+
|  0|     2020|              MI|             FT|      Data Scientist|   70000|            EUR|        79833|                DE|           0|              DE|           L|
|  1|     2020|              SE|             FT|Machine Learning ...|  260000|            USD|       260000|                JP|           0|              JP|           S|
|  2|     2020|              SE|             FT|   Big Data Engineer|   85000|            GBP|       109024|                GB|          50|     

In [None]:
# Same file, read with inferred types, kept in a separate variable for comparison.
df_infer = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("ds_salaries.csv")
)
df_infer.show(n=5)
df_infer.printSchema()

+---+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|_c0|work_year|experience_level|employment_type|           job_title|salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|
+---+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|  0|     2020|              MI|             FT|      Data Scientist| 70000|            EUR|        79833|                DE|           0|              DE|           L|
|  1|     2020|              SE|             FT|Machine Learning ...|260000|            USD|       260000|                JP|           0|              JP|           S|
|  2|     2020|              SE|             FT|   Big Data Engineer| 85000|            GBP|       109024|                GB|          50|              GB|

print data in dataframe using df.show

In [None]:
# With no arguments, show() prints a limited preview (20 rows by default) and shortens long values such as job titles.
df.show()

+---+---------+----------------+---------------+--------------------+--------+---------------+-------------+------------------+------------+----------------+------------+
|_c0|work_year|experience_level|employment_type|           job_title|  salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|
+---+---------+----------------+---------------+--------------------+--------+---------------+-------------+------------------+------------+----------------+------------+
|  0|     2020|              MI|             FT|      Data Scientist|   70000|            EUR|        79833|                DE|           0|              DE|           L|
|  1|     2020|              SE|             FT|Machine Learning ...|  260000|            USD|       260000|                JP|           0|              JP|           S|
|  2|     2020|              SE|             FT|   Big Data Engineer|   85000|            GBP|       109024|                GB|          50|     

print data in dataframe using display(df.toPandas())

In [None]:
# toPandas() collects the whole Spark DataFrame into a pandas DataFrame on the driver; display() then renders it.
# Fine for this small dataset, but avoid on large DataFrames.
display(df.toPandas())

Unnamed: 0,_c0,work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size
0,0,2020,MI,FT,Data Scientist,70000,EUR,79833,DE,0,DE,L
1,1,2020,SE,FT,Machine Learning Scientist,260000,USD,260000,JP,0,JP,S
2,2,2020,SE,FT,Big Data Engineer,85000,GBP,109024,GB,50,GB,M
3,3,2020,MI,FT,Product Data Analyst,20000,USD,20000,HN,0,HN,S
4,4,2020,SE,FT,Machine Learning Engineer,150000,USD,150000,US,50,US,L
...,...,...,...,...,...,...,...,...,...,...,...,...
602,602,2022,SE,FT,Data Engineer,154000,USD,154000,US,100,US,M
603,603,2022,SE,FT,Data Engineer,126000,USD,126000,US,100,US,M
604,604,2022,SE,FT,Data Analyst,129000,USD,129000,US,0,US,M
605,605,2022,SE,FT,Data Analyst,150000,USD,150000,US,100,US,M


Create `df_job_title`, which consists of all `job_title` values without duplicates.

In [None]:
# One row per distinct job title.
df_job_title = (
    df
    .select(col("job_title"))
    .distinct()
)

Print all rows of `df_job_title` without truncating the job titles.

In [None]:
# show() prints only 20 rows by default, so pass the row count explicitly to print
# every distinct job title; truncate=False keeps long titles intact.
df_job_title.show(n=df_job_title.count(), truncate=False)

+----------------------------------------+
|job_title                               |
+----------------------------------------+
|3D Computer Vision Researcher           |
|Lead Data Engineer                      |
|Head of Machine Learning                |
|Data Specialist                         |
|Data Analytics Lead                     |
|Machine Learning Scientist              |
|Lead Data Analyst                       |
|Data Engineering Manager                |
|Staff Data Scientist                    |
|ETL Developer                           |
|Director of Data Engineering            |
|Product Data Analyst                    |
|Principal Data Scientist                |
|AI Scientist                            |
|Director of Data Science                |
|Machine Learning Engineer               |
|Lead Data Scientist                     |
|Machine Learning Infrastructure Engineer|
|Data Science Engineer                   |
|Machine Learning Manager                |
+----------

Create `df_analytic`, which consists of the max, avg, and min USD salaries for each `job_title`, using `groupBy`. The field names are `avg_salary`, `min_salary`, and `max_salary`.

In [None]:
# Per-job-title salary statistics in USD: maximum, average and minimum.
# max/avg/min here are the Spark SQL aggregate functions imported at the top of the notebook.
df_analytic = (
    df
    .groupBy("job_title")
    .agg(
        max(col("salary_in_usd")).alias("max_salary"),
        avg(col("salary_in_usd")).alias("avg_salary"),
        min(col("salary_in_usd")).alias("min_salary"),
    )
)
df_analytic.show(truncate=False)

+----------------------------------------+----------+------------------+----------+
|job_title                               |max_salary|avg_salary        |min_salary|
+----------------------------------------+----------+------------------+----------+
|3D Computer Vision Researcher           |5409      |5409.0            |5409      |
|Lead Data Engineer                      |276000    |139724.5          |56000     |
|Head of Machine Learning                |79039     |79039.0           |79039     |
|Data Specialist                         |165000    |165000.0          |165000    |
|Data Analytics Lead                     |405000    |405000.0          |405000    |
|Machine Learning Scientist              |260000    |158412.5          |12000     |
|Lead Data Analyst                       |170000    |92203.0           |19609     |
|Data Engineering Manager                |174000    |123227.2          |59303     |
|Staff Data Scientist                    |105000    |105000.0          |1050

Now add a `row_id` column to `df_analytic` that shows the order of all job titles by average salary, in descending order.

In [None]:
# Number the job titles by average salary, highest first. job_title is a secondary sort
# key so that titles with equal averages get the same row_id on every run.
# There is no partitionBy because the numbering is global; Spark evaluates such a window
# in a single partition, which is acceptable here since df_analytic has one row per job title.
window_function = Window.orderBy(desc("avg_salary"), col("job_title"))
df_analytic_with_id = df_analytic.withColumn("row_id", row_number().over(window_function))
df_analytic_with_id.show(truncate=False)

+----------------------------------+----------+------------------+----------+------+
|job_title                         |max_salary|avg_salary        |min_salary|row_id|
+----------------------------------+----------+------------------+----------+------+
|Data Analytics Lead               |405000    |405000.0          |405000    |1     |
|Principal Data Engineer           |600000    |328333.3333333333 |185000    |2     |
|Financial Data Analyst            |450000    |275000.0          |100000    |3     |
|Principal Data Scientist          |416000    |215242.42857142858|148261    |4     |
|Director of Data Science          |325000    |195074.0          |130026    |5     |
|Data Architect                    |266400    |177873.9090909091 |90700     |6     |
|Applied Data Scientist            |380000    |175655.0          |54238     |7     |
|Analytics Engineer                |205300    |175000.0          |135000    |8     |
|Data Specialist                   |165000    |165000.0          

That isn't pretty, so now we need to move `row_id` to the first position in `df_analytic`.

In [None]:
# Move row_id to the front and keep the remaining columns in their current order.
# The loop variable is called `name` so it does not shadow the imported `col` function.
cols = ["row_id"] + [name for name in df_analytic_with_id.columns if name != "row_id"]
df_analytic_with_id = df_analytic_with_id.select(cols)
df_analytic_with_id.show(truncate=False)

+------+----------------------------------+----------+------------------+----------+
|row_id|job_title                         |max_salary|avg_salary        |min_salary|
+------+----------------------------------+----------+------------------+----------+
|1     |Data Analytics Lead               |405000    |405000.0          |405000    |
|2     |Principal Data Engineer           |600000    |328333.3333333333 |185000    |
|3     |Financial Data Analyst            |450000    |275000.0          |100000    |
|4     |Principal Data Scientist          |416000    |215242.42857142858|148261    |
|5     |Director of Data Science          |325000    |195074.0          |130026    |
|6     |Data Architect                    |266400    |177873.9090909091 |90700     |
|7     |Applied Data Scientist            |380000    |175655.0          |54238     |
|8     |Analytics Engineer                |205300    |175000.0          |135000    |
|9     |Data Specialist                   |165000    |165000.0   

Here you need to create `df_exp_lvl` with the biggest USD salary (`biggest_salary`) for each `experience_level` (keep all the fields of the entire DataFrame).

In [None]:
# Keep, for every experience level, the single row with the highest salary_in_usd.
# _c0 (the row id) breaks ties so that the selected row is the same on every run.
# NOTE(review): the task text mentions a `biggest_salary` column; this cell instead keeps
# only the top-paid row per level without adding that column — confirm the intent.
window_func = Window.partitionBy("experience_level").orderBy(desc("salary_in_usd"), col("_c0"))
df_with_rank = df.withColumn("rank", row_number().over(window_func))
df_exp_lvl = df_with_rank.filter(col("rank") == 1).drop("rank")
df_exp_lvl.show(truncate=False)

+---+---------+----------------+---------------+-------------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|_c0|work_year|experience_level|employment_type|job_title                |salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|
+---+---------+----------------+---------------+-------------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|37 |2020     |EN              |FT             |Machine Learning Engineer|250000|USD            |250000       |US                |50          |US              |L           |
|252|2021     |EX              |FT             |Principal Data Engineer  |600000|USD            |600000       |US                |100         |US              |L           |
|33 |2020     |MI              |FT             |Research Scientist       |450000|USD            |450000       |US                |

Create `df_best`, which consists of the rows where a person's salary equals the biggest salary in their experience level, and select only the columns `id`, `experience_level`, `biggest_salary`, and `employee_residence`.

In [None]:
# Highest USD salary per experience level.
max_salaries = (
    df
    .groupBy("experience_level")
    .agg(max("salary_in_usd").alias("biggest_salary"))
)
# Attach that maximum to every row, then keep only the rows that actually earn it.
df_best = (
    df
    .join(max_salaries, on="experience_level")
    .where(col("salary_in_usd") == col("biggest_salary"))
    .select(
        col("_c0").alias("id"),
        "experience_level",
        "biggest_salary",
        "employee_residence",
    )
)
df_best.show(truncate=False)

+---+----------------+--------------+------------------+
|id |experience_level|biggest_salary|employee_residence|
+---+----------------+--------------+------------------+
|33 |MI              |450000        |US                |
|37 |EN              |250000        |US                |
|63 |SE              |412000        |US                |
|97 |MI              |450000        |US                |
|252|EX              |600000        |US                |
+---+----------------+--------------+------------------+



Drop duplicates by `experience_level`, if any exist.

In [None]:
# Keep one row per experience level; df_best holds several rows for a level when top salaries tie.
# NOTE(review): which of the tied rows survives is not controlled here.
df_best_unique = df_best.drop_duplicates(["experience_level"])
df_best_unique.show(truncate=False)

+---+----------------+--------------+------------------+
|id |experience_level|biggest_salary|employee_residence|
+---+----------------+--------------+------------------+
|37 |EN              |250000        |US                |
|252|EX              |600000        |US                |
|33 |MI              |450000        |US                |
|63 |SE              |412000        |US                |
+---+----------------+--------------+------------------+



Create `df_new_best` from `df_best` without `id`, and add the following: when `experience_level` is `MI` we want `middle`, when it is `SE` we want `senior`, otherwise null.

In [None]:
# Drop the id column and add a readable label: MI -> "middle", SE -> "senior", anything else -> null.
df_new_best = (
    df_best_unique
    .drop("id")
    .withColumn(
        "exp_level_desc",
        when(col("experience_level") == "MI", "middle")
        .when(col("experience_level") == "SE", "senior")
        .otherwise(None),
    )
)
df_new_best.show(truncate=False)

+----------------+--------------+------------------+--------------+
|experience_level|biggest_salary|employee_residence|exp_level_desc|
+----------------+--------------+------------------+--------------+
|EN              |250000        |US                |NULL          |
|EX              |600000        |US                |NULL          |
|MI              |450000        |US                |middle        |
|SE              |412000        |US                |senior        |
+----------------+--------------+------------------+--------------+



Write `df_new_best` as `1.csv`, then load it into `df_final`.

In [None]:
# Write the result with a header row to the "1.csv" path, coalesced to one partition and
# overwriting any previous output.
df_new_best.coalesce(1).write.mode("overwrite").option("header", True).csv("1.csv")
# inferSchema restores biggest_salary as a numeric column; without it every CSV column
# is read back as a string, which breaks numeric comparisons and aggregations downstream.
df_final = spark.read.option("header", True).option("inferSchema", True).csv("1.csv")
df_final.show(truncate=False)

+----------------+--------------+------------------+--------------+
|experience_level|biggest_salary|employee_residence|exp_level_desc|
+----------------+--------------+------------------+--------------+
|EN              |250000        |US                |NULL          |
|EX              |600000        |US                |NULL          |
|MI              |450000        |US                |middle        |
|SE              |412000        |US                |senior        |
+----------------+--------------+------------------+--------------+



Filter `df_final` to remove the rows where `experience_level` is null, then join this table with the entire `df` on `biggest_salary` (matched against `salary_in_usd`) and `employee_residence`.

In [None]:
# Discard rows that have no experience level.
df_filtered = df_final.where(col("experience_level").isNotNull())
# Inner-join back to the full dataset on salary and residence.
# Both inputs keep their own experience_level and employee_residence columns, so the
# result contains each of those names twice.
df_joined = df_filtered.join(
    df,
    on=(df_filtered["biggest_salary"] == df["salary_in_usd"])
    & (df_filtered["employee_residence"] == df["employee_residence"]),
    how="inner",
)
df_joined.show(truncate=False)

+----------------+--------------+------------------+--------------+---+---------+----------------+---------------+-------------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|experience_level|biggest_salary|employee_residence|exp_level_desc|_c0|work_year|experience_level|employment_type|job_title                |salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|
+----------------+--------------+------------------+--------------+---+---------+----------------+---------------+-------------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|MI              |450000        |US                |middle        |33 |2020     |MI              |FT             |Research Scientist       |450000|USD            |450000       |US                |0           |US              |M           |
|EN              |250000        |US     

The last task is to store the biggest `salary_in_usd` from `df_final` in a variable and then print that variable.

In [None]:
# df_final was loaded from CSV, so biggest_salary may be a string column. Cast it to an
# integer first so that max() compares the values numerically rather than alphabetically.
max_salary_usd = df_final.agg(max(col("biggest_salary").cast("int"))).collect()[0][0]
print(f"Max salary_in_usd in df_final: {max_salary_usd}")


Max salary_in_usd in df_final: 600000
