# Jupyter (bash_kernel) Pulsar Manager

Current Pulsar cluster:
- Pulsar version 2.10.1.10
- Zookeeper: 127.0.0.1:12181,127.0.0.1:22181,127.0.0.1:32181
- Brokers: 
  - http://127.0.0.1:18080,127.0.0.1:28080,127.0.0.1:38080
  - pulsar://127.0.0.1:16650,127.0.0.1:26650,127.0.0.1:36650
- Bookies: 
  - bookiePort=13181
  - bookiePort=23181
  - bookiePort=33181
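
The broker endpoints above pack several nodes into one comma-separated string. As a minimal sketch (the helper name `split_endpoints` is hypothetical and not part of any Pulsar tooling), the following splits such a string into host/port pairs, which can be handy when checking each node in turn:

```python
def split_endpoints(service_url):
    """Split 'scheme://h1:p1,h2:p2' into (scheme, [(host, port), ...])."""
    scheme, _, hosts = service_url.partition("://")
    endpoints = []
    for item in hosts.split(","):
        # rpartition keeps everything before the last ':' as the host
        host, _, port = item.rpartition(":")
        endpoints.append((host, int(port)))
    return scheme, endpoints


scheme, brokers = split_endpoints(
    "pulsar://127.0.0.1:16650,127.0.0.1:26650,127.0.0.1:36650"
)
print(scheme, brokers)
```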

Note: This notebook uses bash_kernel. You may need to install bash_kernel and restart VS Code.

```bash
pip install bash_kernel
python -m bash_kernel.install
```

## Prepare

In [11]:
# 1. Configure the Pulsar and Java environment.
source ~/.bash_profile
export PULSAR_HOME='/Users/futeng/Documents/workspaces/github/futeng/pulsar-pseudo-cluster/pulsar-1'
export PATH=$PATH:$PULSAR_HOME/bin

# 2. Check.
java -version
pulsar version
pulsar-admin --version

openjdk version "11.0.21" 2023-10-17 LTS
OpenJDK Runtime Environment (build 11.0.21+10-LTS)
OpenJDK 64-Bit Server VM (build 11.0.21+10-LTS, mixed mode)
Current version of pulsar is: 2.10.1.10
Current version of pulsar admin client is: 2.10.1.10


## Operations

### Threads

In [84]:
pulsar-1/bin/pulsar-daemon restart broker
pulsar-2/bin/pulsar-daemon restart broker

doing restart broker ...
stopping broker
Shutdown is in progress... Please wait...
Shutdown is in progress... Please wait...
Shutdown is in progress... Please wait...
Shutdown is in progress... Please wait...
Shutdown is in progress... Please wait...
Shutdown is in progress... Please wait...
Shutdown is in progress... Please wait...
Shutdown is in progress... Please wait...
Shutdown completed.
starting broker, logging to /Users/futeng/Documents/workspaces/github/futeng/pulsar-pseudo-cluster/pulsar-1/logs/pulsar-broker-mbp16.local.log
Note: Set immediateFlush to true in conf/log4j2.yaml will guarantee the logging event is flushing to disk immediately. The default behavior is switched off due to performance considerations.
doing restart broker ...
stopping broker
Shutdown is in progress... Please wait...
Shutdown is in progress... Please wait...
Shutdown is in progress... Please wait...
Shutdown is in progress... Please wait...
Shutdown is in progress... Please wait...
Shutdown is in pro

In [None]:
pulsar-1/bin/pulsar-daemon restart bookie
pulsar-2/bin/pulsar-daemon restart bookie
pulsar-3/bin/pulsar-daemon restart bookie

In [None]:
pulsar-1/bin/pulsar-daemon restart zookeeper
pulsar-2/bin/pulsar-daemon restart zookeeper
pulsar-3/bin/pulsar-daemon restart zookeeper

In [None]:
pulsar-1/bin/pulsar-daemon stop broker
pulsar-2/bin/pulsar-daemon stop broker
pulsar-1/bin/pulsar-daemon stop bookie
pulsar-2/bin/pulsar-daemon stop bookie
pulsar-3/bin/pulsar-daemon stop bookie
pulsar-1/bin/pulsar-daemon stop zookeeper
pulsar-2/bin/pulsar-daemon stop zookeeper
pulsar-3/bin/pulsar-daemon stop zookeeper

In [100]:
jps -l | sort -r

68104 org.apache.pulsar.client.cli.PulsarClientTool
67693 org.apache.pulsar.PulsarBrokerStarter
67606 org.apache.pulsar.PulsarBrokerStarter
4993 jdk.jcmd/sun.tools.jps.Jps
43092 org.apache.bookkeeper.server.Main
43026 org.apache.bookkeeper.server.Main
42972 org.apache.bookkeeper.server.Main
41809 org.apache.zookeeper.server.quorum.QuorumPeerMain
41740 org.apache.zookeeper.server.quorum.QuorumPeerMain
41673 org.apache.zookeeper.server.quorum.QuorumPeerMain
17498 com.intellij.idea.Main
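
To check at a glance that every component is running, the `jps -l` output can be summarized per role. The sketch below is an illustration only: the `count_roles` helper is hypothetical, and the main-class names are simply those that appear in the output above.

```python
from collections import Counter

# Main-class names as they appear in the `jps -l` output above.
ROLES = {
    "org.apache.pulsar.PulsarBrokerStarter": "broker",
    "org.apache.bookkeeper.server.Main": "bookie",
    "org.apache.zookeeper.server.quorum.QuorumPeerMain": "zookeeper",
}


def count_roles(jps_output):
    """Count running Pulsar components in the text printed by `jps -l`."""
    counts = Counter()
    for line in jps_output.splitlines():
        fields = line.split()
        # Each useful line has the form "<pid> <main class>"
        if len(fields) == 2 and fields[1] in ROLES:
            counts[ROLES[fields[1]]] += 1
    return dict(counts)


sample = """\
67693 org.apache.pulsar.PulsarBrokerStarter
67606 org.apache.pulsar.PulsarBrokerStarter
43092 org.apache.bookkeeper.server.Main
43026 org.apache.bookkeeper.server.Main
42972 org.apache.bookkeeper.server.Main
41809 org.apache.zookeeper.server.quorum.QuorumPeerMain
41740 org.apache.zookeeper.server.quorum.QuorumPeerMain
41673 org.apache.zookeeper.server.quorum.QuorumPeerMain
17498 com.intellij.idea.Main
"""
print(count_roles(sample))
```

For the sample above this reports two brokers, three bookies, and three ZooKeeper nodes.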


### Tenants

In [16]:
# Create a tenant
tenant_name="futeng" && \
    pulsar-admin tenants create $tenant_name && \
    pulsar-admin tenants list

futeng
public
pulsar
t1
t2


### Namespaces


In [17]:
# Create the namespace futeng/ns4e
tenant=futeng ; namespace=ns4e 
pulsar-admin namespaces create "$tenant/$namespace" 
pulsar-admin namespaces list $tenant  

futeng/ns4e


#### TTL and Retention

In [58]:
pulsar-admin namespaces set-message-ttl -ttl 10 $tenant/$namespace
pulsar-admin namespaces get-message-ttl $tenant/$namespace

10


In [72]:
pulsar-admin namespaces set-retention --time 60s --size 100m $tenant/$namespace
pulsar-admin namespaces get-retention $tenant/$namespace

{
  "retentionTimeInMinutes" : 1,
  "retentionSizeInMB" : 100
}
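
The retention was set with `--time 60s` but is reported as `retentionTimeInMinutes: 1`. A rough sketch of that conversion follows. The suffix table and the round-down behaviour are assumptions made for illustration (the output above only confirms the `60s` case), and `relative_time_to_minutes` is a hypothetical helper, not a Pulsar API.

```python
# Assumed suffix multipliers (seconds), modelled on the s/m/h/d/w/y
# suffixes accepted by relative-time options.
UNIT_SECONDS = {
    "s": 1,
    "m": 60,
    "h": 3600,
    "d": 86400,
    "w": 604800,
    "y": 31536000,
}


def relative_time_to_minutes(text):
    """Convert a value such as '60s' or '2h' to whole minutes (rounded down)."""
    value, unit = int(text[:-1]), text[-1].lower()
    return value * UNIT_SECONDS[unit] // 60


print(relative_time_to_minutes("60s"))
print(relative_time_to_minutes("2h"))
```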


### Topic

```bash
pulsar-admin topics create-partitioned-topic \
  --partitions $partition_num \
  "$tenant/$namespace/$topic"

pulsar-admin topics list-partitioned-topics \
   "$tenant/$namespace" 
   
pulsar-admin topics grant-permission \
  --role $user_role \
  --actions produce,consume \
  "$tenant/$namespace/$topic"

pulsar-admin topics permissions \
  "$tenant/$namespace/$topic"
```
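
The commands above take a `tenant/namespace/topic` path, while stats output elsewhere in this notebook shows each partition as `<topic>-partition-<n>`. A small sketch of that naming scheme (the function names are hypothetical):

```python
def topic_url(tenant, namespace, topic, domain="persistent"):
    """Build the fully qualified topic name."""
    return f"{domain}://{tenant}/{namespace}/{topic}"


def partition_names(tenant, namespace, topic, partitions):
    """List the per-partition topic names of a partitioned topic."""
    base = topic_url(tenant, namespace, topic)
    return [f"{base}-partition-{i}" for i in range(partitions)]


for name in partition_names("futeng", "ftns", "logtest", 3):
    print(name)
```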

In [24]:
# Create the topic futeng/ftns/logtest and grant permissions
tenant=futeng; namespace=ftns; topic=logtest; partition_num=3; user_role="pulsar-consumer-test" && \
  pulsar-admin topics create-partitioned-topic --partitions $partition_num "$tenant/$namespace/$topic" && \
  pulsar-admin topics list-partitioned-topics "$tenant/$namespace"  && \
  pulsar-admin topics grant-permission --role $user_role --actions produce,consume "$tenant/$namespace/$topic" && \
  pulsar-admin topics permissions "$tenant/$namespace/$topic"

In [None]:
# Grant permissions only
! tenant=futeng; namespace=ftns; topic=logtest; partition_num=3; user_role="pulsar-consumer-test" && \
  pulsar-admin topics list-partitioned-topics "$tenant/$namespace"  && \
  pulsar-admin topics grant-permission --role $user_role --actions produce,consume "$tenant/$namespace/$topic" && \
  pulsar-admin topics permissions "$tenant/$namespace/$topic"

In [26]:
pulsar-admin topics create-partitioned-topic \
  --partitions 1 \
  "futeng/ns4e/p1"

pulsar-admin topics list-partitioned-topics futeng/ns4e


persistent://futeng/ns4e/p3
persistent://futeng/ns4e/p1


### Pub/Sub

In [23]:
pulsar-admin topics list-partitioned-topics futeng/ns4e

persistent://futeng/ns4e/p3


### Stats

```bash
    stats      Get the stats for the topic and its connected producers and 
      Usage: stats [options] persistent://tenant/namespace/topic
    stats-internal      Get the internal stats for the topic
      Usage: stats-internal [options] persistent://tenant/namespace/topic
    partitioned-stats      Get the stats for the partitioned topic and its 
      Usage: partitioned-stats [options] persistent://tenant/namespace/topic
            Get per partition stats
    partitioned-stats-internal      Get the internal stats for the partitioned 
      Usage: partitioned-stats-internal persistent://tenant/namespace/topic
```

In [None]:
# pulsar-admin topics stats persistent://futeng/ns4e/p3
# pulsar-admin topics stats-internal persistent://futeng/ns4e/p3

pulsar-admin topics partitioned-stats persistent://futeng/ns4e/p3



In [None]:
pulsar-admin topics partitioned-stats persistent://futeng/ns4e/p1

In [None]:
pulsar-admin topics partitioned-stats-internal persistent://futeng/ns4e/p1

### JWT Token

Note: creating a token with the `pulsar` command requires the Pulsar environment variables to be configured as described earlier.

**Shell example**

```bash
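# Name the token subject. This name must match the role name used when granting permissions.
role_name={{token_subject_name_same_as_role_name}}
# Set the expiry time. Clients must switch to a new valid token before this one expires.
expiry_time="7d"
# --secret-key points to the secret key file. Its location is fixed within the cluster and does not need to change.
secret_key="/home/futeng/pulsar-node/conf/my-secret.key"

# Create the token with the tokens subcommand of the pulsar tool.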
pulsar tokens create \
  --secret-key $secret_key \
  --subject $role_name \
  --expiry-time $expiry_time
```

**Jupyter example**

Create a token for the logging application, valid for 365 days.

```bash
! role_name=logger; expiry_time="365d"; secret_key="/home/futeng/pulsar-node/conf/my-secret.key" && \
  pulsar tokens create --secret-key $secret_key --subject $role_name --expiry-time $expiry_time
```

Using a Base64-encoded secret key:

```bash
! tokenSecretKey='data:;base64,pvzNRGk2wW9eTqvd1uwzjX6b4mD/47am57+uh/Gb6SQ=' && \
  pulsar tokens create --secret-key $tokenSecretKey --subject "pulsar-consumer-test"
```

In [20]:
# Create a token
! tokenSecretKey='data:;base64,pvzNRGk2wW9eTqvd1uwzjX6b4mD/47am57+uh/Gb6SQ=' && \
  pulsar tokens create --secret-key $tokenSecretKey --subject "pulsar-consumer-test"

# eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItY29uc3VtZXItdGVzdCJ9.WeRJQidiAMhTXrkfgLUkI2YLdZSEFX9DO6K3XEQdcSs

eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItY29uc3VtZXItdGVzdCJ9.WeRJQidiAMhTXrkfgLUkI2YLdZSEFX9DO6K3XEQdcSs
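
To confirm which role a token carries, its header and payload can be decoded locally, since they are Base64URL-encoded JSON. The sketch below only inspects the token and does not verify the signature. The helper names are hypothetical.

```python
import base64
import json


def decode_segment(segment):
    """Decode one Base64URL-encoded JWT segment into a dict."""
    # JWT segments drop '=' padding, so restore it before decoding
    padded = segment + "=" * (-len(segment) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def inspect_token(token):
    """Return (header, payload) of a JWT without verifying its signature."""
    header_b64, payload_b64, _signature = token.split(".")
    return decode_segment(header_b64), decode_segment(payload_b64)


token = (
    "eyJhbGciOiJIUzI1NiJ9."
    "eyJzdWIiOiJwdWxzYXItY29uc3VtZXItdGVzdCJ9."
    "WeRJQidiAMhTXrkfgLUkI2YLdZSEFX9DO6K3XEQdcSs"
)
header, payload = inspect_token(token)
print(header)   # {'alg': 'HS256'}
print(payload)  # {'sub': 'pulsar-consumer-test'}
```

The payload holds only the subject. There is no `exp` claim because this token was created without `--expiry-time`.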


## Single Topic test

In [37]:
tenant=futeng 
namespace=ns4e
topic_name=p1 
partition_num=1
topic=persistent://$tenant/$namespace/$topic_name
echo $topic

persistent://futeng/ns4e/p1


In [None]:
pulsar-admin topics list-partitioned-topics futeng/ns4e
pulsar-admin topics partitioned-stats persistent://futeng/ns4e/p1

In [99]:
pulsar-admin topics partitioned-stats-internal $topic

{
  "metadata" : {
    "partitions" : 1
  },
  "partitions" : {
    "persistent://futeng/ns4e/p1-partition-0" : {
      "entriesAddedCounter" : 1173,
      "numberOfEntries" : 172,
      "totalSize" : 186580,
      "currentLedgerEntries" : 1,
      "currentLedgerSize" : 1084,
      "lastLedgerCreatedTimestamp" : "2024-01-03T22:22:42.869+08:00",
      "waitingCursorsCount" : 1,
      "pendingAddEntriesCount" : 0,
      "lastConfirmedEntry" : "23:0",
      "state" : "LedgerOpened",
      "ledgers" : [ {
        "ledgerId" : 21,
        "entries" : 171,
        "size" : 185496,
        "offloaded" : false,
        "underReplicated" : false
      }, {
        "ledgerId" : 23,
        "entries" : 0,
        "size" : 0,
        "offloaded" : false,
        "underReplicated" : false
      } ],
      "cursors" : {
        "s1" : {
          "markDeletePosition" : "21:170",
          "readPosition" : "21:171",
          "waitingReadOp" : false,
          "pendingReadOps" : 0,
          "message
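
Positions in the internal stats use the form `ledgerId:entryId`, and the per-ledger entry counts can be cross-checked against `numberOfEntries`. The sketch below uses values copied from the output above. Treating the last ledger in the list as the open one, whose count comes from `currentLedgerEntries`, is an assumption that happens to match this sample rather than a documented guarantee.

```python
def parse_position(position):
    """Split a 'ledgerId:entryId' string into a pair of integers."""
    ledger_id, entry_id = position.split(":")
    return int(ledger_id), int(entry_id)


def total_entries(partition_stats):
    """Sum entries of closed ledgers plus the currently open ledger."""
    ledgers = partition_stats["ledgers"]
    # Assumption: the last ledger is still open and reports 0 entries,
    # so its count is taken from currentLedgerEntries instead.
    closed = sum(ledger["entries"] for ledger in ledgers[:-1])
    return closed + partition_stats["currentLedgerEntries"]


# Values copied from the stats-internal output above.
stats = {
    "numberOfEntries": 172,
    "currentLedgerEntries": 1,
    "ledgers": [
        {"ledgerId": 21, "entries": 171},
        {"ledgerId": 23, "entries": 0},
    ],
    "cursors": {"s1": {"markDeletePosition": "21:170", "readPosition": "21:171"}},
}

print(total_entries(stats), stats["numberOfEntries"])
print(parse_position(stats["cursors"]["s1"]["markDeletePosition"]))
```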

In [98]:
pulsar-perf produce $topic -r 200 -bm 1 -m 1

2024-01-03T22:23:38,550+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceProducer - JVM args [-Dlog4j.configurationFile=log4j2.yaml, -Djava.net.preferIPv4Stack=true, -Dpulsar.allocator.exit_on_oom=true, -Dio.netty.recycler.maxCapacity.default=1000, -Dio.netty.recycler.linkCapacity=1024, -Dpulsar.log.appender=Console, -Dpulsar.log.level=info, -Dpulsar.log.root.level=info, -Dpulsar.log.dir=/Users/futeng/Documents/workspaces/github/futeng/pulsar-pseudo-cluster/pulsar-1/logs, -Dpulsar.log.file=pulsar-perftest.log]
2024-01-03T22:23:38,567+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceProducer - Netty max memory (PlatformDependent.maxDirectMemory()) 8 GB
2024-01-03T22:23:38,567+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceProducer - JVM max heap memory (Runtime.getRuntime().maxMemory()) 8 GB
2024-01-03T22:23:38,590+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceProducer - Starting Pulsar perf producer with config: {
  "confFile" : "/Users/futeng/D

In [93]:
pulsar-client consume $topic -s s1 -p Earliest -n 0

2024-01-03T22:18:57,989+0800 [main] WARN  io.netty.resolver.dns.DnsServerAddressStreamProviders - Can not find io.netty.resolver.dns.macos.MacOSDnsServerAddressStreamProvider in the classpath, fallback to system defaults. This may result in incorrect DNS resolutions on MacOS.
2024-01-03T22:18:58,084+0800 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x75b20446, L:/127.0.0.1:58724 - R:/127.0.0.1:16650]] Connected to server
2024-01-03T22:18:58,149+0800 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":null,"subscriptionName":"s1","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"15c55","ackTimeoutMillis":0,"tickDuratio

In [80]:
pulsar-admin topics unsubscribe $topic -s s5

## Produce and Consume Test

**Shell example**

```bash
pulsar-client \
  --url pulsar://pulsar02:6650,pulsar03:6650 \
  --auth-plugin "org.apache.pulsar.client.impl.auth.AuthenticationToken" \
  --auth-params '{"token":"eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItY29uc3VtZXItdGVzdCJ9.WeRJQidiAMhTXrkfgLUkI2YLdZSEFX9DO6K3XEQdcSs"}' \
  produce persistent://futeng/ftns/logtest \
  -n 10  -m "hello pulsar"
```

**Jupyter example**

```bash
! pulsar-client \
  --url pulsar://pulsar02:6650,pulsar03:6650 \
  --auth-plugin "org.apache.pulsar.client.impl.auth.AuthenticationToken" \
  --auth-params '{"token":"eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItY29uc3VtZXItdGVzdCJ9.WeRJQidiAMhTXrkfgLUkI2YLdZSEFX9DO6K3XEQdcSs"}' \
  produce persistent://futeng/ftns/logtest -n 10 -m "hello pulsar" | tail -n 1
```

In [34]:
# Send messages to the specified topic
! pulsar-client \
  --url pulsar://pulsar02:6650,pulsar03:6650 \
  --auth-plugin "org.apache.pulsar.client.impl.auth.AuthenticationToken" \
  --auth-params '{"token":"eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItY29uc3VtZXItdGVzdCJ9.WeRJQidiAMhTXrkfgLUkI2YLdZSEFX9DO6K3XEQdcSs"}' \
  produce persistent://futeng/ftns/logtest -n 10 -m "hello pulsar" | tail -n 1


2023-03-30T16:19:57,846+0800 [main] INFO  org.apache.pulsar.client.cli.PulsarClientTool - 10 messages successfully produced


In [30]:
url = "pulsar://pulsar02:6650,pulsar03:6650"
auth_plugin = "org.apache.pulsar.client.impl.auth.AuthenticationToken"
auth_params = '{"token":"eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItY29uc3VtZXItdGVzdCJ9.WeRJQidiAMhTXrkfgLUkI2YLdZSEFX9DO6K3XEQdcSs"}'

In [31]:
command = f"pulsar-client --url {url} --auth-plugin \"{auth_plugin}\" --auth-params '{auth_params}' produce persistent://futeng/ftns/logtest -n 10  -m \"hello pulsar\" | tail -n 1"

!{command}


2023-03-30T16:15:47,092+0800 [main] INFO  org.apache.pulsar.client.cli.PulsarClientTool - 10 messages successfully produced


In [35]:
command = f"""pulsar-client \
  --url {url} \
  --auth-plugin "{auth_plugin}" \
  --auth-params '{auth_params}' \
  produce persistent://futeng/ftns/logtest -n 10 -m "hello pulsar" | tail -n 1"""

!{command}

2023-03-30T16:25:19,785+0800 [main] INFO  org.apache.pulsar.client.cli.PulsarClientTool - 10 messages successfully produced
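
Building the command as one f-string means the JSON in `--auth-params` has to survive two layers of quoting. One possible alternative, sketched here with a hypothetical helper `build_produce_command` and a placeholder token, is to build an argument list, let `json.dumps` produce the JSON, and quote only when a shell string is needed:

```python
import json
import shlex


def build_produce_command(url, token, topic, message, count):
    """Return the pulsar-client produce command as an argument list."""
    auth_params = json.dumps({"token": token}, separators=(",", ":"))
    return [
        "pulsar-client",
        "--url", url,
        "--auth-plugin", "org.apache.pulsar.client.impl.auth.AuthenticationToken",
        "--auth-params", auth_params,
        "produce", topic,
        "-n", str(count),
        "-m", message,
    ]


args = build_produce_command(
    url="pulsar://pulsar02:6650,pulsar03:6650",
    token="<token>",  # placeholder, substitute a real token
    topic="persistent://futeng/ftns/logtest",
    message="hello pulsar",
    count=10,
)
# shlex.join quotes each argument so the string is safe to paste into a shell
print(shlex.join(args))
```

The list can also be passed directly to `subprocess.run(args)`, which avoids shell quoting altogether.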


# Building a Minimal Management System with the Pulsar Admin REST API

In [2]:
import requests

url = "http://pulsar02:8080/admin/v2/persistent/public/default"
headers = {
    "Authorization": "Bearer eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhZG1pbiJ9.cLzVbUljIwKX11Q-ru1GxyM1Qss1ryCnkzgiBT8k9e0"
}

response = requests.get(url, headers=headers)

if response.status_code == 200:
    topics = response.json()
    print("Topic list:")
    for topic in topics:
        print(topic)
else:
    print(f"Request failed with status code {response.status_code}")


Topic list:
persistent://public/default/__change_events-partition-0
persistent://public/default/my-topic-partition-0
persistent://public/default/my-topic-partition-1
persistent://public/default/my-topic-partition-10
persistent://public/default/my-topic-partition-11
persistent://public/default/my-topic-partition-2
persistent://public/default/my-topic-partition-3
persistent://public/default/my-topic-partition-4
persistent://public/default/my-topic-partition-5
persistent://public/default/my-topic-partition-6
persistent://public/default/my-topic-partition-7
persistent://public/default/my-topic-partition-8
persistent://public/default/my-topic-partition-9
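
The endpoint returns one entry per partition, in lexicographic order (`partition-10` comes before `partition-2`). As a sketch of how the list could be grouped back into partitioned topics with numerically sorted partitions (the helper name and regular expression are illustrative):

```python
import re

PARTITION_SUFFIX = re.compile(r"^(?P<base>.+)-partition-(?P<index>\d+)$")


def group_partitions(topics):
    """Map each base topic name to a sorted list of its partition indexes."""
    groups = {}
    for name in topics:
        match = PARTITION_SUFFIX.match(name)
        if match:
            groups.setdefault(match["base"], []).append(int(match["index"]))
        else:
            # Non-partitioned topics are kept with an empty partition list
            groups.setdefault(name, [])
    return {base: sorted(indexes) for base, indexes in groups.items()}


topics = ["persistent://public/default/__change_events-partition-0"] + [
    f"persistent://public/default/my-topic-partition-{i}"
    for i in (0, 1, 10, 11, 2, 3, 4, 5, 6, 7, 8, 9)
]
for base, indexes in group_partitions(topics).items():
    print(base, len(indexes), indexes)
```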


In [6]:
import requests
from tabulate import tabulate

url = "http://pulsar02:8080/admin/v2/persistent/public/default"
headers = {
    "Authorization": "Bearer eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhZG1pbiJ9.cLzVbUljIwKX11Q-ru1GxyM1Qss1ryCnkzgiBT8k9e0"
}

response = requests.get(url, headers=headers)

if response.status_code == 200:
    topics = response.json()
    topic_table = [[topic] for topic in topics]
    print("Topic list:")
    print(tabulate(topic_table, headers=["Topic Name"], tablefmt="grid"))
else:
    print(f"Request failed with status code {response.status_code}")


Topic list:
+---------------------------------------------------------+
| Topic Name                                              |
+=========================================================+
| persistent://public/default/__change_events-partition-0 |
+---------------------------------------------------------+
| persistent://public/default/my-topic-partition-0        |
+---------------------------------------------------------+
| persistent://public/default/my-topic-partition-1        |
+---------------------------------------------------------+
| persistent://public/default/my-topic-partition-10       |
+---------------------------------------------------------+
| persistent://public/default/my-topic-partition-11       |
+---------------------------------------------------------+
| persistent://public/default/my-topic-partition-2        |
+---------------------------------------------------------+
| persistent://public/default/my-topic-partition-3        |
+---------------------------------------------------------+
| persistent://public/defaul

In [7]:
import requests
from IPython.display import display, Markdown
from tabulate import tabulate

url = "http://pulsar02:8080/admin/v2/persistent/public/default"
headers = {
    "Authorization": "Bearer eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhZG1pbiJ9.cLzVbUljIwKX11Q-ru1GxyM1Qss1ryCnkzgiBT8k9e0"
}

response = requests.get(url, headers=headers)

if response.status_code == 200:
    topics = response.json()
    topic_table = [[topic] for topic in topics]
    topic_table_str = tabulate(topic_table, headers=["Topic Name"], tablefmt="grid")
    display(Markdown(f"**Topic list:**\n\n<pre>{topic_table_str}</pre>"))
else:
    print(f"Request failed with status code {response.status_code}")


**Topic list:**

<pre>+---------------------------------------------------------+
| Topic Name                                              |
+=========================================================+
| persistent://public/default/__change_events-partition-0 |
+---------------------------------------------------------+
| persistent://public/default/my-topic-partition-0        |
+---------------------------------------------------------+
| persistent://public/default/my-topic-partition-1        |
+---------------------------------------------------------+
| persistent://public/default/my-topic-partition-10       |
+---------------------------------------------------------+
| persistent://public/default/my-topic-partition-11       |
+---------------------------------------------------------+
| persistent://public/default/my-topic-partition-2        |
+---------------------------------------------------------+
| persistent://public/default/my-topic-partition-3        |
+---------------------------------------------------------+
| persistent://public/default/my-topic-partition-4        |
+---------------------------------------------------------+
| persistent://public/default/my-topic-partition-5        |
+---------------------------------------------------------+
| persistent://public/default/my-topic-partition-6        |
+---------------------------------------------------------+
| persistent://public/default/my-topic-partition-7        |
+---------------------------------------------------------+
| persistent://public/default/my-topic-partition-8        |
+---------------------------------------------------------+
| persistent://public/default/my-topic-partition-9        |
+---------------------------------------------------------+</pre>

In [9]:
from IPython.display import IFrame

grafana_url = "http://vm30.t.com:9090/ui/#/"
width = 800
height = 600

IFrame(grafana_url, width=width, height=height)


In [38]:
import requests

# Authentication header
headers = {
    "Authorization": "Bearer eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhZG1pbiJ9.cLzVbUljIwKX11Q-ru1GxyM1Qss1ryCnkzgiBT8k9e0"
}

# First URL: list the clusters
url_clusters = "http://pulsar02:8080/admin/v2/clusters"

# Call the first URL
response_clusters = requests.get(url_clusters, headers=headers)

# Check whether the request succeeded
if response_clusters.status_code == 200:
    # Parse the returned JSON to get the cluster names
    cluster_names = response_clusters.json()

    # Iterate over the cluster names and call the second URL
    for cluster_name in cluster_names:
        # Build the second URL: list the brokers of this cluster
        url_list_brokers = f"http://pulsar02:8080/admin/v2/brokers/{cluster_name}"

        # Call the second URL
        response_brokers = requests.get(url_list_brokers, headers=headers)

        # Check whether the request succeeded
        if response_brokers.status_code == 200:
            # Print the returned JSON
            print(response_brokers.json())
        else:
            print(f"Error: {response_brokers.status_code}")
else:
    print(f"Error: {response_clusters.status_code}")


['20.24.89.27:8080', '20.187.73.0:8080']
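
Every cell in this section repeats the base URL and the bearer token. A minimal sketch of factoring that out with the standard library follows. The helper `admin_request` and the environment variable name `PULSAR_ADMIN_TOKEN` are assumptions made for illustration. The snippet only builds the request object and does not contact the broker.

```python
import os
import urllib.request


def admin_request(base_url, path, token):
    """Build an authenticated GET request for the admin v2 REST API."""
    url = f"{base_url.rstrip('/')}/admin/v2/{path.lstrip('/')}"
    return urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})


# PULSAR_ADMIN_TOKEN is a hypothetical variable name; "<token>" is a placeholder.
token = os.environ.get("PULSAR_ADMIN_TOKEN", "<token>")
req = admin_request("http://pulsar02:8080", "clusters", token)
print(req.full_url)
```

Passing `req` to `urllib.request.urlopen` would send it. Reading the token from the environment keeps it out of the notebook.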


In [40]:
import requests
import pandas as pd
from IPython.display import display

url = "http://pulsar02:8080/admin/v2/broker-stats/topics"
headers = {
    "Authorization": "Bearer eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhZG1pbiJ9.cLzVbUljIwKX11Q-ru1GxyM1Qss1ryCnkzgiBT8k9e0"
}

response = requests.get(url, headers=headers)
json_data = response.json()

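# Intended to filter out records containing '__change_events'.
# Note: v.keys() holds bundle ranges rather than topic names, so nothing is removed here.
filtered_data = {
    k: v for k, v in json_data.items() if not any('__change_events' in s for s in v.keys())
}

# Convert the JSON data to a Pandas DataFrame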
data_frames = []
for namespace, namespace_data in filtered_data.items():
    for bundle, bundle_data in namespace_data.items():
        for topic_type, topics in bundle_data.items():
            for topic, topic_data in topics.items():
                if topic_type == 'persistent':
                    row_data = {
                        'Namespace': namespace,
                        'Topic': topic,
                        'MsgRateIn': topic_data['msgRateIn'],
                        'MsgRateOut': topic_data['msgRateOut'],
                        'MsgThroughputIn': topic_data['msgThroughputIn'],
                        'MsgThroughputOut': topic_data['msgThroughputOut'],
                        'StorageSize': topic_data['storageSize'],
                        'BacklogSize': topic_data['backlogSize'],
                    }
                    data_frames.append(pd.json_normalize(row_data))

# Concatenate all the DataFrames
result = pd.concat(data_frames, ignore_index=True)

# Use Namespace and Topic as a multi-level index
result.set_index(['Namespace', 'Topic'], inplace=True)

# Display the result as a hierarchical table
display(result)


Namespace,Topic,MsgRateIn,MsgRateOut,MsgThroughputIn,MsgThroughputOut,StorageSize,BacklogSize
public/default,persistent://public/default/my-topic-partition-11,0.0,0.0,0.0,0.0,0,0
public/default,persistent://public/default/__change_events-partition-0,0.0,0.0,0.0,0.0,0,0
public/default,persistent://public/default/my-topic-partition-8,0.0,0.0,0.0,0.0,0,0
public/default,persistent://public/default/my-topic-partition-0,0.0,0.0,0.0,0.0,0,0
public/default,persistent://public/default/my-topic-partition-4,0.0,0.0,0.0,0.0,0,0
public/default,persistent://public/default/my-topic-partition-9,0.0,0.0,0.0,0.0,0,0
public/default,persistent://public/default/my-topic-partition-5,0.0,0.0,0.0,0.0,0,0
public/default,persistent://public/default/my-topic-partition-1,0.0,0.0,0.0,0.0,0,0
t1/ns1,persistent://t1/ns1/__change_events-partition-0,0.0,0.0,0.0,0.0,0,0
futeng/ftns,persistent://futeng/ftns/logtest-partition-0,0.0,0.0,0.0,0.0,1431,0


In [45]:
import requests
import pandas as pd
from IPython.display import display

url = "http://pulsar02:8080/admin/v2/broker-stats/topics"
headers = {
    "Authorization": "Bearer eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhZG1pbiJ9.cLzVbUljIwKX11Q-ru1GxyM1Qss1ryCnkzgiBT8k9e0"
}

response = requests.get(url, headers=headers)
json_data = response.json()

# Convert the JSON data to a Pandas DataFrame
data_frames = []
for namespace, namespace_data in json_data.items():
    for bundle, bundle_data in namespace_data.items():
        for topic_type, topics in bundle_data.items():
            for topic, topic_data in topics.items():
                if topic_type == 'persistent' and '__change_events' not in topic and 'healthcheck' not in topic:
                    row_data = {
                        'Namespace': namespace,
                        'Topic': topic,
                        'MsgRateIn': topic_data['msgRateIn'],
                        'MsgRateOut': topic_data['msgRateOut'],
                        'MsgThroughputIn': topic_data['msgThroughputIn'],
                        'MsgThroughputOut': topic_data['msgThroughputOut'],
                        'StorageSize': topic_data['storageSize'],
                        'BacklogSize': topic_data['backlogSize'],
                    }
                    data_frames.append(pd.json_normalize(row_data))

# Concatenate all the DataFrames
result = pd.concat(data_frames, ignore_index=True)

# Use Namespace and Topic as a multi-level index
result.set_index(['Namespace', 'Topic'], inplace=True)

# Display the result as a hierarchical table
display(result)


Namespace,Topic,MsgRateIn,MsgRateOut,MsgThroughputIn,MsgThroughputOut,StorageSize,BacklogSize
public/default,persistent://public/default/my-topic-partition-11,0.0,0.0,0.0,0.0,0,0
public/default,persistent://public/default/my-topic-partition-8,0.0,0.0,0.0,0.0,0,0
public/default,persistent://public/default/my-topic-partition-0,0.0,0.0,0.0,0.0,0,0
public/default,persistent://public/default/my-topic-partition-4,0.0,0.0,0.0,0.0,0,0
public/default,persistent://public/default/my-topic-partition-9,0.0,0.0,0.0,0.0,0,0
public/default,persistent://public/default/my-topic-partition-5,0.0,0.0,0.0,0.0,0,0
public/default,persistent://public/default/my-topic-partition-1,0.0,0.0,0.0,0.0,0,0
futeng/ftns,persistent://futeng/ftns/logtest-partition-0,0.0,0.0,0.0,0.0,1431,0
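
The flattening step can also be written without pandas, collecting plain dictionaries in one pass instead of building one small DataFrame per topic. This is a sketch under the same namespace → bundle → domain → topic nesting the cells above iterate over. The function name and the bundle key in the sample are made up for illustration.

```python
SYSTEM_MARKERS = ("__change_events", "healthcheck")
FIELDS = {
    "MsgRateIn": "msgRateIn",
    "MsgRateOut": "msgRateOut",
    "MsgThroughputIn": "msgThroughputIn",
    "MsgThroughputOut": "msgThroughputOut",
    "StorageSize": "storageSize",
    "BacklogSize": "backlogSize",
}


def flatten_topic_stats(broker_stats):
    """Flatten nested broker stats into one row (dict) per user topic."""
    rows = []
    for namespace, bundles in broker_stats.items():
        for _bundle, domains in bundles.items():
            for topic, stats in domains.get("persistent", {}).items():
                # Skip system topics by name, as the second cell above does
                if any(marker in topic for marker in SYSTEM_MARKERS):
                    continue
                row = {"Namespace": namespace, "Topic": topic}
                row.update({col: stats[key] for col, key in FIELDS.items()})
                rows.append(row)
    return rows


# Hand-written sample; the bundle key is a made-up placeholder.
zero = dict.fromkeys(FIELDS.values(), 0)
sample = {
    "futeng/ftns": {
        "0x00000000_0xffffffff": {
            "persistent": {
                "persistent://futeng/ftns/logtest-partition-0": {**zero, "storageSize": 1431},
                "persistent://futeng/ftns/__change_events-partition-0": zero,
            }
        }
    }
}
for row in flatten_topic_stats(sample):
    print(row)
```

The resulting list can be handed to `pd.DataFrame(rows).set_index(['Namespace', 'Topic'])` to get the same indexed table as above.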
