In [0]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf, col, lower, regexp_replace

file_review="/FileStore/tables/selected_features_for_pyspark.txt"
sc = SparkContext.getOrCreate()
spark = SQLContext(sc)

df=spark.read.option("sep", "\t").csv(file_review)
#df.show(5)

df=df.select(col("_c0").cast("int").alias("user_id"),col("_c1").cast("int").alias("prod_id"),col("_c2").cast("string").alias("date"),
                 col("_c3").cast("float").alias("rating"),col("_c4").cast("int").alias("num_of_words"),col("_c5").cast("int").alias("num_of_verbs"),
                 col("_c6").cast("float").alias("avg_word_length"),col("_c7").cast("float").alias("emotiveness_ratio"),col("_c8").cast("int").alias("num_of_positive")
                     ,col("_c9").cast("int").alias("num_of_negative"), col("_c10").cast("float").alias("sentiment"),col("_c11").cast("int").alias("label"))
df=df.select(col("rating"),col("num_of_words"),col("num_of_verbs"),col("avg_word_length"),col("emotiveness_ratio"),col("num_of_positive"),
                   col("num_of_negative"),col("sentiment"),col("label"))

# df=df.select(col("_c0").cast("int").alias("num_of_words"),col("_c1").cast("int").alias("num_of_verbs"),col("_c2").cast("float").alias("avg_word_length"),
#             col("_c3").cast("float").alias("emotiveness_ratio"),col("_c4").cast("int").alias("num_of_posi"),col("_c5").cast("int").alias("num_of_nega"),
#             col("_c6").cast("float").alias("sentiment"),col("_c7").cast("int").alias("label"))
#data.show(5)

from pyspark.sql.functions import when
df=df.withColumn("label",when(df.label==-1,0).otherwise(1))
df=df.toPandas()
df=df.iloc[0:50000,:]
print(len(df))
#print(df.head())

df = spark.createDataFrame(df)
print("total real review numbers:",df[df.label==1].count())
print("total fake review numbers:",df[df.label==0].count())
print("total fake review percentage:",df[df.label==0].count()/df[df.label==1].count())
#df.show(20)
#df=df[0:10000]
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from sklearn.metrics import confusion_matrix
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import numpy as np
import pandas as pd
#------------------------------------------scaling start--------------------------------------------------------------------------------------
unlist = udf(lambda x: round(float(list(x)[0]),3), DoubleType())
for i in ["rating","num_of_words","num_of_verbs","avg_word_length","emotiveness_ratio","num_of_positive","num_of_negative","sentiment"]:
    # VectorAssembler Transformation - Converting column to vector type
    assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")

    # MinMaxScaler Transformation
    scaler = MinMaxScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")

    # Pipeline of VectorAssembler and MinMaxScaler
    pipeline = Pipeline(stages=[assembler, scaler])

    # Fitting pipeline on dataframe
    df = pipeline.fit(df).transform(df).withColumn(i+"_Scaled", unlist(i+"_Scaled")).drop(i+"_Vect")
df=df.select(["rating_Scaled","num_of_words_Scaled","num_of_verbs_Scaled","avg_word_length_Scaled","emotiveness_ratio_Scaled","num_of_positive_Scaled","num_of_negative_Scaled","sentiment_Scaled","label"])
#df.show(10)
#------------------------------------------scaling end----------------------------------------------------------------------------------------
features=np.array(["rating_Scaled","num_of_words_Scaled","num_of_verbs_Scaled","avg_word_length_Scaled","emotiveness_ratio_Scaled","num_of_positive_Scaled","num_of_negative_Scaled","sentiment_Scaled"])

#print(type(features))
va=VectorAssembler(inputCols = features, outputCol='features')
va_df = va.transform(df)
va_df = va_df.select(["features","label"])
#va_df.show(20)
#train=va_df.filter(va_df.rand<0.7)
#test=df.filter(va_df.rand>=0.7)
(train, test) = va_df.randomSplit([0.7, 0.3],seed=2)
#train.show(100)
print("total review numbers in training:", train.count())
print("total review numbers in testing", test.count())
print("real review numbers in training:",train[train.label==1].count())
print("real review numbers in testing:",test[test.label==1].count())
# train.show()
# test.show()
# test_to_pd=test.toPandas()
# print(test_to_pd.iloc[0:100,:])
lsvc = LinearSVC(labelCol="label", maxIter=50)
lsvc = lsvc.fit(train)

pred = lsvc.transform(test)
pred.show(5)
evaluator=MulticlassClassificationEvaluator(metricName="accuracy")
accuracy = evaluator.evaluate(pred)
 
print("accuracy score: ", accuracy)

y_pred=pred.select("prediction").collect()
y_orig=pred.select("label").collect()

tp = pred[(pred.label == 1) & (pred.prediction == 1)].count()
tn = pred[(pred.label == 0) & (pred.prediction == 0)].count()
fp = pred[(pred.label == 0) & (pred.prediction == 1)].count()
fn = pred[(pred.label == 1) & (pred.prediction == 0)].count()
fake_review_count=pred[pred.label==0].count()
fake_review_02=test[test.label==0].count()
print(fake_review_count)
print(fake_review_02)
print("tp:",tp)
print("tn:",tn)
print("fp:",fp)
print("fn:",fn)

cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm) 

50000
total real review numbers: 43063
total fake review numbers: 6937
total fake review percentage: 0.1610895664491559
35100
14900
real review numbers in training: 30229
real review numbers in testing: 12834
+--------------------+-----+--------------------+----------+
|            features|label|       rawPrediction|prediction|
+--------------------+-----+--------------------+----------+
|(8,[0,1,3,7],[0.2...|    1|[-1.0000956356031...|       1.0|
|(8,[0,1,3,7],[0.5...|    1|[-1.0000940943682...|       1.0|
|(8,[0,1,3,7],[1.0...|    0|[-1.0000690306545...|       1.0|
|(8,[0,1,3,7],[1.0...|    0|[-1.0000269087807...|       1.0|
|(8,[0,3,5,7],[0.7...|    0|[-0.9998020314576...|       1.0|
+--------------------+-----+--------------------+----------+
only showing top 5 rows

accuracy score:  0.8613422818791946
2066
2066
tp: 12834
tn: 0
fp: 2066
fn: 0
Confusion Matrix:
[[    0  2066]
 [    0 12834]]
