<a href="https://colab.research.google.com/github/YoshiyukiKono/pulsar_for_beginners/blob/main/02_pulsar_consumer_for_astradb_cdc.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Pulsar Client Development Exercise
## Part 2: Astra DB CDC + Pulsar Data Output

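## Overview

### Prerequisites
 - Environment setup
  - Register for an Astra account and sign in
  - Create an Astra DB database and keyspace
  - Create an Astra Streaming tenant

 - Development setup
  - Generate an Astra DB token
  - Generate an Astra Streaming token


### Exercise Overview

 - Operations in the Astra control plane
  - Create the Astra DB table
  - Define Astra DB CDC
    - Check the automatically generated Astra Streaming namespace, topic, and subscription for CDC
 - Operations in this exercise notebook
  - Run the Pulsar consumer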

---

### Creating the Astra DB Table

This exercise assumes that the following table has already been created in Astra DB and that CDC to Astra Streaming has been configured for it.
```
CREATE TABLE cdc.cdc_test (
    message_id text PRIMARY KEY,
    message text
);
```

---

## Exercise Part 2: Astra DB CDC + Pulsar Data Output

In [2]:
!pip install pulsar_client==3.3.0 avro-python3==1.10.2



### Astra Streaming Connection Settings

#### Connect > Tenant Details > Broker Service URL

In [3]:
service_url = input("service url:")


service url:pulsar+ssl://pulsar-gcp-useast1.streaming.datastax.com:6651


#### Connect > Tenant Details > Web Service URL

In [4]:
admin_url = input("admin url:")

admin url:https://pulsar-gcp-useast1.api.streaming.datastax.com


In [5]:
import getpass

token = getpass.getpass()
print("token:" + token[0:5] + "..." + token[-5:])
print("token length:" + str(len(token)))

··········
token:eyJhb...euW8Q
token length:548


In [6]:
your_tenant_name = input("tenant name:")

tenant name:cdc


In [15]:
your_namespace = input("namespace:")
your_topic_name = input("topic name:")

topic_url = 'persistent://' + your_tenant_name + '/' + your_namespace + '/' + your_topic_name
print("topic url:" + topic_url)

namespace:astracdc
topic name:data-b04d69bb-6851-4bac-9703-8108dc560c41-cdc.cdc_test
topic url:persistent://cdc/astracdc/data-b04d69bb-6851-4bac-9703-8108dc560c41-cdc.cdc_test


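Note: The "topic name" entered above is the name of the topic itself, without any path.
It is not the full name expressed as a hierarchical path.
In the Astra control plane, clicking the copy icon next to a topic name copies the full name, so take care. (If you enter the full name by mistake, the connection check below, which fetches the topic partitions, does not raise an error; the error only appears later, when the schema is fetched.)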
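
One way to guard against pasting a full name is to reduce whatever was entered to its last path segment before building `topic_url`. The sketch below is only an illustration: `short_topic_name` is a hypothetical helper that is not part of this notebook, and it assumes the topic's own name contains no `/`.

```python
def short_topic_name(name):
    """Return the last path segment of a topic name, with any scheme removed."""
    # Drop a leading 'persistent://' (or similar) if present, then keep only
    # the text after the final '/'. A short name is returned unchanged.
    return name.split("://", 1)[-1].rsplit("/", 1)[-1]


full_name = "persistent://cdc/astracdc/data-b04d69bb-6851-4bac-9703-8108dc560c41-cdc.cdc_test"
print(short_topic_name(full_name))
```

With a helper like this, `your_topic_name = short_topic_name(input("topic name:"))` would accept either form.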

In [16]:
import pulsar

client = pulsar.Client(service_url,
                        authentication=pulsar.AuthenticationToken(token))
topic_partitions = client.get_topic_partitions(topic_url)
print(topic_partitions)

['persistent://cdc/astracdc/data-b04d69bb-6851-4bac-9703-8108dc560c41-cdc.cdc_test-partition-0', 'persistent://cdc/astracdc/data-b04d69bb-6851-4bac-9703-8108dc560c41-cdc.cdc_test-partition-1', 'persistent://cdc/astracdc/data-b04d69bb-6851-4bac-9703-8108dc560c41-cdc.cdc_test-partition-2']


## Running the Pulsar Consumer

In [17]:
import base64
import io
import json
import re
import time
from urllib.request import Request, urlopen

import avro.schema
import pulsar
from avro.io import BinaryDecoder, DatumReader

Function definitions

In [18]:
import logging

logging.basicConfig(
    format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s',
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S')

def http_get(url):
    req = Request(url)
    req.add_header("Accept", "application/json")
    req.add_header("Authorization", "Bearer " + token)
    return urlopen(req).read()


def getSchema(admin_url, tenant_name, namespace, topic_name):
    schema_url = "%s/admin/v2/schemas/%s/%s/%s/schema" % (admin_url, tenant_name, namespace, topic_name)

    print(schema_url)

    logging.info("Schema URL='{}'".format(schema_url))
    topic_schema = http_get(schema_url).decode("utf-8")
    # This isn't great
    # the data part of the json has extra back slashes
    topic_schema = topic_schema.replace("\\", "")
    topic_schema = topic_schema.replace('data":"', 'data":')
    topic_schema = topic_schema.replace('}","properties', '},"properties')

    logging.info("Topic='{}' Schema='{}'".format(topic_name, topic_schema))

    schema_json = json.loads(topic_schema)

    data_schema = schema_json["data"]

    keyschema_json = data_schema["key"]
    valueschema_json = data_schema["value"]

    # the namespaces start with numbers and AVRO doesn't like it
    # so strip them out for now
    key_namespace = keyschema_json["namespace"]
    key_namespace = re.sub(r"\d.*_", "", key_namespace)
    keyschema_json["namespace"] = key_namespace

    value_namespace = valueschema_json["namespace"]
    value_namespace = re.sub(r"\d.*_", "", value_namespace)
    valueschema_json["namespace"] = value_namespace

    keyAvroSchema = avro.schema.Parse(json.dumps(keyschema_json))
    valueAvroSchema = avro.schema.Parse(json.dumps(valueschema_json))

    return keyAvroSchema, valueAvroSchema
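
To see what the namespace clean-up inside `getSchema` actually produces, the sketch below applies the same substitution to the namespace value that appears in the schema response printed near the end of this notebook. The function name `strip_numeric_part` is made up for this illustration. The pattern matches from the first digit through the last underscore, so only the leading underscore and the keyspace name remain.

```python
import re


def strip_numeric_part(namespace):
    """Remove everything from the first digit through the last underscore."""
    return re.sub(r"\d.*_", "", namespace)


namespace = "_62303464363962622d363835312d346261632d393730332d383130386463353630633431_cdc"
print(strip_numeric_part(namespace))  # prints "_cdc"
```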

In [19]:
keyAvroSchema, valueAvroSchema = getSchema(admin_url,your_tenant_name, your_namespace, your_topic_name)

https://pulsar-gcp-useast1.api.streaming.datastax.com/admin/v2/schemas/cdc/astracdc/data-b04d69bb-6851-4bac-9703-8108dc560c41-cdc.cdc_test/schema


In [20]:
keyAvroReader = DatumReader(keyAvroSchema)
valueAvroReader = DatumReader(valueAvroSchema)

Subscription name

In [21]:
subscription_name = input("subscription name:")

subscription name:sub


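To verify CDC, the consumer below keeps waiting for updates.
To end it, stop it manually (click the icon that shows the cell is running), which raises a KeyboardInterrupt.
While it is running, add records to the Astra DB table (you can use the CQL Console in the Astra control plane).
The following is an example: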
```
INSERT INTO cdc.cdc_test (message_id, message) VALUES ('20231128','Hello World');
```

In [22]:
MODE_RUNNING = True

consumer = client.subscribe(topic_url, subscription_name)

try:
    waitingForMsg = True
    while waitingForMsg:
        try:
            msg = consumer.receive(2000)

            try:
                # The PartitionKey is Base64 Encoded, so it needs to be decoded
                msgKey = msg.partition_key()
                msgKey_decoded = base64.b64decode(msgKey)

                messageKey_bytes = io.BytesIO(msgKey_decoded)
                keydecoder = BinaryDecoder(messageKey_bytes)
                msgKey = keyAvroReader.read(keydecoder)

                message_bytes = io.BytesIO(msg.data())
                decoder = BinaryDecoder(message_bytes)
                msgvalue = valueAvroReader.read(decoder)

                print("Received message key='{}' value='{}'".format(msgKey, msgvalue))
                #print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
                # Acknowledging the message to remove from message backlog
                consumer.acknowledge(msg)
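            except Exception as ex:
                # Decoding failed: report the error and ask for redelivery
                print("Failed to decode message: {}".format(ex))
                consumer.negative_acknowledge(msg)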

        except Exception as ex:
            print(ex)
            if MODE_RUNNING:
                print("Still waiting for a message...")
            else:
                print("Stop waiting for a message.")
                waitingForMsg = False
        time.sleep(1)

finally:
    consumer.close()

Received message key='{'message_id': '2'}' value='{'message': 'Hello2'}'
Received message key='{'message_id': '9'}' value='{'message': 'Hello9'}'
Received message key='{'message_id': '6'}' value='{'message': 'Hello6'}'
Received message key='{'message_id': '19'}' value='{'message': 'Hello taro19'}'
Received message key='{'message_id': '16'}' value='{'message': 'Hello taro16'}'
Received message key='{'message_id': '10'}' value='{'message': 'Hello taro10'}'
Received message key='{'message_id': '3'}' value='{'message': 'Hello3'}'
Received message key='{'message_id': '5'}' value='{'message': 'Hello5'}'
Received message key='{'message_id': '4'}' value='{'message': 'Hello4'}'
Received message key='{'message_id': '17'}' value='{'message': 'Hello taro17'}'
Received message key='{'message_id': '18'}' value='{'message': 'Hello taro18'}'
Received message key='{'message_id': '12'}' value='{'message': 'Hello taro12'}'
Received message key='{'message_id': '14'}' value='{'message': 'Hello taro14'}'
Re

KeyboardInterrupt: ignored
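
To make the two decoding steps in the consumer loop more concrete, here is a hand-rolled sketch that does not use the `avro` library. It relies on two rules of Avro binary encoding: a string is a zigzag varint byte count followed by UTF-8 bytes, and a union is a branch index followed by the value of that branch. It covers only the key and value schemas of the `cdc_test` table, the helper names are invented for this illustration, and the sample bytes are built by hand rather than captured from a real topic.

```python
import base64
import io


def read_long(buf):
    """Read one Avro long: a base-128 varint holding a zigzag-encoded integer."""
    shift, raw = 0, 0
    while True:
        byte = buf.read(1)[0]
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear marks the last byte
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1)  # undo zigzag encoding


def read_string(buf):
    """Read one Avro string: a long byte count followed by UTF-8 bytes."""
    return buf.read(read_long(buf)).decode("utf-8")


def decode_key(partition_key):
    """Decode a Base64 partition key holding a record with one string field."""
    buf = io.BytesIO(base64.b64decode(partition_key))
    return {"message_id": read_string(buf)}


def decode_value(payload):
    """Decode a payload holding a record with one ["null", "string"] field."""
    buf = io.BytesIO(payload)
    branch = read_long(buf)  # union branch index: 0 = null, 1 = string
    return {"message": read_string(buf) if branch == 1 else None}


# Hand-built sample bytes (an assumption for illustration, not real topic data).
sample_key = base64.b64encode(b"\x02" + b"2").decode("ascii")
sample_payload = b"\x02" + b"\x0c" + b"Hello2"
print(decode_key(sample_key), decode_value(sample_payload))
```

In the notebook itself, `DatumReader` does this work for any schema, which is why the schema has to be fetched before consuming.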

## Reference: Checking the API Call

https://*.api.streaming.datastax.com/admin/v2/schemas/{tenant}/{namespace}/{topic}/schema

In [30]:
print(admin_url)
print(your_tenant_name)
print(your_namespace)
print(your_topic_name)

https://pulsar-gcp-useast1.api.streaming.datastax.com
cdc
astracdc
data-b04d69bb-6851-4bac-9703-8108dc560c41-cdc.cdc_test


In [31]:
schema_url = "%s/admin/v2/schemas/%s/%s/%s/schema" % (admin_url, your_tenant_name, your_namespace, your_topic_name)
print(schema_url)

https://pulsar-gcp-useast1.api.streaming.datastax.com/admin/v2/schemas/cdc/astracdc/data-b04d69bb-6851-4bac-9703-8108dc560c41-cdc.cdc_test/schema


In [32]:
req = Request(schema_url)
req.add_header("Accept", "application/json")
req.add_header("Authorization", "Bearer " + token)
res = urlopen(req).read()
print(res)

b'{"version":0,"type":"KEY_VALUE","timestamp":1699844893626,"data":"{\\"key\\":{\\"type\\":\\"record\\",\\"name\\":\\"cdc_test\\",\\"namespace\\":\\"_62303464363962622d363835312d346261632d393730332d383130386463353630633431_cdc\\",\\"doc\\":\\"Table 62303464363962622d363835312d346261632d393730332d383130386463353630633431_cdc.cdc_test\\",\\"fields\\":[{\\"name\\":\\"message_id\\",\\"type\\":\\"string\\"}]},\\"value\\":{\\"type\\":\\"record\\",\\"name\\":\\"cdc_test\\",\\"namespace\\":\\"_62303464363962622d363835312d346261632d393730332d383130386463353630633431_cdc\\",\\"doc\\":\\"Table 62303464363962622d363835312d346261632d393730332d383130386463353630633431_cdc.cdc_test\\",\\"fields\\":[{\\"name\\":\\"message\\",\\"type\\":[\\"null\\",\\"string\\"],\\"default\\":null}]}}","properties":{"key.schema.properties":"{}","value.schema.properties":"{}","value.schema.type":"AVRO","key.schema.name":"cdc_test","value.schema.name":"cdc_test","kv.encoding.type":"SEPARATED","key.schema.type":"AVRO"}}'
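
The raw response above shows that the `data` field is itself a JSON document stored as a string, which is why `getSchema` strips backslashes and quotes before parsing. As an alternative sketch, not the approach this notebook uses, the same result could be reached by decoding twice with `json.loads`. The name `parse_key_value_schema` and the trimmed sample (with a shortened, made-up namespace) are assumptions for illustration.

```python
import json


def parse_key_value_schema(response_text):
    """Return (key_schema, value_schema) dicts from a KEY_VALUE schema response."""
    outer = json.loads(response_text)  # first pass: the response envelope
    data = json.loads(outer["data"])   # second pass: the embedded schema document
    return data["key"], data["value"]


# Trimmed sample with the same shape as the response above.
sample_response = json.dumps({
    "version": 0,
    "type": "KEY_VALUE",
    "data": json.dumps({
        "key": {"type": "record", "name": "cdc_test", "namespace": "_example_cdc",
                "fields": [{"name": "message_id", "type": "string"}]},
        "value": {"type": "record", "name": "cdc_test", "namespace": "_example_cdc",
                  "fields": [{"name": "message", "type": ["null", "string"],
                              "default": None}]},
    }),
    "properties": {"kv.encoding.type": "SEPARATED"},
})

key_schema, value_schema = parse_key_value_schema(sample_response)
print(key_schema["fields"][0]["name"], value_schema["fields"][0]["name"])
```

Decoding twice avoids editing the text by hand, so a backslash or quote inside a `doc` string would not be damaged.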


## Closing the Client Connection

In [33]:
client.close()