# Assignment 3. Text mining using Spark streaming

## Group 23 

## Collecting data from Spark streaming

In [1]:
#importing the necessary libraries
'''import pandas as pd
import os
import glob
from tqdm import tqdm'''

'import pandas as pd\nimport os\nimport glob\nfrom tqdm import tqdm'

In [2]:
#reading the collected data from multiple folders and merging into one data frame
'''data = pd.DataFrame()
folder_list = os.listdir("C:/Users/altyn/Desktop/spark/")
for folder in tqdm(folder_list):
    os.chdir(os.path.join("C:/Users/altyn/Desktop/spark/", folder))
    files = glob.glob('*'.format('json'))
    files.pop(-1)
    for file in files:
        df = pd.read_json(file, lines=True)
        data=pd.concat([data, df])

data = data.reset_index()
data = data.drop('index', axis=1)'''

'data = pd.DataFrame()\nfolder_list = os.listdir("C:/Users/altyn/Desktop/spark/")\nfor folder in tqdm(folder_list):\n    os.chdir(os.path.join("C:/Users/altyn/Desktop/spark/", folder))\n    files = glob.glob(\'*\'.format(\'json\'))\n    files.pop(-1)\n    for file in files:\n        df = pd.read_json(file, lines=True)\n        data=pd.concat([data, df])\n\ndata = data.reset_index()\ndata = data.drop(\'index\', axis=1)'

In [3]:
#splitting the data into training and testing samples.
'''from sklearn.model_selection import train_test_split

X = data.drop('channel', axis=1)
y = data['channel']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)'''

#saving data to csv files
'''X_train['channel'] = y_train
X_test['channel'] = y_test

X_train.to_csv('data/train.csv')
X_test.to_csv('data/test.csv')'''

"X_train['channel'] = y_train\nX_test['channel'] = y_test\n\nX_train.to_csv('data/train.csv')\nX_test.to_csv('data/test.csv')"

Both files with training and testing samples were uploaded to github for convenience.

## Building the model

As the dataset is large, we are going to use spark DataFrame type to accelerate the calculations. Our data has 4 columns:

 - datetime (the moment the message was sent to the chat)

 - username

 - message

 - channel

Below we can see the first 20 rows of the DataFrame

In [4]:
#reading the file and converting to spark DataFrame
import pandas as pd
import warnings
warnings.filterwarnings("ignore")

train_raw = pd.read_csv(r"data\train.csv", \
                        index_col=0).reset_index(drop=True)
train_df = spark.createDataFrame(train_raw.astype(str))
train_df.show()

+--------+--------------------+--------------------+----------+
|datetime|            username|             message|   channel|
+--------+--------------------+--------------------+----------+
| 33:57.5|      thenormalbeast|             hahahah|#asmongold|
| 04:12.0|            orckykat|amber slaps girls...|#asmongold|
| 59:14.5|           intranett| HAHAHAHAHHAHAHAHAHA|#asmongold|
| 28:37.0|               erixp|     BROLLAN????????|      #pgl|
| 09:41.0|        cerensdipity|It’s cheaper to p...|#asmongold|
| 14:53.8|               thied|                  no|#asmongold|
| 29:54.4|          poggz__one|     somebody farted|#asmongold|
| 19:45.8|iknowanythingabou...|               Obama|#asmongold|
| 57:33.1|            madjelly|depp stole amber'...|#asmongold|
| 54:56.3|            klagitsz|chr1st1an1337:  D...|      #pgl|
| 41:36.0|      humanlawnmower|                KEKW|#asmongold|
| 34:17.2|             setcrow|           MR WRIGHT|#asmongold|
| 42:49.6|              voryen|All to Ga

The column of interest is `message` which contains the text of the comments. We need to preprocess it and covert into features we can use for modelling. The main tool we are using is `pyspark.ml`. We faced some problems since it differs from the `sklearn`. 

In [5]:
#importing necessary classes from pyspark.ml for preprocessing and building the model
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, LogisticRegressionSummary
from pyspark.ml.feature import StopWordsRemover, Tokenizer, CountVectorizer, Binarizer, StringIndexer, IndexToString
from pyspark.sql.functions import lower, regexp_replace, col
from nltk.corpus import stopwords
eng_stopwords = stopwords.words('english')

First of all, the text is cleaned out of the unnecessary unformation like numbers, symbols, usernames (when the message is a response to the previous message). 

In [6]:
def clean_text(c):
    c = lower(c)
    c = regexp_replace(c, "(https?\://)\S+", "") # Remove links
    c = regexp_replace(c, "(\\n)|\n|\r|\t", "") # Remove CR, tab, and LR
    c = regexp_replace(c, "(?:(?:[0-9]{2}[:\/,]){2}[0-9]{2,4})", "") # Remove dates
    c = regexp_replace(c, "@([A-Za-z0-9_]+)", "") # Remove usernames
    c = regexp_replace(c, "[0-9]", "") # Remove numbers
    c = regexp_replace(c, "\:|\/|\#|\.|\?|\!|\&|\"|\,", "") # Remove symbols
    return c

train_df = train_df.withColumn("message_clean", clean_text(col("message")))
train_df.show(5)

+--------+--------------+--------------------+----------+--------------------+
|datetime|      username|             message|   channel|       message_clean|
+--------+--------------+--------------------+----------+--------------------+
| 33:57.5|thenormalbeast|             hahahah|#asmongold|             hahahah|
| 04:12.0|      orckykat|amber slaps girls...|#asmongold|amber slaps girls...|
| 59:14.5|     intranett| HAHAHAHAHHAHAHAHAHA|#asmongold| hahahahahhahahahaha|
| 28:37.0|         erixp|     BROLLAN????????|      #pgl|             brollan|
| 09:41.0|  cerensdipity|It’s cheaper to p...|#asmongold|it’s cheaper to p...|
+--------+--------------+--------------------+----------+--------------------+
only showing top 5 rows



From the first 5 rows, it is visible that the text is successfully cleaned.

Now we can build a pipeline which transforms the text into valid features:

1. encoding the target channel value into binary variable

2. tokenizing the message into separate words

3. removing the stop words which are meaningless

4. transforming the words into vectors. We do not take words which are met less than 20 comments in the dataset, otherwise we end up with too many features.

5. as we do not care how often the word is repeated in the message, we transform them into binary vectors

6. finally building the logistic regression

7. decoding the binary target variable into the names of the channels for prediction column

8. building a pipeline for convenience

In [7]:
# Configure an ML pipeline, which consists of 6 stages:

stringind = StringIndexer(inputCol="channel", outputCol="channel_bin")
tokenizer = Tokenizer(inputCol="message_clean", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="words_clean", stopWords= eng_stopwords)
cv = CountVectorizer(inputCol="words_clean", outputCol="features", binary=True, minDF = 20)
lr = LogisticRegression(featuresCol="features", labelCol = "channel_bin")
indtostr = IndexToString(inputCol="prediction", outputCol="pred_channel", labels = ['#asmongold', '#pgl'])

pipeline = Pipeline(stages=[stringind,tokenizer, remover,cv, lr, indtostr])

#fitting the pipeline to training data set
model = pipeline.fit(train_df)
prediction = model.transform(train_df)

In [8]:
#analyzing the corfficients
coeffs = pd.DataFrame([model.stages[3].vocabulary, model.stages[4].coefficients]).T
coeffs.columns = ['word','weight']
coeffs.sort_values(by='weight', ascending = True).head(20)

Unnamed: 0,word,weight
2943,festival,-33.223894
1552,sponsored,-23.564451
3119,joining,-21.089192
2683,asmonweird,-20.886121
2647,gotcha,-20.674827
2696,mats,-20.362149
2784,bruise,-19.231872
320,judge,-18.45312
842,otk,-18.250787
746,jd,-17.729267


In [9]:
coeffs.sort_values(by='weight', ascending = False).head(20)

Unnamed: 0,word,weight
3103,library,87.844072
2514,xantares,43.479954
2433,stockholm,37.353896
2542,numerous,31.405646
2915,here’s,28.096239
1123,nexa,26.410098
1814,vertigo,25.501868
1766,brazilians,23.437427
1598,world’s,22.315404
646,hltv,20.056768


In [10]:
prediction.select(['message', 'channel','pred_channel']).show(10)

+--------------------+----------+------------+
|             message|   channel|pred_channel|
+--------------------+----------+------------+
|             hahahah|#asmongold|  #asmongold|
|amber slaps girls...|#asmongold|  #asmongold|
| HAHAHAHAHHAHAHAHAHA|#asmongold|  #asmongold|
|     BROLLAN????????|      #pgl|        #pgl|
|It’s cheaper to p...|#asmongold|  #asmongold|
|                  no|#asmongold|  #asmongold|
|     somebody farted|#asmongold|  #asmongold|
|               Obama|#asmongold|  #asmongold|
|depp stole amber'...|#asmongold|  #asmongold|
|chr1st1an1337:  D...|      #pgl|        #pgl|
+--------------------+----------+------------+
only showing top 10 rows



In [11]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# Make prediction
predictionAndTarget = prediction.select("channel_bin", "prediction")

# Create evaluators
metrics_binary = BinaryClassificationMetrics(predictionAndTarget.rdd.map(tuple))

auc = metrics_binary.areaUnderROC
auc

0.8573022907468333

In [12]:
test_raw = pd.read_csv(r"data\test.csv", index_col=0).\
                        reset_index(drop=True)
test_df = spark.createDataFrame(test_raw.astype(str))
test_df = test_df.withColumn("message_clean", clean_text(col("message")))
test_pred = model.transform(test_df)

test_pred.select(['message', 'channel','pred_channel']).show(10)

+--------------------+----------+------------+
|             message|   channel|pred_channel|
+--------------------+----------+------------+
|KappaPride IM JIG...|      #pgl|  #asmongold|
|           gachiBASS|#asmongold|  #asmongold|
|I only punched hi...|#asmongold|  #asmongold|
|                KEKW|#asmongold|  #asmongold|
|Yo I'd be gay for...|#asmongold|  #asmongold|
|           YEBIII :D|      #pgl|  #asmongold|
|          PauseChamp|#asmongold|  #asmongold|
|   HAHAAHHAHAHHAHAAH|      #pgl|  #asmongold|
|    ok sure dude lol|#asmongold|  #asmongold|
|CANT HEAR ANY ING...|      #pgl|        #pgl|
+--------------------+----------+------------+
only showing top 10 rows



In [13]:
# Make prediction
predictionAndTarget = test_pred.select("channel_bin", "prediction")

# Create evaluator
metrics_binary = BinaryClassificationMetrics(predictionAndTarget.rdd.map(tuple))

#calculate auc for testing sample
auc = metrics_binary.areaUnderROC
auc

0.8520920705019877

In [14]:
model.save("twitch_model.model")

## Predicting the stream data

In [None]:
from pyspark.ml import PipelineModel
model = PipelineModel.load("twitch_model.model")

In [None]:
import threading

# Helper thread to avoid the Spark StreamingContext from blocking Jupyter
        
class StreamingThread(threading.Thread):
    def __init__(self, ssc):
        super().__init__()
        self.ssc = ssc
    def run(self):
        self.ssc.start()
        self.ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [None]:
sc

In [None]:
spark

In [None]:
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType

In [None]:
globals()['models_loaded'] = False
globals()['my_model'] = None


def process(time, rdd):
    if rdd.isEmpty():
        return
    
    print("========= %s =========" % str(time))
    
    # Convert to data frame
    df = spark.read.json(rdd)
    df.show()
    df = df.withColumn("message_clean", clean_text(col("message")))
    
    # Load in the model if not yet loaded:
    if not globals()['models_loaded']:
        # load in your models here
        globals()['my_model'] = model # Replace '***' with:    [...].load('my_logistic_regression')
        globals()['models_loaded'] = True
        
    # And then predict using the loaded model: 
    df_result = globals()['my_model'].transform(df)
    df_result.select(['datetime', 'username','message', 'channel','pred_channel']).show()

In [None]:
ssc = StreamingContext(sc, 10)

In [None]:
lines = ssc.socketTextStream("localhost", 8080)
lines.foreachRDD(process)

In [None]:
ssc_t = StreamingThread(ssc)
ssc_t.start()

In [None]:
ssc_t.stop()