In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
spark

In [4]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import col, when

In [5]:
df = spark.read.parquet("s3://miaowang1009/ANLY502_Final/df.parquet/*")

In [6]:
df.printSchema()

root
 |-- GLOBALEVENTID: string (nullable = true)
 |-- SQLDATE: date (nullable = true)
 |-- MonthYear: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- FractionDate: string (nullable = true)
 |-- Actor1Code: string (nullable = true)
 |-- Actor1Name: string (nullable = true)
 |-- Actor1CountryCode: string (nullable = true)
 |-- Actor1KnownGroupCode: string (nullable = true)
 |-- Actor1EthnicCode: string (nullable = true)
 |-- Actor1Religion1Code: string (nullable = true)
 |-- Actor1Religion2Code: string (nullable = true)
 |-- Actor1Type1Code: string (nullable = true)
 |-- Actor1Type2Code: string (nullable = true)
 |-- Actor1Type3Code: string (nullable = true)
 |-- Actor2Code: string (nullable = true)
 |-- Actor2Name: string (nullable = true)
 |-- Actor2CountryCode: string (nullable = true)
 |-- Actor2KnownGroupCode: string (nullable = true)
 |-- Actor2EthnicCode: string (nullable = true)
 |-- Actor2Religion1Code: string (nullable = true)
 |-- Actor2Religion2Code: string

In [7]:
df_feat = df.select(df.AvgTone, df.NumArticles,df.NumMentions, df.ActionGeo_Type, df.EventRootCode, df.QuadClass)

In [8]:
from pyspark.ml.feature import Bucketizer
bucketizer = Bucketizer(splits=[ -100, 0, 100, float('Inf') ],inputCol="AvgTone", outputCol="AvgToneBin")
df_feat = bucketizer.setHandleInvalid("keep").transform(df_feat)

#df_buck.show()


from pyspark.sql.functions import udf
from pyspark.sql.types import *

t = {0.0:"Negtive", 1.0:"Positive"}
udf_foo = udf(lambda x: t[x], StringType())
df_feat = df_feat.withColumn("AvgToneTag", udf_foo("AvgToneBin"))
df_feat.show()

+-----------+-----------+-----------+--------------+-------------+---------+----------+----------+
|    AvgTone|NumArticles|NumMentions|ActionGeo_Type|EventRootCode|QuadClass|AvgToneBin|AvgToneTag|
+-----------+-----------+-----------+--------------+-------------+---------+----------+----------+
|  0.7025761|         10|         10|             4|           04|        1|       1.0|  Positive|
|-0.73937154|          6|          6|             5|           01|        1|       0.0|   Negtive|
|-0.73937154|          2|          2|             4|           01|        1|       0.0|   Negtive|
|-0.73937154|          2|          2|             1|           04|        1|       0.0|   Negtive|
|  0.7025761|          2|          2|             4|           04|        1|       1.0|  Positive|
|-0.73937154|          6|          6|             4|           03|        1|       0.0|   Negtive|
|  0.7025761|         10|         10|             4|           04|        1|       1.0|  Positive|
| -2.13114

In [9]:
df_feats = df_feat.drop('AvgTone').drop('AvgToneTag')
df_feats.printSchema()

root
 |-- NumArticles: integer (nullable = true)
 |-- NumMentions: integer (nullable = true)
 |-- ActionGeo_Type: string (nullable = true)
 |-- EventRootCode: string (nullable = true)
 |-- QuadClass: string (nullable = true)
 |-- AvgToneBin: double (nullable = true)



In [10]:
splitted_data = df_feats.randomSplit([0.8,0.18,0.02],24)
train_data = splitted_data[0]
test_data = splitted_data[1]
predicted_data = splitted_data[2]

Create pipeline and train a model

In [11]:
StringIndexer_label = StringIndexer(inputCol="AvgToneBin", outputCol = "label")
StringIndexer_AGT = StringIndexer(inputCol="ActionGeo_Type", outputCol = "ActionGeo_Type_IX")
StringIndexer_ERC = StringIndexer(inputCol="EventRootCode", outputCol = "EventRootCode_IX")
StringIndexer_QC = StringIndexer(inputCol="QuadClass", outputCol = "QuadClass_IX")

In [12]:
si_label_fit = StringIndexer(inputCol="AvgToneBin", outputCol = "label").fit(df_feats)
si_label_fit.labels

['0.0', '1.0']

In [13]:
#Create a feature vector by combining all features together using the vectorAssembler method:

vectorAssembler_features = VectorAssembler(
    inputCols=["ActionGeo_Type_IX", "EventRootCode_IX", "QuadClass_IX", "NumArticles","NumMentions"],
    outputCol="features")

In [14]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline, Model

# define estimators 
rf = RandomForestClassifier(labelCol='label', featuresCol='features')

In [15]:
# indexed labels back to original labels.
labelConverter = IndexToString(inputCol='prediction', 
                               outputCol='predictedLabel', 
                               labels=StringIndexer_label.fit(df_feats).labels)

In [16]:
# Build pipeline. A pipeline consists of transformers and an estimator.

pipeline_rf = Pipeline(stages=[StringIndexer_label,
                              StringIndexer_AGT,
                              StringIndexer_ERC,
                              StringIndexer_QC,
                              vectorAssembler_features,
                              rf, labelConverter])

In [17]:
train_data.printSchema()

root
 |-- NumArticles: integer (nullable = true)
 |-- NumMentions: integer (nullable = true)
 |-- ActionGeo_Type: string (nullable = true)
 |-- EventRootCode: string (nullable = true)
 |-- QuadClass: string (nullable = true)
 |-- AvgToneBin: double (nullable = true)



In [18]:
model_rf = pipeline_rf.fit(train_data)

In [23]:
predictions = model_rf.transform(test_data)
evaluatorRF = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
accuracy = evaluatorRF.evaluate(predictions)

In [24]:
print("Accuracy = %g" % accuracy)
print("Test Error = %g" % (1.0-accuracy))

Accuracy = 0.566251
Test Error = 0.433749


In [25]:
predictions.show()

+-----------+-----------+--------------+-------------+---------+----------+-----+-----------------+----------------+------------+--------------------+--------------------+--------------------+----------+--------------+
|NumArticles|NumMentions|ActionGeo_Type|EventRootCode|QuadClass|AvgToneBin|label|ActionGeo_Type_IX|EventRootCode_IX|QuadClass_IX|            features|       rawPrediction|         probability|prediction|predictedLabel|
+-----------+-----------+--------------+-------------+---------+----------+-----+-----------------+----------------+------------+--------------------+--------------------+--------------------+----------+--------------+
|          1|          1|             0|           02|        1|       0.0|  0.0|              4.0|             4.0|         0.0|[4.0,4.0,0.0,1.0,...|[13.2873127890885...|[0.66436563945442...|       0.0|           0.0|
|          1|          1|             0|           03|        1|       1.0|  1.0|              4.0|             3.0|        

In [47]:
importances = model_rf.stages[-1].featureImportances()

AttributeError: 'IndexToString' object has no attribute 'featureImportances'

In [26]:
lr = LogisticRegression(maxIter=10, regParam=0.001)

In [33]:
pipeline_lr = Pipeline(stages=[StringIndexer_AGT, StringIndexer_ERC, StringIndexer_QC, StringIndexer_label, 
                               vectorAssembler_features, 
                               lr, labelConverter])

In [34]:
model_lr = pipeline_lr.fit(train_data)

In [36]:
evaluator = BinaryClassificationEvaluator().setLabelCol("label").setRawPredictionCol("rawPrediction").setMetricName("areaUnderROC")

In [39]:
predictions_lr = model_lr.transform(test_data)
predictions.show(5)

+-----------+-----------+--------------+-------------+---------+----------+-----+-----------------+----------------+------------+--------------------+--------------------+--------------------+----------+--------------+
|NumArticles|NumMentions|ActionGeo_Type|EventRootCode|QuadClass|AvgToneBin|label|ActionGeo_Type_IX|EventRootCode_IX|QuadClass_IX|            features|       rawPrediction|         probability|prediction|predictedLabel|
+-----------+-----------+--------------+-------------+---------+----------+-----+-----------------+----------------+------------+--------------------+--------------------+--------------------+----------+--------------+
|          1|          1|             0|           02|        1|       0.0|  0.0|              4.0|             4.0|         0.0|[4.0,4.0,0.0,1.0,...|[13.2873127890885...|[0.66436563945442...|       0.0|           0.0|
|          1|          1|             0|           03|        1|       1.0|  1.0|              4.0|             3.0|        

In [40]:
accuracy_lr = evaluator.evaluate(predictions_lr)

In [41]:
print('Test Area Under ROC', evaluator.evaluate(predictions_lr))

Test Area Under ROC 0.6141103430789744


In [42]:
print("Accuracy = %g" % accuracy_lr)
print("Test Error = %g" % (1.0 - accuracy_lr))

Accuracy = 0.61411
Test Error = 0.38589


Save models to S3

In [50]:
model_rf.write().save('s3://miaowang1009/ANLY502_Final/model_rf')

In [51]:
model_lr.write().save('s3://miaowang1009/ANLY502_Final/model_lr')

In [19]:
# Linear Regression

In [None]:
df_feats = df_feat.drop('AvgTone').drop('AvgToneTag')
df_feats.printSchema()

In [None]:
regression = LinearRegression(labelCol='Avg')