# Kafka Streaming + PySpark Example

### 1. Add pyspark and related libraries via findspark

In [1]:
import findspark
findspark.init("/usr/local/lib/spark-3.3.2-bin-hadoop3")

### 2. Define the running Kafka server and topic

In [2]:
# Import only the names this notebook uses. SparkContext is imported explicitly
# instead of relying on a wildcard import to bring it into scope.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

kafka_bootstrap_servers = 'slave03:9092'
topic = 'quickstart-events'

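### 3. Add configuration through SparkConf and create the SparkContext
The spark.jars.packages option fetches the jar files for a specific group, artifact, and version from a Maven repository. Its value is a comma-separated list of coordinates in the form \<groupId>:\<artifactId>:\<version>. Spark then automatically ships the downloaded jars to the cluster (to HDFS in this setup) and adds them as dependencies. This example uses the org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2 package to connect Spark to Kafka.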
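
As a minimal sketch of the coordinate format described above, the helpers below join group, artifact, and version with colons, and join several coordinates with commas. The names `maven_coordinate` and `packages_option` are hypothetical helpers for illustration and are not part of Spark.

```python
def maven_coordinate(group_id: str, artifact_id: str, version: str) -> str:
    """Build one '<groupId>:<artifactId>:<version>' coordinate."""
    parts = (group_id, artifact_id, version)
    # Reject empty parts and parts containing the separators used below.
    if any(not p or ":" in p or "," in p for p in parts):
        raise ValueError(f"invalid coordinate parts: {parts!r}")
    return ":".join(parts)


def packages_option(*coordinates: str) -> str:
    """Join coordinates into a comma-separated spark.jars.packages value."""
    return ",".join(coordinates)


kafka_connector = maven_coordinate(
    "org.apache.spark", "spark-sql-kafka-0-10_2.12", "3.3.2"
)
print(packages_option(kafka_connector))
```

The `_2.12` suffix in the artifact name is the Scala version and `3.3.2` is the Spark version, so both should match the Spark build in use.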

In [3]:
sconf = SparkConf()
sconf.setAppName("Jupyter_Notebook").set("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2")

sc = SparkContext(conf=sconf)

:: loading settings :: url = jar:file:/usr/local/lib/spark-3.3.2-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-298400fa-0ce9-4e18-8eeb-ca542a7cc368;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.2 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
downloading https://r

23/03/04 05:19:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/04 05:19:05 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
23/03/04 05:19:09 WARN Client: Same path resource file:///root/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.3.2.jar added multiple times to distributed cache.
23/03/04 05:19:09 WARN Client: Same path resource file:///root/.ivy2/jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.3.2.jar added multiple times to distributed cache.
23/03/04 05:19:09 WARN Client: Same path resource file:///root/.ivy2/jars/org.apache.kafka_kafka-clients-2.8.1.jar added multiple times to distributed cache.
23/03/04 05:19:09 WARN Client: Same path resource file:///root/.ivy2/jars/com.google.code.findbugs_jsr305-3.0.0.jar added multiple times to distributed cache.
23/03/04 05:19:09 WARN Client: Same path resource file:///root/.ivy2/jars/org.apache.commons_commons-pool2-2.11.1.jar added multiple times to distributed cache.
23/03/04 05:19:09 WARN Client: 

### 4. Create a SparkSession and connect it to Kafka as a streaming source with readStream and load

In [4]:
session = SparkSession(sc)
streamming_df = session \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
  .option("failOnDataLoss","False") \
  .option("subscribe", topic) \
  .load()
streamming_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
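
The schema above shows that `key` and `value` arrive as binary, so they usually need a cast before they can be read as text. The sketch below assumes the messages are UTF-8 text. `decode_kafka_payload` is a hypothetical helper name, and it relies only on the DataFrame's `selectExpr` method.

```python
def decode_kafka_payload(df):
    """Project a Kafka source DataFrame to string key/value plus timestamp.

    `df` is expected to have the Kafka source schema shown above
    (binary `key` and `value`, and a `timestamp` column).
    """
    return df.selectExpr(
        "CAST(key AS STRING) AS key",      # binary -> string
        "CAST(value AS STRING) AS value",  # binary -> string
        "timestamp",
    )
```

In this notebook it could be applied as `decoded_df = decode_kafka_payload(streamming_df)` before any text processing of the message body.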



### 5. Strip milliseconds from the timestamp column, group by second, and print the stream to the console for 20 seconds

In [23]:
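import time

# Keep characters 12-19 ("HH:mm:ss") of the timestamp string, dropping the date
# and fractional seconds, then count the rows that fall in each second.
streamming_query = (
    streamming_df
    .withColumn("timestamp_sec", col("timestamp").cast("string").substr(12, 8))
    .groupBy("timestamp_sec").count()
)

# "complete" mode prints the whole aggregate table on every micro-batch.
query = streamming_query.writeStream.format("console").outputMode("complete").start()

# Let the stream run for 20 seconds, then stop the query.
time.sleep(20)
query.stop()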

23/03/04 05:08:30 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-78398bba-298a-407f-8a82-0ea580d0dff0. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/03/04 05:08:30 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-------------+-----+
|timestamp_sec|count|
+-------------+-----+
+-------------+-----+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+-------------+-----+
|timestamp_sec|count|
+-------------+-----+
|     05:08:35|    2|
|     05:08:34|   10|
|     05:08:33|    4|
+-------------+-----+



                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+-------------+-----+
|timestamp_sec|count|
+-------------+-----+
|     05:08:38|    4|
|     05:08:35|    6|
|     05:08:37|    6|
|     05:08:34|   10|
|     05:08:36|   10|
|     05:08:33|    4|
+-------------+-----+



                                                                                

-------------------------------------------
Batch: 3
-------------------------------------------
+-------------+-----+
|timestamp_sec|count|
+-------------+-----+
|     05:08:39|    7|
|     05:08:38|    6|
|     05:08:35|    6|
|     05:08:40|    5|
|     05:08:37|    6|
|     05:08:34|   10|
|     05:08:36|   10|
|     05:08:33|    4|
+-------------+-----+



                                                                                

-------------------------------------------
Batch: 4
-------------------------------------------
+-------------+-----+
|timestamp_sec|count|
+-------------+-----+
|     05:08:39|    7|
|     05:08:38|    6|
|     05:08:35|    6|
|     05:08:40|    5|
|     05:08:37|    6|
|     05:08:34|   10|
|     05:08:41|    6|
|     05:08:36|   10|
|     05:08:33|    4|
|     05:08:42|    7|
+-------------+-----+



                                                                                

-------------------------------------------
Batch: 5
-------------------------------------------
+-------------+-----+
|timestamp_sec|count|
+-------------+-----+
|     05:08:39|    7|
|     05:08:38|    6|
|     05:08:35|    6|
|     05:08:40|    5|
|     05:08:37|    6|
|     05:08:34|   10|
|     05:08:43|    6|
|     05:08:45|    1|
|     05:08:41|    6|
|     05:08:36|   10|
|     05:08:44|    7|
|     05:08:33|    4|
|     05:08:42|    7|
+-------------+-----+





23/03/04 05:08:50 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@6d84b86a is aborting.
23/03/04 05:08:50 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@6d84b86a aborted.




23/03/04 05:08:50 WARN TaskSetManager: Lost task 85.0 in stage 13.0 (TID 1288) (slave01 executor 1): TaskKilled (Stage cancelled)
23/03/04 05:08:50 WARN TaskSetManager: Lost task 78.0 in stage 13.0 (TID 1287) (slave03 executor 2): TaskKilled (Stage cancelled)
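
The `substr(12, 8)` call in step 5 is a 1-based slice of the timestamp's string form. The plain-Python sketch below mimics that slice, assuming the string looks like `yyyy-MM-dd HH:mm:ss.SSS`. `second_bucket` is a hypothetical name used only for this illustration.

```python
from datetime import datetime


def second_bucket(ts: datetime) -> str:
    """Mimic col("timestamp").cast("string").substr(12, 8) in plain Python."""
    # Assumed string form of the timestamp: 'yyyy-MM-dd HH:mm:ss.SSS'
    text = ts.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    # substr(12, 8) is 1-based: start at character 12 and take 8 characters.
    start, length = 12, 8
    return text[start - 1 : start - 1 + length]


print(second_bucket(datetime(2023, 3, 4, 5, 8, 35, 123000)))  # 05:08:35
```

Because the slice discards the date, rows with the same time of day on different dates would land in the same group. For a 20-second run that does not matter, but a longer-running query may want to keep the date in the grouping key.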


### 6. Stop the session and context

In [24]:
session.stop()
sc.stop()
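
If a streaming query might still be running at this point, one option is to stop the active queries before stopping the session. The sketch below assumes `session` is a SparkSession and uses its `streams.active` list. `shutdown` is a hypothetical helper name.

```python
def shutdown(session) -> int:
    """Stop every active streaming query, then the session.

    Returns the number of queries that were stopped.
    """
    queries = list(session.streams.active)  # snapshot of the active queries
    for query in queries:
        query.stop()
    # Stopping the session also stops its underlying SparkContext.
    session.stop()
    return len(queries)
```

Calling `shutdown(session)` would cover both lines of the cell above, because stopping the session also stops `sc`.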