# Common

In [11]:
import json
import numpy as np
import torch
from tqdm.notebook import tqdm

In [2]:
n_samples = 400_0000
n_items = 20_0000 # TODO: 더 늘려서 해볼것
n_users = 20_0000
n_scores = 5

embedding_size = 64

# Defind Dataset & Network

In [4]:
# Dataset

class Dataset(torch.utils.data.Dataset):
    def __init__(self, n_samples, n_users, n_items, n_scores, score_scale):
        self.n_samples = n_samples
        self.n_users = n_users
        self.n_items = n_items
        self.n_scores = n_scores

        self.X = torch.cat(
            tensors=[
                torch.randint(n_users, [n_samples, 1]),
                torch.randint(n_items, [n_samples, 1]),
            ],
            dim=-1
        )
        self.Y = torch.randint(n_scores, [n_samples]).float() * score_scale

    def __len__(self):
        return self.n_samples

    def __getitem__(self, i):
        return self.X[i, :], self.Y[i]


# Model Network

def should_build_done(func):
    __key__ = "is_built"

    from functools import wraps
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        if not self.__dict__.get(__key__, False):
            raise Exception (f"Model has not been built, or does not have attribute: {__key__!r}")
        return func(self, *args, **kwargs)
    return wrapper


class Model(torch.nn.Module):
    def __init__(self, n_users, n_items, embedding_size):
        super(Model, self).__init__()
        self.user_embedding = torch.nn.Embedding(n_users, embedding_size)
        self.item_embedding = torch.nn.Embedding(n_items, embedding_size)

        self.index2user = {k: v for (k, v) in enumerate(range(n_users))}
        self.index2item = {k: v for (k, v) in enumerate(range(n_items))}
        
        self.is_built = False

    def forward(self, user_indices, item_indices):
        user_vectors = self.user_embedding(user_indices)
        item_vectors = self.item_embedding(item_indices)
        score_pred = (user_vectors * item_vectors).sum(dim=-1)
        return score_pred

    def build(self, optimizer, criterion):
        self.optimizer = optimizer
        self.criterion = criterion

        self.is_built = True

    @should_build_done
    def _train_single_step(self, x, y_truth):
        user_indices = x[:, 0]
        item_indices = x[:, 1]
        y_pred = self.forward(user_indices, item_indices)
        loss = self.criterion(y_truth, y_pred)
        loss.backward()
        self.optimizer.step()

        return loss

    @should_build_done
    def _train_single_epoch(self, current_epoch, train_dataloader):
        losses = []

        with tqdm(train_dataloader, disable=False) as progress:
            progress.set_description("Train")

            for i, (x, y_truth) in enumerate(progress):
                loss = self._train_single_step(x, y_truth).item()
                losses.append(loss)
                progress.set_postfix(epoch=current_epoch, loss=loss)

                if (i == len(progress) - 1):
                    total_loss = np.mean(losses)
                    progress.set_postfix(epoch=current_epoch, loss=total_loss)

        return total_loss

    @should_build_done
    def do_train(self, epochs, train_dataloader):
        self.train()

        metrics = {
            "train_loss": [],
        }

        for current_epoch in range(1, epochs + 1):
            train_loss = self._train_single_epoch(current_epoch, train_dataloader)
            metrics["train_loss"].append(train_loss)

        return metrics

# Build Model & Train

In [5]:
train_dataloader = torch.utils.data.DataLoader(
    dataset = Dataset(n_samples=n_samples, n_users=n_users, n_items=n_items, n_scores=n_scores, score_scale=5),
    batch_size = 10000,
    shuffle = True,
)

model = Model(n_users=n_users, n_items=n_items, embedding_size=embedding_size)
model.build(
    optimizer=torch.optim.Adam(model.parameters()),
    criterion=torch.nn.MSELoss(),
)

In [383]:
metrics = model.do_train(20, train_dataloader)
metrics

  0%|          | 0/400 [00:00<?, ?it/s]

  0%|          | 0/400 [00:00<?, ?it/s]

  0%|          | 0/400 [00:00<?, ?it/s]

  0%|          | 0/400 [00:00<?, ?it/s]

  0%|          | 0/400 [00:00<?, ?it/s]

  0%|          | 0/400 [00:00<?, ?it/s]

  0%|          | 0/400 [00:00<?, ?it/s]

  0%|          | 0/400 [00:00<?, ?it/s]

  0%|          | 0/400 [00:00<?, ?it/s]

  0%|          | 0/400 [00:00<?, ?it/s]

  0%|          | 0/400 [00:00<?, ?it/s]

  0%|          | 0/400 [00:00<?, ?it/s]

  0%|          | 0/400 [00:00<?, ?it/s]

  0%|          | 0/400 [00:00<?, ?it/s]

  0%|          | 0/400 [00:00<?, ?it/s]

  0%|          | 0/400 [00:00<?, ?it/s]

  0%|          | 0/400 [00:00<?, ?it/s]

  0%|          | 0/400 [00:00<?, ?it/s]

  0%|          | 0/400 [00:00<?, ?it/s]

  0%|          | 0/400 [00:00<?, ?it/s]

{'train_loss': [211.81550552368165,
  49.26697519302368,
  70.48534439086914,
  59.93053276062012,
  97.92029298782349,
  113.48603303909302,
  144.43399364471435,
  180.0618475532532,
  210.70432750701903,
  213.95666358947753,
  195.98183193206788,
  174.28171600341796,
  156.3987125015259,
  148.21272985458373,
  145.5608156967163,
  145.60790891647338,
  151.77027458190918,
  163.18006980895996,
  178.08630222320556,
  197.56065238952635]}

In [6]:
X, Y = next(iter(train_dataloader))
input_example = [X[:, 0], X[:, 1]]
traced_model = torch.jit.trace(model.forward, input_example)
traced_model

Model(
  original_name=Model
  (user_embedding): Embedding(original_name=Embedding)
  (item_embedding): Embedding(original_name=Embedding)
  (criterion): MSELoss(original_name=MSELoss)
)

In [7]:
torch.jit.save(traced_model, "./MF.jit.pt")

In [8]:
traced_model.item_embedding.state_dict()["weight"]

tensor([[-0.6496,  0.1343, -1.2856,  ...,  0.4467,  1.0553, -0.7142],
        [-0.1355,  0.8267,  1.5335,  ...,  0.9036, -0.7108,  0.5577],
        [ 0.9184,  0.3263,  3.1620,  ...,  0.3242,  1.1567, -0.5028],
        ...,
        [ 0.3554, -0.3192,  0.1038,  ...,  1.2831, -0.2343, -0.0731],
        [-0.5348, -0.3497,  1.0854,  ...,  1.0227,  0.3026, -0.0862],
        [ 1.1207, -0.1838, -1.3801,  ...,  0.5487,  0.5793,  1.4702]])

In [9]:
with open("./MF.index2user.json", "w") as f:
    json.dump(model.index2user, f)

with open("./MF.index2item.json", "w") as f:
    json.dump(model.index2item, f)

# Ready to Serve

In [3]:
with open("./MF.index2user.json", "r") as f:
    index2item = {int(k): v for k, v in json.load(f).items()}

traced_model = torch.jit.load("./MF.jit.pt")

In [4]:
base_age = np.random.randint(low=20, high=60, size=[n_samples], dtype=np.uint8)
min_age = base_age - 20
max_age = base_age + 20

jobtype_code = np.random.choice(range(10), size=[n_samples, 3])

# Serving into ES

In [5]:
!pip install elasticsearch

Collecting elasticsearch
  Downloading elasticsearch-8.12.0-py3-none-any.whl.metadata (5.3 kB)
Collecting elastic-transport<9,>=8 (from elasticsearch)
  Downloading elastic_transport-8.12.0-py3-none-any.whl.metadata (3.5 kB)
Downloading elasticsearch-8.12.0-py3-none-any.whl (431 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m431.9/431.9 kB[0m [31m13.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading elastic_transport-8.12.0-py3-none-any.whl (59 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m59.9/59.9 kB[0m [31m8.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: elastic-transport, elasticsearch
Successfully installed elastic-transport-8.12.0 elasticsearch-8.12.0


In [6]:
from elasticsearch import Elasticsearch, helpers

In [7]:
client = Elasticsearch(hosts='http://elasticsearch:9200')
client

<Elasticsearch(['http://elasticsearch:9200'])>

In [17]:
INDEX_NAME = "gno_properties"
N_DIMS = embedding_size

if client.indices.exists(index=INDEX_NAME).body:
    client.indices.delete(index=INDEX_NAME)
    print("index deleted!")

client.indices.create(
    index=INDEX_NAME,
    settings={
        "number_of_shards": 1,
        "number_of_replicas": 0
    },
    # https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
    mappings={
        "properties": {
            "gno": { "type": "long" },
            "min_age": { "type": "long" }, # scalar
            "max_age": { "type": "long" }, # scalar
            "jobtype_code": { "type": "long" }, # array[long]
            "vector": {
                # https://www.elastic.co/guide/en/elasticsearch/reference/current/dense-vector.html#dense-vector-params
                "type": "dense_vector",
                "dims": N_DIMS,
                "index": True, # when you use approximate knn, keep False to significantly improve indexing speed
                "similarity": "cosine"
            }
        }
    }
)

index deleted!


ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'gno_properties'})

In [18]:
def convert_to_docs(item_index):
    doc = {
        "_index": INDEX_NAME,
        "_source": {
            "gno": index2item[item_index],
            "min_age": min_age[item_index],
            "max_age": max_age[item_index],
            "jobtype_code": jobtype_code[item_index],
            "vector": traced_model.item_embedding.state_dict()["weight"][item_index].tolist(),
        }
    }
    return doc

docs = [convert_to_docs(i) for i in range(n_items)]
docs

[{'_index': 'gno_properties',
  '_source': {'gno': 0,
   'min_age': 2,
   'max_age': 42,
   'jobtype_code': array([2, 7, 8]),
   'vector': [-0.6496330499649048,
    0.13425153493881226,
    -1.2855665683746338,
    -1.9713165760040283,
    -0.9584363698959351,
    0.8717796206474304,
    -0.6018407940864563,
    0.45275917649269104,
    -0.4199022948741913,
    -1.9321035146713257,
    0.27058395743370056,
    1.7575584650039673,
    -1.332258939743042,
    0.2471911758184433,
    0.2605931758880615,
    -0.7261073589324951,
    0.22398442029953003,
    0.4155065417289734,
    1.5305161476135254,
    -0.0521688275039196,
    -1.026381254196167,
    -1.8556493520736694,
    0.0068673184141516685,
    -0.25621360540390015,
    0.6019365191459656,
    -0.7082161903381348,
    -1.5507102012634277,
    0.25794294476509094,
    -0.21113012731075287,
    -0.8231781721115112,
    -1.1819138526916504,
    1.2771800756454468,
    0.7448615431785583,
    0.9231162071228027,
    -1.184313535690307

In [19]:
res = helpers.bulk(client, docs)
res

(200000, [])

# Query ANN

In [119]:
from pprint import pprint
import pandas as pd

# params
input_gno = np.random.randint(0, n_items, size=2).tolist()
exclude_gno = []
user = {
    "age": 25,
    "jobtype_code": [3, 6],
}
topk = 10

In [120]:
# query 1
body = {
    "query": {"bool": {"filter": [{"terms": {"gno": input_gno}}]}},
    "source_includes": ["gno", "vector"]
}
resp = client.search(index=INDEX_NAME, **body)
input_docs = [obj["_source"] for obj in resp.body["hits"]["hits"]]

pd.DataFrame(input_docs)

Unnamed: 0,gno,vector
0,126536,"[0.6408310532569885, 1.5825265645980835, 0.447..."
1,183088,"[-0.06670179963111877, 0.5067703723907471, 0.3..."


In [130]:
# query 2
knn_boost = 0.7
knn_base = {
    "field": "vector",
    # "query_vector": source["vector"],
    "k": 100,
    "num_candidates": 1000,
    "filter": {
        "bool" : {
            "must_not": [
                {"terms": {"gno": input_gno + exclude_gno}},
            ],
            "filter": [
                {"range": {"min_age": {"lte": user["age"]}}},
                {"range": {"max_age": {"gte": user["age"]}}},
                # {"terms": {"jobtype_code": user["jobtype_code"]}}
            ]
        }
    },
    # "boost": 0.6,
}
knn = [
    {
        **knn_base,
        "query_vector": doc["vector"],
        "boost": knn_boost / len(input_docs)
    }
    for doc in input_docs
]
query = {
    "bool": {
        "should": [
            {"term": {"jobtype_code": {"value": v, "boost": (1 - knn_boost) / len(user["jobtype_code"])}}}
            for v in user["jobtype_code"]
        ]
    }
}
body = {
    "query": query, # 일반 검색엔진 쿼리 (추후 시멘틱 서치 사용)
    "knn": knn, # 벡터 검색 쿼리
    "size": topk,
}

resp = client.search(
    index=INDEX_NAME,
    **body,
    # source_excludes=["jobtype_code"]
    # rank={"rrf": {}}  # payed version only
)

print("input_gno:", input_gno)
print("exclude_gno:", exclude_gno)
print("user:")
pprint(user)

pd.DataFrame(map(lambda obj: {**obj["_source"], "score": obj["_score"]}, resp["hits"]["hits"]))

input_gno: [126536, 183088]
exclude_gno: []
user:
{'age': 25, 'jobtype_code': [3, 6]}


Unnamed: 0,gno,min_age,max_age,jobtype_code,vector,score
0,123727,3,43,"[3, 6, 5]","[1.1094634532928467, 0.9414927959442139, 0.037...",0.555917
1,197479,8,48,"[6, 3, 2]","[0.46767064929008484, -1.8216687440872192, 1.2...",0.552602
2,45463,1,41,"[9, 3, 6]","[0.31114599108695984, -0.7259986996650696, -0....",0.55181
3,8321,19,59,"[6, 3, 7]","[0.31750428676605225, 1.8127388954162598, 1.93...",0.550485
4,176580,22,62,"[1, 3, 6]","[1.064927101135254, 0.9974909424781799, 1.0518...",0.54875
5,157602,21,61,"[6, 3, 4]","[0.3778386414051056, -0.8142390251159668, 0.41...",0.548521
6,88299,6,46,"[5, 3, 6]","[0.2930828630924225, 0.44640788435935974, -0.4...",0.54769
7,198502,2,42,"[3, 6, 1]","[-1.446577787399292, 2.7796430587768555, 1.960...",0.546451
8,164640,21,61,"[3, 8, 6]","[1.1761162281036377, 0.6956945061683655, 0.477...",0.546302
9,110082,8,48,"[9, 3, 6]","[0.0458708219230175, 0.4312293529510498, -0.48...",0.545012


In [133]:
import requests

body = {
    "input_gno": input_gno,
    "exclude_gno": [123727, 197479],
    "user": user,
    "topk": topk + 20,
}
resp = requests.post("http://fastapi:2000/recommend/actvtbased", json=body).json()

print(body)
pd.DataFrame(resp["result"])

{'input_gno': [126536, 183088], 'exclude_gno': [123727, 197479], 'user': {'age': 25, 'jobtype_code': [3, 6]}, 'topk': 30}


Unnamed: 0,gno,min_age,max_age,jobtype_code,score
0,45463,1,41,"[9, 3, 6]",0.55181
1,8321,19,59,"[6, 3, 7]",0.550485
2,176580,22,62,"[1, 3, 6]",0.54875
3,157602,21,61,"[6, 3, 4]",0.548521
4,88299,6,46,"[5, 3, 6]",0.54769
5,198502,2,42,"[3, 6, 1]",0.546451
6,164640,21,61,"[3, 8, 6]",0.546302
7,110082,8,48,"[9, 3, 6]",0.545012
8,13296,9,49,"[3, 1, 7]",0.417564
9,150431,9,49,"[6, 7, 6]",0.4151
