In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local"). \
    appName("pyspark-1"). \
    getOrCreate()

### Read data

In [4]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

schema = StructType([
    StructField('Job ID', StringType(), True),
    StructField('Agency', StringType(), True),
    StructField('Posting Type', StringType(), True),
    StructField('# Of Positions', IntegerType(), True),
    StructField('Business Title', StringType(), True),
    StructField('Civil Service Title', StringType(), True),
    StructField('Title Code No', StringType(), True),
    StructField('Level', StringType(), True),
    StructField('Job Category', StringType(), True),
    StructField('Full-Time/Part-Time indicator', StringType(), True),
    StructField('Salary Range From', DoubleType(), True),
    StructField('Salary Range To', DoubleType(), True),
    StructField('Salary Frequency', StringType(), True),
    StructField('Work Location', StringType(), True),
    StructField('Division/Work Unit', StringType(), True),
    StructField('Job Description', StringType(), True),
    StructField('Minimum Qual Requirements', StringType(), True),
    StructField('Preferred Skills', StringType(), True),
    StructField('Additional Information', StringType(), True),
    StructField('To Apply', StringType(), True),
    StructField('Hours/Shift', StringType(), True),
    StructField('Work Location 1', StringType(), True),
    StructField('Recruitment Contact', StringType(), True),
    StructField('Residency Requirement', StringType(), True),
    StructField('Posting Date', StringType(), True),
    StructField('Post Until', StringType(), True),
    StructField('Posting Updated', StringType(), True),
    StructField('Process Date', StringType(), True)
])

In [5]:
df = spark.read.csv("/dataset/nyc-jobs.csv", header=True, schema=schema)
df.printSchema()

root
 |-- Job ID: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Posting Type: string (nullable = true)
 |-- # Of Positions: integer (nullable = true)
 |-- Business Title: string (nullable = true)
 |-- Civil Service Title: string (nullable = true)
 |-- Title Code No: string (nullable = true)
 |-- Level: string (nullable = true)
 |-- Job Category: string (nullable = true)
 |-- Full-Time/Part-Time indicator: string (nullable = true)
 |-- Salary Range From: double (nullable = true)
 |-- Salary Range To: double (nullable = true)
 |-- Salary Frequency: string (nullable = true)
 |-- Work Location: string (nullable = true)
 |-- Division/Work Unit: string (nullable = true)
 |-- Job Description: string (nullable = true)
 |-- Minimum Qual Requirements: string (nullable = true)
 |-- Preferred Skills: string (nullable = true)
 |-- Additional Information: string (nullable = true)
 |-- To Apply: string (nullable = true)
 |-- Hours/Shift: string (nullable = true)
 |-- Work Locati

### Sample function

In [6]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col,max,min

In [20]:
def get_salary_frequency(df: DataFrame) -> list:
    row_list = df.select('Salary Frequency').distinct().collect()
    return [row['Salary Frequency'] for row in row_list]

**Whats the number of jobs posting per category (Top 10)?**

In [7]:
def get_top10_jobs_posting_per_category(df: DataFrame) -> list:
    job_posting_per_category = df.groupBy('Job Category').count().orderBy(col('count').desc())
    return [(row[0], row[1]) for row in job_posting_per_category.take(10)]

In [8]:
schema = list = ['Job Category', 'Job Id']
result_data = get_top10_jobs_posting_per_category(df)
result_df = spark.createDataFrame(data = result_data, schema = schema)
result_df.show()
result_df.coalesce(1).write.mode('overwrite').format('csv').save('resultset/get_top10_jobs_posting_per_category')

+--------------------+------+
|        Job Category|Job Id|
+--------------------+------+
|Engineering, Arch...|   504|
|Technology, Data ...|   313|
|       Legal Affairs|   226|
|Public Safety, In...|   182|
|Building Operatio...|   181|
|Finance, Accounti...|   169|
|Administration & ...|   134|
|Constituent Servi...|   129|
|              Health|   125|
|Policy, Research ...|   124|
+--------------------+------+



**Whats the salary distribution per job category?**

In [9]:
def get_salary_distribution_per_job_category(df: DataFrame) -> list:
    salary_distribution_per_job_category = df.groupBy(col('Job Category'))\
                                            .agg(min(col('Salary Range From')).alias('Salary Range From')\
                                                 , max(col('Salary Range To')).alias('Salary Range To'))

    return [(row[0], row[1], row[2]) for row in salary_distribution_per_job_category.collect()]

In [10]:
schema = StructType([
    StructField('Job Category', StringType(), True),
    StructField('Salary Range From', DoubleType(), True),
    StructField('Salary Range To', DoubleType(), True),
])
result_data = get_salary_distribution_per_job_category(df)
result_df = spark.createDataFrame(data = result_data, schema = schema)
result_df.show()
result_df.coalesce(1).write.mode('overwrite').format('csv').save('resultset/get_salary_distribution_per_job_category')

+--------------------+-----------------+---------------+
|        Job Category|Salary Range From|Salary Range To|
+--------------------+-----------------+---------------+
|Administration & ...|          90000.0|       100000.0|
|Health Policy, Re...|          82008.0|       180000.0|
|Administration & ...|          54100.0|        83981.0|
|Information Techn...|          68239.0|        85644.0|
|Finance, Accounti...|          55659.0|        70390.0|
|Engineering, Arch...|           539.12|       118610.0|
|Legal Affairs Pol...|          54165.0|       168433.0|
|Administration & ...|          45491.0|        60660.0|
|Constituent Servi...|             15.5|       156829.0|
|Building Operatio...|             15.5|       234402.0|
|Engineering, Arch...|          69940.0|       186555.0|
|Constituent Servi...|          52524.0|        81535.0|
|Administration & ...|          65731.0|        75591.0|
|       Legal Affairs|          19.9179|       208826.0|
|Engineering, Arch...|         

**Is there any correlation between the higher degree and the salary?**

In [11]:
from pyspark.sql.functions import when,instr

In [12]:
def get_correlation_degree_level_salary(df: DataFrame) -> float:
    df_target = df.withColumn('Degree Level', when(instr(col('Minimum Qual Requirements'), 'master''s degree') > 0, 2)\
                                            .when(instr(col('Minimum Qual Requirements'), 'baccalaureate degree') > 0, 1)\
                                             .otherwise(0))

    return df_target.corr('Degree Level', 'Salary Range From')

In [13]:
get_correlation_degree_level_salary(df)

0.21567669052524582

The above exercise shows positive correlation between higher degree and the salary

**Whats the job posting having the highest salary per agency?**

In [62]:
from pyspark.sql.functions import row_number, desc
from pyspark.sql.window import Window

In [67]:
def get_job_posting_having_highest_salary_per_agency(df: DataFrame) -> list:
    windowSpec = Window.partitionBy('Agency').orderBy(desc('Salary Range To'))
    df_with_rank = df.withColumn('RowNum', row_number().over(windowSpec))
    job_posting_having_highest_salary_per_agency = df_with_rank.where(col('RowNum') == 1)
    job_posting_having_highest_salary_per_agency = job_posting_having_highest_salary_per_agency\
                                                    .select('Agency', 'Posting Type', 'Salary Range To')\
                                                    .withColumnRenamed('Salary Range To', 'Max Salary')
    return [(row[0], row[1], row[2]) for row in job_posting_having_highest_salary_per_agency.collect()]

In [68]:
schema = StructType([
    StructField('Agency', StringType(), True),
    StructField('Posting Type', StringType(), True),
    StructField('Max Salary', DoubleType(), True)
])
result_data = get_job_posting_having_highest_salary_per_agency(df)
result_df = spark.createDataFrame(data = result_data, schema = schema)
result_df.show()
result_df.coalesce(1).write.mode('overwrite').format('csv').save('resultset/get_job_posting_having_highest_salary_per_agency')

+--------------------+------------+----------+
|              Agency|Posting Type|Max Salary|
+--------------------+------------+----------+
|LANDMARKS PRESERV...|    Internal|   64297.0|
|OFFICE OF COLLECT...|    Internal|     10.36|
|     FIRE DEPARTMENT|    Internal|  144929.0|
|ADMIN FOR CHILDRE...|    External|  156829.0|
|MANHATTAN COMMUNI...|    Internal|      19.0|
|      TAX COMMISSION|    Internal|   90177.0|
|HRA/DEPT OF SOCIA...|    Internal|  153017.0|
|TAXI & LIMOUSINE ...|    External|  160000.0|
|EQUAL EMPLOY PRAC...|    External|   72712.0|
|DEPARTMENT OF BUS...|    Internal|  162014.0|
|DEPT OF DESIGN & ...|    Internal|  217244.0|
|TEACHERS RETIREME...|    External|   75760.0|
|DEPARTMENT OF COR...|    Internal|  145000.0|
|FINANCIAL INFO SV...|    Internal|  150000.0|
|OFFICE OF EMERGEN...|    Internal|   85000.0|
|HOUSING PRESERVAT...|    External|  140000.0|
|CIVILIAN COMPLAIN...|    External|  168433.0|
|OFFICE OF MANAGEM...|    External|  117810.0|
|MAYORS OFFIC

**Whats the job positings average salary per agency for the last 2 years?**

In [100]:
from pyspark.sql.functions import to_date, lit, avg

In [105]:
def get_avg_salary_per_agency_for_last_2_years(df: DataFrame) -> list:
    df_new = df.select('Agency', 'Salary Range From', 'Salary Range To', 'Posting Date')
    df_new = df_new.select('Agency', 'Salary Range From', 'Salary Range To', to_date(col('Posting Date'), 'yyyy-MM-dd').alias('Posting Date'))
    df_new = df_new.where(col('Posting Date') != 'null')
    df_new = df_new.filter(col('Posting Date') > lit('2017-12-17'))
    average_salary_per_agency = df_new.groupBy(col('Agency'))\
                                    .agg(avg(col('Salary Range From')).alias('Average Salary Range From')\
                                     , avg(col('Salary Range To')).alias('Average Salary Range To'))
    return [(row[0], row[1], row[2]) for row in average_salary_per_agency.collect()]

In [107]:
schema = StructType([
    StructField('Agency', StringType(), True),
    StructField('Average Salary Range From', DoubleType(), True),
    StructField('Average Salary Range To', DoubleType(), True)
])
result_data = get_avg_salary_per_agency_for_last_2_years(df)
result_df = spark.createDataFrame(data = result_data, schema = schema)
result_df.show()
result_df.coalesce(1).write.mode('overwrite').format('csv').save('resultset/get_avg_salary_per_agency_for_last_2_years')

+--------------------+-------------------------+-----------------------+
|              Agency|Average Salary Range From|Average Salary Range To|
+--------------------+-------------------------+-----------------------+
|     FIRE DEPARTMENT|                 47113.89|               62235.89|
|ADMIN FOR CHILDRE...|        57751.41176470588|      69569.11764705883|
|MANHATTAN COMMUNI...|                     19.0|                   19.0|
|      TAX COMMISSION|                 16432.75|               24479.95|
|HRA/DEPT OF SOCIA...|                  55053.7|                66534.3|
|TAXI & LIMOUSINE ...|       41189.463567741936|      52562.84839999999|
|DEPARTMENT OF BUS...|       49631.088528571425|      70365.55927857143|
|DEPT OF DESIGN & ...|              59553.84375|                96060.5|
|TEACHERS RETIREME...|                  62397.0|                75760.0|
|FINANCIAL INFO SV...|                  78496.3|               109492.7|
|DEPARTMENT OF COR...|         58655.1094736842|   

**What are the highest paid skills in the US market?**

In [123]:
def get_highest_paid_skills_list(df: DataFrame) -> list:
    df_new = df.select('Preferred Skills', 'Salary Range To')
    df_new = df_new.filter(~col('Preferred Skills').contains('ERROR'))
    highest_paid_skills = df_new.groupBy(col('Preferred Skills')).agg(max(col('Salary Range To')).alias('Max Salary'))\
                .orderBy(col('Max Salary').desc())
    return [(row[0], row[1]) for row in highest_paid_skills.collect()]

In [124]:
schema = StructType([
    StructField('Preferred Skills', StringType(), True),
    StructField('Max Salary', DoubleType(), True)
])
result_data = get_highest_paid_skills_list(df)
result_df = spark.createDataFrame(data = result_data, schema = schema)
result_df.show()
result_df.coalesce(1).write.mode('overwrite').format('csv').save('resultset/get_highest_paid_skills_list')

+--------------------+----------+
|    Preferred Skills|Max Salary|
+--------------------+----------+
| ""2"" or ""3"" a...|  234402.0|
| and one year of ...|  225217.0|
|          commercial|  224749.0|
| at least 18 mont...|  224749.0|
|The Deputy Commis...|  218587.0|
|Candidate must ha...|  217244.0|
| all candidates m...|  209585.0|
|â€¢	10+ years of ...|  208826.0|
|â€¢ Integrity â€“...|  202744.0|
|          journalism|  202744.0|
|Extensive experie...|  198518.0|
|â€¢	Managerial tr...|  194395.0|
|â€¢ Expert knowle...|  194395.0|
| including making...|  194395.0|
|          managerial|  194395.0|
|Certification/lic...|  192152.0|
| all candidates m...|  192152.0|
| ""b"" and ""c"" ...|  192152.0|
|Required Skills: ...|  190000.0|
|Strongly Preferre...|  187000.0|
+--------------------+----------+
only showing top 20 rows



### Example of test function

In [24]:
mock_data = [('A', 'Annual'), ('B', 'Daily')]
expected_result = ['Annual', 'Daily']

In [25]:
def test_get_salary_frequency(mock_data: list, 
                              expected_result: list,
                              schema: list = ['id', 'Salary Frequency']):  
    mock_df = spark.createDataFrame(data = mock_data, schema = schema)
    assert get_salary_frequency(mock_df) == expected_result

**Test : get_top10_jobs_posting_per_category**

In [39]:
def test_get_top10_jobs_posting_per_category(mock_data: list, 
                              expected_result: list,
                              schema: list = ['Job Category', 'Job Id']):
    mock_df = spark.createDataFrame(data = mock_data, schema = schema)
    assert sorted(get_top10_jobs_posting_per_category(mock_df)) == sorted(expected_result)

In [40]:
mock_data = [('C1', 1), ('C1', 2), ('C1', 3),
            ('C2', 4), ('C2', 5),
            ('C3', 6), ('C3', 7), ('C3', 8),
            ('C4', 9), ('C4', 10), ('C4', 11),
            ('C5', 12), ('C5', 13), ('C5', 14),('C5', 15),
            ('C6', 16), ('C6', 17), ('C6', 18),
            ('C7', 19), ('C7', 20),
            ('C8', 21), ('C8', 22), ('C8', 23),
            ('C9', 24), ('C9', 25), ('C9', 26),('C9', 27), ('C9', 28),
            ('C10', 29), ('C10', 30), ('C10', 31),
            ('C11', 32)]
expected_result = [('C9', 5),
                  ('C5', 4),
                  ('C6', 3),
                  ('C4', 3),
                  ('C10', 3),
                  ('C3', 3),
                  ('C8', 3),
                  ('C1', 3),
                  ('C7', 2),
                  ('C2', 2)]

schema = list = ['Job Category', 'Job Id']
test_get_top10_jobs_posting_per_category(mock_data, expected_result, schema)

**Test: get_salary_distribution_per_job_category**

In [44]:
def test_get_salary_distribution_per_job_category(mock_data: list, 
                              expected_result: list,
                              schema: StructType([StructField('Job Category', StringType(), True),StructField('Salary Range From', DoubleType(), True),
    StructField('Salary Range To', DoubleType(), True),
])):
    mock_df = spark.createDataFrame(data = mock_data, schema = schema)
    assert sorted(get_salary_distribution_per_job_category(mock_df)) == sorted(expected_result)

In [45]:
mock_data = [('C1', 10.00, 20.00), ('C1', 15.00, 30.00), ('C1', 30.00, 40.00),
            ('C2', 52.50, 60.00), ('C2', 63.00, 70.00), ('C2', 22.14, 30.00), ('C2', 21.14, 75.50),
            ('C3', 900.00, 950.00), ('C3', 700.00, 760.00), ('C3', 810.00, 900.00)]
expected_result = [('C1', 10.00, 40.00),
                  ('C2', 21.14, 75.50),
                  ('C3', 700.00, 950.00)]

schema = StructType([
    StructField('Job Category', StringType(), True),
    StructField('Salary Range From', DoubleType(), True),
    StructField('Salary Range To', DoubleType(), True),
])
test_get_salary_distribution_per_job_category(mock_data, expected_result, schema)

**Test: get_correlation_degree_level_salary - test whether the degree level has a positive correlation with salary**

In [18]:
def test_get_correlation_degree_level_salary(mock_data: list,
                              schema: StructType([StructField('Minimum Qual Requirements', StringType(), True),StructField('Salary Range From', DoubleType(), True)
                            ])):
    mock_df = spark.createDataFrame(data = mock_data, schema = schema)
    assert get_correlation_degree_level_salary(mock_df) > 0

In [19]:
mock_data = [('Has a master''s degree', 1000.00), ('Five years of industry training', 500.00),
             ('Has a baccalaureate degree', 650.00),
            ('Has baccalaureate degree and master''s degree', 900.00)]

schema = StructType([
    StructField('Minimum Qual Requirements', StringType(), True),
    StructField('Salary Range From', DoubleType(), True)
])
test_get_correlation_degree_level_salary(mock_data, schema)

**Test: get_job_posting_having_highest_salary_per_agency**

In [71]:
def test_get_job_posting_having_highest_salary_per_agency(mock_data: list, 
                              expected_result: list,
                              schema: StructType([StructField('Agency', StringType(), True),StructField('Posting Type', StringType(), True),
    StructField('Salary Range To', DoubleType(), True),
])):
    mock_df = spark.createDataFrame(data = mock_data, schema = schema)
    assert sorted(get_job_posting_having_highest_salary_per_agency(mock_df)) == sorted(expected_result)

In [72]:
mock_data = [('A1', 'P1', 100.00), ('A1', 'P2', 150.00),
            ('A2', 'P1', 60.00), ('A2', 'P2', 70.00), ('A2', 'P3', 30.00), ('A2', 'P4', 75.50),
            ('A3', 'P1', 950.00), ('A3', 'P2', 760.00), ('A3', 'P3', 900.00)]
expected_result = [('A1', 'P2', 150.00),
                  ('A2', 'P4', 75.50),
                  ('A3', 'P1', 950.00)]

schema = StructType([
    StructField('Agency', StringType(), True),
    StructField('Posting Type', StringType(), True),
    StructField('Salary Range To', DoubleType(), True),
])
test_get_job_posting_having_highest_salary_per_agency(mock_data, expected_result, schema)

**Test: get_avg_salary_per_agency_for_last_2_years**

In [112]:
def test_get_avg_salary_per_agency_for_last_2_years(mock_data: list, 
                              expected_result: list,
                              schema: StructType([StructField('Agency', StringType(), True),StructField('Salary Range To', DoubleType(), True),
    StructField('Salary Range To', DoubleType(), True), StructField('Posting Date', StringType(), True)
])):
    mock_df = spark.createDataFrame(data = mock_data, schema = schema)
    assert sorted(get_avg_salary_per_agency_for_last_2_years(mock_df)) == sorted(expected_result)

In [114]:
mock_data = [('A1', 80.00, 100.00, '2018-01-09T00:00:00.000'), ('A1', 85.00, 150.00, '2015-01-09T00:00:00.000'),
            ('A2', 51.50, 60.00, '2018-01-09T00:00:00.000'), ('A2', 53.00, 70.00, '2019-01-09T00:00:00.000'), ('A2', 29.00, 30.00, '2018-01-09T00:00:00.000'),
            ('A3', 910.00, 950.00, '2019-01-09T00:00:00.000'), ('A3', 700.00, 760.00, '2018-01-09T00:00:00.000'), ('A3', 800.00, 900.00, '2018-01-09T00:00:00.000')]
expected_result = [('A1', 80.00, 100.00),
                  ('A2', 44.5, 53.333333333333336),
                  ('A3', 803.3333333333334, 870.0)]

schema = StructType([
    StructField('Agency', StringType(), True),
    StructField('Salary Range From', DoubleType(), True),
    StructField('Salary Range To', DoubleType(), True),
    StructField('Posting Date', StringType(), True)
])
test_get_avg_salary_per_agency_for_last_2_years(mock_data, expected_result, schema)

**Test: get_highest_paid_skills_list**

In [129]:
def test_get_highest_paid_skills_list(mock_data: list, 
                              expected_result: list,
                              schema: StructType([StructField('Preferred Skills', StringType(), True),
    StructField('Salary Range To', DoubleType(), True)
])):
    mock_df = spark.createDataFrame(data = mock_data, schema = schema)
    assert get_highest_paid_skills_list(mock_df) == expected_result

In [130]:
mock_data = [('S1', 100.00), ('S1', 150.00),
            ('S2', 60.00), ('S2',170.00), ('S2', 145.00),
            ('S3', 150.00), ('S3', 160.00), ('S3', 171.50)]
expected_result = [('S3', 171.50),
                  ('S2', 170.00),
                  ('S1', 150.00)]

schema = StructType([
    StructField('Preferred Skills', StringType(), True),
    StructField('Salary Range To', DoubleType(), True)
])
test_get_highest_paid_skills_list(mock_data, expected_result, schema)