In [1]:
from collection import SparseCollection, SparseCollectionCSR
from retriever import SparseRetriever
from weighting_model import BM25WeightingModel
from backend import TYPE
import json
import psutil
from text2vec import BagOfWords
import torch
from collections import defaultdict
from tqdm import tqdm
import multiprocessing  as mp

import pyterrier  as pt
import pandas as pd

import torch.nn as nn

import os

os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"

pt.init()

indexref = pt.IndexRef.of("../syn-question-col-analysis/datasets/msmarco/terrier_index/")
index = pt.IndexFactory.of(indexref)

def tp_func():
    stops = pt.autoclass("org.terrier.terms.Stopwords")(None)
    stemmer = pt.autoclass("org.terrier.terms.PorterStemmer")(None)
    def _apply_func(row):
        words = row["query"].split(" ") # this is safe following pt.rewrite.tokenise()
        words = [stemmer.stem(w) for w in words if not stops.isStopword(w) ]
        return words
    return _apply_func 

pipe = pt.rewrite.tokenise() >> pt.apply.query(tp_func())
token2id = {word.getKey():i for i,word in enumerate(index.getLexicon()) }

def tokenizer(text):
    tokens_ids = []
    for token in pipe(pd.DataFrame([{"qid":0, "query":text.lower()}]))["query"][0]:
        if token in token2id:
            token_id=token2id[token]
            if token_id is not None:
                tokens_ids.append(token_id)
    return tokens_ids

PyTerrier 0.9.2 has loaded Terrier 5.7 (built by craigm on 2022-11-10 18:30) and terrier-helper 0.0.7

No etc/terrier.properties, using terrier.default.properties for bootstrap configuration.


23:14:04.245 [main] WARN org.terrier.structures.BaseCompressingMetaIndex - Structure meta reading data file directly from disk (SLOW) - try index.meta.data-source=fileinmem in the index properties file. 1.9 GiB of memory would be required.


In [24]:
with open("../syn-question-col-analysis/question_generation/gen_output/msmarco/selected_corpus_lm_fcm_STD2_L10000_gpt-neo-1.3B_BS_5_E13931.459746599197.jsonl") as f:
    questions = [line["question"] for line in map(json.loads, f)]

sparse_collection = SparseCollectionCSR.load_from_file("csr_msmarco_bm25_12_075_terrier")

bow = BagOfWords(tokenizer, sparse_collection.shape[1])

class QuestionDataset(torch.utils.data.Dataset):
    def __init__(self, questions, bow, vocab_size):
        self.questions = questions#[:10000]
        self.bow = bow
        self.vocab_size = vocab_size

    def __len__(self):
        return len(self.questions)

    def __getitem__(self, idx):
        b = self.bow(self.questions[idx])
        indices = list(b.keys())
        values = list(b.values())
        
        return indices, values#{"indices": indices, "values": values}
    
dataset = QuestionDataset(questions, bow, sparse_collection.shape[1])

def distributed_collate_fn(data):
    max_len = max([len(x[0]) for x in data])
    indices = []
    values = []
    sizes = []
    for x in data:
        sizes.append(len(x[0]))
        indices.append(x[0]+[0]*(max_len-len(x[0])))
        values.append(x[1]+[0]*(max_len-len(x[1])))
    return torch.tensor(indices), torch.tensor(values), torch.tensor(sizes)

dl = torch.utils.data.DataLoader(dataset, batch_size=2, collate_fn=distributed_collate_fn, pin_memory=True, num_workers=0)

In [25]:
sample = next(iter(dl))



[tensor([[880134, 869173, 274027, 541942, 652667],
         [869173, 274027, 541942, 652667,      0]]),
 tensor([[1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 0.]]),
 tensor([5, 4])]

In [26]:
class CSRSparseRetrievalDistributedModel(torch.nn.Module):
    def __init__(self, sparse_collection, top_k = 10):
        super().__init__()
        #self.shape = sparse_collection.sparse_vecs, sparse_collection.shape
        self.crow = torch.nn.parameter.Parameter(sparse_collection.sparse_vecs[0], requires_grad=False)
        self.indice = torch.nn.parameter.Parameter(sparse_collection.sparse_vecs[1], requires_grad=False)
        self.values = torch.nn.parameter.Parameter(sparse_collection.sparse_vecs[2], requires_grad=False)
        self.collection_matrix = None#torch.sparse_csr_tensor(self.crow, self.indice, self.values, sparse_collection.shape)
        self.shape = sparse_collection.shape
        self.top_k = top_k
        
    def forward(self, indices, values, size):
        size = size.squeeze(0)
        values = values[0,:size]
        indices = indices[0, :size].unsqueeze(0)
        
        query = torch.sparse_coo_tensor(indices, values, (self.shape[-1],), dtype=torch.float32).to_dense()
        #print(x.shape)
        collection_matrix = torch.sparse_csr_tensor(self.crow, self.indice, self.values, self.shape)
    
        return torch.topk(collection_matrix @ query, k=self.top_k, dim=0)

In [None]:
sparse_model= CSRSparseRetrievalDistributedModel(sparse_collection, top_k=10).to("cuda")

replicas = torch.nn.parallel.replicate(sparse_model, [0,1])

In [None]:
sparse_collection = SparseCollectionCSR.load_from_file("csr_msmarco_bm25_12_075_terrier")

bow = BagOfWords(tokenizer, sparse_collection.shape[1])

# add the option if no weighitngmodel then use the collection weighting model
sparse_retriver = SparseRetriever(sparse_collection, bow, BM25WeightingModel())

with open("../syn-question-col-analysis/question_generation/gen_output/msmarco/selected_corpus_lm_fcm_STD2_L10000_gpt-neo-1.3B_BS_5_E13931.459746599197.jsonl") as f:
    questions = [line["question"] for line in map(json.loads, f)]
    


In [3]:
indices, values = sparse_retriver.retrieve(questions[:10_000])

  collection_matrix = torch.sparse_csr_tensor(self.crow, self.indice, self.values, self.shape)
100%|██████████| 10000/10000 [01:34<00:00, 106.17it/s]


In [5]:
#indices[0]
#os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"

In [20]:
converted_indices = []
#values_list = []
for i in range(indices.shape[0]):
    q_indices = [sparse_collection.metadata.index2docID[idx] for idx in indices[i].tolist()]
    converted_indices.append(q_indices)
    #values_list.append(values[i].tolist())
#converted_indices[0]

In [22]:
converted_indices[0]

[341,
 6080047,
 6080050,
 6080052,
 2787655,
 3224336,
 4383297,
 5254867,
 3005847,
 8529165,
 2684918,
 3224335,
 4719606,
 256282,
 4719605,
 5970721,
 2003533,
 760538,
 5316812,
 8491925,
 4265297,
 8491926,
 7791450,
 7103086,
 5945756,
 1680681,
 2788095,
 8491927,
 943733,
 8370577,
 4430990,
 2757777,
 7103261,
 8491930,
 2788092,
 5324897,
 8513416,
 396979,
 8370578,
 2003539,
 2274925,
 4150059,
 5616934,
 2317632,
 2003538,
 5945759,
 6747148,
 2660504,
 115228,
 8370579,
 214475,
 473866,
 8370581,
 7507826,
 7006077,
 5229958,
 3136236,
 7941668,
 6030992,
 943729,
 4554884,
 2757774,
 4559939,
 2688278,
 115233,
 6747146,
 546033,
 2554568,
 6891002,
 7941667,
 2317636,
 5607909,
 6614214,
 7137291,
 8206871,
 6326123,
 5607908,
 4798159,
 4853270,
 7791456,
 6212158,
 7618170,
 785924,
 8491929,
 3452685,
 8370585,
 6708193,
 6996099,
 2788093,
 3041330,
 7105622,
 8090523,
 290039,
 6708197,
 8023051,
 5229960,
 6747145,
 2684914,
 6708190,
 6708192,
 7137295,
 69961

In [17]:
import torch.nn as nn

class SparseRetrievalModel(nn.Module):
    def __init__(self, sparse_collection, top_k = 10):
        super().__init__()
        #self.shape = sparse_collection.sparse_vecs, sparse_collection.shape
        self.crow = torch.nn.parameter.Parameter(sparse_collection.sparse_vecs[0], requires_grad=False)
        self.indice = torch.nn.parameter.Parameter(sparse_collection.sparse_vecs[1], requires_grad=False)
        self.values = torch.nn.parameter.Parameter(sparse_collection.sparse_vecs[2], requires_grad=False)
        self.collection_matrix = None#torch.sparse_csr_tensor(self.crow, self.indice, self.values, sparse_collection.shape)
        self.shape = sparse_collection.shape
        self.top_k = top_k
        
    def forward(self, x):
        x= x.squeeze()
        print(x.shape)
        if self.collection_matrix is None:
            self.collection_matrix = torch.sparse_csr_tensor(self.crow, self.indice, self.values, self.shape)
       
        return torch.topk(self.collection_matrix @ x, k=self.top_k, dim=0)#.cpu()
        #return x


In [18]:

retrieval_model = SparseRetrievalModel(sparse_collection, 10)


In [19]:
with open("../syn-question-col-analysis/question_generation/gen_output/msmarco/selected_corpus_lm_fcm_STD2_L10000_gpt-neo-1.3B_BS_5_E13931.459746599197.jsonl") as f:
    questions = [line for line in map(json.loads, f)]

In [20]:
def text_to_dense_torch(text):
    b = bow(text)
    return torch.sparse_coo_tensor([list(b.keys())], list(b.values()), (vocab_size,), dtype=torch.float32).to_dense()

def text_to_sparse_torch(text):
    b = bow(text)
    return torch.sparse_coo_tensor([list(b.keys())], list(b.values()), (vocab_size,), dtype=torch.float32)


In [24]:
retrieval_model_gpu = retrieval_model.to("cuda")
retrieval_model_multigpu = torch.nn.DataParallel(retrieval_model_gpu)

ValueError: Expected a non cpu device, but got: cpu

In [22]:
query1 = text_to_dense_torch(questions[0]["question"])#.to("cuda:0")
print(query1.shape)
query2 = text_to_dense_torch(questions[1]["question"])
query = torch.stack([query1, query2])
query = query#.to("cuda")
print(query.shape)

torch.Size([1170682])
torch.Size([2, 1170682])


In [23]:
out = retrieval_model_multigpu(query)

torch.Size([1170682])
torch.Size([1170682])


AttributeError: Caught AttributeError in replica 0 on device 0.
Original Traceback (most recent call last):
  File "/home/tiagoalmeida/.local/lib/python3.10/site-packages/torch/nn/parallel/parallel_apply.py", line 64, in _worker
    output = module(*input, **kwargs)
  File "/home/tiagoalmeida/.local/lib/python3.10/site-packages/torch/nn/modules/module.py", line 1501, in _call_impl
    return forward_call(*args, **kwargs)
  File "/tmp/ipykernel_277486/216961384.py", line 20, in forward
    return torch.topk(self.collection_matrix @ x, k=self.top_k, dim=0).cpu()
AttributeError: 'torch.return_types.topk' object has no attribute 'cpu'


In [16]:
out

#print(1000/(end_t-start_t), "it/s")

torch.return_types.topk(
values=tensor([73.5256, 43.7951, 43.2043, 41.2657, 40.5497, 40.2520, 38.8254, 38.5653,
        37.7857, 35.0274, 73.5256, 43.7951, 43.2043, 41.2657, 40.5497, 40.2520,
        38.8254, 38.5653, 35.0274, 34.3543], device='cuda:0'),
indices=tensor([    341, 6080044, 6080053, 6080050, 2787655, 3224317, 4383297, 5254910,
        3005850, 8529163,     341, 6080044, 6080053, 6080050, 2787655, 3224317,
        4383297, 5254910, 8529163, 2684918], device='cuda:0'))

In [15]:
# batch retrievel
batches_questions = [list(map(lambda x:x["question"], questions[i:i+64])) for i in range(64,len(questions),64)]

results = []
for bq in tqdm(batches_questions):
    results.append(batch_retrieve_topk(bq,10))


  2%|▏         | 16/748 [00:18<14:07,  1.16s/it]


KeyboardInterrupt: 

In [14]:

batches_questions

[]

In [8]:
query_gpu = text_to_dense_torch(questions[0]["question"]).to("cuda:0")
r = torch.topk(csr_matrix_gpu @ query_gpu, k=10, dim=0)

In [9]:
r

torch.return_types.topk(
values=tensor([50.9797, 30.3736, 29.9639, 28.6196, 28.1220, 27.9136, 26.9275, 26.7314,
        26.2070, 24.2917], device='cuda:0'),
indices=tensor([    341, 6080044, 6080053, 6080050, 2787655, 3224317, 4383297, 5254910,
        3005850, 8529163], device='cuda:0'))

In [6]:
sparse_collection.metadata.index2docID["6080053"]

6080050

In [26]:
bow(questions[0]["question"])

defaultdict(float,
            {880134: 1.0, 869173: 1.0, 274027: 1.0, 541942: 1.0, 652667: 1.0})

In [24]:
query_tokens = pipe(pd.DataFrame([{"qid":0, "query":questions[0]["question"].lower()}]))["query"][0]
query_tokens

['relationship', 'rarotongan', 'cook', 'island', 'maori']

In [29]:
PATH_TO_MSMARCO = "../syn-question-col-analysis/datasets/msmarco/corpus_L8841823.jsonl"

def get_title_abstract(string):
    data = json.loads(string)
    title, abstract = data["title"], data["abstract"]
    return f"{title} {abstract}"

with open(PATH_TO_MSMARCO) as f:
    for doc in map(get_title_abstract, f):
       
        break

In [31]:
bow(doc)

defaultdict(float,
            {839186: 1.0,
             266051: 1.0,
             927074: 2.0,
             684233: 1.0,
             373151: 1.0,
             523132: 1.0,
             998799: 2.0,
             651144: 1.0,
             845056: 1.0,
             533255: 1.0,
             767761: 1.0,
             257234: 1.0,
             474305: 1.0,
             523347: 1.0,
             77832: 1.0,
             140132: 1.0,
             884129: 1.0,
             367894: 1.0,
             1066913: 1.0,
             666305: 1.0,
             506868: 1.0,
             1042085: 1.0,
             530548: 1.0,
             625474: 1.0,
             754559: 1.0})

In [None]:
batch_size = 2
batched_queries = [questions[i:i + batch_size] for i in range(0, len(questions), batch_size)]

results = []
start_t = time.time()
for batch in tqdm(batched_queries):
    results.append(batch_retrieve_topk(list(map(lambda x:x["question"], batch))))
end_t = time.time()

print("took", end_t-start_t)
# 0.003 - 0.003
# 0.008 - 0.02

In [9]:
query_gpu = text_to_dense_torch(question["question"]).to("cuda:1")
end_stack = time.time()
r = torch.topk(csr_matrix_gpu @ query_gpu, k=10, dim=0).indices

In [10]:
bow()

tensor([1424203, 2678308, 3748774, 3233037, 2741266, 5938529, 5026090, 5495619,
        2542788,  838903], device='cuda:1')

<function Tensor.to_sparse_csc>

In [6]:
def text_to_dense_torch(text, dim):
    b = bow(text)
    return torch.sparse_coo_tensor([list(b.keys())], list(b.values()), (dim,), dtype=torch.float32).to_dense()


In [7]:
csr_matrix_gpu = csr_matrix_cpu.to("cuda:1")


In [8]:
PATH_TO_QUESTIONS = "../syn-question-col-analysis/datasets/msmarco/relevant_pairs.jsonl"


In [20]:

with open(PATH_TO_QUESTIONS) as f:
    questions = {line["question"] for line in map(json.loads,f)}

questions = [text_to_dense_torch(q, csr_matrix_cpu.shape[1]) for q in questions]
queries_gpu = torch.stack(questions[:20], -1).to("cuda:1")

#sort, idx = (csr_matrix_gpu @ queries_gpu).sort(descending=True, dim=0)
#result = idx[:10].cpu()
result = torch.topk(csr_matrix_gpu @ queries_gpu, k=10, dim=0).indices.T.cpu()

In [22]:
result

tensor([[8617271, 7607669, 5466810, 1379245, 1379240, 5466807, 1664523, 8617274,
          547444,  269428],
        [1929910, 3572695, 7839904, 1288938, 4842897, 2507917, 7839906, 3572702,
         5359212, 3937200],
        [2533260, 5012351, 8121380,  719552, 5291683, 7952865, 5291686,  719550,
         6528714, 7088568],
        [8049577, 1433123,  669004, 4563960, 1584254, 3410067, 2055598, 8049578,
           16845, 3552218],
        [8635981, 7267248,  527698, 3260688, 1837110, 1958102, 1958100, 7267243,
         8199361, 7367407],
        [8760867, 8760864, 3641634, 2787508, 4788864, 2157456, 8760868, 8760873,
         3620983, 3342992],
        [7778351, 2868845, 6436703, 7778348, 4164404, 4337532, 2197526, 2997653,
         7670593, 2160853],
        [7447941, 8433858, 6654655, 8433854, 7896211, 2747492, 2365660, 5638740,
         4704978, 2702419],
        [8160520, 3838645,  554521, 4511137,  398442, 4575877, 5218014, 8160527,
         1901881, 8478604],
        [ 536176, 3

In [13]:
del queries_gpu
del result

In [13]:
import time

time_list = []
for _ in range(20):
    
    with open(PATH_TO_QUESTIONS) as f:
        questions = {line["question"] for line in map(json.loads,f)}

    questions = [text_to_dense_torch(q, csr_matrix_cpu.shape[1]) for q in questions]
    queries_gpu = torch.stack(questions, -1).to("cuda:1")

    start_t = time.time()
    #sort, idx = (csr_matrix_gpu @ queries_gpu).sort(descending=True, dim=0)
    #result = idx[:10].cpu()
    result = torch.topk(csr_matrix_gpu @ queries_gpu, k=100000, dim=0).indices.cpu()
    time_list.append(time.time()-start_t)
    
    for q in questions:
        del q
    del questions
    del queries_gpu
    del result 
    #del sort
    #del idx

In [14]:
time_list

[0.4787881374359131,
 0.44898533821105957,
 0.4485352039337158,
 0.4484899044036865,
 0.44817113876342773,
 0.4484434127807617,
 0.4484429359436035,
 0.44898128509521484,
 0.44836854934692383,
 0.44843435287475586,
 0.4483344554901123,
 0.4485807418823242,
 0.44898390769958496,
 0.44857192039489746,
 0.44869017601013184,
 0.448455810546875,
 0.44840002059936523,
 0.4485960006713867,
 0.4486415386199951,
 0.4485054016113281]

In [46]:
csr_matrix_gpu = csr_matrix_cpu.to("cuda:1")

In [68]:
queries = [text_to_dense_torch("what is the meaning of life?", csr_matrix_cpu.shape[1]),
           text_to_dense_torch("what time it is?", csr_matrix_cpu.shape[1]),
            ]

queries = torch.stack(queries, -1)
queries_gpu = queries.to("cuda:1")
    


tensor([[4.3629, 3.6962],
        [0.5774, 0.0000],
        [0.5461, 1.5242],
        ...,
        [0.5942, 0.0000],
        [0.5746, 2.5969],
        [1.1826, 0.5707]], device='cuda:1')

In [79]:
import time
time_list = []
for _ in range(20):
    
    queries = [text_to_dense_torch("what is the meaning of life?", csr_matrix_cpu.shape[1]),
           text_to_dense_torch("what time it is?", csr_matrix_cpu.shape[1]),
           text_to_dense_torch("what time it is? 2", csr_matrix_cpu.shape[1]),
           text_to_dense_torch("wagfsdf asdg asg ?", csr_matrix_cpu.shape[1]),
           #text_to_dense_torch("wwef faes ewq ta afewf s?", csr_matrix_cpu.shape[1]),
            ]

    queries = torch.stack(queries, -1)
    queries_gpu = queries.to("cuda:1")

    start_t = time.time()
    csr_matrix_gpu @ queries_gpu
    time_list.append(time.time()-start_t)

In [80]:
sum(time_list)/len(time_list)*1000


0.08742809295654297

In [88]:

with open(PATH_TO_QUESTIONS) as f:
    questions = {line["question"] for line in map(json.loads,f)}


In [91]:
queries_gpu = queries.to("cuda:1")

OutOfMemoryError: CUDA out of memory. Tried to allocate 1.42 GiB (GPU 1; 7.79 GiB total capacity; 6.26 GiB already allocated; 1.19 GiB free; 6.49 GiB reserved in total by PyTorch) If reserved memory is >> allocated memory try setting max_split_size_mb to avoid fragmentation.  See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF

In [94]:
sum(time_list)/len(time_list)*1000

0.1820206642150879

In [None]:

%%timeit -n 1000
retrieve_gpu_nvprims_nvfuser(query_gpu)

In [47]:
query_gpu = query.to("cuda:1")
retrieve_gpu4(csr_matrix_gpu, query_gpu)

tensor([4.3629, 0.5774, 0.5461,  ..., 0.5942, 0.5746, 1.1826], device='cuda:1')

In [48]:
%%timeit -l 1000
retrieve_gpu4(csr_matrix_gpu, query_gpu)

11.1 ms ± 78.5 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


In [44]:
%%timeit
csr_matrix @ query

11.1 ms ± 88 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


In [53]:
%%timeit
torch.topk(csr_matrix@query,k=10)

190 ms ± 4.17 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


torch.Size([30522])

tensor(crow_indices=tensor([     0,     40,     67,  ..., 465841, 465881,
                            465917]),
       col_indices=tensor([ 1012,  1025,  1996,  ...,  9949, 17502, 26110]),
       values=tensor([2., 1., 6.,  ..., 1., 1., 3.]), size=(10000, 30522),
       nnz=465917, layout=torch.sparse_csr)

In [15]:
sparseCSR_collection.transform(BM25Transform())

Converting to BM25 matrix: 100%|██████████| 10000/10000 [00:00<00:00, 27992.58it/s]


In [16]:
sparseCSR_collection.sparse_vecs

(tensor([     0,     40,     67,  ..., 465841, 465881, 465917],
        dtype=torch.int32),
 tensor([ 1012,  1025,  1996,  ...,  9949, 17502, 26110], dtype=torch.int32),
 tensor([0.0000, 2.1513, 0.0000,  ..., 6.4540, 8.6054, 9.9709]))

In [17]:
torch.sparse_csr_tensor(*sparseCSR_collection.sparse_vecs, sparseCSR_collection.shape)

tensor(crow_indices=tensor([     0,     40,     67,  ..., 465841, 465881,
                            465917]),
       col_indices=tensor([ 1012,  1025,  1996,  ...,  9949, 17502, 26110]),
       values=tensor([0.0000, 2.1513, 0.0000,  ..., 6.4540, 8.6054, 9.9709]),
       size=(10000, 30522), nnz=465917, layout=torch.sparse_csr)