## Data analysis on ***"Big-Five Factor Markers"*** 
### 1. Load the processed dataset

In [0]:
# Path of the pre-processed CSV that is read into a Spark DataFrame below.
# NOTE(review): the /FileStore/tables/ prefix looks like a Databricks DBFS upload
# location — confirm before running this notebook elsewhere.
file_location = "/FileStore/tables/data_processed.csv"

In [0]:
from pyspark.sql import SparkSession

# getOrCreate() hands back the already-running session when there is one;
# otherwise it starts a new session under the application name 'ml-bank'.
spark = (
    SparkSession.builder
    .appName('ml-bank')
    .getOrCreate()
)

# header=True takes the column names from the first line of the file.
# No schema is supplied or inferred here, so the columns still need to be
# converted to numeric types before modelling.
df = spark.read.csv(file_location, header=True)

# Print the column names and types of the freshly loaded DataFrame.
df.printSchema()

In [0]:
# Report the size of the dataset as a (row count, column count) tuple.
dataset_shape = (df.count(), len(df.columns))
print(dataset_shape)

### 2. Processing data types

In [0]:
from pyspark.sql.types import *

In [0]:
# Names of the 50 item columns that get cast to float: five prefixes
# (EXT, EST, AGR, CSN, OPN), each numbered 1 through 10, grouped by prefix.
features = [
    prefix + str(item_number)
    for prefix in ("EXT", "EST", "AGR", "CSN", "OPN")
    for item_number in range(1, 11)
]


# Cast every column that is both present in the DataFrame and listed in
# `features` to FloatType; all other columns are left untouched.
columns_to_cast = [column_name for column_name in df.columns if column_name in features]
for column_name in columns_to_cast:
    float_column = df[column_name].cast(FloatType())
    # withColumn with an existing name replaces that column in place.
    df = df.withColumn(column_name, float_column)

In [0]:
# Preview the first 20 rows (long cell values truncated) to check the casts.
df.show(n=20, truncate=True)

### 3. Preparing LabeledPoint data for MLlib

In [0]:
# Columns kept for modelling, in this exact order. The order matters: rows
# are later indexed by position, so the first name listed here ("EXT1")
# ends up as the label and the remaining 49 as the feature vector.
features_1 = [
    "%s%d" % (scale, index)
    for scale in ("EXT", "EST", "AGR", "CSN", "OPN")
    for index in range(1, 11)
]
# Project the DataFrame down to the modelling columns, keeping the order
# given by features_1.
new_df = df.select(*features_1)

In [0]:
from pyspark.mllib.regression import LabeledPoint


def row_to_labeled_point(row):
    """Build a LabeledPoint whose label is the row's first value and whose
    features are all remaining values, in column order."""
    label, feature_values = row[0], row[1:]
    return LabeledPoint(label, feature_values)


# One LabeledPoint per row of new_df.
labelpointRDD = new_df.rdd.map(row_to_labeled_point)

In [0]:
# 80/20 train/test split. The seed is fixed so that re-running the notebook
# on the same input yields the same split, which keeps the metrics reported
# below comparable from run to run.
train, test = labelpointRDD.randomSplit([0.8, 0.2], seed=42)

### 4. Multiclass logistic regression

In [0]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionModel, LogisticRegressionWithLBFGS

# Fit a multiclass logistic regression with L-BFGS on the training split.
# numClasses=6: presumably the label takes the values 0 through 5 — TODO confirm
# against the data.
model = LogisticRegressionWithLBFGS.train(
    data=train,
    numClasses=6,
)

In [0]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Pair the predicted class with the true label for every held-out point.
predictionAndLabels = test.map(lambda lp: (float(model.predict(lp.features)), lp.label))
metrics = MulticlassMetrics(predictionAndLabels)

# These numbers are printed as summary statistics, so they must describe the
# classifier as a whole. Per-label calls such as metrics.precision(1.0) score
# class 1.0 only; the weighted variants average the per-class scores over all
# classes, weighting each class by how often it occurs among the true labels.
precision = metrics.weightedPrecision
recall = metrics.weightedRecall
f1Score = metrics.weightedFMeasure()
print("Summary Stats")
print("Weighted Precision = %s" % precision)
print("Weighted Recall = %s" % recall)
print("Weighted F1 Score = %s" % f1Score)
print("Accuracy = %s" % metrics.accuracy)

In [0]:
def has_positive_label(row):
    """Return True when the row's first value (the label column) is above zero."""
    return row[0] > 0


# Despite its name, no_0_df is an RDD of rows, not a DataFrame: it holds the
# rows of new_df whose label is strictly positive.
no_0_df = new_df.rdd.filter(has_positive_label)

# First value becomes the label, the remaining values the feature vector.
labelpointRDD_no_0 = no_0_df.map(
    lambda row: LabeledPoint(label=row[0], features=row[1:])
)

In [0]:
# Shift the labels down by one so they start at 0, as the trainer expects
# class indices 0 .. numClasses-1.
# The points are rebuilt from the filtered rows (no_0_df) rather than from
# labelpointRDD_no_0 itself, so re-running this cell does not subtract 1 a
# second time and push the labels negative.
labelpointRDD_no_0 = no_0_df.map(lambda row: LabeledPoint(row[0] - 1, row[1:]))

# 80/20 train/test split with a fixed seed so the split, and the metrics
# computed from it, are repeatable on the same input.
train_no_0, test_no_0 = labelpointRDD_no_0.randomSplit([0.8, 0.2], seed=42)

In [0]:
# train_no_0.collect()

In [0]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionModel, LogisticRegressionWithLBFGS

# Refit the multiclass logistic regression on the split that excludes
# zero-labelled rows. Note that this rebinds `model`, replacing the
# classifier trained earlier in the notebook.
# numClasses=5: presumably the shifted labels take the values 0 through 4 —
# TODO confirm against the data.
model = LogisticRegressionWithLBFGS.train(
    data=train_no_0,
    numClasses=5,
)

In [0]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Pair the predicted class with the true label for every held-out point.
predictionAndLabels = test_no_0.map(lambda lp: (float(model.predict(lp.features)), lp.label))
metrics = MulticlassMetrics(predictionAndLabels)

# These numbers are printed as summary statistics, so they must describe the
# classifier as a whole. Per-label calls such as metrics.precision(1.0) score
# class 1.0 only, and because the labels in this data set were shifted down
# by one, class 1.0 here is not the class 1.0 of the unshifted data. The
# weighted variants average the per-class scores over all classes, weighting
# each class by how often it occurs among the true labels.
precision = metrics.weightedPrecision
recall = metrics.weightedRecall
f1Score = metrics.weightedFMeasure()
print("Summary Stats")
print("Weighted Precision = %s" % precision)
print("Weighted Recall = %s" % recall)
print("Weighted F1 Score = %s" % f1Score)
print("Accuracy = %s" % metrics.accuracy)