## kafka 简单取数

In [1]:
import json, time
from confluent_kafka import Consumer, KafkaError

In [2]:
# Broker address, consumer-group settings and topic names for the simple-fetch demo.
# Built from (key, value) pairs because some keys contain dots.
kafka = dict([
    ("http", "192.168.50.85:19092"),
    ("offset", "earliest"),
    ("groupid", "apm-rca117"),
    ("topic.event", "dc_event"),
    ("topic.rca", "dc_rca"),
])

In [3]:
# Group-managed consumer: subscribe() joins the consumer group kafka["groupid"] and the
# group coordinator assigns the topic's partitions to this consumer.
# auto.offset.reset decides where to start when the group has no usable committed offset.
consumer_conf = {
    "bootstrap.servers": kafka["http"],    # Kafka broker address (host:port)
    "group.id": kafka["groupid"],
    "enable.auto.commit": True,            # commit consumed offsets periodically in the background
    "auto.offset.reset": kafka["offset"],
}
c = Consumer(consumer_conf)
c.subscribe([kafka["topic.event"]])


In [None]:
# Continuously consume kafka["topic.event"] and keep the events that match the filter.
# There is deliberately no exit condition: interrupt the kernel to stop. Everything
# collected so far stays in `values`, and `a` holds how many events were kept.
values = []
a = 0
try:
    while True:
        msg = c.poll(1)  # timeout is in seconds
        if msg is None:
            continue
        if msg.error():
            # Error events carry no JSON payload; report them instead of crashing json.loads.
            print(f"Consumer error: {msg.error()}")
            continue
        message = json.loads(msg.value())
        # Keep the Kafka position so later cells can sort/locate events by offset and partition.
        message['offset'] = msg.offset()
        message['partition'] = msg.partition()
        # .get(): one event missing a field must not kill a long-running consumer loop.
        if (
            message.get("repeatableEvent") == 1
            and message.get("status", 0) > 0
            and message.get("silence") == 0
        ):
            a += 1
            values.append(message)
            # One compact line per event; printing whole payloads exceeded the IOPub rate limit.
            print(f"{message.get('monitorId')}:{message.get('triggerTime')}  "
                  f"partition:{message['partition']},offset:{message['offset']}")
except KeyboardInterrupt:
    print(f"Stopped by user after collecting {a} events.")
        

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)





In [14]:
# Check whether any metric of interest appears in the known metric list.
# Uses its own variable name so it does not overwrite `values` (the Kafka events
# collected by the consumer loop, which the sorting cell reads).
metrics = ['service.request_errors.pct', 'service.request.count', 'service.response.time', 'url.request_errors.pct', 'url.response.time', 'url.requests.count']
selected_metrics = ['service.request_errors.pct']
intersection = set(metrics) & set(selected_metrics)
if intersection:
    print(1)

%4|1704707331.966|MAXPOLL|rdkafka#consumer-2| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 322ms (adjust max.poll.interval.ms for long-running message processing): leaving group


In [13]:
# Chronological overview of the collected events with their Kafka position.
# 'offset'/'partition' exist only when the consumer loop stored them in each message,
# so .get() is used to avoid a KeyError when they are missing.
sorted(
    ({'offset': m.get('offset'), 'time': m['triggerTime'], 'partition': m.get('partition')} for m in values),
    key=lambda x: x['time'],
)

[{'offset': 1702917, 'time': 1687846140, 'partition': 1},
 {'offset': 1734841, 'time': 1687846200, 'partition': 0},
 {'offset': 1724857, 'time': 1687846260, 'partition': 2},
 {'offset': 1734846, 'time': 1687846320, 'partition': 0},
 {'offset': 1724890, 'time': 1687846380, 'partition': 2}]

___________

# 从kafka中取一段时间内的数据

In [12]:
from confluent_kafka import Consumer, KafkaError, TopicPartition, Message
import datetime, time, json

In [13]:
# Connection and topic settings for reading a time window from the data topic.
kafka = dict(
    http='192.168.50.107:19092',
    topicCtrl='dc_abnormal_control',
    topicData='dc_abnormal_data',
    offset='earliest',
    groupid='AnomalyDetectTest01',
)

In [14]:
# Throwaway consumer for reading a time window through explicit partition assignment.
# A unique group id per run means no previously committed offsets interfere; the
# actual start positions come from offsets_for_times() + assign().
c = Consumer({
    'bootstrap.servers': kafka['http'],  # Kafka broker address (host:port)
    # Config values should be strings (time.time() alone is a float); the prefix makes
    # the throwaway group recognizable on the broker.
    'group.id': f"{kafka['groupid']}-{time.time()}",
    'enable.auto.commit': True,  # commit consumed offsets periodically in the background
    # Topic-level properties belong at the top level; 'default.topic.config' is deprecated.
    'auto.offset.reset': kafka['offset'],
})

In [15]:
# Time window to pull: all of yesterday, as the half-open interval [start_time, end_time).
# start_time/end_time can be set to any datetimes.
now = datetime.datetime.now() - datetime.timedelta(days=1)
start_time = datetime.datetime.combine(now.date(), datetime.time.min)  # yesterday 00:00:00
# Next midnight, used as an exclusive bound (the read loop keeps offsets < end offset).
# A 23:59:59 cut-off would drop messages produced during the day's final second.
end_time = start_time + datetime.timedelta(days=1)

In [16]:
# Partition discovery: fetch cluster metadata restricted to the data topic, then take
# that topic's partition map (partition id -> partition metadata).
cluster_data = c.list_topics(topic=kafka['topicData'])
topic_data = cluster_data.topics[kafka['topicData']]
available_partitions = topic_data.partitions

In [17]:
# Translate the time window into per-partition offsets. offsets_for_times() reads the
# timestamp (ms since epoch) from each TopicPartition's offset field and returns the
# earliest offset whose timestamp is >= it, per partition.
start_tps, end_tps = [], []
for index in range(len(available_partitions)):
    start_tps.append(TopicPartition(kafka['topicData'], index, int(start_time.timestamp() * 1000)))
    end_tps.append(TopicPartition(kafka['topicData'], index, int(end_time.timestamp() * 1000)))
start_offset = c.offsets_for_times(start_tps)
end_offset = c.offsets_for_times(end_tps)
# Begin consuming every partition at its window start.
c.assign(start_offset)

In [18]:
# Start position per partition. offsets_for_times() reports -1 for a partition that has
# no message at/after start_time (partitions 0 and 2 in the output below).
start_offset

[TopicPartition{topic=dc_abnormal_data,partition=0,offset=-1,error=None},
 TopicPartition{topic=dc_abnormal_data,partition=1,offset=38239,error=None},
 TopicPartition{topic=dc_abnormal_data,partition=2,offset=-1,error=None}]

In [19]:
# Read every message of the data topic inside [start_time, end_time), from ALL partitions.
# Each partition stops at its own end offset (first offset at/after end_time); the
# consumer is unassigned and closed when done, even if the loop fails or is interrupted.
values = []
end_by_partition = {tp.partition: tp.offset for tp in end_offset}
pending = {}  # partition -> exclusive stop offset, for partitions with data left to read
for start_tp in start_offset:
    if start_tp.offset < 0:
        continue  # no message at/after start_time in this partition: nothing to read
    stop = end_by_partition.get(start_tp.partition, -1)
    if stop < 0:
        # No message at/after end_time yet: everything up to the high watermark is in range.
        stop = c.get_watermark_offsets(start_tp)[1]
    if stop > start_tp.offset:
        pending[start_tp.partition] = stop

MAX_IDLE_POLLS = 50  # give up after ~5 s without any message (50 polls x 0.1 s)
idle_polls = 0
try:
    while pending:
        # poll() takes its timeout in SECONDS: 0.1 waits at most 100 ms.
        msg = c.poll(0.1)
        if msg is None:
            idle_polls += 1
            if idle_polls >= MAX_IDLE_POLLS:
                # e.g. offset gaps (compaction / transaction markers) right before a stop offset
                print('No more messages; unfinished partitions: {}'.format(sorted(pending)))
                break
            continue
        idle_polls = 0
        if msg.error():
            print('Consumer error:{}'.format(msg.error()))
            continue
        partition, offset = msg.partition(), msg.offset()
        stop = pending.get(partition)
        if stop is None:
            continue  # partition already finished, or outside the window
        if offset < stop:
            values.append(json.loads(msg.value()))
        if offset + 1 >= stop:
            del pending[partition]  # reached this partition's end offset
finally:
    c.unassign()  # stop fetching from all assigned partitions
    c.close()

In [22]:
# Exclusive stop offset for partition 1: the first offset at/after end_time.
end_offset[1].offset

38899

In [9]:
# Wrap the collected messages under a single top-level key for the JSON dump.
values_to_json = {'values': values}

In [10]:
# Persist the collected messages as pretty-printed JSON.
# ensure_ascii=False writes non-ASCII (e.g. Chinese) characters verbatim, so the file
# encoding must be explicit instead of the platform default (which may not be UTF-8).
with open('../values.json', 'w', encoding='utf-8') as f:
    json.dump(values_to_json, f, indent=2, sort_keys=True, ensure_ascii=False)

________

### 从多个partition中取数

In [1]:
from confluent_kafka import Consumer, KafkaError, TopicPartition, Message
import datetime, time, json

In [2]:
# Connection and topic settings for reading a time window across several partitions.
kafka = dict(
    http='192.168.50.107:19092',
    topicCtrl='dc_abnormal_control',
    topicData='dc_test_prophet',
    offset='earliest',
    groupid='AnomalyDetectTest01',
)

In [3]:
# Throwaway consumer for reading a time window through explicit partition assignment.
# A unique group id per run means no previously committed offsets interfere; the
# actual start positions come from offsets_for_times() + assign().
c = Consumer({
    'bootstrap.servers': kafka['http'],  # Kafka broker address (host:port)
    # Config values should be strings (time.time() alone is a float); the prefix makes
    # the throwaway group recognizable on the broker.
    'group.id': f"{kafka['groupid']}-{time.time()}",
    'enable.auto.commit': True,  # commit consumed offsets periodically in the background
    # Topic-level properties belong at the top level; 'default.topic.config' is deprecated.
    'auto.offset.reset': kafka['offset'],
})

In [4]:
# Time window to pull: one whole day, DAYS_AGO days back, as [start_time, end_time).
# The window can be set to any datetimes.
DAYS_AGO = 40
start = datetime.datetime.now() - datetime.timedelta(days=DAYS_AGO)
start_time = datetime.datetime.combine(start.date(), datetime.time.min)  # that day 00:00:00
# Next midnight, used as an exclusive bound (the read loop keeps offsets < end offset).
# A 23:59:59 cut-off would drop messages produced during the day's final second.
end_time = start_time + datetime.timedelta(days=1)

In [5]:
# Partition discovery: fetch cluster metadata restricted to the data topic, then take
# that topic's partition map (partition id -> partition metadata).
cluster_data = c.list_topics(topic=kafka['topicData'])
topic_data = cluster_data.topics[kafka['topicData']]
available_partitions = topic_data.partitions

In [6]:
# Translate the time window into per-partition offsets. offsets_for_times() reads the
# timestamp (ms since epoch) from each TopicPartition's offset field and returns the
# earliest offset whose timestamp is >= it, per partition.
start_tps, end_tps = [], []
for index in range(len(available_partitions)):
    start_tps.append(TopicPartition(kafka['topicData'], index, int(start_time.timestamp() * 1000)))
    end_tps.append(TopicPartition(kafka['topicData'], index, int(end_time.timestamp() * 1000)))
start_offset = c.offsets_for_times(start_tps)
end_offset = c.offsets_for_times(end_tps)
# Begin consuming every partition at its window start.
c.assign(start_offset)

In [7]:
# Stop position per partition: the first offset at/after end_time (-1 if there is none).
end_offset

[TopicPartition{topic=dc_test_prophet,partition=0,offset=156,error=None},
 TopicPartition{topic=dc_test_prophet,partition=1,offset=130,error=None},
 TopicPartition{topic=dc_test_prophet,partition=2,offset=168,error=None}]

In [9]:
# Read every message of the data topic inside [start_time, end_time), from ALL partitions.
# Each partition stops at its own end offset (first offset at/after end_time) instead of
# comparing every partition against one partition's end offset. The consumer is
# unassigned and closed when done, even if the loop fails or is interrupted.
values = []
end_by_partition = {tp.partition: tp.offset for tp in end_offset}
pending = {}  # partition -> exclusive stop offset, for partitions with data left to read
for start_tp in start_offset:
    if start_tp.offset < 0:
        continue  # no message at/after start_time in this partition: nothing to read
    stop = end_by_partition.get(start_tp.partition, -1)
    if stop < 0:
        # No message at/after end_time yet: everything up to the high watermark is in range.
        stop = c.get_watermark_offsets(start_tp)[1]
    if stop > start_tp.offset:
        pending[start_tp.partition] = stop

MAX_IDLE_POLLS = 50  # give up after ~5 s without any message (50 polls x 0.1 s)
idle_polls = 0
try:
    while pending:
        # poll() takes its timeout in SECONDS: 0.1 waits at most 100 ms.
        msg = c.poll(0.1)
        if msg is None:
            idle_polls += 1
            if idle_polls >= MAX_IDLE_POLLS:
                # e.g. offset gaps (compaction / transaction markers) right before a stop offset
                print('No more messages; unfinished partitions: {}'.format(sorted(pending)))
                break
            continue
        idle_polls = 0
        if msg.error():
            print('Consumer error:{}'.format(msg.error()))
            continue
        partition, offset = msg.partition(), msg.offset()
        stop = pending.get(partition)
        if stop is None:
            continue  # partition already finished, or outside the window
        if offset < stop:
            values.append(json.loads(msg.value()))
        if offset + 1 >= stop:
            del pending[partition]  # reached this partition's end offset
finally:
    c.unassign()  # stop fetching from all assigned partitions
    c.close()

In [11]:
# Number of messages collected into `values` by the read loop.
len(values)

0

In [None]:
# Read every message of the data topic inside [start_time, end_time), from ALL partitions.
# Each message is appended at most once and the loop terminates when every partition
# has reached its end offset (or after an idle timeout).
# NOTE(review): the previous read cell closes `c`; re-run the consumer / offsets / assign
# cells above before running this one, otherwise poll() fails on a closed consumer.
values = []
end_by_partition = {tp.partition: tp.offset for tp in end_offset}
pending = {}  # partition -> exclusive stop offset, for partitions with data left to read
for start_tp in start_offset:
    if start_tp.offset < 0:
        continue  # no message at/after start_time in this partition: nothing to read
    stop = end_by_partition.get(start_tp.partition, -1)
    if stop < 0:
        # No message at/after end_time yet: everything up to the high watermark is in range.
        stop = c.get_watermark_offsets(start_tp)[1]
    if stop > start_tp.offset:
        pending[start_tp.partition] = stop

MAX_IDLE_POLLS = 50  # give up after ~5 s without any message (50 polls x 0.1 s)
idle_polls = 0
try:
    while pending:
        # poll() takes its timeout in SECONDS: 0.1 waits at most 100 ms.
        msg = c.poll(0.1)
        if msg is None:
            idle_polls += 1
            if idle_polls >= MAX_IDLE_POLLS:
                # e.g. offset gaps (compaction / transaction markers) right before a stop offset
                print('No more messages; unfinished partitions: {}'.format(sorted(pending)))
                break
            continue
        idle_polls = 0
        if msg.error():
            print('Consumer error:{}'.format(msg.error()))
            continue
        partition, offset = msg.partition(), msg.offset()
        stop = pending.get(partition)  # compare only against THIS partition's end offset
        if stop is None:
            continue  # partition already finished, or outside the window
        if offset < stop:
            values.append(json.loads(msg.value()))
        if offset + 1 >= stop:
            del pending[partition]  # reached this partition's end offset
finally:
    c.unassign()  # stop fetching from all assigned partitions
    c.close()

In [12]:
# Peek at one message's timestamp: a (timestamp_type, ms_since_epoch) tuple.
# NOTE(review): the read loops above close `c`; re-create and assign it before running this.
# poll() waits in seconds (poll(100) could block 100 s) and returns None on timeout.
msg = c.poll(1.0)
msg.timestamp() if msg is not None else None

(1, 1649401570230)

In [10]:
# Number of messages collected into `values` by the read loop.
len(values)

0