import libraries (all the libraries I needed for these tasks are already provided; if you need any others, import them here)

In [35]:
import findspark
findspark.init()  # run before importing pyspark so that pyspark can be located

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# note: max and min below shadow Python's built-in functions of the same name
from pyspark.sql.functions import col, max, avg, min, row_number, when
from pyspark.sql.window import Window

create a local SparkSession

In [36]:
file_path = 'dataset/ds_salaries.csv'

In [37]:
%%time
spark = SparkSession.builder.appName('myApp').master('local[*]').getOrCreate()

CPU times: user 796 µs, sys: 449 µs, total: 1.25 ms
Wall time: 1.67 ms


read the csv with inferSchema

In [38]:
%%time
df = spark.read.options(header=True, inferSchema=True).csv(file_path)

CPU times: user 1.69 ms, sys: 408 µs, total: 2.1 ms
Wall time: 115 ms


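read the csv one more time with the same code and compare the timings. On a fresh session the first read is usually the slow one, because it also pays one-off start-up costs. The timings shown here come from a session that was already running, so both reads take about the same time. Note that with inferSchema Spark still has to scan the file on every read to infer the column types.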

In [39]:
%%time
df_schema = spark.read.options(header=True, inferSchema=True).csv(file_path)

CPU times: user 3.26 ms, sys: 0 ns, total: 3.26 ms
Wall time: 148 ms


print the schema of the csv

In [40]:
%%time
df_schema.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- work_year: integer (nullable = true)
 |-- experience_level: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- salary_currency: string (nullable = true)
 |-- salary_in_usd: integer (nullable = true)
 |-- employee_residence: string (nullable = true)
 |-- remote_ratio: integer (nullable = true)
 |-- company_location: string (nullable = true)
 |-- company_size: string (nullable = true)

CPU times: user 0 ns, sys: 944 µs, total: 944 µs
Wall time: 705 µs


create a schema for this csv

In [41]:
my_schema = StructType([
        StructField('id', IntegerType(), True),
        StructField('work_year', IntegerType(), True),
        StructField('experience_lev', StringType(), True),
        StructField('employee_type', StringType(), True),
        StructField('job_title', StringType(), True),
        StructField('salary', IntegerType(), True),
        StructField('salary_currency', StringType(), True),
        StructField('salary_in_usd', IntegerType(), True),
        StructField('employee_residence', StringType(), True),
        StructField('remote_ratio', IntegerType(), True),
        StructField('company_location', StringType(), True),
        StructField('company_size', StringType(), True)
     ])

restart the kernel without clearing the output. After restarting, initialize the SparkSession again, then execute only the cells starting from the one with my_schema = StructType....
To restart the kernel, click Kernel, Restart.

read ds_salaries with the predefined schema and compare the results of this cell with the inferSchema cell

In [42]:
%%time
new_df = spark.read.options(header=True).schema(my_schema).csv(file_path)

CPU times: user 1.82 ms, sys: 1.01 ms, total: 2.83 ms
Wall time: 14.9 ms


This happens because the read operation is lazy (a transformation), but with inferSchema it becomes an action that creates a Spark job: Spark needs to loop through the whole file to check the data types of all columns, and this can hurt the performance of your code. (Parquet, by comparison, also needs the data types, but it stores them as metadata, so Spark only reads the metadata instead of going through the whole file; csv provides no such metadata.) The header option also makes Spark create one more Spark job to check the first line, so that it can define the column names and remember to skip that line when reading. The actual reading starts when you call the first action. You will learn more about Spark jobs in the next topic.
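
To make the cost of type inference concrete, here is a minimal plain-Python sketch. It is an analogy and not Spark's actual implementation; the helper names infer_types and apply_schema and the three-row sample are made up for illustration. Inferring types has to touch every row (a single odd value in the last row changes the answer), while a predefined schema needs no pass over the data.

```python
import csv
import io

# Hypothetical three-row sample shaped like a salaries csv (illustration only).
SAMPLE = """work_year,job_title,salary
2020,Data Scientist,70000
2020,Big Data Engineer,85000
2021,Data Analyst,n/a
"""


def infer_types(text):
    """Guess a type per column; every row must be scanned before answering."""
    reader = csv.reader(io.StringIO(text))
    header = next(reader)
    is_int = [True] * len(header)
    rows_scanned = 0
    for row in reader:
        rows_scanned += 1
        for i, value in enumerate(row):
            # lstrip('-') lets negative integers count as integers too
            if not value.lstrip('-').isdigit():
                is_int[i] = False
    types = {name: ('integer' if ok else 'string')
             for name, ok in zip(header, is_int)}
    return types, rows_scanned


def apply_schema(schema):
    """With a predefined schema the types are known without reading any row."""
    return dict(schema), 0


inferred, scanned = infer_types(SAMPLE)
predefined, scanned_with_schema = apply_schema(
    [('work_year', 'integer'), ('job_title', 'string'), ('salary', 'integer')]
)
print(inferred, scanned)
print(predefined, scanned_with_schema)
```

In this sample the salary column is inferred as a string only because of the value in the last row, which is why inference cannot stop early.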

print the schema of the csv one more time and compare it with the previous one

In [43]:
%%time
new_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- work_year: integer (nullable = true)
 |-- experience_lev: string (nullable = true)
 |-- employee_type: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- salary_currency: string (nullable = true)
 |-- salary_in_usd: integer (nullable = true)
 |-- employee_residence: string (nullable = true)
 |-- remote_ratio: integer (nullable = true)
 |-- company_location: string (nullable = true)
 |-- company_size: string (nullable = true)

CPU times: user 1.18 ms, sys: 659 µs, total: 1.84 ms
Wall time: 1.86 ms


now continue to work with one of the dataframes that you created

print data in dataframe using df.show

In [44]:
%%time
df = df.withColumnRenamed('_c0', 'id')
df.show()

+---+---------+----------------+---------------+--------------------+--------+---------------+-------------+------------------+------------+----------------+------------+
| id|work_year|experience_level|employment_type|           job_title|  salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|
+---+---------+----------------+---------------+--------------------+--------+---------------+-------------+------------------+------------+----------------+------------+
|  0|     2020|              MI|             FT|      Data Scientist|   70000|            EUR|        79833|                DE|           0|              DE|           L|
|  1|     2020|              SE|             FT|Machine Learning ...|  260000|            USD|       260000|                JP|           0|              JP|           S|
|  2|     2020|              SE|             FT|   Big Data Engineer|   85000|            GBP|       109024|                GB|          50|     

23/04/25 00:50:39 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size
 Schema: _c0, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size
Expected: _c0 but found: 
CSV file: file:///home/tonipaltus/Innowise/spark_demo/spark_demo_course/1_PySpark_Basics/dataset/ds_salaries.csv


print data in dataframe using display(df.toPandas())

In [45]:
%%time
display(df.toPandas())


Unnamed: 0,id,work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size
0,0,2020,MI,FT,Data Scientist,70000,EUR,79833,DE,0,DE,L
1,1,2020,SE,FT,Machine Learning Scientist,260000,USD,260000,JP,0,JP,S
2,2,2020,SE,FT,Big Data Engineer,85000,GBP,109024,GB,50,GB,M
3,3,2020,MI,FT,Product Data Analyst,20000,USD,20000,HN,0,HN,S
4,4,2020,SE,FT,Machine Learning Engineer,150000,USD,150000,US,50,US,L
...,...,...,...,...,...,...,...,...,...,...,...,...
602,602,2022,SE,FT,Data Engineer,154000,USD,154000,US,100,US,M
603,603,2022,SE,FT,Data Engineer,126000,USD,126000,US,100,US,M
604,604,2022,SE,FT,Data Analyst,129000,USD,129000,US,0,US,M
605,605,2022,SE,FT,Data Analyst,150000,USD,150000,US,100,US,M


CPU times: user 27 ms, sys: 1.28 ms, total: 28.2 ms
Wall time: 73.1 ms


create df_job_title that consists of all job titles without duplicates

In [46]:
%%time
df_job_title = df.select('job_title').dropDuplicates(['job_title']).toPandas()

CPU times: user 4.57 ms, sys: 2.12 ms, total: 6.69 ms
Wall time: 117 ms


print all rows from df_job_title without truncating the job titles

In [47]:
%%time
df_job_title

CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 4.53 µs


Unnamed: 0,job_title
0,3D Computer Vision Researcher
1,Lead Data Engineer
2,Head of Machine Learning
3,Data Specialist
4,Data Analytics Lead
5,Machine Learning Scientist
6,Lead Data Analyst
7,Data Engineering Manager
8,Staff Data Scientist
9,ETL Developer


create df_analytic that consists of the max, avg and min USD salaries for every job_title, using groupBy. The field names are avg_salary, min_salary, max_salary

In [48]:
%%time
df_analytic = df.filter(col('salary_currency') == 'USD').groupby('job_title') \
        .agg(
        min('salary').alias('min_salary'),
        max('salary').alias('max_salary'),
        avg('salary').alias('avg_salary')
)

CPU times: user 5.28 ms, sys: 0 ns, total: 5.28 ms
Wall time: 21.1 ms


print all rows from df_analytic without truncating the job titles

In [49]:
%%time
df_analytic.toPandas()

CPU times: user 6.6 ms, sys: 161 µs, total: 6.77 ms
Wall time: 91.9 ms


Unnamed: 0,job_title,min_salary,max_salary,avg_salary
0,Lead Data Engineer,56000,276000,154250.0
1,Data Specialist,165000,165000,165000.0
2,Data Analytics Lead,405000,405000,405000.0
3,Machine Learning Scientist,12000,260000,158412.5
4,Lead Data Analyst,87000,170000,128500.0
5,Data Engineering Manager,150000,174000,159000.0
6,Staff Data Scientist,105000,105000,105000.0
7,Director of Data Engineering,200000,200000,200000.0
8,Product Data Analyst,20000,20000,20000.0
9,Principal Data Scientist,151000,416000,255500.0


now you need to add a row_id column to df_analytic that shows the order of all job titles by avg salary, in descending order

In [50]:
%%time
windowSpec = Window.partitionBy().orderBy(col('avg_salary').desc())
df_analytic_sort = df_analytic.withColumn("row_id", row_number().over(windowSpec))

CPU times: user 1.64 ms, sys: 877 µs, total: 2.52 ms
Wall time: 12.4 ms
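
As a plain-Python analogy of what row_number() over an unpartitioned, ordered window computes: sort all rows by avg_salary in descending order and number them from 1. A single global ordering needs all rows in one place, which is what Spark's "No Partition Defined" warning refers to. The three pairs below are copied from the df_analytic output above; the variable names are made up for illustration.

```python
# A few (job_title, avg_salary) pairs copied from the df_analytic output above.
rows = [
    ('Lead Data Engineer', 154250.0),
    ('Data Analytics Lead', 405000.0),
    ('Principal Data Scientist', 255500.0),
]

# Analogue of orderBy(col('avg_salary').desc()) over one global partition.
ordered = sorted(rows, key=lambda r: r[1], reverse=True)

# Analogue of row_number(): consecutive numbers starting at 1.
with_row_id = [(i, title, avg) for i, (title, avg) in enumerate(ordered, start=1)]

for row in with_row_id:
    print(row)
```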


print all data from df_analytic

In [51]:
%%time
df_analytic_sort.toPandas()

CPU times: user 8.68 ms, sys: 0 ns, total: 8.68 ms
Wall time: 157 ms


23/04/25 00:50:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

Unnamed: 0,job_title,min_salary,max_salary,avg_salary,row_id
0,Data Analytics Lead,405000,405000,405000.0,1
1,Principal Data Engineer,185000,600000,328333.333333,2
2,Financial Data Analyst,100000,450000,275000.0,3
3,ML Engineer,256000,270000,263000.0,4
4,Principal Data Scientist,151000,416000,255500.0,5
5,Director of Data Science,168000,325000,247666.666667,6
6,Applied Data Scientist,157000,380000,238000.0,7
7,Head of Data,200000,235000,221666.666667,8
8,Director of Data Engineering,200000,200000,200000.0,9
9,Machine Learning Infrastructure Engineer,195000,195000,195000.0,10


it isn't pretty, so now we need to move row_id to the first position in df_analytic

In [52]:
%%time
df_analytic = df_analytic_sort.select('row_id', 'job_title', 'max_salary', 'avg_salary', 'min_salary')

CPU times: user 2.62 ms, sys: 54 µs, total: 2.67 ms
Wall time: 12.1 ms


print df_analytic now

In [53]:
%%time
df_analytic.toPandas()

23/04/25 00:50:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


CPU times: user 7.53 ms, sys: 0 ns, total: 7.53 ms
Wall time: 184 ms


Unnamed: 0,row_id,job_title,max_salary,avg_salary,min_salary
0,1,Data Analytics Lead,405000,405000.0,405000
1,2,Principal Data Engineer,600000,328333.333333,185000
2,3,Financial Data Analyst,450000,275000.0,100000
3,4,ML Engineer,270000,263000.0,256000
4,5,Principal Data Scientist,416000,255500.0,151000
5,6,Director of Data Science,325000,247666.666667,168000
6,7,Applied Data Scientist,380000,238000.0,157000
7,8,Head of Data,235000,221666.666667,200000
8,9,Director of Data Engineering,200000,200000.0,200000
9,10,Machine Learning Infrastructure Engineer,195000,195000.0,195000


here you need to create df_exp_lvl with the rows that have the biggest salary_in_usd (biggest_salary) for each experience_level (keep all the fields of the entire dataframe)

In [54]:
%%time
# show() returns None, so keep the aggregated dataframe and print it separately
group_data = df.groupBy('experience_level').max('salary_in_usd')
group_data.show()

+----------------+------------------+
|experience_level|max(salary_in_usd)|
+----------------+------------------+
|              EX|            600000|
|              MI|            450000|
|              EN|            250000|
|              SE|            412000|
+----------------+------------------+

CPU times: user 4.23 ms, sys: 59 µs, total: 4.29 ms
Wall time: 89.1 ms


In [55]:
%%time
# I don't know how to make this prettier
df_exp_lvl = df.filter(
        ((df['experience_level'] == 'EX') & (df['salary_in_usd'] == 600000)) |
        ((df['experience_level'] == 'MI') & (df['salary_in_usd'] == 450000)) |
        ((df['experience_level'] == 'EN') & (df['salary_in_usd'] == 250000)) |
        ((df['experience_level'] == 'SE') & (df['salary_in_usd'] == 412000))
        )

CPU times: user 2.82 ms, sys: 57 µs, total: 2.88 ms
Wall time: 5.69 ms
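
The hard-coded values in the filter above can be avoided by computing the biggest salary per experience_level first and then keeping the rows that match it. Below is a plain-Python sketch of that logic, not PySpark code. The sample rows are a small subset taken from the outputs in this notebook, and the variable names are made up for illustration. In PySpark the same idea is usually expressed as a groupBy aggregation joined back to df.

```python
# A small subset of rows (id, experience_level, salary_in_usd) from the outputs
# shown in this notebook.
rows = [
    {'id': 0, 'experience_level': 'MI', 'salary_in_usd': 79833},
    {'id': 1, 'experience_level': 'SE', 'salary_in_usd': 260000},
    {'id': 33, 'experience_level': 'MI', 'salary_in_usd': 450000},
    {'id': 37, 'experience_level': 'EN', 'salary_in_usd': 250000},
    {'id': 63, 'experience_level': 'SE', 'salary_in_usd': 412000},
    {'id': 252, 'experience_level': 'EX', 'salary_in_usd': 600000},
]

# Step 1: biggest salary per experience level (the groupBy(...).max(...) step).
biggest = {}
for r in rows:
    level = r['experience_level']
    if level not in biggest or r['salary_in_usd'] > biggest[level]:
        biggest[level] = r['salary_in_usd']

# Step 2: keep whole rows whose salary equals the biggest one for their level
# (the join-back step), so no value has to be typed in by hand.
exp_lvl_rows = [r for r in rows
                if r['salary_in_usd'] == biggest[r['experience_level']]]

print(sorted(r['id'] for r in exp_lvl_rows))
```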


print here df_exp_lvl

In [56]:
df_exp_lvl.show(truncate=False)

+---+---------+----------------+---------------+-------------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|id |work_year|experience_level|employment_type|job_title                |salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|
+---+---------+----------------+---------------+-------------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|33 |2020     |MI              |FT             |Research Scientist       |450000|USD            |450000       |US                |0           |US              |M           |
|37 |2020     |EN              |FT             |Machine Learning Engineer|250000|USD            |250000       |US                |50          |US              |L           |
|63 |2020     |SE              |FT             |Data Scientist           |412000|USD            |412000       |US                |


Or it can be done like this

In [57]:
windowSpec = Window.partitionBy('experience_level').orderBy(col('salary_in_usd').desc())
df_exp_lvl = df.withColumn('salary_order', row_number().over(windowSpec)).filter(col('salary_order') == 1).drop('salary_order')

In [58]:
df_exp_lvl.toPandas()


Unnamed: 0,id,work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size
0,37,2020,EN,FT,Machine Learning Engineer,250000,USD,250000,US,50,US,L
1,252,2021,EX,FT,Principal Data Engineer,600000,USD,600000,US,100,US,L
2,33,2020,MI,FT,Research Scientist,450000,USD,450000,US,0,US,M
3,63,2020,SE,FT,Data Scientist,412000,USD,412000,US,100,US,L
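
One difference between the two approaches is worth keeping in mind: row_number() == 1 returns exactly one row per experience_level even when several people share the top salary, while comparing against the maximum keeps all of them. Below is a plain-Python sketch with a made-up tie. The sample rows and function names are hypothetical and not taken from the dataset. In PySpark, using rank() in place of row_number() keeps the tied rows.

```python
from itertools import groupby

# Hypothetical rows (name, level, salary) with a tie at the top of level 'SE'.
rows = [
    ('a', 'SE', 412000),
    ('b', 'SE', 412000),
    ('c', 'SE', 100000),
    ('d', 'MI', 450000),
]


def top_by_row_number(data):
    """Sort within each level and keep only position 1, like row_number() == 1."""
    result = []
    ordered = sorted(data, key=lambda r: (r[1], -r[2]))
    for _, group in groupby(ordered, key=lambda r: r[1]):
        result.append(next(group))
    return result


def top_by_max(data):
    """Keep every row equal to its level's maximum, so ties survive."""
    best = {}
    for _, level, salary in data:
        if level not in best or salary > best[level]:
            best[level] = salary
    return [r for r in data if r[2] == best[r[1]]]


print(len(top_by_row_number(rows)))  # one row per level
print(len(top_by_max(rows)))         # both tied rows are kept
```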


create df_best that consists of the rows where a person's salary equals the biggest salary in their experience_level, and select only the columns: id, experience_level, biggest_salary, employee_residence

In [59]:
%%time
windowSpec = Window.partitionBy('experience_level').orderBy(col('salary').desc())
df_best = df.withColumn('salary_order', row_number().over(windowSpec)).filter(col('salary_order') == 1).drop('salary_order').select('id', 'experience_level', 'salary', 'employee_residence')

CPU times: user 5.79 ms, sys: 0 ns, total: 5.79 ms
Wall time: 30.1 ms


print df_best

In [60]:
%%time
df_best.toPandas()

23/04/25 00:50:40 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , experience_level, salary, employee_residence
 Schema: _c0, experience_level, salary, employee_residence
Expected: _c0 but found: 
CSV file: file:///home/tonipaltus/Innowise/spark_demo/spark_demo_course/1_PySpark_Basics/dataset/ds_salaries.csv


CPU times: user 6.45 ms, sys: 0 ns, total: 6.45 ms
Wall time: 83.1 ms


Unnamed: 0,id,experience_level,salary,employee_residence
0,16,EN,4450000,JP
1,384,EX,6000000,IN
2,177,MI,30400000,CL
3,285,SE,7000000,IN


drop duplicates by experience_level, if any exist

In [61]:
%%time
df_best = df_best.dropDuplicates(['experience_level'])

CPU times: user 1.32 ms, sys: 357 µs, total: 1.67 ms
Wall time: 2.87 ms


print df_best

In [62]:
%%time
df_best.toPandas()

CPU times: user 6.68 ms, sys: 1.05 ms, total: 7.73 ms
Wall time: 124 ms



Unnamed: 0,id,experience_level,salary,employee_residence
0,16,EN,4450000,JP
1,384,EX,6000000,IN
2,177,MI,30400000,CL
3,285,SE,7000000,IN


create df_new_best from df_best without id, and add the following: when experience_level is MI we want middle, when it is SE we want senior, otherwise Null

In [70]:
%%time
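df_new_best = df_best.withColumn('degree',
                                 # refer to the column by name instead of through the unrelated df
                                 when(col('experience_level') == 'MI', 'middle').
                                 when(col('experience_level') == 'SE', 'senior')
                                 # no otherwise(): rows matching neither condition get null
                                 ).drop('id')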

CPU times: user 1.6 ms, sys: 779 µs, total: 2.38 ms
Wall time: 7.12 ms
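
A when() chain without a final otherwise() leaves null for every row that matches none of the conditions, which covers the "else Null" part of the task. A plain-Python analogy of that behaviour follows; the function name degree_for is made up for illustration.

```python
def degree_for(experience_level):
    """Mimic when('MI' -> 'middle').when('SE' -> 'senior') with no otherwise()."""
    if experience_level == 'MI':
        return 'middle'
    if experience_level == 'SE':
        return 'senior'
    return None  # no branch matched: the analogue of Spark's null


levels = ['EN', 'EX', 'MI', 'SE']
degrees = [degree_for(level) for level in levels]
print(list(zip(levels, degrees)))
```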


print df_new_best

In [71]:
%%time
df_new_best.toPandas()

CPU times: user 7.4 ms, sys: 0 ns, total: 7.4 ms
Wall time: 130 ms


Unnamed: 0,experience_level,salary,employee_residence,degree
0,EN,4450000,JP,
1,EX,6000000,IN,
2,MI,30400000,CL,middle
3,SE,7000000,IN,senior


write df_new_best as 1.csv and then load it into df_final

In [72]:
%%time
df_new_best.write.options(header=True).format('csv').mode('overwrite').save('dataset/1.csv')
df_final = spark.read.options(header=True).format('csv').load('dataset/1.csv')

CPU times: user 4.69 ms, sys: 235 µs, total: 4.93 ms
Wall time: 214 ms
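
Note that the file is read back with neither a schema nor inferSchema, so every column of df_final comes back as a string. The plain-Python sketch below shows the same effect with the standard csv module. It is an analogy and not Spark's reader; the two rows are copied from the df_new_best output above.

```python
import csv
import io

# Rows copied from the df_new_best output above; None stands for a null degree.
rows = [
    {'experience_level': 'EN', 'salary': 4450000,
     'employee_residence': 'JP', 'degree': None},
    {'experience_level': 'MI', 'salary': 30400000,
     'employee_residence': 'CL', 'degree': 'middle'},
]

# Write to an in-memory "file" with a header line.
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=list(rows[0]))
writer.writeheader()
writer.writerows(rows)

# Read it back without telling the reader anything about types.
buffer.seek(0)
loaded = list(csv.DictReader(buffer))

# The integer salary is now text. Python's csv module returns '' for the empty
# degree field, whereas Spark's csv reader reads an empty field back as null.
print(type(rows[0]['salary']).__name__, type(loaded[0]['salary']).__name__)
print(repr(loaded[0]['degree']))
```

If the numeric type matters after reloading, passing a schema to the reader (as done earlier with my_schema) is one way to get it back.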


print df_final

In [73]:
%%time
df_final.toPandas()

CPU times: user 11.9 ms, sys: 0 ns, total: 11.9 ms
Wall time: 138 ms


Unnamed: 0,experience_level,salary,employee_residence,degree
0,EN,4450000,JP,
1,EX,6000000,IN,
2,MI,30400000,CL,middle
3,SE,7000000,IN,senior


filter df_final to delete the experience levels where the value is Null, then join this table with the entire df by biggest_salary (salary_in_usd) and employee_residence

In [74]:
%%time
df_final = df_final.dropna().join(df, ['salary', 'experience_level'])

CPU times: user 5.67 ms, sys: 4.25 ms, total: 9.92 ms
Wall time: 53.3 ms


print df_final

In [75]:
df_final.toPandas()


Unnamed: 0,salary,experience_level,employee_residence,degree,id,work_year,employment_type,job_title,salary_currency,salary_in_usd,employee_residence.1,remote_ratio,company_location,company_size
0,30400000,MI,CL,middle,177,2021,FT,Data Scientist,CLP,40038,CL,100,CL,L
1,7000000,SE,IN,senior,285,2021,FT,Data Science Manager,INR,94665,IN,50,IN,L


the last task is to save the biggest salary_in_usd from df_final in a variable and then print that variable

In [76]:
%%time
biggest_salary = df_final.agg(max('salary_in_usd')).collect()[0][0]
print(biggest_salary)

94665
CPU times: user 10.2 ms, sys: 52 µs, total: 10.3 ms
Wall time: 184 ms


This is the end of PySpark basics. In the other lessons you will learn optimization techniques and how to build a distributed system.