In [1]:
from pathlib import Path
import pandas as pd
import numpy as np
from tqdm.notebook import trange, tqdm
from pymilvus import MilvusClient
from pymilvus import CollectionSchema, FieldSchema, DataType


In [2]:
train_news_data = None
dev_news_data = None
aspect_vector_dir = Path('/mount/arbeitsdaten66/projekte/multiview/hardy/project/vae/outputs/mind')
MIND_dev_path = Path('/mount/arbeitsdaten66/projekte/multiview/hardy/datasets/mind_resplit/MINDlarge_dev')
MIND_train_path = Path('/mount/arbeitsdaten66/projekte/multiview/hardy/datasets/mind_resplit/MINDlarge_train')
MIND_test_path = Path('/mount/arbeitsdaten66/projekte/multiview/hardy/datasets/mind_resplit/MINDlarge_test')

In [3]:
def load_aspect_vectors(path: Path):
    """
    Load aspect vectors from a given path.
    """
    data = {}
    with open(path, 'r') as f:
        for line in f:
            parts = line.strip().split()
            nid = int(parts[0])
            vector = [float(x) for x in parts[1:]]
            data[nid] = np.array(vector, dtype=np.float32)
    return data

In [4]:
aspect_vectors = {
    'dev': {
        'std': load_aspect_vectors(aspect_vector_dir / 'dev_mind_std_sts_aspect_vectors.txt'),
        'frame': load_aspect_vectors(aspect_vector_dir / 'dev_mind_frame_aspect_vectors.txt'),
        'cat': load_aspect_vectors(aspect_vector_dir / 'dev_mind_category_aspect_vectors.txt'),
        'political': load_aspect_vectors(aspect_vector_dir / 'dev_mind_political_aspect_vectors.txt'),
        'sentiment': load_aspect_vectors(aspect_vector_dir / 'dev_mind_sentiment_aspect_vectors.txt'),
    },
    'test': {
        'std': load_aspect_vectors(aspect_vector_dir / 'test_mind_std_sts_aspect_vectors.txt'),
        'frame': load_aspect_vectors(aspect_vector_dir / 'test_mind_frame_aspect_vectors.txt'),
        'cat': load_aspect_vectors(aspect_vector_dir / 'test_mind_category_aspect_vectors.txt'),
        'political': load_aspect_vectors(aspect_vector_dir / 'test_mind_political_aspect_vectors.txt'),
        'sentiment': load_aspect_vectors(aspect_vector_dir / 'test_mind_sentiment_aspect_vectors.txt'),
    }
}

In [5]:
aspects = ['std', 'cat', 'frame', 'political', 'sentiment']
combined_aspect_vectors = {}
for split, item in tqdm(aspect_vectors.items()):
    combined_aspect_vectors[split] = {}
    for aspect, vector in item.items():
        for nid, vec in vector.items():
            if nid not in combined_aspect_vectors[split]:
                combined_aspect_vectors[split][nid] = [None for i in range(len(aspects))]
            combined_aspect_vectors[split][nid][aspects.index(aspect)] = vec

  0%|          | 0/2 [00:00<?, ?it/s]

In [6]:
for split, item in combined_aspect_vectors.items():
    for nid, vec in item.items():
        assert len(vec) == len(aspects), f"Vector length mismatch for nid {nid} in split {split}"
        vec = np.concatenate(vec, axis=0)
        combined_aspect_vectors[split][nid] = vec

In [7]:
def append_history(df_behaviors):
    """
    MIND dataset has a bug where rows with the same user_id has similar history. This function will fix
    it by appending the user history with the clicked articles of previous impression for each user. 
    """
    df_behaviors['timestamp'] = pd.to_datetime(df_behaviors['timestamp'])
    df_behaviors['history'] = df_behaviors['history'].apply(lambda x: x.split() if type(x) == str else [])
    cal_history = {}
    for user_id, group in tqdm(df_behaviors.sort_values(by=['user_id', 'timestamp']).groupby('user_id')):
        cum_history = []
        for i, (index, row) in enumerate(group.iterrows()):
            if i != 0:
                row['history'].extend(cum_history)
                impression = [i.split('-')[0] for i in row['impressions'].split() if i.endswith('-1')]
                cum_history.extend(impression)
            else:
                cum_history = [i.split('-')[0] for i in row['impressions'].split() if i.endswith('-1')]
            cal_history[index] = row['history']
    history_series = pd.Series(cal_history)
    df_behaviors['history'] = history_series
    df_behaviors['history'] = df_behaviors['history'].apply(lambda x: ' '.join(x))
    last_user_rows = df_behaviors.sort_values(by='timestamp').groupby('user_id').tail(1)
    return last_user_rows

def load_and_fix_df(path):
    df_behaviors = pd.read_csv(path / "behaviors.tsv", header=None, sep='\t')
    df_behaviors.columns = ['impression_id', 'user_id', 'timestamp', 'history', 'impressions']
    df_behaviors['timestamp'] = pd.to_datetime(df_behaviors['timestamp'])
    df_behaviors = append_history(df_behaviors)
    return df_behaviors



In [8]:
df_train = load_and_fix_df(MIND_train_path)
df_dev = load_and_fix_df(MIND_dev_path)
df_test = load_and_fix_df(MIND_test_path)

  0%|          | 0/654870 [00:00<?, ?it/s]

  0%|          | 0/286814 [00:00<?, ?it/s]

  0%|          | 0/255990 [00:00<?, ?it/s]

In [9]:
print("Train set:")
print("Earliest timestamp:", df_train['timestamp'].min())
print("Latest timestamp:", df_train['timestamp'].max())

print("\nDev set:")
print("Earliest timestamp:", df_dev['timestamp'].min())
print("Latest timestamp:", df_dev['timestamp'].max())

print("\nTest set:")
print("Earliest timestamp:", df_test['timestamp'].min())
print("Latest timestamp:", df_test['timestamp'].max())

Train set:
Earliest timestamp: 2019-11-09 00:00:03
Latest timestamp: 2019-11-13 23:59:57

Dev set:
Earliest timestamp: 2019-11-14 00:00:00
Latest timestamp: 2019-11-14 23:59:59

Test set:
Earliest timestamp: 2019-11-15 00:00:00
Latest timestamp: 2019-11-15 23:59:43


In [10]:
def split_and_save(df, path):
    # Split df into groups by date (each group is a single day)
    df['date'] = df['timestamp'].dt.date
    train_groups_by_day = df.groupby('date')
    article_groups_by_day = {}
    for i, group in tqdm(train_groups_by_day):
        impression_rows = group['impressions'].apply(lambda x: [int(i.split('-')[0][1:]) for i in x.split()]).to_list()
        unique_articles = set()
        for impressions in impression_rows:
            unique_articles.update(set(impressions))
        article_groups_by_day[str(i)] = unique_articles
    return article_groups_by_day

In [11]:
train_article_groups = split_and_save(df_train, MIND_train_path)
test_article_groups = split_and_save(df_test, MIND_test_path)
dev_article_groups = split_and_save(df_dev, MIND_dev_path)

  0%|          | 0/5 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

In [12]:
if 'client' in locals() and client is not None:
    try:
        client.close()
    except Exception:
        pass
client = MilvusClient(str(aspect_vector_dir / "aspect_data_new.db"))
def create_collection(collection_name: str, dimension: int):
    """
    Create a collection in Milvus.
    """
    schema = CollectionSchema(
        fields=[
            FieldSchema(name='nid', dtype=DataType.INT64, is_primary=True),
            FieldSchema(name='vector', dtype=DataType.FLOAT_VECTOR, dim=dimension),
        ]
    )
    index_params = client.prepare_index_params()
    index_params.add_index(
        field_name="vector", metric_type="IP", index_type="AUTOINDEX", params={}
    )
    if client.has_collection(collection_name):
        client.drop_collection(collection_name)
    client.create_collection(
        collection_name=collection_name,
        schema=schema,
        index_params=index_params
    )

In [None]:
combined_aspect_vectors['dev'][88753]

(5120,)

In [21]:
for key, vectors in tqdm(combined_aspect_vectors.items()):
    df = {}
    if key == 'train':
        df = train_article_groups
    elif key == 'dev':
        df = dev_article_groups
    elif key == 'test':
        df = test_article_groups
    for date in df.keys():
        print(f"Processing {key} with {len(vectors)} vectors on date {date}")
        collection_name = f"{key}_mind_{date.replace('-', '_')}"
        create_collection(collection_name, len(next(iter(vectors.values()))))
        keys = list(vectors.keys())
        batch_size = 1000
        for i in trange(0, len(keys), batch_size, desc=f"Inserting {collection_name}"):
            batch_keys = keys[i:i + batch_size]
            batch_vectors = {k: vectors[k] for k in batch_keys if k in df[date]}
            # Prepare data for insertion
            data = [
                {"nid": nid, "vector": vector.tolist()} for nid, vector in batch_vectors.items()
            ]
            client.insert(collection_name=collection_name, data=data)
            
            

  0%|          | 0/2 [00:00<?, ?it/s]

Processing dev with 76167 vectors on date 2019-11-14


Inserting dev_mind_2019_11_14:   0%|          | 0/77 [00:00<?, ?it/s]

Processing test with 72023 vectors on date 2019-11-15


Inserting test_mind_2019_11_15:   0%|          | 0/73 [00:00<?, ?it/s]

In [22]:
client.close()

In [32]:
import numpy as np
collection_name = "dev_mind_cat_aspect_2019_11_14"

# Check the number of entities in the collection
collection_stats = client.get_collection_stats(collection_name)
print(f"Collection '{collection_name}' size:", collection_stats["row_count"])
# Get top 3 items from 'dev_mind_cat_aspect_2019_11_14' collection in Milvus
# Assuming the collection contains a 'vector' field and you want the top 3 by L2 norm

Collection 'dev_mind_cat_aspect_2019_11_14' size: 8705


In [None]:
df_train_article_groups = pd.read_csv(MIND_train_path / 'grouped_behaviors.tsv', sep='\t', header=None, index_col=0)
df_train_article_groups.columns = ['Date', 'Articles']
df_dev_article_groups = pd.read_csv(MIND_dev_path / 'grouped_behaviors.tsv', sep='\t', header=None, index_col=0)
df_dev_article_groups.columns = ['Date', 'Articles']
df_test_article_groups = pd.read_csv(MIND_test_path / 'grouped_behaviors.tsv', sep='\t', header=None, index_col=0)
df_test_article_groups.columns = ['Date', 'Articles']

In [None]:
df_train_article_groups