# Chapter 21 구조적 스트리밍의 기초

## 21.2 핵심 개념

### 21.2.1 트랜스포메이션과 액션
+ 트렌스포메이션과 유사하나 증분 처리를 할 수 없는 일부 쿼리 유형은 사용 제약이 있을 수 있음
+ 구조적 스트리밍에는 스트림 처리를 시작한 뒤 연속적으로 처리해 결과를 출력하는 한 가지 액션만 있음

### 21.2.2 입력 소스
+ 스트리밍 방식으로 데이터를 읽을 수 있는 소스
    + 아파치 카프카 0.10 버전
    + HDFS나 S3 등 분산 파일시스템의 파일(스파크는 디렉터리의 신규 파일을 계속해서 읽음)
    + 테스트용 소켓 소스

### 21.2.3 싱크
+ 스트림의 결과를 저장할 목적지
    + 아파치 카프카 0.10
    + 거의 모든 파일 포맷
    + 출력 레코드에 임의 연산을 실행하는 foreach 싱크
    + 테스트용 콘솔 싱크
    + 디버깅용 콘솔 싱크

### 21.2.4 출력 모드
+ 데이터를 출력하는 방법의 정의
    + append: 싱크에 신규 레코드만 추가
    + update: 변경 대상 레코드 자체를 갱신
    + complete: 전체 출력 내용 재작성 하기

### 21.2.5 트리거
+ 데이터 출력 시점을 정의
+ 기본적으로 마지막 입력 데이터를 처리한 직후에 신규 입력 데이터를 조회해 최단 시간 내에 새로운 처리 결과를 만들어 냄
+ 작은 크기의 파일이 여러 개 생실 수 있기 때문에 처리 시간 기반의 트리거도 지원함


### 21.2.6 이벤트 시간 처리
+ 데이터에 기록되 시간 필드 기준으로 데이터를 처리함을 의미
+ 워터마크: 시간 제한을 설정할 수 있는 스트리밍 시스템의 기능으로 늦게 들어온 이벤트를 어디까지 처리할지 시간을 제한할 수 있음

## 21.3 구조적 스트리밍 활용

### 정적인 방식의 데이터셋 읽기

In [1]:
# 세션 생성
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Spark structured streaming example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

spark.conf.set('spark.sql.shuffle.partitions', 5)

In [2]:
static = spark.read.json("../data/activity-data/")
dataSchema = static.schema

In [3]:
print(dataSchema)

StructType(List(StructField(Arrival_Time,LongType,true),StructField(Creation_Time,LongType,true),StructField(Device,StringType,true),StructField(Index,LongType,true),StructField(Model,StringType,true),StructField(User,StringType,true),StructField(gt,StringType,true),StructField(x,DoubleType,true),StructField(y,DoubleType,true),StructField(z,DoubleType,true)))


In [4]:
static.printSchema()

root
 |-- Arrival_Time: long (nullable = true)
 |-- Creation_Time: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- User: string (nullable = true)
 |-- gt: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)



### 동적인 방식의 데이터셋 읽기

In [5]:
# 파일을 하나씩 읽는 것을 가정(운영 환경에서는 이렇게 사용하지 말 것)
# 데이터 형을 자동으로 추론하는 것은 비추(예제에서는 위에서 읽은 데이터형을 재활용)
streaming = spark\
    .readStream\
    .schema(dataSchema)\
    .option('maxFilesPerTrigger', 2)\
    .json("../data/activity-data/")

# 트렌스포메이션
activityCounts = streaming.groupBy('gt').count()

# 결과를 메모리에 저장하도록 메모리 싱크 설정
activityQuery = activityCounts\
    .writeStream\
    .queryName('activity_counts')\
    .format('memory')\
    .outputMode('complete')\
    .start()

# 실행
# activityQuery.awaitTermination() # 쿼리 종료 시 까지 대기함, background에서 실행됨
                                   # 운영 애플리케이션에서는 반드시 필요함

In [6]:
# 스트림 목록을 확인할 수 있음
# UUID가 부여되어 다시 선택할 수 있지만 여기서는 변수에 할당했으므로 변수 사용이 유리함
spark.streams.active

[<pyspark.sql.streaming.StreamingQuery at 0x7f0472cbe210>]

In [7]:
from time import sleep

for x in range(3):
    spark.sql("SELECT * FROM activity_counts").show()
    sleep(1)

+----------+-----+
|        gt|count|
+----------+-----+
|       sit|73850|
|     stand|68309|
|stairsdown|56186|
|      walk|79536|
|  stairsup|62715|
|      null|62690|
|      bike|64785|
+----------+-----+

+----------+------+
|        gt| count|
+----------+------+
|       sit|172317|
|     stand|159385|
|stairsdown|131088|
|      walk|185585|
|  stairsup|146368|
|      null|146263|
|      bike|151163|
+----------+------+

+----------+------+
|        gt| count|
+----------+------+
|       sit|270779|
|     stand|250466|
|stairsdown|205978|
|      walk|291633|
|  stairsup|230032|
|      null|229834|
|      bike|237541|
+----------+------+



In [8]:
# activityQuery.stop()

## 21.4 스트림 트랜스포메이션

### 21.4.1 선택과 필터링
+ 키를 변경하지 않으므로 append 출력 모드를 사용함

In [8]:
from pyspark.sql.functions import expr

simpleTransfrom = streaming.withColumn("stairs", expr("gt like '%stairs%'"))\
    .where("stairs")\
    .where("gt is not null")\
    .select("gt", "model", "arrival_time", "creation_time")\
    .writeStream\
    .queryName("simple_transform")\
    .format("memory")\
    .outputMode("append")\
    .start()

### 21.4.2 집계
+ 7장 스트림에 적용 가능한 함수 참조
+ 원시 컬럼에 대한 집계 외 이벤트 시간 컬럼 지정, 워터마크, 윈도우 처리를 지원함 (22장 참조)

In [9]:
deviceModelStats = streaming.cube("gt", "model").avg()\
    .drop("avg(Arrival_time)")\
    .drop("avg(Creation_time)")\
    .drop("avg(Index)")\
    .writeStream.queryName("device_counts").format("memory")\
    .outputMode("complete")\
    .start()

In [10]:
for x in range(3):
    spark.sql("SELECT * FROM device_counts").show()
    sleep(1)

+----------+------+--------------------+--------------------+--------------------+
|        gt| model|              avg(x)|              avg(y)|              avg(z)|
+----------+------+--------------------+--------------------+--------------------+
|       sit|  null|-5.15907918915406...|2.734365086787861...|-1.41574357215394...|
|     stand|  null|-3.75961836153627...|4.111774313006433...|3.049861196337197...|
|       sit|nexus4|-5.15907918915406...|2.734365086787861...|-1.41574357215394...|
|     stand|nexus4|-3.75961836153627...|4.111774313006433...|3.049861196337197...|
|      null|  null|-0.00699334693654679|-0.00117077767059...|0.005646005653992315|
|      null|  null|0.001510247341501...|-0.00696338577209...|-0.00910733461973...|
|      walk|  null|-0.00304135060055069|0.004227462440234608|-6.40891870085255...|
|      null|nexus4|-0.00699334693654679|-0.00117077767059...|0.005646005653992315|
|      null|nexus4|0.001510247341501...|-0.00696338577209...|-0.00910733461973...|
|   

In [12]:
deviceModelStats.stop()

## 21.5 입력과 출력

### 파일 소스와 싱크
+ 스트리밍에서 파일 소스/싱크와 정적 파일 소스를 사용할 때 유일한 차이점은 트리거 시 읽을 파일 수를 결정할 수 있다는 점임(maxFilesPerTrigger 옵션)
+ 입력 디텍더리에 원자적으로 추가되어하며, 그렇지 않으면 파일의 일부분만 처리됨
+ 외부 디렉터리에 파일을 완전히 기록한 후 입력 디텍터리로 옮겨야 함(아마존 S3에서는 완전히 기록된 객체만 보임)

### 카프카 소스와 싱크
+ 카프카는 분산형 버퍼로 생각할 수 있음
+ 순서를 바꿀 수 없는 레코드로 구성되며 레코드의 위치를 오프셋이라고 부름

### 21.5.2 카프카 소스에서 메시지 읽기
+ 아래 옵션 중 하나를 선택
    + assign : 토픽뿐만 아니라 읽으려는 파티션까지 세밀하게 지정하는 옵션, ex) {"topicA":[0, 1], "topicB":[2, 4]}
    + subscribe : 토픽 목록턴을 지정해 여러 토픽을 구독
    + subscribePattern : 토픽 패턴을 지정해 여러 토픽을 구독
+ 카프카 서비스에 접속할 수 있도록 kafka.bootstrap.servers 값을 지정
+ 기타
    + startingOffsets / endingOffsets
        + 쿼리를 시작할 때 읽을 지점 설정
        + -2는 earliset, -1는 latest{{"topicA" : {"1"}:-1}, "topicB" : {"0"}:-2}}
        + 새로운 스트리밍 쿼리가 시작될 때만 적용
    + failOnDataLoss
        + 데이터 유실(예 : 토픽이 삭제되거나 오프셋이 범위를 벗어났을 때) 쿼리를 중단할 것인지 지정
        + 기본값은 true
    + maxOffsetsPerTrigger
        + 특정 트리거 시점에 읽을 오프셋의 전체 개수

In [None]:
""" Read messages from kafka """

# one topic
df1 = spark.readStream.format("kafka")\
    .option("kafka.bootstrap.server", "host1:port1", "host2:port2")\
    .option("subscribe", "topic1")\
    .load()

# multi topics
df2 = spark.readStream.format("kafka")\
    .option("kafka.bootstrap.server", "host1:port1", "host2:port2")\
    .option("subscribe", "topic1,topic2")\
    .load()

# topics which has patterns we want
df3 = spark.readStream.format("kafka")\
    .option("kafka.bootstrap.server", "host1:port1", "host2:port2")\
    .option("subscribePattern", "topic1.*")\
    .load()

### 21.5.3 카프카 싱크에 메시지 쓰기
+ 읽은 쿼리와 매우 비슷

In [None]:
""" 동일한 역할을 함 """

df1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")\
  .writeStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("checkpointLocation", "/to/HDFS-compatible/dir")\
  .start()

df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
  .writeStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("checkpointLocation", "/to/HDFS-compatible/dir")\
  .option("topic", "topic1")\
  .start()

### foreach 싱크
+ foreachPartitions과 유사하게 각 파티션에서 임의의 연산을 병렬로 수행
+ ForeachWriter 인터페이스를 구현해야 함
+ 스칼라와 자바만 지원하며, open, process, close 세가지 메서드를 지님
    + open 
    + process : 데이터를 처리하거나 저장하는 용도
    + close : 스트림 처리 도중 오류가 발생하면 close 메서도도 호출됨

```
import org.apache.spark.sql.ForeachWriter

datasetOfString.write.foreach(new ForeachWriter[String] {
  def open(partitionId: Long, version: Long): Boolean = {
    // open a database connection
  }
  def process(record: String) = {
    // write string to connection
  }
  def close(errorOrNull: Throwable): Unit = {
    // close the connection
  }
})
```

### 테스트용 소스와 싱크
+ 운영 환경에서는 절대 사용하지 말 것

In [11]:
""" 소켓 소스 """
socketDF = spark.readStream.format("socket")\
    .option("host", "localhost").option("port", 9999).load()

In [17]:
!nc -lk 9999

^C



In [12]:
""" 콘솔 싱크 """
activityCounts.writeStream.format("console").outputMode("complete").start()

<pyspark.sql.streaming.StreamingQuery at 0x7f04908bdc10>

In [15]:
""" 메모리 싱크 """
activityCounts\
    .writeStream.queryName("exsample").format("memory")\
    .outputMode("complete").start()

<pyspark.sql.streaming.StreamingQuery at 0x7f04908a28d0>

### 21.5.4 데이터 출력 방법(출력 모드)


#### append
+ 새로운 로우가 결과 테이블에 추가되면 사용자가 명시한 트리거에 맞춰 싱크로 출력됨
+ 이벤트 시간과 워터마크를 사용하면 최종 결과만 싱크로 출력됨(22장)

#### complete
+ 결과 테이블의 전체 상태를 싱크로 출력
+ 데이터가 계속해서 변경될 수 있는 일부 상태 기반 데이터를 다룰 때 유용함

#### update
+ 변경된 로우만 싱크로 출력하는 점을 제외하면 complete 모두와 유사
+ 쿼리에서 집계 연산을 하지 않는다면 append와 동일

### 21.5.5 데이터 출력 시점(트리거)
+ 싱크에 큰 부하가 발생하는 현상을 방지하거나 출력 파일의 크기를 제어하는 용도로 사용됨

In [16]:
""" 처리시간 기반 트리거 """

activityCounts.writeStream.trigger(processingTime='5 seconds')\
    .format("console").outputMode("complete").start()

<pyspark.sql.streaming.StreamingQuery at 0x7f04907ff390>

In [17]:
""" 일회성 트리거 """

# 개발 중 테스트하거나 운영 환경에서 자주 실행되지 않는 잡을 수동으로 실행할 때 유용
activityCounts.writeStream.trigger(once=True)\
    .format("console").outputMode("complete").start()

<pyspark.sql.streaming.StreamingQuery at 0x7f04907ffe10>