In [1]:
import os,sys
import json
from collections import defaultdict
from tqdm import tqdm


In [2]:
def open_jsonl(filepath):
    with open(filepath, "r", encoding="utf-8") as f:
        lines = f.readlines()
    return [json.loads(line.strip()) for line in lines]

In [5]:
dataset_path = "/data3/doyoon/sequential_recsys/datasets/"
dataset = "amazon"
category = "Books"
history = json.load(open(os.path.join(dataset_path, dataset, category,f"{category}_history.json"),'r'))
user_simmilarity = json.load(open(os.path.join(dataset_path, dataset, category,f"{category}_similar_users.json"),'r'))

In [8]:
len(history)

10297355

In [7]:
len(list(user_simmilarity.items()))

1212490

- history 개수가 5개 미만인 사용자 제외

In [5]:
max_length = 20
min_length = 5
history_preprocessed = {}
for key,value in tqdm(history.items()):
  if len(value) >= min_length:
    v = value[-max_length:]
    # if len(v) != max_length:
    #   v.extend([-(int(key)+1) for _ in range(max_length-len(v))])
    history_preprocessed.update({key:v})

100%|██████████| 10297355/10297355 [00:05<00:00, 1894985.36it/s]


In [6]:
import torch
import numpy as np
users_list = list(history_preprocessed.keys())
# values = list(history_after.values())
values = list(history_preprocessed.values())

In [7]:
import multiprocessing
import os
import psutil  # you might need to install this: pip install psutil

# Method 1: Using multiprocessing
print(f"multiprocessing.cpu_count(): {multiprocessing.cpu_count()}")

# Method 2: Using os
print(f"os.cpu_count(): {os.cpu_count()}")

multiprocessing.cpu_count(): 128
os.cpu_count(): 128


In [8]:
from collections import defaultdict
import json
import heapq
from typing import Dict, List, Set, Tuple
from concurrent.futures import ProcessPoolExecutor
from tqdm import tqdm

class UserSimilarityFinder:
    def __init__(self, user_items: Dict):
        self.user_items = self.preprocess_data(user_items)
        self.inverted_index = self.create_inverted_index()
        
    @staticmethod
    def preprocess_data(user_items: Dict) -> Dict[str, Set[int]]:
        return {user_id: set(items) for user_id, items in user_items.items()}
    
    def create_inverted_index(self) -> Dict[int, Set[str]]:
        """Create an inverted index mapping items to users who have that item."""
        inverted_index = defaultdict(set)
        for user_id, items in self.user_items.items():
            for item in items:
                inverted_index[item].add(user_id)
        return inverted_index
    
    def find_similar_users(self, query_user_id: str, top_k: int = 10) -> List[Tuple[str, float]]:
        """Find top-k similar users for a single query user."""
        query_items = self.user_items[query_user_id]
        
        # Get candidate users who share at least one item with query user
        candidates = set()
        for item in query_items:
            candidates.update(self.inverted_index[item])
        candidates.discard(query_user_id)
        
        # Calculate scores for candidates
        scores = []
        for candidate_id in candidates:
            candidate_items = self.user_items[candidate_id]
            intersection_size = len(query_items & candidate_items)
            if intersection_size > 0:
                score = intersection_size / len(candidate_items)
                scores.append((-score, candidate_id))
        
        # Get top-k results
        top_k_results = heapq.nsmallest(top_k, scores)
        return [(user_id, -score) for score, user_id in top_k_results]
    
    def process_user_batch(self, user_batch: List[str], top_k: int) -> Dict[str, List[Tuple[str, float]]]:
        """Process a batch of query users."""
        batch_results = {}
        for user_id in user_batch:
            batch_results[user_id] = self.find_similar_users(user_id, top_k)
        return batch_results
    
    def find_similar_users_parallel(self, query_users: List[str], top_k: int = 10, 
                                  batch_size: int = 1000, n_workers: int = 4) -> Dict[str, List[Tuple[str, float]]]:
        """Find similar users for multiple query users in parallel."""
        # Split query users into batches
        batches = []
        for i in tqdm(range(0, len(query_users), batch_size),desc="Get batches"):
            batches.append(query_users[i:i + batch_size])
        results = {}
        
        # Process batches in parallel
        with ProcessPoolExecutor(max_workers=n_workers) as executor:
            future_to_batch = {
                executor.submit(self.process_user_batch, batch, top_k): batch 
                for batch in batches
            }
            
            for future in tqdm(future_to_batch, desc="Processing batches"):
                batch_results = future.result()
                results.update(batch_results)
        
        return results

In [9]:
top_k = 5
batch_size = 1024
n_workers = 128

finder = UserSimilarityFinder(history_preprocessed)
results = finder.find_similar_users_parallel(
        query_users=users_list,
        top_k=top_k,
        batch_size=batch_size,
        n_workers=n_workers
    )

Get batches: 100%|██████████| 1185/1185 [00:00<00:00, 11576.19it/s]
Processing batches:  17%|█▋        | 200/1185 [19:26<1:35:44,  5.83s/it]


In [None]:
json.load(open(os.path.join(dataset_path, dataset, category,f"{category}_similarity.json"),'r'))