# 스트리밍 

## word count 예제 
9999 소켓을 통해 들어오는 메시지를 워드카운팅하는 코드입니다. 

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

스트리밍 처리를 위한 쓰레드 두개를 생성합니다 

In [None]:
sc = SparkContext("local[2]", "NetworkWordCount")

배치 간격을 1초로 맞추어 스트리밍 컨텍스트 생성 

In [None]:
ssc = StreamingContext(sc, 1)

스트리밍 데이터를 받을 소켓을 엽니다.

In [None]:
lines = ssc.socketTextStream("localhost", 9999)

띄워쓰기를 기준으로 텍스트 분할

words = lines.flatMap(lambda line: line.split(" "))

각 배치별 워드카운팅을 진행할 코드

In [None]:
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

In [None]:
wordCounts.pprint()

In [None]:
ssc.start()
ssc.awaitTermination()

## stateful streaming 

기존 코드는 각 배치마다 따로 결과를 냈지만 이번에는 누산을 해보도록 하겠습니다. 

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

In [None]:
sc = SparkContext("local[2]", "StatefulNetworkWordCount")

In [None]:
ssc = StreamingContext(sc, 1)

장애 시, 복구시점 

In [None]:
ssc.checkpoint("checkpoint")

updatebykey의 타겟이 되는 함수 
기존의 것과 새로운 것이 합쳐집니다. 

In [None]:
def updateFunc(new_values, last_sum):
 	return sum(new_values) + (last_sum or 0)

In [None]:
lines = ssc.socketTextStream("localhost", 9999)

In [None]:
running_counts = lines.flatMap(lambda line: line.split(" "))\
					.map(lambda word: (word, 1))\
                    .updateStateByKey(updateFunc)

In [None]:
running_counts.pprint()

In [None]:
ssc.start()

In [None]:
ssc.awaitTermination()

## Structed streaming

spark2.0에서는 dataframe을 이용한 스트리밍 처리를 지원합니다. 

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

이번에는 스트리밍 컨텍스트를 만들지 않습니다. 그 이유는 이미 스팤 세션에 들어있기 때문이죠 

In [2]:
spark = SparkSession \
	.builder \
	.appName("StructuredNetworkWordCount") \
	.getOrCreate()

In [3]:
lines = spark\
	.readStream\
	.format('socket')\
	.option('host', 'localhost')\
	.option('port', 9999)\
	.load()

explode : arrray의 각 구성요소를 쪼개서 행으로 만듬(groupby를 가능하게 만듬)<br>
spllit : string을 조건에 맞게 쪼개서 array로 만듬

RDD에서의 lines.flatMap(lambda line: line.split(" "))와 동일한 작업

In [4]:
words = lines.select(
	explode(
    	split(lines.value, ' ')
	).alias('word')
)

word컬럼을 같은 것끼리 묶은 후 카운팅을 합니다. 

In [5]:
# Generate running word count
wordCounts = words.groupBy('word').count()

In [6]:
query = wordCounts\
	.writeStream\
	.outputMode('complete')\
	.format('console')\
	.start()

In [None]:
query.awaitTermination()

## 미션 : 카프키를 이용해서 스트림 처리하기



### 카프카로 읽어오는 방법

In [None]:
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()

#### 멀티토픽도 가능합니다. 

In [None]:
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .load()

#### 토픽목록에 패턴도 걸 수 있습니다. 

In [None]:
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .load()

#### 오프셋 지정도 가능합니다 

In [None]:
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()

### 카프카로 싱크하는 법

In [None]:
ds = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .start()

#### 네 멀티토픽 가능하구요

In [None]:
ds = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1,topic2") \
  .start()