Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

failed to insert data using python paho mqtt library #1442

Closed
kqkdChen opened this issue Jun 30, 2020 · 13 comments
Closed

failed to insert data using python paho mqtt library #1442

kqkdChen opened this issue Jun 30, 2020 · 13 comments

Comments

@kqkdChen
Copy link

使用 python paho mqtt 插入数据时出现错误:
client_id不能为空,且qos必须等于0,不然就会出现错误。

qos=1 or 2, 则会出现如下错误,结果就是预期应该插入10W条数据的结果只插入了20条数据
image

iotdb version: 0.10.0
paho-mqtt: 1.5.0

代码如下

import random

import paho.mqtt.client as mqtt
import time


def on_connect(client, userdata, flags, rc):
    print("Connected with result code: " + str(rc))


def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))


client = mqtt.Client(client_id=str(random.uniform(1, 10)))
# client = mqtt.Client()
client.username_pw_set("root", "root")
client.connect('127.0.0.1', 1883)

timestamp = lambda: int(round(time.time() * 1000))
for i in range(100000):
    message = "{\n" "\"device\":\"root.log.d1\",\n" "\"timestamp\":%d,\n" "\"measurements\":[\"s1\"],\n" "\"values" \
              "\":[%f]\n" "}" % (timestamp() + 1000 * i, random.uniform(1, 10))
    client.publish('root.log.d1.info', payload=message, qos=0, retain=False)
client.disconnect()
@jixuan1989
Copy link
Member

Many thanks. @vesense can you have a look?

@vesense
Copy link
Member

vesense commented Jul 1, 2020

Thanks @kqkdChen for your report. I will take a look ASAP.

@kqkdChen
Copy link
Author

kqkdChen commented Jul 1, 2020

Thanks @kqkdChen for your report. I will take a look ASAP.

谢谢,这个插入错误的问题我已经解决了,然后你能先帮我看这个新问题吗?这个比较重要,涉及到mqtt连接的身份验证失效#1446

@vesense
Copy link
Member

vesense commented Jul 1, 2020

qos=1 or 2, 则会出现如下错误,结果就是预期应该插入10W条数据的结果只插入了20条数据

@kqkdChen 这个问题已经好了?发生了什么?

@vesense
Copy link
Member

vesense commented Jul 1, 2020

使用 python paho mqtt 插入数据时,没有设置用户名和密码相关参数仍然成功的插入了

这是由于当前默认允许匿名用户访问。这会存在一定的安全隐患,我会提一个patch来解决。

@kqkdChen
Copy link
Author

kqkdChen commented Jul 1, 2020

qos=1 or 2, 则会出现如下错误,结果就是预期应该插入10W条数据的结果只插入了20条数据

@kqkdChen 这个问题已经好了?发生了什么?

我换了另外一种方法写了mqtt client端的代码,所以就没有出现上述的错误了,所以这个问题应该是代码编写不规范造成的。然后我希望官网可以多一些其他语言的demo,我也会展示我的代码,希望可以帮助更多的人

@kqkdChen
Copy link
Author

kqkdChen commented Jul 1, 2020

单次发送

from time import *
import paho.mqtt.publish as publish


def timestamp():
    return int(round(time() * 1000))


def single(tx_id: str, log_type: str, log_content: str, hostname: str = "192.168.3.181", port: int = 1883):
    device_id = "root.log.%s" % tx_id
    payload = "{\n" "\"device\":\"%s\",\n" "\"timestamp\":%d,\n" "\"measurements\":[\"type\",\"content\"],\n" "\"values" \
              "\":[\"%s\",\"%s\"]\n" "}" % (device_id, timestamp(), log_type, log_content)
    publish.single(topic=device_id, payload=payload, hostname=hostname, port=port)


if __name__ == '__main__':
    type_list = ["INFO", "WARNING", "ERROR"]
    content_list = ["content_1", "content_2", "content_3", "content_4"]
    begin_time = time()
    for i in range(10):
        single("1411111111", type_list[i % len(type_list)], content_list[i % len(content_list)])
    end_time = time()
    run_time = end_time - begin_time
    print('cost time:', run_time)

@vesense
Copy link
Member

vesense commented Jul 1, 2020

目前官网的示例主要是Java,其他语言的Demo(如Python,C++等)比较欠缺,欢迎你把示例贡献到社区(可以提patch到documents),让更多人看到。

@jixuan1989
Copy link
Member

@kqkdChen 欢迎贡献example到官网用户手册。

修改docs/UserGuide/Client/Programming - MQTT.md,以及docs/zh/UserGuide/Client/Programming - MQTT.md (英文版和中文版)并提交PR即可。

例如,在该markdown末尾增加

##Example (Python)
粘贴你的代码(要增加一些说明,尽可能让代码易懂)

等我们的ptyon 集成测试框架搞定后,会考虑把python的样例代码放入源码中,每次构建时进行测试,确保不出错。

Welcome to posting your examples to the official website. just modify docs/UserGuide/Client/Programming - MQTT.md and docs/zh/UserGuide/Client/Programming - MQTT.md.

By the way, we are planning to develop the UT framework for Python, and after that, we may put the example codes to the source folder and consider it as a check step for each code repo update.

@vesense
Copy link
Member

vesense commented Jul 2, 2020

@kqkdChen Thanks again for your report and look forward to your more feedback and contributions.

@vesense vesense closed this as completed Jul 2, 2020
@jerrychong25
Copy link
Contributor

单次发送

from time import *
import paho.mqtt.publish as publish


def timestamp():
    return int(round(time() * 1000))


def single(tx_id: str, log_type: str, log_content: str, hostname: str = "192.168.3.181", port: int = 1883):
    device_id = "root.log.%s" % tx_id
    payload = "{\n" "\"device\":\"%s\",\n" "\"timestamp\":%d,\n" "\"measurements\":[\"type\",\"content\"],\n" "\"values" \
              "\":[\"%s\",\"%s\"]\n" "}" % (device_id, timestamp(), log_type, log_content)
    publish.single(topic=device_id, payload=payload, hostname=hostname, port=port)


if __name__ == '__main__':
    type_list = ["INFO", "WARNING", "ERROR"]
    content_list = ["content_1", "content_2", "content_3", "content_4"]
    begin_time = time()
    for i in range(10):
        single("1411111111", type_list[i % len(type_list)], content_list[i % len(content_list)])
    end_time = time()
    run_time = end_time - begin_time
    print('cost time:', run_time)

@kqkdChen 你好!

我想问在IoTDB Python Client遇到的MQTT Publish写入问题,我尝试写入2行Records但是IoTDB没成功保存。MQTT这里使用Port 2833,因为已有另一个MQTT Broker占用Port 1883。而且没有任何IoTDB报错。

Python MQTT Client代码:
58baa2b462a3869a1fd8163be9dc24e

IoTDB MQTT Configuration:
0399e3cb358d0201046bcaabb2f0e8d

也尝试使用apache-iotdb pypi library session.insert_record() 测试存入一样的Records,IoTDB成功保存。

这个问题要怎么解决呢?

谢谢!

@kqkdChen
Copy link
Author

kqkdChen commented Dec 6, 2022

@jerrychong25
你好,已经过去太久具体api用法我忘记了,但是我想说的是,监测一下client的connectsuccesserror 等回调方法,判断连接是否成功。如下是一些使用MQTT服务器的建议:

  • 编写client的回调方法,确认连接是否成功,以及定位连接异常。
  • mqtt_hander_pool_size=1 请设置更大。在IotDB的MQTT组件里,消息的处理使用了一个无界队列缓冲,默认配置会导致消息堆积最终导致IotDB OOM,线上引发过血案,不知道他们修复这个不合理的地方没有。

@jerrychong25
Copy link
Contributor

@jerrychong25 你好,已经过去太久具体api用法我忘记了,但是我想说的是,监测一下client的connectsuccesserror 等回调方法,判断连接是否成功。如下是一些使用MQTT服务器的建议:

  • 编写client的回调方法,确认连接是否成功,以及定位连接异常。
  • mqtt_hander_pool_size=1 请设置更大。在IotDB的MQTT组件里,消息的处理使用了一个无界队列缓冲,默认配置会导致消息堆积最终导致IotDB OOM,线上引发过血案,不知道他们修复这个不合理的地方没有。

好的,谢谢 @kqkdChen !

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants