In [1]:
from joblib import Parallel, delayed
import joblib
import time
import math

In [2]:
def compute_square(start, end):
    result = 0
    for i in range(start, end):
        result += i ** 2
        result += math.factorial(i % 10)  
        result += int(math.sin(i) * 1000000)  
        result += int(math.sqrt(i) * 1000)
    return result

In [3]:
results = compute_square(0, 100000000)
print("Single Process Result:", results)

Single Process Result: 333333329004091102437280


In [4]:
print("core count:", joblib.cpu_count())

core count: 10


In [None]:
n_jobs = joblib.cpu_count()
chunk_size = 100000000 // n_jobs
ranges = [(i * chunk_size, (i + 1) * chunk_size if i < n_jobs - 1 else 100000000) for i in range(n_jobs)]

results = Parallel(n_jobs=-1)(delayed(compute_square)(start, end) for start, end in ranges) #Burda amacımız chunklarla beraber görev sayısını azaltmak. Parallel fonksiyonun göre sayısı arttıkça overhead artar. Azaltırsak daha iyi performans alırız. Kısaca asıl olay chunk içindeki sayıyı da düzgün bölüştürmek. Böylece her işlemci çekirdeği eşit iş yapar ve overhead azalır.
print("Parallel Process Result:", sum(results))

Parallel Process Result: 333333329004091102437280


In [6]:
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.datasets import make_classification
from joblib import Parallel, delayed
from tqdm import tqdm 

X, y = make_classification(n_samples=1000, n_features=50, n_informative=15, random_state=42)
kf = KFold(n_splits=5, shuffle=True, random_state=42)

def train_and_evaluate_fold(train_index, val_index):
    X_train, X_val = X[train_index], X[val_index]
    y_train, y_val = y[train_index], y[val_index]
    
    model = RandomForestClassifier(n_estimators=150, random_state=42, n_jobs=1) 
    model.fit(X_train, y_train)
    
    preds = model.predict(X_val)
    
    accuracy = accuracy_score(y_val, preds)
    conf_matrix = confusion_matrix(y_val, preds)
    
    return {'accuracy': accuracy, 'confusion_matrix': conf_matrix}

start = time.time()
results = Parallel(n_jobs=-1)(
    delayed(train_and_evaluate_fold)(train, val) 
    for train, val in tqdm(kf.split(X), total=kf.get_n_splits())
)
# Kısaca burda bir chunk oluşturup her fold için paralel işlem yapıyoruz. Oluşturmadan yapamıyoruz tam olarak parallel func u ancak bu şekilde çalışıyor.
end = time.time()
print(f"Time taken for parallel CV: {end - start:.2f} seconds")

for i, result in enumerate(results):
    print(f"Fold {i+1} Accuracy: {result['accuracy']:.4f}")

100%|██████████| 5/5 [00:00<00:00, 5306.56it/s]


Time taken for parallel CV: 1.36 seconds
Fold 1 Accuracy: 0.8550
Fold 2 Accuracy: 0.9100
Fold 3 Accuracy: 0.8450
Fold 4 Accuracy: 0.8800
Fold 5 Accuracy: 0.9100


In [7]:
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.datasets import make_classification
from tqdm import tqdm 

X, y = make_classification(n_samples=1000, n_features=50, n_informative=15, random_state=42)
kf = KFold(n_splits=5, shuffle=True, random_state=42)

def train_and_evaluate_fold(X, y):
    start = time.time()
    for train, val in tqdm(kf.split(X), total=kf.get_n_splits()):
        train_index, val_index = train, val
        X_train, X_val = X[train_index], X[val_index]
        y_train, y_val = y[train_index], y[val_index]
        
        model = RandomForestClassifier(n_estimators=150, random_state=42, n_jobs=1) 
        model.fit(X_train, y_train)
        
        preds = model.predict(X_val)
        
        accuracy = accuracy_score(y_val, preds)
        conf_matrix = confusion_matrix(y_val, preds)
    
    end = time.time()
    print(f"Time taken for sequential CV: {end - start:.2f} seconds")
    return {'accuracy': accuracy, 'confusion_matrix': conf_matrix}

train_and_evaluate_fold(X, y)

for i, result in enumerate(results):
    print(f"Fold {i+1} Accuracy: {result['accuracy']:.4f}")

100%|██████████| 5/5 [00:01<00:00,  2.95it/s]

Time taken for sequential CV: 1.70 seconds
Fold 1 Accuracy: 0.8550
Fold 2 Accuracy: 0.9100
Fold 3 Accuracy: 0.8450
Fold 4 Accuracy: 0.8800
Fold 5 Accuracy: 0.9100





In [8]:
from pathos.multiprocessing import Pool, cpu_count
import pandas as pd

df_mp = pd.DataFrame(np.random.randint(0, 100, size=(1000000, 4)), columns=list('ABCD'))

def complex_function(row):
    return (row['A'] * row['B']) / (row['C'] + 1) + row['D']

def _apply_df_helper(args):
    df_chunk, func = args
    return df_chunk.apply(func, axis=1)

def apply_by_multiprocessing(df, func, **kwargs):
    workers = kwargs.pop('workers')
    if workers is None:
        workers = cpu_count()
        
    df_split = np.array_split(df, workers)
    
    start = time.time()
    with Pool(processes=workers) as pool:
        result = pool.map(_apply_df_helper, [(chunk, func) for chunk in df_split])

    end = time.time()
    print(f"Time taken for multiprocessing apply: {end - start:.2f} seconds")

    return pd.concat(result)

mp_num_cores = cpu_count()

result_df = apply_by_multiprocessing(df_mp, complex_function, workers=mp_num_cores)

  return bound(*args, **kwds)


Time taken for multiprocessing apply: 0.76 seconds


In [9]:
start = time.time()
result_df = df_mp.apply(complex_function, axis=1)
end = time.time()
print(f"Time taken for sequential apply: {end - start:.2f} seconds")

Time taken for sequential apply: 3.87 seconds


In [None]:
import pandas as pd
import numpy as np
import time
import multiprocessing as mp
from multiprocessing import Pool, cpu_count

def _apply_df_helper(df_chunk, func):
    return df_chunk.apply(func, axis=1) 

def complex_function(row):
    return (row['A'] * row['B']) / (row['C'] + 1) + row['D']

def apply_by_multiprocessing(df, func, **kwargs):
    workers = kwargs.pop('workers', cpu_count())
    
    df_split = np.array_split(df, workers)
    
    start = time.time()
    
    with Pool(processes=workers) as pool:
        result = pool.starmap(_apply_df_helper, [(chunk, func) for chunk in df_split])
        # pool.map yerine pool.starmap kullanmak daha temizdir.Eğer birden fazla argümanı (df_chunk ve func) doğrudan iletebiliriz. Ama .map sadece tek argüman alır.

    end = time.time()
    print(f"Çoklu işlem apply süresi: {end - start:.2f} saniye")
    return pd.concat(result)


if __name__ == "__main__":
    DATA_SIZE = 500000
    df = pd.DataFrame(np.random.randint(0, 100, size=(DATA_SIZE, 4)), columns=list('ABCD'))
    
    mp_num_cores = cpu_count()
    print(f"Veri boyutu: {DATA_SIZE} satır")
    print(f"Kullanılacak çekirdek sayısı: {mp_num_cores}")
    
    result_df_mp = apply_by_multiprocessing(df, complex_function, workers=mp_num_cores)
    print("Çoklu İşlem başarıyla tamamlandı. Sonuç boyutu:", result_df_mp.shape)
    
    
# Normal .py dosyasında multiprocessing kullanırken dikkat edilmesi gereken bazı önemli noktalar vardır: Bunlarda yukardaki gibi if __name__ == "__main__": koruması en kritik olanıdır.

Veri boyutu: 500000 satır
Kullanılacak çekirdek sayısı: 10


  return bound(*args, **kwds)
Process SpawnPoolWorker-12:
Process SpawnPoolWorker-11:
Traceback (most recent call last):
Traceback (most recent call last):
Process SpawnPoolWorker-13:
  File "/opt/anaconda3/envs/.mlenv/lib/python3.13/multiprocessing/process.py", line 313, in _bootstrap
    self.run()
    ~~~~~~~~^^
  File "/opt/anaconda3/envs/.mlenv/lib/python3.13/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/anaconda3/envs/.mlenv/lib/python3.13/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/opt/anaconda3/envs/.mlenv/lib/python3.13/multiprocessing/queues.py", line 387, in get
    return _ForkingPickler.loads(res)
           ~~~~~~~~~~~~~~~~~~~~~^^^^^
AttributeError: Can't get attribute '_apply_df_helper' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
  File "/opt/anaconda3/envs/.mlenv/lib/python3.13/multiprocessing/process.py", line 313

KeyboardInterrupt: 