In [1]:
# MQTT and Events
import asyncio
import time
import json
from web3 import AsyncWeb3, WebSocketProvider
from eth_abi.abi import decode
from eth_utils import to_checksum_address
import nest_asyncio
from paho.mqtt import client as mqtt_client
# Langchain
import os
from langchain_ollama import ChatOllama
from cdp_langchain.agent_toolkits import CdpToolkit
from cdp_langchain.utils import CdpAgentkitWrapper
from langchain_core.tools import tool
from typing_extensions import Annotated, TypedDict
from dotenv import load_dotenv

In [2]:
# Modifiers
nest_asyncio.apply()
load_dotenv()

True

In [3]:
# Define the contract address and ABI (Application Binary Interface)
websocket = os.getenv("WS_ALCHEMY") 
contract_address = os.getenv("CONTRACT_TOKEN") 
topic_w3 = "Transfer(address,address,uint256)"
contract_abi = [
	{
		"anonymous": False,
		"inputs": [
			{
				"indexed": True,
				"internalType": "address",
				"name": "from",
				"type": "address"
			},
			{
				"indexed": True,
				"internalType": "address",
				"name": "to",
				"type": "address"
			},
			{
				"indexed": True,
				"internalType": "uint256",
				"name": "value",
				"type": "uint256"
			}
		],
		"name": "Transfer",
		"type": "event"
	}
]

In [4]:
# Mqtt
broker = os.getenv("MQTT_BROKER_URL")
port = os.getenv("MQTT_PORT") 
client_id = f'python-mqtt-event-service'
username = os.getenv("MQTT_USERNAME") 
password = os.getenv("MQTT_PASSWORD")

# Variables
arm_robot_address = "0x5fF9d76A368A0E5321c1bb396fF4873A8FB0b620"
user_address = "0x9055598b0fF3da03DFF593D2a7cFae26c583F4ed"
sub_topics = ["esp32/output", "esp32arm/output", "webpage/output"]

In [5]:
# Setup Cdp and Wallet
wallet_data_file = "wallet_data.txt"

wallet_data = None
if os.path.exists(wallet_data_file):
    with open(wallet_data_file) as f:
        wallet_data = f.read()

values = {}
if wallet_data is not None:
    values = {"cdp_wallet_data": wallet_data}

agentkit = CdpAgentkitWrapper(**values)

# persist the agent's CDP MPC Wallet Data.
wallet_data = agentkit.export_wallet()
with open(wallet_data_file, "w") as f:
    f.write(wallet_data)
    
cdp_toolkit = CdpToolkit.from_cdp_agentkit_wrapper(agentkit)
tools_cdp = cdp_toolkit.get_tools()
import_tools = ["get_balance", "get_wallet_details", "request_faucet_funds"]
tools = [next(tool for tool in tools_cdp if tool.name == tool_name) for tool_name in import_tools]

In [6]:
# Pre flight modificators
# https://github.com/coinbase/agentkit/tree/master/python/cdp-agentkit-core/cdp_agentkit_core/actions

# If You want modify the preflight of any CDP function use this part
@tool
def transfer(amount: float, asset_id: str, destination: str, gasless: bool = False) -> str:
    """Transfer a specified amount of an asset to a destination onchain. USDC Transfers on Base Sepolia and Mainnet can be gasless. Always use the gasless option when available."""
    preFlightAmount = amount
    if(amount < 0):
        preFlightAmount = amount*10**18

    fx = next(tool for tool in tools_cdp if tool.name == "transfer")
    result = fx.run({"amount":str(preFlightAmount),"asset_id":asset_id, "destination":destination, "gassless":gasless})
    return result

mod_tools = [transfer]

In [7]:
# Tools excecution 
def execTools(events):
    content = "error"
    for tool_call in events.tool_calls:
        try:
            selected_tool = next(tool for tool in (tools + mod_tools) if tool.name == tool_call["name"].lower() )
            tool_msg = selected_tool.invoke(tool_call)
            print(tool_msg)
            content = tool_msg.content
        except Exception as e:
            print(e)
            print(tool_call)
    return content

In [8]:
# Structured Outputs

class Exercises(TypedDict):
    """"Assign an exercise to the ARM."""
    exercise: Annotated[int, ..., "The ARM will perform the following exercises, sorted by difficulty: Label 1 - Elbow Flexion, Label 2 - Elbow Flexoextension, Label 3 - Arm Lift."]
    reps: Annotated[int, ..., "Number of repetitions will be determined based on the patient's profile."]

def setPrompt(profile):
    prompt = f"""
    You are a doctor who provides the best exercises for arm rehabilitation. The three exercises available, sorted by difficulty from easy to hard, are: Elbow Flexion, Elbow Flexion-Extension, and Arm Lift.
    
    If the patient has the following profile:
    
    '{profile}'
    
    Which exercise should the patient do now?
    """
    return prompt

def setPrompt2(exercise):
    prompt = f"""
    You are a doctor who provides the best exercises for arm rehabilitation. 
    The three exercises available, sorted by difficulty from easy to hard, are: 
    1. Elbow Flexion
    2. Elbow Flexion-Extension
    3. Arm Lift.
    
    You provide:
    
    Exercise: {str(exercise["exercise"])}
    Repetitions: {str(exercise["reps"])}
    
    Explain it briefly as if it were your recommendation, in one paragraph say why this is the best option and mention the exercise and reps. 
    """
    return prompt

def setPrompt3(exercise):
    prompt = f""" Transfer {str(exercise["exercise"])}00.00 of this token {contract_address} to this address {arm_robot_address}"""
    return prompt

In [9]:
model = ChatOllama(model="llama3.1:8b", keep_alive="1h")
model.invoke("say only hello world!")
model_with_tools = model.bind_tools(tools + mod_tools, tool_choice="auto")
model_with_structure = model.with_structured_output(Exercises)

In [10]:
# MQTT Control defs
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("\nConnected to MQTT Broker!")
    else:
        print("Failed to connect, return code %d\n", rc)
        
def on_disconnect(client, userdata, rc):
    print(rc)
    reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
    while reconnect_count < MAX_RECONNECT_COUNT:
        print(reconnect_delay)
        time.sleep(reconnect_delay)
        try:
            client.reconnect()
            print("Reconnected successfully!")
            return
        except Exception as e:
            print(e)

        reconnect_delay *= RECONNECT_RATE
        reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
        reconnect_count += 1
    print(reconnect_count)

In [11]:
# Globals
counter = [0,0,0]
exercise = None
flag = False

In [None]:
#MQTT Message Defs

def on_event_evm(client, msg):
    result = msg["result"]
    from_address = to_checksum_address(decode(["address"], result["topics"][1])[0])
    to_address = to_checksum_address(decode(["address"], result["topics"][2])[0])
    amount = decode(["uint256"], result["data"])[0]
    print(from_address)
    print(to_address)
    print(amount)
    if(to_address==arm_robot_address):
        if(str(amount)[0] == "1"):
            client.publish("esp32arm/input", "{\"action\":\"ef\"}")
        elif(str(amount)[0] == "2"):
            client.publish("esp32arm/input", "{\"action\":\"efe\"}")
        elif(str(amount)[0] == "3"):
            client.publish("esp32arm/input", "{\"action\":\"al\"}")

def on_message_mqtt(client, userdata, msg):
    global counter
    global exercise
    global flag
    if(msg.topic == "webpage/output"):
        flag = False
        counter = [0,0,0]
        print(msg.payload.decode())
        my_payload = json.loads(msg.payload.decode())
        if(my_payload["command"]=="structured"):
            exercise = model_with_structure.invoke(setPrompt(my_payload["message"]))
            events = model.invoke(setPrompt2(exercise))
            client.publish("webpage/input", events.content)
        if(my_payload["command"]=="exercise"):
            events = model_with_tools.invoke(setPrompt3(exercise))
            res = execTools(events)
            client.publish("webpage/input", res + "\n \n Waiting for the arm to perform the exercise for the patient...")
        if(my_payload["command"]=="standard"):
            events = model.invoke(my_payload["message"])
            print(events.content)
            client.publish("webpage/input", json.dumps(events.content))
    if(msg.topic == "esp32arm/output" and msg.payload.decode()== "1"):
        client.publish("webpage/input", "Waiting for the patient to perform the exercise...")
        flag = True
        counter = [0,0,0]
    if(msg.topic == "esp32/output" and flag):
        message = msg.payload.decode()
        detection = json.loads(message)
        keys = list(detection.keys())
        if(keys[0]=="ef"):
            counter[0] = counter[0] + 1
        elif(keys[0]=="efe"):
            counter[1] = counter[1] + 1
        elif(keys[0]=="al"):
            counter[2] = counter[2] + 1
        if(exercise["exercise"] == 1 and counter[0] == exercise["reps"]):
            events = model_with_tools.invoke(f""" Transfer 1000.00 of this token {contract_address} to this address {user_address}""")
            res = execTools(events)
            client.publish("webpage/input", res + "\n \n The patient successfully completed the exercise.")
            flag = False
            counter = [0,0,0]
        elif(exercise["exercise"] == 2 and counter[1] == exercise["reps"]):
            events = model_with_tools.invoke(f""" Transfer 1000.00 of this token {contract_address} to this address {user_address}""")
            res = execTools(events)
            client.publish("webpage/input", res + "\n \n The patient successfully completed the exercise.")
            flag = False
            counter = [0,0,0]
        elif(exercise["exercise"] == 3 and counter[2] == exercise["reps"]):
            events = model_with_tools.invoke(f""" Transfer 1000.00 of this token {contract_address} to this address {user_address}""")
            res = execTools(events)
            client.publish("webpage/input", res + "\n \n The patient successfully completed the exercise.")
            flag = False
            counter = [0,0,0]
        print(counter)

async def subscribe_to_transfer_events():
    async with AsyncWeb3(WebSocketProvider(websocket)) as w3:
        # MQTT Setup
        client = mqtt_client.Client(client_id)
        client.username_pw_set(username, password)
        # MQTT Callbacks
        client.on_connect = on_connect
        client.on_disconnect = on_disconnect
        client.on_message = on_message_mqtt
        client.connect(broker, int(port))
        client.loop_start()
        # MQTT Sub Topics
        for topic in sub_topics:
            client.subscribe(topic)
        # EVM Events Setup
        contract = w3.eth.contract(address=contract_address, abi=contract_abi)
        # EVM Sub Topics
        transfer_event_topic = w3.keccak(text=topic_w3)
        filter_params = {
            "address": contract_address,
            "topics": [transfer_event_topic],
        }
        subscription_id = await w3.eth.subscribe("logs", filter_params)
        
        print(f"Subscribing to transfer events for Contract at {subscription_id}")
        async for payload in w3.socket.process_subscriptions():
            on_event_evm(client,payload)
            
asyncio.run(subscribe_to_transfer_events())


Connected to MQTT Broker!
Subscribing to transfer events for Contract at 0x7352b745f440303156bb13e9982c6a2a
