import libraries

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import max, avg, min, col, row_number, when, desc, monotonically_increasing_id, lit, rank, expr
from pyspark.sql.window import Window

create local SparkSession

In [2]:
%%time
spark = SparkSession.builder \
                    .master('local[*]') \
                    .appName('Spark_App') \
                    .getOrCreate()

# local[*] - use all logical cpu cores = number of partitions

24/02/06 20:33:03 WARN Utils: Your hostname, user-thinkpad-e15-gen-2 resolves to a loopback address: 127.0.1.1; using 10.80.51.3 instead (on interface wlp0s20f3)
24/02/06 20:33:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/06 20:33:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


CPU times: user 41.1 ms, sys: 30.3 ms, total: 71.4 ms
Wall time: 20.1 s


read csv with inferschema

In [3]:
%%time
df = spark.read \
    .options(header='True', inferSchema='True', delimiter=',') \
    .format("csv") \
    .load("ds_salaries.csv", header=True)

CPU times: user 4.56 ms, sys: 382 µs, total: 4.94 ms
Wall time: 6.01 s


read csv one more time with the same code and you will see that it almostly don"t take time, because info already in SparkSession and it will not read nothing
from this file

In [4]:
%%time
df = spark.read \
    .option('inferSchema', True) \
    .option('delimiter', ',') \
    .csv("ds_salaries.csv", header=True)

CPU times: user 5.38 ms, sys: 1.59 ms, total: 6.96 ms
Wall time: 328 ms


write schema of csv on screen

In [5]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- work_year: integer (nullable = true)
 |-- experience_level: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- salary_currency: string (nullable = true)
 |-- salary_in_usd: integer (nullable = true)
 |-- employee_residence: string (nullable = true)
 |-- remote_ratio: integer (nullable = true)
 |-- company_location: string (nullable = true)
 |-- company_size: string (nullable = true)



create schema of this csv

In [6]:
schema = StructType([
    StructField("id", IntegerType(), nullable=False),
    StructField("work_year", IntegerType(), nullable=False),
    StructField("experience_level", StringType(), nullable=False),
    StructField("employment_type", StringType(), nullable=False),
    StructField("job_title", StringType(), nullable=False),
    StructField("salary", IntegerType(), nullable=False),
    StructField("salary_currency", StringType(), nullable=False),
    StructField("salary_in_usd", DoubleType(), nullable=False),
    StructField("employee_residence", StringType(), nullable=False),
    StructField("remote_ratio", IntegerType(), nullable=False),
    StructField("company_location", StringType(), nullable=False),
    StructField("company_size", StringType(), nullable=False)
])

restart kernel without cleaning output and after restarting you need to initialize SparkSession, after initialize start execute only cells from cell with schema=
=StructType.... 
To restart kernel click Kernel, Restart.

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import max, avg, min, col, row_number, when, desc, monotonically_increasing_id, lit, rank, expr
from pyspark.sql.window import Window

In [2]:
spark = SparkSession.builder \
                    .master('local[*]') \
                    .appName('Spark_App') \
                    .getOrCreate()

24/02/06 20:35:52 WARN Utils: Your hostname, user-thinkpad-e15-gen-2 resolves to a loopback address: 127.0.1.1; using 10.80.51.3 instead (on interface wlp0s20f3)
24/02/06 20:35:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/06 20:36:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
schema = StructType([
    StructField("id", IntegerType(), nullable=False),
    StructField("work_year", IntegerType(), nullable=False),
    StructField("experience_level", StringType(), nullable=False),
    StructField("employment_type", StringType(), nullable=False),
    StructField("job_title", StringType(), nullable=False),
    StructField("salary", IntegerType(), nullable=False),
    StructField("salary_currency", StringType(), nullable=False),
    StructField("salary_in_usd", DoubleType(), nullable=False),
    StructField("employee_residence", StringType(), nullable=False),
    StructField("remote_ratio", IntegerType(), nullable=False),
    StructField("company_location", StringType(), nullable=False),
    StructField("company_size", StringType(), nullable=False)
])

read ds_salaries with predefined schema and compare results from this cell and cell with inferSchema

In [4]:
df2 = spark.read.csv("ds_salaries.csv", header=True, schema=schema)

this happens because read operation is lazy(transformation), but if you use inferschema it start to be action that will create Spark Job, because Spark need to loop throw all file to check datatypes for all columns and this can harm to your code(if we compare to parquet, it will also go to check data types, but parquet provide meta information, so Spark will not go throw all file, he will just read meta information, but csv don"t provide such meta information). Also header make Spark to create one more Spark Job to check first line to define name of columns and remember to skip it when reading. Actual reading start when you will use first action. More about Spark Jobs you will see in next topic

write schema of scv on screen one more time and compare with previous

In [5]:
df2.printSchema()

root
 |-- id: integer (nullable = true)
 |-- work_year: integer (nullable = true)
 |-- experience_level: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- salary_currency: string (nullable = true)
 |-- salary_in_usd: double (nullable = true)
 |-- employee_residence: string (nullable = true)
 |-- remote_ratio: integer (nullable = true)
 |-- company_location: string (nullable = true)
 |-- company_size: string (nullable = true)



now continue to work with one of the dataframes that you create

print data in dataframe using df.show

In [6]:
df2.show()

+---+---------+----------------+---------------+--------------------+--------+---------------+-------------+------------------+------------+----------------+------------+
| id|work_year|experience_level|employment_type|           job_title|  salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|
+---+---------+----------------+---------------+--------------------+--------+---------------+-------------+------------------+------------+----------------+------------+
|  0|     2020|              MI|             FT|      Data Scientist|   70000|            EUR|      79833.0|                DE|           0|              DE|           L|
|  1|     2020|              SE|             FT|Machine Learning ...|  260000|            USD|     260000.0|                JP|           0|              JP|           S|
|  2|     2020|              SE|             FT|   Big Data Engineer|   85000|            GBP|     109024.0|                GB|          50|     

print data in dataframe using display(df.toPandas())

In [7]:
display(df2.toPandas())

Unnamed: 0,id,work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size
0,0,2020,MI,FT,Data Scientist,70000,EUR,79833.0,DE,0,DE,L
1,1,2020,SE,FT,Machine Learning Scientist,260000,USD,260000.0,JP,0,JP,S
2,2,2020,SE,FT,Big Data Engineer,85000,GBP,109024.0,GB,50,GB,M
3,3,2020,MI,FT,Product Data Analyst,20000,USD,20000.0,HN,0,HN,S
4,4,2020,SE,FT,Machine Learning Engineer,150000,USD,150000.0,US,50,US,L
...,...,...,...,...,...,...,...,...,...,...,...,...
602,602,2022,SE,FT,Data Engineer,154000,USD,154000.0,US,100,US,M
603,603,2022,SE,FT,Data Engineer,126000,USD,126000.0,US,100,US,M
604,604,2022,SE,FT,Data Analyst,129000,USD,129000.0,US,0,US,M
605,605,2022,SE,FT,Data Analyst,150000,USD,150000.0,US,100,US,M


create df_job_title that consists from all job_titles without duplicates

In [8]:
df_job_title = df2.select("job_title").distinct()
df_job_title.show()
print("Distinct count:", df_job_title.count())

+--------------------+
|           job_title|
+--------------------+
|3D Computer Visio...|
|  Lead Data Engineer|
|Head of Machine L...|
|     Data Specialist|
| Data Analytics Lead|
|Machine Learning ...|
|   Lead Data Analyst|
|Data Engineering ...|
|Staff Data Scientist|
|       ETL Developer|
|Director of Data ...|
|Product Data Analyst|
|Principal Data Sc...|
|        AI Scientist|
|Director of Data ...|
|Machine Learning ...|
| Lead Data Scientist|
|Machine Learning ...|
|Data Science Engi...|
|Machine Learning ...|
+--------------------+
only showing top 20 rows

Distinct count: 50


In [9]:
# v2 - return full df and selects only first occurrence of each category
dropDisDF = df2.dropDuplicates(["job_title",])
display(dropDisDF.toPandas())
print(dropDisDF.count())

Unnamed: 0,id,work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size
0,77,2021,MI,PT,3D Computer Vision Researcher,400000,INR,5409.0,IN,50,IN,M
1,52,2020,EN,FT,AI Scientist,300000,DKK,45896.0,DK,50,DK,S
2,344,2022,EX,FT,Analytics Engineer,175000,USD,175000.0,US,100,US,M
3,82,2021,MI,FT,Applied Data Scientist,68000,CAD,54238.0,GB,50,CA,L
4,132,2021,MI,FT,Applied Machine Learning Scientist,38400,USD,38400.0,VN,100,US,M
5,23,2020,MI,FT,BI Data Analyst,98000,USD,98000.0,US,0,US,M
6,255,2021,SE,FT,Big Data Architect,125000,CAD,99703.0,CA,50,CA,M
7,2,2020,SE,FT,Big Data Engineer,85000,GBP,109024.0,GB,50,GB,M
8,8,2020,MI,FT,Business Data Analyst,135000,USD,135000.0,US,100,US,L
9,95,2021,MI,FT,Cloud Data Engineer,120000,SGD,89294.0,SG,50,SG,L


50


print all rows from df_job_titles without truncating jobs

In [10]:
df_job_title.show(truncate=False)

+----------------------------------------+
|job_title                               |
+----------------------------------------+
|3D Computer Vision Researcher           |
|Lead Data Engineer                      |
|Head of Machine Learning                |
|Data Specialist                         |
|Data Analytics Lead                     |
|Machine Learning Scientist              |
|Lead Data Analyst                       |
|Data Engineering Manager                |
|Staff Data Scientist                    |
|ETL Developer                           |
|Director of Data Engineering            |
|Product Data Analyst                    |
|Principal Data Scientist                |
|AI Scientist                            |
|Director of Data Science                |
|Machine Learning Engineer               |
|Lead Data Scientist                     |
|Machine Learning Infrastructure Engineer|
|Data Science Engineer                   |
|Machine Learning Manager                |
+----------

create  df_analytic that will consists from max, avg, min USD salaries for all job_titles using groupBy. name of fields is avg_salary, min_salary, max_salary

In [12]:
df_analytic = df2\
    .filter(col('salary_currency') == 'USD')\
    .groupby("job_title")\
    .agg(avg("salary_in_usd").alias("avg_salary"),
         min("salary_in_usd").alias("min_salary"),
         max("salary_in_usd").alias("max_salary")
    )

print all rows from df_analytic without truncating jobs

In [13]:
df_analytic.show(truncate=False)

+----------------------------------------+------------------+----------+----------+
|job_title                               |avg_salary        |min_salary|max_salary|
+----------------------------------------+------------------+----------+----------+
|Lead Data Engineer                      |154250.0          |56000.0   |276000.0  |
|Data Specialist                         |165000.0          |165000.0  |165000.0  |
|Data Analytics Lead                     |405000.0          |405000.0  |405000.0  |
|Machine Learning Scientist              |158412.5          |12000.0   |260000.0  |
|Lead Data Analyst                       |128500.0          |87000.0   |170000.0  |
|Data Engineering Manager                |159000.0          |150000.0  |174000.0  |
|Staff Data Scientist                    |105000.0          |105000.0  |105000.0  |
|Director of Data Engineering            |200000.0          |200000.0  |200000.0  |
|Product Data Analyst                    |20000.0           |20000.0   |2000

now you need to add in df_analytic column row_id, that will show order of all job_titles depending on avg salary. they should be descending

In [14]:
window = Window.partitionBy().orderBy(desc("avg_salary"))
df_analytic_avg_salary_rowid = df_analytic.withColumn("row_id", row_number().over(window))

# used empty Window.partitionBy() instead of partitionBy("job_title") because df_analytics has already grouped by job_title

print all data from df_analytic

In [129]:
# to avoid warnings below when show - use following code (add tmp column filled with 1):
# df_analytic_partition = df_analytic.withColumn("partition", lit(1))
# window = Window.partitionBy("partition").orderBy(desc("avg_salary"))
# df_analytic_avg_salary_rowid = df_analytic_partition.withColumn("row_id", row_number().over(window))

In [15]:
df_analytic_avg_salary_rowid.show(truncate=False)

24/02/06 20:46:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/06 20:46:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/06 20:46:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/06 20:46:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/06 20:46:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/06 20:46:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+----------------------------------------+------------------+----------+----------+------+
|job_title                               |avg_salary        |min_salary|max_salary|row_id|
+----------------------------------------+------------------+----------+----------+------+
|Data Analytics Lead                     |405000.0          |405000.0  |405000.0  |1     |
|Principal Data Engineer                 |328333.3333333333 |185000.0  |600000.0  |2     |
|Financial Data Analyst                  |275000.0          |100000.0  |450000.0  |3     |
|ML Engineer                             |263000.0          |256000.0  |270000.0  |4     |
|Principal Data Scientist                |255500.0          |151000.0  |416000.0  |5     |
|Director of Data Science                |247666.66666666666|168000.0  |325000.0  |6     |
|Applied Data Scientist                  |238000.0          |157000.0  |380000.0  |7     |
|Head of Data                            |221666.66666666666|200000.0  |235000.0  |8     |

In [16]:
# v2
df_analytic_avg_salary = df_analytic.orderBy(desc("avg_salary"))
df_analytic_avg_salary_rowid_2 = df_analytic_avg_salary.withColumn("row_id", monotonically_increasing_id()+1)
df_analytic_avg_salary_rowid_2.show(truncate=False)

+----------------------------------------+------------------+----------+----------+------+
|job_title                               |avg_salary        |min_salary|max_salary|row_id|
+----------------------------------------+------------------+----------+----------+------+
|Data Analytics Lead                     |405000.0          |405000.0  |405000.0  |1     |
|Principal Data Engineer                 |328333.3333333333 |185000.0  |600000.0  |2     |
|Financial Data Analyst                  |275000.0          |100000.0  |450000.0  |3     |
|ML Engineer                             |263000.0          |256000.0  |270000.0  |4     |
|Principal Data Scientist                |255500.0          |151000.0  |416000.0  |5     |
|Director of Data Science                |247666.66666666666|168000.0  |325000.0  |6     |
|Applied Data Scientist                  |238000.0          |157000.0  |380000.0  |7     |
|Head of Data                            |221666.66666666666|200000.0  |235000.0  |8     |

it isn"t beautifull, so we need to put now row_id on first place in df_analytic

In [17]:
df_analytic_avg_salary_rowid = df_analytic_avg_salary_rowid.select("row_id", "job_title", "avg_salary", "min_salary", "max_salary")

print df_analytic now

In [18]:
df_analytic_avg_salary_rowid.show(truncate=False)

+------+----------------------------------------+------------------+----------+----------+
|row_id|job_title                               |avg_salary        |min_salary|max_salary|
+------+----------------------------------------+------------------+----------+----------+
|1     |Data Analytics Lead                     |405000.0          |405000.0  |405000.0  |
|2     |Principal Data Engineer                 |328333.3333333333 |185000.0  |600000.0  |
|3     |Financial Data Analyst                  |275000.0          |100000.0  |450000.0  |
|4     |ML Engineer                             |263000.0          |256000.0  |270000.0  |
|5     |Principal Data Scientist                |255500.0          |151000.0  |416000.0  |
|6     |Director of Data Science                |247666.66666666666|168000.0  |325000.0  |
|7     |Applied Data Scientist                  |238000.0          |157000.0  |380000.0  |
|8     |Head of Data                            |221666.66666666666|200000.0  |235000.0  |

24/02/06 20:46:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/06 20:46:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/06 20:46:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/06 20:46:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/06 20:46:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/06 20:46:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


here you need to create df_exp_lvl with the biggest usd_salary(biggest_salary) for each experience_level(you need to save all fields like in entire dataframe)

In [19]:
# v1 - regardless of salary_currency: (ex. USD, INR)
df_exp_lvl_group = df2.groupby("experience_level").agg(max("salary_in_usd").alias("salary"))
df_exp_lvl = df_exp_lvl_group.join(df2, ["experience_level", "salary"], 'left')
df_exp_lvl = df_exp_lvl.withColumnRenamed("salary", "biggest_salary")
display(df_exp_lvl.toPandas())

Unnamed: 0,experience_level,biggest_salary,id,work_year,employment_type,job_title,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size
0,EX,600000.0,252,2021,FT,Principal Data Engineer,USD,600000.0,US,100,US,L
1,MI,450000.0,97,2021,FT,Financial Data Analyst,USD,450000.0,US,100,US,L
2,MI,450000.0,33,2020,FT,Research Scientist,USD,450000.0,US,0,US,M
3,MI,450000.0,21,2020,FT,Product Data Analyst,INR,6072.0,IN,100,IN,L
4,EN,250000.0,37,2020,FT,Machine Learning Engineer,USD,250000.0,US,50,US,L
5,SE,412000.0,63,2020,FT,Data Scientist,USD,412000.0,US,100,US,L


In [20]:
# v2 - only USD currency
windowExp = Window.partitionBy("experience_level").orderBy(col('salary_in_usd').desc())
df_exp_level2 = df2.withColumn("salary_rank_per_level", rank().over(windowExp)).filter(col("salary_rank_per_level") == 1)
df_exp_level2 = df_exp_level2.withColumnRenamed("salary", "biggest_salary")
display(df_exp_level2.toPandas())

Unnamed: 0,id,work_year,experience_level,employment_type,job_title,biggest_salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size,salary_rank_per_level
0,37,2020,EN,FT,Machine Learning Engineer,250000,USD,250000.0,US,50,US,L,1
1,252,2021,EX,FT,Principal Data Engineer,600000,USD,600000.0,US,100,US,L,1
2,33,2020,MI,FT,Research Scientist,450000,USD,450000.0,US,0,US,M,1
3,97,2021,MI,FT,Financial Data Analyst,450000,USD,450000.0,US,100,US,L,1
4,63,2020,SE,FT,Data Scientist,412000,USD,412000.0,US,100,US,L,1


create df_best that consists from rows where salary of guy same as biggest salary for other people in his exp_lvl and choose only columns: id, experience_level, biggest_salary, employee_residence

In [21]:
df_best = df_exp_lvl.join(df2, "experience_level", 'inner').where(expr("salary = biggest_salary"))
df_best = df_best.select(df_exp_lvl.id, df_exp_lvl.experience_level, df_exp_lvl.biggest_salary, df_exp_lvl.employee_residence)
display(df_best.toPandas())

Unnamed: 0,id,experience_level,biggest_salary,employee_residence
0,252,EX,600000.0,US
1,97,MI,450000.0,US
2,97,MI,450000.0,US
3,97,MI,450000.0,US
4,33,MI,450000.0,US
5,33,MI,450000.0,US
6,33,MI,450000.0,US
7,21,MI,450000.0,IN
8,21,MI,450000.0,IN
9,21,MI,450000.0,IN


In [22]:
# v2
windowBest = Window.partitionBy('experience_level').orderBy(col('salary_in_USD').desc())
df_best2 = df2.withColumn('salary_rank_per_level', rank().over(windowBest)).filter(col('salary_rank_per_level') == 1)
df_best2 = df_best2.select("id", "experience_level", "salary", "employee_residence")
display(df_best2.toPandas())

Unnamed: 0,id,experience_level,salary,employee_residence
0,37,EN,250000,US
1,252,EX,600000,US
2,33,MI,450000,US
3,97,MI,450000,US
4,63,SE,412000,US


drop duplicates if exist by experience_level

In [23]:
df_best_drop_duplicates = df_best.dropDuplicates(["experience_level"])
df_best_drop_duplicates.show()
# dropped id=33 - but values the same!

+---+----------------+--------------+------------------+
| id|experience_level|biggest_salary|employee_residence|
+---+----------------+--------------+------------------+
| 37|              EN|      250000.0|                US|
|252|              EX|      600000.0|                US|
| 97|              MI|      450000.0|                US|
| 63|              SE|      412000.0|                US|
+---+----------------+--------------+------------------+



In [24]:
df_best_drop_duplicates = df_best2.dropDuplicates(["experience_level"])
df_best_drop_duplicates.show()
# dropped id=97 - but values the same!

+---+----------------+------+------------------+
| id|experience_level|salary|employee_residence|
+---+----------------+------+------------------+
| 37|              EN|250000|                US|
|252|              EX|600000|                US|
| 33|              MI|450000|                US|
| 63|              SE|412000|                US|
+---+----------------+------+------------------+



create df_new_best from df_best without id, and make the next: when exp_level = exp_level we want middle, when SE we want senior, else Null

In [25]:
df_new_best = df_best_drop_duplicates.drop("id")
# v2
# df_new_best = df_best.select([col for col in df_best.columns if col != "id"])

In [26]:
df_new_best_ = df_new_best.withColumn("new_experience_level",    # to replace current col use "experience_level"
                                      when(df_new_best.experience_level == "MI", "middle")
                                     .when(df_new_best.experience_level == "SE", "senior")
                                     .otherwise(None))

print df_new_best

In [27]:
df_new_best_.show()

+----------------+------+------------------+--------------------+
|experience_level|salary|employee_residence|new_experience_level|
+----------------+------+------------------+--------------------+
|              EN|250000|                US|                NULL|
|              EX|600000|                US|                NULL|
|              MI|450000|                US|              middle|
|              SE|412000|                US|              senior|
+----------------+------+------------------+--------------------+



write df_new_best like 1.csv and load then it to df_final

In [31]:
df_new_best_.write.csv("1.csv", header=True, mode='overwrite')  # folder is created
df_new_best_.toPandas().to_csv("2.csv", index=False)     # single file is created
df_final = spark.read.csv("1.csv", header=True)

print df_final

In [32]:
df_final.show()

+----------------+------+------------------+--------------------+
|experience_level|salary|employee_residence|new_experience_level|
+----------------+------+------------------+--------------------+
|              EN|250000|                US|                NULL|
|              EX|600000|                US|                NULL|
|              MI|450000|                US|              middle|
|              SE|412000|                US|              senior|
+----------------+------+------------------+--------------------+



filter df_final to delete experience_level where it Null, then join this table by biggest_salary(salary_in_usd) and employee_residence with entire df

In [33]:
df_final_filtered = df_final.filter(df_final.new_experience_level.isNotNull())
# df_final_filtered = df_final.na.drop()
df_joined_final = df_final_filtered.join(df2,
            (df2.salary_in_usd == df_final_filtered.salary) & (df2.employee_residence == df_final_filtered.employee_residence),
            "inner").dropDuplicates(["experience_level"])
display(df_joined_final.toPandas())

24/02/06 21:53:27 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Unnamed: 0,experience_level,salary,employee_residence,new_experience_level,id,work_year,experience_level.1,employment_type,job_title,salary.1,salary_currency,salary_in_usd,employee_residence.1,remote_ratio,company_location,company_size
0,MI,450000,US,middle,33,2020,MI,FT,Research Scientist,450000,USD,450000.0,US,0,US,M
1,SE,412000,US,senior,63,2020,SE,FT,Data Scientist,412000,USD,412000.0,US,100,US,L


last task is to save in variable and then print this variable of the biggest salary_in_usd from df_final

In [39]:
max_salary = df_joined_final.select(col('salary_in_usd')).collect()[0][0]
print("The biggest salary_in_usd from df_final is:", max_salary)

The biggest salary_in_usd from df_final is: 450000.0


It is the end of PySpark basics. In other lessons you will learn optimizations technics and how to make distributed system.