In [1]:
from pyspark import SparkContext, SparkConf, storagelevel
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

from pyspark.sql import SparkSession
import pyspark.sql.functions as psf
from pyspark.sql.types import StructType, FloatType, IntegerType

# `sc` is never assigned in this notebook; it is presumably the SparkContext
# created by launching the `pyspark` shell (see the start command in the
# markdown cell below). Evaluating it displays the active context.
sc

# 연결 확인
- https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
- 시작 : pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2

In [2]:
# Return the running session if there is one, otherwise build a local one
# that uses every available core.
spark = (
    SparkSession.builder
    .appName("wordcount")
    .master("local[*]")
    .getOrCreate()
)

In [3]:
# Streaming source: every record published to Kafka topic "stream1" on the
# local broker.
lines = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "stream1")
    .load()
)

In [4]:
# Kafka key/value columns arrive as binary; cast both to readable strings.
words = lines.selectExpr(
    "CAST(key AS STRING)",
    "CAST(value AS STRING)",
)
result = words  # connection test only: no further transformation

In [5]:
# "update" mode prints only rows updated since the last trigger (for this
# non-aggregating query: the newly arrived rows). "complete" would reprint the
# whole result table on every trigger, but Spark only allows it for queries
# with an aggregation, so it cannot be used on this plain projection.
query = (
    result.writeStream
    .outputMode("update")
    .format("console")
    .start()
)

In [6]:
# Block until the streaming query ends. Interrupting the kernel is the expected
# way to finish this demo; any real stream failure (e.g. a
# StreamingQueryException) is no longer swallowed by a bare `except:`, and the
# query is stopped on every exit path.
try:
    query.awaitTermination()
except KeyboardInterrupt:
    pass
finally:
    query.stop()

# stream 처리 skeleton
- https://spark.apache.org/docs/2.3.0/structured-streaming-kafka-integration.html
- https://spark.apache.org/docs/2.3.0/structured-streaming-programming-guide.html
- https://stackoverflow.com/questions/39235704/split-spark-dataframe-string-column-into-multiple-columns
- https://spark.apache.org/docs/2.3.0/sql-programming-guide.html
- https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
- https://changhsinlee.com/pyspark-udf/

In [None]:
# getOrCreate() hands back the already-running session if one exists.
spark = (
    SparkSession.builder
    .appName("skelleton")
    .master("local[*]")
    .getOrCreate()
)

# Raw Kafka stream from topic "stream1" on the local broker.
source = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "stream1")
    .load()
)
        
######################################################################################

# Cast the binary key/value to strings, fan each message out into one row per
# newline-separated record, then split every record on ", " into a field array.
line = (
    source
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .select(psf.col("key"),
            psf.explode(psf.split(psf.col("value"), "\n")).alias("value"))
    .select(psf.col("key"),
            psf.split(psf.col("value"), ", ").alias("value"))
)
        
######################################################################################

# Field layout of one record (indices into the split array):
#   0 img_cnt, 1 face_cnt, 2 eye_cnt, 3 confidence, 4 x, 5 y, 6 w, 7 h, 8 time
# The Kafka key becomes `token`; `time` stays a string, everything else is
# cast to double.
df = (
    line.select(
        psf.col("key").alias("token"),
        psf.col("value").getItem(8).alias("time"),
        *(psf.col("value").getItem(idx).alias(field)
          for idx, field in enumerate(["img_cnt", "face_cnt", "eye_cnt", "confidence",
                                       "x", "y", "w", "h"]))
    )
    .selectExpr(
        "token",
        "time",
        "CAST(img_cnt AS Double)",
        "CAST(face_cnt AS Double)",
        "CAST(eye_cnt AS Double)",
        "CAST(confidence AS Double)",
        "CAST(x AS Double)",
        "CAST(y AS Double)",
        "CAST(w AS Double)",
        "CAST(h AS Double)",
    )
)
    
######################################################################################

# Per-record detection confidence.
confidence = df.select(psf.col("token"), psf.col("time"), psf.col("confidence"))

# Global min / mean / count over the same projection (not used further below).
confidence_agg = confidence.agg(
    psf.min("confidence").alias("confidence_min"),
    psf.mean("confidence").alias("confidence_mean"),
    psf.count("token").alias("count"),
)

######################################################################################

def dist(x, y, w, h, cx=240, cy=180) :
    """Euclidean distance from the centre of box (x, y, w, h) to point (cx, cy).

    cx/cy default to the original hard-coded (240, 180) - presumably the frame
    centre; TODO confirm against the camera resolution.

    Returns None when any box coordinate is None (a null Spark column, e.g. an
    empty trailing record after the newline split), so a malformed record
    yields null instead of raising TypeError and failing the streaming task.
    """
    if x is None or y is None or w is None or h is None:
        return None
    center_x = x + w / 2.0
    center_y = y + h / 2.0
    return ((center_x - cx) ** 2 + (center_y - cy) ** 2) ** 0.5

def area(w, h) :
    """Area w * h of a box, always as a Python float.

    The result feeds a FloatType UDF, and PySpark silently turns a returned
    value of the wrong Python type (e.g. int) into null, so it is coerced.
    Returns None when either side is None (null column value).
    """
    if w is None or h is None:
        return None
    return float(w * h)

# Expose the plain-Python helpers as Spark UDFs returning floats.
dist_udf = psf.udf(dist, FloatType())
area_udf = psf.udf(area, FloatType())

# Per-record box geometry: raw position, side length (`w` exposed as `wh`;
# presumably boxes are square - TODO confirm), distance of the box centre
# from the reference point, and box area.
distance = (
    df.select("token", "time", "x", "y", "w", "h")
    .select(
        psf.col("token"),
        psf.col("time"),
        psf.col("x"),
        psf.col("y"),
        psf.col("w").alias("wh"),
        dist_udf("x", "y", "w", "h").alias("dist"),
        area_udf("w", "h").alias("area"),
    )
)

# Global means of the box geometry (not used further below).
distance_agg = (
    df.select("token", "time", "x", "y", "w", "h")
    .agg(
        psf.mean("x").alias("x_mean"),
        psf.mean("y").alias("y_mean"),
        psf.mean("w").alias("w_mean"),
        psf.mean("h").alias("h_mean"),
    )
)
    
######################################################################################

def div(a, b) :
    """Return the ratio b / a as a float; 0.0 when the denominator `a` is 0.

    Note the argument order: `a` is the denominator (callers pass img_cnt
    first). The original returned the int `0`, which the FloatType UDF
    wrapping this function silently converts to null; a float is returned now.
    None in either argument (null column value) yields None instead of a
    TypeError that would fail the streaming task.
    """
    if a is None or b is None:
        return None
    if a == 0:
        return 0.0
    return float(b) / a
    
div_udf = psf.udf(div, FloatType())  # ratio UDF: div(denominator, numerator)

# Per-record detection counts turned into ratios over the image count.
cnt = (
    df.select("token", "time", "img_cnt", "face_cnt", "eye_cnt")
    .select(
        psf.col("token"),
        psf.col("time"),
        psf.col("img_cnt"),
        div_udf("img_cnt", "face_cnt").alias("face_ratio"),
        div_udf("img_cnt", "eye_cnt").alias("eye_ratio"),
    )
)

# Global means of the raw counts (not used further below).
cnt_agg = (
    df.select("token", "img_cnt", "face_cnt", "eye_cnt")
    .agg(
        psf.mean("img_cnt").alias("img_cnt_mean"),
        psf.mean("face_cnt").alias("face_cnt_mean"),
        psf.mean("eye_cnt").alias("eye_cnt_mean"),
    )
)

######################################################################################

# Stream-stream inner join of the three per-record projections on (token, time).
joined_df = (
    confidence
    .join(distance, ["token", "time"])
    .join(cnt, ["token", "time"])
)

######################################################################################
# NOTE: joins can only be written out in append mode.
# Without a join, "update" mode lets you watch newly arriving rows being streamed.

result = joined_df
query = (
    result.writeStream
    .outputMode("append")
    .format("console")
    .start()
)
        
######################################################################################  

# Block until the streaming query ends. Interrupting the kernel is the expected
# way to finish; any real stream failure is no longer hidden by a bare
# `except:`, and the query is stopped on every exit path.
try:
    query.awaitTermination()
except KeyboardInterrupt:
    pass
finally:
    query.stop()

# stream 처리1 
- window를 사용하지 않고 각 수집시기마다 처리
- 각 사용자의 데이터마다 처리하기 때문에 groupBy 사용하지 않음
- stream-static join 사용


In [2]:
# Local session for the window-free scoring pipeline (reused if already running).
spark = (
    SparkSession.builder
    .appName("stream_without_window")
    .master("local[*]")
    .getOrCreate()
)

In [3]:
# Per-token reference statistics (mean/std of each feature), presumably the
# output of a training run. CSV columns are read as strings, so every
# statistic is cast to double.
static_df = (
    spark.read
    .option("header", "true")
    .csv("/hyunwoo/train_result/result.csv")
    .selectExpr(
        "token",
        "CAST(face_ratio_mean AS Double)",
        "CAST(face_ratio_std AS Double)",
        "CAST(eye_ratio_mean AS Double)",
        "CAST(eye_ratio_std AS Double)",
        "CAST(confidence_mean AS Double)",
        "CAST(confidence_std AS Double)",
        "CAST(area_mean AS Double)",
        "CAST(area_std AS Double)",
        "CAST(dist_mean AS Double)",
        "CAST(dist_std AS Double)",
        "CAST(x_mean AS Double)",
        "CAST(x_std AS Double)",
        "CAST(y_mean AS Double)",
        "CAST(y_std AS Double)",
        "CAST(wh_mean AS Double)",
        "CAST(wh_std AS Double)",
    )
)
static_df.show()

+----------+---------------+-------------------+--------------+-------------------+-----------------+-----------------+---------+------------------+------------------+-----------------+------+------------------+------+------------------+-------+------------------+
|     token|face_ratio_mean|     face_ratio_std|eye_ratio_mean|      eye_ratio_std|  confidence_mean|   confidence_std|area_mean|          area_std|         dist_mean|         dist_std|x_mean|             x_std|y_mean|             y_std|wh_mean|            wh_std|
+----------+---------------+-------------------+--------------+-------------------+-----------------+-----------------+---------+------------------+------------------+-----------------+------+------------------+------+------------------+-------+------------------+
|0123456789|           0.76|0.07516648189186452|       0.35875|0.04250000000000002|38.34141864776611|4.604178031144208| 5216.835|196.50912691221015|20.713182306289674|0.409143507442128|196.86|1.02255960251

In [8]:
# Raw Kafka stream from topic "stream1" on the local broker.
source = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "stream1")
    .load()
)
        
######################################################################################

# Cast the binary key/value to strings, fan each message out into one row per
# newline-separated record, then split every record on ", " into a field array.
line = (
    source
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .select(psf.col("key"),
            psf.explode(psf.split(psf.col("value"), "\n")).alias("value"))
    .select(psf.col("key"),
            psf.split(psf.col("value"), ", ").alias("value"))
)
        
######################################################################################

# Field layout of one record (indices into the split array):
#   0 img_cnt, 1 face_cnt, 2 eye_cnt, 3 confidence, 4 x, 5 y, 6 w, 7 h, 8 time
# The Kafka key becomes `token`; `time` stays a string, everything else is
# cast to double.
df = (
    line.select(
        psf.col("key").alias("token"),
        psf.col("value").getItem(8).alias("time"),
        *(psf.col("value").getItem(idx).alias(field)
          for idx, field in enumerate(["img_cnt", "face_cnt", "eye_cnt", "confidence",
                                       "x", "y", "w", "h"]))
    )
    .selectExpr(
        "token",
        "time",
        "CAST(img_cnt AS Double)",
        "CAST(face_cnt AS Double)",
        "CAST(eye_cnt AS Double)",
        "CAST(confidence AS Double)",
        "CAST(x AS Double)",
        "CAST(y AS Double)",
        "CAST(w AS Double)",
        "CAST(h AS Double)",
    )
)
    
######################################################################################

def dist(x, y, w, h, cx=240, cy=180) :
    """Euclidean distance from the centre of box (x, y, w, h) to point (cx, cy).

    cx/cy default to the original hard-coded (240, 180) - presumably the frame
    centre; TODO confirm against the camera resolution.

    Returns None when any box coordinate is None (a null Spark column, e.g. an
    empty trailing record after the newline split), so a malformed record
    yields null instead of raising TypeError and failing the streaming task.
    """
    if x is None or y is None or w is None or h is None:
        return None
    center_x = x + w / 2.0
    center_y = y + h / 2.0
    return ((center_x - cx) ** 2 + (center_y - cy) ** 2) ** 0.5

def area(w, h) :
    """Area w * h of a box, always as a Python float.

    The result feeds a FloatType UDF, and PySpark silently turns a returned
    value of the wrong Python type (e.g. int) into null, so it is coerced.
    Returns None when either side is None (null column value).
    """
    if w is None or h is None:
        return None
    return float(w * h)

def div(a, b) :
    """Return the ratio b / a as a float; 0.0 when the denominator `a` is 0.

    Note the argument order: `a` is the denominator (callers pass img_cnt
    first). The original returned the int `0`, which the FloatType UDF
    wrapping this function silently converts to null; a float is returned now.
    None in either argument (null column value) yields None instead of a
    TypeError that would fail the streaming task.
    """
    if a is None or b is None:
        return None
    if a == 0:
        return 0.0
    return float(b) / a

def normalize_and_map(value, mean, std, threshold=1.3) :
    """Flag a value lying at least `threshold` standard deviations from `mean`.

    Returns 1 when |value - mean| / std >= threshold, else 0. `threshold`
    defaults to the original hard-coded 1.3.

    A zero `std` (the reference data never varied) used to raise
    ZeroDivisionError and kill the streaming query; now any deviation from
    `mean` is flagged and an exact match is not. None in any argument (null
    column value) yields None.
    """
    if value is None or mean is None or std is None:
        return None
    if std == 0:
        return 0 if value == mean else 1
    return 1 if abs((value - mean) / std) >= threshold else 0

def overlap(x, xm, y, ym, wh, whm, min_ratio=0.6) :
    """Return 0 when the live box overlaps the reference box enough, else 1.

    Both boxes are squares given as (x1, y1, x2, y2) = (x, y - side, x + side, y):
    the live box from (x, y, wh) and the reference box from (xm, ym, whm).
    "Enough" means the intersection area is at least `min_ratio` (default
    0.6, the original constant) of the reference box area whm * whm.

    x == 0 short-circuits to 0, as before (presumably "no face detected" -
    TODO confirm). A zero reference side `whm` used to raise
    ZeroDivisionError; a degenerate reference box is now treated as "no
    overlap" (1). None in any remaining argument yields None.
    """
    if x == 0:
        return 0
    if None in (x, xm, y, ym, wh, whm):
        return None
    if whm == 0:
        return 1

    live = (x, y - wh, x + wh, y)
    ref = (xm, ym - whm, xm + whm, ym)

    dx = min(live[2], ref[2]) - max(live[0], ref[0])
    dy = min(live[3], ref[3]) - max(live[1], ref[1])
    # Local name `inter` avoids shadowing the module-level `area` helper.
    inter = dx * dy

    if dx >= 0 and dy >= 0 and inter / float(whm * whm) >= min_ratio:
        return 0
    return 1
        
def get_result(a, b, c, d, e, f, min_flags=4) :
    """Combine six 0/1 anomaly flags into one verdict.

    Returns 0 when at least `min_flags` (default 4, the original constant) of
    the flags are set, otherwise 1. If any flag is None (null upstream), the
    verdict is None instead of a TypeError that would fail the streaming task.
    """
    flags = (a, b, c, d, e, f)
    if any(flag is None for flag in flags):
        return None
    return 0 if sum(flags) >= min_flags else 1
    
# Wrap the plain-Python helpers as Spark UDFs. Each declared return type must
# match the Python type the helper returns; PySpark turns a mismatch into null.
dist_udf = psf.udf(dist, FloatType())
area_udf = psf.udf(area, FloatType())
div_udf = psf.udf(div, FloatType())
normal_map_udf = psf.udf(normalize_and_map, IntegerType())
overlap_udf = psf.udf(overlap, IntegerType())
result_udf = psf.udf(get_result, IntegerType())

######################################################################################

# One row per incoming record with the raw features the scoring step needs.
stream_df = df.select(
    psf.col("token"),
    psf.col("time"),
    psf.col("confidence"),
    psf.col("x"),
    psf.col("y"),
    psf.col("w").alias("wh"),
    dist_udf("x", "y", "w", "h").alias("dist"),
    area_udf("w", "h").alias("area"),
    div_udf("img_cnt", "face_cnt").alias("face_ratio"),
    div_udf("img_cnt", "eye_cnt").alias("eye_ratio"),
)

# Stream-static inner join: attach each token's reference statistics.
joined_df = stream_df.join(static_df, ["token"])

# Turn every feature into a 0/1 flag against that token's reference stats.
normal_df = joined_df.select(
    psf.col("token"),
    psf.col("time"),
    normal_map_udf("confidence", "confidence_mean", "confidence_std").alias("confidence"),
    overlap_udf("x", "x_mean", "y", "y_mean", "wh", "wh_mean").alias("overlap"),
    normal_map_udf("dist", "dist_mean", "dist_std").alias("dist"),
    normal_map_udf("area", "area_mean", "area_std").alias("area"),
    normal_map_udf("face_ratio", "face_ratio_mean", "face_ratio_std").alias("face_ratio"),
    normal_map_udf("eye_ratio", "eye_ratio_mean", "eye_ratio_std").alias("eye_ratio"),
)

# Collapse the six flags into a single verdict per record.
result_df = normal_df.select(
    psf.col("token"),
    psf.col("time"),
    result_udf("confidence", "overlap", "dist", "area",
               "face_ratio", "eye_ratio").alias("result"),
)

######################################################################################
# NOTE: joins can only be written out in append mode.
# Without a join, "update" mode lets you watch newly arriving rows being streamed.

# The Kafka sink needs key/value columns: key = token, value = the row as JSON.
result = result_df.selectExpr("CAST(token AS STRING) AS key", "to_json(struct(*)) AS value")
query = (
    result.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("checkpointLocation", "/hyunwoo/stream1_checkpoint")
    .option("topic", "stream2")
    .start()
)
        
######################################################################################  

# Block until the streaming query ends. Interrupting the kernel is the expected
# way to finish; any real stream failure is no longer hidden by a bare
# `except:`, and the query is stopped on every exit path.
try:
    query.awaitTermination()
except KeyboardInterrupt:
    pass
finally:
    query.stop()