In [20]:
import numpy as np

import torch
import torch.nn as nn
from torchvision import models

import pandas as pd
from itertools import product
import copy
import pickle

AttributeError: partially initialized module 'pandas' has no attribute 'core' (most likely due to a circular import)

In [17]:
def filterLabelsByFrequency(artworks, frequency):
  for column in artworks.filter(regex='labels'):
    label_freq = artworks[column].apply(
        lambda s: [x for x in s]).explode().value_counts().sort_values(ascending=False)

    # Create a list of rare labels
    rare = list(label_freq[label_freq < frequency].index)

    artworks[column] = artworks[column].apply(
        lambda s: [x for x in s if x not in rare])
    artworks[column] = artworks[column].apply(
        lambda s: s if len(s) != 0 else np.nan)

  artworks.dropna(inplace=True)
  return artworks

In [18]:
with open("artworks", "rb") as file:
    artworks = pickle.load(file)

NameError: name 'pickle' is not defined

In [None]:
artworks = filterLabelsByFrequency(artworks, 50)

object_label_freq = artworks['object_labels'].apply(
    lambda s: [x for x in s]).explode().value_counts()
style_label_freq = artworks['style_labels'].apply(
    lambda s: [x for x in s]).explode().value_counts()
material_label_freq = artworks['material_labels'].apply(
    lambda s: [x for x in s]).explode().value_counts()

label_freq = pd.concat([object_label_freq, style_label_freq,
                       material_label_freq]).sort_values(ascending=False)

In [None]:
def count_labels(label_list):
    total_labels = []
    for labels in label_list:
      labels = list(filter(None, labels))
      total_labels.extend(labels)

    return len(list(set(total_labels))), list(set(total_labels))


def get_index_of_np(numpy, val):
  index_np = np.where(numpy == val)[0]

  if index_np.size != 1:
    print('a', index_np[0], index_np.size, val)
    return None
  else:
    return index_np[0]

In [None]:
def get_adjacency_matrix(artwork_labels_list, total_labels, N_LABEL):
  adjacency_matrix = np.zeros((N_LABEL, N_LABEL))

  for artwork_labels in artwork_labels_list:
    label_combinations = list(product(artwork_labels, repeat=2))

    for label_combination in label_combinations:
      pivot_index = get_index_of_np(total_labels, label_combination[0])
      target_index = get_index_of_np(total_labels, label_combination[1])
      if (pivot_index is None) or (target_index is None):
        return None
      else:
        adjacency_matrix[pivot_index][target_index] += 1

  for row_index, row in enumerate(adjacency_matrix):
    basis = row[row_index]
    for col_index, val in enumerate(row):
      row[col_index] = val/basis

  return adjacency_matrix

In [None]:
def convert_to_graph(artworks):
  label_classes = ['object_labels', 'style_labels', 'material_labels']
  artwork_labels_list = []
  labels_list_by_class = [set() for _ in range(3)]

  for index, label_class in enumerate(label_classes):
    labels_group = artworks[label_class]

    if index == 0:
      for labels in labels_group:
        artwork_labels_list.append(copy.deepcopy(labels))
        labels_list_by_class[index] = labels_list_by_class[index].union(
            set(copy.deepcopy(labels)))
    else:
      for i, labels in enumerate(labels_group):
        artwork_labels_list[i].extend(copy.deepcopy(labels))
        labels_list_by_class[index] = labels_list_by_class[index].union(
            set(copy.deepcopy(labels)))

  N_LABEL, total_labels = count_labels(artwork_labels_list)
  adjacency_matrix = get_adjacency_matrix(
      artwork_labels_list, np.array(total_labels), N_LABEL)

  class_one_hot_encoding_matrix = np.zeros((N_LABEL, 3))

  for i in range(N_LABEL):
    label = total_labels[i]
    for j in range(3):
      class_one_hot_encoding_matrix[i][j] = np.any(
          np.array(list(labels_list_by_class[j])) == label)

  embedding = nn.Embedding(N_LABEL, embedding_dim=5)
  input = torch.tensor(range(len(total_labels)))
  word_embedding_matrix = embedding(input).detach().cpu().numpy()

  feature_matrix = np.concatenate(
      (class_one_hot_encoding_matrix, word_embedding_matrix), axis=1)

  return adjacency_matrix, feature_matrix, N_LABEL, total_labels

In [None]:
adjacency_matrix, feature_matrix, N_LABEL, total_labels = convert_to_graph(
    artworks)

In [None]:
feature_matrix_tensor = torch.Tensor(
    feature_matrix).to(torch.float32)
adjacency_matrix_tensor = torch.Tensor(
    adjacency_matrix).to(torch.float32)
degree_matrix_tensor = torch.diag(torch.sum(adjacency_matrix_tensor, dim=1))
laplacian_matrix_tensor = degree_matrix_tensor - adjacency_matrix_tensor

In [None]:
print(N_LABEL)

In [None]:
class AttentionModule(nn.Module):
  def __init__(self, in_dim, out_dim, head_num=2):
    super(AttentionModule, self).__init__()

    self.head_num = head_num
    self.attention_dim = out_dim // head_num

    self.linears = nn.ModuleList()
    self.correlations = nn.ParameterList()
    for i in range(self.head_num):
      self.linears.append(nn.Linear(in_dim, self.attention_dim))
      correlation = torch.FloatTensor(
          self.attention_dim, self.attention_dim).to(device)
      nn.init.xavier_uniform_(correlation)
      self.correlations.append(nn.Parameter(correlation))

    self.tanh = nn.Tanh()
    self.set_out_dim = nn.Linear(head_num * self.attention_dim, out_dim)

  def forward(self, x, adj):
    heads = list()
    for i in range(self.head_num):
      x_i = self.linears[i](x)
      alpha = self.attention_matrix(x_i, self.correlations[i], adj)
      x_head = torch.matmul(alpha, x_i)
      heads.append(x_head)
    out = torch.cat(heads, dim=2)
    out = self.set_out_dim(out)
    return out

  def attention_matrix(self, x_i, correlation, adj):
    x = torch.matmul(x_i, correlation)
    alpha = torch.matmul(x, torch.transpose(x_i, 1, 2))
    alpha = torch.mul(alpha, adj)
    alpha = self.tanh(alpha)
    return alpha

In [None]:
class GCNLayer(nn.Module):
	def __init__(self, in_dim, out_dim, act=None, atn=False, head_num=2, drop_rate=0):
		super(GCNLayer, self).__init__()

		self.linear = nn.Linear(in_dim, out_dim)
		nn.init.xavier_uniform_(self.linear.weight)
		self.activation = act
		self.use_attention = atn
		self.attention = AttentionModule(out_dim, out_dim, head_num)
		self.drop_rate = drop_rate
		self.dropout = nn.Dropout(self.drop_rate)

	def forward(self, x, adj):
		out = self.linear(x)
		if self.use_attention:
			out = self.attention(out, adj)
		out = torch.matmul(adj, out)
		if self.activation != None:
			out = self.activation(out)
		if self.drop_rate > 0:
			out = self.dropout(out)
		return out, adj

In [None]:
class GatedSkipConnection(nn.Module):
  def __init__(self, in_dim, out_dim):
    super(GatedSkipConnection, self).__init__()

    self.in_dim = in_dim
    self.out_dim = out_dim

    self.linear = nn.Linear(in_dim, out_dim, bias=False)
    self.linear_coef_in = nn.Linear(out_dim, out_dim)
    self.linear_coef_out = nn.Linear(out_dim, out_dim)
    self.sigmoid = nn.Sigmoid()

  def forward(self, in_x, out_x):
    if (self.in_dim != self.out_dim):
        in_x = self.linear(in_x)
    z = self.gate_coefficient(in_x, out_x)
    out = torch.mul(z, out_x) + torch.mul(1.0-z, in_x)
    return out

  def gate_coefficient(self, in_x, out_x):
    x1 = self.linear_coef_in(in_x)
    x2 = self.linear_coef_out(out_x)
    return self.sigmoid(x1+x2)

In [None]:
class GCNBlock(nn.Module):
  def __init__(self, n_layer, in_dim, hidden_dim, out_dim, sc='gsc', act=None, atn=True, head_num=2, drop_rate=0):
    super(GCNBlock, self).__init__()

    self.layers = nn.ModuleList()
    for i in range(n_layer):
        self.layers.append(GCNLayer(in_dim if i == 0 else hidden_dim,
                                    out_dim if i == n_layer-1 else hidden_dim,
                                    act,
                                    False if i == n_layer-1 else atn,
                                    1 if i == n_layer-1 else head_num,
                                    drop_rate))
    if sc == 'gsc':
        self.sc = GatedSkipConnection(in_dim, out_dim)
    elif sc == 'no':
        self.sc = None
    else:
        assert False, "Wrong sc type."

  def forward(self, x, adj):
    residual = x
    out = None
    for i, layer in enumerate(self.layers):
        out, adj = layer((x if i == 0 else out), adj)
    if self.sc != None:
        out = self.sc(residual, out)
    return out, adj

In [None]:
class InceptionModule(nn.Module):
  def __init__(self, n_layer, in_dim, hidden_dim, out_dim, sc='gsc', act=None, atn=True, head_num=2, drop_rate=0):
    super(InceptionModule, self).__init__()
    self.GCNBlocks = nn.ModuleList()
    for i in range(n_layer):
      self.GCNBlocks.append(GCNBlock(i + 1,
                                     in_dim,
                                     hidden_dim,
                                     out_dim,
                                     sc,
                                     act,
                                     atn,
                                     head_num,
                                     drop_rate))

  def forward(self, x, adj):
    outs = []
    for i, block in enumerate(self.GCNBlocks):
      out, _ = block(x, adj)
      outs.append(out)

    feature_concatenate = torch.cat(outs, 2)
    return feature_concatenate, adj

In [None]:
class ReadOut(nn.Module):
    def __init__(self, in_dim, out_dim, act=None):
        super(ReadOut, self).__init__()

        self.in_dim = in_dim
        self.out_dim = out_dim

        self.linear = nn.Linear(self.in_dim,
                                self.out_dim)
        nn.init.xavier_uniform_(self.linear.weight)
        self.activation = act

    def forward(self, x):
        out = self.linear(x)
        if self.activation != None:
            out = self.activation(out)
        out = torch.squeeze(out)
        return out

In [None]:
class GCNNet(nn.Module):
  def __init__(self, args, adj):
    super(GCNNet, self).__init__()

    self.InceptionModule = InceptionModule(args.n_layer,
                                           args.in_dim,
                                           args.hidden_dim,
                                           args.out_dim,
                                           args.sc,
                                           args.act,
                                           args.atn,
                                           args.head_num,
                                           args.drop_rate)
    self.Readout = ReadOut(args.out_dim * args.n_layer,
                           1, None)
    self.adj = adj

  def forward(self, x):
    out, _ = self.InceptionModule(x, self.adj)
    out = self.Readout(out)

    return out


In [None]:
# Define the model architecture
class Resnet50(nn.Module):
    def __init__(self, n_label):
        super(Resnet50, self).__init__()
        self.resnet = models.resnet50(pretrained=True)
        num_ftrs = self.resnet.fc.in_features
        self.resnet.fc = nn.Sequential(
            nn.Dropout(p=0.5),
            nn.Linear(num_ftrs, n_label)
        )

    def forward(self, x):
        x = self.resnet(x)
        return x

In [None]:
class CNNGCN_2(nn.Module):
	def __init__(self, num_classes, adj, args):
		super(CNNGCN_2, self).__init__()
		self.cnn = Resnet50(num_classes)
		self.gcnnet = GCNNet(args, adj)

	def forward(self, x):
		cnn_out = self.cnn(x)
		cnn_out_2 = torch.transpose(torch.unsqueeze(cnn_out, 1), 1, 2)
		gcn_out = self.gcnnet(cnn_out_2)
		return gcn_out

In [None]:
model_path = './models/'
pytorch_model = CNNGCN_2(333)
pytorch_model.load_state_dict(torch.load(
    './models/cnngnn-epoch100_3_1_5_1_gsc_elu_true_2_0-earlystop_5.pt'))
pytorch_model.eval()
pytorch_model
dummy_input = torch.zeros(1, 3, 224, 224)
torch.onnx.export(pytorch_model, dummy_input,
                  './models/cnngnn_3_1_5_1_gsc_elu_true_2_0.onnx', verbose=True, opset_version=10)


TypeError: CNNGCN_2.__init__() missing 2 required positional arguments: 'adj' and 'args'