In [1]:
# Load all the required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, udf,when, lower, regexp_replace, round, max
from pyspark.sql.types import StringType
import re

In [2]:
# Start (or reuse) the Spark session used throughout this notebook.
spark = (
    SparkSession.builder
    .appName("jobAnalysis")
    .getOrCreate()
)
# Do not create _SUCCESS marker files next to the exported CSVs.
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

# Load the job postings (line-delimited JSON, one posting per line) and preview them.
df = spark.read.json("/mnt/data/marketing_sample.ldjson")
df.show(10)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/22 09:55:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/10/22 09:55:44 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------------------+-------------------+--------------------+-------------+-------+--------------------+-----------------+----------------+-------------+---+-----------+--------------------+-------------+----------------+-----------------------+-----------------------+------------------------+--------------------+-------------------------+------------------+--------------+---------------+-------------+--------------------+-------------+----------------+--------------------+---------+----------------------+------------------------+----------+----------------------------+-----------------+--------------+--------------------+-----+------------------+--------------------+--------------------+-------------+
|                city|company_description|        company_name|contact_email|country|     crawl_timestamp|           domain|duplicate_status|fitness_score|geo|has_expired|html_job_description|inferred_city|inferred_country|inferred_iso2_lang_code|inferred_iso3_lang_code|inferred_salary

In [3]:
# 1. Number of jobs posted per day in each city, busiest city/day first
jobs_posted = count("uniq_id").alias("job_count")
df_daily_jobs = (
    df.groupBy("city", "post_date")
    .agg(jobs_posted)
    .orderBy(col("job_count").desc())
)
df_daily_jobs.show()
df_daily_jobs.coalesce(1).write.csv(
    '/mnt/csv/daily_jobs', header=True, sep=';', mode='overwrite'
)

                                                                                

+------------+----------+---------+
|        city| post_date|job_count|
+------------+----------+---------+
|    Portland|2020-06-04|       39|
|      Denver|2020-06-23|       36|
|Philadelphia|2020-05-14|       23|
|      Boston|2020-06-26|       22|
|  Cincinnati|2020-05-23|       22|
|  Cincinnati|2020-05-24|       22|
|   St. Louis|2020-05-20|       21|
|    Portland|2020-04-28|       20|
|      Boston|2020-05-21|       20|
|Philadelphia|2020-06-15|       20|
|  Cincinnati|2020-06-04|       19|
|   St. Louis|2020-05-11|       18|
|      Boston|2020-05-26|       18|
|     Trenton|2020-06-01|       18|
|  Cincinnati|2020-06-09|       17|
|    Portland|2020-05-23|       17|
|      Boston|2020-06-11|       17|
|      Denver|2020-06-02|       17|
|     Trenton|2020-05-30|       16|
|Philadelphia|2020-06-19|       16|
+------------+----------+---------+
only showing top 20 rows



                                                                                

In [4]:
# 2. Calculate the average salary per job title and state.
# First inspect which currencies occur: every salary must be converted to US dollars.
df.select(col("inferred_salary_currency")).dropDuplicates().show()



+------------------------+
|inferred_salary_currency|
+------------------------+
|                    null|
|                       $|
|                       £|
+------------------------+



                                                                                

In [5]:
# Keep only the columns needed for the salary analysis, and only postings that
# carry a currency and both salary bounds.
# NOTE(review): `inferred_salary_from` is compared with the literal string 'null'
# while `inferred_salary_to` uses IS NOT NULL. Both conditions also drop SQL NULLs;
# confirm whether literal 'null' strings actually occur in either column.
salary_columns = [
    "job_title",
    "inferred_state",
    "salary_offered",
    "inferred_salary_from",
    "inferred_salary_to",
    "inferred_salary_time_unit",
    "inferred_salary_currency",
]
has_salary_info = (
    col("inferred_salary_currency").isNotNull()
    & (col("inferred_salary_from") != "null")
    & col("inferred_salary_to").isNotNull()
)
offered_salary_df = df.select(*salary_columns).where(has_salary_info)
offered_salary_df.show()
offered_salary_df.count()

+--------------------+--------------+--------------------+--------------------+------------------+-------------------------+------------------------+
|           job_title|inferred_state|      salary_offered|inferred_salary_from|inferred_salary_to|inferred_salary_time_unit|inferred_salary_currency|
+--------------------+--------------+--------------------+--------------------+------------------+-------------------------+------------------------+
| Assembly Electrical|     Minnesota|        $14.00/ hour|               14.00|             14.00|                    /hour|                       $|
|Customer Service ...|      Maryland|$41,000.00 - $62,...|           41,000.00|         62,000.00|                    /year|                       $|
|Call Center Repre...|         Texas|$12.00 - $13.50 /...|               12.00|             13.50|                    /hour|                       $|
|Site Lead Securit...|      Colorado|        $16.50/ hour|               16.50|             16.50|  

3518

In [6]:
# Count the salaried postings per time unit. Only the three most frequent units
# (/hour, /year, /Hour) occur often enough to be usable; the normalization step
# handles only hourly and yearly units.
postings_per_unit = count("job_title").alias("job_count")
df_time_units = (
    offered_salary_df
    .groupBy("inferred_salary_time_unit")
    .agg(postings_per_unit)
    .orderBy(col("job_count").desc())
)
df_time_units.show()



+-------------------------+---------+
|inferred_salary_time_unit|job_count|
+-------------------------+---------+
|                    /hour|     2161|
|                    /year|     1304|
|                    /Hour|       42|
|          /$0.44 Per mile|        2|
|     /USD $110000.00 t...|        1|
|                     /30!|        1|
|                      /hr|        1|
|     /US$50 per hour +...|        1|
|                   /$150K|        1|
|     /USD $150000.00 p...|        1|
|           /$16.00 hourly|        1|
|           /$12.75 Hourly|        1|
|         /up to $4,000.00|        1|
+-------------------------+---------+



                                                                                

In [7]:
# The remaining time units (at most 2 postings each) are not usable; list those postings.
list_time_units = [
    unit_row["inferred_salary_time_unit"]
    for unit_row in df_time_units.where(col("job_count") <= 2).collect()
]
(
    offered_salary_df
    .where(col("inferred_salary_time_unit").isin(list_time_units))
    .show(truncate=False)
)

                                                                                

+-------------------------------------------------------------------+--------------+----------------------------------------------------------------+--------------------+------------------+-----------------------------------------------------------------+------------------------+
|job_title                                                          |inferred_state|salary_offered                                                  |inferred_salary_from|inferred_salary_to|inferred_salary_time_unit                                        |inferred_salary_currency|
+-------------------------------------------------------------------+--------------+----------------------------------------------------------------+--------------------+------------------+-----------------------------------------------------------------+------------------------+
|Regional Class A CDL                                               |Texas         |Average pay is $1500 per week/hr                                |Average 

In [8]:
# Calculate the average yearly salary (in USD) per job title and state.
# Note: `round` and `avg` here are the pyspark.sql.functions versions imported in cell 1.
hours_per_week = 40
weeks_per_year = 52
pounds_to_dollars = 1.3  # fixed GBP -> USD conversion rate (approximation)

# Clean the salary bounds: drop every character that is not a digit or a period
# (e.g. "41,000.00" -> "41000.00") and cast to double. A bound with no digits left
# becomes "" and casts to null. Double instead of float32 avoids representation
# error on values such as 15.35.
cleaned_salary_df = offered_salary_df.withColumn(
    "inferred_salary_from",
    regexp_replace(col("inferred_salary_from"), "[^0-9.]", "").cast("double")
).withColumn(
    "inferred_salary_to",
    regexp_replace(col("inferred_salary_to"), "[^0-9.]", "").cast("double")
)

# Normalize each posting to a yearly USD amount:
#   midpoint of the range * pay periods per year * currency rate.
# Units other than hourly/yearly and currencies other than $/£ yield null, which
# avg() ignores; a job/state group with no usable salary gets a null average.
salary_unit = lower(col("inferred_salary_time_unit"))
salary_currency = col("inferred_salary_currency")
salary_midpoint = (col("inferred_salary_from") + col("inferred_salary_to")) / 2

periods_per_year = (
    when(salary_unit.like("%hour"), hours_per_week * weeks_per_year)  # "/hour", "/Hour"
    .when(salary_unit.like("%year%"), 1)
)
usd_rate = (
    when(salary_currency == "$", 1.0)
    .when(salary_currency == "£", pounds_to_dollars)
)

normalized_salary_df = cleaned_salary_df.withColumn(
    "normalized_salary", salary_midpoint * periods_per_year * usd_rate
)

# Average salary per job title and state, highest first
average_salary_df = normalized_salary_df.groupBy("job_title", "inferred_state").agg(
    round(avg("normalized_salary"), 2).alias("average_yearly_salary")
).orderBy("average_yearly_salary", ascending=False)

average_salary_df.show()
average_salary_df.coalesce(1).write.csv('/mnt/csv/avg_salary', header=True, sep=';', mode='overwrite')

                                                                                

+--------------------+--------------------+---------------------+
|           job_title|      inferred_state|average_yearly_salary|
+--------------------+--------------------+---------------------+
|           Estimator|           Minnesota|             999999.0|
|       Bounty Hunter|      North carolina|             500000.0|
|Chief Revenue Off...|            Virginia|             475000.0|
|Chief Commerical ...|            Arkansas|             475000.0|
|Vice President of...|          California|             475000.0|
|Vice President of...|District of columbia|             475000.0|
|Vice President of...|            Illinois|             475000.0|
|Vice President of...|             Arizona|             475000.0|
|Outside Sales Age...|        Pennsylvania|             475000.0|
|Vice President of...|            Nebraska|             475000.0|
|Chief Information...|          New mexico|             475000.0|
|Chief Human Resou...|               Texas|             475000.0|
|Advertisi

                                                                                

In [9]:
# 3. Identify the top 10 most active companies by number of positions opened.
# Sanity check first: the output shows there are no duplicates and none of the jobs has expired.
(
    df.groupBy("duplicate_status", "has_expired")
    .agg(count("uniq_id").alias("job_count"))
    .orderBy(col("job_count").asc())
    .show()
)

+----------------+-----------+---------+
|duplicate_status|has_expired|job_count|
+----------------+-----------+---------+
|              NA|      false|    29983|
+----------------+-----------+---------+



In [10]:
# has_expired is always false, yet the latest valid_through date (printed below,
# in 2020) shows that no position is currently open.
job_data_df = df.withColumn("valid_through", col("valid_through").cast("date"))
# A whole-frame aggregate yields exactly one row; take its single value.
max_valid_through_date = job_data_df.select(max("valid_through")).first()[0]
print(f"Maximum Valid Through Date: {max_valid_through_date}")



Maximum Valid Through Date: 2020-07-28


                                                                                

In [11]:
# Number of postings per company (all postings count, since none is currently open).
active_companies_df = df.groupBy("company_name").count()

# Top 10 companies by posting count. Postings without a company name are not a
# company, so they are excluded from the ranking. Ties are broken by company
# name so the top-10 cut is deterministic across runs.
top_active_companies_df = (
    active_companies_df
    .where(col("company_name").isNotNull())
    .orderBy(col("count").desc(), col("company_name").asc())
    .limit(10)
)
top_active_companies_df.show()
top_active_companies_df.coalesce(1).write.csv('/mnt/csv/top_10', header=True, mode='overwrite', sep=';')

                                                                                

+--------------------+-----+
|        company_name|count|
+--------------------+-----+
|  Amazon Fulfillment| 7501|
|                GPAC|  924|
|LanguageLine Solu...|  871|
|Lowe's Home Impro...|  861|
|      CDL Career Now|  545|
|  Advance Auto Parts|  492|
|             Aerotek|  477|
|         Robert Half|  444|
|    Driveline Retail|  372|
|            Circle K|  334|
+--------------------+-----+



                                                                                

In [12]:
# 4. Create a UDF that cleans the job description of the HTML code it contains.
# The pattern is compiled once here rather than on every row. `[^>]*` also matches
# newlines, so tags whose attributes span several lines are removed too (`<.*?>`
# without re.DOTALL stops at line breaks).
_HTML_TAG_PATTERN = re.compile(r'<[^>]*>')

@udf(returnType=StringType())
def clean_html(html):
    """Return ``html`` with every ``<...>`` tag removed.

    A null description is returned as null instead of raising ``TypeError``
    inside the Python worker (``re.sub`` does not accept ``None``).
    HTML entities such as ``&amp;`` are left unchanged.
    """
    if html is None:
        return None
    return _HTML_TAG_PATTERN.sub('', html)

# Add the tag-free text next to the original HTML and export both columns.
cleaned_df = df.withColumn(
    "cleaned_job_description", clean_html(col("html_job_description"))
)
job_desc = cleaned_df.select(
    col("cleaned_job_description"), col("html_job_description")
)
job_desc.coalesce(1).write.csv(
    '/mnt/csv/cleaned_job_description', header=True, sep=';', mode='overwrite'
)

                                                                                

In [13]:
#spark.stop()

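UDF performance issue: Python UDFs are slower than Spark's built-in functions, because every row must be serialized between the JVM and a Python worker, and the UDF is opaque to the Catalyst optimizer. See: https://medium.com/sparkbyexamples/why-avoid-udfs-in-spark-pyspark-e5903050e33c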


In [14]:
# Where possible, prefer Spark's built-in functions over a Python UDF: regexp_replace
# runs inside the JVM, with no per-row round-trip to a Python worker.
# `<[^>]*>` removes each tag (including tags spanning several lines), matching the
# clean_html UDF. The previous pattern had a stray leading quote ("'<.*?>") and
# therefore only removed tags preceded by an apostrophe.
cleaned_df2 = df.withColumn(
    "cleaned_job_description",
    regexp_replace(col("html_job_description"), "<[^>]*>", ""),
)
job_desc2 = cleaned_df2.select("cleaned_job_description", "html_job_description")
job_desc2.show(2, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------