# Load Data to Redis Cloud

In [1]:
#!/usr/bin/env python3
import typing as t
import asyncio
import numpy as np
import pickle
import redis.asyncio as redis

from redis.commands.search.field import TagField

In [2]:
import os
import sys
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)

In [3]:
from models import Paper
from search_index import SearchIndex
import config

In [4]:
def read_paper_df() -> t.List:
    with open(config.DATA_LOCATION + "/arxiv_embeddings_without_cutoff.pkl", "rb") as f:
        df = pickle.load(f)
    return df


In [5]:
df = read_paper_df()

In [6]:
df.head()

Unnamed: 0,id,title,year,authors,categories,abstract,vector
0,704.0304,The World as Evolving Information,2012,Carlos Gershenson,"cs.IT,cs.AI,math.IT,q-bio.PE",This paper discusses the benefits of describ...,"[-0.011167259886860847, -0.026415932923555374,..."
1,704.0865,An architecture-based dependability modeling f...,2006,"Ana-Elena Rugina (LAAS), Karama Kanoun (LAAS),...","cs.PF,cs.SE","For efficiency reasons, the software system ...","[0.02402251772582531, -0.003231793874874711, -..."
2,704.1267,Text Line Segmentation of Historical Documents...,2007,"Laurence Likforman-Sulem, Abderrazak Zahour, B...",cs.CV,There is a huge amount of historical documen...,"[-0.0011464570416137576, 0.04184567928314209, ..."
3,704.2092,A Note on the Inapproximability of Correlation...,2008,Jinsong Tan,"cs.LG,cs.DS",We consider inapproximability of the correla...,"[0.005469118244946003, -0.013095404952764511, ..."
4,704.3395,General-Purpose Computing on a Semantic Networ...,2010,Marko A. Rodriguez,"cs.AI,cs.PL",This article presents a model of general-pur...,"[0.037214089184999466, -0.029926029965281487, ..."


In [7]:
len(df.vector[0])

768

In [15]:
from tqdm import tqdm

In [18]:
def gather_with_concurrency(n, redis_conn, *papers):
    def load_paper(paper):
            vector = paper.pop('vector')
            paper['paper_id'] = paper.pop('id')
            # TODO - we need to be able to use other separators
            paper['categories'] = paper['categories'].replace(",", "|")
            p = Paper(**paper)
            # save model TODO -- combine these two objects eventually
            p.save()
            # save vector data
            key = "paper_vector:" + str(p.paper_id)
            redis_conn.hset(
                key,
                mapping={
                    "paper_pk": p.pk,
                    "paper_id": p.paper_id,
                    "categories": p.categories,
                    "year": p.year,
                    "vector": np.array(vector, dtype=np.float32).tobytes(),
            })
            
    for p in tqdm(papers):
        load_paper(p)

def load_all_data():
    # TODO use redis-om connection
    redis_conn = redis.from_url(config.REDIS_URL)
    search_index = SearchIndex()
    print("Loading papers into Simpa App")
    papers = read_paper_df()
    papers = papers.to_dict('records')
    gather_with_concurrency(100, redis_conn, *papers)


    print("Creating vector search index")
    categories_field = TagField("categories", separator = "|")
    year_field = TagField("year", separator = "|")
    # create a search index
    if config.INDEX_TYPE == "HNSW":
        search_index.create_hnsw(
            categories_field,
            year_field,
            redis_conn=redis_conn,
            number_of_vectors=len(papers),
            prefix="paper_vector:",
            distance_metric="IP",
        )
    else:
         search_index.create_flat(
            categories_field,
            year_field,
            redis_conn=redis_conn,
            number_of_vectors=len(papers),
            prefix="paper_vector:",
            distance_metric="IP",
        )
    print("Search index created")

In [19]:
load_all_data()

Loading papers into Simpa App


  if __name__ == "__main__":
100%|███████████████████████████████████████████████████████████████████████████████| 30186/30186 [00:03<00:00, 9479.07it/s]

Creating vector search index
Search index created





In [20]:
redis_conn = redis.from_url(config.REDIS_URL)

In [21]:
redis_conn.ping()

<coroutine object Redis.execute_command at 0x1bb741d40>

In [22]:
papers = read_paper_df()

In [23]:
papers.head()

Unnamed: 0,id,title,year,authors,categories,abstract,vector
0,704.0304,The World as Evolving Information,2012,Carlos Gershenson,"cs.IT,cs.AI,math.IT,q-bio.PE",This paper discusses the benefits of describ...,"[-0.011167259886860847, -0.026415932923555374,..."
1,704.0865,An architecture-based dependability modeling f...,2006,"Ana-Elena Rugina (LAAS), Karama Kanoun (LAAS),...","cs.PF,cs.SE","For efficiency reasons, the software system ...","[0.02402251772582531, -0.003231793874874711, -..."
2,704.1267,Text Line Segmentation of Historical Documents...,2007,"Laurence Likforman-Sulem, Abderrazak Zahour, B...",cs.CV,There is a huge amount of historical documen...,"[-0.0011464570416137576, 0.04184567928314209, ..."
3,704.2092,A Note on the Inapproximability of Correlation...,2008,Jinsong Tan,"cs.LG,cs.DS",We consider inapproximability of the correla...,"[0.005469118244946003, -0.013095404952764511, ..."
4,704.3395,General-Purpose Computing on a Semantic Networ...,2010,Marko A. Rodriguez,"cs.AI,cs.PL",This article presents a model of general-pur...,"[0.037214089184999466, -0.029926029965281487, ..."


In [4]:
redis_client = redis.from_url(config.REDIS_URL)

In [5]:
async def papers_from_results(total, results) -> t.Dict[str, t.Any]:
    # extract papers from VSS results
    return {
        'total': total,
        'papers': [
            await process_paper(p, i)
            for i, p in enumerate(results.docs)
        ]
    }

In [6]:
async def find_papers_by_text():
    # Create query
    query = search_index.vector_query(
        2012
    )

    # find the vector of the Paper listed in the request
    paper_vector_key = "paper_vector:0704.0304"
    vector = await redis_client.hget(paper_vector_key, "vector")

    # obtain results of the queries
    total, results = await asyncio.gather(
        redis_client.ft(config.INDEX_NAME).search(query, query_params={"vec_param": vector})
    )

    # Get Paper records of those results
    return await papers_from_results(total.total, results)

In [8]:
print(asyncio.get_running_loop().is_running()) 

True


In [7]:
result = asyncio.run(find_papers_by_text())

RuntimeError: asyncio.run() cannot be called from a running event loop