In [0]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [0]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NaiveBayes").getOrCreate()

In [67]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url = "https://job-postings-dataviz.s3.amazonaws.com/fake_jobs_clean.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("fake_jobs_clean.csv"), sep=",", header=True )

# Show DataFrame
df.show(5)

+------+-----------+--------------+-------+--------------------+----------+--------------------+--------------------+------------+---------------+--------------------+--------------------+--------------------+--------------------+-----------------+-------------+----------------+-------------+---------------+-------------------+------------------+----------+
|job_id|       city|state/province|country|               title|department|            industry|            function|salary_range|salary_provided|     company_profile|         description|        requirements|            benefits|benefits_provided|telecommuting|has_company_logo|has_questions|employment_type|required_experience|required_education|fraudulent|
+------+-----------+--------------+-------+--------------------+----------+--------------------+--------------------+------------+---------------+--------------------+--------------------+--------------------+--------------------+-----------------+-------------+----------------+-

In [0]:
# Read in csv file
from pyspark import SparkFiles
# pd.set_option('display.max_columns', 30)
fake_jobs_df = spark.read.csv("fake_jobs_clean.csv",sep=",", header=True)
# fake_jobs_df.show()

In [70]:
# Take a look at job description and fraudulent cols
df = fake_jobs_df[['description', 'fraudulent']]
df.show()


+--------------------+-----------------+
|         description|       fraudulent|
+--------------------+-----------------+
|Food52, a fast-gr...|                0|
|Organised - Focus...|                0|
|Our client, locat...|                0|
|THE COMPANY: ESRI...|                0|
|JOB TITLE: Itemiz...|                0|
|Job OverviewApex ...|                0|
|Your Responsibili...|                0|
|Who is Airenvy?He...|                0|
|Implementation/Co...|                0|
|The Customer Serv...|                0|
|Position : #URL_8...|                0|
|TransferWise is t...|                0|
|The Applications ...|                0|
|Event Industry In...|                0|
|Are you intereste...|                0|
|About Vault Drago...|Bachelor's Degree|
|We are looking fo...|                0|
|Government fundin...|                0|
|Kettle is hiring ...|                0|
|Experienced Proce...|                0|
+--------------------+-----------------+
only showing top

In [71]:
# Drop NaN values and duplicates
df= df.dropna()
print(df.count())
df = df.dropDuplicates()
print(df.count())

17697
14500


In [0]:
# df.show()

In [73]:
# Make sure number of real vs fraudulent are more balanced
df_fraudulent= df[df['fraudulent'] == 1] 
df_normal = df[df['fraudulent'] == 0] 
# Count how many Real vs Fraudulent postings
n_f=df_fraudulent.count()
n_n=df_normal.count()
print(n_f)
print(n_f/n_n*100)

722
5.409050044950554


In [76]:
df_normalnew= df_normal.sample(withReplacement=None,fraction=n_f/n_n  , seed= 300)
df_unders = df_normalnew.union(df_fraudulent)
print(df_normalnew.count())
print(df_unders.count())

735
1457


In [0]:
# Shuffle the dataframe so fraudulent postings more evenly distributed 
df_unders = df_unders.sample(withReplacement=None,fraction=n_f/n_f  , seed= 300) 


### Feature Transformations


In [0]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
# Create all the features to the data set
pos_neg_to_num = StringIndexer(inputCol='fraudulent',outputCol='label')
tokenizer = Tokenizer(inputCol="description", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="token_text", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')


In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token'], outputCol='features')

In [0]:
# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [0]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(df_unders)
cleaned = cleaner.transform(df_unders)

In [82]:
# Show label and resulting features
cleaned.select(['label', 'features']).show(truncate= False)

+-----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
from pyspark.ml.classification import NaiveBayes

# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3])



Create a Naive Bayes Model 

In [0]:
# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [85]:
# Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(5)

+--------------------+----------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|         description|fraudulent|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------------------+----------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|SkyConsult is see...|         0|  0.0|[skyconsult, is, ...|[skyconsult, seek...|(262144,[619,1156...|(262144,[619,1156...|(262144,[619,1156...|[-9755.5961991993...|[1.0,3.2218215827...|       0.0|
|Healthcare Assist...|         0|  0.0|[healthcare, assi...|[healthcare, assi...|(262144,[666,9639...|(262144,[666,9639...|(262144,[666,9639...|[-4159.2449490257...|[0.99999999946670...|       0.0|
| The Cust

Save the model 

In [86]:
# save the model
from google.colab import drive
drive.mount('/content/drive')


Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [0]:
# Save to the model to the local drive
predictor.save('/content/drive/My Drive/underNB')

In [93]:
# Load the model

# from tensorflow.keras.models import load_model

nb2 = NaiveBayes.load('/content/drive/My Drive/underNB')
test_results2 = nb2.transform(testing)
test_results2.show(5)


Py4JJavaError: ignored

In [0]:
# from sklearn.metrics import classification_report

# print(classification_report(testing, test_results,
                            # target_names= ["real", "Fake"]))

In [0]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.790453
