In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import explode, col, count, collect_list, array
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import lower

spark = SparkSession.builder.appName("Assignment").getOrCreate()



In [0]:
#Read file skill2vec_50K.csv.gz
filepath="dbfs:/FileStore/tables/"
filename_skills= "skill2vec_50K.csv.gz"
filename_techskills = "Technology_Skills.txt"

jd_csv_df = spark.read.text(filepath+filename_skills)
#Preprocessing of skills file by splitting and then jd_id assigned as key and skills as values
jd_id_rdd = jd_csv_df.rdd.map(lambda line: line.value.split(",")).map(lambda x: (x[0], x[1:]))


#Read files technology skills
tech_skills_df = spark.read \
    .option("header", True) \
    .option("delimiter", "\t") \
    .option("inferSchema", True) \
    .option("mode", "DROPMALFORMED") \
    .csv(filepath+filename_techskills)
#Preprocessing of Technology Skills File

#Convert skills column to lower case 
tech_skills_df=tech_skills_df.withColumn("Example", lower(col("Example")))
# Rename the 'example' column to 'skill'
tech_skills_df = tech_skills_df.withColumnRenamed('example', 'Skill')

In [0]:
#Q1 Number of job descriptions
job_description_count=jd_csv_df.count()
job_description_count

Out[15]: 50000

In [0]:
#Q2
top_skills = (jd_id_rdd.flatMap(lambda x: [(x[0], skill) for skill in x[1]])
                       .distinct()
                       .filter(lambda x:x[1] != "")
                       .map(lambda word: (word[1],1))
                       .reduceByKey(lambda x,y:x+y)
                       .sortBy(lambda x:x[1],False)
                       )

top_skills.take(10)

Out[16]: [('Java', 1911),
 ('Javascript', 1770),
 ('Sales', 1705),
 ('Business Development', 1545),
 ('Web Technologies', 1313),
 ('Communication Skills', 1305),
 ('development', 1238),
 ('Marketing', 1184),
 ('Finance', 1078),
 ('HTML', 1067)]

In [0]:
#Q3

top_skills_jd = (jd_id_rdd.flatMap(lambda x: [(x[0], skill) for skill in x[1]])
                       .distinct()
                       .filter(lambda x:x[1] != "")
                       .map(lambda word: (word[0],1))
                       .reduceByKey(lambda x,y:x+y)
                       )
#Reduce by key to count the number of IDs have same values

#count_rdd = top_skills.filter(lambda x:x[1] != "").reduceByKey(lambda a, b: a + b)

#Swap value for key

swapped_rdd = top_skills_jd.map(lambda x: (x[1], x[0]))
count_rdd = swapped_rdd.filter(lambda x:x[1] != "").map(lambda word: (word[0],1)).reduceByKey(lambda x,y:x+y).sortBy(lambda x:x[1],False)
# Sort the RDD by count in descending order
#sorted_rdd = swapped_rdd.sortBy(lambda x: x[0], ascending=False)

count_rdd.take(5)

Out[17]: [(10, 10477), (5, 3432), (6, 3405), (1, 3386), (7, 3345)]

In [0]:
#Q4

skills=(jd_id_rdd.flatMap(lambda x: [(x[0], skill) for skill in x[1]])
                       .distinct()
                       .filter(lambda x:x[1] != "")
                       .map(lambda word: (word[1].lower(),1))
                       .reduceByKey(lambda x,y:x+y)
                       .sortBy(lambda x:x[1],False)
                       )
skills.take(10)

Out[18]: [('java', 2759),
 ('javascript', 2738),
 ('sales', 2680),
 ('business development', 2108),
 ('marketing', 1809),
 ('sql', 1564),
 ('jquery', 1547),
 ('html', 1539),
 ('communication skills', 1537),
 ('bpo', 1530)]

In [0]:
#Q5

skills_rdd=(jd_id_rdd.flatMap(lambda x: [(x[0], skill) for skill in x[1]])
                       .distinct()
                       .filter(lambda x:x[1] != "")
                       .map(lambda word: (word[1].lower(),1)))

#skills_rdd = skills_rdd.map(lambda row: (row[0].lower(),row[1]))
tech_skills_rdd = tech_skills_df.rdd
tech_skills_rdd

joined_tech_skill_rdd = skills_rdd.join(tech_skills_rdd.map(lambda x: (x[1].lower(), x)))

print ("Before Join " + str(skills_rdd.count()) + " After Join " + str(joined_tech_skill_rdd.count()) )

Before Join 463803 After Join 1101498


In [0]:
#Q6
top_Commodity_title_rdd = (joined_tech_skill_rdd.map(lambda x: (x[1][1][3],1))
                                           
                                             .reduceByKey(lambda x,y:x+y)
                                              .sortBy(lambda x:x[1],False))
                                            

top_Commodity_title_rdd.take(10)

Out[20]: [('Object or component oriented development software', 324521),
 ('Web platform development software', 298754),
 ('Operating system software', 190926),
 ('Development environment software', 53013),
 ('Data base management system software', 44132),
 ('Analytical or scientific software', 33552),
 ('Web page creation and editing software', 31682),
 ('Data base user interface and query software', 29436),
 ('Spreadsheet software', 18568),
 ('File versioning software', 13846)]