#### Dataset comes from: https://huggingface.co/ashraq/financial-news-articles
Milestone 3的相关代码，使用Milvus作为向量数据库，contriever-msmarco为Embedding模型

### 创建向量数据库

In [5]:
import torch
from transformers import AutoModel, AutoTokenizer
import numpy as np
import pyarrow.parquet as pq

In [6]:
df = pq.read_table('data/train-1-of-2.parquet').to_pandas()

In [7]:
class MsmarcoModel:
    def __init__(self, model_path):
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        self.model = AutoModel.from_pretrained(model_path).to('cuda')
    
    def mean_pooling(self, token_embeddings, mask):
        token_embeddings = token_embeddings.masked_fill(~mask[..., None].bool(), 0.)
        sentence_embeddings = token_embeddings.sum(dim=1) / mask.sum(dim=1)[..., None]
        return sentence_embeddings
    
    def encode(self, texts):
        inputs = self.tokenizer(texts, padding=True, truncation=True, return_tensors='pt').to('cuda')
        with torch.no_grad():
            outputs = self.model(**inputs)
            return self.mean_pooling(outputs[0], inputs['attention_mask']).detach().cpu().numpy()

In [8]:
contriever_path = '/home/cdsw/models/contriever-msmarco'
model = MsmarcoModel(contriever_path)

#### Embedding向量提取

In [8]:
class VectorIndex:
    def __init__(self, df, model):
        self.ids = None
        self.vecs = None
        self.df = df
        self.model = model
        self.reset()
        # self.milvus_client = init_vector_database()

    def reset(self):
        self.ids = []
        self.vecs = np.empty([0, 768], dtype=np.float32)

    def next_line(self):
        num_rows = self.df.shape[0]
        for i in range(num_rows):
            yield (i+1)*10000, self.df['title'][i]
            lines = [line for line in self.df['text'][i].split('\n') if len(line.strip()) > 0]
            for j, line in enumerate(lines, start=1):
                yield (i+1)*10000+j, line

    def splits(self, split_size):
        line_iter = self.next_line()
        while True:
            lines = list(itertools.islice(line_iter, split_size))
            if len(lines) == 0:
                break
            ids = [line[0] for line in lines]
            docs = [line[1] for line in lines]
            yield ids, docs

    def split_embedding(self, docs):
        # print('.', end='')
        vecs = self.model.encode(docs)
        self.vecs = np.concatenate((self.vecs, vecs))
        print('.', end='')

    def checkpoint(self, dir_prefix='index'):
        id_range = '%s-%s' % (self.ids[0], self.ids[-1])
        filename = '%s/split-ids-%s.npy' % (dir_prefix, id_range)
        np.save(filename, self.ids)
        filename = '%s/split-vecs-%s.npy' % (dir_prefix, id_range)
        np.save(filename, self.vecs)
        print(' Checkpoint %s' % self.ids[-1])

    def proc_docs(self, show_progress=200, ckpt_vecs=20000):
        for ids, docs in self.splits(show_progress):
            self.ids += ids
            self.split_embedding(docs)

            if len(self.ids) > 0 and len(self.ids) % ckpt_vecs == 0:
                self.checkpoint()
                self.reset()
                break

In [10]:
import itertools
df = pq.read_table('data/train-1-of-2.parquet').to_pandas()
vec_idx = VectorIndex(df, model)
vec_idx.proc_docs()

........................................................................................................................................................................................................ Checkpoint 8990094


#### Embedding向量导入Milvus

In [9]:
from milvus import default_server
from pymilvus import MilvusClient, DataType

In [10]:
default_server.show_startup_banner = True
default_server.start()



    __  _________ _   ____  ______
   /  |/  /  _/ /| | / / / / / __/
  / /|_/ // // /_| |/ / /_/ /\ \
 /_/  /_/___/____/___/\____/___/ {Lite}

 Welcome to use Milvus!

 Version:   v2.3.5-lite
 Process:   575
 Started:   2024-07-30 06:38:07
 Config:    /home/cdsw/.milvus.io/milvus-server/2.3.5/configs/milvus.yaml
 Logs:      /home/cdsw/.milvus.io/milvus-server/2.3.5/logs

 Ctrl+C to exit ...


In [None]:
from pymilvus import connections, utility
connections.connect(host='localhost', port=default_server.listen_port)

In [7]:
def init_vector_database():
    uri = "http://localhost:%s" % default_server.listen_port
    client = MilvusClient(uri=uri)

    if 'rag4fin' not in client.list_collections():
        schema = MilvusClient.create_schema(
            auto_id=False,
            enable_dynamic_field=True,
        )

        schema.add_field(field_name="split_id", datatype=DataType.INT64, is_primary=True)
        schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=768)

        index_params = client.prepare_index_params()
        index_params.add_index(
            field_name="vector",
            index_type="AUTOINDEX",
            metric_type="IP"
        )
        client.create_collection(
            collection_name="rag4fin",
            schema=schema,
            index_params=index_params
        )

    return client

In [28]:
task_id = utility.do_bulk_insert(
    collection_name="rag4fin_consine",
    is_row_based=False,
    files=["/home/cdsw/index/3/id.npy", "/home/cdsw/index/3/vector.npy"]
)
utility.get_bulk_insert_state(task_id)

<Bulk insert state:
    - taskID          : 451457106256593346,
    - state           : Started,
    - row_count       : 0,
    - infos           : {'files': '/home/cdsw/index/3/id.npy,/home/cdsw/index/3/vector.npy', 'collection': 'rag4fin_consine', 'partition': '_default', 'failed_reason': ''},
    - id_ranges       : [],
    - create_ts       : 2024-07-28 13:11:13
>

In [29]:
utility.get_bulk_insert_state(task_id)

<Bulk insert state:
    - taskID          : 451457106256593346,
    - state           : Completed,
    - row_count       : 20000,
    - infos           : {'files': '/home/cdsw/index/3/id.npy,/home/cdsw/index/3/vector.npy', 'collection': 'rag4fin_consine', 'partition': '_default', 'failed_reason': '', 'progress_percent': '100', 'persist_cost': '2.04'},
    - id_ranges       : [],
    - create_ts       : 2024-07-28 13:11:13
>

In [None]:
utility.list_bulk_insert_tasks()

In [31]:
for i in range(4, 21):
    f_id = "/home/cdsw/index/%s/id.npy" % i
    f_vector = "/home/cdsw/index/%s/vector.npy" % i
    task_id = utility.do_bulk_insert(
        collection_name="rag4fin_consine",
        is_row_based=False,
        files=[f_id, f_vector]
    )
    state=utility.get_bulk_insert_state(task_id)
    while state.state_name != 'Completed':
        time.sleep(0.1)
        state=utility.get_bulk_insert_state(task_id)
    print('Split %s imported.' % i)

Split 4 imported.
Split 5 imported.
Split 6 imported.
Split 7 imported.
Split 8 imported.
Split 9 imported.
Split 10 imported.
Split 11 imported.
Split 12 imported.
Split 13 imported.
Split 14 imported.
Split 15 imported.
Split 16 imported.
Split 17 imported.
Split 18 imported.
Split 19 imported.
Split 20 imported.


In [None]:
state=utility.get_bulk_insert_state(task_id)
state.state_name

### Document Retrial from Milvus
#### Vector Search with Milvus

In [8]:
text = 'Who wins prime contract of the U.S. Navy?'
query = model.encode([text])
query.shape

(1, 768)

In [77]:
uri = "http://localhost:%s" % default_server.listen_port
client = MilvusClient(uri=uri)
candidates = client.search(
    collection_name="rag4fin_consine", # Replace with the actual name of your collection
    # Replace with your query vector
    data=query,
    limit=20, # Max. number of search results to return
    search_params={"metric_type": "COSINE", "params": {}} # Search parameters
)

In [47]:
for k in candidates[0]:
    id = k['id']
    distance = k['distance']
    if id % 10000 == 0:
        text = df['title'][id//10000-1]
    else:
        text = df['text'][id//10000-1].split('\n')[id%10000-1]
    print(id, distance, text)

16240000 0.7258501052856445 BRIEF-KeyW Wins Prime Contract To Deliver Tagging, Tracking And Locating Equipment And Support For The U.S. Navy
16240003 0.7201040387153625 * KEYW WINS PRIME CONTRACT TO DELIVER TAGGING, TRACKING AND LOCATING EQUIPMENT AND SUPPORT FOR THE U.S. NAVY 
16240001 0.6941074132919312 January 9, 2018 / 2:23 PM / Updated 8 minutes ago BRIEF-KeyW Wins Prime Contract To Deliver Tagging, Tracking And Locating Equipment And Support For The U.S. Navy Reuters Staff 1 Min Read 
21900000 0.652808427810669 BRIEF-Naval Surface Warfare Center Carderock Division Awards Leidos Prime Contract
178230000 0.611345648765564 Raytheon wins $2.3 billion U.S. defense contract -Pentagon
235260000 0.5991119742393494 Lockheed wins $459 million U.S. defence contract - Pentagon
21900001 0.5946351885795593  21 PM / in 8 minutes BRIEF-Naval Surface Warfare Center Carderock Division Awards Leidos Prime Contract Reuters Staff 1 Min Read 
17480000 0.5890912413597107 Raytheon wins $641.8 million U.

In [10]:
def combine_candidates(candidates):
    ids_out = []
    for k in candidates[0]:
        if len(ids_out) > 10:
            break
        id = k['id']
        distance = k['distance']
        if distance < 0.5:
            break
        elif id % 10000 == 0:  # The title is matched
            doc_id = id//10000-1
            ids_out.append(doc_id)
        else:
            ids_out.append((id//10000-1, id%10000-1))

    for i in range(len(ids_out)):
        id = ids_out[i]
        if isinstance(id, int):
            for j in range(len(ids_out)):
                if ids_out[j] is not None and not isinstance(ids_out[j], int) and ids_out[j][0] == id:
                    ids_out[j] = None

    return [id for id in ids_out if id is not None]

In [76]:
# t1 = candidates[0][0]
# t2 = candidates[0][1]
# t3 = candidates[0][3]

# candidates[0][0] = t2
# candidates[0][1] = t3
# candidates[0][3] = t1

candidates[0]

[{'id': 16240003, 'distance': 0.7201040387153625, 'entity': {}},
 {'id': 21900000, 'distance': 0.652808427810669, 'entity': {}},
 {'id': 16240001, 'distance': 0.6941074132919312, 'entity': {}},
 {'id': 16240000, 'distance': 0.7258501052856445, 'entity': {}},
 {'id': 178230000, 'distance': 0.611345648765564, 'entity': {}},
 {'id': 235260000, 'distance': 0.5991119742393494, 'entity': {}},
 {'id': 21900001, 'distance': 0.5946351885795593, 'entity': {}},
 {'id': 17480000, 'distance': 0.5890912413597107, 'entity': {}},
 {'id': 14810017, 'distance': 0.5676067471504211, 'entity': {}},
 {'id': 178230001, 'distance': 0.5653888583183289, 'entity': {}},
 {'id': 235260001, 'distance': 0.5580492615699768, 'entity': {}},
 {'id': 2860003, 'distance': 0.5455134510993958, 'entity': {}},
 {'id': 261270003, 'distance': 0.5428184866905212, 'entity': {}},
 {'id': 261270000, 'distance': 0.5238215923309326, 'entity': {}},
 {'id': 130090000, 'distance': 0.5206090807914734, 'entity': {}},
 {'id': 73490002, 'di

#### Rerank Search Results

In [None]:
from FlagEmbedding import FlagReranker
reranker = FlagReranker('BAAI/bge-reranker-v2-m3', use_fp16=True)

In [87]:
query_text = 'Who wins prime contract of the U.S. Navy?'

for k in candidates[0]:
    id = k['id']
    distance = k['distance']
    if id % 10000 == 0:
        text = df['title'][id//10000-1]
        passage = text + '\n' + df['text'][id//10000-1]
        score = reranker.compute_score([query_text, passage], normalize=True)
    else:
        text = df['text'][id//10000-1].split('\n')[id%10000-1]
        score = reranker.compute_score([query_text, df['text'][id//10000-1].split('\n')[id%10000-1]], normalize=True)
    print(id, distance, score, text)

16240000 0.7258501052856445 0.9817357216481429 BRIEF-KeyW Wins Prime Contract To Deliver Tagging, Tracking And Locating Equipment And Support For The U.S. Navy
16240003 0.7201040387153625 0.9863631647185701 * KEYW WINS PRIME CONTRACT TO DELIVER TAGGING, TRACKING AND LOCATING EQUIPMENT AND SUPPORT FOR THE U.S. NAVY 
16240001 0.6941074132919312 0.977585783536363 January 9, 2018 / 2:23 PM / Updated 8 minutes ago BRIEF-KeyW Wins Prime Contract To Deliver Tagging, Tracking And Locating Equipment And Support For The U.S. Navy Reuters Staff 1 Min Read 
21900000 0.652808427810669 0.03796369520926448 BRIEF-Naval Surface Warfare Center Carderock Division Awards Leidos Prime Contract
178230000 0.611345648765564 0.002243087851289173 Raytheon wins $2.3 billion U.S. defense contract -Pentagon
235260000 0.5991119742393494 0.011158031248084035 Lockheed wins $459 million U.S. defence contract - Pentagon
21900001 0.5946351885795593 0.02088421589400643  21 PM / in 8 minutes BRIEF-Naval Surface Warfare Ce

In [88]:
print(df['title'][2189])
print()
print(df['text'][2189])

BRIEF-Naval Surface Warfare Center Carderock Division Awards Leidos Prime Contract

 21 PM / in 8 minutes BRIEF-Naval Surface Warfare Center Carderock Division Awards Leidos Prime Contract Reuters Staff 1 Min Read 
Jan 2 (Reuters) - Leidos Holdings Inc: 
* LEIDOS - AWARDED CONTRACT BY NAVAL SURFACE WARFARE CENTER CARDEROCK DIVISION TO SUPPORT SOUTHEAST ALASKA MEASUREMENT FACILITY SIGNATURE SILENCING PROGRAM 
* LEIDOS HOLDINGS INC SAYS THE CONTRACT HAS A FIVE YEAR PERIOD OF PERFORMANCE AND A TOTAL CONTRACT VALUE OF APPROXIMATELY $42 MILLION Source text for Eikon: Further company coverage:


### The Whole Python Class

In [1]:
class Retriever:
    def __init__(self):
        model_path = os.getenv("EMB_MODEL")
        log.info('Loading embedding model %s ... ' % model_path)
        self.model = MsmarcoModel(model_path)
        self.reranker = FlagReranker('BAAI/bge-reranker-v2-m3', use_fp16=True)
        self.milvus_client = MilvusClient(uri=os.getenv("MILVUS_URI"))
        self.df = pq.read_table(os.getenv('DATA_FILE')).to_pandas()

    def combine_candidates(self, candidates):
        ids_out = []
        for k in candidates[0]:
            # if len(ids_out) > 10:
            #     break
            id = k['id']
            distance = k['distance']
            if distance < 0.5:
                break
            elif id % 10000 == 0:  # The title is matched
                doc_id = id//10000-1
                ids_out.append(doc_id)
            else:
                ids_out.append((id//10000-1, id%10000-1))
        for i in range(len(ids_out)):
            id = ids_out[i]
            if isinstance(id, int):
                for j in range(len(ids_out)):
                    if ids_out[j] is not None and not isinstance(ids_out[j], int) and ids_out[j][0] == id:
                        ids_out[j] = None
        return [id for id in ids_out if id is not None]
                
    def text_id(self, user_input, top_k=100):
        '''
        Returns a list of 2-tuples: [(id1, score1), (id2, score2), (id3, score3), ...]
        1st element of the tuple: ID.
        2nd element of the tuple: Matching score.
        '''
        query = self.model.encode([user_input])
        candidates = self.milvus_client.search(
            collection_name=os.getenv("MILVUS_INDEX_NAME"),
            data=query,
            limit=top_k, # Max. number of search results to return
            search_params={"metric_type": "COSINE", "params": {}} # Search parameters
        )
        candidates = self.combine_candidates(candidates)
        return candidates

    def rerank(self, user_input, candidates, top_k=3):
        print('Candidate 1 #:', len(candidates))
        candidates = [id for id in candidates
                      if isinstance(id, int)
                      or len(df['text'][id[0]].split('\n')[id[1]].split())>3]
        print('Candidate 2 #:', len(candidates))
        # docs = [df['title'][id] + '\n' + df['text'][id] 
        #         if isinstance(id, int) else
        #         df['text'][id[0]].split('\n')[id[1]]
        #         for id in candidates]
        candidates = list(set([id if isinstance(id, int) else id[0] for id in candidates]))
        print('Candidate 3 #:', len(candidates))
        docs = [self.df['title'][id] + '\n' + self.df['text'][id] for id in candidates]
        query_and_docs = [[user_input, doc] for doc in docs]
        scores = self.reranker.compute_score(query_and_docs, normalize=True)
        return [x for x in zip(candidates, docs, scores) if x[2]>0.4]

    def get_texts(self, user_input, top_k=3):
        # text_ids = self.text_id(user_input, top_k)
        # titles, texts = self.df['title'], self.df['text']
        # return [(id, titles[id], texts[id], score) for id, score in text_ids]
        candidates = self.text_id(user_input, top_k=top_k*80)
        return self.rerank(user_input, candidates, top_k)
    
    def get_texts_without_rerank(self, user_input, top_k=20):
        candidates = self.text_id(user_input, top_k=top_k)
        docs = [df['title'][id] + '\n' + df['text'][id] 
                if isinstance(id, int) else
                df['text'][id[0]].split('\n')[id[1]]
                for id in candidates]
        return list(zip(candidates, docs))

In [65]:
import os
import logging
from dotenv import load_dotenv
from FlagEmbedding import FlagReranker

log = logging.getLogger('mydata-chatbot-web')
load_dotenv(dotenv_path='mydata-chatbot.env')
r=Retriever()

In [33]:
print(df['title'][16405])
print(df['text'][16405].split('\n')[1])
print('='*10)
print(df['text'][16405])

Microsoft reports loss on tax charge
(Reuters) - Microsoft Corp ( MSFT.O ) reported better-than-expected quarterly revenue and profit on Wednesday, helped by robust demand for its cloud computing services and flagship Azure product, which has recorded dramatic growth over several quarters. 
January 31, 2018 / 9:12 PM / in 20 minutes Microsoft reports better-than-expected quarterly revenue, profit Reuters Staff 2 Min Read 
(Reuters) - Microsoft Corp ( MSFT.O ) reported better-than-expected quarterly revenue and profit on Wednesday, helped by robust demand for its cloud computing services and flagship Azure product, which has recorded dramatic growth over several quarters. 
Microsoft took a $13.8 billion charge in the second quarter that resulted in a loss, but excluding items it earned 96 cents per share, beating analysts’ average expectation of 86 cents. 
Revenue climbed 12 percent to $28.92 billion, beating analysts’ expectations of $28.40 billion. 
Since Chief Executive Officer Satya

In [66]:
# user_input='Who wins prime contract of the U.S. Navy?'
# user_input='What is the revenue of Microsoft?'
user_input='What is the revenue of Cloudera?'

for id, text, score in r.get_texts(user_input):
    print('='*10)
    print(id, score)
    print(text)

# r.get_texts_without_rerank(user_input, 200)

# print("===== 全文的rank值大于片段 =====")
# r.rerank(user_input, [16405, (16405, 1), (16405, 2)])

Candidate 1 #: 236
Candidate 2 #: 81
Candidate 3 #: 58
6529 0.0025114635138098534
SAP SE: Stellar New Cloud Bookings, Up 31% in Q4 at Constant Currencies - EPS Up Double Digits
4866 0.0008231385881888257
BRIEF-Robert Half International - For The Quarter Ended December 31, 2017, Net Income Was $.38 Per Share‍​
17027 0.0010322310367548194
Allergan forecasts 2018 revenue below estimates
5124 0.0012994274657913674
Juniper Networks quarterly revenue drops 10.5 percent
12166 0.0009811718812071798
IBM returns to growth, but shares drop after recent rally
4871 0.0005316132776684705
Microsemi meets 1Q profit forecasts
16904 0.002107472437884964
Amazon's cloud business acquires Sqrrl, a security start-up with NSA roots
13958 3.373722476473661e-05
Google plans to build 3 new underwater cables to expand cloud business
23565 0.0005636750731053124
Berkshire Hills Reports Operating Results; Dividend Increased; Annual Meeting Announced
10254 0.0009547317110805592
Home security company ADT Inc's IPO pr