In [1]:
from apscheduler.schedulers.background import BackgroundScheduler
import atexit
import os
import json
from typing import List, Dict
import numpy as np
from datetime import datetime
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
from eventregistry import EventRegistry, GetRecentArticles, ReturnInfo, ArticleInfoFlags
from langchain_milvus.utils.sparse import BaseSparseEmbedding
from pymilvus.model.hybrid import MGTEEmbeddingFunction, BGEM3EmbeddingFunction

# Initialize Milvus connection and Event Registry
# Connection settings and the API key are read from the environment so that no
# secret is stored in the notebook. Host and port fall back to the local
# defaults this notebook has always used.
MILVUS_HOST = os.environ.get("MILVUS_HOST", "localhost")
MILVUS_PORT = os.environ.get("MILVUS_PORT", "19530")
connections.connect(host=MILVUS_HOST, port=MILVUS_PORT)

# NOTE(review): a key that was previously committed in this cell should be
# treated as leaked and rotated.
_event_registry_api_key = os.environ.get("EVENT_REGISTRY_API_KEY")
if not _event_registry_api_key:
    raise RuntimeError(
        "EVENT_REGISTRY_API_KEY is not set; export it before running this notebook."
    )
er = EventRegistry(apiKey=_event_registry_api_key)

# Define the embedding function
class CustomSparseEmbedding(BaseSparseEmbedding):
    """Sparse-embedding adapter around ``MGTEEmbeddingFunction``.

    Both public methods return sparse vectors as ``{column_index: weight}``
    dictionaries built from the encoder's ``"sparse"`` output.
    """

    def __init__(self, model_name):
        """Create the underlying ``MGTEEmbeddingFunction`` for ``model_name``."""
        self.embedding_function = MGTEEmbeddingFunction(model_name=model_name)

    @staticmethod
    def _to_sparse_dict(sparse_matrix) -> Dict[int, float]:
        """Convert a single-row sparse matrix into a ``{column: value}`` dict."""
        coo = sparse_matrix.tocoo()
        return {int(i): float(v) for i, v in zip(coo.col, coo.data)}

    def embed_documents(self, texts: List[str]) -> List[Dict[int, float]]:
        """Embed each text separately and return one sparse dict per text.

        Args:
            texts: documents to embed; an empty list yields an empty list.

        Returns:
            A list, in input order, of ``{column_index: weight}`` dicts.
        """
        # The encoder output is keyed by "sparse" (the same key embed_query
        # reads); each text is encoded on its own so every result has one row.
        return [
            self._to_sparse_dict(self.embedding_function.encode_documents([text])["sparse"])
            for text in texts
        ]

    def embed_query(self, query: str) -> Dict[int, float]:
        """Embed a single query string and return its sparse dict."""
        # The encoder takes a list of strings, so wrap the single query.
        result = self.embedding_function.encode_queries([query])
        return self._to_sparse_dict(result["sparse"])

# Embedding models used by the ingestion code below.
model_name = "BAAI/llm-embedder"  # not referenced anywhere else in the visible cells

# Dense encoder: BGE-M3 with fp16 enabled, placed on the first CUDA device.
_bge_m3_options = {"model_name": "BAAI/bge-m3", "use_fp16": True, "device": "cuda:0"}
bge_m3_ef = BGEM3EmbeddingFunction(**_bge_m3_options)
# Dense dimensionality as reported by the encoder; the collection schema uses it.
dense_dim = bge_m3_ef.dim["dense"]

# Sparse encoder wrapped by CustomSparseEmbedding.
sparse_embedding_function = CustomSparseEmbedding("Alibaba-NLP/gte-multilingual-base")

# Milvus collection setup
# The field order below is the column order that insert_articles_into_milvus
# uses when it passes its column lists positionally.
fields = []
fields.append(FieldSchema(name="document_id", dtype=DataType.VARCHAR, max_length=512, is_primary=True))
fields.append(FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=512))
fields.append(FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535))
fields.append(FieldSchema(name="sparse_vector", dtype=DataType.SPARSE_FLOAT_VECTOR))
fields.append(FieldSchema(name="dense_vector", dtype=DataType.FLOAT_VECTOR, dim=dense_dim))

schema = CollectionSchema(
    fields=fields,
    description="Collection for storing nodes vectors",
    enable_dynamic_field=False,
)

collection_name = "vector_documents"
collection = Collection(
    name=collection_name,
    schema=schema,
    consistency_level="Strong",
)
# NOTE(review): load() presumably requires indexes on both vector fields to
# exist already; none are created in the visible cells — confirm.
collection.load()

# Function to insert articles into Milvus
def insert_articles_into_milvus(articles):
    """Embed fetched articles and insert them into the Milvus collection.

    Articles with empty/missing content, or with content too large for the
    ``text`` field, are skipped *before* an id is assigned, so the ids handed
    out in one call are contiguous.

    Args:
        articles: sized iterable of dicts with ``"title"`` and ``"content"``
            keys (extra keys are ignored).

    Returns:
        None. Failures are reported with ``print`` rather than raised, so a
        scheduled run is never aborted by a single bad article.
    """
    max_text_bytes = 65535   # max_length of the "text" VARCHAR field
    max_title_bytes = 512    # max_length of the "title" VARCHAR field

    # NOTE(review): ids continue from the current entity count; this presumably
    # relies on the count being up to date and on nothing ever being deleted.
    try:
        next_doc_id = collection.num_entities
    except Exception as e:
        print(f"Error fetching collection entity count: {e}")
        next_doc_id = 0

    doc_ids, titles, texts, sparse_vectors, dense_vectors = [], [], [], [], []
    for article in articles:
        content = article.get("content") or ""
        if not content.strip():
            continue
        # The limit is checked on the UTF-8 encoding, not the character count,
        # so multi-byte text cannot slip past the check.
        if len(content.encode("utf-8")) > max_text_bytes:
            print("Skipping article whose content exceeds the text field limit.")
            continue

        try:
            dense_vector = bge_m3_ef.encode_documents([content])["dense"][0]
            sparse_vector = sparse_embedding_function.embed_query(content)
            dense_array = np.array(dense_vector, dtype=np.float32)
        except Exception as e:
            print(f"Error processing article for Milvus: {e}")
            continue

        # A missing title becomes "", and an over-long one is cut at the field
        # limit (dropping any partial trailing character), so one odd title
        # cannot make the whole batch fail.
        raw_title = article.get("title") or ""
        title = raw_title.encode("utf-8")[:max_title_bytes].decode("utf-8", "ignore")

        doc_ids.append(str(next_doc_id))
        titles.append(title)
        texts.append(content)
        sparse_vectors.append(sparse_vector)
        dense_vectors.append(dense_array)
        next_doc_id += 1

    if not doc_ids:
        print("No valid articles to insert into Milvus.")
        return

    try:
        # Column order follows the collection schema.
        entities = [doc_ids, titles, texts, sparse_vectors, dense_vectors]
        collection.insert(entities)
        print(f"Inserted {len(doc_ids)} articles into Milvus from {len(articles)} fetched articles.")
    except Exception as e:
        print(f"Error inserting articles into Milvus: {e}")

# Main function to fetch and store articles
def run_cron_job():
    """Fetch recent articles, save them to a JSON file and insert them into Milvus.

    The articles are written to ``<YYYYmmdd_HHMMSS>_news.json`` in the current
    working directory and then handed to ``insert_articles_into_milvus``.

    Returns:
        None. Any exception is caught and printed so that a failing run does
        not propagate out of the scheduled job.
    """
    try:
        # Configure recent articles query
        recentQ = GetRecentArticles(er, lang="eng", returnInfo=ReturnInfo(ArticleInfoFlags(body=True)))

        # Get current timestamp for file name
        current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = f"{current_time}_news.json"

        # Fetch articles
        articleList = recentQ.getUpdates()
        print(f"{len(articleList)} English articles were added since the last call.")

        articles_data = []
        for article in articleList:
            # A missing body is treated as empty text so that one incomplete
            # article cannot abort the whole run when it is sliced or stripped.
            body = article.get("body") or ""
            print(f"Title: {article.get('title')}")
            print(f"URL: {article.get('url')}")
            print(f"Content: {body[:100]}...")
            articles_data.append({
                "title": article.get("title"),
                "content": body,
                "url": article.get("url"),
            })

        # Save articles to JSON. ensure_ascii=False writes non-ASCII characters
        # verbatim, so the file encoding is pinned to UTF-8.
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(articles_data, f, ensure_ascii=False, indent=4)
        print(f"English articles saved to {output_file}.")

        # Insert into Milvus
        insert_articles_into_milvus(articles_data)
        print(f"Articles from {output_file} inserted into Milvus.")

    except Exception as e:
        print(f"An error occurred: {e}")


# run_cron_job()


Fetching 30 files:   0%|          | 0/30 [00:00<?, ?it/s]

  colbert_state_dict = torch.load(os.path.join(model_dir, 'colbert_linear.pt'), map_location='cpu')
  sparse_state_dict = torch.load(os.path.join(model_dir, 'sparse_linear.pt'), map_location='cpu')


In [None]:
# Re-running this cell must not leave an earlier scheduler firing in the
# background, otherwise every re-run adds another hourly job.
_previous_scheduler = globals().get("scheduler")
if _previous_scheduler is not None and getattr(_previous_scheduler, "running", False):
    _previous_scheduler.shutdown(wait=False)

scheduler = BackgroundScheduler()
scheduler.add_job(run_cron_job, 'interval', hours = 1)

# Start the scheduler
scheduler.start()


def _stop_scheduler(sched=scheduler):
    """Shut down the scheduler created by this cell if it is still running."""
    if sched.running:
        sched.shutdown(wait=False)


# Stop the background scheduler when the interpreter exits.
atexit.register(_stop_scheduler)

59 English articles were added since the last call.
Title: Young Kazakh Accordionist Wins First World Trophy for Kazakhstan - The Astana Times
URL: https://astanatimes.com/2024/12/young-kazakh-accordionist-wins-first-world-trophy-for-kazakhstan/
Content: ASTANA -- Nasyr Tarlabek, a 10-year-old prodigy from Kazakhstan, has made history by winning the Con...
Title: A legacy of learning
URL: https://www.newindianexpress.com/cities/chennai/2024/Dec/07/a-legacy-of-learning
Content: CHENNAI: What started as a humble endeavour to provide quality education has now become a legacy of ...
Title: Three million people told to stay inside as UK braces for 90mph storm
URL: https://www.bristolpost.co.uk/news/uk-world-news/millions-urged-stay-indoors-uk-9774521
Content: Our community members are treated to special offers, promotions and adverts from us and our partners...
Title: Nigeria's Company Income Tax drops to N1.77trn in Q3 2024
URL: https://dailypost.ng/?p=1148087
Content: Nigeria's Company In