In [1]:
from transformers import AutoTokenizer, AutoModel
import torch
import torch.nn.functional as F
import numpy as np
import seaborn as sns
import json
from types import SimpleNamespace
import pinecone
import time
import pandas
from sqlalchemy import create_engine
from enum import Enum
import random
import itertools
from mysql import *
import mysql.connector
import datetime

In [2]:
class SystemLogging:
    """Collects run metadata (timings, model, Pinecone stats, ids) and writes it to MySQL.

    The notebook fills in the public attributes as the run progresses; ``upload_to_db``
    then inserts one row into each of ``exec_info``, ``exec_model`` and ``exec_pinecone``
    and one row per id into ``exec_input`` / ``exec_output``.

    Args:
        loggingConfig: object exposing ``host``, ``user``, ``password`` and ``database``
            attributes, used to open the MySQL connection in ``upload_to_db``.
    """

    def __init__(self, loggingConfig):
        self.name = None
        self.system_start = None
        self.system_stop = None
        self.model_start = None
        self.model_stop = None
        self.tokenizer = None
        self.model = None
        # Backing fields for the ``input`` / ``output`` / ``device`` properties. They must be
        # real assignments: a bare annotation like ``self.input: list`` assigns nothing, so
        # reading the property before it was set would raise AttributeError.
        self._input = []
        self._output = []
        self._input_len = None
        self._output_len = None
        self._device = None
        self.pinecone_namespace = None
        self.pinecone_index = None
        self.model_dim = None
        self.pinecone_Ustart = None
        self.pinecone_Ustop = None
        self.pinecone_Qstart = None
        self.pinecone_Qstop = None
        self.pinecone_Kmin = None
        self.pinecone_Kmax = None
        self.pinecone_Kavg = None
        self._loader = loggingConfig
        # NOTE(review): every child INSERT resolves exec_id with a subquery on ``name``;
        # if exec_info.name is not unique MySQL raises "Subquery returns more than 1 row".
        self._insert_stmnts = {
            "info" : "INSERT INTO exec_info (exec_start, exec_stop, input_size, output_size, name) VALUES (%s, %s, %s, %s, %s)",
            "model" : "INSERT INTO exec_model (exec_id, tokenizer, model, mod_start, mod_stop, device, dim) VALUES ((SELECT exec_id FROM exec_info WHERE name = %s), %s, %s, %s, %s, %s, %s)",
            "pinecone" : "INSERT INTO exec_pinecone (exec_id, namespace, exec_pinecone.index, upsert_start, upsert_stop, query_start, query_stop, kmin, kmax, kavg) VALUES ((SELECT exec_id FROM exec_info WHERE name = %s), %s, %s, %s, %s, %s, %s, %s, %s, %s)",
            "input" : "INSERT INTO exec_input (exec_id, person_id) VALUES ((SELECT exec_id FROM exec_info WHERE name = %s), %s)",
            "output" : "INSERT INTO exec_output (exec_id, person_id, k_value) VALUES ((SELECT exec_id FROM exec_info WHERE name = %s), %s, %s)"
        }

    @property
    def input(self):
        """Ids used as input for this run."""
        return self._input

    @input.setter
    def input(self, value):
        # An empty sequence is stored as a NULL size rather than 0.
        self._input_len = len(value) or None
        self._input = value

    @property
    def output(self):
        """``(person_id, score)`` pairs produced by the similarity query."""
        return self._output

    @output.setter
    def output(self, value):
        # An empty sequence is stored as a NULL size rather than 0.
        self._output_len = len(value) or None
        self._output = value

    @property
    def device(self):
        """``"GPU"`` or ``"CPU"`` (None until set)."""
        return self._device

    @device.setter
    def device(self, value):
        """Map a torch device string to the label stored in the database.

        Raises:
            ValueError: if ``value`` is neither ``"cuda:0"`` nor ``"cpu"``.
        """
        if value == "cuda:0":
            self._device = "GPU"
        elif value == "cpu":
            self._device = "CPU"
        else:
            raise ValueError("device must be of type cuda:0 or cpu")

    def _collect_rows(self):
        """Build the parameter data for each statement in ``_insert_stmnts``.

        Returns:
            dict keyed like ``_insert_stmnts``. ``info``, ``model`` and ``pinecone`` map to
            a single tuple; ``input`` and ``output`` map to lists of tuples for
            ``executemany``. Each tuple follows the placeholder order of its statement.
        """
        return {
            "info": (self.system_start, self.system_stop, self._input_len, self._output_len, self.name),
            # Placeholder order: name (exec_id lookup), tokenizer, model, mod_start, mod_stop, device, dim.
            "model": (self.name, self.tokenizer, self.model, self.model_start, self.model_stop, self.device, self.model_dim),
            "input": [(self.name, person_id) for person_id in self.input],
            # Scores are rounded to 2 decimals before storage.
            "output": [(self.name, item[0], round(item[1], 2)) for item in self.output],
            "pinecone": (self.name, self.pinecone_namespace, self.pinecone_index, self.pinecone_Ustart, self.pinecone_Ustop, self.pinecone_Qstart, self.pinecone_Qstop, self.pinecone_Kmin, self.pinecone_Kmax, self.pinecone_Kavg),
        }

    def upload_to_db(self):
        """Insert all collected metadata in a single transaction.

        The transaction is rolled back if any statement fails. Cursor and connection are
        always closed.

        Raises:
            Whatever ``mysql.connector`` raises for connection or SQL errors (re-raised after rollback).
        """
        rows = self._collect_rows()
        database = mysql.connector.connect(
            host=self._loader.host,
            user=self._loader.user,
            password=self._loader.password,
            database=self._loader.database
        )
        try:
            cursor = database.cursor()
            try:
                # exec_info must be inserted first: the other statements look its exec_id up by name.
                cursor.execute(self._insert_stmnts["info"], rows["info"])
                cursor.execute(self._insert_stmnts["model"], rows["model"])
                cursor.executemany(self._insert_stmnts["input"], rows["input"])
                cursor.executemany(self._insert_stmnts["output"], rows["output"])
                cursor.execute(self._insert_stmnts["pinecone"], rows["pinecone"])
                database.commit()
            except Exception:
                database.rollback()
                raise
            finally:
                cursor.close()
        finally:
            database.close()

    def check_vars(self):
        """Return every tracked value as a list (handy for eyeballing what is still None)."""
        newlist = [self.name,
        self.system_start,
        self.system_stop,
        self.model_start,
        self.model_stop,
        self.tokenizer,
        self.model,
        self.input,
        self.output,
        self._input_len,
        self._output_len,
        self.device,
        self.pinecone_namespace,
        self.pinecone_index,
        self.model_dim,
        self.pinecone_Ustart,
        self.pinecone_Ustop,
        self.pinecone_Qstart,
        self.pinecone_Qstop,
        self.pinecone_Kmin,
        self.pinecone_Kmax,
        self.pinecone_Kavg]
        return newlist


        
        

In [3]:
# Wall-clock start of this run; later stored on the logger as logger.system_start.
start_time = datetime.datetime.now()

In [4]:
# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda:0"
else:
    device = "cpu"

In [5]:
# "cuda:0" means the model will run on the GPU; "cpu" means no GPU was detected.
print(f"{device}")

cuda:0


In [6]:
# Load config.json, turning every JSON object into a SimpleNamespace so fields can be
# read with attribute access (config.pinecone.key, ...).
with open("config.json", "r") as config_file:
    config = json.load(config_file, object_hook=lambda obj: SimpleNamespace(**obj))
databaseConfig: SimpleNamespace = config.oregonstate.data
loggingConfig = config.oregonstate.logging
pineconeConfig: SimpleNamespace = config.pinecone


In [7]:
# Interactive run parameters, prompted in order: name, namespace, row limit, top_n.
name = input("Choose a name for this execution:\n")
# The same sentence-transformers checkpoint supplies both the tokenizer and the encoder.
model_name = "sentence-transformers/all-MiniLM-L6-v2"
tokenizer_name = model_name
dimensions = 384  # embedding width recorded in the log for this model
namespace = input("Specify a namespace to store the embeddings under:\n")
limit = int(input("How many database entries would you like to embed? (Maximum)\n"))
top_n = int(input("How many similar results would you like to see?\n"))

logger = SystemLogging(loggingConfig)
logger.system_start = start_time
logger.tokenizer = tokenizer_name
logger.model = model_name
logger.model_dim = dimensions
# NOTE(review): placeholder ids 0-19, not the rows actually embedded below — confirm intent.
logger.input = list(range(20))
logger.device = device  # setter maps "cuda:0"/"cpu" to "GPU"/"CPU" and rejects anything else
logger.name = name
logger.pinecone_index = pineconeConfig.index



In [8]:
class Types:
    """Namespace grouping the enumerations used by the data connectors in this notebook."""

    class Connection(Enum):
        """Data source selector for ``eCairnConnector``."""

        # Query the twitter_profiles table through SQLAlchemy.
        FROM_TYPE_DB = 0
        # Read a local CSV file with pandas.
        FROM_TYPE_CSV = 1
        # Internal: fetch specific person_id values (used by eCairnConnector.get_eCairn_byID).
        _READ_ID_LIST = 101
    

In [9]:
class eCairnConnector:
    """Load eCairn Twitter-profile rows into a pandas DataFrame.

    The source is selected by ``method``:

    * ``Types.Connection.FROM_TYPE_DB``: requires ``db_config`` (SimpleNamespace with
      ``user``, ``password``, ``host``, ``database``). ``limit`` is optional (default 1000).
    * ``Types.Connection.FROM_TYPE_CSV``: requires ``csv_filename`` (str).
    * ``Types.Connection._READ_ID_LIST``: requires ``db_config`` and ``id_list``. ``limit``
      is optional (default 10000).

    Database reads select ``person_id`` and ``IFNULL(description,'')`` from
    ``twitter_profiles``, ordered by ``person_id``.

    Raises:
        ValueError: a required keyword is missing, or ``method`` is not recognised.
        TypeError: ``db_config`` / ``csv_filename`` has the wrong type.
    """

    def __init__(self, method: Types.Connection, **kwargs):
        self._kwargs = kwargs
        self._data = None
        if method == Types.Connection.FROM_TYPE_DB:
            db_config = self._require(kwargs, "db_config", SimpleNamespace)
            if "limit" in kwargs:
                self._from_db(db_config, limit=kwargs["limit"])
            else:
                self._from_db(db_config)
        elif method == Types.Connection.FROM_TYPE_CSV:
            self._from_csv(self._require(kwargs, "csv_filename", str))
        elif method == Types.Connection._READ_ID_LIST:
            db_config = self._require(kwargs, "db_config", SimpleNamespace)
            id_list = self._require(kwargs, "id_list")
            if "limit" in kwargs:
                self._query_db_by_id(id_list, limit=kwargs["limit"], db_config=db_config)
            else:
                self._query_db_by_id(id_list, db_config=db_config)
        else:
            # Previously an unknown method silently produced an instance with no data.
            raise ValueError("Unsupported connection method: {!r}".format(method))

    @staticmethod
    def _require(kwargs: dict, key: str, expected_type: type = None):
        """Return ``kwargs[key]``. Raise ValueError if it is absent, TypeError if mistyped."""
        if key not in kwargs:
            raise ValueError("Expected to find value for `{}`. No value found.".format(key))
        value = kwargs[key]
        if expected_type is not None and not isinstance(value, expected_type):
            raise TypeError("Expected to find type {} for `{}`. Found type {}.".format(expected_type, key, type(value)))
        return value

    @staticmethod
    def _make_engine(db_config: SimpleNamespace):
        """Create a SQLAlchemy engine for the MariaDB database described by ``db_config``."""
        from urllib.parse import quote_plus
        # Percent-encode credentials so characters such as '@', ':' or '/' in the
        # password do not corrupt the connection URL.
        uri = (
            f"mariadb+mysqlconnector://{quote_plus(str(db_config.user))}:{quote_plus(str(db_config.password))}"
            f"@{db_config.host}/{db_config.database}"
        )
        return create_engine(uri)

    def _from_db(self, databaseConfig:SimpleNamespace, limit=1000) -> None:
        """Load the first ``limit`` profiles (by person_id) into ``self._data``."""
        engine = self._make_engine(databaseConfig)
        try:
            # int() guarantees only a numeric literal is interpolated into the SQL text.
            self._data = pandas.read_sql(
                f"SELECT person_id, IFNULL(description,'') FROM twitter_profiles ORDER BY twitter_profiles.person_id ASC limit {int(limit)};",
                engine,
            )
        finally:
            engine.dispose()

    def _from_csv(self, file:str) -> None:
        """Load ``file`` into ``self._data``, using its first column as the index."""
        self._data = pandas.read_csv(file, index_col=0)

    def _query_db_by_id(self, ref_list:list, limit = 10000, db_config: SimpleNamespace = None) -> None:
        """Load the profiles whose person_id is in ``ref_list`` into ``self._data``.

        Args:
            ref_list: person ids to fetch. Each is coerced with ``int()``, so only numeric
                literals reach the SQL text. Assumes person_id is an integer column, as the
                rest of the notebook treats it.
            limit: maximum number of rows to return.
            db_config: connection settings. Defaults to the ``db_config`` keyword this
                instance was constructed with.

        Raises:
            ValueError: no ``db_config`` is available.
        """
        if db_config is None:
            db_config = self._kwargs.get("db_config")
        if db_config is None:
            raise ValueError("Expected to find value for `db_config`. No value found.")
        ids = [int(ref) for ref in ref_list]
        if not ids:
            # "IN ()" is invalid SQL. Return an empty frame with the column labels the
            # query would produce (NOTE(review): labels as MySQL reports them — verify).
            self._data = pandas.DataFrame(columns=["person_id", "IFNULL(description,'')"])
            return
        id_csv = ",".join(str(person_id) for person_id in ids)
        engine = self._make_engine(db_config)
        try:
            self._data = pandas.read_sql(
                f"SELECT person_id, IFNULL(description,'') FROM twitter_profiles WHERE person_id IN ({id_csv}) ORDER BY twitter_profiles.person_id ASC limit {int(limit)};",
                engine,
            )
        finally:
            engine.dispose()

    def get_dataframe(self) -> pandas.DataFrame:
        """Return the loaded DataFrame (None only if nothing was loaded)."""
        return self._data

    @classmethod
    def get_eCairn_byID(cls, db_config:SimpleNamespace, ref_list:list) -> pandas.DataFrame:
        """Convenience wrapper: fetch the profiles for ``ref_list`` using ``db_config``."""
        new_instance = cls(Types.Connection._READ_ID_LIST, db_config=db_config, id_list = ref_list)
        return new_instance.get_dataframe()
        

In [10]:
def mean_pooling(model_output, attention_mask):
    """Average token embeddings over the sequence, ignoring positions whose mask is 0.

    Args:
        model_output: model output whose first element holds the token embeddings,
            indexed as (batch, seq, hidden) by the broadcast below.
        attention_mask: (batch, seq) tensor of 1 for real tokens and 0 for padding.

    Returns:
        (batch, hidden) tensor of masked means. The denominator is clamped to 1e-9, so an
        all-padding row yields zeros instead of NaN.
    """
    hidden = model_output[0]
    weights = attention_mask.unsqueeze(-1).expand(hidden.size()).float()
    summed = (hidden * weights).sum(dim=1)
    counts = weights.sum(dim=1).clamp(min=1e-9)
    return summed / counts

In [11]:
def pretrained_setup(tokenizer_name:str, model_name:str):
    """Load a Hugging Face tokenizer and encoder, moving the encoder onto ``device``.

    Args:
        tokenizer_name: hub id or path passed to ``AutoTokenizer.from_pretrained``.
        model_name: hub id or path passed to ``AutoModel.from_pretrained``.

    Returns:
        ``(tokenizer, model)`` tuple.
    """
    loaded_tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)
    loaded_model = AutoModel.from_pretrained(model_name)
    return loaded_tokenizer, loaded_model.to(device)

In [12]:
def emb(text,model,tokenizer):
    """Encode ``text`` into L2-normalised, mean-pooled sentence embeddings.

    Args:
        text: a string or list of strings, passed straight to ``tokenizer``
            (padded and truncated, returned as PyTorch tensors).
        model: encoder already placed on ``device``.
        tokenizer: tokenizer matching ``model``.

    Returns:
        NumPy array copy on the CPU with one row per input text.
    """
    batch = tokenizer(text=text, padding=True, truncation=True, return_tensors='pt')
    batch = batch.to(device)
    with torch.no_grad():  # inference only: skip autograd bookkeeping
        outputs = model(**batch)
    pooled = mean_pooling(outputs, batch['attention_mask'])
    unit_vectors = F.normalize(pooled, p=2, dim=1)
    return np.array(unit_vectors.cpu())

In [13]:
# Pull (person_id, description) rows from the database. A limit of 0 falls back to
# the connector's default row limit (1000 in _from_db).
connector_kwargs = {"db_config": databaseConfig}
if limit != 0:
    connector_kwargs["limit"] = limit
new_data = eCairnConnector(Types.Connection.FROM_TYPE_DB, **connector_kwargs)
test_list = new_data.get_dataframe().to_numpy()


In [14]:
# Number of profiles fetched (rows of the 2-column array).
print(test_list.shape[0])

495197


In [15]:
# Will hold (vector_id, values, metadata) tuples ready for Pinecone upsert.
embeddings_dataset = list()

In [None]:
# Create one embedding per bio and attach an id plus metadata, e.g.
# ("vector-42", [0.013, ...], {"original_id": 42}).
# Rows are encoded in batches: one forward pass per batch instead of one per row.
# Padding inside a batch is excluded by the attention mask passed to the model and to
# mean_pooling (assumes the encoder honours attention_mask, as HF encoders do).
EMBED_BATCH_SIZE = 64
tokenizer, model = pretrained_setup(tokenizer_name, model_name)
logger.model_start = datetime.datetime.now()
logger.pinecone_namespace = namespace
# Start from an empty list so re-running this cell does not duplicate vectors.
embeddings_dataset = []
total = len(test_list)
next_read = time.time() + 5  # print progress at most every 5 seconds
for batch_start in range(0, total, EMBED_BATCH_SIZE):
    if time.time() > next_read:
        print(f"{len(embeddings_dataset)}/{total}")
        next_read = time.time() + 5
    batch = test_list[batch_start:batch_start + EMBED_BATCH_SIZE]
    vectors = emb([str(row[1]) for row in batch], model, tokenizer)
    embeddings_dataset.extend(
        (f'vector-{row[0]}', vector.tolist(), {"original_id": row[0]})
        for row, vector in zip(batch, vectors)
    )
logger.model_stop = datetime.datetime.now()

In [18]:
# Authenticate the Pinecone client with the key/environment from config.json.
# NOTE(review): pinecone.init exists in the 2.x client only; newer clients use pinecone.Pinecone(...).
pinecone.init(environment=pineconeConfig.env, api_key=pineconeConfig.key)

In [19]:
# Open a handle to the Pinecone index named in config.json.
index_name = pineconeConfig.index
index = pinecone.Index(index_name)

Here we upload the embeddings to Pinecone in batches of 100 vectors.

In [20]:


def chunks(iterable, batch_size=100):
    """Yield consecutive tuples of at most ``batch_size`` items from ``iterable``.

    The last tuple may be shorter. Nothing is yielded for an empty iterable.
    """
    source = iter(iterable)
    while True:
        batch = tuple(itertools.islice(source, batch_size))
        if not batch:
            return
        yield batch

# Validate vector widths before sending anything, so a model/config mismatch fails
# here with a clear message instead of partway through the upsert.
vector_dim = dimensions  # single source of truth: the width set where the model is chosen
vector_count = len(embeddings_dataset)
bad_ids = [vec_id for vec_id, values, _ in embeddings_dataset if len(values) != vector_dim]
if bad_ids:
    raise ValueError(
        f"{len(bad_ids)} of {vector_count} embeddings do not have {vector_dim} values "
        f"(first offender: {bad_ids[0]})"
    )

# Upsert 100 (id, values, metadata) tuples per request into the configured namespace.
logger.pinecone_Ustart = datetime.datetime.now()
for ids_vectors_chunk in chunks(embeddings_dataset, batch_size=100):
    index.upsert(vectors=ids_vectors_chunk, namespace=namespace)
logger.pinecone_Ustop = datetime.datetime.now()

Here we query Pinecone with the first embedding and process the matches back into human-readable information (person IDs and similarity scores), recording summary statistics in the logger.

In [None]:
# Query with the first embedded profile and keep its top_n nearest neighbours.
query_id, query_vector, _ = embeddings_dataset[0]
logger.pinecone_Qstart = datetime.datetime.now()
top = index.query(
    vector=query_vector,  # a flat list of floats, not wrapped in another list
    top_k=top_n + 1,  # one extra: the query vector normally matches itself
    include_values=False,
    namespace=namespace,
    include_metadata=True,
)
logger.pinecone_Qstop = datetime.datetime.now()
# Drop the self-match by id rather than by position. Position breaks if the self-match
# is missing (e.g. not yet indexed) or ties with another score.
neighbours = [match for match in top["matches"] if match["id"] != query_id][:top_n]
logger.output = [(int(match["metadata"]['original_id']), float(match["score"])) for match in neighbours]
scores = [score for _, score in logger.output]
if scores:
    logger.pinecone_Kmin = min(scores)
    logger.pinecone_Kmax = max(scores)
    logger.pinecone_Kavg = float(np.average(scores))
else:
    # No neighbours returned: record NULL stats instead of crashing on min([]).
    logger.pinecone_Kmin = logger.pinecone_Kmax = logger.pinecone_Kavg = None
logger.system_stop = datetime.datetime.now()

In [None]:
# Persist this run's metadata (timings, model, Pinecone stats, input/output ids) to MySQL.
logger.upload_to_db()

In [None]:

# maplist = list(map(lambda x : (int(x['id']), x['score']), top["matches"]))
# output = np.array(maplist,  dtype=(object))
# oids, simvals = output.T
# oids = oids.tolist()


In [None]:

# mask = np.isin(element = test_list[:,0],test_elements = oids)
# output_processed = test_list[mask]
# readable = np.column_stack((output_processed, simvals))
# with open("output.txt", "w", encoding="UTF8") as f:
#     f.writelines(f'{sim}\t| {line}\n\n\n\n' for vid, line, sim in readable)
