# Emirates NBD

## Technical Assessment_Senior Data Engineer

In [314]:
import findspark
findspark.init()

In [315]:
from pyspark.sql import SparkSession
#from pyspark.sql.functions import *
#from pyspark.sql.types import StringType, LongType # Get the properties
from pyspark.sql.functions import to_date, col # conversion of data types
from pyspark.sql.functions import avg, min, max,mean # salary ditribution per job posting
from pyspark.sql.functions import desc # for sorting descending
from pyspark.sql.functions import when, lower, corr # correlation between the higher degree and the salary
from pyspark.ml.feature import StringIndexer #correlation coefficient between different columns (string and integer) columns
from pyspark.sql.functions import current_date, date_sub, year, date_format, current_timestamp # Used for calculating dataframe from the last 2 years, 

spark = SparkSession.builder. \
    appName("pyspark-1"). \
    getOrCreate()

### Read data

In [316]:
df = spark.read.csv("/dataset/nyc-jobs.csv", header=True)
#df.printSchema()

###  Change the datatype of some columns

In [317]:

# convert Posting Date column to date datatype
#df = df.withColumn("Posting Date", to_date(col("Posting Date"), "yyyy/MM/dd"))

# convert Salary Range From column to integer datatype
df = df.withColumn("Salary Range From", df["Salary Range From"].cast("integer"))

# convert Salary Range To column to integer datatype
df = df.withColumn("Salary Range To", df["Salary Range To"].cast("integer"))
#df.printSchema()

### Sample function

In [None]:
def get_salary_frequency(df: DataFrame) -> list:
    row_list = df.select('Salary Frequency').distinct().collect()
    return [row['Salary Frequency'] for row in row_list]

### Example of test function

In [None]:
mock_data = [('A', 'Annual'), ('B', 'Daily')]
expected_result = ['Annual', 'Daily']

In [None]:
def test_get_salary_frequency(mock_data: list, 
                              expected_result: list,
                              schema: list = ['id', 'Salary Frequency']):  
    mock_df = spark.createDataFrame(data = mock_data, schema = schema)
    assert get_salary_frequency(mock_df) == expected_result

# Data Exploration

### Provide a detailed analysis of source data: Column values (eg: Numerical vs character), categorical columns, etc.

In [63]:
# Getting list of columns and printing
# result
dt = df.dtypes
print("dtypes result:", dt)

dtypes result: [('Job ID', 'string'), ('Agency', 'string'), ('Posting Type', 'string'), ('# Of Positions', 'string'), ('Business Title', 'string'), ('Civil Service Title', 'string'), ('Title Code No', 'string'), ('Level', 'string'), ('Job Category', 'string'), ('Full-Time/Part-Time indicator', 'string'), ('Salary Range From', 'int'), ('Salary Range To', 'int'), ('Salary Frequency', 'string'), ('Work Location', 'string'), ('Division/Work Unit', 'string'), ('Job Description', 'string'), ('Minimum Qual Requirements', 'string'), ('Preferred Skills', 'string'), ('Additional Information', 'string'), ('To Apply', 'string'), ('Hours/Shift', 'string'), ('Work Location 1', 'string'), ('Recruitment Contact', 'string'), ('Residency Requirement', 'string'), ('Posting Date', 'date'), ('Post Until', 'string'), ('Posting Updated', 'string'), ('Process Date', 'string')]


In [64]:
# Printing DataFrame structure
print("DataFrame structure:", df)

DataFrame structure: DataFrame[Job ID: string, Agency: string, Posting Type: string, # Of Positions: string, Business Title: string, Civil Service Title: string, Title Code No: string, Level: string, Job Category: string, Full-Time/Part-Time indicator: string, Salary Range From: int, Salary Range To: int, Salary Frequency: string, Work Location: string, Division/Work Unit: string, Job Description: string, Minimum Qual Requirements: string, Preferred Skills: string, Additional Information: string, To Apply: string, Hours/Shift: string, Work Location 1: string, Recruitment Contact: string, Residency Requirement: string, Posting Date: date, Post Until: string, Posting Updated: string, Process Date: string]


### Whats the number of jobs posting per category (Top 10)?

In [74]:
# Group the data by category and count the number of job postings
category_counts = df.groupBy("Job Category").count()

# Sort the category counts in descending order and show the top 10 categories
##top_categories = category_counts.orderBy(desc("count")).limit(10)
top_categories = category_counts.sort("count", ascending=False).limit(10)

# Display the top categories and their counts
top_categories.toPandas()

# top_categories.plot.bar(x='count') 

Unnamed: 0,Job Category,count
0,"Engineering, Architecture, & Planning",504
1,"Technology, Data & Innovation",313
2,Legal Affairs,226
3,"Public Safety, Inspections, & Enforcement",182
4,Building Operations & Maintenance,181
5,"Finance, Accounting, & Procurement",169
6,Administration & Human Resources,134
7,Constituent Services & Community Programs,129
8,Health,125
9,"Policy, Research & Analysis",124


### Whats the salary distribution per job category?


In [75]:
# Group by job category and calculate the average salary for each category
salary_distribution = df.groupBy("Job Category").avg("Salary Range From", "Salary Range To")

# Sort by job category
salary_distribution = salary_distribution.orderBy("Job Category")

# Display the results
salary_distribution.show(truncate=80)


+--------------------------------------------------------------------------------+----------------------+--------------------+
|                                                                    Job Category|avg(Salary Range From)|avg(Salary Range To)|
+--------------------------------------------------------------------------------+----------------------+--------------------+
|                                                                            null|               51572.5|            113749.5|
|                                                Administration & Human Resources|     43638.10447761194|   63873.44029850746|
|              Administration & Human Resources Building Operations & Maintenance|     50770.28571428572|   72091.14285714286|
|Administration & Human Resources Building Operations & Maintenance Policy, Re...|               54100.0|             83981.0|
|     Administration & Human Resources Communications & Intergovernmental Affairs|               37251.0|      

### Is there any correlation between the higher degree and the salary?

In [206]:
# Select the relevant columns
#df = df.select("Salary Range To", "Minimum Qual Requirements")

# Convert "Salary Range To" column to integer data type
#df = df.withColumn("Salary Range To", df["Salary Range To"].cast("integer"))

# Group the data by category and count the number of Education Level
#minimum_qual_requirements_counts = df.groupBy("Minimum Qual Requirements").count()
#minimum_qual_requirements_counts = minimum_qual_requirements_counts.sort("count", ascending=False)
#minimum_qual_requirements_counts.show(truncate=110)


# Extract educational qualification from "Minimum Qual Requirements" column
df = df.withColumn("Education Level", when(lower(col("Minimum Qual Requirements")).contains("master"), "Master")
                      .when(lower(col("Minimum Qual Requirements")).contains("baccalaureate"), "Bachelor")
                      .when(lower(col("Minimum Qual Requirements")).contains("associate"), "Associate")
                      .when(lower(col("Minimum Qual Requirements")).contains("high school"), "High School")
                     .otherwise("Other"))

# Group the data by category and count the number of Education Level
#education_level_counts = df.groupBy("Education Level").count()

#education_level_counts.show(truncate=80)

# Calculate the mean salary for each educational qualification level
salary_by_education = df.groupBy("Education Level").mean("Salary Range To")

#Sort by densending order
salary_by_education = salary_by_education.sort("avg(Salary Range To)", ascending=True).withColumnRenamed("avg(Salary Range To)", "Salary")
salary_by_education.show()


# Calculate the correlation coefficient between salary and educational qualification
#correlation = df.stat.corr("avg(Salary Range To)", "Education Level")

# Calculate the correlation coefficient between Salary Range To and Minimum Qual Requirements
#correlation_coefficient = df.select(corr("Salary Range To", "Education Level")).collect()[0][0]

#print(f"The correlation coefficient between Salary Range To and Minimum Qual Requirements is: {correlation_coefficient}")

salary_by_education = salary_by_education.withColumn("Salary", salary_by_education["Salary"].cast("integer"))

#salary_by_education.printSchema()

#corr_val = salary_by_education.corr("Education Level", "Salary")

#print("Correlation between columns Education Level and Salary:", corr_val)

# Convert string column to numerical column using StringIndexer
indexer = StringIndexer(inputCol="Education Level", outputCol="Education_Level_Index")
indexed = indexer.fit(salary_by_education).transform(salary_by_education)

# Calculate correlation coefficient between columns
corr = indexed.select(
    [col("Salary"), col("Education_Level_Index")]
).na.drop().corr("Salary", "Education_Level_Index")

print("Correlation coefficient: ", corr)


# convert the string column to a categorical variable
# indexer1 = StringIndexer(inputCol="Education Level", outputCol="Education_Level_Index")
# df_indexed = indexer1.fit(salary_by_education).transform(salary_by_education)

# calculate the correlation between the two columns
# corr = Correlation.corr(df_indexed, "Salary", "spearman").head()
# print("Correlation between Salary and Education_Level_Index: ", corr[0][1])

+---------------+------------------+
|Education Level|            Salary|
+---------------+------------------+
|    High School|49691.780303030304|
|      Associate|           61377.1|
|          Other|  84629.0890510949|
|       Bachelor| 86606.73129525341|
|         Master| 98625.73066298342|
+---------------+------------------+

Correlation coefficient:  -0.8456587540332213


### Whats the job posting having the highest salary per agency?


In [266]:
# group the DataFrame by agency and find the highest salary for each agency
highest_salary = df.groupBy("Agency").agg(max("Salary Range From").alias("Highest Salary"))
#max_salary_per_agency = df.groupBy("Agency").max("Salary Range From")
#highest_salary_per_agency = df.groupBy("Agency").agg({"Salary Range From": "max"})

# Sort the data by the highest salary in descending order
highest_salary_per_agency = highest_salary.sort(desc("Highest Salary"))

# display the results
highest_salary_per_agency.show(truncate=70)



+------------------------------+--------------+
|                        Agency|Highest Salary|
+------------------------------+--------------+
|DEPT OF ENVIRONMENT PROTECTION|        218587|
|             POLICE DEPARTMENT|        200000|
|DISTRICT ATTORNEY KINGS COUNTY|        175000|
|         NYC HOUSING AUTHORITY|        175000|
|                LAW DEPARTMENT|        164104|
|   DEPARTMENT OF INVESTIGATION|        160000|
|     OFFICE OF THE COMPTROLLER|        160000|
| DEPT OF HEALTH/MENTAL HYGIENE|        157725|
|      DEPARTMENT FOR THE AGING|        150371|
|HOUSING PRESERVATION & DVLPMNT|        130000|
|      DEPARTMENT OF CORRECTION|        130000|
|  NYC EMPLOYEES RETIREMENT SYS|        130000|
|    DEPT OF PARKS & RECREATION|        128000|
|         DEPARTMENT OF FINANCE|        120000|
|    FINANCIAL INFO SVCS AGENCY|        120000|
|   TAXI & LIMOUSINE COMMISSION|        120000|
| OFFICE OF MANAGEMENT & BUDGET|        117810|
|               FIRE DEPARTMENT|        

### Whats the job positings average salary per agency for the last 2 years?

In [311]:

# Filter the dataframe to only include records from the last 2 years
last_2years = df.filter(year("Posting Date") >=  year(current_date()) - 2)

last_2years.show()

# Aggregate data by agency and job position and calculate the average salary
highest_salary = last_2years.groupBy("Agency").agg(max("Salary Range From").alias("Highest Salary"))

# Display the results
highest_salary.show()

# average_salary_agency_last_two_years.show()

# There's no record returned becuase the latest date is 2019 while the current year is 2023

+------+------+------------+--------------+--------------+-------------------+-------------+-----+------------+-----------------------------+-----------------+---------------+----------------+-------------+------------------+---------------+-------------------------+----------------+----------------------+--------+-----------+---------------+-------------------+---------------------+------------+----------+---------------+------------+
|Job ID|Agency|Posting Type|# Of Positions|Business Title|Civil Service Title|Title Code No|Level|Job Category|Full-Time/Part-Time indicator|Salary Range From|Salary Range To|Salary Frequency|Work Location|Division/Work Unit|Job Description|Minimum Qual Requirements|Preferred Skills|Additional Information|To Apply|Hours/Shift|Work Location 1|Recruitment Contact|Residency Requirement|Posting Date|Post Until|Posting Updated|Process Date|
+------+------+------------+--------------+--------------+-------------------+-------------+-----+------------+---------

### What are the highest paid skills in the US market?


In [214]:
# Extracting relevant columns and filtering for only jobs in the US
us_jobs_df = df.select("Job Category", "Salary Range From", "Salary Range To")#.filter(df.Country == "US")

# Calculating the average salary for each job category
us_jobs_df = us_jobs_df.groupBy("Job Category").avg("Salary Range From", "Salary Range To")

# Adding a new column with the average of the salary range from and salary range to
us_jobs_df = us_jobs_df.withColumn("Average Salary", (us_jobs_df["avg(Salary Range From)"] + us_jobs_df["avg(Salary Range To)"]) / 2)

# Sorting by average salary in descending order and showing the top 10 highest paid skills
highest_paid_skills = us_jobs_df.sort(desc("Average Salary")).show(10)

+--------------------+----------------------+--------------------+------------------+
|        Job Category|avg(Salary Range From)|avg(Salary Range To)|    Average Salary|
+--------------------+----------------------+--------------------+------------------+
|Administration & ...|              218587.0|            218587.0|          218587.0|
|Engineering, Arch...|              198518.0|            198518.0|          198518.0|
|Engineering, Arch...|              192292.5|            199792.5|          196042.5|
|Health Policy, Re...|              113504.0|            143885.0|          128694.5|
|Engineering, Arch...|               69940.0|            186555.0|          128247.5|
|Engineering, Arch...|               69940.0|            186555.0|          128247.5|
|Communications & ...|              115000.0|            135000.0|          125000.0|
|Administration & ...|               78574.0|            158000.0|          118287.0|
|Constituent Servi...|    106666.66666666667|         

In [344]:
# group by "Preferred Skills" and calculate the average of "Salary Range From" and "Salary Range To"
grouped_df = df.groupBy("Preferred Skills").avg("Salary Range From", "Salary Range To").alias("Average Salary")

# show the result
grouped_df.show()

#df.groupBy("Preferred Skills").agg(avg("Salary Range From", "Salary Range To").alias("Average Salary"))


#avg_salary_by_skill = df.groupBy("Preferred Skills").agg(avg("Salary Range From", "Salary Range To")).alias("Average Salary")\


#df.groupBy("Preferred Skills").agg(avg("Salary Range From"), avg("Salary Range To"))\
                        #.alias("Average Salary")\
                        #.sort("Average Salary", ascending=False)
                        #.orderBy(desc("Average Salary"))

#avg_salary_by_skill = avg_salary_by_skill.sort("Average Salary", ascending=False)

#df.groupBy("Preferred Skills") \
               #         .avg("Salary Range From", "Salary Range To").alias("Average Salary")\
                #        .orderBy(desc("Average Salary"))


# Display the top-paying skills
#avg_salary_by_skill.show(truncate=90)

+--------------------+----------------------+--------------------+
|    Preferred Skills|avg(Salary Range From)|avg(Salary Range To)|
+--------------------+----------------------+--------------------+
|PREFERRED SKILLS ...|               35330.0|             40629.0|
|â€¢ Knowledge of ...|               75000.0|            100000.0|
|The preferred can...|               50078.0|             57590.0|
|â€¢The Deputy Chi...|               73255.0|            123174.5|
|Excellent interpe...|                  32.0|                36.0|
|The employee must...|               43111.0|             63969.0|
|â€¢	Able to lift ...|               35330.0|             40629.0|
|â€¢	Experience in...|               58700.0|            100000.0|
|â€¢	Knowledge of ...|               35330.0|             40629.0|
| or sufficient tr...|                  52.0|                52.0|
|1.   Demonstrated...|               50000.0|             55000.0|
|â€¢ Strong AutoCA...|               78210.0|            10865

# Data Processing

### Create functions to process your dataset (Cleaning, column pre-processing, data wrangling, transformation etc)

### Data Cleaning

### Column pre-processing

### Data wrangling

### Data Transformation