In [1]:
import glob
import os
import re
import numpy as np
import pandas as pd
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.utils import to_categorical

# Paths
DIRECTORY = "/home/user/datasets/resized_train_cropped/resized_train_cropped"
LABELS_FILE = "/home/user/datasets/trainLabels_cropped.csv"

# Load labels: map each value of the CSV 'image' column to its 'level'.
labels_df = pd.read_csv(LABELS_FILE)
labels_dict = dict(zip(labels_df['image'], labels_df['level']))

# Collect all images in all subfolders.
# NOTE(review): an image's "index" is its position in this listing; glob does
# not promise a particular order, so the selection is only as stable as the
# underlying directory listing.
all_files = glob.glob(os.path.join(DIRECTORY, "**", "*.jpeg"), recursive=True)

# ===== FIXED PARAMETERS FOR TRAINING =====
START_INDEX = 0       # Training always starts at 0 (not read by the loop below)
BATCH_SIZE = 10000    # Maximum number of labelled images to load
# =========================================

images_train = []
labels_train = []
current_index = 0     # number of files scanned so far
loaded_count = 0      # number of labelled images actually loaded

for filepath in all_files:
    filename = os.path.basename(filepath)

    # Stop as soon as BATCH_SIZE labelled images have been loaded.
    if loaded_count >= BATCH_SIZE:
        break

    current_index += 1

    # Resolve the label BEFORE decoding the image: strip the extension and any
    # " (n)" suffix from the file name, then look it up. Files without a label
    # are skipped without paying for load/resize/normalise.
    base_name = re.sub(r'\s*\(.*\)', '', filename.split('.')[0])
    label = labels_dict.get(base_name)
    if label is None:
        continue

    # Decode, resize to 256x256 and scale pixel values into [0, 1].
    img = load_img(filepath, target_size=(256, 256))
    img_array = img_to_array(img) / 255.0

    images_train.append(img_array)
    labels_train.append(label)
    loaded_count += 1

# Convert to arrays
images_train = np.array(images_train)
labels_train_raw = np.array(labels_train)

# One-hot encoding (5 classes)
labels_train = to_categorical(labels_train, num_classes=5)

print(f"‚úÖ Entra√Ænement: Charg√© {images_train.shape[0]} images (indices 0 √† {loaded_count - 1})")
print(f"   Distribution des classes: {np.bincount(labels_train_raw)}")

2025-11-12 13:07:13.675041: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE4.1 SSE4.2, in other operations, rebuild TensorFlow with the appropriate compiler flags.


‚úÖ Entra√Ænement: Charg√© 10000 images (indices 0 √† 9999)
   Distribution des classes: [7366  721 1475  247  191]


In [2]:
import glob
import os
import re
import numpy as np
import pandas as pd
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.utils import to_categorical

# Paths
DIRECTORY = "/home/user/datasets/resized_train_cropped/resized_train_cropped"
LABELS_FILE = "/home/user/datasets/trainLabels_cropped.csv"

# Load labels: map each value of the CSV 'image' column to its 'level'.
labels_df = pd.read_csv(LABELS_FILE)
labels_dict = dict(zip(labels_df['image'], labels_df['level']))

# Collect all images in all subfolders.
# NOTE(review): indices below are positions in this listing; they only line up
# with the training cell if glob returns the files in the same order there.
all_files = glob.glob(os.path.join(DIRECTORY, "**", "*.jpeg"), recursive=True)

# ===== VARIABLE PARAMETERS FOR THE TEST SET =====
START_INDEX = 20000   # EDIT HERE: number of labelled images to skip first (e.g. 1000, 2000, ...)
BATCH_SIZE = 1       # EDIT HERE: how many images to load (e.g. 1, 5, 10, ...)
# ================================================

images_test = []
labels_test = []
current_index = 0     # labelled files skipped so far, then files scanned past the skip
loaded_count = 0      # number of labelled images actually loaded

for filepath in all_files:
    filename = os.path.basename(filepath)

    # Resolve the label once per file (strip the extension and any " (n)"
    # suffix); both the skip phase and the load phase need it.
    base_name = re.sub(r'\s*\(.*\)', '', filename.split('.')[0])
    label = labels_dict.get(base_name)

    # Skip phase: only labelled files count towards START_INDEX.
    if current_index < START_INDEX:
        if label is not None:
            current_index += 1
        continue

    # Load only BATCH_SIZE images.
    if loaded_count >= BATCH_SIZE:
        break

    current_index += 1

    # Unlabelled files are skipped before the (expensive) image decode.
    if label is None:
        continue

    # Decode, resize to 256x256 and scale pixel values into [0, 1].
    img = load_img(filepath, target_size=(256, 256))
    img_array = img_to_array(img) / 255.0

    images_test.append(img_array)
    labels_test.append(label)
    loaded_count += 1

# Convert to arrays
images_test = np.array(images_test)
labels_test_raw = np.array(labels_test)

# One-hot encoding (5 classes)
labels_test = to_categorical(labels_test, num_classes=5)

print(f"‚úÖ Test: Charg√© {images_test.shape[0]} images (indices {START_INDEX} √† {START_INDEX + loaded_count - 1})")
print(f"   Classes: {labels_test_raw}")

‚úÖ Test: Charg√© 1 images (indices 20000 √† 20000)
   Classes: [0]


In [3]:
# Flatten every training image into a single feature row: (n_train, 256*256*3).
x_train = images_train.reshape(len(images_train), -1)
# Collapse the one-hot label rows back into integer class ids.
y_train = labels_train.argmax(axis=1)

In [4]:
# Flatten every test image into a single feature row: (n_test, 256*256*3).
x_test = images_test.reshape(len(images_test), -1)
# Collapse the one-hot label rows back into integer class ids.
y_test = labels_test.argmax(axis=1)

In [28]:
import numpy as np
from collections import Counter

class knn:
    """Brute-force k-nearest-neighbours classifier (Euclidean distance, majority vote)."""

    def __init__(self, k):
        """Remember how many neighbours (``k``) vote on each prediction."""
        self.k = k

    def fitt(self, x_train, y_train):
        """Store the training set; no computation happens at fit time.

        Args:
            x_train: 2-D array-like of training features, one sample per row.
            y_train: 1-D array-like of labels aligned with the rows of ``x_train``.
        """
        # asarray is a no-op for ndarrays and lets plain lists work with the
        # fancy indexing used in predire().
        self.x_train = np.asarray(x_train)
        self.y_train = np.asarray(y_train)

    def predire(self, test_sample):
        """Return the majority label among the nearest training samples.

        Args:
            test_sample: feature vector broadcastable against the rows of
                the training matrix.

        Returns:
            The most frequent label among the ``min(k, n_samples)`` nearest
            training samples.

        Raises:
            ValueError: if ``k`` is smaller than 1 or the training set is empty.
        """
        # Imported here so the method does not depend on another notebook cell
        # having imported threading first.
        import threading

        # Vectorized Euclidean distance: sqrt(sum((x_train - test_sample)^2))
        distances = np.sqrt(np.sum((self.x_train - test_sample) ** 2, axis=1))
        print(f'thread {threading.get_ident()} calcule {len(distances)} distances')

        # Never ask for more neighbours than there are training samples.
        n_neighbors = min(self.k, len(distances))
        if n_neighbors < 1:
            raise ValueError("k must be >= 1 and the training set must not be empty")

        # argpartition's kth is a 0-based position, so the k smallest values
        # are guaranteed to sit in front once position k-1 is in place.
        k_indices = np.argpartition(distances, n_neighbors - 1)[:n_neighbors]

        # Labels of those neighbours, then majority vote.
        k_nearest_labels = self.y_train[k_indices]
        most_common = Counter(k_nearest_labels).most_common(1)
        return most_common[0][0]

In [29]:
# Build the sequential classifier with k=5 neighbours and hand it the
# flattened training features and their integer labels.
knn_seq = knn(k=5)
knn_seq.fitt(x_train, y_train)

In [30]:
import time
import threading
# Classify every flattened test row with the sequential model and time the pass.
debut = time.perf_counter()
predictions = [knn_seq.predire(sample) for sample in x_test]
fin = time.perf_counter()

print(f'Temps de pr√©diction = {fin-debut:.4f} secondes')
print(f"Predicted labels: {predictions}")
# The raw integer labels are shown here (labels_test_raw rather than y_test).
print(f"True labels: {labels_test_raw}")

# Pair each prediction with its ground-truth label once; both the overall
# verdict and the per-image report read from the same pairs.
pairs = list(zip(predictions, labels_test_raw))
correct = all(pred == true for pred, true in pairs)
print(f"Correct? {'‚úÖ' if correct else '‚ùå'}")

# Per-image details
print("\nD√©tails:")
for idx, (pred, true) in enumerate(pairs):
    match = "‚úÖ" if pred == true else "‚ùå"
    print(f"  Image {idx}: Pr√©diction={pred}, Vrai={true} {match}")

thread 128909034173504 calcule 10000 distances
Temps de pr√©diction = 10.4554 secondes
Predicted labels: [np.int64(0)]
True labels: [0]
Correct? ‚úÖ

D√©tails:
  Image 0: Pr√©diction=0, Vrai=0 ‚úÖ


In [20]:
import numpy as np
from collections import Counter
from multiprocessing import Pool, Array
import ctypes

# Module-level handles onto the shared training data. They start out as None
# and are filled in by init_worker(), which ParallelKNN passes to the Pool as
# its initializer.
shared_x_base = shared_y_base = None
x_shape = x_dtype = None

def init_worker(x_base, y_base, shape, dtype):
    """Publish the shared buffers and their layout as module globals.

    Args:
        x_base: buffer holding the training features.
        y_base: buffer holding the training labels.
        shape: shape used to view ``x_base`` as an array.
        dtype: dtype used to view ``x_base`` as an array.
    """
    global shared_x_base, shared_y_base, x_shape, x_dtype
    shared_x_base, shared_y_base = x_base, y_base
    x_shape, x_dtype = shape, dtype

def worker_distances(args):
    """Return the k nearest candidates found in one slice of the training set.

    Args:
        args: tuple ``(start, end, test_sample, k)``; ``start:end`` selects the
            rows of the shared training data handled by this call.

    Returns:
        ``(squared_distances, labels)`` for the ``min(k, end - start)`` closest
        rows of the slice, in no particular order.
    """
    lo, hi, query, k = args

    # View the shared buffers as numpy arrays, then keep only this task's rows.
    features = np.frombuffer(shared_x_base, dtype=x_dtype).reshape(x_shape)[lo:hi]
    labels = np.frombuffer(shared_y_base, dtype=np.int32)[lo:hi]

    # Squared Euclidean distance; the square root is skipped because it does
    # not change the ordering.
    delta = features - query
    sq_dist = np.sum(delta ** 2, axis=1)

    # Keep at most k candidates from this slice.
    n_keep = min(k, len(sq_dist))
    nearest = np.argpartition(sq_dist, n_keep - 1)[:n_keep]

    return sq_dist[nearest], labels[nearest]

class ParallelKNN:
    """k-nearest-neighbours classifier that spreads distance computation over a process pool.

    The training data is copied once into shared ctypes arrays at fit time;
    each prediction sends one task per chunk of training rows to the pool
    (handled by ``worker_distances``) and merges the per-chunk candidates.

    Call ``close()`` (or use the instance as a context manager) to shut the
    pool down when done.
    """

    def __init__(self, k, n_workers=None):
        """Store ``k`` and the worker count (4 when ``n_workers`` is None or 0)."""
        self.k = k
        self.n_workers = n_workers or 4
        self.pool = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    def fitt(self, x_train, y_train):
        """Fit model - creates a persistent pool with shared-memory training data.

        Args:
            x_train: 2-D array of training features, one sample per row.
            y_train: 1-D array of integer labels aligned with ``x_train`` rows.

        Raises:
            ValueError: if the training set is empty or the number of labels
                does not match the number of rows.
        """
        # Refitting must not leak the worker processes of a previous fit.
        self.close()

        self.x_train = x_train
        self.y_train = y_train

        x_arr = np.asarray(x_train)
        self.n_samples, self.n_features = x_arr.shape
        if x_arr.size == 0:
            raise ValueError("x_train must contain at least one sample")

        # The shared buffer's element type MUST match the dtype the workers
        # use to view it; otherwise np.frombuffer sees the wrong number of
        # elements and the reshape in the worker fails. float32 data stays
        # float32 (half the memory); everything else is stored as float64.
        if x_arr.dtype == np.float32:
            x_ctype = ctypes.c_float
        else:
            x_arr = x_arr.astype(np.float64, copy=False)
            x_ctype = ctypes.c_double

        y_arr = np.asarray(y_train, dtype=np.int32).ravel()
        if y_arr.size != self.n_samples:
            raise ValueError("y_train must hold exactly one label per row of x_train")

        # Allocate by size and fill through a numpy view: initialising Array
        # from the data itself converts the values one Python object at a time.
        self.shared_x = Array(x_ctype, x_arr.size, lock=False)
        np.frombuffer(self.shared_x, dtype=x_arr.dtype)[:] = x_arr.ravel()
        self.shared_y = Array(ctypes.c_int32, y_arr.size, lock=False)
        np.frombuffer(self.shared_y, dtype=np.int32)[:] = y_arr

        # Create persistent pool; every worker receives the shared buffers and
        # the shape/dtype needed to view them.
        self.pool = Pool(
            processes=self.n_workers,
            initializer=init_worker,
            initargs=(self.shared_x, self.shared_y, x_arr.shape, x_arr.dtype)
        )

        # Pre-calculate chunk boundaries (ceiling division so all rows are covered).
        chunk_size = (self.n_samples + self.n_workers - 1) // self.n_workers
        self.chunks = []
        for i in range(self.n_workers):
            start = i * chunk_size
            end = min(start + chunk_size, self.n_samples)
            if start < self.n_samples:
                self.chunks.append((start, end))

    def predire(self, test_sample):
        """Predict the label of one sample using parallel distance computation.

        Args:
            test_sample: one feature vector; a ``(1, n_features)`` row is
                accepted and flattened.

        Returns:
            The most frequent label among the k nearest training samples.

        Raises:
            RuntimeError: if ``fitt()`` has not been called (or the pool was closed).
            ValueError: if the sample does not have ``n_features`` values.
        """
        if self.pool is None:
            raise RuntimeError("Call fitt() first")

        sample = np.asarray(test_sample).ravel()
        if sample.size != self.n_features:
            raise ValueError(
                f"expected a single sample with {self.n_features} features, got {sample.size} values"
            )

        # Only the chunk bounds, the query and k travel to the workers.
        tasks = [(start, end, sample, self.k) for start, end in self.chunks]

        # Parallel computation
        results = self.pool.map(worker_distances, tasks)

        # Merge the (at most) k candidates returned by every chunk.
        all_dists = np.concatenate([r[0] for r in results])
        all_labels = np.concatenate([r[1] for r in results])

        # Final k nearest among the merged candidates.
        final_k = min(self.k, len(all_dists))
        final_idx = np.argpartition(all_dists, final_k - 1)[:final_k]
        final_labels = all_labels[final_idx]

        # Most common label
        return Counter(final_labels).most_common(1)[0][0]

    def close(self):
        """Close the pool when done; safe to call more than once."""
        if self.pool is not None:
            self.pool.close()
            self.pool.join()
            self.pool = None

In [None]:
import time
# Fit the multiprocessing classifier: 4 worker processes, k=5 neighbours.
knn_mult = ParallelKNN(k=5, n_workers=4)
knn_mult.fitt(x_train, y_train)

# Time a single prediction. x_test is passed whole, which relies on it holding
# exactly one row (the test-loading cell sets BATCH_SIZE = 1).
debut = time.time()
prediction = knn_mult.predire(x_test)
fin = time.time()
# Prints the element-wise comparison with y_test (a boolean array), not the class id.
print(f"Predicted class: {prediction == y_test}")
print(f'prediction time: {fin-debut:.2f} secondes')
# NOTE(review): the pool is never shut down in this cell; ParallelKNN.close() exists for that.


In [None]:
# Re-print the last measured duration; needs `debut` and `fin` from an earlier timing cell.
print(f'prediction time: {fin-debut:.2f} secondes')

In [6]:
!pip install parsl.channels

[31mERROR: Could not find a version that satisfies the requirement parsl.channels (from versions: none)[0m[31m
[31mERROR: No matching distribution found for parsl.channels[0m[31m
[0m

In [13]:
import numpy as np
from collections import Counter
import parsl
from parsl import python_app
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor

from parsl.providers import LocalProvider

@python_app
def calculate_distances_chunk(x_chunk, y_chunk, test_sample, k):
    """
    Parsl app: find the nearest candidates inside one chunk of training data.

    Args:
        x_chunk: Feature rows of this chunk
        y_chunk: Labels aligned with the rows of x_chunk
        test_sample: Query features, broadcast against every row
        k: Maximum number of candidates to return

    Returns:
        (squared distances, labels) of the min(k, chunk size) closest rows,
        in no particular order.
    """
    import numpy as np

    # Squared Euclidean distance per row; the square root is skipped because
    # it does not change the ordering.
    delta = x_chunk - test_sample
    sq_dist = np.sum(delta ** 2, axis=1)

    # A chunk may hold fewer than k rows.
    n_keep = min(k, len(sq_dist))
    nearest = np.argpartition(sq_dist, n_keep - 1)[:n_keep]

    return sq_dist[nearest], y_chunk[nearest]


class ParslKNN:
    """k-nearest-neighbours classifier that fans distance computation out as Parsl apps.

    The training set is cut into one chunk per worker at fit time; each
    prediction submits one `calculate_distances_chunk` call per chunk and
    merges the candidates they return.
    """

    def __init__(self, k, n_workers=4):
        """
        Create an unfitted classifier.

        Args:
            k: Number of nearest neighbors that vote
            n_workers: Number of chunks the training data is split into
        """
        self.k = k
        self.n_workers = n_workers
        # NOTE(review): nothing in this class ever sets this flag to True, so
        # cleanup() below is currently a no-op.
        self.parsl_loaded = False

    def fitt(self, x_train, y_train):
        """
        Store the training data and pre-split it into per-worker chunks.

        Args:
            x_train: Training features (n_samples, n_features)
            y_train: Training labels (n_samples,)
        """
        self.x_train = np.asarray(x_train)
        self.y_train = np.asarray(y_train)
        self.n_samples = len(self.x_train)

        # Ceiling division: every row lands in exactly one chunk.
        chunk_size = (self.n_samples + self.n_workers - 1) // self.n_workers

        # (start, end) row bounds for each worker slot; slots that would start
        # past the end of the data are dropped below.
        bounds = (
            (w * chunk_size, min((w + 1) * chunk_size, self.n_samples))
            for w in range(self.n_workers)
        )
        self.data_chunks = [
            (self.x_train[lo:hi], self.y_train[lo:hi])
            for lo, hi in bounds
            if lo < self.n_samples
        ]

        print(f"Data split into {len(self.data_chunks)} chunks for {self.n_workers} workers")

    def predire(self, test_sample):
        """
        Predict the class label of one test sample.

        Args:
            test_sample: Test sample features

        Returns:
            The most frequent label among the k nearest training samples

        Raises:
            RuntimeError: if fitt() has not been called yet
        """
        if not hasattr(self, 'data_chunks'):
            raise RuntimeError("Model not fitted. Call fitt() first.")

        query = np.asarray(test_sample)

        # Submit one app call per chunk first, then wait for all of them.
        pending = [
            calculate_distances_chunk(features, labels, query, self.k)
            for features, labels in self.data_chunks
        ]
        per_chunk = [task.result() for task in pending]

        # Pool the candidates of every chunk.
        candidate_dists = np.concatenate([dists for dists, _ in per_chunk])
        candidate_labels = np.concatenate([labels for _, labels in per_chunk])

        # Keep the overall k nearest and take a majority vote.
        n_final = min(self.k, len(candidate_dists))
        winners = np.argpartition(candidate_dists, n_final - 1)[:n_final]
        votes = Counter(candidate_labels[winners])
        return votes.most_common(1)[0][0]

    def predire_batch(self, test_samples):
        """
        Predict labels for several test samples, one after another.

        Args:
            test_samples: Array of test samples (n_test_samples, n_features)

        Returns:
            List of predicted labels, in input order
        """
        predicted = []
        for sample in test_samples:
            predicted.append(self.predire(sample))
        return predicted

    def cleanup(self):
        """Tear down the Parsl DataFlowKernel, but only if parsl_loaded is set."""
        if not self.parsl_loaded:
            return
        parsl.dfk().cleanup()
        parsl.clear()
        self.parsl_loaded = False

In [18]:
# A single Parsl thread-pool executor (up to 12 threads) runs every app call.
knn_executor = ThreadPoolExecutor(max_threads=12, label='knn_workers')

# strategy='none': disable auto-scaling for predictable performance.
config = Config(executors=[knn_executor], strategy='none')
parsl.load(config)


<parsl.dataflow.dflow.DataFlowKernel at 0x7589ea11bfb0>

In [19]:
import time
# Fit the Parsl classifier: training data split into 12 chunks, k=5 neighbours.
knn_parsl = ParslKNN(k=5, n_workers=12)
knn_parsl.fitt(x_train, y_train)

# Time a single prediction. x_test is passed whole, which relies on it holding
# exactly one row (the test-loading cell sets BATCH_SIZE = 1).
debut = time.time()
prediction = knn_parsl.predire(x_test)
fin = time.time()
# Prints the element-wise comparison with y_test (a boolean array), not the class id.
print(f"Predicted class: {prediction == y_test}")
print(f'prediction time: {fin-debut:.2f} secondes')

Data split into 12 chunks for 12 workers
Predicted class: [ True]
prediction time: 2.04 secondes


In [17]:
# Presumably resets Parsl's loaded configuration so parsl.load() can be called
# again — confirm against the Parsl docs. This cell was executed (In [17])
# before the load cell (In [18]) even though it appears last.
# NOTE(review): ParslKNN.cleanup() calls parsl.dfk().cleanup() before
# parsl.clear(); this cell only clears.
parsl.clear()