import libraries(I provide all libs that I need when make this tasks, if you need some external import them here)

In [2]:
import pyspark
import os
import sys
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col
from pyspark.sql.functions import max, avg, min, round, row_number, desc
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql.functions import when
import findspark

create local SparkSession

In [7]:
spark = SparkSession.builder \
.appName("DS_Salaries_Analysis") \
.master("local[4]") \
.config("spark.driver.memory", "2g") \
.config("spark.executor.memory", "2g") \
.config("spark.sql.shuffle.partitions", "8") \
.getOrCreate()

print("Spark version:", spark.version) 
spark.range(5).show()



Spark version: 3.1.3
+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



read csv with inferschema

read csv one more time with the same code and you will see that it almostly don't take time, because info already in SparkSession and it will not read nothing
from this file

In [9]:
df = spark.read.csv(
    "ds_salaries.csv",
    header=True,
    inferSchema=True
)

write schema of scv on screen

In [10]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- work_year: integer (nullable = true)
 |-- experience_level: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- salary_currency: string (nullable = true)
 |-- salary_in_usd: integer (nullable = true)
 |-- employee_residence: string (nullable = true)
 |-- remote_ratio: integer (nullable = true)
 |-- company_location: string (nullable = true)
 |-- company_size: string (nullable = true)



create schema of this scv

In [11]:
dataset_schema = StructType([
    StructField("id", IntegerType(), nullable=True),
    StructField("work_year", IntegerType(), nullable=False),
    StructField("experience_level", StringType(), nullable=True),
    StructField("employment_type", StringType(), nullable=True),
    StructField("job_title", StringType(), nullable=True),
    StructField("salary", IntegerType(), nullable=False),
    StructField("salary_currency", StringType(), nullable=False),  
    StructField("salary_in_usd", IntegerType(), nullable=False),
    StructField("employee_residence", StringType(), nullable=True),
    StructField("remote_ratio", IntegerType(), nullable=False),
    StructField("company_location", StringType(), nullable=True),
    StructField("company_size", StringType(), nullable=True)
])
df = spark.read.csv(
    "ds_salaries.csv",
    header = True,
    inferSchema=True
)
df.show(3)

+---+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|_c0|work_year|experience_level|employment_type|           job_title|salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|
+---+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|  0|     2020|              MI|             FT|      Data Scientist| 70000|            EUR|        79833|                DE|           0|              DE|           L|
|  1|     2020|              SE|             FT|Machine Learning ...|260000|            USD|       260000|                JP|           0|              JP|           S|
|  2|     2020|              SE|             FT|   Big Data Engineer| 85000|            GBP|       109024|                GB|          50|              GB|

restart kernel without cleaning output and after restarting you need to initialize SparkSession, after initialize start execute only cells from cell with schema=
=StructType.... 
To restart kernel click Kernel, Restart.

read ds_salaries with predefined schema and compare results from this cell and cell with inferSchema

In [28]:
df = spark.read.csv(
    "ds_salaries.csv",
    header = True,
    schema = dataset_schema
)

this happens because read operation is lazy(transformation), but if you use inferschema it start to be action that will create Spark Job, because Spark need to loop throw all file to check datatypes for all columns and this can harm to your code(if we compare to parquet, it will also go to check data types, but parquet provide meta information, so Spark will not go throw all file, he will just read meta information, but csv don't provide such meta information). Also header make Spark to create one more Spark Job to check first line
to define name of columns and remember to skeep it when reading. Actual reading start when you will use first action. More about Spark Jobs you will see in next topic

write schema of scv on screen one more time and compare with previous

In [29]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- work_year: integer (nullable = true)
 |-- experience_level: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- salary_currency: string (nullable = true)
 |-- salary_in_usd: integer (nullable = true)
 |-- employee_residence: string (nullable = true)
 |-- remote_ratio: integer (nullable = true)
 |-- company_location: string (nullable = true)
 |-- company_size: string (nullable = true)



now continue to work with one of the dataframes that you create

print data in dataframe using df.show

In [30]:
df.show(3)

+---+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+
| id|work_year|experience_level|employment_type|           job_title|salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|
+---+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|  0|     2020|              MI|             FT|      Data Scientist| 70000|            EUR|        79833|                DE|           0|              DE|           L|
|  1|     2020|              SE|             FT|Machine Learning ...|260000|            USD|       260000|                JP|           0|              JP|           S|
|  2|     2020|              SE|             FT|   Big Data Engineer| 85000|            GBP|       109024|                GB|          50|              GB|

print data in dataframe using display(df.toPandas())

In [25]:
df = spark.read.csv(
    "ds_salaries.csv",
    header = True,
    schema = dataset_schema
)

pandas_df = df.limit(3).toPandas()
print(pandas_df)

   id  work_year experience_level employment_type                   job_title  \
0   0       2020               MI              FT              Data Scientist   
1   1       2020               SE              FT  Machine Learning Scientist   
2   2       2020               SE              FT           Big Data Engineer   

   salary salary_currency  salary_in_usd employee_residence  remote_ratio  \
0   70000             EUR          79833                 DE             0   
1  260000             USD         260000                 JP             0   
2   85000             GBP         109024                 GB            50   

  company_location company_size  
0               DE            L  
1               JP            S  
2               GB            M  


create df_job_title that consists from all job_titles without duplicates

In [23]:
df_job_title = df.select(col("job_title")).distinct()


print all rows from df_job_titles without truncating jobs

In [24]:
df_job_title.show(truncate=False)

+-----------------------------+
|job_title                    |
+-----------------------------+
|Data Analyst                 |
|3D Computer Vision Researcher|
|Cloud Data Engineer          |
|Financial Data Analyst       |
|Data Analytics Manager       |
|Data Architect               |
|Staff Data Scientist         |
|Machine Learning Engineer    |
|Lead Data Engineer           |
|BI Data Analyst              |
|AI Scientist                 |
|Principal Data Scientist     |
|Marketing Data Analyst       |
|Head of Data Science         |
|ETL Developer                |
|NLP Engineer                 |
|Director of Data Science     |
|ML Engineer                  |
|Director of Data Engineering |
|Head of Machine Learning     |
+-----------------------------+
only showing top 20 rows



create  df_analytic that will consists from max, avg, min USD salaries for all job_titles using groupBy. name of fields is avg_salary, min_salary, max_salary

In [17]:
df_analytic = df.groupBy("job_title").agg( 
        max("salary_in_usd").alias("max_salary"), 
        round(avg("salary_in_usd"), 0).alias("avg_salary"),
        min("salary_in_usd").alias("min_salary")
)


print all rows from df_analytic without trancating jobs

In [18]:
df_analytic.show(truncate=False)

+-----------------------------+----------+----------+----------+
|job_title                    |max_salary|avg_salary|min_salary|
+-----------------------------+----------+----------+----------+
|Data Analyst                 |200000    |92893.0   |6072      |
|3D Computer Vision Researcher|5409      |5409.0    |5409      |
|Cloud Data Engineer          |160000    |124647.0  |89294     |
|Financial Data Analyst       |450000    |275000.0  |100000    |
|Data Analytics Manager       |150260    |127134.0  |105400    |
|Data Architect               |266400    |177874.0  |90700     |
|Staff Data Scientist         |105000    |105000.0  |105000    |
|Machine Learning Engineer    |250000    |104880.0  |20000     |
|Lead Data Engineer           |276000    |139725.0  |56000     |
|BI Data Analyst              |150000    |74755.0   |9272      |
|AI Scientist                 |200000    |66136.0   |12000     |
|Principal Data Scientist     |416000    |215242.0  |148261    |
|Marketing Data Analyst  

now you need to add in df_analytic column row_id, that will show order of all job_titles depending on avg salary. they should be descending

In [19]:
window_f = Window.orderBy(desc("avg_salary"))
df_analytic = df_analytic.withColumn(
    "row_id",
    row_number().over(window_f)
)


print all data from df_analytic

In [20]:
df_analytic.orderBy("row_id").show(truncate=False)

+----------------------------------+----------+----------+----------+------+
|job_title                         |max_salary|avg_salary|min_salary|row_id|
+----------------------------------+----------+----------+----------+------+
|Data Analytics Lead               |405000    |405000.0  |405000    |1     |
|Principal Data Engineer           |600000    |328333.0  |185000    |2     |
|Financial Data Analyst            |450000    |275000.0  |100000    |3     |
|Principal Data Scientist          |416000    |215242.0  |148261    |4     |
|Director of Data Science          |325000    |195074.0  |130026    |5     |
|Data Architect                    |266400    |177874.0  |90700     |6     |
|Applied Data Scientist            |380000    |175655.0  |54238     |7     |
|Analytics Engineer                |205300    |175000.0  |135000    |8     |
|Data Specialist                   |165000    |165000.0  |165000    |9     |
|Head of Data                      |235000    |160163.0  |32974     |10    |

it isn't beautifull, so we need to put now row_id on first place in df_analytic

In [21]:
new_columns = ["row_id"] + [c for c in df_analytic.columns if c != "row_id"]
df_analytic = df_analytic.select(new_columns)


print df_analytic now

In [22]:
df_analytic.show(truncate=False)

+------+----------------------------------+----------+----------+----------+
|row_id|job_title                         |max_salary|avg_salary|min_salary|
+------+----------------------------------+----------+----------+----------+
|1     |Data Analytics Lead               |405000    |405000.0  |405000    |
|2     |Principal Data Engineer           |600000    |328333.0  |185000    |
|3     |Financial Data Analyst            |450000    |275000.0  |100000    |
|4     |Principal Data Scientist          |416000    |215242.0  |148261    |
|5     |Director of Data Science          |325000    |195074.0  |130026    |
|6     |Data Architect                    |266400    |177874.0  |90700     |
|7     |Applied Data Scientist            |380000    |175655.0  |54238     |
|8     |Analytics Engineer                |205300    |175000.0  |135000    |
|9     |Data Specialist                   |165000    |165000.0  |165000    |
|10    |Head of Data                      |235000    |160163.0  |32974     |

here you need to create df_exp_lvl with the biggest usd_salary(biggest_salary) for each experience_level(you need to save all fields like in entire dataframe)

In [33]:
max_sal = df.groupBy("experience_level").agg(max("salary_in_usd").alias("max_salary"))
df_exp_lvl = df.join(max_sal,
                     (df.experience_level == max_sal.experience_level) &
                     (df.salary_in_usd ==  max_sal.max_salary),
                     "inner")

print here df_exp_lvl

In [36]:
pandas_df = df_exp_lvl.limit(10).toPandas()
print(pandas_df)

    id  work_year experience_level employment_type                  job_title  \
0   33       2020               MI              FT         Research Scientist   
1   37       2020               EN              FT  Machine Learning Engineer   
2   63       2020               SE              FT             Data Scientist   
3   97       2021               MI              FT     Financial Data Analyst   
4  252       2021               EX              FT    Principal Data Engineer   

   salary salary_currency  salary_in_usd employee_residence  remote_ratio  \
0  450000             USD         450000                 US             0   
1  250000             USD         250000                 US            50   
2  412000             USD         412000                 US           100   
3  450000             USD         450000                 US           100   
4  600000             USD         600000                 US           100   

  company_location company_size experience_level  

create df_best that consists from rows where salary of guy same as biggest salary for other people in his exp_lvl and choose only columns: id, experience_level, biggest_salary, employee_residence

In [32]:
df_guy_sal = df.groupBy("experience_level").agg(max("salary_in_usd").alias("biggest_salary"))
df_best = df.join(df_guy_sal,
                  ["experience_level"],
                  "inner").filter(df.salary_in_usd == col("biggest_salary")).select(col("id"),
                                                                                    col("experience_level"),
                                                                                    col("biggest_salary"),
                                                                                    col("employee_residence")
                                                                                       )

print df_best

In [33]:
df_best.show(truncate=False)

+---+----------------+--------------+------------------+
|id |experience_level|biggest_salary|employee_residence|
+---+----------------+--------------+------------------+
|33 |MI              |450000        |US                |
|37 |EN              |250000        |US                |
|63 |SE              |412000        |US                |
|97 |MI              |450000        |US                |
|252|EX              |600000        |US                |
+---+----------------+--------------+------------------+



drop duplicates if exist by experience_level

In [34]:
df_best = df_best.dropDuplicates(["experience_level"])

print df_best

In [35]:
df_best.show(truncate=False)

+---+----------------+--------------+------------------+
|id |experience_level|biggest_salary|employee_residence|
+---+----------------+--------------+------------------+
|252|EX              |600000        |US                |
|37 |EN              |250000        |US                |
|33 |MI              |450000        |US                |
|63 |SE              |412000        |US                |
+---+----------------+--------------+------------------+



create df_new_best from df_best without id, and make the next: when exp_level = MI we want middle, when SE we want senior, else Null

In [40]:
df_new_best = df_best.select(when(col("experience_level") == "MI", "middle")
                             .when(col("experience_level") == "SE", "senior")
                             .otherwise(None).alias("experience_level"),
                             "biggest_salary",
                             "employee_residence")

print df_new_best

In [41]:
df_new_best.show()

+----------------+--------------+------------------+
|experience_level|biggest_salary|employee_residence|
+----------------+--------------+------------------+
|          middle|        450000|                US|
|            null|        250000|                US|
|          senior|        412000|                US|
|          middle|        450000|                US|
|            null|        600000|                US|
+----------------+--------------+------------------+



write df_new_best like 1.csv and load then it to df_final

In [42]:
df_new_best.write.csv("1.csv", header=True, mode="overwrite")

print df_final

In [44]:
df_final = spark.read.csv("1.csv", header=True, inferSchema=True)
df_final.printSchema()

root
 |-- experience_level: string (nullable = true)
 |-- biggest_salary: integer (nullable = true)
 |-- employee_residence: string (nullable = true)



filter df_final to delete experience_level where it Null, then join this table by biggest_salary(salary_in_usd) and employee_residence with entire df

In [45]:
df_final_cleaned = df_final.filter(df_final.experience_level.isNotNull())

print df_final

In [47]:
df_joined = df_final_cleaned.join(df,
    (df_final_cleaned.biggest_salary == df.salary_in_usd) &
    (df_final_cleaned.employee_residence == df.employee_residence),
    how="inner"
)

last task is to save in variable and then print this variable of the biggest salary_in_usd from df_final

In [48]:
df_joined.show()

+----------------+--------------+------------------+---+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|experience_level|biggest_salary|employee_residence| id|work_year|experience_level|employment_type|           job_title|salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|
+----------------+--------------+------------------+---+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|          middle|        450000|                US| 33|     2020|              MI|             FT|  Research Scientist|450000|            USD|       450000|                US|           0|              US|           M|
|          middle|        450000|                US| 33|     2020|              MI|             FT|  Research Scientist|

It is the end of PySpark basics. In other lessons you will learn optimizations technics and how to make distributed system