## Simple feature training

In [1]:
# Load the Dataset 
from pyspark.sql import DataFrame
#data_file = '/home/user/elicon/data/Training_Data/Binary_Classification_Data/Nepal'
data_file = '/home/user/elicon/data/Test_Data/California_Quake/unlabelled/retweeted/processed/napa_fin.csv'
df = spark.read.option("header","true").csv(data_file)
df = df.select(df.tweet_text,df.label.cast("double").alias("label"))
df = df.dropna()
df.createOrReplaceTempView("tweets")
tweet_label = spark.sql("SELECT tweet_text, label FROM tweets")

tweet_text_only = spark.sql("SELECT tweet_text FROM tweets")

In [2]:
# Tokenise tweets
from pyspark.ml.feature import Tokenizer
#from pyspark.sql.functions import col, udf
#from pyspark.sql.types import IntegerType

tokenizer = Tokenizer(inputCol="tweet_text", outputCol="words")
#countTokens = udf(lambda words: len(words), IntegerType())
#tokenized = tokenizer.transform(tweet_label)
#tokenized = tokenized.withColumn("tokens", countTokens(col("words")))

In [3]:
from pyspark.ml.feature import StopWordsRemover
# stop words
add_stopwords = ["amp","rt","ca","pg","ga","cb","ap","sce","st"] # standard stop words
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

In [4]:
# 3-Gram for context feature
from pyspark.ml.feature import NGram

ngram = NGram(n=3, inputCol="filtered", outputCol="ngrams")
#ngrams = ngram.transform(tokenized)

## Use HashingTF Feature Extraction

In [5]:
# Hashing TF feature extraction
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.linalg import Vector as MLVector, Vectors as MLVectors
from pyspark.mllib.linalg import Vector as MLLibVector, Vectors as MLLibVectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.feature import VectorAssembler

hashingTF = HashingTF(inputCol="ngrams", outputCol="rawFeatures", numFeatures=50000)
#featurizedData = hashingTF.transform(ngrams)
# alternatively, CountVectorizer can also be used to get term frequency vectors
idf = IDF(inputCol="rawFeatures", outputCol="features")
#idfModel = idf.fit(featurizedData)
#tfidfData = idfModel.transform(featurizedData)
#assembler = VectorAssembler(
#    inputCols=["features", "tokens"],
#    outputCol="union_features")
#assembledData = assembler.transform(tfidfData)
#pairs = assembledData.select("label","features").rdd
#pairs = tfidfData.select("label","features").rdd
#data = pairs.map(lambda x: LabeledPoint(x[0], as_mllib(x[1])))


In [6]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [7]:
# Convert to labeled vectors
from pyspark.ml import linalg as ml_linalg

def as_mllib(v):
    if isinstance(v, ml_linalg.SparseVector):
        return MLLibVectors.sparse(v.size, v.indices, v.values)
    elif isinstance(v, ml_linalg.DenseVector):
        return MLLibVectors.dense(v.toArray())
    else:
        raise TypeError("Unsupported type: {0}".format(type(v)))        

## Word2Vec Feature Extraction

In [8]:
from pyspark.ml.feature import Word2Vec
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=1000, minCount=5, inputCol="ngrams", outputCol="features")

## Pipeline

In [9]:
from pyspark.ml import Pipeline
#pipeline = Pipeline(stages=[tokenizer, ngram, word2Vec])# Fit the pipeline to training documents.
pipeline = Pipeline(stages=[tokenizer, stopwordsRemover, ngram, hashingTF, idf])# Fit the pipeline to training documents.

In [10]:
pipelineFit = pipeline.fit(tweet_label)
dataset = pipelineFit.transform(tweet_label)
### Randomly split data into training and test sets. set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)

In [11]:
tokenized = tokenizer.transform(tweet_label)
stopword = stopwordsRemover.transform(tokenized)
ngrams = ngram.transform(stopword)
featurizedData = hashingTF.transform(ngrams)
idfModel = idf.fit(featurizedData)
tfidfData = idfModel.transform(featurizedData)
pairs = tfidfData.select("label","features").rdd
data = pairs.map(lambda x: LabeledPoint(x[0], as_mllib(x[1])))

In [12]:
splits = data.randomSplit([0.6, 0.4], 1234)
mllib_trainingData = splits[0]
mllib_testData = splits[1]

## Logistic Regression

In [None]:
from pyspark.ml.classification import LogisticRegression
# Build the model
lr = LogisticRegression(maxIter=50, regParam=0.3, elasticNetParam=0)

# Train model with Training Data
lrModel = lr.fit(trainingData)

In [None]:
# Run Results on Training Data
trainingSummary = lrModel.summary
print("Training: Area Under ROC: " + str(trainingSummary.areaUnderROC))

In [None]:
# Predict on Test Data
predictions = lrModel.transform(testData)
# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print("Test: Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

In [None]:
import matplotlib.pyplot as plt
import numpy as np

beta = np.sort(lrModel.coefficients)

plt.plot(beta)
plt.ylabel('Beta Coefficients')
plt.show()

In [None]:
# Extract the summary from the returned LogisticRegressionModel instance trained
trainingSummary = lrModel.summary

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
plt.plot(objectiveHistory)
plt.ylabel('Objective Function')
plt.xlabel('Iteration')
plt.show()

In [None]:
# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

#trainingSummary.roc.show(n=10, truncate=15)
roc = trainingSummary.roc.toPandas()
plt.plot(roc['FPR'],roc['TPR'])
plt.ylabel('False Positive Rate')
plt.xlabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()

In [None]:
pr = trainingSummary.pr.toPandas()
plt.plot(pr['recall'],pr['precision'])
plt.ylabel('Precision')
plt.xlabel('Recall')
plt.show()

In [None]:
# Set the model threshold to maximize F-Measure
#trainingSummary.fMeasureByThreshold.show(n=10, truncate = 15)
f = trainingSummary.fMeasureByThreshold.toPandas()
plt.plot(f['threshold'],f['F-Measure'])
plt.ylabel('F-Measure')
plt.xlabel('Threshold')
plt.show()

## Random Forest

In [14]:
from pyspark.ml.classification import RandomForestClassifier

# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            numTrees = 10, \
                            maxDepth = 4, \
                            maxBins = 12)

# Train model with Training Data
rfModel = rf.fit(trainingData)

Py4JJavaError: An error occurred while calling o268.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 15, localhost): java.lang.OutOfMemoryError: GC overhead limit exceeded
	at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:141)
	at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:139)
	at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:144)
	at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:76)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$groupByKey$1$$anonfun$9.apply(PairRDDFunctions.scala:505)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$groupByKey$1$$anonfun$9.apply(PairRDDFunctions.scala:505)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:151)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:150)
	at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
	at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:163)
	at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:41)
	at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:91)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:911)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:745)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:744)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:744)
	at org.apache.spark.ml.tree.impl.RandomForest$.findSplitsBySorting(RandomForest.scala:894)
	at org.apache.spark.ml.tree.impl.RandomForest$.findSplits(RandomForest.scala:872)
	at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:118)
	at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:118)
	at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:45)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
	at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:141)
	at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:139)
	at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:144)
	at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:76)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$groupByKey$1$$anonfun$9.apply(PairRDDFunctions.scala:505)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$groupByKey$1$$anonfun$9.apply(PairRDDFunctions.scala:505)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:151)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:150)
	at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
	at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:163)
	at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:41)
	at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:91)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


## Naive Bayes

In [15]:
from pyspark.ml.classification import NaiveBayes

# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1, modelType="multinomial")

# train the model
model = nb.fit(trainingData)

# select example rows to display.
predictions = model.transform(testData)

# compute accuracy on the test set
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
print("Test: Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Test: Area Under ROC: 0.719976253295


## SVM

In [36]:
# ## Linear SVM
from pyspark.mllib.classification import SVMWithSGD, SVMModel 
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# Build the model
model = SVMWithSGD.train(mllib_trainingData, iterations=100)

# Evaluating the model on training data
labelsAndPreds = mllib_trainingData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(mllib_trainingData.count())
print("Training Error = " + str(trainErr))

Training Error = 0.0923802553468


## Model Evaluation

In [38]:
scoreAndLabels = mllib_testData.map(lambda p: (p.label, model.predict(p.features)))
srt = scoreAndLabels.map(lambda xs: [float(x) for x in xs])
nest = srt.map(lambda l : tuple(l) )
metrics = BinaryClassificationMetrics(nest)
au_ROC = metrics.areaUnderROC
au_PR = metrics.areaUnderPR

In [39]:
au_PR

0.7278760022933515

## Model Tuning

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)
#            .addGrid(model.maxIter, [10, 20, 50]) #Number of iterations
#            .addGrid(idf.numFeatures, [10, 100, 1000]) # Number of features
             .build())

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=5)

# Run cross validations
cvModel = cv.fit(trainingData)
# this will likely take a fair amount of time because of the amount of models that we're creating and testing

# Use test set here so we can measure the accuracy of our model on new data
predictions = cvModel.transform(testData)

# cvModel uses the best model found from the Cross Validation
# Evaluate best model
print("Test: Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

## Save the trained model

In [None]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[tokenizer, ngram, hashingTF, idf])
pipeline.save("target/tmp/HashingTF_Binary_pipeline")

In [None]:
pipeModel = pipeline.fit(tweet_label)
pipeModel.save("target/tmp/HashingTF_Binary_model")

In [None]:
model.save(sc, "target/tmp/SVMWithSGD_Binary_model")

## Multi-feature training

In [None]:
import re
def search(text, search_word):
    word = r"\W*([\w]+)"
    groups = re.search(r"{}\W*{}{}".format(word, search_word, word), text).groups()
    return list(groups[:1]),list(groups[1:])   

In [None]:
t = "fire erupt quak rock california wine countri"
tuple1 = search(t, "quak")
contents = list(tuple1)
content = [item for sub in contents for item in sub]

In [None]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=["label", "tokens"],
    outputCol="union_features")
assembledData = assembler.transform(ngrams)

In [None]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="ngrams", outputCol="ngrams_numeric").fit(ngrams)
indexed_df = indexer.transform(ngrams)
#indexed_df.drop("bar").show()