In [None]:
import torch
from torch import nn
from torch.nn import functional as F
from torch import optim
from torch.utils.tensorboard import SummaryWriter
# TensorBoard writer for this run. The path is a raw string because '\c' is not
# a recognised escape sequence (newer Python versions warn about it); the
# resulting path text is unchanged.
writer=SummaryWriter(r'runs_res_large\cache')

In [None]:
# Baseline fully-connected classifier: the input is flattened to 225 features
# and reduced to 15 outputs through two hidden layers (64 and 16 units).
# Dropout is applied after the first hidden layer only.
dropout = 0.3
network = nn.Sequential(
    nn.Flatten(),
    nn.Linear(in_features=225, out_features=64),
    nn.BatchNorm1d(num_features=64),
    nn.ReLU(),
    nn.Dropout(p=dropout),
    nn.Linear(in_features=64, out_features=16),
    nn.BatchNorm1d(num_features=16),
    nn.ReLU(),
    nn.Linear(in_features=16, out_features=15),
).cuda()


In [None]:
class ResidualBlock(nn.Module):
    """Convolutional residual block with a projection shortcut.

    The main branch (`left`) is conv3x3 -> batch norm -> ReLU -> conv3x3 ->
    batch norm; the shortcut (`right`) is always a 1x1 convolution with the
    same stride, so channel count and spatial size match the main branch.

    Args:
        inchannel: number of input channels.
        outchannel: number of output channels.
        stride: stride of the first 3x3 convolution and of the 1x1 shortcut.
    """

    def __init__(self, inchannel, outchannel, stride=1):
        super().__init__()
        main_branch = [
            nn.Conv2d(inchannel, outchannel, kernel_size=3, stride=stride, padding=1),
            nn.BatchNorm2d(outchannel),
            nn.ReLU(inplace=True),
            nn.Conv2d(outchannel, outchannel, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(outchannel),
        ]
        self.left = nn.Sequential(*main_branch)
        self.right = nn.Conv2d(inchannel, outchannel, kernel_size=1, stride=stride)

    def forward(self, x):
        """Return ReLU(main_branch(x) + shortcut(x))."""
        return F.relu(self.left(x) + self.right(x))

class ResNet16(nn.Module):
    """Residual CNN: a conv stem, four stages of two ResidualBlocks, two linear layers.

    `forward` inserts a channel dimension at position 1, so the input is
    expected without one. The first linear layer takes 2048 features, i.e.
    512 channels x 2 x 2; a 15x15 input reaches that size after the three
    stride-2 stages (15 -> 8 -> 4 -> 2).
    NOTE(review): `fc` and `fc1` are applied back to back with no activation
    in between — confirm this is intended.
    """

    def __init__(self):
        super().__init__()
        self.pre = nn.Sequential(
            nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
        )
        self.layer1 = self._make_layer(64, 64, 2)
        self.layer2 = self._make_layer(64, 128, 2, stride=2)
        self.layer3 = self._make_layer(128, 256, 2, stride=2)
        self.layer4 = self._make_layer(256, 512, 2, stride=2)
        self.fc = nn.Linear(2048, 512)
        self.fc1 = nn.Linear(512, 15)

    def _make_layer(self, inchannel, outchannel, block_num, stride=1):
        """Build one stage of `block_num` blocks; only the first one changes channels/stride."""
        leading = ResidualBlock(inchannel, outchannel, stride)
        trailing = [ResidualBlock(outchannel, outchannel) for _ in range(1, block_num)]
        return nn.Sequential(leading, *trailing)

    def forward(self, x):
        """Return the 15 output scores for each sample in the batch."""
        x = self.pre(x.unsqueeze(1))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)
        flat = x.view(x.size(0), -1)
        return self.fc1(self.fc(flat))


In [None]:
class lstm(nn.Module):
    """LSTM over sequences of 15-dimensional steps, followed by a linear head.

    Args:
        hidden_size: size of the LSTM hidden state.
        num_layers: number of stacked LSTM layers.

    `forward` takes a batch-first tensor (batch, seq_len, 15) and returns
    (batch, 15), computed from the LSTM output at the last time step.
    """

    def __init__(self, hidden_size=32, num_layers=2):
        super(lstm, self).__init__()
        self.hidden_size = hidden_size
        self.n_layers = num_layers
        self.lstm = nn.LSTM(15, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, 15)

    def forward(self, x):
        # Allocate the zero initial state on the input's own device and dtype
        # rather than a hard-coded 'cuda', so the module works wherever it
        # (and its input) has been placed.
        h0 = torch.zeros(
            self.n_layers, x.size(0), self.hidden_size,
            device=x.device, dtype=x.dtype,
        )
        x, _ = self.lstm(x, (h0, h0))  # Use (h0, h0) for both hx and cx
        x = self.fc(x[:, -1, :])
        return x

In [None]:
# Dropout probability used by every Dropout layer inside block_change; it is
# read when a block is constructed.
drop_out = 0


class block_change(nn.Module):
    """Fully-connected residual block that may change the feature width.

    Main path: Linear -> BatchNorm -> ReLU -> Dropout -> Linear -> BatchNorm
    -> Dropout. The shortcut is its own Linear -> BatchNorm -> Dropout, so
    the two paths can be added even when `in_features != out_features`.
    A final ReLU is applied to the sum.

    Args:
        in_features: width of the incoming features.
        out_features: width of the block's output.
    """

    def __init__(self, in_features, out_features):
        super().__init__()
        self.fc1 = nn.Linear(in_features, out_features)
        self.bn1 = nn.BatchNorm1d(out_features)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(drop_out)
        self.fc2 = nn.Linear(out_features, out_features)
        self.bn2 = nn.BatchNorm1d(out_features)
        self.dropout2 = nn.Dropout(drop_out)
        self.shortcut = nn.Sequential(
            nn.Linear(in_features, out_features),
            nn.BatchNorm1d(out_features),
            nn.Dropout(drop_out),
        )
        self.relu2 = nn.ReLU()

    def forward(self, x):
        """Return ReLU(main_path(x) + shortcut(x))."""
        skip_input = x.clone()
        # The main path is evaluated before the shortcut.
        hidden = self.dropout1(self.relu1(self.bn1(self.fc1(x))))
        hidden = self.dropout2(self.bn2(self.fc2(hidden)))
        return self.relu2(hidden + self.shortcut(skip_input))

# Three residual MLP variants. Each starts with the same stem
# (Flatten -> Linear(225, 256) -> BatchNorm -> ReLU -> Dropout(0.2)) and ends
# with Linear(16, 15).

# Widen 256 -> 512 -> 1024, then halve the width down to 16.
# The narrowing blocks must start at the width the widening blocks end with
# (2**10 = 1024). Starting them at 2**12 made the first narrowing block expect
# 4096 inputs, so a forward pass failed with a shape mismatch.
# NOTE(review): if a 4096-wide peak was intended, widen with range(8, 12)
# and narrow with range(12, 4, -1) instead.
Res_seq = nn.Sequential(
    nn.Flatten(),
    nn.Linear(225, 256),
    nn.BatchNorm1d(256),
    nn.ReLU(),
    nn.Dropout(0.2),
    *[block_change(2**i, 2**(i+1)) for i in range(8, 10)],
    *[block_change(2**i, 2**(i-1)) for i in range(10, 4, -1)],
    nn.Linear(16, 15),
)

# Halve the width from 256 down to 16 in four blocks.
Res_small = nn.Sequential(
    nn.Flatten(),
    nn.Linear(225, 256),
    nn.BatchNorm1d(256),
    nn.ReLU(),
    nn.Dropout(0.2),
    *[block_change(2**i, 2**(i-1)) for i in range(8, 4, -1)],
    nn.Linear(16, 15),
)

# Eight constant-width (256) blocks, then the same four halving blocks as Res_small.
Res_large = nn.Sequential(
    nn.Flatten(),
    nn.Linear(225, 256),
    nn.BatchNorm1d(256),
    nn.ReLU(),
    nn.Dropout(0.2),
    *[block_change(256, 256) for i in range(8)],
    *[block_change(2**i, 2**(i-1)) for i in range(8, 4, -1)],
    nn.Linear(16, 15),
)

In [None]:
# Select the model to train: rebinds `network` to Res_large and moves it to
# the GPU. Every later cell (optimizer, training, game play) uses `network`.
network=Res_large.to('cuda')

In [None]:
import torch

def count_parameters(model):
    """Return the total number of elements in `model`'s trainable parameters.

    Parameters whose `requires_grad` is False are not counted.
    """
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

# Report the trainable-parameter count of the model currently bound to `network`.
num_params = count_parameters(network)
print(f"Number of parameters in the model: {num_params}")

In [None]:
import numpy as np
import pandas as pd
from torch.utils.data import Dataset, DataLoader
# Load the training table from a CSV in the working directory. It is later
# wrapped by CustomDataset, which slices label and feature columns by position.
data=pd.read_csv('train_one_hotted_large.csv')

In [None]:
# data=data[:200000]

In [None]:
def one_hot_inp(data:pd.DataFrame):
    """One-hot encode the first 15 columns of every row into a 15x15 grid.

    For column position `j` holding value `v`, cell `[j][int(v)]` of that
    row's grid is set to 1. A value of -1 leaves row `j` of the grid all zero.

    Args:
        data: frame whose rows have at least 15 columns.

    Returns:
        A list with one 15x15 nested list of 0/1 ints per input row.
    """
    encoded = []
    for row in data.values.tolist():
        grid = [[0] * 15 for _ in range(15)]
        for slot in range(15):
            value = row[slot]
            if value == -1:
                continue
            grid[slot][int(value)] = 1
        encoded.append(grid)
    return encoded
        

In [None]:
class CustomDataset(Dataset):
    """In-memory dataset of (one-hot 15x15 feature grid, 15-value label) pairs.

    Args:
        dataframe: table whose first column is dropped before the labels are
            taken from the next 15 columns; the features are one-hot encoded
            from column 15 onward of the frame as passed in.
    """

    def __init__(self, dataframe):
        self.dataframe = dataframe.iloc[:,1:]
        # Encode the frame that was passed in. The original read the
        # notebook-global `data` here, silently ignoring the argument.
        # NOTE(review): the labels are original columns 1..15 while the
        # features start at original column 15, so the two overlap by one
        # column. If the first column is an index column, the features should
        # probably come from `self.dataframe.iloc[:,15:]` — confirm against
        # the CSV layout before changing the offsets.
        self.feature = torch.tensor(one_hot_inp(dataframe.iloc[:,15:]), dtype=torch.float32)
        self.label = torch.tensor(self.dataframe.iloc[:,:15].to_numpy(), dtype=torch.float32)

    def __getitem__(self, index):
        """Return the (feature, label) tensors at `index`."""
        feature = self.feature[index]
        label = self.label[index]
        return feature, label

    def __len__(self):
        """Return the number of samples (rows of the label tensor)."""
        return len(self.label)

# class CustomDataset(Dataset):
#     def __init__(self, dataframe):
#         self.dataframe = dataframe.iloc[:,1:]
#         self.feature = torch.tensor(self.dataframe.iloc[:,1:].to_numpy(), dtype=torch.float32)
#         self.label = F.one_hot(torch.tensor(self.dataframe.iloc[:,0].to_numpy())).to(torch.float32)

#     def __getitem__(self, index):
#         feature = self.feature[index]
#         label = self.label[index]
#         return feature, label

#     def __len__(self):
#         return len(self.label)


In [None]:
pytorch_dataset = CustomDataset(data)
# 80/20 train/validation split. A dedicated, seeded generator makes the split
# identical on every run, so validation numbers are comparable across runs
# and kernel restarts.
train_set, val_set = torch.utils.data.random_split(
    pytorch_dataset,
    [0.8, 0.2],
    generator=torch.Generator().manual_seed(0),
)

In [None]:
# Validation: load the whole validation split as one batch and keep it on the
# GPU so val_loss / val_accuracy can reuse it without reloading.
val_load = torch.utils.data.DataLoader(
    val_set
    , batch_size=len(val_set))
x_val, y_val = next(iter(val_load))
x_val = x_val.cuda()
y_val = y_val.cuda()
# Training: mini-batches of 512. shuffle=True draws a new sample order every
# epoch; without it every epoch saw exactly the same batches in the same order.
train_load = torch.utils.data.DataLoader(
    train_set
    , batch_size=512
    , shuffle=True
)

In [None]:
# Pull a single batch from the training loader (handy for inspecting shapes);
# `inputs` and `labels` are not used by the training code in this notebook.
inputs, labels = next(iter(train_load))

In [None]:
# Adam over all parameters of `network`, with weight decay.
optimizer = optim.Adam(params=network.parameters(), lr=1e-3, weight_decay=1e-3)
# Lower the learning rate when the metric passed to scheduler.step() (the
# validation loss, see train_one_epoch) has not improved for 5 epochs.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5)

@torch.no_grad()
def get_num_correct(preds, labels):
    """Count the samples whose highest-scoring class matches the label.

    Args:
        preds: (N, C) tensor of per-class scores.
        labels: either (N,) class indices or (N, C) per-class rows (one-hot or
            probabilities), in which case the largest entry is the target.

    Returns:
        Number of correct predictions as a Python int.
    """
    # argmax must run per row; without `dim` it returns one index into the
    # flattened tensor, which made the comparison meaningless.
    predicted = preds.argmax(dim=1)
    target = labels.argmax(dim=1) if labels.dim() > 1 else labels
    return predicted.eq(target).sum().item()


@torch.no_grad()
def val_accuracy():
    """Return the fraction of validation samples that `network` classifies correctly.

    The forward pass runs in eval mode (like val_loss), so dropout is off and
    batch norm uses its running statistics; the network's previous
    train/eval mode is restored afterwards.
    """
    was_training = network.training
    network.eval()
    try:
        preds = network(x_val)
    finally:
        network.train(was_training)
    return get_num_correct(preds, y_val) / y_val.shape[0]


@torch.no_grad()
def val_loss():
    """Return the cross-entropy of `network` on the cached validation batch.

    The network is put into eval mode for the forward pass and switched to
    train mode afterwards, regardless of the mode it was in before. The
    result is a tensor.
    """
    network.eval()
    logits = network(x_val)
    network.train()
    loss = F.cross_entropy(logits, y_val)
    return loss


def train_one_epoch(epoch):
    """Train `network` for one pass over `train_load` and log the losses.

    Args:
        epoch: step index under which the scalars are written to TensorBoard.

    Returns:
        A string with the sample-weighted mean training loss and the
        validation loss for this epoch.
    """
    # val_loss() leaves the network in train mode, so setting it once is enough.
    network.train()
    total_loss = 0
    for features, targets in train_load:
        optimizer.zero_grad()
        features = features.to('cuda')
        targets = targets.to('cuda')
        preds = network(features)
        loss = F.cross_entropy(preds, targets)
        loss.backward()
        optimizer.step()
        # Weight by batch size so a smaller final batch is averaged correctly.
        total_loss += loss.item() * features.shape[0]
    # Evaluate once and reuse the value for both the scheduler and the log;
    # the original ran the full validation forward pass twice per epoch.
    val = float(val_loss())
    scheduler.step(val)
    train_loss = total_loss / len(train_set)
    writer.add_scalar('training loss', train_loss, epoch)
    writer.add_scalar('validation loss', val, epoch)
    return f"loss: {train_loss}\n val_loss: {val}\n"

In [None]:
# Main training loop: 500 epochs. TensorBoard scalars are logged under the
# 0-based index `i`, while the printed epoch number is 1-based.
network.train()
for i in range(500):
    m = train_one_epoch(epoch=i)
    print('epoch:', i + 1, m)

In [None]:
# Close the TensorBoard writer now that training has finished.
writer.close()

In [None]:
import datetime
# Date stamp (yy-mm-dd) used in the checkpoint file name.
strtime=datetime.datetime.now().strftime("%y-%m-%d")
# Save the module object itself plus the model and optimizer state dicts.
# NOTE(review): the "model" entry pickles the whole module, so loading it
# requires the same class definitions to be importable.
torch.save({"model":network,"weight":network.state_dict(),"optimizer":optimizer.state_dict()}, f'res_large_{strtime}.pth')

In [None]:
from itertools import combinations
# Switch to inference mode for the game-playing helpers below.
network.eval()
# All 15 unordered pairs over the labels 1..6; the position of a pair in this
# list is its encoded move index (0..14).
# NOTE(review): presumably the edges of the complete graph on six vertices,
# with "forming a triangle" meaning a loss — confirm the game rules.
list_tar=[(1,2),(1,3),(1,4),(1,5),(1,6),(2,3),(2,4),(2,5),(2,6),(3,4),(3,5),(3,6),(4,5),(4,6),(5,6)]
enu=enumerate(list_tar)
# pair -> index
encode_dict={tar:idx for idx,tar in enu}
# -1 marks "no move yet" and is passed through unchanged by encode/decode.
encode_dict[-1]=-1
def one_hot_list_inp(inp_list):
    """One-hot encode one sequence of 15 encoded moves into a 15x15 grid.

    Row `j` of the result has a 1 in column `int(inp_list[j])`; an entry of
    -1 leaves row `j` all zero.
    """
    grid = [[0] * 15 for _ in range(15)]
    for slot in range(15):
        move = inp_list[slot]
        if move == -1:
            continue
        grid[slot][int(move)] = 1
    return grid
def encode(inp):
    """Return the move index for pair `inp` (or -1 for -1); raises KeyError for unknown keys."""
    return encode_dict[inp]
def encode_list(inp_list):
    """Encode every entry of `inp_list` (pairs or -1) to its move index, in order."""
    encoded = []
    for move in inp_list:
        encoded.append(encode_dict[move])
    return encoded
# Inverse mapping of encode_dict: index -> pair, with -1 passed through.
enu=enumerate(list_tar)
decode_dict={idx:tar for idx,tar in enu}
decode_dict[-1]=-1
def decode(inp):
    """Return the pair for move index `inp` (or -1 for -1); raises KeyError for unknown keys."""
    return decode_dict[inp]
def decode_list(inp):
    """Decode every move index in `inp` (or -1) back to its pair, in order."""
    decoded = []
    for idx in inp:
        decoded.append(decode_dict[idx])
    return decoded
def form_tri(lst: list,inp: set):
    """Tell whether the pairs in `lst` plus the candidate `inp` contain a triangle.

    A triangle is any three pairs that together mention exactly three
    distinct labels. Entries equal to -1 (in `lst` or as `inp`) are ignored.

    Args:
        lst: list of 2-element pairs, possibly mixed with -1 placeholders.
        inp: the candidate pair to add (callers in this notebook pass a
            2-tuple or -1, despite the `set` annotation).

    Returns:
        True if some three of the pairs form a triangle, otherwise False.
    """
    edges = [edge for edge in lst + [inp] if edge != -1]
    return any(
        len({a, b, c, d, e, f}) == 3
        for (a, b), (c, d), (e, f) in combinations(edges, 3)
    )
def _rank_moves(inp: list):
    """Return the 15 move indices sorted from most to least probable under `network`.

    `inp` is the encoded move list (indices, -1 for empty slots). The forward
    pass runs in eval mode, without gradient tracking, on whatever device the
    network's parameters live on.
    """
    network.eval()
    device = next(network.parameters()).device
    board = torch.tensor([one_hot_list_inp(inp)], dtype=torch.float32, device=device)
    with torch.no_grad():
        probs = torch.softmax(network(board), dim=1)[0].tolist()
    return sorted(range(len(probs)), key=lambda i: probs[i], reverse=True)


def make_move(inp: list): # input should be encoded.
    """Pick the network's preferred move, avoiding triangles when possible.

    The network's own moves are taken to be the odd positions of `inp`.
    The most probable move that is still unplayed and does not complete a
    triangle with those moves is returned; if every unplayed move completes
    a triangle, the most probable unplayed move is returned instead.

    Returns:
        The chosen move as a decoded pair, or -1 if no move is left.
    """
    ranked = _rank_moves(inp)
    own_pairs = decode_list(inp[1::2])
    for idx in ranked:
        if idx not in inp and not form_tri(own_pairs, decode(idx)):
            return decode(idx)
    for idx in ranked:
        if idx not in inp:
            return decode(idx)
    return -1

def make_move_no_assistance(inp: list):
    """Pick the network's most probable unplayed move, with no triangle check.

    Returns:
        The chosen move as a decoded pair, or -1 if no move is left.
    """
    for idx in _rank_moves(inp):
        if idx not in inp:
            return decode(idx)
    return -1

from random import choice
def find_pos(inp_list):
    """Return the index of the move that first completes a triangle.

    Prefixes of `inp_list` ending at index 2, 3, ..., 6 are examined in turn;
    the first index whose prefix contains three pairs covering exactly three
    distinct labels is returned. If none does, 7 is returned.
    """
    for last in range(2, 7):
        prefix = inp_list[:last + 1]
        for (a, b), (c, d), (e, f) in sorted(combinations(prefix, 3)):
            if len({a, b, c, d, e, f}) == 3:
                return last
    return 7
# 0 means player one wins, 1 means player two wins.
def determine_winner(inp_list):
    """Decide the winner of a finished game given as an alternating move list.

    Player one's moves are the even positions, player two's the odd ones.
    Whoever completes a triangle at the earlier own-move index loses; on a
    tie player one loses.
    """
    first_player_pos = find_pos(inp_list[::2])
    second_player_pos = find_pos(inp_list[1::2])
    return 1 if first_player_pos <= second_player_pos else 0

def ai_play_against_random():
    """Play one game: a random player moves first, the network (no triangle check) second.

    Returns:
        The result of determine_winner: 1 if player two (the network) wins,
        0 if player one (random) wins.
    """
    output = [-1] * 15
    output[0] = choice(list_tar)
    pos = 1
    for _ in range(7):
        # Network reply, then a uniformly random unused pair for player one.
        output[pos] = make_move_no_assistance(encode_list(output))
        output[pos + 1] = choice([pair for pair in list_tar if pair not in output])
        pos += 2
    return determine_winner(decode_list(encode_list(output)))

def ai_play_against_naive():
    """Play one game: a "naive" player moves first, the network (make_move) second.

    Player one's first two moves are random; afterwards it picks randomly
    among the unused pairs that do not complete a triangle with its own
    moves, falling back to any unused pair when none is safe.

    Returns:
        The result of determine_winner: 1 if player two (the network) wins,
        0 if player one wins.
    """
    output = [-1] * 15
    output[0] = choice(list_tar)
    output[1] = make_move(encode_list(output))
    output[2] = choice([pair for pair in list_tar if pair not in output])
    pos = 3
    for _ in range(6):
        output[pos] = make_move(encode_list(output))
        unused = [pair for pair in list_tar if pair not in output and pair != -1]
        # Unused pairs that would not give player one (even positions) a triangle.
        safe = [pair for pair in unused if not form_tri(output[::2], pair)]
        if safe == []:
            pick = choice([pair for pair in list_tar if pair not in output])
        else:
            pick = choice(safe)
        output[pos + 1] = pick
        pos += 2
    return determine_winner(decode_list(encode_list(output)))

def naive_against_radom():
    """Play one game: a random player moves first, a "naive" player second.

    The first five moves are random for both sides. From then on player two
    (odd positions) picks randomly among the unused pairs that do not
    complete a triangle with its own moves, falling back to any unused pair
    when none is safe; player one keeps playing uniformly at random.

    Returns:
        The result of determine_winner: 1 if player two (naive) wins,
        0 if player one (random) wins.
    """
    output = []
    # Opening moves are drawn from the pairs not yet played. Drawing from the
    # full list allowed the same pair to be played twice, which produces an
    # invalid game and can make find_pos report a triangle that is not there.
    for _ in range(5):
        output.append(choice([pair for pair in list_tar if pair not in output]))
    for _ in range(5):
        unused = [pair for pair in list_tar if pair not in output]
        safe = [pair for pair in unused if not form_tri(output[1::2], pair)]
        output.append(choice(safe if safe else unused))
        # Random player's reply: uniform over the pairs still unused.
        output.append(choice([pair for pair in list_tar if pair not in output]))
    return determine_winner(output)

def naive_against_naive():
    """Play one game between two "naive" players.

    The first four moves are random. Then each player in turn picks randomly
    among the unused pairs that do not complete a triangle with its own
    moves (falling back to any unused pair when none is safe). The last,
    fifteenth pair is whatever remains.

    Returns:
        The result of determine_winner: 0 if player one wins, 1 if player two wins.
    """
    output = []
    for _ in range(4):
        output.append(choice([pair for pair in list_tar if pair not in output]))
    for _ in range(5):
        # offset 0 -> player one's moves (even positions), 1 -> player two's.
        for offset in (0, 1):
            unused = [pair for pair in list_tar if pair not in output]
            safe = [pair for pair in unused if not form_tri(output[offset::2], pair)]
            if safe == []:
                pick = choice([pair for pair in list_tar if pair not in output])
            else:
                pick = choice(safe)
            output.append(pick)
    output.append(choice([pair for pair in list_tar if pair not in output]))
    return determine_winner(output)

def random_against_random():
    """Play one game where both players pick uniformly among the unused pairs.

    Returns:
        The result of determine_winner: 0 if player one wins, 1 if player two wins.
    """
    output = []
    while len(output) < 15:
        remaining = [pair for pair in list_tar if pair not in output]
        output.append(choice(remaining))
    return determine_winner(output)

In [None]:
# network=torch.load('./models/model_64_36_original_plain.pth').to('cuda')
# Estimate how often the network (player two) beats the naive player over
# 10000 simulated games; ai_play_against_naive returns 1 for such a win.
network.eval()
n=0
for i in range(10000):
    n+=ai_play_against_naive()
# Fraction of games won by player two.
print(n/10000)