Before we start, we need to make sure that we have a Kafka cluster running and a topic that receives some streaming data. For simplicity, we will use a single-node Kafka cluster and a topic named `users`. Open the `5.0 user-gen-kafka.ipynb` notebook and execute its cell. That notebook produces a user record every few seconds and puts it on the Kafka topic called `users`.

In [1]:
# Delta Lake helpers: session configuration and the DeltaTable API.
from delta import configure_spark_with_delta_pip, DeltaTable
# Entry point for Spark SQL / Structured Streaming.
from pyspark.sql import SparkSession
# Column helpers used to parse the Kafka payload.
from pyspark.sql.functions import col, from_json, to_timestamp
# Types used to declare the schema of the JSON payload.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [2]:
# Describe the Spark session: application name, cluster master URL,
# executor memory, and the Delta Lake SQL extension and catalog.
builder = (
    SparkSession.builder
    .appName("monitor-stream")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "512m")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

# Let Delta add its own packages and also request the Kafka connector
# package, then create the session (or reuse an existing one).
spark = configure_spark_with_delta_pip(
    builder,
    ["org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1"],
).getOrCreate()

# Only log errors so the notebook output stays readable.
spark.sparkContext.setLogLevel("ERROR")

:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-eb1864be-ea03-415c-8fb0-b08b2cd19aee;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.4.0 in central
	found io.delta#delta-storage;2.4.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.1 in central
	found org.apache.kafka#kafka-clients;3.3.2 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.1 in central
	found org.slf4j#slf4j-api;2.0.6 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central

In [3]:
# Run the two IPython line magics in order: load the `sparksql_magic`
# extension, then apply the `SparkSql.limit=20` configuration.
for _magic_name, _magic_args in (
    ("load_ext", "sparksql_magic"),
    ("config", "SparkSql.limit=20"),
):
    get_ipython().run_line_magic(_magic_name, _magic_args)

In [4]:
# Schema of the JSON document carried in each Kafka message value.
# All fields are nullable; `timestamp` is read as a string here and is
# converted to a real timestamp when the columns are selected.
schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True),
    StructField('gender', StringType(), True),
    StructField('country', StringType(), True),
    StructField('timestamp', StringType(), True),
])

# Streaming read from the `users` Kafka topic on broker kafka:9092,
# starting at the latest offsets. The `value` column is cast to a string
# and parsed as JSON with `schema`, replacing the raw `value` column.
users_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "users")
    .option("startingOffsets", "latest")
    .load()
    .withColumn('value', from_json(col('value').cast("STRING"), schema))
)

# Flatten the parsed `value` struct into top-level columns and convert the
# textual timestamp (pattern "MM/dd/yyyy, HH:mm:ss") into a timestamp column.
users_df = users_df.select(
    col('value.id').alias('id'),
    col('value.name').alias('name'),
    col('value.age').alias('age'),
    col('value.gender').alias('gender'),
    col('value.country').alias('country'),
    to_timestamp(col('value.timestamp'), "MM/dd/yyyy, HH:mm:ss").alias('timestamp'),
)

In [5]:
# Start a streaming query named "user-kafka-stream" that appends the parsed
# user rows to a Delta table, using the given checkpoint location.
query = (
    users_df.writeStream
    .format("delta")
    .queryName("user-kafka-stream")
    .outputMode("append")
    .option(
        "checkpointLocation",
        "/opt/workspace/data/delta_lake/monitor-streams/users/_checkpoints/",
    )
    .start("/opt/workspace/data/delta_lake/monitor-streams/users")
)

In [6]:
# Display the streaming query's current status (the notebook renders the
# value of the cell's last expression).
query.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

[Stage 0:>                                                          (0 + 1) / 1]

In [9]:
# Display the query's recent progress reports (the notebook renders the
# value of the cell's last expression).
query.recentProgress

[{'id': 'f7837694-913d-4b90-ac7b-0ffc6f67747a',
  'runId': '5e1a5dcf-e297-48a2-b70f-a3bbe04815d2',
  'name': 'user-kafka-stream',
  'timestamp': '2024-02-04T18:54:54.132Z',
  'batchId': 0,
  'numInputRows': 0,
  'inputRowsPerSecond': 0.0,
  'processedRowsPerSecond': 0.0,
  'durationMs': {'addBatch': 20207,
   'commitOffsets': 86,
   'getBatch': 27,
   'latestOffset': 2290,
   'queryPlanning': 536,
   'triggerExecution': 23274,
   'walCommit': 94},
  'stateOperators': [],
  'sources': [{'description': 'KafkaV2[Subscribe[users]]',
    'startOffset': None,
    'endOffset': {'users': {'0': 275}},
    'latestOffset': {'users': {'0': 275}},
    'numInputRows': 0,
    'inputRowsPerSecond': 0.0,
    'processedRowsPerSecond': 0.0,
    'metrics': {'avgOffsetsBehindLatest': '0.0',
     'maxOffsetsBehindLatest': '0',
     'minOffsetsBehindLatest': '0'}}],
  'sink': {'description': 'DeltaSink[/opt/workspace/data/delta_lake/monitor-streams/users]',
   'numOutputRows': -1}}]



In [10]:
from pyspark.sql.streaming import StreamingQueryListener

# Define a custom listener class
class MyListener(StreamingQueryListener):
    """Streaming query listener that prints lifecycle events to stdout."""

    def onQueryStarted(self, event):
        """Print the query name and id when a query starts."""
        print(f"'{event.name}' [{event.id}] got started!")

    def onQueryProgress(self, event):
        """Print the progress report each time a query makes progress."""
        print(f"Query made progress: {event.progress}")

    def onQueryTerminated(self, event):
        """Print how a query terminated: with an exception or normally."""
        if event.exception:
            # Report the exception message itself rather than the event object.
            print(
                f"Query with id {event.id} terminated with exception: "
                f"{event.exception}"
            )
        else:
            print(f"Query with id {event.id} terminated normally")

# Create an instance of the listener class
listener = MyListener()

# Register the listener with spark.streams
spark.streams.addListener(listener)

                                                                                

Query made progress: {
  "id" : "f7837694-913d-4b90-ac7b-0ffc6f67747a",
  "runId" : "5e1a5dcf-e297-48a2-b70f-a3bbe04815d2",
  "name" : "user-kafka-stream",
  "timestamp" : "2024-02-04T18:55:17.447Z",
  "batchId" : 1,
  "numInputRows" : 2,
  "inputRowsPerSecond" : 0.08578168561012224,
  "processedRowsPerSecond" : 0.2604166666666667,
  "durationMs" : {
    "addBatch" : 7489,
    "commitOffsets" : 65,
    "getBatch" : 0,
    "latestOffset" : 13,
    "queryPlanning" : 30,
    "triggerExecution" : 7680,
    "walCommit" : 78
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[users]]",
    "startOffset" : {
      "users" : {
        "0" : 275
      }
    },
    "endOffset" : {
      "users" : {
        "0" : 277
      }
    },
    "latestOffset" : {
      "users" : {
        "0" : 277
      }
    },
    "numInputRows" : 2,
    "inputRowsPerSecond" : 0.08578168561012224,
    "processedRowsPerSecond" : 0.2604166666666667,
    "metrics" : {
      "avgOffsetsB

In [13]:
# Stop the streaming query.
query.stop()

In [14]:
spark.stop()  # Stop the Spark session to release its resources