# Demonstrating the dataset generation for an AMPL model

This notebook shows the generation of the dataset that is subsequently used to train a neural network for optimal control.

In [1]:
### %pylab notebook
from pyquad.ekin import*
from tqdm import tqdm



In [6]:
# Path of the AMPL model file handed to AMPLModel
ampl_mod_path = "ampl/bebop_6dof.mod"

# Number of initial conditions to sample (one trajectory is solved per sample)
num = 10000

# INITIAL CONDITIONS
# Every array holds `num` independent uniform samples; entry i of each array
# belongs to trajectory i.
y0 = np.random.uniform(-10.0, 10.0, num)
z0 = np.random.uniform(-10.0, 10.0, num)

vy0 = np.random.uniform(-5.0,5.0, num)
vz0 = np.random.uniform(-5.0,5.0, num)
theta0 = np.random.uniform(-np.pi/3, np.pi/3, num)

# The bounds must differ: with low == high every sample is the same constant,
# so the range is symmetric around zero like the other initial conditions.
omega0 = np.random.uniform(-0.01, 0.01, num)


# Per-trajectory parameters, keyed by the parameter name given to the model
parameters = {
        "y0"          : y0,
        "z0"          : z0,
        "vy0"         : vy0,
        "vz0"         : vz0,
        "theta0"      : theta0,
        "omega0"      : omega0,
}

# Parameter values that are identical for every trajectory. They are applied to
# the model after the per-trajectory values and stored alongside each solution.
# NOTE(review): the "...n" names presumably denote the target state - confirm
# against the .mod file.
fixed_parameters = dict(
    yn=0.0,
    zn=0.0,
    vyn=0.0,
    vzn=0.0,
    thetan=0.0,
    omegan=0.0,
    epsilon=0.5,
)

In [7]:
def run_ampl_model(param_dict, file_path):
    """Solve the AMPL model for one parameter set and store the result on disk.

    Parameters
    ----------
    param_dict : dict
        Per-trajectory parameter values; applied to the model first.
    file_path : str
        Destination passed to ``np.savez`` (which appends ``.npz`` when the
        name does not already end with it).

    The module-level ``fixed_parameters`` are applied after ``param_dict``.
    Nothing is returned; the outcome only exists in the written file.
    """
    ampl_model = AMPLModel(ampl_mod_path)
    for values in (param_dict, fixed_parameters):
        ampl_model.setParameterValues(values)
    ampl_model.solve()

    solution = ampl_model.getSolutionValues()
    objective = ampl_model.getObjectiveValues()

    # save solution to file
    # On duplicate keys the later source wins: solution, objective,
    # per-trajectory parameters, then fixed parameters.
    record = {"Success": ampl_model.checkSolved(), "t": solution['timegrid']}
    for extra in (solution, objective, param_dict, fixed_parameters):
        record.update(extra)
    np.savez(file_path, **record)

In [8]:
# Dataset name; used for both the per-trajectory folder and the merged file
name = '2D_QUAD_HOVER'
datafolder = 'datasets/' + name          # one .npz file per solved trajectory
datafile = 'datasets/' + name + '.npz'   # merged dataset

# make folder if it doesn't exist
# exist_ok avoids the check-then-create race and creates missing parents too
os.makedirs(datafolder, exist_ok=True)

In [9]:
##### trajectories to solve
# Indices that already have a result file are skipped, so an interrupted run
# can be resumed. Only names of the form "<integer>.npz" count as results;
# any other entry in the folder is ignored instead of crashing int().
index_set = set(range(num)) - {
    int(stem)
    for stem, ext in map(os.path.splitext, os.listdir(datafolder))
    if ext == '.npz' and stem.isdecimal()
}

# solve trajectories in parallel
# Each job receives the idx-th entry of every parameter array and writes its
# own file; run_ampl_model returns nothing, so sol_lst only holds None values.
njobs = 20
sol_lst = joblib.Parallel(njobs)(joblib.delayed(run_ampl_model)(
    {key: val[idx] for key,val in parameters.items()},
    datafolder + '/' + str(idx)
) for idx in tqdm(index_set))

# num is the target total, not the number of trajectories solved in this run
print('Generated', num,'trajectories in', datafolder)





100%|██████████| 7446/7446 [04:32<00:00, 27.29it/s]


Generated 10000 trajectories in datasets/2D_QUAD_HOVER


In [10]:
def load_file(file, datafolder):
    """Load one ``.npz`` archive from ``datafolder`` into a plain dict.

    Parameters
    ----------
    file : str
        File name inside ``datafolder``.
    datafolder : str
        Folder containing the file.

    Returns
    -------
    dict or None
        Mapping of array name to array, or ``None`` when the file cannot be
        read (missing, truncated, not a valid archive, ...).
    """
    try:
        # The context manager closes the archive once every array has been
        # read into the dict, so no file handle is left open per trajectory.
        with np.load(datafolder + '/' + file) as archive:
            return dict(archive)
    except Exception:
        # Best effort: unreadable files are skipped by the caller.
        # KeyboardInterrupt / SystemExit are deliberately not swallowed.
        return None

# Read every file of the data folder in parallel and keep only the ones that
# loaded (load_file returns None for files it could not read).
njobs = 100
trajectory_list = [
    traj
    for traj in joblib.Parallel(njobs)(
        joblib.delayed(load_file)(fname, datafolder)
        for fname in tqdm(os.listdir(datafolder))
    )
    if traj is not None
]

100%|██████████| 10000/10000 [00:04<00:00, 2231.12it/s]


In [11]:
# The first trajectory defines the reference key set; trajectories whose keys
# differ from it are dropped so that every entry can be stacked.
keys = trajectory_list[0].keys()
trajectory_list = [entry for entry in trajectory_list if entry.keys() == keys]

# Whenever both "<name>" and "<name>m" exist, the "<name>m" entry is left out.
# NOTE(review): meaning of the 'm'-suffixed entries is not visible here - confirm.
keys = keys - {base + 'm' for base in keys if base + 'm' in keys}

# One array per key, with the trajectory index as the new leading axis
dataset = {}
for key in tqdm(keys):
    dataset[key] = np.stack([traj[key] for traj in trajectory_list])

# default parameters
# Stacked entries take precedence over model defaults with the same name.
params = AMPLModel(ampl_mod_path).getParameterValues()
dataset = {**params, **dataset}

# save dataset
np.savez_compressed(datafile, **dataset)

100%|██████████| 35/35 [00:00<00:00, 100.75it/s]


**Filter failed trajectories**

In [12]:
def filter_unsuccesful(dataset):
    """Return the set of first-axis indices whose 'Success' entry equals False."""
    failed_mask = dataset['Success'] == False
    failed_rows = np.nonzero(failed_mask)[0]
    return set(failed_rows)


# Remove the trajectories flagged as unsuccessful from the merged dataset file.
# The archive is read inside a context manager so that it is closed before the
# same path is overwritten below.
with np.load(datafile) as a:
    num = a['y'].shape[0]
    discard = filter_unsuccesful(a)
    keep = set(range(num)) - discard

    print(len(discard))
    print(num)

    if discard:
        keep_idx = sorted(keep)  # deterministic, ascending row order
        b = dict()
        for key in tqdm(a.keys()):
            arr = a[key]  # read (and decompress) each entry only once
            if arr.ndim > 0 and arr.shape[0] == num:
                # per-trajectory entry: keep the successful rows only
                b[key] = arr[keep_idx]
            else:
                # scalars and arrays with a different leading size are
                # carried over unchanged instead of being dropped
                b[key] = arr

if discard:
    np.savez_compressed(datafile, **b)

with np.load(datafile) as data:
    dataset = dict(data)

19
10000


100%|██████████| 48/48 [00:01<00:00, 34.64it/s]


**Animation Code**

In [13]:
import cv2

# --- shared state of the OpenCV viewer ---
window_name = "2d Quadcopter"
window_created = False    # set by animate() once the window exists
img_size = 800            # side length of the square canvas in pixels
scale = img_size / 20.0   # pixels per world unit (20 units span the canvas)

# --- video recording ---
# The writer is created up front; animate() feeds it frames while `recording`
# is True. Arguments: file name, codec, frames per second, frame size.
recording = False
fourcc = cv2.VideoWriter_fourcc('m', 'p', '4', 'v')
writer = cv2.VideoWriter('output.mp4', fourcc, 100, (img_size, img_size))

def map_to_image(x,z):
    """Convert world coordinates to integer pixel coordinates.

    The world origin lands on the image centre and increasing ``z`` moves
    towards smaller pixel rows. Uses the module-level ``img_size`` and
    ``scale``; fractional pixels are truncated by ``int``.
    """
    half = img_size / 2
    col = int(x * scale + half)
    row = int(img_size - z * scale - half)
    return col, row

def draw(image,x,z,theta,u1,u2):
    """Draw one quadcopter pose, its two thrust arrows and an origin cross.

    Parameters
    ----------
    image : array passed to the cv2 drawing calls; it is also returned.
    x, z : position of the quadcopter centre in world coordinates.
    theta : arm angle; the arm points along (cos(theta), sin(theta)).
    u1, u2 : factors scaling the arrows at the left and right arm end.
    """
    arm = 0.2  # distance from the centre to each arm end, in world units
    centre = np.array([x, z])
    along = np.array([np.cos(theta), np.sin(theta)])

    # Arm end points converted to pixel coordinates
    x1, z1 = map_to_image(*(centre - arm * along))
    x2, z2 = map_to_image(*(centre + arm * along))

    # Thrust arrows are perpendicular to the arm, with length u * arm
    thrust_l = u1*np.array([np.cos(theta+np.pi/2), np.sin(theta+np.pi/2)])*arm
    thrust_r = u2*np.array([np.cos(theta+np.pi/2), np.sin(theta+np.pi/2)])*arm

    # Grey cross marking the world origin
    size = 0.2
    for start, end in (((0, size), (0, -size)), ((size, 0), (-size, 0))):
        cv2.line(image, map_to_image(*start), map_to_image(*end), (200, 200, 200), 2)

    # Arrows first, then the body segment on top. The vertical pixel offset is
    # subtracted because pixel rows grow downwards.
    tip_l = (x1+int(thrust_l[0]*scale), z1-int(thrust_l[1]*scale))
    tip_r = (x2+int(thrust_r[0]*scale), z2-int(thrust_r[1]*scale))
    cv2.arrowedLine(image, (x1, z1), tip_l, (0, 0, 255), 2)
    cv2.arrowedLine(image, (x2, z2), tip_r, (0, 0, 255), 2)
    cv2.line(image, (x1, z1), (x2, z2), (255, 0, 0), 2)

    return image

def nothing(x):
    """Do-nothing callback; accepts one argument and ignores it."""
    return None
    
def animate(*trajectories):
    """Show one or more trajectories in an interactive OpenCV window.

    A trackbar named 't' selects the time index; every trajectory is drawn at
    that index on a shared white canvas. Keys:

    * Esc - close the window and return (an active recording is finalised).
    * r   - toggle recording of the displayed frames to 'output.mp4'.
            Starting again after a stop reopens the file, which overwrites
            the previous recording.

    Parameters
    ----------
    *trajectories : mapping
        Each one must provide indexable 'y', 'z', 'theta', 'ur' and 'ul'
        entries; the first one must also provide 't', whose length sets the
        trackbar range.
    """
    global window_created, recording, writer
    if not window_created:
        cv2.namedWindow(window_name)
        window_created = True

    # create a slider to control the time
    cv2.createTrackbar('t', window_name, 0, trajectories[0]['t'].shape[0]-1, nothing)

    while True:
        index = cv2.getTrackbarPos('t', window_name)

        # Create a white image as the background; sized from img_size so it
        # always matches the frame size the video writer was created with
        image = 255*np.ones((img_size, img_size, 3), dtype=np.uint8)

        for traj in trajectories:
            # NOTE(review): 'ur' is passed as u1, which draw() attaches to the
            # left arm end, and 'ul' to the right one - confirm this is intended.
            image = draw(
                image,
                traj['y'][index],
                traj['z'][index],
                traj['theta'][index],
                traj['ur'][index],
                traj['ul'][index]
            )

        key = cv2.waitKeyEx(1)

        # Check if the "Esc" key is pressed
        if key == 27:  # 27 is the ASCII code for the "Esc" key
            if recording:
                # finalise the video instead of leaving the writer open
                writer.release()
                recording = False
            cv2.destroyAllWindows()
            window_created = False
            break
        # record if 'r' is pressed
        if key == ord('r'):
            recording = not recording
            if recording:
                # a released writer drops frames, so reopen it first
                if not writer.isOpened():
                    writer.open('output.mp4', fourcc, 100, (img_size, img_size))
                print("Recording started")
            else:
                writer.release()
        if recording:
            # the frame is written before the label is drawn, so the
            # "Recording" text is visible on screen but not in the video
            writer.write(image)
            # write text on the image
            cv2.putText(image, "Recording", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

        # Show the image in the OpenCV window
        cv2.imshow(window_name, image)
    

**View Trajectories**

In [14]:
# load dataset
# The archive is closed as soon as its arrays have been copied into the dict.
with np.load(datafile) as data:
    dataset = dict(data)

# select up to 100 random trajectories from the dataset (all of them when
# fewer are available; sampling without replacement would fail otherwise)
num_available = dataset['t'].shape[0]
indices = np.random.choice(num_available, min(100, num_available), replace=False)
trajs = [{key: dataset[key][index] for key in ['t', 'y', 'z', 'theta', 'ur', 'ul']} for index in indices]

animate(*trajs)