In [2]:
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torch
import lightning.pytorch as pl
import lightning.pytorch.callbacks as plc
import torch.nn.functional as F
import torch.nn as nn
import os
import sys
sys.path.append('../')
import pytorch_lightning.loggers as pl_loggers
import pickle
import math
import json
from test_case import configs
import numpy as np
import wandb
import random
DEVICE = 'cuda:2' if torch.cuda.is_available() else 'cpu'


In [15]:
class SeqFormer(nn.Module):
    def __init__(
        self,
        input_dim,
        hidden_dim,
        output_dim,
        mlp_activation="ReLU",
        transformer_activation="gelu",
        mlp_dropout=0.1,
        transformer_dropout=0.1,
    ):
        super(SeqFormer, self).__init__()
        # input_dim: node bits
        self.tranformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=input_dim,
                dim_feedforward=hidden_dim,
                nhead=3,
                batch_first=True,
                activation=transformer_activation,
                dropout=transformer_dropout,
            ),
            num_layers=1,
        )
        self.node_length = input_dim
        if mlp_activation == "ReLU":
            self.mlp_activation = nn.ReLU()
        elif mlp_activation == "GELU":
            self.mlp_activation = nn.GELU()
        elif mlp_activation == "LeakyReLU":
            self.mlp_activation = nn.LeakyReLU()
        # self.mlp_hidden_dims = [128, 64, 32]
        self.mlp_hidden_dims = [256, 128, 1]
        self.mlp = nn.Sequential(
            *[
                nn.Linear(self.node_length, self.mlp_hidden_dims[0]),
                nn.Dropout(mlp_dropout),
                self.mlp_activation,
                nn.Linear(self.mlp_hidden_dims[0], self.mlp_hidden_dims[1]),
                nn.Dropout(mlp_dropout),
                self.mlp_activation,
                nn.Linear(self.mlp_hidden_dims[1], output_dim),
            ]
        )
        self.apply(self._init_weights)
        # self.sigmoid = nn.Sigmoid()

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, x, attn_mask=None):
        # change x shape to (batch, seq_len, input_size) from (batch, len)
        # one node is 18 bits
        # x = x.view(x.shape[0], self.node_length, -1)
        # x = x.transpose(1,2)
        x = x.view(x.shape[0], -1, self.node_length)
        # attn_mask = attn_mask.repeat(4,1,1)
        attn_mask = attn_mask.repeat(1, 3, 1, 1)
        out = self.tranformer_encoder(x, mask=attn_mask)
        # out = self.transformer_decoder(out, out, tgt_mask=attn_mask)
        out = self.mlp(out)
        # out = (torch.tanh(out).squeeze(dim=2) * 5).add(5) 
        # out = torch.tanh(out).squeeze(dim=2).add(1) * 5
        out = torch.tanh(out).squeeze(dim=2).add(1)
        return out # [0, 1] -> [1, 2] [??]

In [17]:
Transformer_model = SeqFormer(
                        input_dim=configs['node_length'],
                        hidden_dim=256,
                        output_dim=1,
                        mlp_activation="ReLU",
                        transformer_activation="gelu",
                        mlp_dropout=0.1,
                        transformer_dropout=0.1,
                    )



In [18]:
class PL_Leon(pl.LightningModule):
    def __init__(self, model, optimizer_state_dict=None, learning_rate=0.001):
        super(PL_Leon, self).__init__()
        self.model = model
        self.optimizer_state_dict = optimizer_state_dict
        self.learning_rate = 0.001

    def forward(self, batch_pairs):
        pass

    def getBatchPairsLoss(self, labels, costs1, costs2, encoded_plans1, encoded_plans2, attns1, attns2):
        """
        batch_pairs: a batch of train pairs
        return. a batch of loss
        """
        loss_fn = nn.BCELoss()
        batsize = costs1.shape[0]
        encoded_plans = torch.cat((encoded_plans1, encoded_plans2), dim=0)
        attns = torch.cat((attns1, attns2), dim=0)
        cali = self.model(encoded_plans, attns) # cali.shape [# of plan, pad_length] cali 是归一化后的基数估计
        cali = cali[:, 0]
        costs = torch.cat((costs1, costs2), dim=0)
        calied_cost = torch.log(costs) * cali
        try:
            sigmoid = F.sigmoid(-(calied_cost[:batsize] - calied_cost[batsize:]))
            loss = loss_fn(sigmoid, labels.float())
        except:
            print(calied_cost, sigmoid)
        with torch.no_grad():
            prediction = torch.round(sigmoid)
            accuracy = torch.sum(prediction == labels).item() / len(labels)
        return loss, accuracy

    def training_step(self, batch):
        labels, costs1, costs2, encoded_plans1, encoded_plans2, attns1, attns2, latency1, latency2 = batch
        loss, acc  = self.getBatchPairsLoss(labels, costs1, costs2, encoded_plans1, encoded_plans2, attns1, attns2)
        # loss = (torch.abs(latency1 - latency2) * loss / 90000).mean()
        self.log_dict({'t_loss': loss, 't_acc': acc}, on_epoch=True)
        return loss

    def validation_step(self, batch):
        labels, costs1, costs2, encoded_plans1, encoded_plans2, attns1, attns2, latency1, latency2 = batch
        loss, acc  = self.getBatchPairsLoss(labels, costs1, costs2, encoded_plans1, encoded_plans2, attns1, attns2)
        # loss = (torch.abs(latency1 - latency2) * loss / 90000).mean()
        self.log_dict({'v_loss': loss, 'v_acc': acc}, on_epoch=True)
        return loss

    def configure_optimizers(self):
        optimizer = torch.optim.AdamW(self.model.parameters(), lr=0.001)
        if self.optimizer_state_dict is not None:
            curr = optimizer.state_dict()['param_groups'][0]['params']
            prev = self.optimizer_state_dict['param_groups'][0]['params']
            assert curr == prev, (curr, prev)
            optimizer.load_state_dict(self.optimizer_state_dict)
            for param_group in optimizer.param_groups:
                param_group['lr'] = self.learning_rate
            assert optimizer.state_dict(
            )['param_groups'][0]['lr'] == self.learning_rate
        return optimizer

In [6]:
def load_model(prev_optimizer_state_dict=None):
    model = Transformer_model.to(DEVICE)
    model = PL_Leon(model, prev_optimizer_state_dict)
    return model

In [7]:
class LeonDataset(Dataset):
    def __init__(self, labels, costs1, costs2, encoded_plans1, encoded_plans2, attns1, attns2, latency1, latency2):
        self.labels = labels
        self.costs1 = costs1
        self.costs2 = costs2
        self.encoded_plans1 = encoded_plans1
        self.encoded_plans2 = encoded_plans2
        self.attns1 = attns1
        self.attns2 = attns2
        self.latency1 = latency1
        self.latency2 = latency2

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return (self.labels[idx],
                self.costs1[idx],
                self.costs2[idx],
                self.encoded_plans1[idx],
                self.encoded_plans2[idx],
                self.attns1[idx],
                self.attns2[idx],
                self.latency1[idx],
                self.latency2[idx])

In [8]:
def prepare_dataset(pairs):
    labels = []
    costs1 = []
    costs2 = []
    encoded_plans1 = []
    encoded_plans2 = []
    attns1 = []
    attns2 = []
    latency1 = []
    latency2 = []
    for pair in pairs:
        if pair[0][0].info['latency'] > pair[1][0].info['latency']:
            label = 0
        else:
            label = 1
        labels.append(label)
        costs1.append(pair[0][0].cost)
        costs2.append(pair[1][0].cost)
        encoded_plans1.append(pair[0][1])
        encoded_plans2.append(pair[1][1])
        attns1.append(pair[0][2])
        attns2.append(pair[1][2])
        latency1.append(pair[0][0].info['latency'])
        latency2.append(pair[1][0].info['latency'])
    labels = torch.tensor(labels)
    costs1 = torch.tensor(costs1)
    costs2 = torch.tensor(costs2)
    encoded_plans1 = torch.stack(encoded_plans1)
    encoded_plans2 = torch.stack(encoded_plans2)
    attns1 = torch.stack(attns1)
    attns2 = torch.stack(attns2)
    latency1 = torch.tensor(latency1)
    latency2 = torch.tensor(latency2)
    dataset = LeonDataset(labels, costs1, costs2, encoded_plans1, encoded_plans2, attns1, attns2, latency1, latency2)
    return dataset

In [9]:
def load_callbacks(logger):
    callbacks = []
    callbacks.append(plc.EarlyStopping(
        monitor='v_acc',
        mode='max',
        patience=5,
        min_delta=0.001
    ))
    if logger:
        callbacks.append(plc.ModelCheckpoint(
            dirpath= logger.experiment.dir,
            monitor='val_scan',
            filename='best-{epoch:02d}-{val_scan:.3f}',
            save_top_k=1,
            mode='min',
            save_last=False
        ))
    return callbacks

In [10]:
def Getpair(exp):
    pairs = []
    for eq in exp.keys():
        for j in exp[eq]:
            for k in exp[eq]:
                if (j[0].info['sql_str'] == k[0].info['sql_str']) and (j[0].hint_str() == k[0].hint_str()): # sql 和 hint 都相同
                    continue
                # if (j[0].info['latency'] == k[0].info['latency']): # latency 相同 1s之内不把他train_pair
                if max(j[0].info['latency'],k[0].info['latency']) / min(j[0].info['latency'],k[0].info['latency']) < 1.2:
                    continue
                # if j[0].info['latency'] == 90000 or k[0].info['latency'] == 90000:
                #     continue
                tem = [j, k]
                pairs.append(tem)
    return pairs

In [11]:
with open('../log/exp.pkl', 'rb') as f:
        exp = pickle.load(f)
current_directory = os.getcwd()
# 获取上一级目录
parent_directory = os.path.dirname(current_directory)
logger =  pl_loggers.WandbLogger(save_dir=parent_directory + '/logs', name="embedding mlp", project='leon3')
prev_optimizer_state_dict = None
model = load_model().to(DEVICE)
callbacks = load_callbacks(logger=None)

In [12]:
from statistics import mean
filtered_keys = [key for key in exp.keys() if len(key) > 10 and len(exp[key]) < 500 and len(exp[key]) > 457]
print(len(filtered_keys))
for eq in filtered_keys:
    print(eq, len(exp[eq]))

1
cn,k,mc,mk,t 458


In [13]:
train_pairs = Getpair(exp)
print("len(train_pairs)" ,len(train_pairs))
leon_dataset = prepare_dataset(train_pairs)
trainer = pl.Trainer(accelerator="gpu",
                        devices=[2],
                        max_epochs=40,
                        logger=logger)

len(train_pairs) 1228676


GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [14]:
dataloader_train = DataLoader(leon_dataset, batch_size=512, shuffle=True, num_workers=0)
dataloader_val = DataLoader(leon_dataset, batch_size=512, shuffle=False, num_workers=0)
trainer.fit(model, dataloader_train, dataloader_val)
prev_optimizer_state_dict = trainer.optimizers[0].state_dict()

You are using a CUDA device ('NVIDIA GeForce RTX 3080') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision
Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
[34m[1mwandb[0m: Currently logged in as: [33mwyz12234[0m ([33mleon1[0m). Use [1m`wandb login --relogin`[0m to force relogin


LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0,1,2,3]

  | Name  | Type      | Params
------------------------------------
0 | model | SeqFormer | 48.8 K
------------------------------------
48.8 K    Trainable params
0         Non-trainable params
48.8 K    Total params
0.195     Total estimated model params size (MB)


Sanity Checking DataLoader 0:   0%|          | 0/2 [00:00<?, ?it/s]

/data1/wyz/miniconda3/envs/leon/lib/python3.8/site-packages/lightning/pytorch/trainer/connectors/data_connector.py:441: The 'val_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=47` in the `DataLoader` to improve performance.


RuntimeError: The shape of the 3D attn_mask is torch.Size([1024, 40, 40]), but should be (3072, 40, 40).