# pyspark stream with socket

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

## Schema 생성

입력 string을 dataframe 형식으로 받기 위해 struct schema를 정의.

In [2]:
# Schema describing one input record: `max_rpm` followed by the fourteen
# `s20` .. `s150` columns (step 10). Every field is a nullable integer.
# NOTE(review): `max_rpm` is declared as an integer here, but the streaming
# DataFrame built later casts it to float -- confirm which type is intended.
userSchema = StructType([
    StructField(field_name, IntegerType(), True)
    for field_name in ['max_rpm', *(f's{speed}' for speed in range(20, 151, 10))]
])

## get data by socket

port 9999의 localhost socket과 연결. 스트림을 시작하기 전에 terminal에서 Netcat으로 socket을 먼저 열어 둔다(예: `nc -lk 9999`).

In [3]:
# Obtain the active SparkSession. `getOrCreate()` returns the session that
# already exists (e.g. the one a pyspark-launched notebook provides) and only
# builds a new one when none exists, so this cell also works standalone.
spark = SparkSession.builder.getOrCreate()

# Streaming DataFrame fed by the text socket at localhost:9999.
# Each received line arrives as one row in a single string column `value`.
lines = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
)

23/12/12 13:16:42 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.


## Make DataFrame

공백 단위로 구분된 input string을 schema에 따라 dataframe 형태로 정의

In [4]:
# Split each incoming line on single spaces and give every token its column
# name. The i-th token maps to the i-th feature column:
#   data[0] -> max_rpm, data[1] -> s20, ..., data[14] -> s150
# Generating the expressions from the column list keeps index and name in
# lock-step (previously s110..s150 all read data[9], duplicating s100).
words = (
    lines
    .selectExpr("split(value, ' ') as data")
    .selectExpr(*[
        f"data[{position}] as {column}"
        for position, column in enumerate(
            ['max_rpm', *(f's{speed}' for speed in range(20, 151, 10))]
        )
    ])
)

# Convert the string tokens to numeric columns: `max_rpm` becomes a float,
# each of `s20` .. `s150` becomes an integer.
structuredDF = words.selectExpr(
    "cast(max_rpm as float)",
    *[f"cast(s{speed} as integer)" for speed in range(20, 151, 10)],
)


In [5]:
# Sanity check: display whether the DataFrame is a streaming one.
structuredDF.isStreaming

True

In [6]:
# Print the column names and types produced by the casts above.
structuredDF.printSchema()

root
 |-- max_rpm: float (nullable = true)
 |-- s20: integer (nullable = true)
 |-- s30: integer (nullable = true)
 |-- s40: integer (nullable = true)
 |-- s50: integer (nullable = true)
 |-- s60: integer (nullable = true)
 |-- s70: integer (nullable = true)
 |-- s80: integer (nullable = true)
 |-- s90: integer (nullable = true)
 |-- s100: integer (nullable = true)
 |-- s110: integer (nullable = true)
 |-- s120: integer (nullable = true)
 |-- s130: integer (nullable = true)
 |-- s140: integer (nullable = true)
 |-- s150: integer (nullable = true)



In [7]:
# Display the Python class of the streaming DataFrame object.
type(structuredDF)

pyspark.sql.dataframe.DataFrame

## ML Model

In [8]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidatorModel

import numpy as np
import pandas as pd
import glob

저장된 모델(random forest, decision tree) 사용

In [9]:
# Load the two saved cross-validated models from their directories
# (paths are relative to the current working directory).
rfc_loaded_model, dtc_loaded_model = (
    CrossValidatorModel.load(model_dir)
    for model_dir in ('random_forest', 'decision_tree')
)

                                                                                

In [10]:
# Feature columns, in the order they are packed into the `features` vector.
# NOTE(review): presumably this must match the column order used when the
# saved models were trained -- confirm.
columns = ['max_rpm','s20', 's30', 's40', 's50', 's60', 's70', 's80','s90','s100','s110', 's120','s130','s140','s150']

# handleInvalid='skip' drops rows that contain a null feature instead of
# raising. With the default ('error'), a single short or non-numeric line on
# the socket (which yields null columns under Spark's default non-ANSI
# settings) would fail the whole streaming query.
vec_assembler = VectorAssembler(
    inputCols=columns,
    outputCol='features',
    handleInvalid='skip',
)
test_ftr_vec = vec_assembler.transform(structuredDF)

# Score every assembled row with the loaded decision-tree model.
prediction = dtc_loaded_model.transform(test_ftr_vec)

## Output on Console

결과값 console로 출력, probability와 prediction column만 선택

In [13]:
# Keep only the two columns that are written to the console.
output_df = prediction.select(["probability", "prediction"])

In [12]:
# Prediction labels -- 0: safe driving, 1: low-risk acceleration,
# 2: medium-risk acceleration, 3: high-risk acceleration
# Start a streaming query that appends each micro-batch to the console.
query = (
    output_df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Block until the query terminates. If the wait is interrupted (e.g. the
# notebook kernel is interrupted, raising KeyboardInterrupt) or fails, make
# sure the streaming query is stopped rather than left running in the
# background; the exception itself still propagates.
try:
    query.awaitTermination()
finally:
    query.stop()

23/12/12 13:16:52 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/t6/0yqx06jd2b19jm16pgd8h6p80000gn/T/temporary-0b260d0e-e16b-4853-8a8a-8104ae839309. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/12/12 13:16:52 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/12/12 13:16:52 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


-------------------------------------------
Batch: 0
-------------------------------------------
+-----------+----------+
|probability|prediction|
+-----------+----------+
+-----------+----------+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------------+----------+
|      probability|prediction|
+-----------------+----------+
|[0.0,0.0,1.0,0.0]|       2.0|
+-----------------+----------+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----------------+----------+
|      probability|prediction|
+-----------------+----------+
|[1.0,0.0,0.0,0.0]|       0.0|
+-----------------+----------+

-------------------------------------------
Batch: 3
-------------------------------------------
+-----------------+----------+
|      probability|prediction|
+-----------------+----------+
|[0.0,1.0,0.0,0.0]|       1.0|
+-----------------+----------+

-------------------------------------------

23/12/12 13:17:37 WARN TextSocketMicroBatchStream: Stream closed by localhost:9999
ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/apache-spark/3.5.0/libexec/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/homebrew/Cellar/apache-spark/3.5.0/libexec/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/Users/gyeongbin/opt/anaconda3/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 