In [1]:
import torch

# --- Global configuration shared by the cells below ---
Z_DIM = 32  # latent size passed to the VAE (Vae(latent_dims=Z_DIM))
# Action vocabulary; the model's scalar action output is decoded to an index into this list.
VOC_ACTIONS = ['no_action', 'Button.left', 'Key.enter', 'Key.esc', 'o', 'scroll_down', 'z']
MAX_LENGTH = 25  # steps
MAXWIDTH_screenshoot = 1920  # pixels
MAXHEIGHT_screenshoot = 1080  # pixels
MAXDELAY = 15  # secs
SEED = 0
# Inference is pinned to the CPU. Previously the CUDA check was computed and then
# unconditionally overwritten by `device = "cpu"`; the flag makes that choice explicit.
# NOTE: later cells create the LSTM state with plain torch.zeros(...) (CPU tensors),
# so those cells must be adapted before setting this to False.
FORCE_CPU = True

torch.manual_seed(SEED)
use_cuda = torch.cuda.is_available() and not FORCE_CPU
device = torch.device("cuda" if use_cuda else "cpu")
print(f'device: {device}')

device: cpu


In [2]:
import torch.nn as nn
import torch.nn.functional as F

class Encoder(nn.Module):
    """Convolutional VAE encoder producing a sampled latent plus its Gaussian parameters.

    Four stride-2 4x4 convolutions (no padding) are followed by two linear heads
    that output the latent mean and log-variance. The heads expect 256*2*2
    flattened features, which is what a 64x64 input yields after the four
    convolutions (64 -> 31 -> 14 -> 6 -> 2).

    Args:
        latent_dims: size of the latent vector.

    Attributes:
        kl: KL divergence to a standard normal, summed over batch and latent
            dimensions; refreshed on every forward pass (0 before the first one).
    """

    def __init__(self, latent_dims):
        super().__init__()
        self.conv1 = nn.Conv2d(3,32,4, stride=2, padding="valid")
        self.conv2 = nn.Conv2d(32,64,4, stride=2, padding="valid")
        self.conv3 = nn.Conv2d(64,128,4, stride=2, padding="valid")
        self.conv4 = nn.Conv2d(128,256,4, stride=2, padding="valid")
        self.fc_mean = nn.Linear(256*2*2, latent_dims)
        self.fc_logvar = nn.Linear(256*2*2, latent_dims)
        # No distribution object is stored: noise is drawn with torch.randn_like in
        # forward(), so it always matches the input's device/dtype and the module
        # no longer depends on a notebook-level `device` variable.
        self.kl = 0

    def forward(self, x):
        """Encode a batch of images.

        Args:
            x: image batch of shape (N, 3, H, W); H = W = 64 matches the linear heads.

        Returns:
            Tuple (z, z_mu, z_sigma), each of shape (N, latent_dims), where
            z = z_mu + z_sigma * eps with eps ~ N(0, I).
        """
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = torch.flatten(x, 1)
        z_mu = self.fc_mean(x)
        z_logvar = self.fc_logvar(x)
        z_sigma = torch.exp(0.5*z_logvar)

        # Reparameterization trick; note that sampling also happens in eval mode.
        z = z_mu + z_sigma*torch.randn_like(z_mu)
        self.kl = 0.5*(z_sigma**2 + z_mu**2 - z_logvar - 1).sum()
        return z, z_mu, z_sigma

class Decoder(nn.Module):
    """Transposed-convolution decoder mapping a latent vector back to an image.

    A linear layer expands the latent to 1024 features, which are reshaped to
    (1024, 1, 1) and upsampled by four stride-2 transposed convolutions
    (1 -> 5 -> 13 -> 30 -> 64 pixels per side). The final Tanh keeps outputs
    in [-1, 1].

    Args:
        latent_dims: size of the latent vector.
    """

    def __init__(self, latent_dims):
        super().__init__()
        # (in_channels, out_channels, kernel_size) for each upsampling stage.
        upsampling_stages = [
            (1024, 128, 5),
            (128, 64, 5),
            (64, 32, 6),
            (32, 3, 6),
        ]
        layers = [
            nn.Linear(latent_dims, 1024),
            nn.ReLU(True),
            nn.Unflatten(1, (1024, 1, 1)),
        ]
        final_stage = len(upsampling_stages) - 1
        for stage, (c_in, c_out, kernel) in enumerate(upsampling_stages):
            layers.append(nn.ConvTranspose2d(c_in, c_out, kernel, stride=2, padding=0))
            # Hidden stages use ReLU; the output stage uses Tanh.
            layers.append(nn.Tanh() if stage == final_stage else nn.ReLU(True))
        # Layer order (and therefore the state_dict keys decoder.0, decoder.3, ...)
        # must stay fixed so saved checkpoints keep loading.
        self.decoder = nn.Sequential(*layers)

    def forward(self, x):
        """Decode latents of shape (N, latent_dims) into images of shape (N, 3, 64, 64)."""
        return self.decoder(x)

class Vae(nn.Module):
    """Variational autoencoder: an Encoder followed by a Decoder.

    Args:
        latent_dims: size of the latent vector shared by both halves.
    """

    def __init__(self, latent_dims):
        super().__init__()
        self.encoder = Encoder(latent_dims)
        self.decoder = Decoder(latent_dims)

    def forward(self, x):
        """Reconstruct `x` from the latent sampled by the encoder."""
        # The encoder returns (z, z_mu, z_sigma); only the sampled z is decoded.
        sampled_latent = self.encoder(x)[0]
        reconstruction = self.decoder(sampled_latent)
        return reconstruction
    
from torchvision import transforms

IMGSIZE = [64, 64]  # [1080//10, 1920//10]
# Resize to the encoder's input size, convert to a tensor, then shift/scale each
# channel with (x - 0.5) / 0.5.
img_transform = transforms.Compose([
                transforms.Resize(IMGSIZE, antialias=None),
                transforms.ToTensor(),
                transforms.Normalize(mean=[0.5, 0.5, 0.5],
                                    std=[0.5, 0.5, 0.5]),

                ])

vae = Vae(latent_dims=Z_DIM)
vae.to(device)
# map_location remaps the checkpoint's tensors onto `device`, so a checkpoint
# saved from a GPU run still loads on a CPU-only machine.
vae.load_state_dict(torch.load('best_vae.pt', map_location=device)['model_state_dict'])
vae.eval()
vae.encoder

  from .autonotebook import tqdm as notebook_tqdm


Encoder(
  (conv1): Conv2d(3, 32, kernel_size=(4, 4), stride=(2, 2), padding=valid)
  (conv2): Conv2d(32, 64, kernel_size=(4, 4), stride=(2, 2), padding=valid)
  (conv3): Conv2d(64, 128, kernel_size=(4, 4), stride=(2, 2), padding=valid)
  (conv4): Conv2d(128, 256, kernel_size=(4, 4), stride=(2, 2), padding=valid)
  (fc_mean): Linear(in_features=1024, out_features=32, bias=True)
  (fc_logvar): Linear(in_features=1024, out_features=32, bias=True)
)

In [50]:
import torch.nn.functional as F
import torch.nn as nn
  
class LSTM13(nn.Module):
    """LSTM policy whose three heads read only the LSTM hidden output.

    Args:
        z_dim: size of the latent input at each step.
        h_dim: LSTM hidden size.
        hfc_dim: width of the hidden layer in the action head.
        num_layers: number of stacked LSTM layers.
    """

    def __init__(self, z_dim, h_dim, hfc_dim, num_layers=1):
        super().__init__()
        self.h_dim = h_dim
        self.lstm = nn.LSTM(
            input_size=z_dim,
            hidden_size=h_dim,
            num_layers=num_layers,
            batch_first=True,
        )
        # Action head: two-layer MLP producing one scalar per step.
        self.fc1_action = nn.Linear(in_features=h_dim, out_features=hfc_dim)
        self.fc2_action = nn.Linear(in_features=hfc_dim, out_features=1)
        # Position head (2 values) and delay head (1 value).
        self.fc_position = nn.Linear(in_features=h_dim, out_features=2)
        self.fc_delay = nn.Linear(in_features=h_dim, out_features=1)

    def forward(self, zt, state):
        """Run the LSTM on `zt` and apply the three output heads.

        Args:
            zt: latent input(s) for the LSTM.
            state: (h, c) LSTM state, passed straight to nn.LSTM.

        Returns:
            Tuple (at, pt, dt, state): action scalar, 2-value position,
            non-negative delay (ReLU), and the updated LSTM state.
        """
        hidden, next_state = self.lstm(zt, state)  # (N, L, h_dim)
        action_hidden = torch.relu(self.fc1_action(hidden))
        action_out = self.fc2_action(action_hidden)
        position_out = self.fc_position(hidden)
        delay_out = torch.relu(self.fc_delay(hidden))
        return action_out, position_out, delay_out, next_state
    
# NOTE: the LSTM14 cell further down rebinds `lstm_best`; whichever cell ran last wins.
lstm_best = LSTM13(z_dim=32, h_dim=256, hfc_dim=1024, num_layers=1)
lstm_best.to(device)
# map_location remaps the checkpoint's tensors onto `device`, so a checkpoint
# saved from a GPU run still loads on a CPU-only machine.
lstm_best.load_state_dict(torch.load('best_lstm13_vae.pt', map_location=device)['model_state_dict'])

# Open question: should the recurrent state be kept inside the module or passed in externally?

<All keys matched successfully>

In [106]:
import torch.nn.functional as F
import torch.nn as nn

class LSTM14(nn.Module):
    """LSTM policy whose heads read the latent input concatenated with the LSTM output.

    Args:
        z_dim: size of the latent input at each step.
        h_dim: LSTM hidden size.
        hfc_dim: width of the hidden layer in the action head.
        num_layers: number of stacked LSTM layers.
    """

    def __init__(self, z_dim, h_dim, hfc_dim, num_layers=1):
        super().__init__()
        self.h_dim = h_dim
        self.lstm = nn.LSTM(
            input_size=z_dim,
            hidden_size=h_dim,
            num_layers=num_layers,
            batch_first=True,
        )
        # Every head consumes [zt, ht], hence z_dim + h_dim input features.
        head_inputs = z_dim + h_dim
        self.fc1_action = nn.Linear(head_inputs, hfc_dim)
        self.fc2_action = nn.Linear(hfc_dim, 1)
        self.fc_position = nn.Linear(head_inputs, 2)
        self.fc_delay = nn.Linear(head_inputs, 1)

    def forward(self, zt, state):  # both zt,ht as inputs
        """Run the LSTM on `zt` and apply the heads to the concatenation [zt, ht].

        Args:
            zt: latent input(s) for the LSTM.
            state: (h, c) LSTM state, passed straight to nn.LSTM.

        Returns:
            Tuple (at, pt, dt, state): action scalar, 2-value position,
            non-negative delay (ReLU), and the updated LSTM state.
        """
        hidden, next_state = self.lstm(zt, state)  # (N, L, h_dim)
        head_input = torch.cat([zt, hidden], dim=-1)
        action_hidden = torch.relu(self.fc1_action(head_input))
        action_out = self.fc2_action(action_hidden)
        position_out = self.fc_position(head_input)
        delay_out = torch.relu(self.fc_delay(head_input))
        return action_out, position_out, delay_out, next_state

lstm_best = LSTM14(z_dim=32, h_dim=256, hfc_dim=1024, num_layers=1)
lstm_best.to(device)
# map_location remaps the checkpoint's tensors onto `device`, so a checkpoint
# saved from a GPU run still loads on a CPU-only machine.
lstm_best.load_state_dict(torch.load('best_lstm14.pt', map_location=device)['model_state_dict'])

<All keys matched successfully>

In [88]:
# Smoke test: one forward step with an all-ones latent and a zero (h, c) state.
x = torch.ones((1, 32))
state = tuple(torch.zeros((1, lstm_best.h_dim)) for _ in range(2))
with torch.no_grad():
    out = lstm_best(x, state)
# The model returns (at, pt, dt, state); keep the updated state for a next step.
state = out[-1]

print(out[:3])
print([head.shape for head in out[:3]])

(tensor([[1.0256]]), tensor([[ 0.0686, -0.4251]]), tensor([[0.]]))
[torch.Size([1, 1]), torch.Size([1, 2]), torch.Size([1, 1])]


In [107]:
import pyautogui
import time

def extract_features(image):
    """Encode an image into the VAE's latent mean.

    The image goes through `img_transform`, gets a leading batch dimension,
    and is passed through `vae.encoder` without tracking gradients.

    Args:
        image: image accepted by `img_transform` (the control loop passes a
            pyautogui screenshot).

    Returns:
        The encoder's latent mean for the single-image batch.
    """
    batch = img_transform(image).unsqueeze(0)
    with torch.no_grad():
        # The encoder returns (z, z_mu, z_sigma); the mean is used, not the sample.
        latent_mean = vae.encoder(batch)[1]
    return latent_mean

def pcontroller(action, position, delay):
    """Perform one mouse/keyboard action with pyautogui, then wait.

    Supported actions: 'Button.left' (click), 'scroll_down' (scroll by -1),
    'Key.enter' / 'Key.esc' (key press) and any single alphabetic character
    (typed). For every supported action the pointer is first moved to
    `position`. Anything else is reported on stdout and nothing is performed.
    Previously an unsupported 'Button.*', 'scroll_*' or 'Key.*' action was
    silently ignored instead of being reported.

    Args:
        action: action name, as found in VOC_ACTIONS.
        position: (x, y) screen coordinates.
        delay: seconds to sleep after handling the action (also applied when
            the action is unsupported).
    """
    key_presses = {'Key.enter': 'enter', 'Key.esc': 'esc'}
    is_single_letter = len(action) == 1 and action.isalpha()
    is_supported = (
        action in ('Button.left', 'scroll_down')
        or action in key_presses
        or is_single_letter
    )

    if not is_supported:
        print(f'This action {action} does not exist')
    else:
        pyautogui.moveTo(position[0],position[1])
        if action == 'Button.left':
            pyautogui.click()
        elif action == 'scroll_down':
            pyautogui.scroll(-1)
        elif action in key_presses:
            pyautogui.press(key_presses[action])
        else:
            # Single alphabetic character: type it.
            pyautogui.write(action)
    time.sleep(delay)

def execute(at, pt, dt, debugging=False):
    """Decode the model outputs into a concrete action and optionally perform it.

    A normalized value v in [-1, 1] is decoded as round(upper / 2 * (v + 1)).
    Values outside [-1, 1] are clamped first, and the action index is capped at
    the last valid entry of VOC_ACTIONS. Previously v close to or above 1
    decoded to index len(VOC_ACTIONS) and raised IndexError, while v below -1
    produced a negative index that silently wrapped to the end of the list.

    Args:
        at: tensor holding one normalized action value.
        pt: tensor holding two normalized position values (x, y).
        dt: tensor holding one normalized delay value.
        debugging: when True, only decode; do not call `pcontroller`.

    Returns:
        Tuple (action, (px, py), delay): action name, integer pixel
        coordinates, and delay in seconds as a Python float.
    """
    at = at.squeeze().detach().cpu().numpy()
    pt = pt.squeeze().detach().cpu().numpy()
    dt = dt.squeeze().detach().cpu().numpy()

    def unnormalized(x_norm, upper):
        # Clamp to the valid normalized range before mapping to [0, upper].
        x_norm = min(1.0, max(-1.0, float(x_norm)))
        return int(round(upper / 2 * (x_norm + 1.0), 0))

    # v == 1 maps to len(VOC_ACTIONS), one past the last valid index, so cap it.
    last_index = len(VOC_ACTIONS) - 1
    action = VOC_ACTIONS[min(unnormalized(at, len(VOC_ACTIONS)), last_index)]
    px = unnormalized(pt[0],MAXWIDTH_screenshoot)
    py = unnormalized(pt[1],MAXHEIGHT_screenshoot)
    delay = float(MAXDELAY*dt)
    if not debugging:
        pcontroller(action, (px, py), delay)
    return action, (px,py), delay


# Tool for debugging actions
from PIL import Image, ImageDraw, ImageFont

def viz_actions(img, index, action, px, py, delay):
    """Annotate a screenshot with the chosen action and save it under runs/.

    For 'Button.*' actions a red dot, a yellow centre pixel and the step index
    are drawn at (px, py); for any other action the action name is written near
    the centre of the screen. The image is modified in place and saved as
    'runs/stepNN_delayD.jpg'.

    Args:
        img: PIL image to draw on (mutated).
        index: step number used for the label and the file name.
        action: action name.
        px, py: pixel coordinates of the action.
        delay: delay in seconds, rounded to 3 decimals in the file name.
    """
    import os  # local import: only needed to make sure the output folder exists

    # NOTE: "arial.ttf" must be resolvable by Pillow on this machine.
    mfont = ImageFont.truetype("arial.ttf", 40)
    kfont = ImageFont.truetype("arial.ttf", 80)
    size = 10
    half = size // 2

    draw = ImageDraw.Draw(img)
    if 'Button.' in action:
        draw.ellipse([px-half,py-half,px+half,py+half], fill="red")
        draw.point((px,py), fill='yellow')
        draw.text((px,py), str(index), fill='red', font=mfont)
    else:
        draw.text((MAXWIDTH_screenshoot//2 - 100,MAXHEIGHT_screenshoot//2), action, fill='red', font=kfont)
    # Saving used to fail when the output folder had not been created yet.
    os.makedirs('runs', exist_ok=True)
    img.save('runs/step{:02}_delay{}.jpg'.format(index, str(round(delay, 3))))

In [63]:
import pandas as pd
from PIL import Image

# Recorded action log for sample 0; the commented-out replay line in the control
# loop reads its 'img_path' column.
sample_actions_csv = 'actions_df - sample0.csv'
actions_df = pd.read_csv(sample_actions_csv)

In [110]:
# Closed-loop control: screenshot -> VAE latent -> LSTM step -> execute the action.
time.sleep(3)  # initial pause before the first screenshot
state = tuple(torch.zeros((1, lstm_best.h_dim)) for _ in range(2))  # zero (h, c)
for i in range(MAX_LENGTH):
    screenshot = pyautogui.screenshot()
    # Offline alternative: screenshot = Image.open(actions_df['img_path'][i])
    zt = extract_features(screenshot)
    with torch.no_grad():
        at, pt, dt, state = lstm_best(zt, state)
    action, (px, py), delay = execute(at, pt, dt, debugging=False)
    if action == "no_action":
        # 'no_action' ends the episode.
        pyautogui.alert('The execution has been terminated.')
        break

    # Raw (normalized) model outputs, then the decoded action.
    raw_outputs = (
        round(float(at[0, 0]), 4),
        round(float(pt[0, 0]), 4),
        round(float(pt[0, 1]), 4),
        round(float(dt[0, 0]), 3),
    )
    print(f'step = {i+1}')
    print(*raw_outputs)
    print(action, (px, py), round(delay, 3), end='\n\n')

    viz_actions(screenshot, i, action, px, py, delay)


step = 1
0.5712 0.0152 -0.8511 0.0
scroll_down (975, 80) 0.0

step = 2
0.7516 0.437 -0.9946 0.0
z (1380, 3) 0.0

step = 3
-0.1172 -0.2628 -0.86 0.0
Key.esc (708, 76) 0.0

step = 4
-0.3984 -0.3146 -0.6146 0.016
Key.enter (658, 208) 0.233

step = 5
-0.4669 0.1174 -0.5325 0.312
Key.enter (1073, 252) 4.673

step = 6
-0.6115 -0.3915 -0.5158 0.007
Button.left (584, 261) 0.101

step = 7
-0.2095 -0.3037 -0.1684 0.277
Key.esc (668, 449) 4.162

step = 8
-0.4461 -0.0376 -0.3124 0.087
Key.enter (924, 371) 1.31

step = 9
-0.4313 0.0026 -0.134 0.176
Key.enter (963, 468) 2.647

step = 10
-0.5023 0.0979 -0.2115 0.12
Key.enter (1054, 426) 1.793

step = 11
-0.4962 0.0975 -0.1286 0.153
Key.enter (1054, 471) 2.3

step = 12
-0.5167 0.1244 -0.1546 0.137
Key.enter (1079, 457) 2.048

step = 13
-0.5154 0.1253 -0.1389 0.14
Key.enter (1080, 465) 2.106

step = 14
-0.517 0.1305 -0.1405 0.135
Key.enter (1085, 464) 2.029

step = 15
-0.516 0.1313 -0.1372 0.134
Key.enter (1086, 466) 2.006

step = 16
-0.5159 0.1326 -0.

In [None]:

FEB27 2024

Me parece que sí debo limitar el rango de at y pt usando tanh [-1,1], porque a veces el modelo puede generar valores fuera del rango.

Recuerda que Zoom cambió un poco su webapp.


Usé los screenshots guardados del sample0 y todo el resto del código funcionó bien. Esto me indica que el problema está en el Autoencoder, que
genera features sensibles a los cambios en el screenshot.

Hey! Podría etiquetar nuevos samples con screenshots diferentes, aprovechando que el LSTM sí predice bien.

Probé entrenando LSTM13 con VAE; hace que las decisiones de la posición varíen un poco del ideal, así que no le atina a los botones. Además, varían cada vez que se ejecuta.

Probé el modelo LSTM14, que concatena zt y ht; me parece que es un modelo que podría generalizar más que el LSTM13 y el LSTM12.

Conclusión: mi código es completo y funcional, pero el modelo no generaliza lo suficiente.


¿Qué hago ahora?
1) Colecto, reentreno modelos y lo pruebo en condiciones iguales -> grabo el demo y compacto el pipeline.
2) Mejoro la recolección de datos, recolecto una base de datos más grande y entreno un modelo screen -> (M&K action, pt, dt) sin tareas, solo para imitar la dinámica,
   con el fin de ajustarlo después con pocos ejemplos para la tarea de abrir la sesión de Zoom.
3) Solo recolecto más ejemplos de la misma tarea y busco que generalice haciendo esa tarea.

4) Añado las acciones pasadas como entrada al LSTM14.
5) Aplico la filosofía de JEPA para entrenar un modelo fundacional.

6) Replicar el trabajo de DeepMind 2022.


RW para implementar modelos del mundo
Predicting the Future with Simple World Models https://www.scholar-inbox.com/papers/Saanum2024ARXIV_Predicting_the_Future_with.pdf
WORLD MODEL ON MILLION-LENGTH VIDEO AND LANGUAGE WITH RINGATTENTION  https://www.scholar-inbox.com/papers/Liu2024ARXIV_World_Model_on_Million.pdf
WorldCoder, a Model-Based LLM Agent: Building World Models by Writing Code and Interacting with the Environment  https://www.scholar-inbox.com/papers/Tang2024ARXIV_WorldCoder_a_Model_Based.pdf
