<a href="https://colab.research.google.com/github/VictorL0913/CMPSC472_project-1_map_reduce/blob/main/472_project1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%%writefile parallel_sort_thread.c

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <time.h>

// State shared between main and the sorting threads; main assigns all of it
// before any thread is created.
int *arr;        // array being sorted in place
int n, workers;  // element count and number of worker threads

// qsort comparator for ints: yields -1, 0 or 1 when *a is less than,
// equal to, or greater than *b. Uses comparisons rather than subtraction,
// so extreme values cannot overflow.
int cmp(const void *a, const void *b){
    const int lhs = *(int*)a;
    const int rhs = *(int*)b;
    if(lhs < rhs) return -1;
    if(lhs > rhs) return 1;
    return 0;
}

// Reduce phase helper: merges the sorted runs a[l1..r1] and a[l2..r2]
// (both bounds inclusive) and writes the result back starting at a[l1].
// tmp is scratch space and must hold at least the combined run length.
// An empty run (left bound greater than right bound) contributes nothing.
// Ties take the element from the first run, so the merge is stable.
void merge(int *a,int l1,int r1,int l2,int r2,int *tmp){
    int left = l1;
    int right = l2;
    int out = 0;

    // Take the smaller head element until one run is exhausted.
    while(left <= r1 && right <= r2){
        if(a[left] <= a[right]){
            tmp[out++] = a[left++];
        } else {
            tmp[out++] = a[right++];
        }
    }

    // Drain whichever run still has elements.
    for(; left <= r1; left++) tmp[out++] = a[left];
    for(; right <= r2; right++) tmp[out++] = a[right];

    // Copy the merged sequence back over the original span.
    for(int pos = 0; pos < out; pos++){
        a[l1 + pos] = tmp[pos];
    }
}

// Map phase: thread entry point that sorts one chunk of the global array.
// arg carries the worker index as an integer cast to void*.
// The array is split into ceil(n / workers)-sized chunks; a worker whose
// chunk would start at or beyond n has nothing to do. Always returns NULL.
void *thread_sort(void *arg){
    const long id = (long)arg;
    const int span = (n + workers - 1) / workers;
    const int first = (int)(id * span);

    if(first < n){
        int last = first + span - 1;
        if(last >= n){
            last = n - 1; // final chunk may be shorter than the rest
        }
        qsort(&arr[first], last - first + 1, sizeof(int), cmp);
    }
    return NULL;
}

// Program entry: sorts <size> pseudo-random ints using <workers> threads.
//   argv[1]  number of elements (must be > 0)
//   argv[2]  number of worker threads (must be > 0)
// Each thread sorts one chunk (map phase); main then merges the sorted
// chunks pairwise until a single run remains (reduce phase).
// The reported time covers thread creation through join only, not the merge.
// Returns 0 on success, 1 on bad arguments or a resource failure.
int main(int argc,char **argv){
    if(argc<3){
        printf("Usage: %s <size> <workers>\n",argv[0]);
        return 1;
    }

    n=atoi(argv[1]);
    workers=atoi(argv[2]);
    // workers==0 would divide by zero in the chunk computation, and a
    // non-positive size leaves nothing valid to allocate or sort.
    if(n<=0 || workers<=0){
        fprintf(stderr,"Error: <size> and <workers> must be positive integers\n");
        return 1;
    }

    size_t arr_bytes=(size_t)n*sizeof(int);
    arr=malloc(arr_bytes);
    int *tmp=malloc(arr_bytes); // temp buffer for merging
    // Heap allocation instead of a VLA: the worker count is user-controlled.
    pthread_t *th=malloc((size_t)workers*sizeof *th);
    if(arr==NULL || tmp==NULL || th==NULL){
        fprintf(stderr,"Error: out of memory\n");
        free(th); free(tmp); free(arr);
        return 1;
    }

    srand(42); // fixed seed so every run sorts the same data
    for(int i=0;i<n;i++) arr[i]=rand();

    struct timespec t0,t1;
    clock_gettime(CLOCK_MONOTONIC,&t0);

    // Map phase: start one thread per chunk, remembering how many really
    // started so that only valid thread handles are joined.
    int started=0;
    int create_err=0;
    for(long i=0;i<workers;i++){
        create_err=pthread_create(&th[i],NULL,thread_sort,(void*)i);
        if(create_err!=0) break;
        started++;
    }
    for(int i=0;i<started;i++) pthread_join(th[i],NULL);

    clock_gettime(CLOCK_MONOTONIC,&t1);

    if(create_err!=0){
        // A missing worker leaves its chunk unsorted, so merging would
        // produce a wrong result; fail loudly instead.
        fprintf(stderr,"Error: pthread_create failed (code %d)\n",create_err);
        free(th); free(tmp); free(arr);
        return 1;
    }

    // Reduce phase: merge adjacent sorted chunks, doubling the chunk width
    // each pass. Bounds are clamped to n-1; a chunk that starts past the
    // end becomes an empty range that merge() skips.
    int active=workers;
    int chunk=(n+workers-1)/workers;

    while(active>1){
        int pairs=active/2;
        for(int p=0;p<pairs;p++){
            int l1=p*2*chunk, r1=l1+chunk-1; if(r1>=n) r1=n-1;
            int l2=l1+chunk, r2=l2+chunk-1; if(r2>=n) r2=n-1;
            merge(arr,l1,r1,l2,r2,tmp); // merge adjacent chunks
        }
        active=(active+1)/2; // an unpaired trailing chunk carries over
        chunk*=2; // chunk size doubles after each iteration
    }

    // get evaluation metrics (memory = data array + merge buffer)
    double time_ms=(t1.tv_sec-t0.tv_sec)*1000.0 + (t1.tv_nsec-t0.tv_nsec)/1000000.0;
    size_t mem_bytes = arr_bytes + arr_bytes;
    double mem_MB = mem_bytes / (1024.0*1024.0);

    printf("workers: %d\n",workers);
    printf("time: %.3f ms\n",time_ms);
    printf("memory usage: %.6f MB\n",mem_MB);

    free(th); free(tmp); free(arr);
    return 0;
}


Writing parallel_sort_thread.c


In [None]:
!gcc parallel_sort_thread.c -pthread -o p1_threads
!./p1_threads 32 1
!./p1_threads 32 2
!./p1_threads 32 4
!./p1_threads 32 8
!./p1_threads 131072 1
!./p1_threads 131072 2
!./p1_threads 131072 4
!./p1_threads 131072 8

workers: 1
time: 0.129 ms
memory usage: 0.000244 MB
workers: 2
time: 0.151 ms
memory usage: 0.000244 MB
workers: 4
time: 0.430 ms
memory usage: 0.000244 MB
workers: 8
time: 0.718 ms
memory usage: 0.000244 MB
workers: 1
time: 27.215 ms
memory usage: 1.000000 MB
workers: 2
time: 17.983 ms
memory usage: 1.000000 MB
workers: 4
time: 16.814 ms
memory usage: 1.000000 MB
workers: 8
time: 16.797 ms
memory usage: 1.000000 MB


In [None]:
%%writefile parallel_sort_processes.c

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/mman.h> // used for shared memory for IPC
#include <sys/wait.h>
#include <time.h>

// Ordering callback for qsort over ints.
// Returns 0 for equal values, -1 when *a sorts first, 1 when *b sorts first.
int cmp(const void *a,const void *b){
    const int *pa = a;
    const int *pb = b;
    if(*pa == *pb){
        return 0;
    }
    return (*pa < *pb) ? -1 : 1;
}

// Program entry: sorts <size> pseudo-random ints using <workers> processes.
//   argv[1]  number of elements (must be > 0)
//   argv[2]  number of worker processes (must be > 0)
// The array lives in an anonymous shared mapping so forked children sort
// their chunk in place (map phase); the parent then merges the sorted
// chunks pairwise (reduce phase).
// The reported time covers fork through wait only, not the merge.
// Returns 0 on success, 1 on bad arguments or a resource/worker failure.
int main(int argc,char **argv){
    if(argc<3){
        printf("Usage: %s <size> <workers>\n",argv[0]);
        return 1;
    }

    int n=atoi(argv[1]), workers=atoi(argv[2]);
    // workers==0 would divide by zero in the chunk computation, and a
    // zero-length mapping is rejected by mmap.
    if(n<=0 || workers<=0){
        fprintf(stderr,"Error: <size> and <workers> must be positive integers\n");
        return 1;
    }

    // create shared memory for array so all processes can see same array
    size_t arr_bytes=(size_t)n*sizeof(int);
    int *arr=mmap(NULL,arr_bytes,PROT_READ|PROT_WRITE,MAP_SHARED|MAP_ANONYMOUS,-1,0);
    if(arr==MAP_FAILED){
        perror("mmap");
        return 1;
    }
    srand(42); // fixed seed so every run sorts the same data
    for(int i=0;i<n;i++) arr[i]=rand();

    struct timespec t0,t1;
    clock_gettime(CLOCK_MONOTONIC,&t0); // start timestamp

    // Fork workers to sort their chunks - map phase
    int spawned=0;
    int failed=0;
    for(int i=0;i<workers;i++){
        pid_t pid=fork();
        if(pid<0){
            perror("fork");
            failed=1;
            break;
        }
        if(pid==0){
            int chunk=(n+workers-1)/workers;
            int l=i*chunk;
            if(l>=n) _exit(0); // more workers than chunks: nothing to do
            int r=l+chunk-1; if(r>=n) r=n-1;
            qsort(arr+l,r-l+1,sizeof(int),cmp);
            _exit(0);
        }
        spawned++;
    }

    // Wait only for the children that were actually created, and notice
    // any that did not exit cleanly (their chunk may be unsorted).
    for(int i=0;i<spawned;i++){
        int status=0;
        if(wait(&status)<0 || !WIFEXITED(status) || WEXITSTATUS(status)!=0) failed=1;
    }

    clock_gettime(CLOCK_MONOTONIC,&t1); // end timestamp

    if(failed){
        fprintf(stderr,"Error: a worker process could not be started or did not exit cleanly\n");
        munmap(arr,arr_bytes);
        return 1;
    }

    // Merge sorted chunks - reduce phase
    int *tmp = malloc(arr_bytes);
    if(tmp==NULL){
        perror("malloc");
        munmap(arr,arr_bytes);
        return 1;
    }
    int active=workers;
    int chunk=(n+workers-1)/workers;

    while(active>1){
        int pairs=active/2;
        for(int p=0;p<pairs;p++){
            // Bounds are clamped to n-1; a chunk starting past the end
            // becomes an empty range that the loops below skip.
            int l1=p*2*chunk, r1=l1+chunk-1; if(r1>=n) r1=n-1; // left and right index of first chunk
            int l2=l1+chunk, r2=l2+chunk-1; if(r2>=n) r2=n-1;  // left and right index of second chunk

            // 2 pointer merge to sort the two chunks like merge sort
            int i=l1,j=l2,k=0;
            while(i<=r1 && j<=r2) tmp[k++] = (arr[i]<=arr[j]?arr[i++]:arr[j++]);
            while(i<=r1) tmp[k++] = arr[i++];
            while(j<=r2) tmp[k++] = arr[j++];
            for(i=0;i<k;i++) arr[l1+i]=tmp[i];
        }
        // after each pass the number of runs is halved (an unpaired
        // trailing run carries over)
        active=(active+1)/2;
        // chunk size is doubled
        chunk*=2;
    }

    // get evaluation metrics (memory = shared array + merge buffer)
    double time_ms=(t1.tv_sec-t0.tv_sec)*1000.0 + (t1.tv_nsec-t0.tv_nsec)/1000000.0;
    size_t mem_bytes = arr_bytes + arr_bytes;
    double mem_MB = mem_bytes/(1024.0*1024.0);

    printf("workers: %d\n",workers);
    printf("time: %.3f ms\n",time_ms);
    printf("memory usage: %.6f MB\n",mem_MB);

    free(tmp);
    munmap(arr,arr_bytes);
    return 0;
}


Writing parallel_sort_processes.c


In [None]:
!gcc parallel_sort_processes.c -o p1_process
!./p1_process 32 1
!./p1_process 32 2
!./p1_process 32 4
!./p1_process 32 8
!./p1_process 131072 1
!./p1_process 131072 2
!./p1_process 131072 4
!./p1_process 131072 8

workers: 1
time: 0.268 ms
memory usage: 0.000244 MB
workers: 2
time: 1.759 ms
memory usage: 0.000244 MB
workers: 4
time: 1.465 ms
memory usage: 0.000244 MB
workers: 8
time: 1.862 ms
memory usage: 0.000244 MB
workers: 1
time: 27.617 ms
memory usage: 1.000000 MB
workers: 2
time: 17.273 ms
memory usage: 1.000000 MB
workers: 4
time: 17.168 ms
memory usage: 1.000000 MB
workers: 8
time: 19.217 ms
memory usage: 1.000000 MB


In [5]:
%%writefile maxagg_thread.c

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <time.h>

// State shared between main and the worker threads.
int *arr;             // input array; allocated and filled in main
int n, workers;       // element count and number of worker threads
int global_max;       // running maximum; threads update it while holding lock
pthread_mutex_t lock; // protects global_max

// Function for each thread to compute local max and update global max
void *thread_max(void *arg){
    long id = (long)arg;
    int chunk = (n + workers - 1) / workers;
    int l = id * chunk;
    if(l >= n) return NULL;
    int r = l + chunk - 1; if(r >= n) r = n - 1;
    int local_max = arr[l];
    for(int i = l+1; i <= r; i++) if(arr[i] > local_max) local_max = arr[i];

    pthread_mutex_lock(&lock);           // lock before updating global max
    if(local_max > global_max) global_max = local_max;
    pthread_mutex_unlock(&lock);         // unlock
    return NULL;
}

// Program entry: finds the maximum of <size> pseudo-random ints using
// <workers> threads.
//   argv[1]  number of elements (must be > 0)
//   argv[2]  number of worker threads (must be > 0)
// Each thread computes the maximum of its chunk and merges it into
// global_max under a mutex. The reported time covers thread creation
// through join.
// Returns 0 on success, 1 on bad arguments or a resource failure.
int main(int argc, char **argv){
    if(argc < 3){
        printf("Usage: %s <size> <workers>\n", argv[0]);
        return 1;
    }

    n = atoi(argv[1]);
    workers = atoi(argv[2]);
    // n must be positive because global_max is seeded from arr[0];
    // workers must be positive because it is the divisor in the chunk math.
    if(n <= 0 || workers <= 0){
        fprintf(stderr, "Error: <size> and <workers> must be positive integers\n");
        return 1;
    }

    arr = malloc((size_t)n * sizeof(int));
    // Heap allocation instead of a VLA: the worker count is user-controlled.
    pthread_t *th = malloc((size_t)workers * sizeof *th);
    if(arr == NULL || th == NULL){
        fprintf(stderr, "Error: out of memory\n");
        free(th);
        free(arr);
        return 1;
    }

    srand(42); // fixed seed so every run sees the same data
    for(int i=0;i<n;i++) arr[i]=rand();

    global_max = arr[0];
    if(pthread_mutex_init(&lock, NULL) != 0){
        fprintf(stderr, "Error: pthread_mutex_init failed\n");
        free(th);
        free(arr);
        return 1;
    }

    struct timespec t0, t1;
    clock_gettime(CLOCK_MONOTONIC, &t0);

    // create threads - map phase; count how many really started so that
    // only valid thread handles are joined
    int started = 0;
    int create_err = 0;
    for(long i=0;i<workers;i++){
        create_err = pthread_create(&th[i], NULL, thread_max, (void*)i);
        if(create_err != 0) break;
        started++;
    }
    for(int i=0;i<started;i++) pthread_join(th[i], NULL);

    clock_gettime(CLOCK_MONOTONIC, &t1);

    if(create_err != 0){
        // A missing worker means part of the array was never scanned, so
        // the result could be wrong; fail instead of printing it.
        fprintf(stderr, "Error: pthread_create failed (code %d)\n", create_err);
        pthread_mutex_destroy(&lock);
        free(th);
        free(arr);
        return 1;
    }

    // evaluation metrics (memory = data array + the shared max)
    double time_ms = (t1.tv_sec - t0.tv_sec)*1000.0 + (t1.tv_nsec - t0.tv_nsec)/1000000.0;
    size_t mem_bytes = (size_t)n*sizeof(int) + sizeof(int);
    double mem_MB = mem_bytes / (1024.0*1024.0);

    printf("workers: %d\n", workers);
    printf("max value: %d\n", global_max);
    printf("time: %.3f ms\n", time_ms);
    printf("memory usage: %.6f MB\n", mem_MB);

    free(th);
    free(arr);
    pthread_mutex_destroy(&lock);
    return 0;
}


Overwriting maxagg_thread.c


In [6]:
!gcc maxagg_thread.c -pthread -o p2_threads
!./p2_threads 32 1
!./p2_threads 32 2
!./p2_threads 32 4
!./p2_threads 32 8
!./p2_threads 131072 1
!./p2_threads 131072 2
!./p2_threads 131072 4
!./p2_threads 131072 8

workers: 1
max value: 2108313867
time: 0.233 ms
memory usage: 0.000126 MB
workers: 2
max value: 2108313867
time: 0.967 ms
memory usage: 0.000126 MB
workers: 4
max value: 2108313867
time: 0.277 ms
memory usage: 0.000126 MB
workers: 8
max value: 2108313867
time: 0.425 ms
memory usage: 0.000126 MB
workers: 1
max value: 2147476004
time: 0.671 ms
memory usage: 0.500004 MB
workers: 2
max value: 2147476004
time: 0.482 ms
memory usage: 0.500004 MB
workers: 4
max value: 2147476004
time: 0.538 ms
memory usage: 0.500004 MB
workers: 8
max value: 2147476004
time: 0.760 ms
memory usage: 0.500004 MB


In [16]:
%%writefile maxagg_process.c

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/wait.h>
#include <pthread.h>
#include <time.h>


// Program entry: finds the maximum of <size> pseudo-random ints using
// <workers> forked processes.
//   argv[1]  number of elements (must be > 0)
//   argv[2]  number of worker processes (must be > 0)
// The input array is ordinary heap memory that each child inherits through
// fork. The running maximum and the process-shared mutex guarding it live
// in anonymous shared mappings so that children's updates reach the parent.
// The reported time covers fork through wait.
// Returns 0 on success, 1 on bad arguments or a resource/worker failure.
int main(int argc, char **argv){
    if(argc < 3){
        printf("Usage: %s <size> <workers>\n", argv[0]);
        return 1;
    }

    int n = atoi(argv[1]);
    int workers = atoi(argv[2]);
    // n must be positive because the maximum is seeded from arr[0];
    // workers must be positive because it is the divisor in the chunk math.
    if(n <= 0 || workers <= 0){
        fprintf(stderr, "Error: <size> and <workers> must be positive integers\n");
        return 1;
    }

    // Everything that needs releasing is declared up front so that every
    // failure path can share the cleanup code at the end.
    int rc = 1;
    int err = 0;
    int lock_ready = 0;
    int spawned = 0;
    int failed = 0;
    int *arr = NULL;
    int *global_max = MAP_FAILED;
    pthread_mutex_t *lock = MAP_FAILED;
    pthread_mutexattr_t attr;
    struct timespec t0, t1;

    // array of random numbers (local memory)
    arr = malloc((size_t)n * sizeof(int));
    if(arr == NULL){
        perror("malloc");
        goto done;
    }
    srand(42); // fixed seed so every run sees the same data
    for(int i=0;i<n;i++) arr[i] = rand();

    // Shared memory for global max
    global_max = mmap(NULL, sizeof(int), PROT_READ|PROT_WRITE,
                      MAP_SHARED|MAP_ANONYMOUS, -1, 0);
    if(global_max == MAP_FAILED){
        perror("mmap");
        goto done;
    }
    *global_max = arr[0];

    // Shared mutex for synchronization across process
    lock = mmap(NULL, sizeof(pthread_mutex_t),
                PROT_READ|PROT_WRITE,
                MAP_SHARED|MAP_ANONYMOUS, -1, 0);
    if(lock == MAP_FAILED){
        perror("mmap");
        goto done;
    }

    if(pthread_mutexattr_init(&attr) != 0){
        fprintf(stderr, "Error: pthread_mutexattr_init failed\n");
        goto done;
    }
    err = pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
    if(err == 0) err = pthread_mutex_init(lock, &attr);
    pthread_mutexattr_destroy(&attr); // no longer needed once the mutex exists
    if(err != 0){
        fprintf(stderr, "Error: could not create process-shared mutex (code %d)\n", err);
        goto done;
    }
    lock_ready = 1;

    clock_gettime(CLOCK_MONOTONIC, &t0);  // start timestamp

    // Fork workers to compute local max and update global max
    for(int i=0;i<workers;i++){
        pid_t pid = fork();
        if(pid < 0){
            perror("fork");
            failed = 1;
            break;
        }
        if(pid == 0){
            // Compute chunk boundaries
            int chunk = (n + workers - 1) / workers;
            int l = i * chunk;
            if(l >= n) _exit(0); // more workers than chunks: nothing to do
            int r = l + chunk - 1;
            if(r >= n) r = n - 1;

            // Local max computation
            int local_max = arr[l];
            for(int j = l + 1; j <= r; j++)
                if(arr[j] > local_max) local_max = arr[j];

            // Update global max with locking to ensure atomicity
            pthread_mutex_lock(lock);
            if(local_max > *global_max) *global_max = local_max;
            pthread_mutex_unlock(lock);

            _exit(0);
        }
        spawned++;
    }

    // Wait only for the children that were actually created, and notice
    // any that did not exit cleanly (their chunk may not have been counted).
    for(int i=0;i<spawned;i++){
        int status = 0;
        if(wait(&status) < 0 || !WIFEXITED(status) || WEXITSTATUS(status) != 0) failed = 1;
    }
    clock_gettime(CLOCK_MONOTONIC, &t1);  // end timestamp

    if(failed){
        fprintf(stderr, "Error: a worker process could not be started or did not exit cleanly\n");
        goto done;
    }

    {
        // Get evaluation metrics (memory = data array + the shared max)
        double time_ms = (t1.tv_sec - t0.tv_sec)*1000.0 +
                         (t1.tv_nsec - t0.tv_nsec)/1000000.0;
        size_t mem_bytes = (size_t)n*sizeof(int) + sizeof(int);
        double mem_MB = mem_bytes / (1024.0*1024.0);

        // output
        printf("workers: %d\n", workers);
        printf("array size: %d\n", n);
        printf("global max value: %d\n", *global_max);
        printf("time: %.3f ms\n", time_ms);
        printf("memory usage: %.6f MB\n", mem_MB);
    }
    rc = 0;

done:
    // Release only what was successfully acquired.
    if(lock_ready) pthread_mutex_destroy(lock);
    if(lock != MAP_FAILED) munmap(lock, sizeof(pthread_mutex_t));
    if(global_max != MAP_FAILED) munmap(global_max, sizeof(int));
    free(arr);
    return rc;
}


Overwriting maxagg_process.c


In [17]:
!gcc maxagg_process.c -o p2_processes
!./p2_processes 32 1
!./p2_processes 32 2
!./p2_processes 32 4
!./p2_processes 32 8
!./p2_processes 131072 1
!./p2_processes 131072 2
!./p2_processes 131072 4
!./p2_processes 131072 8

workers: 1
array size: 32
global max value: 2108313867
time: 0.399 ms
memory usage: 0.000126 MB
workers: 2
array size: 32
global max value: 2108313867
time: 0.449 ms
memory usage: 0.000126 MB
workers: 4
array size: 32
global max value: 2108313867
time: 1.153 ms
memory usage: 0.000126 MB
workers: 8
array size: 32
global max value: 2108313867
time: 2.030 ms
memory usage: 0.000126 MB
workers: 1
array size: 131072
global max value: 2147476004
time: 0.962 ms
memory usage: 0.500004 MB
workers: 2
array size: 131072
global max value: 2147476004
time: 0.794 ms
memory usage: 0.500004 MB
workers: 4
array size: 131072
global max value: 2147476004
time: 1.518 ms
memory usage: 0.500004 MB
workers: 8
array size: 131072
global max value: 2147476004
time: 1.512 ms
memory usage: 0.500004 MB
