In [1]:
import time
import numpy as np
from concurrent.futures import ProcessPoolExecutor
import numba

In [2]:
def nearest_timestamp_indices(source: np.ndarray, target: np.ndarray) -> np.ndarray:

    indices = np.zeros(shape = (source.shape[0]), dtype = np.int32)
    
    for i,ts in enumerate(source):
        pos = np.searchsorted(target, ts)
        if pos == 0:
            indices[i] = 0
        elif pos == len(target):
            indices[i] = len(target) - 1
        else:
            left_dist = abs(target[pos - 1] - ts)
            right_dist = abs(target[pos] - ts)
            if right_dist < left_dist:
                indices[i] = pos
            else:
                indices[i] = pos - 1

    return indices

In [3]:
@numba.njit("int32[:](float64[:], float64[:])", parallel = True)
def parallel(source: np.ndarray, target: np.ndarray) -> np.ndarray:

    indices = np.zeros(shape = (source.shape[0]), dtype = np.int32)
    
    for i,ts in enumerate(source):
        pos = np.searchsorted(target, ts)
        if pos == 0:
            indices[i] = 0
        elif pos == len(target):
            indices[i] = len(target) - 1
        else:
            left_dist = abs(target[pos - 1] - ts)
            right_dist = abs(target[pos] - ts)
            if right_dist < left_dist:
                indices[i] = pos
            else:
                indices[i] = pos - 1

    return indices

In [4]:
def parallel_processing(source: np.ndarray, target: np.ndarray, num_threads: int) -> np.ndarray:

    chunk_size = (len(source) + num_threads - 1) // num_threads 
    chunks = [source[i * chunk_size: (i + 1) * chunk_size] for i in range(num_threads)]
    
    result = np.zeros(len(source), dtype=int)

    with ProcessPoolExecutor() as executor:
        # Разбиваем задачу по частям и передаем в каждый поток
        futures = [executor.submit(nearest_timestamp_indices, chunk, target) for chunk in chunks]
        
        for i, future in enumerate(futures):
            chunk_result = future.result()
            result[i * chunk_size: (i + 1) * chunk_size] = chunk_result[:len(chunk_result)]
    
    return result

In [5]:
# Генерация временных меток с небольшим случайным шумом.
def generate_noisy_timestamps(fps: int, start_time: float, end_time: float) -> np.ndarray:

    total_count = int((end_time - start_time) * fps)

    timestamps_no_noise = np.linspace(start_time, end_time, total_count)

    random_noise = np.random.randn(total_count)

    noisy_timestamps = timestamps_no_noise + random_noise

    unique_sorted_timestamps = np.sort(np.unique(noisy_timestamps))

    return unique_sorted_timestamps

In [7]:
def main_execution():
  
    start_time_cam1 = time.time() - 100
    end_time_cam1 = time.time() + 3600 * 2
    timestamps_cam1 = generate_noisy_timestamps(30, start_time_cam1, end_time_cam1)
    start_time_cam2 = time.time() + 200
    end_time_cam2 = time.time() + 3600 * 2.5
    timestamps_cam2 = generate_noisy_timestamps(60, start_time_cam2, end_time_cam2)
 
    start_time_seq = time.time()
    result_sequential = nearest_timestamp_indices(timestamps_cam1, timestamps_cam2)
    time_seq = time.time() - start_time_seq
    print(f"Время выполнения последовательного алгоритма: {time_seq:.4f} секунд")

    start_time_parallel = time.time()
    result_parallel = parallel(timestamps_cam1, timestamps_cam2)
    time_parallel = time.time() - start_time_parallel
    print(f"Время выполнения параллельного алгоритма: {time_parallel:.4f} секунд")

    print(f"Разница во времени: {time_seq - time_parallel:.4f} секунд")

if __name__ == "__main__":
    main_execution()

Время выполнения последовательного алгоритма: 0.2708 секунд
Время выполнения параллельного алгоритма: 0.0172 секунд
Разница во времени: 0.2537 секунд
