In [None]:
import socket, struct, time
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime
import cv2, os, yaml
from ipywidgets import IntProgress
from IPython.display import display
import pandas as pd

import sys
sys.path.append("../../SeeThruFinger")
sys.path.append("../../SeeThruFinger/tracker")
sys.path.append("../../SeeThruFinger/tracker/model")
from track_anything import TrackingAnything_2
from track_anything import parse_augment
import requests,json,torch,torchvision,time  
from tools.painter import mask_painter

In [None]:
# Locations of the pretrained weights, relative to the notebook's working directory.
# NOTE(review): `folder` is not referenced again in the visible cells.
folder ="./checkpoints"
xmem_checkpoint = "./checkpoints/XMem-s012.pth"
e2fgvi_checkpoint = "./checkpoints/E2FGVI-HQ-CVPR22.pth"

# initialize sam, xmem, e2fgvi models
# NOTE(review): the third positional argument is passed as None — presumably an
# optional checkpoint that is not needed here; confirm against TrackingAnything_2.
args = parse_augment()
model = TrackingAnything_2(xmem_checkpoint, e2fgvi_checkpoint,None,args)

In [None]:
# Requested capture resolution (width x height, in pixels).
image_w = 1280
image_h = 720

# Open camera index 0 and ask the driver for the resolution above.
# CAP_PROP_FRAME_WIDTH / CAP_PROP_FRAME_HEIGHT are the named forms of
# property ids 3 and 4.
cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, image_w)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, image_h)

# Grab one frame so `ret` / `img` exist for the following cells.
ret, img = cap.read()

In [None]:
class ATISensor:
    '''The class interface for an ATI Force/Torque sensor.
    This class contains all the functions necessary to communicate
    with an ATI Force/Torque sensor with a Net F/T interface
    using a TCP socket.

    The instance can also be used as a context manager, in which case
    the socket is closed automatically on exit:

        with ATISensor() as sensor:
            ft = sensor.get_measurement()
    '''
    # Wire format of one force/torque reply: two unsigned 16-bit fields
    # (header, status) followed by six signed 16-bit counts, big-endian.
    _RESPONSE_FORMAT = '!2H6h'
    _RESPONSE_SIZE = struct.calcsize(_RESPONSE_FORMAT)  # 16 bytes

    def __init__(self, ip='192.168.1.1'):
        '''Connect to the sensor at `ip` and send the reset command once.

        Raises whatever the socket layer raises (e.g. OSError) if the
        connection cannot be established; the socket is closed first so
        that a failed construction does not leak a file descriptor.
        '''
        self.ip = ip
        self.port = 49151
        # 20-byte command frames: only the first or the last byte differs.
        self.READ_CALIBRATION_INFO = bytes([0x01]) + bytes(19)
        self.READ_FORCE = bytes(20)
        self.RESET_FORCE = bytes(19) + bytes([0x01])
        self.countsPerForce = 1000000
        self.countsPerTorque = 1000000
        self.scaleFactors_force = 15260      # ATI Nano25 of SusTech
        self.scaleFactors_torque = 92
        self.sock = socket.socket()
        time.sleep(0.5) # wait for proper connection
        try:
            self.sock.connect((self.ip, self.port))
            # NOTE(review): if the device answers this command with a data
            # frame, that reply is never read here and would be returned by
            # the next get_measurement() — confirm against the Net F/T manual.
            self.sock.sendall(self.RESET_FORCE)
        except Exception:
            self.sock.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def _recv_exact(self, size):
        '''Read exactly `size` bytes from the socket.

        A single recv() on a TCP stream may return fewer bytes than
        requested, so keep reading until the frame is complete.
        Raises ConnectionError if the peer closes the connection first.
        '''
        buffer = bytearray()
        while len(buffer) < size:
            chunk = self.sock.recv(size - len(buffer))
            if not chunk:
                raise ConnectionError(
                    "ATI sensor closed the connection after %d of %d bytes"
                    % (len(buffer), size))
            buffer.extend(chunk)
        return bytes(buffer)

    def get_measurement(self):
        '''Request one sample and return it as a length-6 numpy array.

        The array is ordered [ForceX, ForceY, ForceZ, TorqueX, TorqueY,
        TorqueZ]; raw counts are multiplied by the force/torque scale
        factor and divided by the corresponding counts-per-unit value.
        '''
        self.sock.sendall(self.READ_FORCE)
        force_info = self._recv_exact(self._RESPONSE_SIZE)
        header, status, ForceX, ForceY, ForceZ, TorqueX, TorqueY, TorqueZ = struct.unpack(
            self._RESPONSE_FORMAT, force_info)
        raw = np.array([ForceX, ForceY, ForceZ, TorqueX, TorqueY, TorqueZ])
        force_torque = np.concatenate([raw[:3] * self.scaleFactors_force/self.countsPerForce,
                                 raw[3:] * self.scaleFactors_torque/self.countsPerTorque])
        return force_torque

    def reset(self):
        '''Send the reset command to the sensor.'''
        self.sock.sendall(self.RESET_FORCE)

    def close(self):
        '''Close the underlying socket.'''
        self.sock.close()

# Measure how many force/torque samples per second the sensor link delivers.
sensor = ATISensor()
n_reads = 1000
try:
    t0 = time.time()
    for i in range(n_reads):
        ft = sensor.get_measurement()
    # Stop the clock before closing so teardown time is not counted.
    elapsed = time.time() - t0
finally:
    # Always release the socket, even if a read fails part-way through.
    sensor.close()

print("The frequency of ATI is ", n_reads/elapsed)

# Generate mask template for tracking

In [None]:
# Time a short burst of reads to estimate the achievable camera frame rate.
n_frames = 30
t0 = time.time()
for i in range(n_frames):
    ret, img = cap.read()
    if not ret:
        raise RuntimeError("Camera read failed at frame %d" % i)

print('Frame rate is ', n_frames/(time.time()-t0))
# OpenCV returns frames in BGR order while matplotlib expects RGB, so convert
# for display only; `img` itself stays BGR for the cells that follow.
plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))

In [None]:
# generate template for tracking

# Grayscale copy of the last captured frame; saved as the reference picture.
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
cv2.imwrite('cam11_template.png', gray)

# Binary mask: 255 where the gray level lies in [70, 255], 0 elsewhere.
mask = cv2.inRange(gray, 70, 255)

# Blank out the left and right borders of the mask.
left_border = slice(None, 80)
right_border = slice(1100, None)
for border in (left_border, right_border):
    mask[:, border] = 0

plt.imshow(mask)

# Store the mask at half resolution (every second row and column).
half_res_mask = mask[::2, ::2]
np.save('cam11_template.npy', half_res_mask)

In [None]:
# Load the saved tracking templates; only the cam9 one is cast to float32.
mask9_raw = np.load("cam9_template.npy")
mask9 = mask9_raw.astype(np.float32)
mask10 = np.load("cam10_template.npy")
mask11 = np.load("cam11_template.npy")

# Show the cam9 template on a fixed 0..255 gray scale.
plt.imshow(mask9, vmin=0, vmax=255, cmap='gray')

In [None]:
# Fraction of template pixels that are set (== 255). The denominator is the
# total pixel count (rows * cols), not rows * rows.
np.count_nonzero(mask9 == 255) / mask9.size

# Collect train/test data for force learning

In [None]:
# Record N synchronized (camera frame, force/torque) samples and save them.
sensor = ATISensor()
K = 1      # number of recording passes; each pass overwrites the buffers
N = 1000   # samples per pass
f = IntProgress(min=0, max=N) # instantiate the bar
display(f)

fts = np.zeros([N,6])
timestamps = []

try:
    for k in range(K):
        # Fresh buffers per pass so frames, forces and timestamps stay aligned.
        images = np.zeros([N, 360, 640,3],dtype=np.uint8)
        timestamps = []
        sensor.reset()
        print("%s Please press the finger..."%k)
        s = time.time()
        for i in range(N):  #120*60*10)
            timestamps.append(time.time()-s)
            ft = sensor.get_measurement()
            ret, color_image = cap.read()
            if not ret:
                # Fail with a clear message instead of a TypeError on None.
                raise RuntimeError("Camera read failed at sample %d" % i)
            # Keep every second row/column: half-resolution frame.
            images[i] = color_image[::2,::2,:]
            fts[i, :] = ft
            f.value = i + 1   # bar reaches its maximum on the last sample
finally:
    # Release the sensor socket even if capture fails part-way through.
    sensor.close()

print("Saving images...")
dir = '/home/fang/Documents/Track-Anything/forcesensor/test-2-cam11-blackstick-' + datetime.now().strftime("%m%d-%H%M%S")
os.mkdir(dir)

for i in range(N):
    cv2.imwrite(dir+'/%s.png'%i,images[i,:,:,:])
np.save(dir+"/force_vecs.npy", fts)
np.save(dir+"/timestamps.npy",timestamps)
print("Save to path: ", dir)

# Read and process train data

In [None]:
# Recording to process. Alternative recordings are kept below for quick switching:
# dir = '/home/fang/Documents/Track-Anything/forcesensor/train-2-0524-112734/'
# dir = '/home/fang/Documents/Track-Anything/forcesensor/train-2-YCB-0524-163343/'
# dir = '/home/fang/Documents/Track-Anything/forcesensor/train-3-0525-193122/'
# dir = '/home/fang/Documents/Track-Anything/forcesensor/train-2-YCB2-0525-200807/'
# dir = '/home/fang/Documents/Track-Anything/forcesensor/train-3-cam10-0531-162540/'
dir = '/home/fang/Documents/Track-Anything/forcesensor/train-2-cam10-0531-154909/'

N = 10000
masks = []
painted_images = []
images = np.zeros((N, 360, 640, 3), dtype=np.uint8)

# Seed the tracker with the first frame and the template scaled to 0..1.
template_mask = np.load("cam10_template.npy")/255
model.xmem.clear_memory()
img_init = cv2.imread(dir+'0.png')
mask, logit = model.xmem.track(img_init, template_mask)
# Feed the same first frame 60 more times before processing the sequence.
for warmup_step in range(60):
    mask, logit = model.xmem.track(img_init)

f = IntProgress(min=0, max=N) # instantiate the bar
display(f)
for i in range(N):
    f.value = i
    frame = images[i]                       # view into the preallocated buffer
    frame[...] = cv2.imread(dir+'%s.png'%i)
    mask, logit = model.xmem.track(frame)
    masks.append(mask)
    binary_mask = (mask==1).astype('uint8')
    painted_image = mask_painter(frame, binary_mask, mask_color=2)
    painted_images.append(painted_image)

In [None]:
# Visual sanity check: show the tracked mask of frame index 10.
plt.imshow(masks[10])

In [None]:
# Write the painted frames to a video whose frame rate matches the recording.
timestamps = np.load(dir+'timestamps.npy')
# Average rate: number of samples divided by the last recorded timestamp.
fps = len(timestamps)/timestamps[-1]
fourcc = cv2.VideoWriter_fourcc(*'jpeg')
out = cv2.VideoWriter(dir+'inpaint_720-360_cam10_train-2.mov', fourcc, fps, (640, 360))

for frame_idx in range(len(masks)):
    out.write(painted_images[frame_idx])
out.release()

# Keep the per-frame masks next to the video.
np.save(dir+'mask.npy', masks)

In [None]:
# Load the force/torque samples stored alongside the frames of the current `dir`.
fts = np.load(dir+'/force_vecs.npy')

In [None]:
# Four stacked panels over the sample index, one per group of columns of `fts`.
fig, (ax, ax0, ax1, ax2) = plt.subplots(nrows=4, ncols=1, figsize=(9, 8), dpi=150)
ax.plot(fts[:, 0:2])    # columns 0-1
ax0.plot(fts[:, 2])     # column 2
ax1.plot(fts[:, 3:5])   # columns 3-4
ax2.plot(fts[:, 5])     # column 5

# Read and process test data

In [None]:
# Recording to process. Alternative recordings are kept below for quick switching:
# dir = '/home/fang/Documents/Track-Anything/forcesensor/test-2-cam10-0530-220501/'
# dir = '/home/fang/Documents/Track-Anything/forcesensor/test-2-cam11-0531-185016/'
dir = '/home/fang/Documents/Track-Anything/forcesensor/test-2-cam11-blackstick-0604-104623/'

N = 1000
masks = []
painted_images = []
images = np.zeros((N, 360, 640, 3), dtype=np.uint8)

# Seed the tracker with the first frame and the template scaled to 0..1.
template_mask = np.load("cam11_template.npy")/255
model.xmem.clear_memory()
img_init = cv2.imread(dir+'0.png')
mask, logit = model.xmem.track(img_init, template_mask)
# Feed the same first frame 60 more times before processing the sequence.
for warmup_step in range(60):
    mask, logit = model.xmem.track(img_init)

# Track every frame and keep both the raw mask and the painted overlay.
for i in range(N):
    frame = images[i]                       # view into the preallocated buffer
    frame[...] = cv2.imread(dir+'%s.png'%i)
    mask, logit = model.xmem.track(frame)
    masks.append(mask)
    binary_mask = (mask==1).astype('uint8')
    painted_image = mask_painter(frame, binary_mask, mask_color=2)
    painted_images.append(painted_image)

# Export the overlays as a video at the recording's average sample rate.
timestamps = np.load(dir+'timestamps.npy')
fps = len(timestamps)/timestamps[-1]
fourcc = cv2.VideoWriter_fourcc(*'jpeg')
out = cv2.VideoWriter(dir+'inpaint_720-360_cam11_test-2-black.mov', fourcc, fps, (640, 360))
for frame_idx in range(len(masks)):
    out.write(painted_images[frame_idx])
out.release()

# Keep the per-frame masks next to the video.
np.save(dir+'mask.npy', masks)