## Mapping the sEMG via OSC


### Importing the libraries


In [2]:
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
import pandas as pd

##################### OSC libraries #########################
import argparse #to properly understand incoming OSC messages 
from pythonosc import dispatcher # allow us to catch and map input from osc 
from pythonosc import osc_server # to create osc messages 
from pythonosc import osc_message_builder #to create adress-value massages 
from pythonosc import udp_client# allow us to listen from messages 
from pythonosc.osc_server import ThreadingOSCUDPServer

### Importing the sEmg signal


In [3]:
sheet = "Sheet1"  # Excel sheet name

# Channel (muscle) and gesture labels, used for the plots.
muscles = [
    'Brachioradialis  Sx',
    'Brachioradialis  Dx',
    'Extensor Carpi Radialis Sx',
    'Extensor Carpi Radialis  Dx',
    'Triceps brachii Long Head Dx',
    'Deltoid  Lateral  Dx',
    'Abductor digiti minimi (hand) Sx',
    'Abductor digiti minimi (hand) Dx',
]
gesture = [
    'Arpeggio',
    'Strumming',
    'Bending',
    'PullOffHammerOn',
    'Tapping',
    'StrongPick',
    'DoublePick',
]

# Folder that contains one sub-folder per gesture. Raw strings keep every
# backslash literal, so no path relies on Python tolerating unknown escape
# sequences such as "\d" or "\s" (which newer interpreters warn about).
_acquisition_dir = r"C:\Users\david\OneDrive - Politecnico di Milano\Documenti\GItDesktop\MAE_Thesis\firstPrototype\secondAcquisition"

# RMS paths
arpeggio_path_rms = _acquisition_dir + r"\1 Arpeggio\Arpeggio_rms_exp.xlsx"
strumming_path_rms = _acquisition_dir + r"\2 Strumming\Strumming_rms_exp.xlsx"
bending_path_rms = _acquisition_dir + r"\3 Bending\Bending_rms_exp.xlsx"
pullOffHammerOn_path_rms = _acquisition_dir + r"\4 PullOffHammerOn\PullOffHammerOn_rms_exp.xlsx"
tapping_path_rms = _acquisition_dir + r"\5 Tapping\tapping_rms_exp.xlsx"
strongPick_path_rms = _acquisition_dir + r"\6 StrongPick\strongPick_rms_exp.xlsx"
doublePick_path_rms = _acquisition_dir + r"\7 DoublePick\doublePick_rms_exp.xlsx"

# RAW paths
arpeggio_path_raw = _acquisition_dir + r"\1 Arpeggio\Arpeggio_exp.xlsx"
strumming_path_raw = _acquisition_dir + r"\2 Strumming\Strumming_exp.xlsx"
bending_path_raw = _acquisition_dir + r"\3 Bending\Bending_exp.xlsx"
pullOffHammerOn_path_raw = _acquisition_dir + r"\4 PullOffHammerOn\PullOffHammerOn_exp.xlsx"
tapping_path_raw = _acquisition_dir + r"\5 Tapping\tapping_exp.xlsx"
strongPick_path_raw = _acquisition_dir + r"\6 StrongPick\strongPick_exp.xlsx"
doublePick_path_raw = _acquisition_dir + r"\7 DoublePick\doublePick_exp.xlsx"
# Tapping experiment: read the RMS workbook and convert it to a NumPy array,
# then do the same for the raw workbook. Both the DataFrames and the arrays
# stay available under their own names.
dataFrame_tapping_rms = pd.read_excel(tapping_path_rms, sheet_name=sheet)
npdf_tapping_rms = np.asarray(dataFrame_tapping_rms)

dataFrame_tapping_raw = pd.read_excel(tapping_path_raw, sheet_name=sheet)
npdf_tapping_raw = np.asarray(dataFrame_tapping_raw)

def loading_exp(raw_path, rms_path, sheet="Sheet1", discarded_sample=10000):
    """Load the raw and RMS workbooks of one experiment as arrays.

    Args:
        raw_path: path of the Excel file holding the raw signal.
        rms_path: path of the Excel file holding the RMS signal.
        sheet: name of the sheet read from both files.
        discarded_sample: number of leading samples dropped from every row
            of the transposed tables.

    Returns:
        tuple: ``(npdf_raw, npdf_rms)``. Each array is the sheet transposed
        (one row per sheet column), without the first sheet column and
        without the first ``discarded_sample`` samples.
    """
    def _as_channels(path):
        table = np.asarray(pd.read_excel(io=path, sheet_name=sheet))
        # After the transpose every sheet column is a row; [1:] skips the
        # first one (the date-time column, according to the original note).
        return table.T[1:, discarded_sample:]

    return _as_channels(raw_path), _as_channels(rms_path)

# Load the tapping experiment as (raw, rms) arrays through loading_exp.
# NOTE(review): the identical call is repeated under "#TAPPING" in the next
# cell, so the two workbooks are read twice.
tapping_raw, tapping_rms = loading_exp(tapping_path_raw, tapping_path_rms)


# A high zero-crossing count means the muscle is activating at a higher frequency, which indicates increased effort.



In [4]:
# Load every gesture recording and, for each of them, keep row 2 of the RMS
# array and of the raw array under the "left outer forearm" names.

# Tapping
tapping_raw, tapping_rms = loading_exp(raw_path=tapping_path_raw, rms_path=tapping_path_rms)
tapping_left_outer_forearm_rms, tapping_left_outer_forearm_raw = tapping_rms[2], tapping_raw[2]

# Strumming
strumming_raw, strumming_rms = loading_exp(raw_path=strumming_path_raw, rms_path=strumming_path_rms)
strumming_left_outer_forearm_rms, strumming_left_outer_forearm_raw = strumming_rms[2], strumming_raw[2]

# Arpeggio
arpeggio_raw, arpeggio_rms = loading_exp(raw_path=arpeggio_path_raw, rms_path=arpeggio_path_rms)
arpeggio_left_outer_forearm_rms, arpeggio_left_outer_forearm_raw = arpeggio_rms[2], arpeggio_raw[2]

# Bending
bending_raw, bending_rms = loading_exp(raw_path=bending_path_raw, rms_path=bending_path_rms)
bending_left_outer_forearm_rms, bending_left_outer_forearm_raw = bending_rms[2], bending_raw[2]

# Pull-off / hammer-on
pullOffHammerOn_raw, pullOffHammerOn_rms = loading_exp(
    raw_path=pullOffHammerOn_path_raw, rms_path=pullOffHammerOn_path_rms
)
pullOffHammerOn_left_outer_forearm_rms, pullOffHammerOn_left_outer_forearm_raw = (
    pullOffHammerOn_rms[2],
    pullOffHammerOn_raw[2],
)

# Strong pick
strongPick_raw, strongPick_rms = loading_exp(raw_path=strongPick_path_raw, rms_path=strongPick_path_rms)
strongPick_left_outer_forearm_rms, strongPick_left_outer_forearm_raw = strongPick_rms[2], strongPick_raw[2]

# Double pick
doublePick_raw, doublePick_rms = loading_exp(raw_path=doublePick_path_raw, rms_path=doublePick_path_rms)
doublePick_left_outer_forearm_rms, doublePick_left_outer_forearm_raw = doublePick_rms[2], doublePick_raw[2]



In [5]:
# Row 3 of the arpeggio recording, kept under the "right outer forearm" names.
arpeggio_right_outer_forearm_rms, arpeggio_right_outer_forearm_raw = (
    arpeggio_rms[3],
    arpeggio_raw[3],
)

In [6]:

fs = 1000  # samples per second, used to convert sample counts to seconds
num_samples = len(strumming_left_outer_forearm_raw)
second_duration = (1 / fs) * num_samples
# Time instant, in seconds, of every sample of the recording.
istants_array = np.arange(num_samples) / fs

print('num_samples: ', num_samples, 'duration[s]: ', second_duration)

num_samples:  32977 duration[s]:  32.977000000000004



### ZCR computation

In [7]:
def ZC(vect):
    """Count the sign changes between consecutive samples.

    Args:
        vect (array_like): samples to inspect.

    Returns:
        int: number of positions where ``np.sign`` of a sample differs from
        ``np.sign`` of the next one.
    """
    signs = np.sign(vect)
    # A sample that is exactly zero has sign 0, so entering and leaving zero
    # are counted as two separate changes.
    sign_steps = np.diff(signs)
    return int(np.count_nonzero(sign_steps))

def zcr_signal(sEmg_array, win_length=500, win_stride=250):
    """Compute the zero-crossing count of a signal over a sliding window.

    Args:
        sEmg_array (array_like): sEMG samples; windows are taken along the
            first axis.
        win_length (int): number of samples in every window.
        win_stride (int): hop size, in samples, between window starts.

    Returns:
        list: ``ZC`` value of every full window, in order. The list is empty
        when the signal is shorter than ``win_length``.

    Raises:
        ValueError: if ``win_length`` or ``win_stride`` is not positive.
    """
    if win_length <= 0 or win_stride <= 0:
        raise ValueError("win_length and win_stride must be positive")

    n_samples = len(sEmg_array)
    # Only complete windows are counted. Floor division (instead of int()
    # truncation) keeps signals shorter than one window from producing a
    # partial window.
    n_windows = max(0, (n_samples - win_length) // win_stride + 1)
    print('n_window= ', n_windows)

    window_starts = range(0, n_windows * win_stride, win_stride)
    return [ZC(sEmg_array[start:start + win_length]) for start in window_starts]



### OSC connection setup



In [8]:
#SETTING UP THE SERVER OSC AND THE CLIENT

if __name__ == "__main__":
    # Only when this file is the main program: set up sending and receiving
    # of OSC messages.
    ip = "127.0.0.1"  # loopback address: messages stay on this computer
    sendPort = 57121  # port the client sends to (to be double-checked)
    inPort = 8000  # port the server listens on (to be double-checked)

    # Client used to send OSC messages.
    client = udp_client.SimpleUDPClient(ip, sendPort)

    # Dispatcher that routes incoming OSC addresses to handler functions.
    # It gets its own name: rebinding `dispatcher` would overwrite the
    # imported module and make this cell fail when it is run a second time.
    osc_dispatcher = dispatcher.Dispatcher()
    # osc_dispatcher.map("/sEmg1", sEmg1_mapping)  # message name, function to call

    # Server that listens for incoming OSC messages.
    server = ThreadingOSCUDPServer((ip, inPort), osc_dispatcher)
    print("server is on {}.".format(server.server_address))
    try:
        server.serve_forever()  # blocks here, constantly listening for messages
    finally:
        # Release the UDP socket even when the loop is interrupted, so the
        # port can be bound again on the next run.
        server.server_close()

server is on ('127.0.0.1', 8000).


KeyboardInterrupt: 

#### Sending the OSC

In [None]:
import time  

def sEmg1_mapping(unused_addr, args, client, array1, array2):
    """Send two arrays over OSC, one pair of values per iteration.

    Args:
        unused_addr: not used by this function.
        args: not used by this function.
        client: object exposing ``send_message(address, value)``.
        array1: values sent to ``/sEmg1``; its length sets the number of
            iterations.
        array2: values sent to ``/sEMg2``, read at the same indices.
    """
    n_values = len(array1)
    for idx in range(n_values):
        first = array1[idx]
        client.send_message("/sEmg1", first)
        # NOTE(review): "/sEMg2" is capitalised differently from "/sEmg1";
        # confirm this is the address the receiver listens on.
        second = array2[idx]
        client.send_message("/sEMg2", second)
        print("sending the value 1: " , first, ", sending the value 2: ", second)

# Send the ZCR every 500 ms, but this needs testing to understand how it should be mapped. Muscular effort raises the ZCR, so check how mapping it changes the signal across the different gestures.
# Clear onsets will give a better result: look at the plots and run many trials. From now on: experiments, experiments, experiments! More acquisitions, and finish the Max patch.

# we need to set up the main python methods, dealing with the main python thread first 

     
    # try sending the signal_bicep_Dx array
      
def send_zcr(zcr_vect, emg_vect, fs, osc_client=None, start_delay=2):
    """Stream a ZCR vector over OSC, paced over the duration of the sEMG signal.

    The ZCR values are rescaled to the range [0.2, 1] and sent one at a time
    to the ``/raw_zcr`` address.

    Args:
        zcr_vect (array_like): zero-crossing values to send.
        emg_vect: the sEMG signal the ZCR was computed from; only its length
            is used, to derive the total duration.
        fs: sampling rate of ``emg_vect`` in samples per second.
        osc_client: object exposing ``send_message(address, value)``. When
            omitted, the module-level ``client`` is used.
        start_delay: seconds to wait before the first message (time to start
            the audio in Ableton).

    Returns:
        None. Nothing is sent when ``zcr_vect`` is empty.
    """
    zcr_values = np.asarray(zcr_vect, dtype=float)
    if zcr_values.size == 0:
        # Nothing to send; also avoids dividing the duration by zero.
        return

    sender = client if osc_client is None else osc_client

    time.sleep(start_delay)
    sEmg_duration = len(emg_vect) / fs  # seconds
    # Spread the messages evenly over the duration of the recording.
    delta_t = sEmg_duration / len(zcr_values)

    # Normalisation, computed once for the whole vector.
    new_min = 0.2
    new_max = 1
    lowest = zcr_values.min()
    highest = zcr_values.max()
    if highest == lowest:
        # A constant vector has no range to rescale (it would be 0/0);
        # send the lower bound for every value.
        normal_vect = np.full(zcr_values.shape, float(new_min))
    else:
        normal_vect = (zcr_values - lowest) * (new_max - new_min) / (highest - lowest) + new_min

    print('delta_t: ', delta_t, 'sEmg_duration: ', sEmg_duration)
    for value in normal_vect:
        time.sleep(delta_t)  # time.sleep takes seconds
        sender.send_message("/raw_zcr", value)
        print("sending the zcr value: " , value)
    
#zcr_strumming_left_outer_forearm_raw = zcr_signal(strumming_left_outer_forearm_raw)
#send_zcr(zcr_strumming_left_outer_forearm_raw, strumming_left_outer_forearm_raw, 1000)

#zcr_arpeggio_left_outer_forearm_raw = zcr_signal(arpeggio_left_outer_forearm_raw)
#send_zcr(zcr_arpeggio_left_outer_forearm_raw, arpeggio_left_outer_forearm_raw, 1000)
# ZCR of the arpeggio "right outer forearm" raw signal, then streamed over OSC
# with fs = 1000.
zcr_arpeggio_right_outer_forearm_raw = zcr_signal(sEmg_array=arpeggio_right_outer_forearm_raw)
send_zcr(
    zcr_vect=zcr_arpeggio_right_outer_forearm_raw,
    emg_vect=arpeggio_right_outer_forearm_raw,
    fs=1000,
)


In [14]:
def normalization(vect, new_min=0.1, new_max=1, old_min=40, old_max=160):
    """Linearly map values from one range onto a custom range.

    ``old_min`` maps to ``new_min`` and ``old_max`` maps to ``new_max``.
    Values outside ``[old_min, old_max]`` are not clipped.

    Args:
        vect: a number, a NumPy array, or a list/tuple of numbers.
        new_min: lower bound of the output range.
        new_max: upper bound of the output range.
        old_min: input value mapped to ``new_min``.
        old_max: input value mapped to ``new_max``.

    Returns:
        The rescaled value(s): a number for scalar input, a NumPy array for
        array, list or tuple input.

    Raises:
        ValueError: if ``old_min`` equals ``old_max``.
    """
    if old_max == old_min:
        raise ValueError("old_min and old_max must differ")
    if isinstance(vect, (list, tuple)):
        # Plain sequences do not support element-wise arithmetic.
        vect = np.asarray(vect, dtype=float)
    return (vect - old_min) * (new_max - new_min) / (old_max - old_min) + new_min
# Quick check of the mapping on a single value.
x = normalization(vect=100)

print(x)

0.55
