In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

In [3]:
conf = SparkConf().setAppName("streaming").setMaster("local")

In [4]:
sc = SparkContext(conf = conf)

In [5]:
spark = SparkSession(sc)

In [6]:
sc

In [7]:
# 데이터셋 불러오기
#static = spark.read.json('file:///home/ubuntu/Spark-The-Definitive-Guide/data/activity-data/') 아래것도 같음. 다만 이게 더 짧을 뿐..
static = spark.read.format('json').load('./activity-data/')
dataSchema = static.schema

In [None]:
# 데이터 스키마 형태
print(dataSchema)

In [None]:
# 데이터 프레임
static

In [None]:
# 기본적으로 정적 구조적 API의 모든 트랜스포메이션은 스트리밍 DataFrame에서도 사용 가능
# 구조적 스트리밍에서 스키마 추론 기능을 사용하고 싶을 경우 명시적으로 설정해야함
# maxFilesPerTrigger는 폴더 내의 전체 파일을 얼마나 빨리 읽을지 결정해주는 파라미터. 낮게 잡으면 트리거당 하나의 파일을 읽게 만들어 스트림의 흐름을 인위적으로 제한할 수 있음
streaming = spark.readStream.schema(dataSchema).option('maxFilesPerTrigger', 1).json('file:///home/ubuntu/ybigtatask/activity-data/')

In [None]:
# 스트리밍 DataFrame은 지연 처리 방식으로 동작함
# 스트림 처리를 시작하는 액션을 호출하기 전에 스트리밍 DataFrame에 대한 트랜스포메이션을 지정할 수 있음
activityCounts = streaming.groupBy('gt').count()
print(activityCounts)

In [None]:
# 스트림 쿼리를 시작하는 액션 
# 쿼리 결과를 내보낼 목적지나 싱크를 지정해야함 -> 예제에서는 결과를 메모리에 저장하는 메모리 싱크 사용
# 스트림 처리에 사용되는 쿼리의 이름을 activity_counts로 설정
# memory VS console
activityQuery = activityCounts.writeStream.queryName('activity_counts').format('memory').outputMode('complete').start()

# activityQuery.awaitTermination()  주석 풀면 안돼요!

In [None]:
# 스트림 처리 중에 집계 결과가 저장된 메모리 테이블을 조회할 수 있음
# 시점마다 다른 결과가 반환됨
from time import sleep

for x in range(5):
    spark.sql('SELECT * FROM activity_counts').show()
    sleep(1)

In [None]:
# 예제 실행 코드
# 백그라운드에서 스트리밍 연산 실행됨. 쿼리 실행 중 드라이버 프로세스 종료되는 상황 방지
# 실행시 끝날때까지 기다려야하기 때문에 실행 ㄴㄴ
# 운영시 반드시 필요한 코드. 없으면 스트림 처리 실행할 수 없음
activityQuery.awaitTermination()

In [None]:
# 스트림 중지
activityQuery.stop()

In [None]:
# 현재 실행중인 스트림 목록
spark.streams.active

##  스트림 트랜스포메이션
- 모든 유형의 선택과 필터, 그리고 트랜스포메이션뿐만 아니라 DataFrame의 모든 함수와 개별 컬럼 처리도 지원함

### 선택과 필터링
- 구조적 스트리밍은 DataFrame의 모든 함수와 개별 컬럼을 처리하는 선택과 필터링 그리고 단순 트랜스포메이션을 지원함

In [None]:
from pyspark.sql.functions import expr

simpleTransform = streaming.withColumn('stairs', expr("gt like '%stairs%'")).where('stairs').where('gt is not null')\
    .select('gt', 'model', 'arrival_time', 'creation_time')\
    .writeStream\
    .queryName('simple_transform')\
    .format('memory')\
    .outputMode('append')\
    .start()

In [None]:
from time import sleep

for x in range(5):
    spark.sql('SELECT * FROM simple_transform').show()
    sleep(1)

In [None]:
simpleTransform.stop()

### 집계(aggregate) - Average

In [None]:
deviceModelStats = streaming.cube('gt', 'model').avg()\
    .drop('avg(Arrival_time)')\
    .drop('avg(Creation_Time)')\
    .drop('avg(Index)')\
    .writeStream.queryName('device_counts').format('memory')\
    .outputMode('complete')\
    .start()

NameError: ignored

In [None]:
from time import sleep

for x in range(5):
    spark.sql('SELECT * FROM device_counts').show()
    sleep(1)

In [None]:
deviceModelStats.stop()

### join

In [None]:
# cube?
historicalAgg = static.groupBy('gt', 'model').avg()
deviceModelStats = streaming.drop('Arrival_Time', 'Creation_Time', 'Index')\
    .cube('gt','model').avg()\
    .join(historicalAgg, ['gt', 'model'])\
    .writeStream.queryName('device_counts').format('memory')\
    .outputMode('complete')\
    .start()

In [None]:
from time import sleep

for x in range(5):
    spark.sql('SELECT * FROM device_counts').show()
    sleep(1)

In [None]:
deviceModelStats.stop()

# 스트림 입출력

## 스트림 싱크 예시(카프카)

In [None]:
# ex
df1 = spark.readStream.format('kafka')\
    .option('kafaka.bootstrap.servers', 'host1:port1, host2:port2')\
    .option('subscribe', 'topic1')\
    .load()

# 여러 개의 토픽 수신
df1 = spark.readStream.format('kafka')\
    .option('kafaka.bootstrap.servers', 'host1:port1, host2:port2')\
    .option('subscribe', 'topic1, topic2')\
    .load()

# 패턴에 맞는 토픽 수신
df1 = spark.readStream.format('kafka')\
    .option('kafaka.bootstrap.servers', 'host1:port1, host2:port2')\
    .option('subscribe', 'topic.*')\
    .load()

In [None]:
df1.selectExpr('topic', 'CAST(key AS STRING)', 'CAST(value AS STRING)')\
    .writeStream\
    .format('kafka')\
    .option('kafka.bootstrap.servers', 'host1:port1,host2:port2')\
    .option('checkpointLocation', '/to/HDFS-compatible/dir')\
    .start()

df1.selectExpr('topic', 'CAST(key AS STRING)', 'CAST(value AS STRING)')\
    .writeStream\
    .format('kafka')\
    .option('kafka.bootstrap.servers', 'host1:port1,host2:port2')\
    .option('checkpointLocation', '/to/HDFS-compatible/dir')\
    .option('topic', 'topic1')
    .start()

## 테스트용 소스와 싱크
- 스트리밍 쿼리의 프로토타입을 만들거나 디버깅 시 유용한 몇 가지 테스트용 소스와 싱크를 제공함
- 운영환경에서는 사용하면 안됨, 종단 간 내고장성을 지원하지 않기 때문에 개발 시에만 사용

- 소켓 소스
  - TCP 소켓을 통해 스트림 데이터를 전송할 수 있음
  - 데이터를 읽기 위한 호스트와 포트를 지정해야함
  - 디버깅엔 유용하지만 내고장성 보장하지 못함
  - nc -lk 9999 명령어를 통해 NetCat 사용하기
  - Unix계열은 다 가능. EC2 컴퓨터에 새롭게 접속해서 새로운 Shell에서 nc -lk 9999 실행하기
 

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()


words = lines.select(
   explode(
       split(lines.value, " ")
   ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()



# # " "로 구분된 item과 price를 받고 각각 item과 price라는 column으로 넣기
# items = lines.withColumn('item', split(lines.value, " ")[0])\
#               .withColumn('price', split(lines.value, " ")[1].cast(DoubleType()))

# # item별로 price 평균 구하기
# getAvg = items.groupBy("item").avg("price")

# word count, 평균 구하기 둘 중 하나 or 둘 다 실습 때 해보기

In [None]:
# update VS complete ?

# query = getAvg \
query = wordCounts \
    .writeStream \
    .queryName('socket_word') \
    .outputMode("update") \
    .format("console") \
    .start()
#format을 console로 하면 테이블이 console에서 나타남.
#그렇다면.. 아래 Shell에서 코드 돌리면 나올까? output으로 나타나게 하는 방법은..?

In [None]:
from time import sleep

for x in range(5):
    spark.sql('SELECT * FROM socket_word').show()
    sleep(1)
    

In [None]:
query.stop()

## 트리거

In [None]:
streaming = spark.readStream.schema(dataSchema).option('maxFilesPerTrigger', 1).json('file:///home/ubuntu/ybigta_session_homework/Spark-The-Definitive-Guide/data/activity-data/')

In [None]:
activityCounts=streaming

In [None]:
# 처리 시간 기반 트리거
a = activityCounts.writeStream.trigger(processingTime='5 seconds').queryName('simple_transform').format('console').outputMode('append').start()

In [None]:
a.stop()

In [None]:
b = activityCounts.writeStream.trigger(once=True).format('console').outputMode('append').start()

In [None]:
b.stop()