Example in a Pluto.jl reactive notebook #15

Closed
scls19fr opened this issue Feb 27, 2024 · 7 comments

@scls19fr
Contributor

Hello,

I'd like to subscribe to a topic on an MQTT broker, then receive and display the data in a Pluto.jl notebook (ideally with Plots.jl).
Which example would be the best starting point for that?
Have you ever tried something like this?
My notebook doesn't seem to like

rc = loop_forever(client)

Is there a workaround?

Kind regards

@denglerchr
Owner

Hello,
I also had some issues with loop_forever on older versions of Julia. My guess was that Base.@threadcall had a problem, but I'm not sure. Which version of Julia are you using?
Anyway, the easiest option is to write your own while loop that calls loop regularly. As a last resort you can try loop_forever2, which basically does the same thing, but it can block your REPL, for example.
As for creating dynamic plots in Plots.jl, I have no experience, to be honest.
Another alternative, if you just want graphs visualizing MQTT data in your browser: Grafana is a free, low-code tool with an MQTT data source.
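
For illustration, the "write your own while loop" suggestion could be sketched roughly as below. This is a minimal sketch, not part of Mosquitto.jl: `start_polling` and `stop_polling` are hypothetical helper names, and the `tick` function only stands in for a call such as `loop(client)`. It assumes that running the loop in a background task (`@async`) is acceptable, so the REPL or notebook cell is not blocked. Whether this interacts well with Pluto's reactivity has not been verified here.

```julia
# Hypothetical helper (not part of Mosquitto.jl): call `step()` every
# `interval` seconds in a background task until the flag is cleared.
function start_polling(step; interval = 0.1)
    running = Ref(true)
    task = @async begin
        while running[]
            step()           # e.g. () -> loop(client) in the MQTT case
            sleep(interval)  # yields, so other tasks can run in between
        end
    end
    return running, task
end

# Ask the background task to stop and wait until it has finished.
function stop_polling(running, task)
    running[] = false
    wait(task)
    return nothing
end

# Usage with a stand-in step function that just counts its calls.
counter = Ref(0)
tick() = (counter[] += 1)

running, task = start_polling(tick; interval = 0.01)
sleep(0.2)                   # let the poller run for a moment
stop_polling(running, task)
println("step was called ", counter[], " times")
```

With a real client, the stand-in would be replaced by something like `start_polling(() -> loop(client))`, and received messages would then be read from the client's channels after each call.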

@scls19fr
Contributor Author

Thanks @denglerchr for the prompt answer.

I'm using Julia 1.10.1, which is the current release channel.

I also use Grafana in some projects, but my goal is a more customizable dashboard that can do calculations Grafana can't handle (at least as far as I understand the tool). That's why I'm considering Pluto.jl.

@denglerchr
Owner

Oh ok, can you provide me with some more information? Then I can try to reproduce the issue.

  • Linux or Windows? (or Mac, but then I cannot test myself)
  • if you can share it, a reduced code sample that reproduces the issue in the Pluto notebook

@scls19fr
Contributor Author

scls19fr commented Feb 29, 2024

using Windows 10
julia version 1.10.1

but maybe we should first tackle #19

@scls19fr
Contributor Author

scls19fr commented Mar 1, 2024

Here is an example of a Pluto notebook which subscribes to the "test/#" topics.

This notebook should fill a CircularBuffer (from DataStructures.jl) with the received messages and their topics.

Unfortunately it doesn't work: the cell containing loop_forever2 keeps running and blocks the other cells from being updated.

I think I should ask that on Pluto.jl GitHub discussions.

Discussion opened fonsp/Pluto.jl#2834

@scls19fr
Contributor Author

scls19fr commented Mar 1, 2024

Here is my latest implementation

mosquitto_notebook_example.jl

### A Pluto.jl notebook ###
# v0.19.40

using Markdown
using InteractiveUtils

# This Pluto notebook uses @bind for interactivity. When running this notebook outside of Pluto, the following 'mock version' of @bind gives bound variables a default value (instead of an error).
macro bind(def, element)
    quote
        local iv = try Base.loaded_modules[Base.PkgId(Base.UUID("6e696c72-6542-2067-7265-42206c756150"), "AbstractPlutoDingetjes")].Bonds.initial_value catch; b -> missing; end
        local el = $(esc(element))
        global $(esc(def)) = Core.applicable(Base.get, el) ? Base.get(el) : iv(el)
        el
    end
end

# ╔═╡ e699fc7f-f20a-4d9b-864e-ebf73e7aa9cf
begin
    import Pkg
    # activate a temporary environment
    Pkg.activate(mktempdir())
    Pkg.add([
        Pkg.PackageSpec(name = "DataStructures"),
        Pkg.PackageSpec(url = "https://github.com/denglerchr/Mosquitto.jl/"),
    ])
    using Dates
    using DataStructures
    using PlutoUI
    using Mosquitto
end

# ╔═╡ 86b2b1b3-0ccd-43fb-bde5-71cd5d3570df
md"""# Mosquitto.jl example with Pluto.jl notebook"""

# ╔═╡ 0eaf9f60-a887-4104-9e0d-c7295202602d
begin
    ΔT = 0.5  # s
    clock = @bind ticks Clock(ΔT, false)
    HTML()
end

# ╔═╡ 8553e813-0229-4543-b82b-badfa47f3da8
begin
    capacity = 10
    messages = CircularBuffer{Tuple{DateTime,String,String}}(capacity)
	HTML()
end

# ╔═╡ 8c87ba29-c9eb-404f-91c9-7cd9dd97a2c4
begin
    # Define some constants
    const DEFAULT_HOST = "test.mosquitto.org"
    const DEFAULT_PORT = 1883
    const DEFAULT_TOPIC_SUB = "test/#"
    const DEFAULT_TOPIC_PUB = "test/julia"
    const DEFAULT_MSG_PUB = "Hello World from Julia using Mosquitto.jl"
	HTML()
end

# ╔═╡ 16f37fa4-8fca-4d96-93df-a814860198e7
begin
    mutable struct ParamsConnect
        host::Any
        port::Any
    end
    params_connect = ParamsConnect(DEFAULT_HOST, DEFAULT_PORT)

    mutable struct ParamsSubscribe
        topic::String
        count::Int
    end
    params_subscribe = ParamsSubscribe(DEFAULT_TOPIC_SUB, 0)

    mutable struct ParamsPublish
        topic::String
        message::String
        allow::Bool
    end
    params_publish = ParamsPublish(DEFAULT_TOPIC_PUB, DEFAULT_MSG_PUB, false)

	HTML()
end

# ╔═╡ b7b6bb1b-943f-4738-9cdb-05bfbb8d3fcd
begin

    ui_con_host = @bind host TextField(default = params_connect.host)
    ui_con_port = @bind port NumberField(0:65535, default = params_connect.port)

    ui_topic_sub = @bind topic_sub TextField(default = params_subscribe.topic)

    ui_topic_pub = @bind topic_pub TextField(default = params_publish.topic)
    ui_msg_pub = @bind msg_pub TextField((30, 4), default = params_publish.message)

    md"""

    ## Connect to a MQTT broker

    - host: $(ui_con_host)
    - port: $(ui_con_port)

    ### Subscribe
    - topic: $(ui_topic_sub)

    $(@bind update_topic_sub_pressed Button("Update topic subscription")) $(clock)

    ### Publish
    - topic: $(ui_topic_pub)
    - message: $(ui_msg_pub)

    $(@bind send_message_pressed Button("Send"))

    """
end

# ╔═╡ 79d4359e-c7bc-48f2-a696-4e7e82c7807e
begin
	params_publish.message = msg_pub
	params_publish.topic = topic_pub
	HTML()
end

# ╔═╡ 8794023e-1910-477f-9959-e094cec466b2
begin

	send_message_pressed
	
    function publish_message(message, topic)
        client = Client(params_connect.host, params_connect.port)
        println("client:\t$(client)")
        println("sending '$(message)' to '$(topic)'")
        println("")
        rc = publish(client, topic, message; retain = false)
        println("publish:\t", rc)
        rc = loop(client; ntimes = 2)
        println("loop:\t", rc)
        rc = disconnect(client)
        println("disconnect:\t", rc)
        rc = loop(client)
        println("loop:\t", rc)
    end

	if params_publish.allow
		publish_message(params_publish.message, params_publish.topic)
	else
		params_publish.allow = true
	end

	HTML()
end

# ╔═╡ f7adee72-d7ce-11ee-2cee-7ffcce93411a
begin
    # subscribe to topic topic_sub every time the client connects
	
    function onconnect(client)
        # Check if something happened, else return 0
        nmessages = Base.n_avail(get_connect_channel(client))
        nmessages == 0 && return 0

        # At this point, a connection or disconnection happened
        for _ = 1:nmessages
            conncb = take!(get_connect_channel(client))
            if conncb.val == 1
                println(
                    "Connection of client $(client.id) successful (return code $(conncb.returncode)), subscribing to $(params_subscribe.topic)",
                )
                subscribe(client, params_subscribe.topic)
            elseif conncb.val == 0
                println("Client $(client.id) disconnected")
            end
        end
        return nmessages
    end

    function onmessage(mrcount, client)
        # Check if something happened, else return 0
        dt = now(Dates.UTC)
        nmessages = Base.n_avail(get_messages_channel(client))
        nmessages == 0 && return 0

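        # At this point, a message was received, let's process it
        for i = 1:nmessages
            temp = take!(get_messages_channel(client))
            println("Message $(mrcount + i):")
            message = String(temp.payload)
            # first(message, 18) counts characters, so it is safe for non-ASCII payloads
            # (byte indexing with message[1:18] can throw in the middle of a character)
            shortened_message = length(message) > 20 ? first(message, 18) * "..." : message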
            println("\ttopic: $(temp.topic)")
            println("\tmessage: $(shortened_message)")
            push!(messages, (dt, temp.topic, message))
        end
        return nmessages
    end
	HTML()
end

# ╔═╡ 4f6c0ba1-edbd-4551-ae46-8286d2e92741
begin
	update_topic_sub_pressed
	params_subscribe.topic = topic_sub
	topic_sub_updated = true
	HTML()
end

# ╔═╡ 9cc6f7ba-b047-457e-b818-ea4c6fd3d9fa
begin
    topic_sub_updated
    empty!(messages)  # reset circular buffer containing latest incoming messages
    client = Client(params_connect.host, params_connect.port)
	HTML("Using <code>'$(strip(string(client)))'</code> for subscribe")
end

# ╔═╡ ba146bdd-d43f-4ad2-9815-82ec8be93ce7
begin
    ticks
    loop(client)
    onconnect(client)
    params_subscribe.count += onmessage(params_subscribe.count, client)
	HTML()
end

# ╔═╡ 4e08ef08-a9d7-460d-b3ce-e4a134efec88
begin
    ticks, topic_sub_updated
    now(Dates.UTC), reverse(messages)
end

# ╔═╡ f1ba84e1-ba68-48af-8c46-78efb7141dde
begin
    params_connect.host = host
    params_connect.port = port
    HTML()
end

# ╔═╡ Cell order:
# ╟─86b2b1b3-0ccd-43fb-bde5-71cd5d3570df
# ╟─b7b6bb1b-943f-4738-9cdb-05bfbb8d3fcd
# ╟─ba146bdd-d43f-4ad2-9815-82ec8be93ce7
# ╟─9cc6f7ba-b047-457e-b818-ea4c6fd3d9fa
# ╟─4e08ef08-a9d7-460d-b3ce-e4a134efec88
# ╟─79d4359e-c7bc-48f2-a696-4e7e82c7807e
# ╟─8794023e-1910-477f-9959-e094cec466b2
# ╟─0eaf9f60-a887-4104-9e0d-c7295202602d
# ╟─f7adee72-d7ce-11ee-2cee-7ffcce93411a
# ╟─4f6c0ba1-edbd-4551-ae46-8286d2e92741
# ╟─f1ba84e1-ba68-48af-8c46-78efb7141dde
# ╠═16f37fa4-8fca-4d96-93df-a814860198e7
# ╟─8553e813-0229-4543-b82b-badfa47f3da8
# ╟─8c87ba29-c9eb-404f-91c9-7cd9dd97a2c4
# ╟─e699fc7f-f20a-4d9b-864e-ebf73e7aa9cf
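
The onconnect and onmessage functions in the notebook above both follow the same pattern: look at how many items are waiting in a channel, then take them without blocking. As a rough, standalone illustration of that pattern, the sketch below uses a plain `Channel` and `isready` rather than `Base.n_avail`, which is not an exported function. `drain!` is a hypothetical helper name and the strings are made-up stand-ins for received messages; nothing here comes from Mosquitto.jl.

```julia
# Hypothetical helper: take every item that is currently buffered in the
# channel and return them as a vector. It never blocks, because take! is
# only called while isready(ch) reports that an item is available.
function drain!(ch::Channel{T}) where {T}
    items = T[]
    while isready(ch)
        push!(items, take!(ch))
    end
    return items
end

# Usage with a buffered channel standing in for the messages channel.
inbox = Channel{String}(10)
put!(inbox, "first message")
put!(inbox, "second message")

received = drain!(inbox)
println("received ", length(received), " messages")
println("still waiting: ", isready(inbox))
```

In the notebook, the same idea would apply to the channels returned by get_messages_channel(client) and get_connect_channel(client), assuming they behave like ordinary Julia channels with a single consumer.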

@denglerchr
Owner

Closing, as this seems to be mostly a Pluto-related issue.
