In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!pip install torch-geometric

Collecting torch-geometric
  Downloading torch_geometric-2.5.3-py3-none-any.whl (1.1 MB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.1 MB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.1/1.1 MB[0m [31m1.7 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m1.1/1.1 MB[0m [31m19.2 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m14.8 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: torch-geometric
Successfully installed torch-geometric-2.5.3


In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from torch_geometric.data import Data
from torch_geometric.nn import GCNConv
import json

class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        self.conv1 = GCNConv(12, 16)
        self.conv2 = GCNConv(16, 8)

    def forward(self, x, edge_index):
        x = F.relu(self.conv1(x, edge_index))
        x = self.conv2(x, edge_index)
        return x

class Decoder(nn.Module):
    def __init__(self):
        super(Decoder, self).__init__()
        self.conv1 = GCNConv(8, 16)
        self.conv2 = GCNConv(16, 12)

    def forward(self, x, edge_index):
        x = F.relu(self.conv1(x, edge_index))
        x = F.sigmoid(self.conv2(x, edge_index))
        return x

class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()
        self.encoder = Encoder()
        self.decoder = Decoder()

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        x = self.encoder(x, edge_index)
        x = self.decoder(x, edge_index)
        return x

# Load data
def get_data():

    data_file = "/content/drive/My Drive/data.csv"
    data = pd.read_csv(data_file)
    return data

# Prepare data for training and testing
def get_XY(data, weights=None):
    features = ['valence', 'key', 'tempo', 'acousticness', 'danceability',
                'energy', 'explicit', 'instrumentalness', 'liveness',
                'speechiness', 'loudness', 'year']
    X = data[features]
    Y = data['id'] if 'id' in data.columns else None

    # Scale the features
    scaler = StandardScaler()
    X = scaler.fit_transform(X)

    # Apply weights
    if weights is not None:
        for i, weight in enumerate(weights):
            X[:, i] *= weight

    return X, Y

from sklearn.neighbors import NearestNeighbors

def get_edge_index(X, k=5):
    # Fit nearest neighbors
    nbrs = NearestNeighbors(n_neighbors=k, algorithm='ball_tree').fit(X)
    distances, indices = nbrs.kneighbors(X)

    # Create edge index
    edge_index = []
    for i in range(indices.shape[0]):
        for j in range(1, indices.shape[1]):  # Ignore the first neighbor because it's the node itself
            edge_index.append([i, indices[i, j]])

    return torch.tensor(edge_index, dtype=torch.long).t().contiguous()

# Use it in your training function
def train_autoencoder(data, weights, epochs=100):
    model = Autoencoder()

    X, _ = get_XY(data, weights)
    X = torch.tensor(X, dtype=torch.float)
    edge_index = get_edge_index(X.numpy())  # Convert tensor to numpy array for get_edge_index
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
    criterion = nn.MSELoss()

    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()

        out = model(Data(x=X, edge_index=edge_index))
        loss = criterion(out, X)  # Compare the output to the original input
        loss.backward()
        optimizer.step()

        if epoch % 10 == 0:
            print(f'Epoch: {epoch}, Loss: {loss.item()}')

    # Save model
    torch.save(model.state_dict(), 'autoencoder_model.pth')

# Convert JSON embedding to compatible format
def convert_json_to_embedding(json_embedding):
    embedding_data = json.loads(json_embedding)
    embedding = embedding_data["embedding"]
    weights = embedding_data["weights"]
    return embedding, weights

# Example usage:
json_embedding = '''
{
    "embedding": {
        "valence": 0.9,
        "key": 7,
        "tempo": 120.0,
        "acousticness": 0.2,
        "danceability": 0.8,
        "energy": 0.7,
        "explicit": 0,
        "instrumentalness": 0.1,
        "liveness": 0.5,
        "speechiness": 0.2,
        "loudness": -10.0,
        "year": 2020
    },
    "weights": [0.8, 0.3, 0.7, 0.6, 0.9, 0.8, 0.1, 0.4, 0.5, 0.4, 0.9, 1.0]
}
'''

data = get_data()
embedding, weights = convert_json_to_embedding(json_embedding)
train_autoencoder(data, weights)

Epoch: 0, Loss: 0.7073466777801514
Epoch: 10, Loss: 0.5183368921279907
Epoch: 20, Loss: 0.382033109664917
Epoch: 30, Loss: 0.3472760021686554
Epoch: 40, Loss: 0.32771623134613037
Epoch: 50, Loss: 0.31596291065216064
Epoch: 60, Loss: 0.2997666001319885
Epoch: 70, Loss: 0.2853260338306427
Epoch: 80, Loss: 0.27942267060279846
Epoch: 90, Loss: 0.27625009417533875
