In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkFiles
import pyspark.sql.functions as F
from pyspark.ml.feature import (
   Tokenizer, 
    StopWordsRemover, 
    HashingTF, 
    IDF, 
    StringIndexer
)
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import NaiveBayes

In [2]:
# Attach to the active Spark session if there is one; otherwise start a new
# session registered under this application name.
app_name = "job-john-snow"
session_builder = SparkSession.builder.appName(app_name)
spark = session_builder.getOrCreate()

In [3]:
# Path of the input CSV file.
file_uri = "/mnt/mnt_project3/job.csv"

# Read with Spark's built-in CSV reader rather than the legacy
# "com.databricks.spark.csv" package alias. The first row is treated as the
# header and column types are inferred from the data.
df = spark.read.csv(file_uri, header=True, inferSchema=True)

# Print the inferred schema so the column names and types can be checked.
df.printSchema()

In [4]:
# Drop rows that are missing either the description text or the fraud flag.
has_description = F.col('description').isNotNull()
has_fraud_flag = F.col('fraudulent').isNotNull()
df = df.filter(has_description).filter(has_fraud_flag)

# Add the character length of each description as an extra column (used
# later as a model feature) and keep only the columns needed downstream.
data_df = df.withColumn('length', F.length(F.col('description')))
data = data_df.select('description', 'fraudulent', 'length')
data.show()

In [5]:
from pyspark.sql.functions import isnan, when, count, col

# Build one aggregate per column that counts its null entries, then show all
# of the counts together as a single row.
null_counts = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in data.columns
]
data.select(null_counts).show()

# Total number of rows in `data`.
data.count()

In [6]:
# Keep only the rows whose `fraudulent` value is below 2.
flag_below_two = data['fraudulent'] < 2
data_df2 = data.filter(flag_below_two)
data_df2.show()

In [7]:
# Individual feature-engineering stages; they are chained into a Pipeline later.

# Index the `fraudulent` column into the numeric `label` column.
pos_neg_to_num = StringIndexer(
    inputCol='fraudulent',
    outputCol='label',
)

# Split each description into tokens.
tokenizer = Tokenizer(
    inputCol="description",
    outputCol="token_text",
)

# Remove stop words from the token lists.
stopremove = StopWordsRemover(
    inputCol='token_text',
    outputCol='stop_tokens',
)

# Hash the remaining tokens into a term-frequency vector.
hashingTF = HashingTF(
    inputCol="stop_tokens",
    outputCol='hash_token',
)

# Re-weight the term frequencies by inverse document frequency.
idf = IDF(
    inputCol='hash_token',
    outputCol='idf_token',
)

In [8]:

# Combine the TF-IDF vector and the description length into a single
# `features` vector column.
feature_inputs = ['idf_token', 'length']
clean_up = VectorAssembler(inputCols=feature_inputs, outputCol='features')

In [9]:
# Stages run in this order: label indexing, tokenizing, stop-word removal,
# term hashing, IDF weighting, and final feature assembly.
prep_stages = [pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up]
data_prep_pipeline = Pipeline(stages=prep_stages)

In [10]:
# Fit every pipeline stage on the filtered data, then apply the fitted
# pipeline to that same data to add the label and feature columns.
prep_input = data_df2
cleaner = data_prep_pipeline.fit(prep_input)
cleaned = cleaner.transform(prep_input)

# Preview the first ten transformed rows.
cleaned.show(n=10)

In [11]:
# Preview only the two columns the classifiers consume.
model_columns = cleaned.select('label', 'features')
model_columns.show()

In [12]:
from pyspark.ml.classification import LogisticRegression

# Split the prepared data 70/30 into training and testing sets. The fixed
# seed makes the split repeatable; without it every run trains and
# evaluates on different rows, so the reported metrics are not comparable.
training, testing = cleaned.randomSplit([0.7, 0.3], seed=42)

# Fit a logistic regression classifier on the training split, capped at 10
# optimizer iterations.
lr = LogisticRegression(featuresCol='features', labelCol='label', maxIter=10)
lrModel = lr.fit(training)

In [13]:
# Score the testing split with the fitted logistic regression model.
test_results = lrModel.transform(testing)

# show() prints the rows itself and returns None, so it is called directly
# rather than being wrapped in display(), which would only receive None.
test_results.show(5)


In [14]:
from sklearn.metrics import classification_report

# Collect the ground truth and predictions to the driver as a pandas frame.
# `prediction` is expressed in the StringIndexer's label-index space, so it
# is compared against the indexed `label` column; the raw `fraudulent` value
# only matches that index when the indexer's frequency ordering happens to
# map 0 -> 0.0. `fraudulent` is still selected so the frame keeps that column.
pdf_lr = test_results.select("fraudulent", "label", "prediction").toPandas()
print(classification_report(pdf_lr.label, pdf_lr.prediction))

In [15]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# BinaryClassificationEvaluator reports area under the ROC curve by default,
# not accuracy. The metric is named explicitly here and the printed message
# describes what is actually being measured.
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
print('Area under ROC of the logistic regression model was:', evaluator.evaluate(test_results))

In [16]:
# Train a Naive Bayes classifier, with its default settings, on the
# training split.
nb = NaiveBayes()
predictor = nb.fit(training)

# Apply the fitted model to the testing split and preview five scored rows.
nbtest_results = predictor.transform(testing)
nbtest_results.show(n=5)

In [17]:
# Bring the raw fraud flag and the Naive Bayes prediction to the driver as a
# pandas DataFrame, then preview its first rows.
nb_flag_and_prediction = nbtest_results.select("fraudulent", "prediction")
pdf = nb_flag_and_prediction.toPandas()
pdf.head()

Unnamed: 0,fraudulent,prediction
0,0,0.0
1,1,1.0
2,0,0.0
3,0,0.0
4,0,0.0


In [18]:
# MulticlassClassificationEvaluator reports F1 by default. Accuracy is
# requested explicitly so the value matches the name it is stored under and
# the message it is printed with.
acc_eval = MulticlassClassificationEvaluator(metricName="accuracy")
acc = acc_eval.evaluate(nbtest_results)
print("Accuracy of the Naive Bayes model was: %f" % acc)

In [19]:
from sklearn.metrics import classification_report

# `prediction` is expressed in the StringIndexer's label-index space, so the
# report compares it against the indexed `label` column taken directly from
# the Naive Bayes results. The raw `fraudulent` value only matches that
# index when the indexer's frequency ordering happens to map 0 -> 0.0.
nb_report_pdf = nbtest_results.select("label", "prediction").toPandas()
print(classification_report(nb_report_pdf.label, nb_report_pdf.prediction))