In [1]:
import rospy 
import actionlib
from stretchsense.msg import smartGloveServerAction, smartGloveServerGoal
import time

In [2]:
def feedback(msg):
    print('Feedback received: ', msg)

In [3]:
def call_server(action: list):
    
    client = actionlib.SimpleActionClient('smartglove', smartGloveServerAction)
    client.wait_for_server()
    goal = smartGloveServerGoal()
    goal.goal = action
    client.send_goal(goal, feedback_cb = feedback)
    client.wait_for_result()
    result = client.get_result()
    return result

def scan_for_devices(arg: None):
    return call_server(['scan'])
    
def connect_with_address(addr: str):
    return call_server(['connect', addr])

def select_model(filename: str):
    return call_server(['select_theta', filename])

def calibrate(arg: None):
    return call_server(['calibrate'])

def choose_goal(action: callable, arg: str = 'empty'):
    try:
        result = action(arg)
        print('The result is:', result)
        
    except rospy.ROSInterruptException as e:
        print('Something went wrong:', e)
        
def stop_thread(arg: None):
    return call_server(['stop'])

def disconnect_peripheral(arg: None):
    return call_server(['disconnect'])

rospy.init_node('smart_glove_client')

In [4]:
choose_goal(scan_for_devices)
choose_goal(connect_with_address, 'Acidalia')
# choose_goal(calibrate)
choose_goal(select_model, 'theta_new.csv')
# time.sleep(5)
# choose_goal(stop_thread)

The result is: result: 
  - Acidalia
  - f8:a6:1c:cb:ed:a6
The result is: result: []
The result is: result: 
  - Clench your fist and follow the model on the screen


In [5]:
choose_goal(stop_thread)
choose_goal(disconnect_peripheral)

The result is: result: 
  - thread stopped
The result is: result: 
  - disconnected


In [None]:
count = [1,2,3]

try:
    while True:
        for r in range(20):
            print(count[r])
except Exception as e:
    print(e)
    


In [4]:
import time
import threading
# from multiprocessing import Process

arr = [1,2,3]

class thread_test():
    def __init__(self):
#         super().__init__()
        self._stop = threading.Event()
        
    def stopped(self):
        return self._stop.isSet()
    
    def recurring(self):
        try:
            while not self.stopped():
                print('running')
                time.sleep(1)
            print('stopped running')
        except Exception as e:
            print(e)
            pass 



In [5]:
tt = thread_test()

tt_thread = threading.Thread(target = tt.recurring, args=())
tt_thread.isDaemon = True

tt_thread.start()

# tt_thread = thread_test()
# tt_thread.start()

In [3]:
print(f'the thread is alive: {tt_thread.is_alive()}')
tt._stop.set()
print(f'the thread is stopped: {tt_thread._is_stopped}')

# tt_thread._is_stopped

the thread is alive: True
the thread is stopped: False
stopped running


In [None]:
tt = thread_test()

In [2]:
import threading

dir(threading.Event)

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_reset_internal_locks',
 'clear',
 'isSet',
 'is_set',
 'set',
 'wait']

In [2]:
def returnNone():
    return

print(returnNone())

None
