### Import the libraries. We have provided the initial list of imports below, but feel free to import your own as well.

In [58]:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col
from pyspark.sql.functions import max, avg, min, count
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank
from pyspark.sql.functions import when
from pyspark.sql.functions import filter


PATH = 'ds_salaries.csv'

### Create a local SparkSession

In [59]:
spark=SparkSession.builder.appName('Spark_Task').getOrCreate()
spark.sparkContext.setLogLevel('ERROR')

### Read downloaded csv file with inferschema

In [60]:
df_schema = spark.read.csv(PATH, header=True, inferSchema=True)

### Read the csv one more time and you will notice that it takes almost no time. This happens because the information is already in the SparkSession and there is no need to read from the file again

In [61]:
df_schema

DataFrame[_c0: int, work_year: int, experience_level: string, employment_type: string, job_title: string, salary: int, salary_currency: string, salary_in_usd: int, employee_residence: string, remote_ratio: int, company_location: string, company_size: string]

### Print a schema of this csv

In [62]:
df_schema.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- work_year: integer (nullable = true)
 |-- experience_level: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- salary_currency: string (nullable = true)
 |-- salary_in_usd: integer (nullable = true)
 |-- employee_residence: string (nullable = true)
 |-- remote_ratio: integer (nullable = true)
 |-- company_location: string (nullable = true)
 |-- company_size: string (nullable = true)



### Create a schema for this csv yourself. You will need it later. The first column should be named 'id'

In [63]:
schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('work_year', StringType(), True),
    StructField('experience_level', StringType(), True),
    StructField('employment_type', StringType(), True),
    StructField('job_title', StringType(), True),
    StructField('salary', IntegerType(), True),
    StructField('salary_currency', StringType(), True),
    StructField('salary_in_usd', IntegerType(), True),
    StructField('employee_residence', StringType(), True),
    StructField('remote_ratio', IntegerType(), True),
    StructField('company_location', StringType(), True),
    StructField('company_size', StringType(), True),
])

### Restart the engine by clicking on 'Restart Kernel' button. After that you need to run the imports and initialize the SparkSession again. Then please read csv file, but now instead of using inferschema use the schema you have created. Save the results in new DataFrame (you can name it df) 

In [64]:
df = spark.read.csv(PATH, header=True, schema=schema)

### This difference happens because the read operation is lazy (a transformation), but with inferSchema it becomes an action: Spark creates a job that has to loop through the file to check the data types of all columns, which can be time-consuming. Parquet files carry their own metadata, so Spark doesn't need to loop through the file. CSV is different: it carries no metadata, so Spark has to scan the file as described above. Note also that the header option creates one more Spark job, which reads the first line to get the column names and keeps it in memory so that this row is skipped while reading the file.
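
The cost of inference can be illustrated without Spark. The sketch below is a plain-Python analogy, not Spark's actual implementation: the helper `infer_types` and the sample data are hypothetical. It shows why a type guess is only reliable after every row has been scanned, whereas a user-supplied schema needs no scan at all.

```python
import csv
import io

# Tiny in-memory CSV (made-up data): 'salary' looks numeric until the last row.
SAMPLE = "id,salary\n0,70000\n1,260000\n2,n/a\n"


def infer_types(text, max_rows=None):
    """Guess 'int' or 'str' per column by scanning up to max_rows data rows."""
    reader = csv.reader(io.StringIO(text))
    header = next(reader)              # reading the header is itself one extra read
    types = {name: "int" for name in header}
    rows_scanned = 0
    for row in reader:
        if max_rows is not None and rows_scanned >= max_rows:
            break
        rows_scanned += 1
        for name, value in zip(header, row):
            if not value.lstrip("-").isdigit():
                types[name] = "str"    # one non-numeric value demotes the column
    return types, rows_scanned


partial_types, partial_rows = infer_types(SAMPLE, max_rows=2)
full_types, full_rows = infer_types(SAMPLE)

print(partial_types, partial_rows)   # guess after looking at two rows only
print(full_types, full_rows)         # guess after a full pass
```

Stopping after two rows would label `salary` as an integer column, and only the full pass reveals the non-numeric value.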

### Print a schema of the csv file one more time and compare with the previous

In [65]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- work_year: string (nullable = true)
 |-- experience_level: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- salary_currency: string (nullable = true)
 |-- salary_in_usd: integer (nullable = true)
 |-- employee_residence: string (nullable = true)
 |-- remote_ratio: integer (nullable = true)
 |-- company_location: string (nullable = true)
 |-- company_size: string (nullable = true)



### For the next steps you need to work with one of the DataFrames that you have created. Please print the data in DataFrame using df.show()

In [66]:
df.show(5)

+---+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+
| id|work_year|experience_level|employment_type|           job_title|salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|
+---+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|  0|     2020|              MI|             FT|      Data Scientist| 70000|            EUR|        79833|                DE|           0|              DE|           L|
|  1|     2020|              SE|             FT|Machine Learning ...|260000|            USD|       260000|                JP|           0|              JP|           S|
|  2|     2020|              SE|             FT|   Big Data Engineer| 85000|            GBP|       109024|                GB|          50|              GB|

### Print this data again using display(df.toPandas()) 

In [67]:
df.toPandas()

Unnamed: 0,id,work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size
0,0,2020,MI,FT,Data Scientist,70000,EUR,79833,DE,0,DE,L
1,1,2020,SE,FT,Machine Learning Scientist,260000,USD,260000,JP,0,JP,S
2,2,2020,SE,FT,Big Data Engineer,85000,GBP,109024,GB,50,GB,M
3,3,2020,MI,FT,Product Data Analyst,20000,USD,20000,HN,0,HN,S
4,4,2020,SE,FT,Machine Learning Engineer,150000,USD,150000,US,50,US,L
...,...,...,...,...,...,...,...,...,...,...,...,...
602,602,2022,SE,FT,Data Engineer,154000,USD,154000,US,100,US,M
603,603,2022,SE,FT,Data Engineer,126000,USD,126000,US,100,US,M
604,604,2022,SE,FT,Data Analyst,129000,USD,129000,US,0,US,M
605,605,2022,SE,FT,Data Analyst,150000,USD,150000,US,100,US,M


In [68]:
display(df.toPandas())

Unnamed: 0,id,work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size
0,0,2020,MI,FT,Data Scientist,70000,EUR,79833,DE,0,DE,L
1,1,2020,SE,FT,Machine Learning Scientist,260000,USD,260000,JP,0,JP,S
2,2,2020,SE,FT,Big Data Engineer,85000,GBP,109024,GB,50,GB,M
3,3,2020,MI,FT,Product Data Analyst,20000,USD,20000,HN,0,HN,S
4,4,2020,SE,FT,Machine Learning Engineer,150000,USD,150000,US,50,US,L
...,...,...,...,...,...,...,...,...,...,...,...,...
602,602,2022,SE,FT,Data Engineer,154000,USD,154000,US,100,US,M
603,603,2022,SE,FT,Data Engineer,126000,USD,126000,US,100,US,M
604,604,2022,SE,FT,Data Analyst,129000,USD,129000,US,0,US,M
605,605,2022,SE,FT,Data Analyst,150000,USD,150000,US,100,US,M


### Create df_job_title that consists of all job_titles without duplicates

In [69]:
df_job_title = df.select('job_title').distinct().toPandas().sort_values(by='job_title')

### Print all rows from df_job_title without truncating the job names

In [70]:
df_job_title

Unnamed: 0,job_title
0,3D Computer Vision Researcher
13,AI Scientist
32,Analytics Engineer
45,Applied Data Scientist
36,Applied Machine Learning Scientist
47,BI Data Analyst
44,Big Data Architect
41,Big Data Engineer
34,Business Data Analyst
22,Cloud Data Engineer


### Create df_analytic that will contain the following information about each job_title:
*   avg_salary - average USD salary
*   min_salary - min USD salary
*   max_salary - max USD salary

In [71]:
df_analytic = df.filter(col('salary_currency') == 'USD').groupBy('job_title', 'salary_currency').agg(
    avg('salary_in_usd').alias('avg_salary'), 
    min('salary_in_usd').alias('min_salary'), max('salary_in_usd').alias('max_salary'), 
    count('salary_in_usd').alias('count')
    )

### Print all rows from df_analytic without truncating the job names

In [72]:
df_analytic.toPandas()

Unnamed: 0,job_title,salary_currency,avg_salary,min_salary,max_salary,count
0,Data Analytics Manager,USD,127134.285714,105400,150260,7
1,Lead Data Scientist,USD,152500.0,115000,190000,2
2,BI Data Analyst,USD,82454.4,9272,150000,5
3,Research Scientist,USD,139428.428571,42000,450000,7
4,Data Engineer,USD,139465.8,4000,324000,85
5,Computer Vision Software Engineer,USD,110000.0,70000,150000,2
6,Lead Data Engineer,USD,154250.0,56000,276000,4
7,Big Data Engineer,USD,49333.333333,18000,70000,3
8,AI Scientist,USD,79800.0,12000,200000,5
9,Head of Data Science,USD,146718.75,85000,224000,4


### Now add a row_id column to df_analytic that shows the order of all job_titles by avg salary, in descending order

In [73]:
windowSpec = Window.partitionBy().orderBy(col('avg_salary').desc())

df_analytic_sorted = df_analytic.withColumn('row_id', row_number().over(windowSpec))

### Print all data from df_analytic

In [74]:
df_analytic_sorted.toPandas()

Unnamed: 0,job_title,salary_currency,avg_salary,min_salary,max_salary,count,row_id
0,Data Analytics Lead,USD,405000.0,405000,405000,1,1
1,Principal Data Engineer,USD,328333.333333,185000,600000,3,2
2,Financial Data Analyst,USD,275000.0,100000,450000,2,3
3,ML Engineer,USD,263000.0,256000,270000,2,4
4,Principal Data Scientist,USD,255500.0,151000,416000,4,5
5,Director of Data Science,USD,247666.666667,168000,325000,3,6
6,Applied Data Scientist,USD,238000.0,157000,380000,3,7
7,Head of Data,USD,221666.666667,200000,235000,3,8
8,Director of Data Engineering,USD,200000.0,200000,200000,1,9
9,Machine Learning Infrastructure Engineer,USD,195000.0,195000,195000,1,10


### Rearrange the columns. Move column row_id to the first place in df_analytic

In [75]:
# In principle one could iterate over df_analytic_sorted.columns[0:5], but I couldn't remember exactly how to do that in PySpark
df_analytic = df_analytic_sorted.select('row_id', 'job_title', 'salary_currency', 'avg_salary', 'min_salary', 'max_salary', 'count')
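
One way to avoid typing every column name is to build the reordered list from the existing column names. The sketch below uses a plain Python list copied from the output above, and `move_to_front` is a hypothetical helper. In the notebook the list would come from `df_analytic_sorted.columns`.

```python
# Column names as they appear in df_analytic_sorted (copied from the output above).
columns = ['job_title', 'salary_currency', 'avg_salary',
           'min_salary', 'max_salary', 'count', 'row_id']


def move_to_front(names, first):
    """Return a new list with `first` at position 0 and the other names in order."""
    return [first] + [name for name in names if name != first]


reordered = move_to_front(columns, 'row_id')
print(reordered)
```

The resulting list could then be unpacked into the select call, for example `df_analytic_sorted.select(*reordered)`.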

### Print all data from df_analytic again

In [76]:
df_analytic.show()

+------+--------------------+---------------+------------------+----------+----------+-----+
|row_id|           job_title|salary_currency|        avg_salary|min_salary|max_salary|count|
+------+--------------------+---------------+------------------+----------+----------+-----+
|     1| Data Analytics Lead|            USD|          405000.0|    405000|    405000|    1|
|     2|Principal Data En...|            USD| 328333.3333333333|    185000|    600000|    3|
|     3|Financial Data An...|            USD|          275000.0|    100000|    450000|    2|
|     4|         ML Engineer|            USD|          263000.0|    256000|    270000|    2|
|     5|Principal Data Sc...|            USD|          255500.0|    151000|    416000|    4|
|     6|Director of Data ...|            USD|247666.66666666666|    168000|    325000|    3|
|     7|Applied Data Scie...|            USD|          238000.0|    157000|    380000|    3|
|     8|        Head of Data|            USD|221666.66666666666|    20

### Create df_exp_level with a column that contains the biggest usd_salary for each experience level (keep all rows, as in the entire DataFrame)

In [77]:
# Can the window function be pushed straight into the filter here?
windowExp = Window.partitionBy('experience_level').orderBy(col('salary_in_usd').desc())
df_exp_level = df.withColumn('salary_rank_per_level', rank().over(windowExp)).filter(col('salary_rank_per_level') == 1)
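
The task wording ("keep all rows") can also be read as asking for every row to stay in the result, with the per-level maximum attached as an extra column, rather than filtering down to rank 1. The sketch below shows that reading in plain Python on four rows taken from the data printed earlier. The column name `biggest_salary` is an assumption. In Spark, the analogous tool would be an aggregate such as `max` applied over a window partitioned by `experience_level`.

```python
# Four rows copied from the DataFrame output shown earlier in this notebook.
rows = [
    {'id': 0, 'experience_level': 'MI', 'salary_in_usd': 79833},
    {'id': 1, 'experience_level': 'SE', 'salary_in_usd': 260000},
    {'id': 2, 'experience_level': 'SE', 'salary_in_usd': 109024},
    {'id': 3, 'experience_level': 'MI', 'salary_in_usd': 20000},
]

# Pass 1: find the largest USD salary within each experience level.
biggest = {}
for row in rows:
    level = row['experience_level']
    biggest[level] = max(biggest.get(level, row['salary_in_usd']), row['salary_in_usd'])

# Pass 2: attach that maximum to every row, so no row is dropped.
with_max = [{**row, 'biggest_salary': biggest[row['experience_level']]} for row in rows]

for row in with_max:
    print(row)
```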

### Print all data from df_exp_level

In [78]:
df_exp_level.show()

+---+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+---------------------+
| id|work_year|experience_level|employment_type|           job_title|salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|salary_rank_per_level|
+---+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+---------------------+
| 37|     2020|              EN|             FT|Machine Learning ...|250000|            USD|       250000|                US|          50|              US|           L|                    1|
|252|     2021|              EX|             FT|Principal Data En...|600000|            USD|       600000|                US|         100|              US|           L|                    1|
| 33|     2020|              MI|             

### Create df_best that consists of rows with the biggest salaries of each exp_lvl. Keep the following columns: id, experience_level, biggest_salary, employee_residence

In [79]:
windowBest = Window.partitionBy('experience_level').orderBy(col('salary').desc())

df_best = df.withColumn('salary_rank_per_level', rank().over(windowBest)).filter(col('salary_rank_per_level') == 1).select("id", "experience_level", "salary", "employee_residence", "salary_rank_per_level")
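
Note that this cell ranks by `salary`, which is expressed in each row's own currency, so the result can differ from a ranking by `salary_in_usd`. A small plain-Python check on three SE rows that appear in this notebook's outputs (a subset, not the full data) shows the effect:

```python
# Three SE rows copied from outputs shown in this notebook (not the full data set).
se_rows = [
    {'id': 1, 'salary': 260000, 'salary_currency': 'USD', 'salary_in_usd': 260000},
    {'id': 4, 'salary': 150000, 'salary_currency': 'USD', 'salary_in_usd': 150000},
    {'id': 285, 'salary': 7000000, 'salary_currency': 'INR', 'salary_in_usd': 94665},
]

# Largest amount in local currency versus largest amount converted to USD.
top_by_raw = max(se_rows, key=lambda r: r['salary'])
top_by_usd = max(se_rows, key=lambda r: r['salary_in_usd'])

print(top_by_raw['id'], top_by_raw['salary_currency'])
print(top_by_usd['id'], top_by_usd['salary_currency'])
```

Among these three rows, the INR salary wins on the raw number even though it is the smallest once converted to USD. Which column to rank by depends on how "biggest salary" is meant to be read.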

### Print all data from df_best

In [80]:
df_best.show(5)

+---+----------------+--------+------------------+---------------------+
| id|experience_level|  salary|employee_residence|salary_rank_per_level|
+---+----------------+--------+------------------+---------------------+
| 16|              EN| 4450000|                JP|                    1|
|384|              EX| 6000000|                IN|                    1|
|177|              MI|30400000|                CL|                    1|
|285|              SE| 7000000|                IN|                    1|
+---+----------------+--------+------------------+---------------------+



### Drop duplicates by experience_level, if any exist

In [81]:
df_best = df_best.dropDuplicates(['experience_level'])
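
Duplicates per experience level are possible because `rank` gives tied rows the same value, so a filter on rank 1 keeps every row that shares the top salary. The plain-Python sketch below contrasts the two numbering rules. The helper names and salary values are made up, and it mirrors the semantics of SQL `ROW_NUMBER()` and `RANK()` rather than calling Spark.

```python
def row_numbers(values):
    """Number values 1, 2, 3, ... in descending order; ties get distinct numbers."""
    ordered = sorted(values, reverse=True)
    return [(value, position) for position, value in enumerate(ordered, start=1)]


def ranks(values):
    """Like SQL RANK(): tied values share a rank and the following rank is skipped."""
    ordered = sorted(values, reverse=True)
    result = []
    for position, value in enumerate(ordered, start=1):
        if result and result[-1][0] == value:
            result.append((value, result[-1][1]))   # reuse the rank of the tied row
        else:
            result.append((value, position))
    return result


salaries = [600000, 600000, 416000, 325000]   # made-up values with a tie at the top
print(row_numbers(salaries))
print(ranks(salaries))
```

With `row_number` in place of `rank`, exactly one row per level would pass the filter, which would make the separate deduplication step unnecessary. In both cases, which of the tied rows survives should be treated as arbitrary.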

### Print all data from df_best

In [82]:
df_best.show()

+---+----------------+--------+------------------+---------------------+
| id|experience_level|  salary|employee_residence|salary_rank_per_level|
+---+----------------+--------+------------------+---------------------+
| 16|              EN| 4450000|                JP|                    1|
|384|              EX| 6000000|                IN|                    1|
|177|              MI|30400000|                CL|                    1|
|285|              SE| 7000000|                IN|                    1|
+---+----------------+--------+------------------+---------------------+



### Create df_new_best from df_best but without id, and create a column based on the following condition: when exp_level = MI we want middle, when SE we want senior, else Null

In [83]:
# Reference the column by name with col() rather than through the parent DataFrame df
df_new_best = df_best.drop('id').withColumn('desired_level', when(col('experience_level') == 'MI', 'middle').when(col('experience_level') == 'SE', 'senior').otherwise(None))

### Print all data from df_new_best

In [84]:
df_new_best.show()

+----------------+--------+------------------+---------------------+-------------+
|experience_level|  salary|employee_residence|salary_rank_per_level|desired_level|
+----------------+--------+------------------+---------------------+-------------+
|              EN| 4450000|                JP|                    1|         null|
|              EX| 6000000|                IN|                    1|         null|
|              MI|30400000|                CL|                    1|       middle|
|              SE| 7000000|                IN|                    1|       senior|
+----------------+--------+------------------+---------------------+-------------+



### Write df_new_best to csv and then load this csv to df_final 

In [85]:
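# mode='overwrite' lets the cell be re-run without failing because the output path already exists
df_new_best.write.csv('df_new_best/file', header=True, mode='overwrite')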
df_final = spark.read.csv('df_new_best/file', header=True, inferSchema=True)

### Print all data from df_final


In [86]:
df_final.show()

+----------------+--------+------------------+---------------------+-------------+
|experience_level|  salary|employee_residence|salary_rank_per_level|desired_level|
+----------------+--------+------------------+---------------------+-------------+
|              EN| 4450000|                JP|                    1|         null|
|              EX| 6000000|                IN|                    1|         null|
|              MI|30400000|                CL|                    1|       middle|
|              SE| 7000000|                IN|                    1|       senior|
+----------------+--------+------------------+---------------------+-------------+



### Drop nulls in df_final. Then join this DataFrame with df using salary and employee residence

In [87]:
df_final = df_final.na.drop().join(df, (df_final.salary == df.salary) & (df_final.employee_residence == df.employee_residence),'inner')
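
The two steps in this cell, dropping rows with nulls and then inner-joining on a pair of columns, can be traced by hand in plain Python. The rows below are copied from the outputs in this notebook, reduced to a few columns. The lookup dictionary assumes each (salary, employee_residence) pair occurs only once, which a real join does not require.

```python
# Rows of df_final before the join (from the output above, selected columns).
best = [
    {'experience_level': 'EN', 'salary': 4450000, 'employee_residence': 'JP', 'desired_level': None},
    {'experience_level': 'EX', 'salary': 6000000, 'employee_residence': 'IN', 'desired_level': None},
    {'experience_level': 'MI', 'salary': 30400000, 'employee_residence': 'CL', 'desired_level': 'middle'},
    {'experience_level': 'SE', 'salary': 7000000, 'employee_residence': 'IN', 'desired_level': 'senior'},
]
# A few rows of df (selected columns only).
full = [
    {'id': 177, 'salary': 30400000, 'employee_residence': 'CL', 'salary_in_usd': 40038},
    {'id': 285, 'salary': 7000000, 'employee_residence': 'IN', 'salary_in_usd': 94665},
    {'id': 1, 'salary': 260000, 'employee_residence': 'JP', 'salary_in_usd': 260000},
]

# Step 1: drop every row that contains a null (None) value.
non_null = [row for row in best if all(value is not None for value in row.values())]

# Step 2: inner join on the (salary, employee_residence) pair.
lookup = {(row['salary'], row['employee_residence']): row for row in full}
joined = []
for row in non_null:
    key = (row['salary'], row['employee_residence'])
    if key in lookup:                      # inner join: unmatched rows are dropped
        joined.append({**lookup[key], **row})

for row in joined:
    print(row['id'], row['desired_level'], row['salary_in_usd'])
```

Merging the dictionaries leaves one copy of each key column. The Spark join in the cell above keeps both copies, which is why the pandas output shows names such as `salary.1`.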

### Print all data from df_final

In [88]:
df_final.toPandas()

Unnamed: 0,experience_level,salary,employee_residence,salary_rank_per_level,desired_level,id,work_year,experience_level.1,employment_type,job_title,salary.1,salary_currency,salary_in_usd,employee_residence.1,remote_ratio,company_location,company_size
0,MI,30400000,CL,1,middle,177,2021,MI,FT,Data Scientist,30400000,CLP,40038,CL,100,CL,L
1,SE,7000000,IN,1,senior,285,2021,SE,FT,Data Science Manager,7000000,INR,94665,IN,50,IN,L


### Get the data from biggest_salary and save the results in a variable. Print this variable

In [89]:
A = df_final
A.select(col('salary_in_usd')).show()

+-------------+
|salary_in_usd|
+-------------+
|        40038|
|        94665|
+-------------+



This is the end of PySpark basics. In other lessons you will learn optimization techniques and how to build a distributed system.