In [1]:
import time
start = time.time()

# Creating spark context

Below would have been the perfect pyspark ML data stream, although the results are extremely poor. The olny function that can be applied to two different spark dataframes are the approxSimilarityJoin functions for MinHashLSH and RandomBucketizedProjection algorithms. These two methods work on Term Frequency vectors that are then hashed and compute a similarity metric which much ressembles the Jaccard similarity. It seems the results are extremely poor, although this is the fastest and most efficient code we could come up with. 

only if needed (local):

In [None]:
import os
import findspark
your_path = os.getcwd()
findspark.init(your_path + 'spark-2.4.3-bin-hadoop2.7')

In [1]:
import pyspark 
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

from pyspark.sql import SQLContext
sqlContext = SQLContext(spark)
sc = spark.sparkContext

import pandas as pd 
import numpy as np

import pickle
import copy

from pyspark.sql import udf
from pyspark.sql.types import StructType,StringType,StructField,FloatType
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, NGram, Word2Vec, HashingTF, MinHashLSH

pyspark.autoBroadcastJoinThreshold = -1
pyspark.broadcastTimeout = -1

# Loading spark dataframes

unfortunately the spark.read directly from the path gives broken up spark dataframes... We'll have to pass onto pandas then specify schema and load into spark dataframe. Ineffeicency point.

### Course schema

In [2]:
courseSchema = StructType([ #StructField('index', StringType(), True),
                           StructField('University', StringType(), True)\
                           ,StructField("Program", StringType(), True)\
                           ,StructField("Courses", StringType(), True)\
                           ,StructField("text", StringType(), True)])

courses_pd = pd.read_csv("proto1/universities_full.csv",index_col = 0)
courses = spark.createDataFrame(courses_pd,schema=courseSchema)

#courses = (spark.read
#    .schema(courseSchema)
#    .option("header", "true")
#    .option("mode", "DROPMALFORMED")
#    .csv("proto1/universities_full.csv"))
['University',"Program","Courses"]

['University', 'Program', 'Courses']

### Skill schema

In [3]:
skillSchema = StructType([ #StructField('index', StringType(), True)\,
                          StructField("Skill", StringType(), True)\
                          ,StructField("text", StringType(), True)])
#skillSchema = StructType([ StructField("text", StringType(), True)])

skills_pd = pd.read_csv("proto1/skills_full.csv",index_col = 0)
skills = spark.createDataFrame(skills_pd,schema=skillSchema)

#skills = (spark.read
#    .schema(skillSchema)
#    .option("header", "true")
#    .option("mode", "DROPMALFORMED")
#    .csv("proto1/skills_full.csv"))

### Occupation schema

In [4]:
#occupationSchema = StructType([ StructField("Occupation", StringType(), True)\
#                               ,StructField("Skill", StringType(), True)])

occupations_pd = pd.read_csv('proto1/occupations_full.csv')
#occupations = spark.createDataFrame(occupations_pd,schema=occupationSchema)

#occupations = (spark.read
#    .schema(occupationSchema)
#    .option("header", "true")
#    .option("mode", "DROPMALFORMED")
#    .csv("proto1/occupations_full.csv"))

# Defining pipeline for string similarity

### Defining stopwords for pre-processing (faster than loading from nltk)

In [5]:
stopW = ['i','me','my', 'myself','we', 'our', 'ours', 'ourselves', 'you', "you're", "you've", "you'll", "you'd", 'your', 'yours',
 'yourself', 'yourselves', 'he', 'him', 'his', 'himself', 'she', "she's", 'her', 'hers', 'herself', 'it', "it's", 'its',
 'itself', 'they', 'them', 'their', 'theirs', 'themselves', 'what', 'which', 'who', 'whom', 'this', 'that', "that'll", 'these',
 'those', 'am', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'having', 'do', 'does', 'did',
 'doing', 'a', 'an', 'the', 'and', 'but', 'if', 'or', 'because', 'as', 'until', 'while', 'of', 'at', 'by', 'for', 'with', 'about',
 'against', 'between', 'into', 'through', 'during', 'before', 'after', 'above', 'below', 'to', 'from', 'up', 'down', 'in',
 'out', 'on', 'off', 'over', 'under', 'again', 'further', 'then', 'once', 'here', 'there', 'when', 'where', 'why', 'how',
 'all', 'any', 'both', 'each', 'few', 'more', 'most', 'other', 'some', 'such', 'no', 'nor', 'not', 'only', 'own', 'same',
 'so', 'than', 'too', 'very', 's', 't', 'can', 'will', 'just', 'don', "don't", 'should', "should've", 'now', 'd', 'll', 'm',
 'o', 're', 've', 'y', 'ain', 'aren', "aren't", 'couldn', "couldn't", 'didn', "didn't", 'doesn', "doesn't", 'hadn', "hadn't",
 'hasn', "hasn't", 'haven', "haven't", 'isn', "isn't", 'ma', 'mightn', "mightn't", 'mustn', "mustn't", 'needn', "needn't", 'shan',
 "shan't", 'shouldn', "shouldn't", 'wasn', "wasn't", 'weren', "weren't", 'won', "won't", 'wouldn', "wouldn't","pdf"]

### Spark pipeline 

Just a small check first

In [None]:
course = courses.select("text").toDF("text").filter(col('text').isNotNull())
skill = skills.select("text").toDF("text").filter(col('text').isNotNull())

We fit the pipeline on the courses

In [6]:
# Word2Vec trained on course strings
model_course = Pipeline(stages=[
    RegexTokenizer(gaps = False, pattern = '\w+', inputCol = 'text', outputCol = 'tokens'),
    StopWordsRemover(stopWords = stopW, inputCol = 'tokens', outputCol = 'tokens_sw'),
    HashingTF(inputCol="ngrams", outputCol="rawFeatures"),
    #IDF(inputCol="rawFeatures", outputCol="features"),
    MinHashLSH(inputCol="vectors", outputCol="lsh")
]).fit(course)

# Generating columns defined above
course_hashed = model_course.transform(course)

Applying them to our data:

In [7]:
skill_hashed = model_course.transform(skill)
course_hashed = model_course.transform(course)

### Generating dataframe with matches

In [10]:
matches = model_skill.stages[-1].approxSimilarityJoin(course_hashed, skill_hashed, threshold = 100, distCol="Distance")#.select('datasetA','datasetB')

# Preparing the data for Graph construction 

### Extracting only text 

In [11]:
matches = matches.withColumn("courses", matches["datasetA"]["text"]).withColumn("skills", matches["datasetB"]["text"]).select("courses",'skills')

### Matching course labels 

In [26]:
matches.count()

26072

In [12]:
left_join = matches.join(courses, courses.text == matches.courses, how='left') 

# Free space in cache !
#matches.unpersist()
#courses.unpersist()

# Keeping only the interesting info
left_join = left_join.select('University','Program','skills')

### Matching skill labels

In [13]:
full_data = left_join.join(skills, skills.text == left_join.skills, how = 'left')

# Free space in cache !
left_join.unpersist()
skills.unpersist()

# Keeping only the interesting info
full_data = full_data.select("University","Program","Skill").withColumnRenamed("Skill", "Skill_description")

### Matching occupation labels 

In [14]:
prototype1_data = full_data.join(occupations, occupations.Skill == full_data.Skill_description, how = 'left')

# Free space in cache !
full_data.unpersist()
occupations.unpersist()

prototype1_data = prototype1_data.select("University","Program","Skill","Occupation")

### Cleaning the data

In [15]:
prototype1_data = prototype1_data.filter(col('Program').isNotNull()).filter(col('Skill').isNotNull()).filter(col('University').isNotNull())

In [16]:
prototype1_data.show(10)

+--------------------+--------------------+-----+--------------------+
|          University|             Program|Skill|          Occupation|
+--------------------+--------------------+-----+--------------------+
|University of Ant...|Bachelor of Bioch...|   C#|telecommunication...|
|University of Ant...|Bachelor of Bioch...|   C#|    software analyst|
|University of Ant...|Bachelor of Bioch...|   C#|integration engineer|
|University of Ant...|Bachelor of Bioch...|   C#|embedded system d...|
|University of Ant...|Bachelor of Bioch...|   C#|     software tester|
|University of Ant...|Bachelor of Bioch...|   C#|data warehouse de...|
|University of Ant...|Bachelor of Bioch...|   C#|chief ICT securit...|
|University of Ant...|Bachelor of Bioch...|   C#|enterprise architect|
|University of Ant...|Bachelor of Bioch...|   C#|mobile applicatio...|
|University of Ant...|Bachelor of Bioch...|   C#|ICT intelligent s...|
+--------------------+--------------------+-----+--------------------+
only s

In [25]:
(prototype1_data.write
  .option("header", "true")
  .csv("pyspark__results_proto1_2.csv"))

### Pandas coversion?

This is probably the most time consuming process of all... If we could somehow avoid having to transfer data as pandas that would be great.

In [18]:
test = prototype1_data.toPandas()

In [22]:
test.to_csv('pyspark__results_proto1.csv')

In [None]:
end = time.time()
print(end-start)