# In [None]:
import findspark
findspark.init()
import numpy as np # linear algebra
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from pyspark.ml.feature import VectorAssembler
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, array, lit
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import time
from pyspark.ml.feature import VectorAssembler
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, array, lit
import matplotlib.pyplot as plt

# Data Preprocessing
# Load the postings and discard rows in which every field is missing.
data = pd.read_csv('job_dataset_test.csv', encoding='latin-1').dropna(how='all')

# Free-text columns used for feature extraction; missing text becomes "" so
# that string operations applied to these columns later never see NaN.
cols = ["title", "company_profile", "description", "requirements", "benefits"]
data[cols] = data[cols].fillna("")
def extract_features(data, columns=None):
    """Add character-length and word-count features for text columns, in place.

    For each column name ``c`` two columns are added to ``data``:
    ``c + "_len"`` (number of characters of the stringified value) and
    ``c + "_wc"`` (number of whitespace-separated tokens).

    Args:
        data: pandas DataFrame containing the text columns; it is mutated.
        columns: Iterable of column names to process. When omitted, the
            module-level ``cols`` list is used.

    Returns:
        None. The new columns are written directly into ``data``.
    """
    if columns is None:
        columns = cols
    for c in columns:
        text = data[c].apply(str)
        data[c + "_len"] = text.apply(len)
        # Count the tokens themselves. The previous form measured the length
        # of the *string representation* of the token list, which is a
        # character count (brackets, quotes and commas included), not a
        # word count.
        data[c + "_wc"] = text.apply(lambda s: len(s.split()))
# Derive the "_len" / "_wc" features for every text column.
extract_features(data)

# Merge the long free-text fields so a single TF-IDF vocabulary covers them.
data['combined_text'] = data['company_profile'] + " " + data['description'] + " " + data['requirements'] + " " + data['benefits']

# Upper bound on the TF-IDF vocabulary size for each vectorised column.
n_features = {
    "title" : 100,
    "combined_text" : 500
}
for c, n in n_features.items():
    tfidf = TfidfVectorizer(max_features=n, stop_words='english')
    # Dense float16 matrix, one row per posting and one column per term.
    tfidf_train = np.array(tfidf.fit_transform(data[c]).toarray(), dtype=np.float16)

    # max_features is only a cap: a small corpus can yield fewer terms, so the
    # column count is taken from the matrix rather than from the requested n
    # (indexing up to n raised IndexError in that case).
    tfidf_cols = [c + "_tfidf_" + str(i) for i in range(tfidf_train.shape[1])]

    # Attach all term columns in one concat; inserting them one at a time
    # fragments the frame and is far slower for hundreds of columns.
    data = pd.concat(
        [data, pd.DataFrame(tfidf_train, columns=tfidf_cols, index=data.index)],
        axis=1,
    )

# Raw text and the unused descriptive columns are removed once the numeric
# features derived from them exist.
drop_cols = ['title', 'location', 'department', 'salary_range', 'company_profile',
             'description', 'requirements', 'benefits', 'combined_text']
data = data.drop(drop_cols, axis=1)

# One Hot Encoding
# Expand every remaining categorical column into indicator columns.
data = pd.get_dummies(data)
data.shape  # bare expression: has no effect when run as a script

# The final five rows are held out as unlabeled input for prediction;
# everything before them is the labeled training pool.
data_input = data[-5:]
data_train = data.drop(data_input.index)
data_test = data_input.drop(columns="fraudulent")

# Start (or reuse) a Spark session with Arrow-based pandas conversion enabled
# and fallback to the non-Arrow path allowed.
spark = SparkSession.builder.getOrCreate()
for arrow_option in ("spark.sql.execution.arrow.pyspark.enabled",
                     "spark.sql.execution.arrow.pyspark.fallback.enabled"):
    spark.conf.set(arrow_option, "true")

df = spark.createDataFrame(data_train)
df_test = spark.createDataFrame(data_test)

# Up Sampling
# Split by label; label 1 is treated as the minority class and replicated
# roughly (majority count / minority count) times to balance the classes.
major_df = df.filter(col("fraudulent") == 0)
minor_df = df.filter(col("fraudulent") == 1)

major_count = major_df.count()
minor_count = minor_df.count()
if minor_count:
    # Clamp to 1: with a ratio of 0 the exploded array is empty and every
    # label-1 row would silently disappear from the result.
    ratio = max(1, int(major_count / minor_count))
else:
    # No label-1 rows at all: nothing to replicate (and nothing to divide by).
    ratio = 1

# explode() over an array of `ratio` literals emits `ratio` copies of each
# minority row; the helper column is dropped again straight away.
oversampled_df = minor_df.withColumn("dummy", explode(array([lit(x) for x in range(ratio)]))).drop('dummy')
df_upsampled = major_df.unionAll(oversampled_df)

# Every column except the label is a model input.
input_features = [name for name in df_upsampled.columns if name != "fraudulent"]

# Vector Assembler
# Pack all input columns into the single "features" vector column that the
# Spark ML estimators consume.
assembler = VectorAssembler(outputCol="features", inputCols=input_features)

# Training data keeps the label next to the assembled vector.
assembled_train = assembler.transform(df_upsampled)
input_df = assembled_train.select("fraudulent", "features")

# The held-out rows have no label column, so only the vector is kept.
assembled_test = assembler.transform(df_test)
input_test = assembled_test.select("features")

# Data Modelling
# Three classifiers are trained on growing fractions of the data to compare
# quality and run time as the input scales.
gbt = GBTClassifier(labelCol="fraudulent", featuresCol="features", maxIter=50)
rf = RandomForestClassifier(labelCol="fraudulent", featuresCol="features", numTrees=50)
lsvc = LinearSVC(labelCol="fraudulent", featuresCol="features", maxIter=50, regParam=0.1)
inpt = [0.25, 0.50, 0.75, 1.0]
models = [gbt, rf, lsvc]

# The evaluators do not depend on the model or the sample, so they are built
# once instead of once per model per fraction.
accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol="fraudulent", predictionCol="prediction", metricName="accuracy")
precision_evaluator = MulticlassClassificationEvaluator(
    labelCol="fraudulent", predictionCol="prediction",
    metricName="precisionByLabel", metricLabel=1.0)
recall_evaluator = MulticlassClassificationEvaluator(
    labelCol="fraudulent", predictionCol="prediction",
    metricName="recallByLabel", metricLabel=1.0)
auc_evaluator = BinaryClassificationEvaluator(labelCol="fraudulent", metricName='areaUnderROC')

# One inner list per sampling fraction; each inner list holds one value per
# model, in the order of `models`.
acc_lst = []
precis_lst = []
rec_lst = []
area_uc_lst = []
time_lst = []
for i in inpt:
    input_df_new = input_df.sample(False, i, 42)
    # Seed the split as well as the sample so repeated runs compare the same
    # train/test partition.
    # NOTE(review): input_df is built from the upsampled frame, so copies of
    # the same minority row can land in both train_df and test_df, which may
    # inflate the metrics below. Consider splitting before upsampling.
    train_df, test_df = input_df_new.randomSplit([.8, .2], seed=42)
    # `i` is a fraction (0.25 ...); scale it so the message really is a percent.
    print("{:.0f}% of data has records = {}".format(i * 100, input_df_new.count()))
    acc = []
    precis = []
    rec = []
    area_uc = []
    time_taken = []
    for item in models:
        # The timer covers fitting, scoring and all four metric evaluations.
        start = time.time()
        model = item.fit(train_df)
        print(item)
        predictions = model.transform(test_df)

        accuracy = accuracy_evaluator.evaluate(predictions)
        print("Accuracy:", accuracy)
        acc.append(accuracy)

        # Precision and recall are reported for label 1.0 only.
        precision = precision_evaluator.evaluate(predictions)
        print("Precision:", precision)
        precis.append(precision)

        recall = recall_evaluator.evaluate(predictions)
        print("Recall:", recall)
        rec.append(recall)

        auc = auc_evaluator.evaluate(predictions)
        print("AUC:", auc)
        area_uc.append(auc)

        end = time.time()
        time_diff = end - start
        print(time_diff)
        time_taken.append(time_diff)
        print()
    acc_lst.append(acc)
    precis_lst.append(precis)
    rec_lst.append(rec)
    area_uc_lst.append(area_uc)
    time_lst.append(time_taken)
    print("------------------------------------")
print("List of Accuracy:{}".format(acc_lst))
print("List of Precision:{}".format(precis_lst))
print("List of Recall:{}".format(rec_lst))
print("List of AUC:{}".format(area_uc_lst))
print("List of time taken:{}".format(time_lst))

# Data Visualization
# acc_lst holds one [model 1, model 2, model 3] list per sampling fraction;
# regroup it into one rounded series per model for plotting.
acc_y1 = [round(run[0], 3) for run in acc_lst]
acc_y2 = [round(run[1], 3) for run in acc_lst]
acc_y3 = [round(run[2], 3) for run in acc_lst]
acc_label = ["25%", "50%", "75%", "100%"]
acc_all = [acc_y1, acc_y2, acc_y3]

fig, ax = plt.subplots(ncols=1, figsize=(10, 5), dpi=200)

# All three lines share the same marker and stroke styling.
line_style = dict(marker="o", markerfacecolor='white',
                  markeredgewidth=2, linewidth=2.5)
for series, model_name in zip(acc_all, ["GBT", "RF", "SVM"]):
    ax.plot(acc_label, series, label=model_name, **line_style)

# Write each value just below its marker.
for series in acc_all:
    for x_tick, value in zip(acc_label, series):
        ax.text(x_tick, value - 0.005, str(value))

ax.legend(loc='upper right', bbox_to_anchor=(1.0, 1.10),
          ncol=3, fancybox=True, shadow=True)
# Fixed y-window: points outside 0.875-0.975 fall outside the visible area.
ax.set_ylim([0.875, 0.975])
ax.set_xlabel("Scaling", fontsize=14)
ax.set_ylabel("Accuracy", fontsize=14)
ax.set_title("Accuracy vs Scaling", loc="left", fontsize=18)
fig.savefig("accuracy.png", dpi=200)
plt.show()