Import Libraries

In [3]:
import multiprocessing as mp
import queue
import time

import numpy as np
import pandas as pd

Open Data

In [4]:
# Load the training set from the relative CSV path, then preview the first
# five rows as a quick sanity check of the load.
df = pd.read_csv(filepath_or_buffer='../../train.csv')
df.head(n=5)

Unnamed: 0,id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration
0,id2875421,2,2016-03-14 17:24:55,2016-03-14 17:32:30,1,-73.982155,40.767937,-73.96463,40.765602,N,455
1,id2377394,1,2016-06-12 00:43:35,2016-06-12 00:54:38,1,-73.980415,40.738564,-73.999481,40.731152,N,663
2,id3858529,2,2016-01-19 11:35:24,2016-01-19 12:10:48,1,-73.979027,40.763939,-74.005333,40.710087,N,2124
3,id3504673,2,2016-04-06 19:32:31,2016-04-06 19:39:40,1,-74.01004,40.719971,-74.012268,40.706718,N,429
4,id2181028,2,2016-03-26 13:30:55,2016-03-26 13:38:10,1,-73.973053,40.793209,-73.972923,40.78252,N,435


Take 25% of the Data

In [5]:
# Draw a reproducible random quarter of the rows (the seed is fixed), and keep
# the sampled trip durations available as a standalone Series.
df_sample = df.sample(random_state=42, frac=0.25)
sampled = df_sample.loc[:, "trip_duration"]

Filtering Using Multiprocessing

In [10]:
def filter_chunk_live(chunk, output_queue, column='trip_duration', threshold=1000):
    """Push every row of ``chunk`` whose ``column`` value exceeds ``threshold`` onto a queue.

    Parameters
    ----------
    chunk : pandas.DataFrame
        Slice of the data to filter; must contain ``column``.
    output_queue : queue-like
        Any object with a ``put(item)`` method. Each matching row is put
        individually, as the ``pandas.Series`` produced by ``iterrows()``.
    column : str, default 'trip_duration'
        Name of the column the comparison is applied to.
    threshold : number, default 1000
        Rows are kept when ``chunk[column] > threshold`` (strictly greater).

    Returns
    -------
    None
        Results are delivered only through ``output_queue``.
    """
    filtered = chunk[chunk[column] > threshold]
    # Rows are sent one at a time so the consumer can handle each match as soon
    # as it is found instead of waiting for the whole chunk.
    for _, row in filtered.iterrows():
        output_queue.put(row)

Split the Data for Multiprocessing

In [7]:
def split_dataframe(df, n_chunks):
    """Split ``df`` into ``n_chunks`` consecutive, near-equal positional slices.

    The remainder of ``len(df) / n_chunks`` is spread one row at a time over the
    leading chunks, so chunk sizes differ by at most one row. When ``df`` has
    fewer rows than ``n_chunks`` the trailing chunks are empty.

    Parameters
    ----------
    df : pandas.DataFrame
        Data to split (anything supporting ``len()`` and ``obj[start:stop]``
        slicing works).
    n_chunks : int
        Number of chunks to return; must be at least 1.

    Returns
    -------
    list
        Exactly ``n_chunks`` slices that, concatenated in order, reproduce ``df``.

    Raises
    ------
    ValueError
        If ``n_chunks`` is smaller than 1.
    """
    if n_chunks < 1:
        raise ValueError(f"n_chunks must be at least 1, got {n_chunks}")

    base_size, extra = divmod(len(df), n_chunks)
    chunks = []
    start = 0
    for i in range(n_chunks):
        # The first `extra` chunks absorb one leftover row each.
        stop = start + base_size + (1 if i < extra else 0)
        chunks.append(df[start:stop])
        start = stop
    return chunks


Run Multiprocessing

In [11]:
def live_parallel_filter(df_sampled, num_processes=4, verbose=True):
    """Filter ``df_sampled`` in parallel worker processes, reporting matches as they arrive.

    The frame is cut into ``num_processes`` chunks with ``split_dataframe``; each
    chunk is handed to its own ``multiprocessing.Process`` running
    ``filter_chunk_live``, which puts matching rows on a shared queue. The parent
    consumes that queue while the workers are still running.

    Parameters
    ----------
    df_sampled : pandas.DataFrame
        Data to filter.
    num_processes : int, default 4
        Number of chunks, and therefore of worker processes.
    verbose : bool, default True
        When true, print every matching row as a dict as soon as it is received.

    Returns
    -------
    pandas.DataFrame
        One row per match, in the order the rows were received from the queue
        (not necessarily the order of ``df_sampled``). If nothing matched, an
        empty frame carrying the columns of ``df_sampled``.

    Raises
    ------
    RuntimeError
        If any worker process ends with a non-zero exit code, because the
        collected result would then be silently incomplete.

    Notes
    -----
    NOTE(review): with the ``spawn`` start method, worker functions defined in
    a notebook cell may not be importable by the child processes; if workers
    fail immediately, move the worker functions into an importable module.
    """
    chunks = split_dataframe(df_sampled, num_processes)
    output_queue = mp.Queue()

    processes = []
    live_filtered = []

    def _collect(row):
        # Single place that handles a received row: optional live echo + store.
        if verbose:
            print(row.to_dict())
        live_filtered.append(row)

    try:
        for chunk in chunks:
            p = mp.Process(target=filter_chunk_live, args=(chunk, output_queue))
            p.start()
            processes.append(p)

        # Consume results while the workers run. Only a queue timeout triggers
        # the liveness check; any other exception is a real error and propagates.
        while True:
            try:
                row = output_queue.get(timeout=0.5)
            except queue.Empty:
                if not any(p.is_alive() for p in processes):
                    break
            else:
                _collect(row)

        # A worker may have queued rows and exited between the timeout above and
        # the liveness check, so drain whatever is still pending.
        while True:
            try:
                row = output_queue.get_nowait()
            except queue.Empty:
                break
            _collect(row)
    finally:
        # Workers are only still alive here if we are leaving because of an
        # error or interrupt; stop them so no orphan processes are left behind.
        for p in processes:
            if p.is_alive():
                p.terminate()
            p.join()

    failed = [p.exitcode for p in processes if p.exitcode != 0]
    if failed:
        raise RuntimeError(
            f"{len(failed)} worker process(es) failed (exit codes: {failed}); "
            "the filtered result would be incomplete."
        )

    if not live_filtered:
        # Keep the column labels so downstream code sees a consistent schema.
        return pd.DataFrame(columns=df_sampled.columns)

    # Combine collected rows into a DataFrame
    return pd.DataFrame(live_filtered)

In [None]:
if __name__ == '__main__':
    # Time the parallel filter with perf_counter(): a monotonic, high-resolution
    # clock intended for measuring intervals, unaffected by system clock
    # adjustments (unlike time.time()).
    start = time.perf_counter()
    filtered_df = live_parallel_filter(df_sample, num_processes=4)
    end = time.perf_counter()

    print("\n Filtering complete.")
    print(f"Result: {filtered_df.shape[0]} rows matched.")
    print(f"Took {end - start:.2f} seconds.")