# Project

In [None]:
# TODO: create some data that will be sorted here?

## Merge sort vs parallel merge sort

In [None]:
import multiprocessing
import math
import random
import time
import matplotlib.pyplot as plt

plt.style.use("bmh") 

# used resources:
# merge sort
# https://www.youtube.com/watch?v=4VqmGXwpLqc&ab_channel=BackToBackSWE
# parallel merge sort
# https://gist.github.com/stephenmcd/39ded69946155930c347

def merge_sort(arr):
    """Sort ``arr`` with a top-down merge sort.

    Args:
        arr: Sequence of mutually comparable items. It is only sliced,
            never mutated.

    Returns:
        The items in ascending order. When ``arr`` has at most one element
        it is returned unchanged (same object).
    """
    # "<= 1" also covers the empty list; with "== 1" an empty input kept
    # splitting into two empty halves and recursed until RecursionError.
    if len(arr) <= 1:
        return arr

    mid = len(arr) // 2
    left = merge_sort(arr[:mid])   # first half
    right = merge_sort(arr[mid:])  # second half

    return merge(left, right)
    
def merge(*args):
    """Merge two sorted sequences into one sorted list.

    Can be called as ``merge(a, b)`` or with a single pair ``merge((a, b))``;
    the pair form lets ``Pool.map`` hand over both inputs as one argument.

    Returns:
        A new list with every element of both inputs in ascending order.
        On ties the element from the first sequence comes first.
    """
    arr_a, arr_b = args[0] if len(args) == 1 else args
    arr_c = []
    i_a = 0
    i_b = 0

    # Walk both inputs while each still has unread elements.
    while i_a < len(arr_a) and i_b < len(arr_b):
        if arr_a[i_a] > arr_b[i_b]:
            arr_c.append(arr_b[i_b])
            i_b += 1
        else:
            arr_c.append(arr_a[i_a])
            i_a += 1

    # At most one of these slices is non-empty: append the unread remainder
    # only (not the whole list again).
    arr_c.extend(arr_a[i_a:])
    arr_c.extend(arr_b[i_b:])

    return arr_c

def parallel_merge_sort(arr):
    """Sort ``arr`` by merge-sorting chunks in a process pool.

    The input is cut into at most ``cpu_count()`` chunks, each chunk is
    sorted by ``merge_sort`` in a worker, and the sorted chunks are then
    merged pairwise (also in the pool) until one list remains.

    Args:
        arr: Sliceable sequence of mutually comparable, picklable items.

    Returns:
        A new sorted list.
    """
    # Nothing to parallelise; also avoids a zero chunk size below.
    if len(arr) <= 1:
        return list(arr)

    processes_count = multiprocessing.cpu_count()
    size = int(math.ceil(float(len(arr)) / processes_count))
    # Stepping by ``size`` never yields an empty chunk, even when the input
    # is shorter than the number of processes.
    chunks = [arr[start:start + size] for start in range(0, len(arr), size)]

    # The context manager releases the worker processes when we are done.
    with multiprocessing.Pool(processes=processes_count) as pool:
        chunks = pool.map(merge_sort, chunks)

        while len(chunks) > 1:
            # With an odd number of chunks, hold one back for the next round.
            extra = chunks.pop() if len(chunks) % 2 == 1 else None
            pairs = [(chunks[i], chunks[i + 1])
                     for i in range(0, len(chunks), 2)]
            chunks = pool.map(merge, pairs)
            if extra is not None:
                chunks.append(extra)

    return chunks[0]



if __name__ == "__main__":
    # Benchmark: time merge_sort and parallel_merge_sort on random lists of
    # growing size, print the timings and plot them.
    nums = [1000, 10000, 100000, 1000000, 10000000]
    a_time = []  # sequential merge sort timings (seconds)
    b_time = []  # parallel merge sort timings (seconds)

    for i in nums:
        randomlist = random.sample(range(0, 2 * i), i)

        # perf_counter is the monotonic high-resolution clock intended for
        # measuring short durations; time.time() can jump with clock changes.
        start = time.perf_counter()
        merge_sort(randomlist)
        end = time.perf_counter() - start
        a_time.append(end)

        start = time.perf_counter()
        parallel_merge_sort(randomlist)
        end = time.perf_counter() - start
        b_time.append(end)

    print(a_time)
    print(b_time)

    plt.plot(nums, a_time, 'r', label='merge sort')
    plt.plot(nums, b_time, 'g', label='parallel merge sort')
    plt.ylabel('time (s)')
    plt.xlabel('array size')
    plt.legend(loc='upper left')
    plt.show()
    

1
0.11981034278869629
2
50001
1
0.19895195960998535


# Hyperquicksort

In [None]:
# From Silver Maala HW1 and https://en.wikipedia.org/wiki/Quicksort.


def quicksort(array):
    """Sort ``array`` in place with quicksort; returns ``None``.

    An empty input is left untouched.
    """
    if len(array) > 0:
        quicksort_req(array, 0, len(array) - 1)


def quicksort_req(array, low, high):
    """Recursively quicksort the inclusive slice ``array[low..high]`` in place."""
    # Slices with fewer than two elements are already sorted.
    if not low < high:
        return

    p = partition_quicksort(array, low, high)
    quicksort_req(array, low, p - 1)
    quicksort_req(array, p + 1, high)


def partition_quicksort(array, low, high):
    """Partition ``array[low..high]`` in place around ``array[high]``.

    Elements strictly smaller than the pivot are moved in front of it.

    Returns:
        The final index of the pivot.
    """
    pivot_value = array[high]
    boundary = low  # next slot for an element smaller than the pivot
    for scan in range(low, high):
        if array[scan] < pivot_value:
            swap(array, boundary, scan)
            boundary += 1
    # Drop the pivot between the "smaller" and the "not smaller" part.
    swap(array, boundary, high)
    return boundary


def swap(array, i1, i2):
    """Exchange the elements at positions ``i1`` and ``i2`` of ``array`` in place."""
    held = array[i1]
    array[i1] = array[i2]
    array[i2] = held

In [None]:
# https://cse.buffalo.edu/faculty/miller/Courses/CSE633/Srivastava-Spring-2014-CSE633.pdf

import math
import time
from multiprocessing import Queue
from random import randint, choice
import multiprocessing

from quicksort import quicksort

import sys

sys.setrecursionlimit(10000)


def partition(array, pivot):
    """Split ``array`` into (values <= pivot, values > pivot).

    Relative order inside each part is preserved.
    """
    smaller_equal = [value for value in array if value <= pivot]
    greater = [value for value in array if not value <= pivot]
    return smaller_equal, greater


def split_links(links):
    """Split ``links`` into a low-key and a high-key dictionary.

    The cut is the midpoint between the smallest and the largest key; keys
    at or below the midpoint go to the low group.

    Returns:
        ``(low_nodes, high_nodes)``.
    """
    keys = links.keys()
    lowest, highest = min(keys), max(keys)
    midpoint = lowest + (highest - lowest) / 2

    low_nodes = {key: value for key, value in links.items() if key <= midpoint}
    high_nodes = {key: value for key, value in links.items() if not key <= midpoint}
    return low_nodes, high_nodes


def get_partner_thread_id(thread_id, links):
    """Return the id that ``thread_id`` is paired with inside ``links``.

    The partner sits half the group size away: above ``thread_id`` when that
    id does not exceed the largest key in ``links``, otherwise below it.
    """
    offset = int(len(links) / 2)
    upper = thread_id + offset
    if upper <= max(links.keys()):
        return upper
    return thread_id - offset


def split_array(array, nr_of_splits):
    """Cut ``array`` into ``nr_of_splits`` consecutive slices.

    Each slice holds ``ceil(len(array) / nr_of_splits)`` items; trailing
    slices may be shorter or empty.
    """
    chunk = math.ceil(len(array) / nr_of_splits)
    pieces = []
    for index in range(nr_of_splits):
        begin = index * chunk
        pieces.append(array[begin:begin + chunk])
    return pieces


def broadcast(links, array):
    """Send every link in ``links`` its slice of ``array`` and a shared pivot.

    One pivot is picked for the whole group. Each link receives the tuple
    ``(pivot, slice, partner_id, partner_is_high, use_quicksort)`` with
    ``use_quicksort`` always ``False``.
    """
    pivot = get_pivot(array)
    pieces = split_array(array, len(links))
    for thread_id, piece in zip(links.keys(), pieces):
        partner = get_partner_thread_id(thread_id, links)
        # The partner is the "high" side when its id is above ours.
        message = (pivot, piece, partner, thread_id < partner, False)
        links[thread_id].put(message)


def get_pivot(array):
    """Return a randomly chosen element of ``array``, or 0 when it is empty."""
    if len(array) == 0:
        return 0
    return choice(array)


def hyper_quicksort(pivot, array, my_c_link, partner_c_link, partner_is_high):
    """Exchange one half of ``array`` with the partner and return the new list.

    ``array`` is partitioned around ``pivot``. When the partner is the high
    side, the values above the pivot are put on ``partner_c_link`` and the
    values at or below it are kept; otherwise the roles are reversed. The
    call then blocks on ``my_c_link`` for the partner's half.

    Returns:
        Kept half followed by the received half (partner is high), or the
        received half followed by the kept half (partner is low).
    """
    low_part, high_part = partition(array, pivot)

    if partner_is_high:
        outgoing, kept = high_part, low_part
    else:
        outgoing, kept = low_part, high_part

    partner_c_link.put(outgoing)
    received = my_c_link.get(block=True)

    if partner_is_high:
        return kept + received
    return received + kept


def thread_sort(my_broadcast_link, my_result_link, c_links, my_thread_id):
    """Worker loop run in each sorting process.

    Repeatedly blocks on ``my_broadcast_link`` for a 5-tuple
    ``(pivot, array, partner_thread_id, partner_is_high, use_quicksort)``.

    * If ``use_quicksort`` is set, the worker takes its current list from
      ``my_result_link``, sorts it in place with ``quicksort``, puts it back
      on ``my_result_link`` and returns (ending the loop).
    * Otherwise it runs one ``hyper_quicksort`` exchange with its partner
      and puts the resulting list on ``my_result_link``.

    Args:
        my_broadcast_link: Queue this worker reads its instructions from.
        my_result_link: Queue this worker reads from / writes its lists to.
        c_links: Mapping of thread id to the exchange queue of that worker.
        my_thread_id: Key of this worker in ``c_links``.
    """
    # Queue on which the partner delivers its half during an exchange.
    my_c_link = c_links[my_thread_id]

    while True:
        # Wait for the next instruction.
        pivot, array, partner_thread_id, partner_is_high, use_quicksort = my_broadcast_link.get(block=True)

        # If can't use more recursion just use regular quicksort.
        if use_quicksort:
            # The ``array`` field of the message is ignored here; the data to
            # sort is whatever is waiting on the result queue.
            array = my_result_link.get(block=True)
            quicksort(array)
            # array.sort()
            my_result_link.put(array)
            return

        # Swap halves with the partner around the broadcast pivot.
        partner_c_link = c_links[partner_thread_id]
        array_new = hyper_quicksort(pivot, array, my_c_link, partner_c_link, partner_is_high)

        # Returning result to main thread.
        my_result_link.put(array_new)


def recursion(broadcast_links, result_links):
    """Drive the workers behind ``broadcast_links`` one level deeper.

    With a single link left, that worker is told to finish with a plain
    quicksort (``use_quicksort`` flag set). Otherwise the links are split
    into a low and a high group; for each group the workers' current
    results are collected and re-broadcast inside the group, and then both
    groups are processed recursively.
    """
    # Can't use more recursion.
    if len(broadcast_links) == 1:
        for link in broadcast_links.values():
            link.put((None, None, None, None, True))
        return

    # Low group first, then high group.
    groups = split_links(broadcast_links)

    for group in groups:
        gathered = []
        for thread_id in group.keys():
            gathered += result_links[thread_id].get(block=True)
        broadcast(group, gathered)

    for group in groups:
        recursion(group, result_links)


def main(array, hypercube_dim=3):
    """Run the multi-process hyperquicksort on ``array``.

    Starts ``2 ** hypercube_dim`` worker processes running ``thread_sort``,
    connected through manager queues, broadcasts the data, drives the
    exchange rounds with ``recursion`` and collects the workers' lists.

    Args:
        array: List of picklable, mutually comparable values. Not modified.
        hypercube_dim: Dimension of the hypercube; the number of workers is
            two to this power.

    Returns:
        The concatenation of all workers' final lists, in thread-id order.
    """
    thread_ids = range(1, 2 ** hypercube_dim + 1)

    # The context manager shuts the manager process down on exit instead of
    # leaving it to garbage collection.
    with multiprocessing.Manager() as m:
        # Queues for the threads.
        broadcast_links, c_links, result_links = {}, {}, {}
        for thread_id in thread_ids:
            broadcast_links[thread_id] = m.Queue()
            c_links[thread_id] = m.Queue()
            result_links[thread_id] = m.Queue()

        # Creating and running the processes.
        workers = []
        for thread_id in thread_ids:
            worker = multiprocessing.Process(
                target=thread_sort,
                args=(broadcast_links[thread_id], result_links[thread_id], c_links, thread_id,))
            worker.start()
            workers.append(worker)

        # ---------- Starting the sorting ----------

        # Broadcast the pivot and array to all threads to start the sort.
        broadcast(broadcast_links, array)

        # Recursive part.
        recursion(broadcast_links, result_links)

        # Getting the sorted array.
        sorted_array = []
        for thread_id in thread_ids:
            sorted_array += result_links[thread_id].get(block=True)

        # Wait for the workers to exit while the manager is still alive, so
        # their queue proxies can be released cleanly.
        for worker in workers:
            worker.join()

    return sorted_array


if __name__ == '__main__':
    # Benchmark the multi-process sort against plain quicksort on random
    # arrays of growing size. Remove later.
    minimum_value = 0
    maximum_value = 10000
    for size in range(10000, 1000000, 50000):
        array = [randint(minimum_value, maximum_value) for _ in range(size)]

        # "Multi" runs first: quicksort sorts ``array`` in place.
        for label, sorter in (("Multi", main), ("Regular", quicksort)):
            start = time.time()
            sorter(array)
            end = time.time() - start
            print(label, end)
        print()


Process Process-40:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 261, in _bootstrap
    util._exit_function()
  File "/usr/lib/python3.6/multiprocessing/util.py", line 296, in _exit_function
    _run_finalizers(0)
  File "/usr/lib/python3.6/multiprocessing/util.py", line 262, in _run_finalizers
    finalizer()


KeyboardInterrupt: ignored

  File "/usr/lib/python3.6/multiprocessing/util.py", line 186, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/managers.py", line 809, in _decref
    conn = _Client(token.address, authkey=authkey)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 493, in Client
    answer_challenge(c, authkey)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 732, in answer_challenge
    message = connection.recv_bytes(256)         # reject large message
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
Process Process-42:
Process Process-38:
Process Process-39:
Process Process-41:
Process Process-43:
Process 

In [None]:
# Hard-coded data for plotting.
# NOTE(review): presumably measured on arrays with values in [0, 2**64] - confirm.
# Despite its name, `times` appears to hold the array sizes (the same values
# as `nums` in the benchmark cells); `multi` and `single` presumably hold the
# matching run times in seconds for the multi-process and the regular sort.
times = [1000, 10000, 100000, 1000000, 10000000]
multi = [2.1853315830230713, 2.2524845600128174, 3.472196102142334, 9.096222877502441, 82.25024771690369]
single = [0.0019991397857666016, 0.0319976806640625, 0.4350013732910156, 5.645039796829224, 72.90752291679382]



# RadixSort

## Simple Implementation

In [None]:
import math
import multiprocessing
import random
import sys
import time
from multiprocessing import Pool
import os
import threading
import logging

In [None]:
#k = max value
#https://www.codingeek.com/algorithms/counting-sort-explanation-pseudocode-and-implementation/
#https://www.lewuathe.com/radix-sort-in-python.html
def countingSort(array, k, get_index):
    """Stable counting sort of ``array`` by an integer key.

    Args:
        array: Sequence of items to sort. Not modified.
        k: Largest key ``get_index`` can return (keys lie in ``0..k``).
        get_index: Callable mapping an item to its integer key.

    Returns:
        A new list with the items ordered by key; items with equal keys keep
        their input order.

    Raises:
        IndexError: If ``get_index`` returns a key greater than ``k``.
    """
    # Compute each key once instead of on every pass over the data.
    keys = [get_index(item) for item in array]

    counts = [0] * (k + 1)
    for key in keys:
        counts[key] += 1

    # Turn counts into start offsets (exclusive prefix sums). Doing this in
    # one pass also handles k == 0, where the previous shift loop never ran
    # and the first write landed past the end of the output.
    offset = 0
    for bucket in range(k + 1):
        count = counts[bucket]
        counts[bucket] = offset
        offset += count

    output = [None] * len(array)
    for item, key in zip(array, keys):
        output[counts[key]] = item
        counts[key] += 1

    return output

In [None]:
#https://www.lewuathe.com/radix-sort-in-python.html
def get_digit(n, d):
    """Return the ``d``-th decimal digit of ``n``, counting from 1 at the ones place."""
    remaining_shifts = d - 1
    # Drop the lower digits one at a time.
    while remaining_shifts > 0:
        n = n // 10
        remaining_shifts -= 1
    return n % 10

def radix_sort(arr):
    """Sort non-negative integers with a least-significant-digit radix sort.

    One stable counting sort is run per decimal digit, starting at the ones
    place.

    Args:
        arr: Sequence of non-negative integers. Not modified.

    Returns:
        A new list with the values in ascending order.
    """
    if len(arr) == 0:
        return list(arr)

    max_value = max(arr)
    num_digits = len(str(max_value))
    # O(d * (n + 10)) for d digits
    for d in range(num_digits):
        # A decimal digit is always in 0..9, so ten buckets are enough.
        # Passing max_value here allocated max_value + 1 counters per pass.
        arr = countingSort(arr, 9, lambda a, d=d: get_digit(a, d + 1))
    return arr

In [None]:
# Usage: sort a small sample list with radix_sort (the cell output follows).
radix_sort([57,63,74,39,47,13])

[13, 39, 47, 57, 63, 74]

## Threaded Implementation

In [None]:
import math
import multiprocessing
import random
import sys
import time
import multiprocessing as mp
import os
import threading
import logging
from timeit import default_timer as timer
import matplotlib.pyplot as plt
#https://www.sanfoundry.com/python-program-implement-bucket-sort/
def bucket_sort(alist):
    """Return the numbers in ``alist`` sorted ascending using bucket sort.

    The value range is divided into ``len(alist)`` equally wide buckets,
    each bucket is sorted with ``insertion_sort`` and the buckets are
    concatenated.

    Args:
        alist: Sequence of numbers. Not modified.

    Returns:
        A new sorted list (``alist`` itself when it is empty).
    """
    if len(alist) == 0:
        return alist

    smallest = min(alist)
    largest = max(alist)
    # All values equal: nothing to order, and the bucket width would be 0
    # (this used to raise ZeroDivisionError for an all-zero input).
    if smallest == largest:
        return list(alist)

    length = len(alist)
    # Measuring from the minimum keeps bucket indices non-negative, so
    # negative inputs land in the right bucket too.
    size = (largest - smallest) / length

    buckets = [[] for _ in range(length)]
    for value in alist:
        # The maximum maps to index ``length``; clamp it into the last bucket.
        j = min(int((value - smallest) / size), length - 1)
        buckets[j].append(value)

    result = []
    for bucket in buckets:
        insertion_sort(bucket)
        result.extend(bucket)  # extend avoids re-copying the result each time

    return result
  #https://www.sanfoundry.com/python-program-implement-bucket-sort/
def insertion_sort(alist):
    """Sort ``alist`` in place, ascending, using insertion sort."""
    for index in range(1, len(alist)):
        current = alist[index]
        hole = index
        # Shift larger elements one slot right until ``current`` fits.
        while hole > 0 and current < alist[hole - 1]:
            alist[hole] = alist[hole - 1]
            hole -= 1
        alist[hole] = current

def first_n_digits(num, n):
    """Return the integer formed by the first ``n`` characters of ``str(num)``."""
    text = str(num)
    return int(text[:n])

#https://github.com/MartinThoma/algorithms/blob/master/sorting/radixsort-multi-thread.py
def get_prefixes():
    """Return the bucket prefixes: 0 followed by every integer from 10 to 99."""
    return [0] + list(range(10, 100))

def splitFiles(file, prefixes):
    """Group the numbers in ``file`` by their leading digits.

    Returns a dict with one list per entry of ``prefixes``. A number with
    ``n % 10 == n`` is filed under its first digit followed by a zero; every
    other number is filed under its first two digits.
    """
    prefFile = {prefix: [] for prefix in prefixes}
    for number in file:
        if number % 10 == number:
            key = int(str(first_n_digits(number, 1)) + '0')
        else:
            key = first_n_digits(number, 2)
        prefFile[key].append(number)
    return prefFile


def sortFiles(osPaths):
    """Sort every bucket of ``osPaths`` in its own thread and merge them.

    Args:
        osPaths: Dict mapping a prefix key to the list of values in that
            bucket. ``sortFile`` writes its result back into this dict.

    Returns:
        The list produced by ``mergeFiles(osPaths)`` once all threads are
        done.
    """
    threads = []
    for key in list(osPaths.keys()):
        worker = threading.Thread(target=sortFile, args=(osPaths[key], key, osPaths))
        threads.append(worker)
        worker.start()

    for worker in threads:
        worker.join()

    # Merge once; the extra call whose result was thrown away is gone.
    return mergeFiles(osPaths)

def sortFile(values, key, hash_):
    """Sort one bucket and store it under ``key`` in ``hash_``.

    ``bucket_sort`` returns a new list rather than sorting in place, so its
    result (not the unsorted input) is what gets stored.

    Args:
        values: The bucket's values.
        key: Key of the bucket in ``hash_``.
        hash_: Mapping that receives the sorted bucket.

    Returns:
        The sorted bucket, so callers running this in another process (where
        the write to ``hash_`` is not visible to the parent) can still
        collect the result.
    """
    sorted_values = bucket_sort(values)
    hash_[key] = sorted_values
    return sorted_values


def mergeFiles(path):
    """Concatenate the lists stored in ``path`` in ascending key order.

    Args:
        path: Dict mapping sortable keys to lists.

    Returns:
        One new list holding every bucket's items, bucket after bucket.
    """
    # Indentation is spaces only; the mixed spaces/tabs did not compile.
    final = []
    for list_ in sorted(path):
        final.extend(path[list_])
    return final

# Load the numbers to sort, one integer per line of numbers.txt.
toSort = []
with open("numbers.txt", "r") as f:
    for lines in f:
        # int() ignores the surrounding whitespace/newline itself; slicing
        # off the last character lost a digit when the final line had no
        # trailing newline. Blank lines are skipped.
        if lines.strip():
            toSort.append(int(lines))

if __name__ == "__main__":
    # Benchmark the prefix-bucket parallel sort on random lists of growing
    # size and plot the timings.
    nums = [1000, 10000, 100000, 1000000, 10000000]
    a_time = []
    b_time = []

    for i in nums:
        randomlist = random.sample(range(1, 2 * i), i)

        start = time.time()
        thing = splitFiles(randomlist, get_prefixes())
        keys = list(thing)
        # Workers run in separate processes, so anything they write into
        # their copy of ``thing`` is lost. Collect the sorted buckets from
        # the return values instead and store them in the parent.
        with mp.Pool(mp.cpu_count()) as pool:
            results = pool.map(bucket_sort, [thing[j] for j in keys])
        for j, sorted_bucket in zip(keys, results):
            thing[j] = sorted_bucket
        check = mergeFiles(thing)
        end = time.time() - start
        b_time.append(end)

    plt.plot(nums, b_time, 'g', label='radix')
    plt.ylabel('time (s)')
    plt.xlabel('array size')
    plt.legend(loc='upper left')
    plt.show()

    # ``tac`` and ``tic`` were never defined; report the measured timings.
    print("time for parallel sorting: ", b_time)

In [None]:
# File and number generation. Currently set to 10^6 total values; exp is the bit width: values are drawn from [0, 2**exp].
def generateNumbers(exp, count=10**6, path="numbers.txt"):
    """Write random integers to a text file, one per line.

    Args:
        exp: Exponent of the upper bound; values are drawn uniformly from
            ``0`` to ``2 ** exp``, both ends included.
        count: How many numbers to write (default 10**6).
        path: File to (over)write (default ``"numbers.txt"``).
    """
    upper = 2 ** exp
    # ``with`` closes the file even if a write fails part-way.
    with open(path, "w") as f:
        for _ in range(count):
            f.write(str(random.randint(0, upper)) + '\n')

In [None]:
#if you wish to generate a file to test 
#%%time
#generateNumbers(64)

In [None]:
if __name__ == "__main__":
    # Benchmark radix_sort against the prefix-bucket parallel sort and plot
    # both timing curves.
    import numpy as np  # local import: this cell uses np before the file imports it

    nums = [1000, 10000, 100000, 1000000]
    a_time = []  # radix_sort timings (seconds)
    b_time = []  # parallel sort timings (seconds)

    for i in nums:
        #randomlist = random.sample(range(1, 2 * i), i)
        randomlist = np.random.randint(1,9999,i)

        start = time.time()
        radix_sort(randomlist)
        end = time.time() - start
        a_time.append(end)

        start = time.time()
        thing = splitFiles(randomlist, get_prefixes())
        keys = list(thing)
        # Worker processes cannot update the parent's ``thing``; take the
        # sorted buckets from the return values and store them here.
        with mp.Pool(mp.cpu_count()) as pool:
            results = pool.map(bucket_sort, [thing[j] for j in keys])
        for j, sorted_bucket in zip(keys, results):
            thing[j] = sorted_bucket
        check = mergeFiles(thing)
        end = time.time() - start
        b_time.append(end)

    print(a_time)
    print(b_time)

    plt.plot(nums, a_time, 'r', label='sort')
    plt.plot(nums, b_time, 'g', label='parallel sort')
    plt.ylabel('time (s)')
    plt.xlabel('array size')
    plt.legend(loc='upper left')
    plt.show()

In [None]:
import math
import multiprocessing
import random
import sys
import time
import multiprocessing as mp
import os
import threading
import logging
from timeit import default_timer as timer
import matplotlib.pyplot as plt
import numpy as np
results = []
#https://github.com/richursa/cpuBitonicSort
def collect_result(result):
    """Append ``result`` to the module-level ``results`` list."""
    # Appending mutates the existing list, so no ``global`` statement is needed.
    results.append(result)

def ascendingSwap(index1, index2, array):
    """Put ``array[index1]`` and ``array[index2]`` into ascending order.

    Indices past the end are clamped to the last element. The two elements
    are exchanged in place when the one at ``index2`` is smaller.
    """
    last = len(array) - 1
    index1 = min(index1, last)
    index2 = min(index2, last)
    if array[index2] < array[index1]:
        array[index2], array[index1] = array[index1], array[index2]

def descendingSwap(index1, index2, array):
    """Put ``array[index1]`` and ``array[index2]`` into descending order.

    Indices past the end are clamped to the last element. The two elements
    are exchanged in place when the one at ``index1`` is smaller.
    """
    last = len(array) - 1
    index1 = min(index1, last)
    index2 = min(index2, last)
    if array[index1] < array[index2]:
        array[index2], array[index1] = array[index1], array[index2]

def bitByBit(startIndex, endIndex, dir_, array):
    if(dir_ == 1):
        counter = 0
        elemNumber = endIndex - startIndex+1
        j = elemNumber//2
        while(j>0):
            counter = 0
            i = startIndex
            while(i+j <= endIndex):
                if(counter < j):
                    ascendingSwap(i, j+i, array)
                    counter+=1
                else:
                    counter = 0
                    i = i+j-1
                i+=1
            j = j//2
    else:
        counter = 0
        elemNumber = endIndex - startIndex+1
        j = elemNumber//2
        while(j>0):
            counter = 0
            i = startIndex
            while(i <= endIndex-j):
                if(counter < j):
                    descendingSwap(i, j+i, array)
                    counter+=1
                else:
                    counter = 0
                    i = i+j-1
                i+=1
            j = j//2
                    
def bitonicSequence(startIndex, endIndex, array):
    """Run the bitonic passes over ``array`` in parallel.

    The data is copied into a manager list shared by the workers. For block
    widths 2, 4, 8, ... up to the element count, every block is handed to
    ``bitByBit``, alternating ascending (1) and descending (0) direction
    from block to block. Blocks always start at index 0; ``startIndex`` is
    only used to compute the element count.

    NOTE(review): the last pass looks like it only yields a fully sorted
    result when the element count is a power of two - confirm.

    Args:
        startIndex: Index of the first element.
        endIndex: Index of the last element (inclusive).
        array: Sequence of picklable, comparable values. Not modified.

    Returns:
        A plain list with the contents of the shared list after the last
        pass. (A manager proxy stops working once its manager is gone, so
        the data is copied out before the manager shuts down.)
    """
    elemNumber = endIndex - startIndex + 1

    with multiprocessing.Manager() as m:
        array_m = m.list(array)

        # A fixed-size pool instead of one process per block: the first pass
        # alone has elemNumber / 2 blocks.
        with multiprocessing.Pool() as pool:
            j = 2
            while j <= elemNumber:
                tasks = [
                    (i, i + j - 1, 1 if (i // j) % 2 == 0 else 0, array_m)
                    for i in range(0, elemNumber, j)
                ]
                # starmap returns only when every block of this width is
                # done; the next width depends on these results, and the
                # unjoined processes used before gave no such guarantee.
                pool.starmap(bitByBit, tasks)
                j = j * 2

        return list(array_m)

def bitonicSequence_(startIndex, endIndex, array):
    """Sequential counterpart of ``bitonicSequence``.

    For block widths 2, 4, 8, ... up to the element count, ``bitByBit`` is
    applied to each block starting at index 0, alternating direction 1 and 0
    from block to block. ``array`` is modified in place and returned.
    """
    elemNumber = endIndex - startIndex + 1
    width = 2
    while width <= elemNumber:
        for block_start in range(0, elemNumber, width):
            direction = 1 if (block_start // width) % 2 == 0 else 0
            bitByBit(block_start, block_start + width - 1, direction, array)
        width *= 2
    return array

list_ = []
#https://www.inf.hs-flensburg.de/lang/algorithmen/sortieren/bitonic/oddn.htm
def sort(array):
    """Sort ``array`` in place via ``bitonicSort``.

    ``array`` is published through the module-level ``list_`` that the
    compare/exchange helpers operate on.
    """
    global list_
    list_ = array
    bitonicSort(lo=0, n=len(array), dir_=True)
def bitonicSort(lo, n, dir_):
    """Recursively sort ``n`` elements of the shared list starting at ``lo``.

    The first half is sorted in the opposite direction and the second half
    in direction ``dir_``; ``bitonicMerge`` then merges the two halves.

    Args:
        lo: Index of the first element of the range.
        n: Number of elements in the range.
        dir_: Direction flag, passed on to ``bitonicMerge``.
    """
    if n <= 1:
        return

    m = n // 2
    bitonicSort(lo, m, not dir_)
    # The ``p.start()`` / ``p.join()`` calls that stood here referred to an
    # undefined name and raised NameError for every n > 1.
    bitonicSort(lo + m, n - m, dir_)
    bitonicMerge(lo, n, dir_)
def bitonicMerge(lo, n, dir_):
    """Merge ``n`` elements of the shared list starting at ``lo``.

    Elements are compared with their partner ``m`` positions further on,
    where ``m`` is the greatest power of two below ``n``; both parts are
    then merged recursively.
    """
    if not n > 1:
        return

    m = greatestPowerOfTwoLessThan(n)
    for i in range(lo, lo + n - m):
        compare(i, i + m, dir_)
    bitonicMerge(lo, m, dir_)
    bitonicMerge(lo + m, n - m, dir_)

def compare(i, j, dir_):
    """Exchange elements ``i`` and ``j`` of the shared list when needed.

    They are exchanged when ``dir_`` equals the result of
    ``list_[i] > list_[j]``.
    """
    first_is_larger = list_[i] > list_[j]
    if dir_ == first_is_larger:
        exchange(i, j)

def exchange(i,j):
    """Swap elements ``i`` and ``j`` of the module-level ``list_`` in place."""
    list_[i], list_[j] = list_[j], list_[i]
def greatestPowerOfTwoLessThan(n):
    """Return the largest power of two strictly below ``n`` (0 when ``n <= 1``)."""
    power = 1
    # Double until we reach or pass n, then step back once.
    while power < n:
        power *= 2
    return power // 2


if __name__ == "__main__":
    # Benchmark the recursive bitonic sort against the multi-process
    # variant and plot both timing curves.
    nums = [1000, 10000]
    a_time = []  # sort() timings (seconds)
    b_time = []  # bitonicSequence() timings (seconds)

    for i in nums:
        #randomlist = random.sample(range(1, 2 * i), i)
        randomlist = np.random.randint(1,9999,i)

        # sort() works in place. Give each algorithm its own copy so the
        # second measurement does not run on already-sorted data. The copies
        # are made outside the timed sections.
        first_input = randomlist.copy()
        second_input = randomlist.copy()

        start = time.time()
        sort(first_input)
        end = time.time() - start
        a_time.append(end)

        start = time.time()
        bitonicSequence(0, len(second_input)-1, second_input)
        end = time.time() - start
        b_time.append(end)

    print(a_time)
    print(b_time)

    plt.plot(nums, a_time, 'r', label='sort')
    plt.plot(nums, b_time, 'g', label='parallel sort')
    plt.ylabel('time (s)')
    plt.xlabel('array size')
    plt.legend(loc='upper left')
    plt.show()

    
    