In [None]:
import pandas as pd
import multiprocessing as mp
from functools import partial
import numpy as np
import json
import os
from tqdm import tqdm

# Đọc file Parquet
df = pd.read_parquet('')
df = df.head(100000)
print(f"Số dòng sau khi giới hạn: {len(df)}")

# Chia dữ liệu thành các phần để xử lý song song
def split_dataframe(df, chunks):
    return np.array_split(df, chunks)

# Hàm xử lý trên từng phần DataFrame với tqdm
def process_chunk(df_chunk, language='vi'):
    query_chunks = {}
    query_col = 'query' if language == 'vi' else 'query_en'
    
    for index, row in tqdm(df_chunk.iterrows(), total=len(df_chunk), 
                          desc=f"Xử lý {language}", leave=False):
        query = row[query_col]
        
        if query not in query_chunks:
            query_chunks[query] = []
        
        chunk = {
            'pos': row['pos'],
            'pos_en': row['pos_en']
        }
        query_chunks[query].append(chunk)
    
    return query_chunks

# Hàm gộp kết quả từ các phần xử lý
def merge_dictionaries(dict_list):
    result = {}
    print("Đang gộp kết quả từ các luồng...")
    for d in tqdm(dict_list):
        for key, value in d.items():
            if key in result:
                result[key].extend(value)
            else:
                result[key] = value
    return result

# Số lượng CPU cores để tận dụng
num_processes = min(4, mp.cpu_count())
print(f"Sử dụng {num_processes} luồng xử lý")


# Chia DataFrame thành các phần
print("Đang chia dữ liệu thành các phần...")
df_chunks = split_dataframe(df, num_processes)

# Tạo pool processes
pool = mp.Pool(processes=num_processes)

# Xử lý song song cho tiếng Việt
print("Bắt đầu xử lý dữ liệu tiếng Việt...")
process_chunk_vi = partial(process_chunk, language='vi')
results_vi = list(tqdm(pool.imap(process_chunk_vi, df_chunks), 
                      total=num_processes, desc="Tiến trình VI"))
query_chunks_vie = merge_dictionaries(results_vi)

# Xử lý song song cho tiếng Anh
print("Bắt đầu xử lý dữ liệu tiếng Anh...")
process_chunk_en = partial(process_chunk, language='en')
results_en = list(tqdm(pool.imap(process_chunk_en, df_chunks), 
                      total=num_processes, desc="Tiến trình EN"))
query_chunks_en = merge_dictionaries(results_en)

# Đóng pool
pool.close()
pool.join()

print(f"Số lượng query tiếng Việt duy nhất: {len(query_chunks_vie)}")
print(f"Số lượng query tiếng Anh duy nhất: {len(query_chunks_en)}")



In [None]:
# query_vie - pos_en
import json
import os
import multiprocessing as mp
from tqdm import tqdm

# Hàm xử lý từng nhóm query
def process_query_chunk(queries_chunk):
    result = {}
    for query, dict_chunks in queries_chunk:
        pos_set = set()
        pos_en_set = set()
        
        for item in dict_chunks:
            for chunk in item['pos']:
                pos_set.add(chunk)
            for chunk in item['pos_en']:
                pos_en_set.add(chunk)
        
        result[query] = list(pos_en_set)
        
    
    return result

# Hàm để chia list thành các phần bằng nhau
def split_list(items, num_chunks):
    """Chia list thành các phần bằng nhau"""
    avg = len(items) // num_chunks
    remainder = len(items) % num_chunks
    
    result = []
    start = 0
    
    for i in range(num_chunks):
        # Thêm một phần tử vào mỗi phần cho đến khi hết remainder
        end = start + avg + (1 if i < remainder else 0)
        result.append(items[start:end])
        start = end
    
    return result

# Hàm chính để merge positions với đa luồng
def merge_positions_parallel(data, output_file, num_processes=None):
    # Xác định số lượng CPU sẽ sử dụng
    if num_processes is None:
        num_processes = mp.cpu_count()
    
    print(f"Sử dụng {num_processes} luồng xử lý")
    
    # Chuyển dict thành list các tuples
    items = list(data.items())
    
    # Chia thành các phần bằng nhau
    chunks = split_list(items, num_processes)
    
    # Tạo pool processes
    pool = mp.Pool(processes=num_processes)
    
    # Thực hiện xử lý song song
    print("Đang xử lý dữ liệu...")
    results = list(tqdm(pool.imap(process_query_chunk, chunks), total=len(chunks)))
    
    # Đóng pool
    pool.close()
    pool.join()
    
    # Gộp các kết quả
    merged_data = {}
    print("Đang gộp kết quả từ các luồng...")
    for result in tqdm(results):
        merged_data.update(result)
    
    # Tạo thư mục đầu ra nếu chưa tồn tại
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    
    # Lưu kết quả vào file mới
    print(f"Đang lưu dữ liệu đã gộp vào: {output_file}")
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(merged_data, f, ensure_ascii=False, indent=2)
    
    print("Hoàn thành!")
    
    
    return merged_data

# Đường dẫn đến file

output_file = 'grouth_truth_query_vie_pos_en.json'

data=query_chunks_vie

# Gọi hàm xử lý đa luồng
merge_positions_parallel(data, output_file)

In [None]:
# query_en - pos_vie


import json
import os
import multiprocessing as mp
from tqdm import tqdm

# Hàm xử lý từng nhóm query
def process_query_chunk(queries_chunk):
    result = {}
    for query, dict_chunks in queries_chunk:
        pos_set = set()
        pos_en_set = set()
        
        for item in dict_chunks:
            for chunk in item['pos']:
                pos_set.add(chunk)
            for chunk in item['pos_en']:
                pos_en_set.add(chunk)
        
        result[query] = list(pos_set)
  
    return result

# Hàm để chia list thành các phần bằng nhau
def split_list(items, num_chunks):
    """Chia list thành các phần bằng nhau"""
    avg = len(items) // num_chunks
    remainder = len(items) % num_chunks
    
    result = []
    start = 0
    
    for i in range(num_chunks):
        # Thêm một phần tử vào mỗi phần cho đến khi hết remainder
        end = start + avg + (1 if i < remainder else 0)
        result.append(items[start:end])
        start = end
    
    return result

# Hàm chính để merge positions với đa luồng
def merge_positions_parallel(data, output_file, num_processes=None):
    # Xác định số lượng CPU sẽ sử dụng
    if num_processes is None:
        num_processes = mp.cpu_count()
    
    print(f"Sử dụng {num_processes} luồng xử lý")
    
    # Chuyển dict thành list các tuples
    items = list(data.items())
    
    # Chia thành các phần bằng nhau
    chunks = split_list(items, num_processes)
    
    # Tạo pool processes
    pool = mp.Pool(processes=num_processes)
    
    # Thực hiện xử lý song song
    print("Đang xử lý dữ liệu...")
    results = list(tqdm(pool.imap(process_query_chunk, chunks), total=len(chunks)))
    
    # Đóng pool
    pool.close()
    pool.join()
    
    # Gộp các kết quả
    merged_data = {}
    print("Đang gộp kết quả từ các luồng...")
    for result in tqdm(results):
        merged_data.update(result)
    
    # Tạo thư mục đầu ra nếu chưa tồn tại
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    
    # Lưu kết quả vào file mới
    print(f"Đang lưu dữ liệu đã gộp vào: {output_file}")
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(merged_data, f, ensure_ascii=False, indent=2)
    
    print("Hoàn thành!")
    
    
    return merged_data

# Đường dẫn đến file

output_file = 'grouth_truth_query_en_pos_vie.json'

data=query_chunks_en

# Gọi hàm xử lý đa luồng
merge_positions_parallel(data, output_file)

In [None]:
with open("kv_store_doc_status.json",'r') as f:
    smalL_data = json.load(f)

In [None]:
small_context = []
for  key, item in smalL_data.items():
    # print(item)
    if item['status'] == "processed":
        small_context.append(item['content'])


In [None]:
with open("/example_benchmark/small_contexts.json",'w', encoding='utf-8') as f:
    json.dump(small_context,f,indent=4, ensure_ascii=False)

In [None]:
with open("/data/grouth_truth_query_en_pos_vie.json",'r') as f:
    data = json.load(f)

In [None]:
queries = []
query_context = {}
for query, chunks in data.items():
    ls_chunks=[]
    cout = 0
    for chunk in chunks:
        if chunk in small_context:
            ls_chunks.append(chunk)
    if len(ls_chunks) > 0:
        queries.append(query)
        query_context[query] = ls_chunks
print(len(queries))