In [1]:
path1 = "/Users/liyitong/Desktop/BigData/checkpoint"
path2 = "/Users/liyitong/Desktop/BigData/dataset"
main = "/Users/liyitong/Desktop/BigData/spark/notebooks/"
path3 = "/Users/liyitong/Desktop/BigData/checkpoint_rf"
path4 = "/Users/liyitong/Desktop/BigData/dataset_rf"

In [2]:
sc

In [3]:
spark

In [4]:
import threading

# Helper thread to avoid the Spark StreamingContext from blocking Jupyter
        
class StreamingThread(threading.Thread):
    def __init__(self, ssc):
        super().__init__()
        self.ssc = ssc
    def run(self):
        self.ssc.start()
        self.ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)
        

# Save streaming data

In [5]:
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

In [6]:
socketDF = spark.readStream.format("socket").option("host", "localhost").option("port", 8080).load()
socketDF.printSchema()

22/05/27 12:27:04 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.


root
 |-- value: string (nullable = true)



In [7]:
query = socketDF \
        .writeStream\
        .outputMode("append")\
        .format("json")\
        .option('checkpointLocation',path1)\
        .option('path',path2)\
        .start()
query.awaitTermination(timeout=600)
query.stop()

22/05/27 12:27:16 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


#  Load data

In [8]:
dataset = spark.read.json(path2)

In [163]:
schema = StructType(
    [
        StructField('datetime', StringType(), True),
        StructField('channel', StringType(), True),
        StructField('username', StringType(), True),
        StructField('message', StringType(), True)
    ]
)

dataset = dataset.withColumn("value", from_json("value", schema))\
    .select(col('value.*'))

In [164]:
print(len(dataset.columns), dataset.count())

4 1749


In [165]:
dataset.show()

+--------------------+------------+-----------------+--------------------+
|            datetime|     channel|         username|             message|
+--------------------+------------+-----------------+--------------------+
|2022-05-20T23:34:...|#cohhcarnage|fluorescentpickle|@georgreaver This...|
|2022-05-20T23:34:...|     #quin69|       punishermk|           PainChamp|
|2022-05-20T23:34:...|#cohhcarnage|      n1njac00kie|is there a perma ...|
|2022-05-20T23:34:...|     #quin69|           textue|that guy is a ret...|
|2022-05-20T23:34:...|     #quin69|     friendlyfyre|WhomstInquired Wh...|
|2022-05-20T23:34:...|     #quin69|        lojackatk|               ICANT|
|2022-05-20T23:34:...|     #quin69|            affx5|triggered libtard...|
|2022-05-20T23:34:...|     #quin69|         tyree372|he loves to eat b...|
|2022-05-20T23:34:...|     #quin69|    demonstrate86|I work at Microso...|
|2022-05-20T23:34:...|     #quin69|          omgacow|That guy is proof...|
|2022-05-20T23:34:...|   

In [166]:
train, test = dataset.randomSplit([0.7, 0.3], seed=12345)

# Classification Model

In [14]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler,FeatureHasher
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

## Cross-validation logistic regression model

In [168]:
# Configure an ML pipeline.
indexer = StringIndexer(inputCol="channel", outputCol="label")
hasher = FeatureHasher(inputCols=["message", "username"],
                       outputCol="indexfeatures")
scaler = StandardScaler(inputCol="indexfeatures", outputCol="features",
                        withStd=True, withMean=True)
lr = LogisticRegression(maxIter=20)
pipeline = Pipeline(stages=[indexer, hasher, scaler, lr])

In [169]:
paramGrid = ParamGridBuilder() \
    .addGrid(hasher.numFeatures, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=3)  # use 3+ folds in practice

In [170]:
# Run cross-validation, and choose the best set of parameters.
lrcvModel = crossval.fit(train)

                                                                                

In [171]:
# Save the best model
lrcvPath = main + "lrcvModel"
lrcvModel.bestModel.write().overwrite().save(lrcvPath)

## Naive Bayes 

In [9]:
from pyspark.ml.classification import NaiveBayes

In [22]:
indexer = StringIndexer(inputCol="channel", outputCol="label")
hasher = FeatureHasher(inputCols=["message", "username"],
                       outputCol="features")
nb = NaiveBayes(labelCol="label", featuresCol="features")
pipeline = Pipeline(stages=[indexer, hasher, nb])

In [23]:
# Create ParamGrid for Cross Validation
nbparamGrid = ParamGridBuilder()\
               .addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0])\
               .build()

In [24]:
# Create 3-fold CrossValidator
nbcv = CrossValidator(estimator = pipeline,
                    estimatorParamMaps = nbparamGrid,
                    evaluator=MulticlassClassificationEvaluator(),
                          numFolds=3)

In [25]:
nbcvModel = nbcv.fit(train)

22/05/26 19:53:02 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/05/26 19:53:06 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/05/26 19:53:09 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/05/26 19:53:12 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/05/26 19:53:15 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/05/26 19:53:18 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/05/26 19:53:22 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/05/26 19:53:25 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/05/26 19:53:27 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/05/26 19:53:30 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/05/26 19:53:32 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/05/26 19:53:34 WARN DAGScheduler: Broadcasting larg

In [26]:
# Save the best model
nbcvPath = main + "nbcvModel"
nbcvModel.bestModel.write().overwrite().save(nbcvPath)

22/05/26 19:55:52 WARN TaskSetManager: Stage 170 contains a task of very large size (4185 KiB). The maximum recommended task size is 1000 KiB.


## Linear Support Vector Machines ---- Binary Classification

In [52]:
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [53]:
indexer = StringIndexer(inputCol="channel", outputCol="label")
hasher = FeatureHasher(inputCols=["message", "username"],
                       outputCol="features")
lsvc = LinearSVC(maxIter=10, labelCol="label", featuresCol="features")
pipeline = Pipeline(stages=[indexer, hasher, lsvc])

In [54]:
# Create ParamGrid for Cross Validation
paramGrid = ParamGridBuilder()\
               .addGrid(lsvc.regParam, [0.1, 0.01])\
               .build()
cvlsvc = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3)  # use 3+ folds in practice

In [55]:
# Run cross-validation, and choose the best set of parameters.
cvlsvcModel = cvlsvc.fit(train)

22/05/21 00:26:39 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
22/05/21 00:26:49 WARN BlockManager: Asked to remove block broadcast_2620, which does not exist
22/05/21 00:27:01 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
22/05/21 00:27:27 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
22/05/21 00:27:45 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
22/05/21 00:28:13 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
22/05/21 00:28:33 WARN BlockManager: Asked to remove block broadcast_3125, which does not exist
22/05/21 00:28:33 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
22/05/21 00:28:42 WARN BlockManager: Asked to remove block broadcast_3161, which does not exist


In [56]:
# Save the best model
cvlsvcPath = main + "cvlsvcModel"
cvlsvcModel.bestModel.write().overwrite().save(cvlsvcPath)

22/05/21 00:29:33 WARN TaskSetManager: Stage 2761 contains a task of very large size (2094 KiB). The maximum recommended task size is 1000 KiB.


# Test accuracy

## Logistic Classification

In [28]:
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [179]:
# read pickled model via pipeline api
lrcv = PipelineModel.load(lrcvPath)

In [180]:
predictions = lrcv.transform(test)

In [181]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
Accuracy =  evaluator.evaluate(predictions)
print("Accuracy:" + str(Accuracy))



Accuracy:0.8635153990552861




## Naive Bayes

In [29]:
nbcv = PipelineModel.load(nbcvPath)

                                                                                

In [31]:
nbpredictions = nbcv.transform(test)

In [32]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
nbevaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print('Accuracy:', nbevaluator.evaluate(nbpredictions))

22/05/26 19:56:16 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB

Accuracy: 0.8300804785677398




## Linear Support Vector Machine

In [65]:
cvlsvc = PipelineModel.load(cvlsvcPath)

In [66]:
cvlsvcpredictions = cvlsvc.transform(test)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print('Accuracy:', evaluator.evaluate(cvlsvcpredictions))

22/05/21 00:30:38 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB

Accuracy: 0.9378240217656277




# Random Forest

We used a small dataset to estimate random forest model since the memory space is not enough

## Load data

In [9]:
socketDF = spark.readStream.format("socket").option("host", "localhost").option("port", 8080).load()
socketDF.printSchema()

root
 |-- value: string (nullable = true)



22/05/27 12:29:31 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.


In [10]:
query = socketDF \
        .writeStream\
        .outputMode("append")\
        .format("json")\
        .option('checkpointLocation',path3)\
        .option('path',path4)\
        .start()
query.awaitTermination(timeout=300)
query.stop()

22/05/27 12:29:38 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [11]:
df = spark.read.json(path4)

In [16]:
schema = StructType(
    [
        StructField('datetime', StringType(), True),
        StructField('channel', StringType(), True),
        StructField('username', StringType(), True),
        StructField('message', StringType(), True)
    ]
)

df = df.withColumn("value", from_json("value", schema))\
    .select(col('value.*'))

In [17]:
print(len(df.columns), df.count())



4 1688


                                                                                

In [20]:
rf_train, rf_test= df.randomSplit([0.5, 0.5], seed=12345)

In [21]:
rf_train.count()

856

## Train a model

In [11]:
from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier

In [14]:
# Configure an ML pipeline.
indexer = StringIndexer(inputCol="channel", outputCol="label")
#indexer = StringIndexer(inputCol="channel", outputCol="label")
hasher = FeatureHasher(inputCols=["message", "username"],
                       outputCol="features")
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
pipeline = Pipeline(stages=[indexer, hasher, rf])

In [15]:
# Train model with Training Data
rfModel = pipeline.fit(rf_train)

22/05/26 19:40:30 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
22/05/26 19:40:36 WARN DAGScheduler: Broadcasting large task binary with size 1034.7 KiB
22/05/26 19:40:37 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
22/05/26 19:40:41 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
22/05/26 19:40:44 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
22/05/26 19:40:46 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
22/05/26 19:40:48 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
                                                                                

In [20]:
# Save the best model
rfPath = main + "rfModel"
rfModel.write().overwrite().save(rfPath)

## Test accuracy

In [22]:
rf = PipelineModel.load(rfPath)
rfpredictions = rf.transform(rf_test)
rfevaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print('Accuracy:', rfevaluator.evaluate(rfpredictions))

[Stage 45:>                                                         (0 + 1) / 1]                                                                                

Accuracy: 0.5711333579404357




# Deployed Predicting

## Logistic classification

In [67]:
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType

In [68]:
globals()['models_loaded'] = False
globals()['my_model'] = None

def process(time, rdd):
    if rdd.isEmpty():
        return
    
    print("========= %s =========" % str(time))
    
    # Convert to data frame
    df = spark.read.json(rdd)
    df.show()
    
    if not globals()['models_loaded']:
        # load in your models here
        globals()['my_model'] = PipelineModel.load(lrcvPath)
        globals()['models_loaded'] = True
        
    # And then predict using the loaded model: 
    df_result = globals()['my_model'].transform(df)
    df_result.select("channel","datetime","message","username","label","prediction").show()

In [69]:
ssc = StreamingContext(sc, 10)

In [70]:
lines = ssc.socketTextStream("localhost", 8080)
lines.foreachRDD(process)

In [71]:
ssc_t = StreamingThread(ssc)
ssc_t.start()

22/05/21 00:31:28 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:31:28 WARN BlockManager: Block input-0-1653085888200 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:31:29 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:31:29 WARN BlockManager: Block input-0-1653085889200 replicated to only 0 peer(s) instead of 1 peers
                                                                                

+-------+--------------------+--------------------+--------------+
|channel|            datetime|             message|      username|
+-------+--------------------+--------------------+--------------+
| #lirik|2022-05-20T22:31:...|ratJAM ratJAM ratJAM|     elfatbear|
| #lirik|2022-05-20T22:31:...|NomuJem xar2EDM  ...|        aur1so|
| #lirik|2022-05-20T22:31:...|         pepeFASTJAM|       mahdewd|
| #lirik|2022-05-20T22:31:...|song is: utopia b...|superbirdy2037|
|#quin69|2022-05-20T22:31:...|ACTION Join the ...|streamelements|
| #lirik|2022-05-20T22:31:...|              docPls|       xaolinn|
+-------+--------------------+--------------------+--------------+



22/05/21 00:31:30 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:31:30 WARN BlockManager: Block input-0-1653085890200 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:31:31 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:31:31 WARN BlockManager: Block input-0-1653085891200 replicated to only 0 peer(s) instead of 1 peers


+-------+--------------------+--------------------+--------------+-----+----------+
|channel|            datetime|             message|      username|label|prediction|
+-------+--------------------+--------------------+--------------+-----+----------+
| #lirik|2022-05-20T22:31:...|ratJAM ratJAM ratJAM|     elfatbear|  0.0|       0.0|
| #lirik|2022-05-20T22:31:...|NomuJem xar2EDM  ...|        aur1so|  0.0|       0.0|
| #lirik|2022-05-20T22:31:...|         pepeFASTJAM|       mahdewd|  0.0|       1.0|
| #lirik|2022-05-20T22:31:...|song is: utopia b...|superbirdy2037|  0.0|       0.0|
|#quin69|2022-05-20T22:31:...|ACTION Join the ...|streamelements|  1.0|       1.0|
| #lirik|2022-05-20T22:31:...|              docPls|       xaolinn|  0.0|       1.0|
+-------+--------------------+--------------------+--------------+-----+----------+



22/05/21 00:31:32 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:31:32 WARN BlockManager: Block input-0-1653085892200 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:31:33 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:31:33 WARN BlockManager: Block input-0-1653085893200 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:31:34 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:31:34 WARN BlockManager: Block input-0-1653085894200 replicated to only 0 peer(s) instead of 1 peers


In [72]:
ssc_t.stop()

22/05/21 00:31:35 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:31:35 WARN BlockManager: Block input-0-1653085895200 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:31:35 WARN SocketReceiver: Error receiving data
java.net.SocketException: Socket closed
	at java.base/java.net.SocketInputStream.socketRead0(Native Method)
	at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:115)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:168)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
	at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
	at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
	at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
	at java.base/java.io.InputStreamReader.read(InputStreamReader.java:181)
	at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
	at java.base/java.io.BufferedReader.readLin

----- Stopping... this may take a few seconds -----


Exception in thread "receiver-supervisor-future-0" java.lang.InterruptedException: sleep interrupted
	at java.base/java.lang.Thread.sleep(Native Method)
	at org.apache.spark.streaming.receiver.ReceiverSupervisor.$anonfun$restartReceiver$1(ReceiverSupervisor.scala:196)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.ja

+-------+--------------------+--------------------+----------------+
|channel|            datetime|             message|        username|
+-------+--------------------+--------------------+----------------+
| #lirik|2022-05-20T22:31:...|              docPls|          xxtayo|
| #lirik|2022-05-20T22:31:...|              catJAM|        nuilzero|
| #lirik|2022-05-20T22:31:...|                LGen|          ninput|
| #lirik|2022-05-20T22:31:...|             CatsJAM|        rotator1|
| #lirik|2022-05-20T22:31:...|              catJAM|         sikefox|
| #lirik|2022-05-20T22:31:...|         MILKMANRAVE|  originaljacket|
| #lirik|2022-05-20T22:31:...|         MILKMANRAVE|          crioos|
| #lirik|2022-05-20T22:31:...|djFR xar2EDM FRpl...|        mrbeannm|
| #lirik|2022-05-20T22:31:...|              docPls|           lctr_|
| #lirik|2022-05-20T22:31:...|           rabbitJAM|       kocheng11|
| #lirik|2022-05-20T22:31:...|pepeD pepeD pepeD...|       mistletea|
| #lirik|2022-05-20T22:31:...|ratJ

## Naive Bayes

In [73]:
globals()['models_loaded'] = False
globals()['my_model'] = None

def process(time, rdd):
    if rdd.isEmpty():
        return
    
    print("========= %s =========" % str(time))
    
    # Convert to data frame
    df = spark.read.json(rdd)
    df.show()
    
    if not globals()['models_loaded']:
        # load in your models here
        globals()['my_model'] = PipelineModel.load(nbcvPath)
        globals()['models_loaded'] = True
        
    # And then predict using the loaded model: 
    df_result = globals()['my_model'].transform(df)
    df_result.select("channel","datetime","message","username","label","prediction").show()

In [74]:
ssc = StreamingContext(sc, 10)

In [75]:
lines = ssc.socketTextStream("localhost", 8080)
lines.foreachRDD(process)

In [76]:
ssc_t = StreamingThread(ssc)
ssc_t.start()

22/05/21 00:32:10 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:32:10 WARN BlockManager: Block input-0-1653085930400 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:32:11 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:32:11 WARN BlockManager: Block input-0-1653085931400 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:32:12 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:32:12 WARN BlockManager: Block input-0-1653085932400 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:32:13 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:32:13 WARN BlockManager: Block input-0-1653085933400 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:32:14 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:32:14 WARN BlockManager: Block input-0-1653085934400 replicated to

+-------+--------------------+--------------------+----------------+
|channel|            datetime|             message|        username|
+-------+--------------------+--------------------+----------------+
|#quin69|2022-05-20T22:32:...| 3mil is only ignite|        domdomuh|
| #lirik|2022-05-20T22:32:...|              jumpFR|       alejan2ro|
| #lirik|2022-05-20T22:32:...|xar2EDM docPls VI...|          scluse|
|#quin69|2022-05-20T22:32:...|     it will be less|verygoodplayerow|
| #lirik|2022-05-20T22:32:...|FRpls2 FRpls Jamm...|      marcuswolf|
| #lirik|2022-05-20T22:32:...|               song?|          daz__7|
|#quin69|2022-05-20T22:32:...|i wish i could mu...|    dunkeysballs|
| #lirik|2022-05-20T22:32:...|            pandaPls|          moasat|
| #lirik|2022-05-20T22:32:...|             neonPls|        nuilzero|
| #lirik|2022-05-20T22:32:...|I need some heali...|           nrnee|
|#quin69|2022-05-20T22:32:...|       Meta cuck YEP|          z_neex|
| #lirik|2022-05-20T22:32:...|    

22/05/21 00:32:20 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:32:20 WARN BlockManager: Block input-0-1653085940400 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:32:21 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
22/05/21 00:32:21 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
22/05/21 00:32:21 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB


+-------+--------------------+--------------------+----------------+-----+----------+
|channel|            datetime|             message|        username|label|prediction|
+-------+--------------------+--------------------+----------------+-----+----------+
|#quin69|2022-05-20T22:32:...| 3mil is only ignite|        domdomuh|  1.0|       0.0|
| #lirik|2022-05-20T22:32:...|              jumpFR|       alejan2ro|  0.0|       0.0|
| #lirik|2022-05-20T22:32:...|xar2EDM docPls VI...|          scluse|  0.0|       0.0|
|#quin69|2022-05-20T22:32:...|     it will be less|verygoodplayerow|  1.0|       1.0|
| #lirik|2022-05-20T22:32:...|FRpls2 FRpls Jamm...|      marcuswolf|  0.0|       0.0|
| #lirik|2022-05-20T22:32:...|               song?|          daz__7|  0.0|       0.0|
|#quin69|2022-05-20T22:32:...|i wish i could mu...|    dunkeysballs|  1.0|       0.0|
| #lirik|2022-05-20T22:32:...|            pandaPls|          moasat|  0.0|       0.0|
| #lirik|2022-05-20T22:32:...|             neonPls|   

22/05/21 00:32:21 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:32:21 WARN BlockManager: Block input-0-1653085941400 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:32:22 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:32:22 WARN BlockManager: Block input-0-1653085942400 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:32:24 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:32:24 WARN BlockManager: Block input-0-1653085944600 replicated to only 0 peer(s) instead of 1 peers


In [77]:
ssc_t.stop()

----- Stopping... this may take a few seconds -----


22/05/21 00:32:25 WARN SocketReceiver: Error receiving data
java.net.SocketException: Socket closed
	at java.base/java.net.SocketInputStream.socketRead0(Native Method)
	at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:115)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:168)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
	at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
	at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
	at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
	at java.base/java.io.InputStreamReader.read(InputStreamReader.java:181)
	at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
	at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
	at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
	at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.getNext(SocketInputDStream.scala:121)
	at org.a

+-------+--------------------+--------------------+--------------+
|channel|            datetime|             message|      username|
+-------+--------------------+--------------------+--------------+
| #lirik|2022-05-20T22:32:...|             xar2EDM|   dinosaure56|
| #lirik|2022-05-20T22:32:...|xar2EDM docPls AD...|        scluse|
| #lirik|2022-05-20T22:32:...|LPPM xar2EDM LPPM...|       sikefox|
|#quin69|2022-05-20T22:32:...|   goosePls EDM HOLY| vanitylicious|
|#quin69|2022-05-20T22:32:...|                !rip|      veyristv|
|#quin69|2022-05-20T22:32:...|https://youtu.be/...|streamelements|
| #lirik|2022-05-20T22:32:...|              ratJAM|        inkode|
|#quin69|2022-05-20T22:32:...|you really overes...|       zofurie|
+-------+--------------------+--------------------+--------------+



22/05/21 00:32:30 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
22/05/21 00:32:30 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB


+-------+--------------------+--------------------+--------------+-----+----------+
|channel|            datetime|             message|      username|label|prediction|
+-------+--------------------+--------------------+--------------+-----+----------+
| #lirik|2022-05-20T22:32:...|             xar2EDM|   dinosaure56|  0.0|       0.0|
| #lirik|2022-05-20T22:32:...|xar2EDM docPls AD...|        scluse|  0.0|       0.0|
| #lirik|2022-05-20T22:32:...|LPPM xar2EDM LPPM...|       sikefox|  0.0|       0.0|
|#quin69|2022-05-20T22:32:...|   goosePls EDM HOLY| vanitylicious|  1.0|       1.0|
|#quin69|2022-05-20T22:32:...|                !rip|      veyristv|  1.0|       1.0|
|#quin69|2022-05-20T22:32:...|https://youtu.be/...|streamelements|  1.0|       1.0|
| #lirik|2022-05-20T22:32:...|              ratJAM|        inkode|  0.0|       0.0|
|#quin69|2022-05-20T22:32:...|you really overes...|       zofurie|  1.0|       0.0|
+-------+--------------------+--------------------+--------------+-----+----

## Linear Support Vector Machines

In [78]:
globals()['models_loaded'] = False
globals()['my_model'] = None

def process(time, rdd):
    if rdd.isEmpty():
        return
    
    print("========= %s =========" % str(time))
    
    # Convert to data frame
    df = spark.read.json(rdd)
    df.show()
    
    if not globals()['models_loaded']:
        # load in your models here
        globals()['my_model'] = PipelineModel.load(cvlsvcPath)
        globals()['models_loaded'] = True
        
    # And then predict using the loaded model: 
    df_result = globals()['my_model'].transform(df)
    df_result.select("channel","datetime","message","username","label","prediction").show()

In [79]:
ssc = StreamingContext(sc, 10)

In [80]:
lines = ssc.socketTextStream("localhost", 8080)
lines.foreachRDD(process)

In [81]:
ssc_t = StreamingThread(ssc)
ssc_t.start()

22/05/21 00:33:40 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:33:40 WARN BlockManager: Block input-0-1653086020000 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:33:40 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:33:40 WARN BlockManager: Block input-0-1653086020200 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:33:41 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:33:41 WARN BlockManager: Block input-0-1653086021200 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:33:42 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:33:42 WARN BlockManager: Block input-0-1653086022000 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:33:43 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:33:43 WARN BlockManager: Block input-0-1653086023000 replicated to

+-------+--------------------+--------------------+--------------------+
|channel|            datetime|             message|            username|
+-------+--------------------+--------------------+--------------------+
|#quin69|2022-05-20T22:33:...|                  70|               raduk|
|#quin69|2022-05-20T22:33:...|       infernal cry?|          airfusionz|
| #lirik|2022-05-20T22:33:...|               24hr?|       lootumshootum|
|#quin69|2022-05-20T22:33:...|    level ur EA LOLW|             catatrr|
|#quin69|2022-05-20T22:33:...|                  72|          deathnell3|
| #lirik|2022-05-20T22:33:...|          PauseChamp|            mrbeannm|
|#quin69|2022-05-20T22:33:...|HIDEOUT SIMULATOR...|       culliganman76|
|#quin69|2022-05-20T22:33:...|             Weirdga|          adamoftime|
|#quin69|2022-05-20T22:33:...|                THIS|      chrono_crosser|
| #lirik|2022-05-20T22:33:...|          PauseChamp|           alejan2ro|
|#quin69|2022-05-20T22:33:...|                  69|

22/05/21 00:33:50 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:33:50 WARN BlockManager: Block input-0-1653086030200 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:33:51 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:33:51 WARN BlockManager: Block input-0-1653086031000 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:33:51 WARN DAGScheduler: Broadcasting large task binary with size 2.0 MiB
22/05/21 00:33:51 WARN DAGScheduler: Broadcasting large task binary with size 2.0 MiB
22/05/21 00:33:51 WARN DAGScheduler: Broadcasting large task binary with size 2.0 MiB


+-------+--------------------+--------------------+--------------------+-----+----------+
|channel|            datetime|             message|            username|label|prediction|
+-------+--------------------+--------------------+--------------------+-----+----------+
|#quin69|2022-05-20T22:33:...|                  70|               raduk|  1.0|       0.0|
|#quin69|2022-05-20T22:33:...|       infernal cry?|          airfusionz|  1.0|       0.0|
| #lirik|2022-05-20T22:33:...|               24hr?|       lootumshootum|  0.0|       0.0|
|#quin69|2022-05-20T22:33:...|    level ur EA LOLW|             catatrr|  1.0|       0.0|
|#quin69|2022-05-20T22:33:...|                  72|          deathnell3|  1.0|       0.0|
| #lirik|2022-05-20T22:33:...|          PauseChamp|            mrbeannm|  0.0|       0.0|
|#quin69|2022-05-20T22:33:...|HIDEOUT SIMULATOR...|       culliganman76|  1.0|       1.0|
|#quin69|2022-05-20T22:33:...|             Weirdga|          adamoftime|  1.0|       0.0|
|#quin69|2

22/05/21 00:33:53 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:33:53 WARN BlockManager: Block input-0-1653086033000 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:33:54 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:33:54 WARN BlockManager: Block input-0-1653086034200 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:33:55 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:33:55 WARN BlockManager: Block input-0-1653086035000 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:33:56 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:33:56 WARN BlockManager: Block input-0-1653086036000 replicated to only 0 peer(s) instead of 1 peers


In [82]:
ssc_t.stop()

----- Stopping... this may take a few seconds -----


22/05/21 00:33:56 WARN SocketReceiver: Error receiving data
java.net.SocketException: Socket closed
	at java.base/java.net.SocketInputStream.socketRead0(Native Method)
	at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:115)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:168)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
	at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
	at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
	at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
	at java.base/java.io.InputStreamReader.read(InputStreamReader.java:181)
	at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
	at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
	at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
	at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.getNext(SocketInputDStream.scala:121)
	at org.a

+-------+--------------------+--------------------+----------------+
|channel|            datetime|             message|        username|
+-------+--------------------+--------------------+----------------+
|#quin69|2022-05-20T22:33:...|                LOLW|  the_white_pepe|
|#quin69|2022-05-20T22:33:...|                  11|     crypthicccc|
| #lirik|2022-05-20T22:33:...|@NuIlzero Nice mo...|        xsalim69|
|#quin69|2022-05-20T22:33:...|spec out of eleme...|          alex6x|
| #lirik|2022-05-20T22:33:...| 3x PauseChamp combo|        fossabot|
|#quin69|2022-05-20T22:33:...|monkaHeliWatchesW...|       sitkid721|
|#quin69|2022-05-20T22:33:...|you just need to ...|verygoodplayerow|
|#quin69|2022-05-20T22:33:...|Chat, does he has...|       metrandir|
|#quin69|2022-05-20T22:33:...|duno but what abo...|       dkarkill1|
|#quin69|2022-05-20T22:33:...|get veiled helmet...|        gynariel|
| #lirik|2022-05-20T22:33:...|pepeFASTJAM pepeF...|  kirby_the_pink|
| #lirik|2022-05-20T22:33:...|catJ

22/05/21 00:34:00 WARN DAGScheduler: Broadcasting large task binary with size 2.0 MiB
22/05/21 00:34:00 WARN DAGScheduler: Broadcasting large task binary with size 2.0 MiB
22/05/21 00:34:00 WARN DAGScheduler: Broadcasting large task binary with size 2.0 MiB


+-------+--------------------+--------------------+----------------+-----+----------+
|channel|            datetime|             message|        username|label|prediction|
+-------+--------------------+--------------------+----------------+-----+----------+
|#quin69|2022-05-20T22:33:...|                LOLW|  the_white_pepe|  1.0|       1.0|
|#quin69|2022-05-20T22:33:...|                  11|     crypthicccc|  1.0|       1.0|
| #lirik|2022-05-20T22:33:...|@NuIlzero Nice mo...|        xsalim69|  0.0|       0.0|
|#quin69|2022-05-20T22:33:...|spec out of eleme...|          alex6x|  1.0|       0.0|
| #lirik|2022-05-20T22:33:...| 3x PauseChamp combo|        fossabot|  0.0|       0.0|
|#quin69|2022-05-20T22:33:...|monkaHeliWatchesW...|       sitkid721|  1.0|       1.0|
|#quin69|2022-05-20T22:33:...|you just need to ...|verygoodplayerow|  1.0|       1.0|
|#quin69|2022-05-20T22:33:...|Chat, does he has...|       metrandir|  1.0|       0.0|
|#quin69|2022-05-20T22:33:...|duno but what abo...|   

## Random Forest

In [83]:
globals()['models_loaded'] = False
globals()['my_model'] = None

def process(time, rdd):
    if rdd.isEmpty():
        return
    
    print("========= %s =========" % str(time))
    
    # Convert to data frame
    df = spark.read.json(rdd)
    df.show()
    
    if not globals()['models_loaded']:
        # load in your models here
        globals()['my_model'] = PipelineModel.load(rfPath)
        globals()['models_loaded'] = True
        
    # And then predict using the loaded model: 
    df_result = globals()['my_model'].transform(df)
    df_result.select("channel","datetime","message","username","label","prediction").show()

In [84]:
ssc = StreamingContext(sc, 10)

In [85]:
lines = ssc.socketTextStream("localhost", 8080)
lines.foreachRDD(process)

In [86]:
ssc_t = StreamingThread(ssc)
ssc_t.start()

22/05/21 00:34:58 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:34:58 WARN BlockManager: Block input-0-1653086098400 replicated to only 0 peer(s) instead of 1 peers
                                                                                

+-------+--------------------+--------------------+-----------+
|channel|            datetime|             message|   username|
+-------+--------------------+--------------------+-----------+
| #lirik|2022-05-20T22:34:...|xar2EDM docPls xa...|     scluse|
|#quin69|2022-05-20T22:34:...|any sniffers? SNIFFA|    bragok2|
|#quin69|2022-05-20T22:34:...|    champion or ele?|atheongames|
+-------+--------------------+--------------------+-----------+



22/05/21 00:35:00 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:35:00 WARN BlockManager: Block input-0-1653086100400 replicated to only 0 peer(s) instead of 1 peers


+-------+--------------------+--------------------+-----------+-----+----------+
|channel|            datetime|             message|   username|label|prediction|
+-------+--------------------+--------------------+-----------+-----+----------+
| #lirik|2022-05-20T22:34:...|xar2EDM docPls xa...|     scluse|  0.0|       0.0|
|#quin69|2022-05-20T22:34:...|any sniffers? SNIFFA|    bragok2|  1.0|       0.0|
|#quin69|2022-05-20T22:34:...|    champion or ele?|atheongames|  1.0|       0.0|
+-------+--------------------+--------------------+-----------+-----+----------+



22/05/21 00:35:01 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:35:01 WARN BlockManager: Block input-0-1653086101400 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:35:02 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:35:02 WARN BlockManager: Block input-0-1653086102400 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:35:03 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:35:03 WARN BlockManager: Block input-0-1653086103400 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:35:04 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:35:04 WARN BlockManager: Block input-0-1653086104400 replicated to only 0 peer(s) instead of 1 peers


In [87]:
ssc_t.stop()

22/05/21 00:35:05 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/21 00:35:05 WARN BlockManager: Block input-0-1653086105400 replicated to only 0 peer(s) instead of 1 peers
22/05/21 00:35:05 ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
22/05/21 00:35:05 WARN SocketReceiver: Error receiving data
java.net.SocketException: Socket closed
	at java.base/java.net.SocketInputStream.socketRead0(Native Method)
	at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:115)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:168)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
	at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
	at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
	at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
	at java.base/java.io.InputStreamReader.read(InputStreamReader.java:181)
	at java.base/ja

----- Stopping... this may take a few seconds -----


Exception in thread "receiver-supervisor-future-0" java.lang.InterruptedException: sleep interrupted
	at java.base/java.lang.Thread.sleep(Native Method)
	at org.apache.spark.streaming.receiver.ReceiverSupervisor.$anonfun$restartReceiver$1(ReceiverSupervisor.scala:196)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.ja

+-------+--------------------+--------------------+--------------------+
|channel|            datetime|             message|            username|
+-------+--------------------+--------------------+--------------------+
|#quin69|2022-05-20T22:34:...|               SNFFA|            polantis|
|#quin69|2022-05-20T22:35:...|it's SNIFFA not S...|         hugejackman|
| #lirik|2022-05-20T22:35:...|         MILKMANRAVE|              crioos|
|#quin69|2022-05-20T22:35:...|              SNIFFA|flamethrowerfirin...|
| #lirik|2022-05-20T22:35:...|@Crioos peepoHuggers|      kirby_the_pink|
|#quin69|2022-05-20T22:35:...|             2Header|               prour|
| #lirik|2022-05-20T22:35:...|         MILKMANRAVE|            nuilzero|
|#quin69|2022-05-20T22:35:...|                  11|         crypthicccc|
|#quin69|2022-05-20T22:35:...|             2Header|            manoekin|
| #lirik|2022-05-20T22:35:...|lirikHMM I keep r...|           inate2052|
|#quin69|2022-05-20T22:35:...|                WTFF|