In [181]:
import findspark
findspark.init()

import sys
assert sys.version_info >= (3, 5) # make sure we have Python 3.5+
import pandas as pd
import numpy as np
import re
import string
from textblob import TextBlob

from pyspark.sql import SparkSession, functions, types
# Create (or reuse) the Spark session for this notebook and quieten its logging.
spark = SparkSession.builder.appName('Emotion prediction').getOrCreate()
spark.sparkContext.setLogLevel('WARN')
# Compare (major, minor) as integers: a plain string comparison is lexicographic,
# so a version such as '2.10' or '10.0' would wrongly sort below '2.3'.
assert tuple(int(part) for part in spark.version.split('.')[:2]) >= (2, 3) # make sure we have Spark 2.3+
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, IndexToString
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

In [155]:
# Prerequisite: the third-party textblob package must be installed (e.g. run "%pip install textblob" once).

In [156]:
# Location of the news CSV (an absolute path on the author's machine).
inputs = '/Users/syedikram/Documents/BigData-733/cryptointel/news_cc/btcnews.csv'

# Treat the first row as the header and drop rows that cannot be parsed.
data = spark.read.csv(
    path=inputs,
    header=True,
    mode="DROPMALFORMED",
)


In [157]:
# Merge the title and body columns into a single text column, separated by one space.
data = data.withColumn("title_body", concat_ws(' ', data.title, data.body))


def _lowercase_words(text):
    """Lower-case every whitespace-separated word and rejoin the words with single spaces."""
    return " ".join(word.lower() for word in text.split())


lower_udf = udf(_lowercase_words)
data = data.withColumn("title_body", lower_udf(data.title_body))


In [158]:
# Matches one ASCII punctuation character, digit, carriage return, tab or newline.
# Built once at definition time instead of on every call, because the function
# is invoked once per row when wrapped in a UDF.
_PUNCT_DIGIT_CTRL_RE = re.compile('[' + re.escape(string.punctuation) + '0-9\\r\\t\\n]')


def remove_punct(text):
    """Replace punctuation, digits and CR/TAB/LF characters with single spaces.

    Each matched character becomes exactly one space, so runs of matches
    produce runs of spaces; the string is not trimmed or collapsed here.

    Args:
        text: The string to clean, or None.

    Returns:
        The cleaned string, or None when `text` is None.
    """
    if text is None:
        # Pass a missing value through rather than raising TypeError in re.sub.
        return None
    return _PUNCT_DIGIT_CTRL_RE.sub(" ", text)

In [159]:
# Expose remove_punct as a Spark UDF so it can be applied to a DataFrame column.
punct_remover = udf(remove_punct)

In [160]:
# Overwrite the combined text column with its punctuation-stripped version.
data = data.withColumn(
    "title_body",
    punct_remover(data.title_body),
)


In [161]:
def _collapse_whitespace(text):
    """Split on whitespace and rejoin with single spaces (also trims both ends)."""
    return " ".join(text.split())


spaces_udf = udf(_collapse_whitespace)
data = data.withColumn("title_body", spaces_udf(data.title_body))


In [162]:
def _polarity_sign(text):
    """Return 1, 0 or -1 according to the sign of TextBlob's polarity score for `text`.

    `text` is converted with str() before scoring, as in the original lambda.
    """
    # Score the text once and reuse the value; the earlier lambda built a second
    # TextBlob whenever the polarity was not positive.
    polarity = TextBlob(str(text)).sentiment.polarity
    if polarity > 0:
        return 1
    if polarity == 0:
        return 0
    return -1


# No returnType is passed, so udf uses its default (StringType per the PySpark docs).
sentiment_udf = udf(_polarity_sign)

In [163]:
# Label every row with its sentiment, then keep only the text and the label.
data = (
    data
    .withColumn("sentiment", sentiment_udf(data.title_body))
    .select('title_body', 'sentiment')
)

In [164]:
# Tokenizing and Vectorizing
# The tokenizer reads "title_body" and writes its tokens to a new "words" column.
tok = Tokenizer().setInputCol("title_body").setOutputCol("words")
tokenized = tok.transform(data)

In [165]:
# Filter stop words out of "words", storing the remaining tokens in "words_rm".
stopword_rm = StopWordsRemover(
    inputCol='words',
    outputCol='words_rm',
)
rm_tokenized = stopword_rm.transform(tokenized)

In [166]:
# Fit a CountVectorizer on the stop-word-filtered tokens, then add its output as the "tf" column.
cv = CountVectorizer(
    inputCol='words_rm',
    outputCol='tf',
)
cvModel = cv.fit(rm_tokenized)
count_vectorized = cvModel.transform(rm_tokenized)

In [167]:
# Fit IDF on the "tf" vectors and add the rescaled vectors as the "tfidf" column.
idf_ngram = IDF(inputCol='tf', outputCol='tfidf')
tfidfModel_ngram = idf_ngram.fit(count_vectorized)
tfidf_df = tfidfModel_ngram.transform(count_vectorized)

In [168]:
# Map each distinct "sentiment" value to a numeric index in "sentimentIndex".
indexer = StringIndexer(
    inputCol="sentiment",
    outputCol="sentimentIndex",
)
indexer_model = indexer.fit(data)
indexed = indexer_model.transform(data)
indexed.show()

+--------------------+---------+--------------+
|          title_body|sentiment|sentimentIndex|
+--------------------+---------+--------------+
|quantum computing...|        1|           0.0|
|bitcoin trading p...|        1|           0.0|
|crypto analyst as...|        1|           0.0|
|bitcoin btc price...|        1|           0.0|
|bitcoin sv bsv pr...|       -1|           1.0|
|cboe halts bitcoi...|       -1|           1.0|
|ripple xrp price ...|       -1|           1.0|
|ripple xrp and ca...|        1|           0.0|
|bitcoin price ana...|        1|           0.0|
|bitcoin ethereum ...|       -1|           1.0|
|craig wright rage...|        1|           0.0|
|million in weeks ...|        0|           2.0|
|bitcoin btc becom...|        1|           0.0|
|stablecoins are n...|        1|           0.0|
|winklevoss capita...|        1|           0.0|
|bitcoin btc price...|        1|           0.0|
|binance announces...|        0|           2.0|
|canadian regulato...|       -1|        

In [169]:
# 80/20 train/validation split with a fixed seed.
# NOTE(review): the CountVectorizer and IDF models above were fitted on the full
# dataset before this split, so their statistics include the validation rows.
splits = tfidf_df.randomSplit([0.8, 0.2], seed=100)
train, val = splits[0], splits[1]

In [170]:
# Preview the training split (the first 20 rows, with long cell values truncated).
train.show(n=20, truncate=True)

+--------------------+---------+--------------------+--------------------+--------------------+--------------------+
|          title_body|sentiment|               words|            words_rm|                  tf|               tfidf|
+--------------------+---------+--------------------+--------------------+--------------------+--------------------+
|a bad week for th...|       -1|[a, bad, week, fo...|[bad, week, crypt...|(30169,[0,4,6,7,1...|(30169,[0,4,6,7,1...|
|a better bitcoin ...|        1|[a, better, bitco...|[better, bitcoin,...|(30169,[0,1,2,3,1...|(30169,[0,1,2,3,1...|
|a bitcoin backed ...|        1|[a, bitcoin, back...|[bitcoin, backed,...|(30169,[0,10,13,2...|(30169,[0,10,13,2...|
|a bitcoin etf is ...|        1|[a, bitcoin, etf,...|[bitcoin, etf, ‘v...|(30169,[0,1,2,3,2...|(30169,[0,1,2,3,2...|
|a bitcoin etf – o...|        1|[a, bitcoin, etf,...|[bitcoin, etf, –,...|(30169,[0,1,2,3,1...|(30169,[0,1,2,3,1...|
|a bitcoin rat is ...|       -1|[a, bitcoin, rat,...|[bitcoin, r

In [174]:
# Wrap the TF-IDF vector as the "features" column expected by the classifier.
hm_assembler = VectorAssembler(
    inputCols=["tfidf"],
    outputCol="features",
)

# Logistic regression on the indexed sentiment label; regParam and
# elasticNetParam set here are also the ones varied by the parameter grid.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="sentimentIndex",
    maxIter=20,
    regParam=0.3,
    elasticNetParam=0,
)

# Stage order: assemble features -> index the label -> fit the classifier.
hm_pipeline = Pipeline(stages=[hm_assembler, indexer, lr])

In [175]:
# Grid of 2 x 2 = 4 hyper-parameter combinations for the logistic regression stage.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.1, 0.3])
    .addGrid(lr.elasticNetParam, [0.0, 0.1])
    .build()
)

# 5-fold cross-validation over the whole pipeline, selecting by accuracy.
# An explicit seed fixes the fold assignment so the selected model and the
# reported accuracy can be reproduced on a re-run.
crossval = CrossValidator(
    estimator=hm_pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=MulticlassClassificationEvaluator(
        labelCol="sentimentIndex",
        predictionCol="prediction",
        metricName="accuracy",
    ),
    numFolds=5,
    seed=100,
)

model = crossval.fit(train)
# Despite its name, this holds predictions for the validation split (`val`).
prediction_train = model.transform(val)

In [176]:
# Score the predictions held in `prediction_train` by accuracy against the indexed label.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="sentimentIndex",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(prediction_train)
print("Model Accuracy = " + str(accuracy))

Model Accuracy = 0.8192949907235622


In [186]:
# Convert "sentimentIndex" back to its label string in "originalSentiment"
# and show it next to the original "sentiment" column for comparison.
converter = IndexToString(
    inputCol="sentimentIndex",
    outputCol="originalSentiment",
)
converted = converter.transform(indexed)
converted.select('title_body', 'sentiment', 'originalSentiment').show(10)

+--------------------+---------+-----------------+
|          title_body|sentiment|originalSentiment|
+--------------------+---------+-----------------+
|quantum computing...|        1|                1|
|bitcoin trading p...|        1|                1|
|crypto analyst as...|        1|                1|
|bitcoin btc price...|        1|                1|
|bitcoin sv bsv pr...|       -1|               -1|
|cboe halts bitcoi...|       -1|               -1|
|ripple xrp price ...|       -1|               -1|
|ripple xrp and ca...|        1|                1|
|bitcoin price ana...|        1|                1|
|bitcoin ethereum ...|       -1|               -1|
+--------------------+---------+-----------------+
only showing top 10 rows

