# Predicting the Twitch Channel of Live Chat Messages
In this notebook, we train a model on chat data from three different Twitch channels to predict which channel a chat message was posted in.

In [1]:
sc

### Read Saved Chat Data

In [2]:
rdd = sc.textFile("./data-three/twitch-data*/part-00000")
df = spark.read.json(rdd)
df.show()


+--------------+--------------------+--------------------+--------------------+
|       channel|            datetime|             message|            username|
+--------------+--------------------+--------------------+--------------------+
| #easportsfifa|2022-05-27T11:19:...|             Benzema|       denizturcalis|
| #easportsfifa|2022-05-27T11:19:...|             benzema|         leanschmeat|
| #easportsfifa|2022-05-27T11:19:...|             Benzema|          darkoparko|
| #easportsfifa|2022-05-27T11:19:...|             Benzema|           sembirafa|
|    #loltyler1|2022-05-27T11:19:...|                  ez|        arhilelegend|
|    #loltyler1|2022-05-27T11:19:...|                 ???|             grrxves|
|#trainwreckstv|2022-05-27T11:19:...| Ya ya ya zoroBaller|      i_am_jerusalem|
|    #loltyler1|2022-05-27T11:19:...|             !uptime|             geet0ut|
| #easportsfifa|2022-05-27T11:19:...|             benzama|           laksefar1|
| #easportsfifa|2022-05-27T11:19:...|   

## Data Discovery

In [3]:
from pyspark.sql.functions import desc

df.printSchema()

# Show how many messages each channel contributes
df.groupby("channel").count().show()

# Show the users with the most messages
df.groupby("username").count().sort(desc("count")).show()

root
 |-- channel: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- message: string (nullable = true)
 |-- username: string (nullable = true)




+--------------+-----+
|       channel|count|
+--------------+-----+
|    #loltyler1| 3125|
|#trainwreckstv| 1467|
| #easportsfifa| 2984|
+--------------+-----+





+--------------------+-----+
|            username|count|
+--------------------+-----+
|            fossabot|  147|
|            nightbot|   90|
|            audlonlx|   88|
|              rare1_|   77|
|             tichyou|   75|
|                lyod|   68|
|          streamlabs|   59|
|      i_am_jerusalem|   58|
|              xfaraz|   45|
|           elwardian|   35|
|            nardclop|   35|
|         ssterrorman|   34|
|       covidvariant1|   32|
|ronnie_dungeon_ma...|   31|
|    supsupmammothman|   29|
|                l3c7|   26|
|           pumpzera_|   26|
|             soufian|   25|
|      akshan_gaming1|   24|
|       ikhlooody_999|   23|
+--------------------+-----+
only showing top 20 rows




## Train the Model

### Split Data
We shuffle the data first, then split it into training and test sets, with 80% of the rows going to training. We also need to make sure that both sets contain messages from all three channels.

In [4]:
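from pyspark.sql.functions import rand

# Shuffle with a fixed seed: an unseeded rand() is re-evaluated on every action,
# so train and test could end up overlapping or missing rows
df = df.orderBy(rand(seed=42))
train, test = df.randomSplit([0.8, 0.2], seed=42)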
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))
test.groupby("channel").count().show()


Training Dataset Count: 6034




Test Dataset Count: 1542




+--------------+-----+
|       channel|count|
+--------------+-----+
|    #loltyler1|  638|
|#trainwreckstv|  303|
| #easportsfifa|  601|
+--------------+-----+
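`randomSplit` assigns each row to a split independently at random, so the 80/20 ratio only holds approximately (here 6034 of 7576 rows, about 79.6%, went to training), and the same is true for each channel's share. The test counts above are already close to 20% per channel. If exact per-channel proportions ever matter, a stratified split guarantees them. The sketch below illustrates the idea in plain Python; `stratified_split` is a hypothetical helper, not a PySpark API, and rows are assumed to be `(channel, message)` tuples. (PySpark's `DataFrame.sampleBy` accepts per-channel sampling fractions, but it also samples row by row, so its proportions are approximate too.)

```python
import random
from collections import Counter, defaultdict


def stratified_split(rows, key, train_fraction=0.8, seed=42):
    """Hypothetical helper: split rows so every group key(row) keeps train_fraction in train."""
    rng = random.Random(seed)
    groups = defaultdict(list)
    for row in rows:
        groups[key(row)].append(row)

    train, test = [], []
    for group_rows in groups.values():
        shuffled = group_rows[:]
        rng.shuffle(shuffled)  # shuffle within each channel
        cut = round(len(shuffled) * train_fraction)
        train.extend(shuffled[:cut])
        test.extend(shuffled[cut:])
    return train, test


# Channel totals from the data exploration step; message texts are placeholders
channel_totals = {"#loltyler1": 3125, "#trainwreckstv": 1467, "#easportsfifa": 2984}
rows = [(channel, f"message {i}") for channel, n in channel_totals.items() for i in range(n)]
train, test = stratified_split(rows, key=lambda row: row[0])
test_counts = Counter(channel for channel, _ in test)
print(test_counts)
```

With these totals, the test set gets exactly 625 `#loltyler1`, 597 `#easportsfifa`, and 293 `#trainwreckstv` messages.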




### Categorical Data to Numeric Data
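We map the channel names to numeric labels so that the model can use them. By default, `StringIndexer` orders labels by descending frequency, so the most common channel in the training data, `#loltyler1`, gets label `0.0`.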

In [5]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="channel", outputCol="label")

string_indexer_model = indexer.fit(train)
display_data = string_indexer_model.transform(train)
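To make the label encoding concrete, here is a plain-Python sketch of what `StringIndexer` (with its default `stringOrderType="frequencyDesc"`) and the inverse `IndexToString` do: the most frequent string gets index `0.0`, ties are broken alphabetically, and indices map back to strings by position. The helper names are hypothetical; this is an illustration, not Spark's implementation.

```python
from collections import Counter


def fit_string_indexer(values):
    """Labels in StringIndexer's default 'frequencyDesc' order."""
    counts = Counter(values)
    # Most frequent first; ties broken alphabetically
    return sorted(counts, key=lambda value: (-counts[value], value))


def string_to_index(values, labels):
    position = {label: float(i) for i, label in enumerate(labels)}
    return [position[value] for value in values]  # unseen values raise KeyError


def index_to_string(indices, labels):
    # Inverse mapping, as IndexToString does with the fitted labels
    return [labels[int(i)] for i in indices]


channels = ["#loltyler1"] * 3 + ["#easportsfifa"] * 2 + ["#trainwreckstv"]
labels = fit_string_indexer(channels)
print(labels)  # ['#loltyler1', '#easportsfifa', '#trainwreckstv']
```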


### TF-IDF

We use TF-IDF to vectorize the messages. We deliberately skip further text cleaning, because typos and other seemingly noisy content are valid and informative signals in chat data.

In [6]:
from pyspark.ml.feature import IDF, Tokenizer, CountVectorizer

# A tokenizer that converts the input string to lowercase and then splits it by white spaces.
tokenizer = Tokenizer(inputCol="message", outputCol="words")
display_data = tokenizer.transform(display_data)

# Learn a vocabulary from the messages and turn each token list into a vector of term counts
vectorizer = CountVectorizer(inputCol='words', outputCol='rawFeatures')
vectorizer_model = vectorizer.fit(display_data)
display_data = vectorizer_model.transform(display_data)

# Learn Inverse Document Frequency (IDF) weights and rescale each count vector by them
idf = IDF(inputCol="rawFeatures", outputCol="idf_features")
idf_model = idf.fit(display_data)
display_data = idf_model.transform(display_data)
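The three stages above can be sketched in plain Python to show what each one computes. As we understand Spark's implementation, `Tokenizer` lowercases and splits on single whitespace characters, `CountVectorizer` produces raw term counts, and `IDF` multiplies each count by log((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) is the number of documents containing term t (the formula given in the Spark MLlib documentation). The function names are hypothetical, and `tokenize` only approximates Spark's tokenizer.

```python
import math
import re
from collections import Counter


def tokenize(message):
    # Lowercase, then split on each single whitespace character (roughly what
    # Spark's Tokenizer does); two consecutive spaces therefore yield ''.
    return re.split(r"\s", message.lower())


def fit_idf(documents):
    """IDF per term: log((m + 1) / (df(t) + 1)), with m = number of documents."""
    m = len(documents)
    doc_freq = Counter()
    for tokens in documents:
        doc_freq.update(set(tokens))  # count each term once per document
    return {term: math.log((m + 1) / (df + 1)) for term, df in doc_freq.items()}


def tf_idf(tokens, idf):
    # Raw term count (what CountVectorizer produces) times the term's IDF weight
    counts = Counter(tokens)
    return {term: tf * idf[term] for term, tf in counts.items() if term in idf}


messages = ["Benzema", "benzema  GOAL", "ez"]
documents = [tokenize(m) for m in messages]
idf = fit_idf(documents)
features = tf_idf(documents[1], idf)
print(documents[1], features)
```

A term that appears in every message gets weight log(1) = 0, so ubiquitous tokens contribute nothing to the features.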


After running the whole preprocessing, we can inspect the resulting feature vectors.

In [7]:
from pyspark.ml.linalg import VectorUDT, Vectors
from pyspark.sql.functions import udf
import pandas as pd
import numpy as np

def to_dense(in_vec):
    return Vectors.dense(in_vec.toArray())

to_dense_udf = udf(to_dense, VectorUDT())

# create dense vector
display_data = display_data.withColumn("features", to_dense_udf('idf_features'))

# Let's see how our data is vectorized
display_data_pandas = display_data.toPandas()
display_data_features = pd.DataFrame([np.array(i) for i in display_data_pandas.features], columns=vectorizer_model.vocabulary)
display_data_features


| | squadnodders | lebronjam | the | '' | gerrard | is | benzema | trikool | i | squadbpr | ... | shut | @al_fart_g, | lazy_eagle10 | slept | exist | minions | sk1t | tyleruuuuuu | et | must |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0.000000 | 0.000000 | 0.0 | 0.000000 | 0.0 | 0.0 | 3.092203 | 0.000000 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 1 | 0.000000 | 0.000000 | 0.0 | 0.000000 | 0.0 | 0.0 | 3.092203 | 0.000000 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 2 | 0.000000 | 0.000000 | 0.0 | 0.000000 | 0.0 | 0.0 | 3.092203 | 0.000000 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 3 | 0.000000 | 0.000000 | 0.0 | 0.000000 | 0.0 | 0.0 | 0.000000 | 0.000000 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 4 | 0.000000 | 0.000000 | 0.0 | 0.000000 | 0.0 | 0.0 | 3.092203 | 0.000000 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 6029 | 0.000000 | 0.000000 | 0.0 | 0.000000 | 0.0 | 0.0 | 0.000000 | 3.869049 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 6030 | 6.980791 | 7.706602 | 0.0 | 3.575432 | 0.0 | 0.0 | 0.000000 | 0.000000 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 6031 | 0.000000 | 0.000000 | 0.0 | 0.000000 | 0.0 | 0.0 | 0.000000 | 0.000000 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 6032 | 0.000000 | 0.000000 | 0.0 | 0.000000 | 0.0 | 0.0 | 0.000000 | 0.000000 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
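Spark stores `idf_features` as a sparse vector (a size plus the indices and values of the non-zero entries), and `to_dense` expands it. The plain-Python sketch below shows that expansion and estimates the cost of materializing the full matrix: 6034 training rows × 4392 vocabulary terms is about 26.5 million entries, roughly 212 MB as 64-bit floats (assuming 8 bytes per value and ignoring pandas/NumPy overhead), which is why dense conversion is only practical for small inspections like this one. Both helpers are hypothetical.

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse vector into a dense list of floats."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


def dense_bytes(rows, columns, bytes_per_value=8):
    # Storage needed if every entry of a rows x columns float64 matrix is materialized
    return rows * columns * bytes_per_value


print(sparse_to_dense(5, [1, 3], [3.092203, 1.5]))  # [0.0, 3.092203, 0.0, 1.5, 0.0]
print(dense_bytes(6034, 4392))  # 212010624
```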


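This is the vocabulary learned by `CountVectorizer`: these `4392` tokens define the dimensions of the feature vector built from the `message` column. It includes the empty string `''`, which `Tokenizer` produces when a message contains leading or consecutive whitespace.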

In [8]:
vectorizer_model.vocabulary

['squadnodders',
 'lebronjam',
 'the',
 '',
 'gerrard',
 'is',
 'benzema',
 'trikool',
 'i',
 'squadbpr',
 'kekw',
 'bigbrother',
 'eafifaletsgo',
 'tridance',
 'squadkooldance',
 'with',
 'on',
 '!rank',
 'squadw',
 'lul',
 'pain',
 'werner',
 'maldini',
 'a',
 'you',
 'have',
 'to',
 'eafifagg',
 'big',
 't',
 'they',
 'numb',
 'rng',
 'tonka',
 'lp',
 'mount',
 'master',
 'currently',
 'accounts:',
 '💰',
 'eafifatrofeo',
 'this',
 'cams',
 'steven',
 'even',
 'eafifahype',
 '-',
 'can',
 'dont',
 'for',
 'cheating',
 'literally',
 'squadwave',
 'in',
 'gigachad',
 'and',
 'potfriend',
 'omegalul',
 'it',
 'draven',
 'just',
 '140',
 'go',
 'be',
 'that',
 'ayaya',
 '?',
 'yump',
 'karim',
 'he',
 'win',
 'mods',
 'are',
 '4dog',
 'ivern',
 'play',
 'timo',
 'why',
 'gg',
 'me',
 'batchest',
 'u',
 'too',
 'squadl',
 'good',
 'd:',
 'squadg',
 '<message',
 'my',
 'yep',
 'adenwiggle',
 'eafifafgs22',
 'frenetibetano',
 'deleted>',
 'what',
 'chat',
 'how',
 'ggx',
 'huhh',
 'no',
 '-
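`CountVectorizer` orders its vocabulary by how often each term occurs across the whole training corpus, so emotes such as `squadnodders` at the top are among the most frequent tokens. A plain-Python sketch of that ordering, including the `vocabSize` cap (Spark's default is 2^18) and the `minDF` document-frequency filter, might look like this; the function names are hypothetical, and ties may be ordered differently than in Spark.

```python
from collections import Counter


def build_vocabulary(documents, vocab_size=1 << 18, min_df=1):
    """Terms ordered by total frequency across the corpus, most frequent first."""
    term_freq = Counter()
    doc_freq = Counter()
    for tokens in documents:
        term_freq.update(tokens)
        doc_freq.update(set(tokens))
    terms = [t for t in term_freq if doc_freq[t] >= min_df]
    # Stable sort: ties keep first-seen order here; Spark's tie order may differ
    terms.sort(key=lambda t: -term_freq[t])
    return terms[:vocab_size]


def count_vector(tokens, vocabulary):
    """Dense term-count vector; tokens outside the vocabulary are ignored."""
    position = {term: i for i, term in enumerate(vocabulary)}
    vector = [0.0] * len(vocabulary)
    for token in tokens:
        if token in position:
            vector[position[token]] += 1.0
    return vector


documents = [["benzema", "benzema"], ["ez", "benzema"], ["ez"], ["gg"]]
vocabulary = build_vocabulary(documents)
print(vocabulary)  # ['benzema', 'ez', 'gg']
```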

### Logistic Regression
We use `LogisticRegression` from the `pyspark.ml` library. It supports both binomial and multinomial (softmax) logistic regression. Since we have three labels, we need the multinomial variant; with the default `family="auto"`, Spark selects it automatically when the label column has more than two classes.

In [9]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol = 'idf_features', labelCol = 'label', maxIter=10)
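With more than two classes, the multinomial model keeps one coefficient row and one intercept per class (exposed as `coefficientMatrix` and `interceptVector` on the fitted `LogisticRegressionModel`), turns the per-class scores into probabilities with softmax, and predicts the most probable class. The sketch below shows only that prediction step in plain Python with made-up toy weights; it does not reproduce Spark's training procedure.

```python
import math


def softmax(scores):
    # Subtract the max score for numerical stability before exponentiating
    top = max(scores)
    exps = [math.exp(s - top) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


def predict(features, coefficients, intercepts):
    """Multinomial logistic regression: one coefficient row and intercept per class."""
    scores = [
        sum(w * x for w, x in zip(row, features)) + b
        for row, b in zip(coefficients, intercepts)
    ]
    probabilities = softmax(scores)
    label = max(range(len(probabilities)), key=probabilities.__getitem__)
    return float(label), probabilities


# Toy example: 2 features, 3 classes (weights are made up)
label, probabilities = predict([1.0, 0.0], [[0.1, 0.0], [0.0, 0.0], [2.0, 0.0]], [0.0, 0.0, 0.0])
print(label, probabilities)
```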

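### Create the Pipeline Model
A `Pipeline` chains the indexing, tokenization, vectorization, IDF, and logistic regression stages, so we can fit all of them on the training data with a single call and apply exactly the same transformations when predicting the test data.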

In [10]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[
    indexer,
    tokenizer,
    vectorizer,
    idf,
    lr])
pipeline_model = pipeline.fit(train)

22/05/27 18:18:02 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/05/27 18:18:02 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/05/27 18:18:04 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/05/27 18:18:04 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
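Fitting a pipeline runs its stages in order: estimators (`StringIndexer`, `CountVectorizer`, `IDF`, `LogisticRegression`) are fitted on the output of the earlier stages, while transformers such as `Tokenizer` are applied as-is. The resulting `PipelineModel` replays the fitted stages, so the vocabulary and IDF weights learned from `train` are reused unchanged on `test`. The toy classes below are hypothetical and simplified (they are not PySpark classes); `MeanCenter` plays the role of an estimator like `IDF` whose statistics come from the training data only.

```python
class ToyPipeline:
    """Simplified pipeline: stages with a fit() method are estimators, the rest are transformers."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        for i, stage in enumerate(self.stages):
            model = stage.fit(data) if hasattr(stage, "fit") else stage
            fitted.append(model)
            if i < len(self.stages) - 1:
                # Each later stage is fitted on the output of the earlier ones
                data = model.transform(data)
        return ToyPipelineModel(fitted)


class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, data):
        # Replay every fitted stage in order
        for stage in self.stages:
            data = stage.transform(data)
        return data


class Double:
    """Transformer: needs no fitting (like Tokenizer)."""

    def transform(self, xs):
        return [2 * x for x in xs]


class MeanCenter:
    """Estimator: learns a statistic from the data it is fitted on (like IDF)."""

    def fit(self, xs):
        return MeanCenterModel(sum(xs) / len(xs))


class MeanCenterModel:
    def __init__(self, mean):
        self.mean = mean

    def transform(self, xs):
        return [x - self.mean for x in xs]


model = ToyPipeline([Double(), MeanCenter()]).fit([1.0, 2.0, 3.0])
print(model.transform([5.0]))  # [6.0]: the mean learned from training data (4.0) is reused
```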

## Prediction
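We use `MulticlassClassificationEvaluator` to measure how well the model performs on the test set. Besides accuracy, we report the F1 score for each channel, the overall F1 score (the evaluator's `f1` metric, which weights each label's F1 by its frequency), and weighted precision and recall.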

In [11]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Create evaluator
evaluatorMulti = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# Make predictions for test data
predictions = pipeline_model.transform(test)

# Compute overall evaluation metrics ("f1" is the label-weighted F1 score)
predictionAndTarget = predictions.select("label", "prediction")
acc = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "accuracy"})
f1 = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "f1"})
weightedPrecision = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "weightedPrecision"})
weightedRecall = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "weightedRecall"})

# Per-channel F1 scores: take the index -> channel mapping from the fitted
# StringIndexer (first pipeline stage) instead of hard-coding it
channel_labels = pipeline_model.stages[0].labels
for index, channel in enumerate(channel_labels):
    f1_label = evaluatorMulti.evaluate(
        predictionAndTarget,
        {evaluatorMulti.metricName: "fMeasureByLabel", evaluatorMulti.metricLabel: float(index)})
    print(f"F1 Score for {channel}: {f1_label}")
print()

print(f"Accuracy: {acc}")
print(f"F1 Score: {f1}")
print(f"Weighted Precision: {weightedPrecision}")
print(f"Weighted Recall: {weightedRecall}")



F1 Score for #loltyler1: 0.8790322580645161
F1 Score for #easportsfifa: 0.8514851485148515
F1 Score for #trainwreckstv: 0.7148703956343793

Accuracy: 0.8300907911802854
F1 Score: 0.8360381872761375
Weighted Precision: 0.8557769169864429
Weighted Recall: 0.8300907911802854
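The sketch below shows how these metrics relate to each other, assuming the standard definitions: per-label precision, recall, and F1, with the weighted metrics averaging the per-label values by each label's share of the true labels. It also explains why `Weighted Recall` equals `Accuracy` above: the weighted recall is the sum over labels of (n_c / n) × (tp_c / n_c), which simplifies to (sum of tp_c) / n, exactly the accuracy. `classification_metrics` is a hypothetical helper, not the evaluator's implementation.

```python
from collections import Counter


def classification_metrics(labels, predictions):
    """Accuracy, per-label F1, and label-frequency-weighted precision/recall/F1."""
    n = len(labels)
    classes = sorted(set(labels) | set(predictions))
    true_counts = Counter(labels)
    predicted_counts = Counter(predictions)
    true_positives = Counter(l for l, p in zip(labels, predictions) if l == p)

    precision, recall, f1 = {}, {}, {}
    for c in classes:
        tp = true_positives[c]
        precision[c] = tp / predicted_counts[c] if predicted_counts[c] else 0.0
        recall[c] = tp / true_counts[c] if true_counts[c] else 0.0
        total = precision[c] + recall[c]
        f1[c] = 2 * precision[c] * recall[c] / total if total else 0.0

    # Weight each label by its share of the true labels
    weight = {c: true_counts[c] / n for c in classes}
    return {
        "accuracy": sum(true_positives.values()) / n,
        "fMeasureByLabel": f1,
        "weightedPrecision": sum(weight[c] * precision[c] for c in classes),
        "weightedRecall": sum(weight[c] * recall[c] for c in classes),
        "f1": sum(weight[c] * f1[c] for c in classes),
    }


labels = [0.0, 0.0, 0.0, 1.0, 1.0, 2.0]
predictions = [0.0, 0.0, 1.0, 1.0, 1.0, 2.0]
metrics = classification_metrics(labels, predictions)
print(metrics["accuracy"], metrics["weightedRecall"])
```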




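Although the data is not perfectly balanced (`#trainwreckstv` has roughly half as many messages as each of the other two channels), the model performs well, with 83% accuracy and an F1 score of about 0.84. The smallest channel, `#trainwreckstv`, also has the lowest F1 score (about 0.71). Let's investigate the results further.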
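One way to dig into the errors is a confusion matrix that counts how often each actual channel is predicted as each channel; given the lower F1 score for `#trainwreckstv`, it would show which channel those messages are mistaken for. In PySpark, the counts can be obtained with `predictions.groupBy("label", "prediction").count()`. Below is a plain-Python sketch with a hypothetical `confusion_matrix` helper, using four (actual, predicted) pairs taken from the sample predictions shown in the next step.

```python
from collections import Counter


def confusion_matrix(actual, predicted, labels):
    """Rows are actual channels, columns are predicted channels, cells are counts."""
    counts = Counter(zip(actual, predicted))
    return [[counts[(a, p)] for p in labels] for a in labels]


channels = ["#loltyler1", "#easportsfifa", "#trainwreckstv"]
# (actual, predicted) pairs from four of the sample predictions
actual = ["#trainwreckstv", "#loltyler1", "#easportsfifa", "#easportsfifa"]
predicted = ["#easportsfifa", "#loltyler1", "#easportsfifa", "#trainwreckstv"]
matrix = confusion_matrix(actual, predicted, channels)
for channel, row in zip(channels, matrix):
    print(f"{channel:>15}", row)
```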

### Revert Prediction Indexes to Channel Labels
We map the predicted label indices back to channel names with `IndexToString`, using the labels learned by the `StringIndexer`.

In [12]:
from pyspark.ml.feature import IndexToString

index_to_string_model = IndexToString(inputCol="prediction", outputCol="prediction_channel", labels=string_indexer_model.labels)
predictions = index_to_string_model.transform(predictions)
predictions.orderBy(rand()).select('message', 'label', 'prediction','channel', "prediction_channel").show()



+--------------------+-----+----------+--------------+------------------+
|             message|label|prediction|       channel|prediction_channel|
+--------------------+-----+----------+--------------+------------------+
|           so BM man|  2.0|       1.0|#trainwreckstv|     #easportsfifa|
|               !rank|  0.0|       0.0|    #loltyler1|        #loltyler1|
|     Love Ronaldinho|  1.0|       1.0| #easportsfifa|     #easportsfifa|
|             gerrand|  1.0|       2.0| #easportsfifa|    #trainwreckstv|
|             future*|  2.0|       2.0|#trainwreckstv|    #trainwreckstv|
|squadG squadSquad...|  2.0|       2.0|#trainwreckstv|    #trainwreckstv|
|      holding breath|  2.0|       2.0|#trainwreckstv|    #trainwreckstv|
|fifabi1OHerz fifa...|  1.0|       1.0| #easportsfifa|     #easportsfifa|
|@audlonlx audiooo...|  2.0|       2.0|#trainwreckstv|    #trainwreckstv|
|         Timo Werner|  1.0|       1.0| #easportsfifa|     #easportsfifa|
|          FrankerZ ?|  0.0|       0.0


## Save the Pipeline Model
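We save the fitted pipeline so that we can reuse it to predict the channel of new chat messages; it can be loaded back with `PipelineModel.load("pipeline_model")`.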

In [13]:
pipeline_model.write().overwrite().save("pipeline_model")
