# このノートブックについて

Apahce Kafkaクラスタから読み込み、Delta Lakeに書き込む例。
今回はシンプルに読んだまま書き込む例とする。

多くのケースでは、読まれたデータを変換（フォーマット変換、数値変換、ID変換など）する必要があることに注意。
またApache Kafkaクラスタにはストリームデータとして情報が書き込まれることを考慮し、ここではApache SparkのStructured Streamingを利用して書き込むことにする。

なお、世の中にはKafka ConnectのDelta Lake Sinkもあるようなのでそちらも参考になると思われる（動作確認は未実施）

# 準備

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.types import StringType
import os
import time

今回の例では、あらかじめノートブックを起動する際に環境変数で書き込み先を指定している例としている。
必要に応じて以下を編集して書き込み先を定義すること。

なお、本プロジェクトにはJupyter LabをPySparkと連係するためのヘルパーシェルスクリプトを同梱しているが、その中でApache Spark起動時のオプションとして `--conf "spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain"` を指定している。
このため、AWS CLI等を利用して設定した `~/.aws/credentials` にあるクレデンシャル情報を利用してAWS S3にアクセスするようになっている。

もしMinIO等S3互換ストレージを利用する場合は、クレデンシャルの内容を適切に設定しておく必要がある。

In [7]:
checkpoint_location = 'file:///tmp/_checkpoints/etl-from-json'
output_base_url = os.environ['OUTPUT_URL']
output_url = output_base_url + 'el_aircon'

今回は簡単な動作確認目的であるため、1ノード上にPythonクライアントとApache Kafkaのブローカが存在する前提としている。
別途クラスタを立ててそちらに書き込まれている場合は、必要に応じて `bootstrap_servers` の内容を編集すること。

またトピックも、Apache Kafkaクラスタへのデータ書き込み時に使用したものを利用すること。もし変更していたら、編集していただきたい。

In [8]:
bootstrap_servers = 'localhost:9092'
topic_name = 'el_aircon'

Apache Sparkのストリーム処理定義（Apache Kafkaからの読み込み、Delta Lakeへの書き込み）

In [9]:
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", bootstrap_servers) \
  .option("startingOffsets", "earliest") \
  .option("subscribe", topic_name) \
  .load()

In [10]:
output = df.select(df['key'].cast(StringType()).alias('id'), df['value'].cast(StringType()).alias('state'), df['*'])

In [11]:
output.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", checkpoint_location) \
  .start(output_url)

21/10/25 23:10:30 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

<pyspark.sql.streaming.StreamingQuery at 0x7f746e64a690>

上記を実行後に、 [BasicExampleOfKafkaECHONETLite.ipynb] などを利用し、Apache Kafkaの `el_aircon` トピックにデータを書き込む。
手動でやってもいいし、当該ノートブックを自動起動するようにしておいてもよい。

その作業をしているのを待つためにスリープを挟む。

[BasicExampleOfKafkaECHONETLite.ipynb]: https://github.com/dobachi/PythonKafkaECHONETLiteExample/blob/main/BasicExampleOfKafkaECHONETLite.ipynb

In [12]:
time.sleep(30)

                                                                                

# 書き込まれたデータを読み取る

Delta Lakeとして書き込まれたデータを確認するため、今度はストリームではなくバッチ的に読み取って確認する。

In [13]:
written_df = spark.read.format('delta').load(output_url)
written_df

DataFrame[id: string, state: string, key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [14]:
written_df.show()

[Stage 21:>                                                         (0 + 1) / 1]

+------------+-----+--------------------+----------------+---------+---------+------+--------------------+-------------+
|          id|state|                 key|           value|    topic|partition|offset|           timestamp|timestampType|
+------------+-----+--------------------+----------------+---------+---------+------+--------------------+-------------+
|0068#0x0,0x1|49,21|[30 30 36 38 23 3...|[34 39 2C 32 31]|el_aircon|        0|     0|2021-10-17 00:04:...|            0|
|0068#0x0,0x1|49,21|[30 30 36 38 23 3...|[34 39 2C 32 31]|el_aircon|        0|     1|2021-10-17 00:04:...|            0|
|0068#0x0,0x1|49,21|[30 30 36 38 23 3...|[34 39 2C 32 31]|el_aircon|        0|     2|2021-10-17 00:04:...|            0|
|0748#0x0,0x1|49,20|[30 37 34 38 23 3...|[34 39 2C 32 30]|el_aircon|        0|     3|2021-10-25 22:47:...|            0|
|0748#0x0,0x1|49,20|[30 37 34 38 23 3...|[34 39 2C 32 30]|el_aircon|        0|     4|2021-10-25 22:47:...|            0|
|0748#0x0,0x1|49,20|[30 37 34 38

                                                                                

以下のような出力が見られただろうか？

```
+------------+-----+--------------------+----------------+---------+---------+------+--------------------+-------------+
|          id|state|                 key|           value|    topic|partition|offset|           timestamp|timestampType|
+------------+-----+--------------------+----------------+---------+---------+------+--------------------+-------------+
|0379#0x0,0x1|48,25|[30 33 37 39 23 3...|[34 38 2C 32 35]|el_aircon|        0|    10|2021-10-25 23:11:...|            0|
|0379#0x0,0x1|48,25|[30 33 37 39 23 3...|[34 38 2C 32 35]|el_aircon|        0|    11|2021-10-25 23:11:...|            0|
|0379#0x0,0x1|48,25|[30 33 37 39 23 3...|[34 38 2C 32 35]|el_aircon|        0|     9|2021-10-25 23:11:...|            0|
+------------+-----+--------------------+----------------+---------+---------+------+--------------------+-------------+
```