### Assignment 1: Circular Doubly Linked Queue

In [None]:
# Definition of a node of the circular doubly linked queue
class QueueNode:
    """One cell of a doubly linked structure.

    Attributes:
        value: payload stored in the cell (``None`` when nothing was given).
        next_node: following cell, or ``None`` while the cell is detached.
        pre_node: preceding cell, or ``None`` while the cell is detached.
    """

    def __init__(self, node_value=None):
        self.value = node_value
        # A new cell starts out unlinked in both directions.
        self.next_node = self.pre_node = None

In [None]:
# Definition of an object which contains process information
class ProcessObject:
    """Plain record describing one process.

    Every field has a placeholder default, so ``ProcessObject()`` still
    produces the same sample record as before; the keyword parameters let a
    caller fill the fields at construction time instead of assigning them
    one by one afterwards.

    Args:
        pid: process identifier.
        process_name: name of the process.
        process_owner: owner of the process.
        number_of_thread: number of threads of the process.
        percent_cpu_used: percentage of CPU time used.
        total_time_used: total CPU time used.
    """

    def __init__(self, pid=0, process_name='Process_Name',
                 process_owner='Process_Owner', number_of_thread=4,
                 percent_cpu_used=10.2, total_time_used=12.5789):
        self.pid = pid
        self.process_name = process_name
        self.process_owner = process_owner
        self.number_of_thread = number_of_thread
        self.percent_cpu_used = percent_cpu_used
        self.total_time_used = total_time_used

    def __repr__(self):
        # Shows every field so a failing assertion or a debugger is readable.
        return ('%s(pid=%r, process_name=%r, process_owner=%r, '
                'number_of_thread=%r, percent_cpu_used=%r, total_time_used=%r)'
                % (type(self).__name__, self.pid, self.process_name,
                   self.process_owner, self.number_of_thread,
                   self.percent_cpu_used, self.total_time_used))

In [None]:
# Definition for a circular doubly linked queue
class CircularDoubleLinkQueue:
    """FIFO queue stored in a pre-allocated circular doubly linked ring.

    The ring holds ``capacity`` nodes. While ``occupied_amount`` is non-zero
    the stored values live in the nodes from ``first_node`` to ``last_node``
    (following ``next_node``); every other node is a free slot. Adding to a
    full queue doubles the ring instead of failing.

    Attributes:
        first_node: node holding the oldest value.
        last_node: node holding the newest value.
        occupied_amount: number of values currently stored.
        capacity: number of nodes in the ring.
        process_sorting: ``ProcessMergeSort`` helper used by the display methods.
    """

    def __init__(self, capacity_of_queue:int=10):
        """Build an empty queue with ``capacity_of_queue`` free slots.

        Raises:
            ValueError: if ``capacity_of_queue`` is smaller than 2.
        """
        if capacity_of_queue < 2:
            raise ValueError('A capacity of queue must be greater then 1.')
        self.first_node = None
        self.last_node = None
        self.occupied_amount = 0
        self.capacity = 0
        self.increase_queue_capacity(new_capacity=capacity_of_queue)

        # Sorting helper shared by all display methods.
        self.process_sorting = self.ProcessMergeSort()

    def increase_queue_capacity(self, new_capacity:int =0):
        """Grow the ring until it holds ``new_capacity`` nodes.

        A request that does not exceed the current capacity is ignored:
        no node is ever removed here, so recording a smaller capacity would
        make ``capacity`` disagree with the real size of the ring.
        """
        if new_capacity <= self.capacity:
            return
        for index in range(self.capacity, new_capacity):
            spare = QueueNode()
            if index == 0:
                # Very first node: it is its own neighbour in both directions.
                spare.next_node = spare.pre_node = spare
                self.first_node = self.last_node = spare
            else:
                # Splice the spare node in just before first_node. When the
                # queue is full that position is right after last_node, so the
                # new slots are the next ones add_value_at_last() will fill.
                before_first = self.first_node.pre_node
                spare.next_node = self.first_node
                spare.pre_node = before_first
                before_first.next_node = spare
                self.first_node.pre_node = spare
        self.capacity = new_capacity

    def add_value_at_last(self, new_value):
        """Append ``new_value``; a full queue doubles its capacity first."""
        if self.occupied_amount == self.capacity:
            self.increase_queue_capacity(new_capacity=self.capacity * 2)
        if self.occupied_amount == 0:
            # Empty queue: reuse the slot last_node points at and restart there.
            self.last_node.value = new_value
            self.first_node = self.last_node
        else:
            self.last_node = self.last_node.next_node
            self.last_node.value = new_value
        self.occupied_amount += 1

    def get_first_value(self):
        """Return the oldest value, or ``None`` (after a notice) when empty."""
        if self.occupied_amount == 0:
            print('It\'s empty.')
            return None
        return self.first_node.value

    def remove_first_value(self):
        """Drop the oldest value; prints a notice when the queue is empty.

        Always returns ``None``.
        """
        if self.occupied_amount == 0:
            print('It\'s empty.')
            return None
        # Clear the slot so the ring does not keep the removed value alive.
        self.first_node.value = None
        self.first_node = self.first_node.next_node
        self.occupied_amount -= 1
        return None

    ## Display part: several functions to display process data

    def _ordered_processes(self, order_name):
        """Yield the stored values in the order named by ``order_name``.

        An empty queue prints the usual notice and yields nothing; without
        this guard the snapshot would contain a ``None`` value and the
        display methods would fail on attribute access.
        """
        if self.occupied_amount == 0:
            print('It\'s empty.')
            return
        sorter = self.process_sorting
        cursor = sorter.get_ordered_list(self.first_node, self.last_node,
                                         sorter.order_by[order_name])
        while cursor is not None:
            yield cursor.value
            cursor = cursor.next_node

    def _display_attribute(self, order_name, attribute):
        """Print ``attribute`` of every stored process, sorted by ``order_name``."""
        for process in self._ordered_processes(order_name):
            print(getattr(process, attribute))

    def display_elements_order_by_name(self):
        """Print the process names in ascending order."""
        self._display_attribute('name', 'process_name')

    def display_elements_order_by_pid(self):
        """Print the PIDs in ascending order."""
        self._display_attribute('pid', 'pid')

    def display_elements_order_by_cpu_time_used(self):
        """Print the total CPU times used in ascending order."""
        self._display_attribute('cpu_time_used', 'total_time_used')

    def display_elements_order_by_percent_of_cpu_time(self):
        """Print the CPU percentages in ascending order."""
        self._display_attribute('percent_of_cpu_time', 'percent_cpu_used')

    def display_elements_order_by_owner(self):
        """Print the process owners in ascending order."""
        self._display_attribute('owner', 'process_owner')

    def display_elements_by_original_order(self):
        """Print one summary line per process, in insertion order."""
        for process in self._ordered_processes('original'):
            print('PID: ', '%-6d'%process.pid, ', Name: ','%-12s'%process.process_name,
                  ', Owner: ', '%-12s'%process.process_owner, ' , Percent of CPU Time:',
                  '%.2f%%'%process.percent_cpu_used,', Total CPU Time:', '%.2f'%process.total_time_used)

    class ProcessMergeSort:
        """Merge sort over a linked copy of the queue, keyed by a process attribute.

        Attributes:
            order_by: maps an order name to the selector value accepted by
                ``get_ordered_list`` and ``determine_compare_way``.
            compare_function: "is 1st less than 2nd" predicate currently in
                use, or ``None`` when the insertion order must be kept.
        """

        def __init__(self):
            self.order_by = {'name':0,'pid':1,'cpu_time_used':2,'percent_of_cpu_time':3,'owner':4, 'original':5}
            self.compare_function = None

        def get_ordered_list(self, queue_fisrt, queue_last, order_by):
            """Return a sorted, ``None``-terminated copy of a queue segment.

            Args:
                queue_fisrt: first node to copy.
                queue_last: last node to copy (reached through ``next_node``).
                order_by: one of the values of ``self.order_by``.

            Returns:
                Head of a new linked list holding the same values, or
                ``None`` (after a notice) when ``order_by`` is unknown. The
                nodes of the queue itself are never relinked.
            """
            if not self.determine_compare_way(compare_by=order_by):
                print('The way of order is incorrect. Please make sure you pass a correct one.')
                return None

            # Copy first..last (inclusive) into a private list so that sorting
            # cannot disturb the links of the ring.
            head = tail = QueueNode(queue_fisrt.value)
            source = queue_fisrt
            while source != queue_last:
                source = source.next_node
                duplicate = QueueNode(source.value)
                tail.next_node = duplicate
                duplicate.pre_node = tail
                tail = duplicate

            if self.compare_function:
                return self.merge_sort(head)
            # 'original' order: the copy is already in insertion order.
            return head

        def merge_list(self, first_list, second_list):
            """Merge two ascending lists into one ascending list.

            The merge is iterative, so its depth does not grow with the
            length of the lists, and it is stable: on a tie the node of
            ``first_list`` comes first.

            Returns:
                Head of the merged list; its ``pre_node`` is ``None``.
            """
            if first_list is None:
                return second_list
            if second_list is None:
                return first_list

            is_less = self.compare_function
            head = tail = None
            while first_list is not None and second_list is not None:
                # Take from the second list only when it is strictly smaller;
                # that keeps equal keys in their original relative order.
                if is_less(second_list.value, first_list.value):
                    picked = second_list
                    second_list = second_list.next_node
                else:
                    picked = first_list
                    first_list = first_list.next_node
                picked.pre_node = tail
                if tail is None:
                    head = picked
                else:
                    tail.next_node = picked
                tail = picked

            # Exactly one list still has nodes; hook it on as a whole.
            remainder = first_list if first_list is not None else second_list
            tail.next_node = remainder
            remainder.pre_node = tail
            return head

        def merge_sort(self, queue_fisrt):
            """Sort the ``None``-terminated list starting at ``queue_fisrt``.

            Returns the head of the sorted list (``None`` for an empty list).
            """
            if queue_fisrt is None or queue_fisrt.next_node is None:
                return queue_fisrt

            right_half = self.split_queue(queue_fisrt)
            left_sorted = self.merge_sort(queue_fisrt)
            right_sorted = self.merge_sort(right_half)
            return self.merge_list(left_sorted, right_sorted)

        def split_queue(self, queue_fisrt):
            """Cut a list in two halves and return the head of the right half.

            The left half keeps ``queue_fisrt`` as head and receives the extra
            node when the length is odd. Returns ``None`` when there is no
            right half (empty or single-node list).
            """
            if queue_fisrt is None:
                return None
            slow_cursor = fast_cursor = queue_fisrt
            # fast_cursor moves two nodes per step, slow_cursor one, so
            # slow_cursor stops on the last node of the left half.
            while (fast_cursor.next_node is not None
                   and fast_cursor.next_node.next_node is not None):
                fast_cursor = fast_cursor.next_node.next_node
                slow_cursor = slow_cursor.next_node

            right_half_queue = slow_cursor.next_node
            slow_cursor.next_node = None
            if right_half_queue is not None:
                # Detach the back-link too, so neither half points into the other.
                right_half_queue.pre_node = None
            return right_half_queue

        def determine_compare_way(self, compare_by):
            """Select the comparator matching ``compare_by``.

            Returns:
                ``True`` when ``compare_by`` is one of the values of
                ``self.order_by`` (``compare_function`` is then updated,
                ``None`` meaning insertion order); ``False`` otherwise, in
                which case ``compare_function`` is left untouched.
            """
            comparators = (
                ('name', self.is_1st_less_than_2nd_by_name),
                ('pid', self.is_1st_less_than_2nd_by_pid),
                ('cpu_time_used', self.is_1st_less_than_2nd_by_cpu_time_used),
                ('percent_of_cpu_time', self.is_1st_less_than_2nd_by_percent_of_cpu_time),
                ('owner', self.is_1st_less_than_2nd_by_owner),
                ('original', None),
            )
            for order_name, comparator in comparators:
                if compare_by == self.order_by[order_name]:
                    self.compare_function = comparator
                    return True
            return False

        @staticmethod
        def _require_process_objects(process_1st, process_2nd):
            """Raise ``TypeError`` unless both arguments are ``ProcessObject``."""
            if not isinstance(process_1st, ProcessObject):
                raise TypeError('First data is not ProcessObject')
            if not isinstance(process_2nd, ProcessObject):
                raise TypeError('Second data is not ProcessObject')

        def is_1st_less_than_2nd_by_name(self,process_1st,process_2nd):
            """True when 1st.process_name < 2nd.process_name."""
            self._require_process_objects(process_1st, process_2nd)
            return process_1st.process_name < process_2nd.process_name

        def is_1st_less_than_2nd_by_pid(self,process_1st,process_2nd):
            """True when 1st.pid < 2nd.pid."""
            self._require_process_objects(process_1st, process_2nd)
            return process_1st.pid < process_2nd.pid

        def is_1st_less_than_2nd_by_cpu_time_used(self,process_1st,process_2nd):
            """True when 1st.total_time_used < 2nd.total_time_used."""
            self._require_process_objects(process_1st, process_2nd)
            return process_1st.total_time_used < process_2nd.total_time_used

        def is_1st_less_than_2nd_by_percent_of_cpu_time(self,process_1st,process_2nd):
            """True when 1st.percent_cpu_used < 2nd.percent_cpu_used."""
            self._require_process_objects(process_1st, process_2nd)
            return process_1st.percent_cpu_used < process_2nd.percent_cpu_used

        def is_1st_less_than_2nd_by_owner(self,process_1st,process_2nd):
            """True when 1st.process_owner < 2nd.process_owner."""
            self._require_process_objects(process_1st, process_2nd)
            return process_1st.process_owner < process_2nd.process_owner

### Unit Test

In [None]:
import unittest

# Definition for testing CircularDoubleLinkQueue class
class TestCircularDoubleLinkQueue(unittest.TestCase):
    """Unit tests for QueueNode, ProcessObject and CircularDoubleLinkQueue."""

    #
    # QueueNode
    #
    def test_node(self):
        """Doubly linked node tests"""
        # No initial value
        test_node = QueueNode()
        self.assertIsNone(test_node.value)
        self.assertIsNone(test_node.next_node)
        self.assertIsNone(test_node.pre_node)

        # Initial value: integer
        test_node = QueueNode(node_value=10)
        self.assertEqual(test_node.value, 10)
        self.assertIsNone(test_node.next_node)
        self.assertIsNone(test_node.pre_node)

        # Initial value: string
        test_string = 'Test String'
        test_node = QueueNode(node_value=test_string)
        self.assertEqual(test_node.value, test_string)
        self.assertIsNone(test_node.next_node)
        self.assertIsNone(test_node.pre_node)

        # Initial value: explicit None
        test_node = QueueNode(node_value=None)
        self.assertIsNone(test_node.value)
        self.assertIsNone(test_node.next_node)
        self.assertIsNone(test_node.pre_node)

        # Connect two nodes
        test_node1 = QueueNode(node_value=1)
        test_node2 = QueueNode(node_value=2)
        test_node1.next_node = test_node2
        test_node2.pre_node = test_node1

        self.assertEqual(test_node1.value, 1)
        self.assertEqual(test_node2.value, 2)
        self.assertEqual(test_node1.next_node, test_node2)
        self.assertEqual(test_node2.pre_node, test_node1)
        self.assertIsNone(test_node1.pre_node)

    #
    # ProcessObject
    #
    def test_process_object(self):
        """Process object test"""
        test_process = ProcessObject()
        # The object must expose every field we need, with its default value.
        self.assertEqual(test_process.pid, 0)
        self.assertEqual(test_process.process_name, 'Process_Name')
        self.assertEqual(test_process.process_owner, 'Process_Owner')
        self.assertEqual(test_process.number_of_thread, 4)
        self.assertEqual(test_process.percent_cpu_used, 10.2)
        self.assertEqual(test_process.total_time_used, 12.5789)

        # Every field can be reassigned.
        test_process.pid = 10
        self.assertEqual(test_process.pid, 10)
        test_process.process_name = "Test Name"
        self.assertEqual(test_process.process_name, 'Test Name')
        test_process.process_owner = 'Test owner'
        self.assertEqual(test_process.process_owner, 'Test owner')
        test_process.number_of_thread = 10
        self.assertEqual(test_process.number_of_thread, 10)
        test_process.percent_cpu_used = 0.45
        self.assertEqual(test_process.percent_cpu_used, 0.45)
        test_process.total_time_used = 321
        self.assertEqual(test_process.total_time_used, 321)

    #
    # CircularDoubleLinkQueue
    #
    def test_queue_increase_function(self):
        """Test increase_queue_capacity()"""
        capacity = 4
        test_queue = CircularDoubleLinkQueue(capacity_of_queue=capacity)

        # Circular: `capacity` steps forward come back to the start ...
        queue_node = test_queue.first_node
        for _ in range(capacity):
            queue_node = queue_node.next_node
        self.assertEqual(queue_node, test_queue.first_node)
        # ... and one step fewer does not.
        queue_node = test_queue.first_node
        for _ in range(capacity - 1):
            queue_node = queue_node.next_node
        self.assertNotEqual(queue_node, test_queue.first_node)

        # Doubly linked: the same holds when walking backwards.
        queue_node = test_queue.first_node
        for _ in range(capacity):
            queue_node = queue_node.pre_node
        self.assertEqual(queue_node, test_queue.first_node)

        queue_node = test_queue.first_node.next_node.pre_node
        self.assertEqual(queue_node, test_queue.first_node)

    def test_capacity_of_queue(self):
        """Test a capacity: must be more than 1 and occupied must be 0"""
        with self.assertRaises(ValueError):
            CircularDoubleLinkQueue(capacity_of_queue=1)

        # Default capacity
        test_queue = CircularDoubleLinkQueue()
        self.assertEqual(test_queue.capacity, 10)
        self.assertEqual(test_queue.occupied_amount, 0)

        # Queue allocated with 4 nodes
        test_queue = CircularDoubleLinkQueue(capacity_of_queue=4)
        self.assertEqual(test_queue.capacity, 4)
        self.assertEqual(test_queue.occupied_amount, 0)

    def test_queue_add_fuunction(self):
        """Test add_value_at_last()"""
        test_queue = CircularDoubleLinkQueue(capacity_of_queue=4)
        test_queue.add_value_at_last(5)
        self.assertEqual(test_queue.occupied_amount, 1)
        test_queue.add_value_at_last(6)
        self.assertEqual(test_queue.occupied_amount, 2)
        # The first value added stays at the front.
        self.assertEqual(test_queue.get_first_value(), 5)

    def test_queue_extend_double(self):
        """Test capacity becomes double when adding to a full queue"""
        capacity = 4
        test_queue = CircularDoubleLinkQueue(capacity_of_queue=capacity)

        for times in range(capacity):
            test_queue.add_value_at_last(times)
        # The queue is now exactly full.
        self.assertEqual(test_queue.occupied_amount, capacity)
        self.assertEqual(test_queue.capacity, capacity)

        # One more value forces the queue to grow.
        test_queue.add_value_at_last(5)

        self.assertEqual(test_queue.capacity, capacity * 2)
        self.assertEqual(test_queue.occupied_amount, capacity + 1)
        self.assertEqual(test_queue.get_first_value(), 0)

    def test_queue_first_value(self):
        """Test get_first_value()"""
        capacity = 4
        test_queue = CircularDoubleLinkQueue(capacity_of_queue=capacity)

        # An empty queue returns None.
        self.assertIsNone(test_queue.get_first_value())

        # Insert distinct values; the first one inserted is 0.
        for times in range(capacity):
            test_queue.add_value_at_last(times)
        self.assertEqual(test_queue.get_first_value(), 0)

    def test_queue_remove_first_value(self):
        """Test remove_first_value()"""
        capacity = 4
        test_queue = CircularDoubleLinkQueue(capacity_of_queue=capacity)

        # Removing from an empty queue returns None.
        self.assertIsNone(test_queue.remove_first_value())

        # Queue looks like 0, 1, 2, 3
        for times in range(capacity):
            test_queue.add_value_at_last(times)

        test_queue.remove_first_value()
        # Queue looks like 1, 2, 3
        self.assertEqual(test_queue.get_first_value(), 1)

        test_queue.remove_first_value()
        # Queue looks like 2, 3
        self.assertEqual(test_queue.get_first_value(), 2)

    def test_all_display(self):
        """Test all display functions"""
        test_queue = self.create_a_sample_queue_with_process()
        # Each call must simply complete. Nothing is caught here on purpose:
        # an exception fails the test and keeps its own traceback.
        test_queue.display_elements_order_by_name()
        test_queue.display_elements_order_by_pid()
        test_queue.display_elements_order_by_cpu_time_used()
        test_queue.display_elements_order_by_percent_of_cpu_time()
        test_queue.display_elements_order_by_owner()
        test_queue.display_elements_by_original_order()

    def create_a_sample_queue_with_process(self, capacity = 8):
        """Return a full queue of ``capacity`` ProcessObject values.

        PIDs descend from ``capacity`` to 1, names cycle through 4 variants
        and owners through 2, so every sort key contains duplicates.
        """
        queue = CircularDoubleLinkQueue(capacity)
        for index in range(capacity):
            a_process = ProcessObject()
            a_process.pid = capacity - index
            a_process.process_name = 'p_name' + str(index % 4)
            a_process.process_owner = 'p_owner' + str(index % 2)
            queue.add_value_at_last(a_process)
        return queue

## Unit test for ProcessMergeSort class

In [None]:
import unittest

class TestProcessMergeSort(unittest.TestCase):
    
    def test_compare_with_name(self):
        '''is_1st_less_than_2nd_by_name'''
        process_sorting = CircularDoubleLinkQueue.ProcessMergeSort()

        process_1st = ProcessObject()
        process_2nd = ProcessObject()
        process_1st.process_name = 'A'
        process_2nd.process_name = 'B'

        # Both arguments must be ProcessObject instances. assertRaises fails
        # the test when no TypeError is raised, which a try/except would not.
        with self.assertRaises(TypeError):
            process_sorting.is_1st_less_than_2nd_by_name(12, process_2nd)
        with self.assertRaises(TypeError):
            process_sorting.is_1st_less_than_2nd_by_name(process_1st, 'Tse')

        self.assertTrue(process_sorting.is_1st_less_than_2nd_by_name(process_1st, process_2nd))
        self.assertFalse(process_sorting.is_1st_less_than_2nd_by_name(process_2nd, process_1st))

    def test_compare_with_pid(self):
        '''is_1st_less_than_2nd_by_pid'''
        process_sorting = CircularDoubleLinkQueue.ProcessMergeSort()

        process_1st = ProcessObject()
        process_2nd = ProcessObject()
        process_1st.pid = 12
        process_2nd.pid = 35

        # Both arguments must be ProcessObject instances. assertRaises fails
        # the test when no TypeError is raised, which a try/except would not.
        with self.assertRaises(TypeError):
            process_sorting.is_1st_less_than_2nd_by_pid(12, process_2nd)
        with self.assertRaises(TypeError):
            process_sorting.is_1st_less_than_2nd_by_pid(process_1st, 'Tse')

        self.assertTrue(process_sorting.is_1st_less_than_2nd_by_pid(process_1st, process_2nd))
        self.assertFalse(process_sorting.is_1st_less_than_2nd_by_pid(process_2nd, process_1st))

        
    def test_compare_with_cpu_time_used(self):
        '''is_1st_less_than_2nd_by_cpu_time_used'''
        process_sorting = CircularDoubleLinkQueue.ProcessMergeSort()

        process_1st = ProcessObject()
        process_2nd = ProcessObject()
        process_1st.total_time_used = 12
        process_2nd.total_time_used = 35

        # Both arguments must be ProcessObject instances. assertRaises fails
        # the test when no TypeError is raised, which a try/except would not.
        with self.assertRaises(TypeError):
            process_sorting.is_1st_less_than_2nd_by_cpu_time_used(12, process_2nd)
        with self.assertRaises(TypeError):
            process_sorting.is_1st_less_than_2nd_by_cpu_time_used(process_1st, 'Tse')

        self.assertTrue(process_sorting.is_1st_less_than_2nd_by_cpu_time_used(process_1st, process_2nd))
        self.assertFalse(process_sorting.is_1st_less_than_2nd_by_cpu_time_used(process_2nd, process_1st))

        
    def test_compare_with_percent_of_cpu_time(self):
        '''is_1st_less_than_2nd_by_percent_of_cpu_time'''
        process_sorting = CircularDoubleLinkQueue.ProcessMergeSort()

        process_1st = ProcessObject()
        process_2nd = ProcessObject()
        process_1st.percent_cpu_used = 12.34
        process_2nd.percent_cpu_used = 35.097

        # Both arguments must be ProcessObject instances. assertRaises fails
        # the test when no TypeError is raised, which a try/except would not.
        with self.assertRaises(TypeError):
            process_sorting.is_1st_less_than_2nd_by_percent_of_cpu_time(12, process_2nd)
        with self.assertRaises(TypeError):
            process_sorting.is_1st_less_than_2nd_by_percent_of_cpu_time(process_1st, 'Tse')

        self.assertTrue(process_sorting.is_1st_less_than_2nd_by_percent_of_cpu_time(process_1st, process_2nd))
        self.assertFalse(process_sorting.is_1st_less_than_2nd_by_percent_of_cpu_time(process_2nd, process_1st))

    def test_compare_with_owner(self):
        '''is_1st_less_than_2nd_by_owner'''
        process_sorting = CircularDoubleLinkQueue.ProcessMergeSort()

        process_1st = ProcessObject()
        process_2nd = ProcessObject()
        process_1st.process_owner = 'owner1'
        process_2nd.process_owner = 'owner5'

        # Both arguments must be ProcessObject instances. assertRaises fails
        # the test when no TypeError is raised, which a try/except would not.
        with self.assertRaises(TypeError):
            process_sorting.is_1st_less_than_2nd_by_owner(12, process_2nd)
        with self.assertRaises(TypeError):
            process_sorting.is_1st_less_than_2nd_by_owner(process_1st, 'Tse')

        self.assertTrue(process_sorting.is_1st_less_than_2nd_by_owner(process_1st, process_2nd))
        self.assertFalse(process_sorting.is_1st_less_than_2nd_by_owner(process_2nd, process_1st))

    def test_controller_of_comparison(self):
        '''determine_compare_way() installs the comparator of the requested order'''
        process_sorting = CircularDoubleLinkQueue.ProcessMergeSort()

        # (order_by key, ProcessObject attribute it must compare, low value, high value)
        cases = (
            ('name', 'process_name', 'name1', 'name2'),
            ('pid', 'pid', 1, 2),
            ('cpu_time_used', 'total_time_used', 1, 20),
            ('percent_of_cpu_time', 'percent_cpu_used', 1, 5),
            ('owner', 'process_owner', 'owner1', 'owner2'),
        )
        for order_key, attribute, low, high in cases:
            with self.subTest(order_by=order_key):
                # Baseline: every field of 1st is smaller than the same field of 2nd.
                process_1st = ProcessObject()
                process_2nd = ProcessObject()
                process_1st.process_name, process_2nd.process_name = 'name1', 'name2'
                process_1st.pid, process_2nd.pid = 1, 2
                process_1st.percent_cpu_used, process_2nd.percent_cpu_used = 1, 2
                process_1st.total_time_used, process_2nd.total_time_used = 1, 2
                process_1st.process_owner, process_2nd.process_owner = 'owner1', 'owner2'

                selected = process_sorting.determine_compare_way(process_sorting.order_by[order_key])
                self.assertTrue(selected)

                # Only the field under test is greater in 1st, so a comparator
                # reading any other field would wrongly answer True here.
                setattr(process_1st, attribute, high)
                setattr(process_2nd, attribute, low)
                self.assertFalse(process_sorting.compare_function(process_1st, process_2nd))

                # Swap the field back: now 1st < 2nd on the field under test.
                setattr(process_1st, attribute, low)
                setattr(process_2nd, attribute, high)
                self.assertTrue(process_sorting.compare_function(process_1st, process_2nd))

        # A selector that is not one of the order_by values is rejected.
        self.assertFalse(process_sorting.determine_compare_way(-1))

        # Original order: accepted, and no comparator is installed.
        self.assertTrue(process_sorting.determine_compare_way(process_sorting.order_by['original']))
        self.assertIsNone(process_sorting.compare_function)

    def test_split_queue_function(self):
        '''Test: split_queue() cuts a linked list into a left and a right half'''
        process_sorting = CircularDoubleLinkQueue.ProcessMergeSort()
        total_node = 8
        half = int(total_node / 2)

        # Chain nodes valued 0 .. total_node-1, linked in both directions.
        head_node = tail_node = QueueNode(node_value=0)
        for value in range(1, total_node):
            appended = QueueNode(node_value=value)
            appended.pre_node = tail_node
            tail_node.next_node = appended
            tail_node = appended

        right_half = process_sorting.split_queue(head_node)

        # Left half: 0~3, terminated by None.
        left_cursor = head_node
        for expected in range(half):
            self.assertEqual(left_cursor.value, expected)
            left_cursor = left_cursor.next_node
        self.assertIsNone(left_cursor)

        # Right half: 4~7, terminated by None.
        right_cursor = right_half
        for expected in range(half, total_node):
            self.assertEqual(right_cursor.value, expected)
            right_cursor = right_cursor.next_node
        self.assertIsNone(right_cursor)

    def test_merge_list(self):
        '''Test: merge_list() merges two chains into one ascending chain.

        Checks two single-node merges (5 with 2, 4 with 1) and then the merge
        of their two-node results into 1, 2, 4, 5.
        '''
        sorter = CircularDoubleLinkQueue.ProcessMergeSort()
        # Compare raw node values instead of ProcessObject fields.
        sorter.compare_function = self.easy_comparison

        def merge_pair(left_value, right_value):
            # Merge two fresh single-node chains and return the merged head.
            return sorter.merge_list(QueueNode(node_value=left_value),
                                     QueueNode(node_value=right_value))

        def check_chain(cursor, expected_values):
            # Walk the chain and require exactly the expected values, then None.
            for expected in expected_values:
                self.assertEqual(cursor.value, expected)
                cursor = cursor.next_node
            self.assertIsNone(cursor)

        check_chain(merge_pair(5, 2), (2, 5))
        check_chain(merge_pair(4, 1), (1, 4))

        # Rebuild both two-node chains and merge them together.
        merged_25 = merge_pair(5, 2)
        merged_14 = merge_pair(4, 1)
        check_chain(sorter.merge_list(merged_25, merged_14), (1, 2, 4, 5))

    def test_merge_soft(self):
        '''Test: merge_sort() returns a chain whose values never decrease.'''
        sorter = CircularDoubleLinkQueue.ProcessMergeSort()
        # Compare raw node values instead of ProcessObject fields.
        sorter.compare_function = self.easy_comparison

        cursor = sorter.merge_sort(self.create_a_sample_queue_with_int())

        # Walk from the head; every value must be >= the one before it.
        previous = cursor.value
        while cursor:
            current = cursor.value
            self.assertLessEqual(previous, current)
            previous = current
            cursor = cursor.next_node
     
    def test_get_ordered_list(self):
        '''Test: get_ordered_list() returns nodes in ascending order of the chosen field.

        The same sample queue is sorted once per ordering, in the sequence
        listed in sort_cases. Each ordering runs in its own subTest so a
        failure in one does not hide the results of the others.
        '''
        process_sorting = CircularDoubleLinkQueue.ProcessMergeSort()
        sample_queue = self.create_a_sample_queue_with_process()

        # (order_by key, ProcessObject attribute the result is checked against)
        sort_cases = (
            ('name', 'process_name'),
            ('pid', 'pid'),
            ('cpu_time_used', 'total_time_used'),
            ('percent_of_cpu_time', 'percent_cpu_used'),
            ('owner', 'process_owner'),
        )

        for order_key, attribute in sort_cases:
            with self.subTest(order_by=order_key):
                sorted_queue = process_sorting.get_ordered_list(
                    sample_queue.first_node,
                    sample_queue.last_node,
                    process_sorting.order_by[order_key],
                )
                # Fail with a clear message instead of an AttributeError below.
                self.assertIsNotNone(sorted_queue)

                pre_value = getattr(sorted_queue.value, attribute)
                while sorted_queue:
                    current_value = getattr(sorted_queue.value, attribute)
                    self.assertLessEqual(pre_value, current_value)
                    pre_value = current_value
                    sorted_queue = sorted_queue.next_node
        
    #For testing the merge_list(). Make it compare values of nodes directly not data in process object
    def easy_comparison(self, value1, value2):
        '''Return True when value1 is strictly less than value2.

        Assigned to ProcessMergeSort.compare_function in the tests so that
        node values are compared directly.
        '''
        is_smaller = value1 < value2
        return is_smaller
    
    #The value is integer
    def create_a_sample_queue_with_int(self, length = 10):
        '''Build a None-terminated doubly linked chain of integer-valued nodes.

        Node number i holds i % 5, so the values cycle 0, 1, 2, 3, 4; for
        length > 5 the sequence wraps back to 0 and is no longer sorted.

        length: number of nodes to create.
        Returns the head QueueNode, or None when no node is created
        (length <= 0).
        '''
        first_node = None
        last_node = None
        for number in range(length):
            new_node = QueueNode(number % 5)
            if first_node is None:
                first_node = new_node
            else:
                # Append behind the current tail, linking both directions.
                last_node.next_node = new_node
                new_node.pre_node = last_node
            last_node = new_node
        return first_node
    
    #The value is process object
    def create_a_sample_queue_with_process(self, capacity = 8):
        '''Build a CircularDoubleLinkQueue filled with sample ProcessObject values.

        One process is appended per slot, so the queue is filled to capacity.
        For the process at position index:
          - pid counts down from capacity to 1,
          - process_name cycles through 'p_name0'..'p_name3',
          - process_owner alternates between 'p_owner0' and 'p_owner1'.
        percent_cpu_used and total_time_used are not set here and keep the
        ProcessObject defaults, so they are identical for every process.

        capacity: queue capacity and number of processes added. The queue
            constructor raises ValueError when it is less than 2.
        Returns the populated queue.
        '''
        queue = CircularDoubleLinkQueue(capacity)
        for index in range(capacity):
            a_process = ProcessObject()
            a_process.pid = capacity - index
            a_process.process_name = 'p_name' + str(index % 4)
            a_process.process_owner = 'p_owner' + str(index % 2)
            queue.add_value_at_last(a_process)
        return queue

In [None]:
# def suite():
#     suite = unittest.TestSuite()
#     tests = unittest.defaultTestLoader.loadTestsFromTestCase(TestCircularDoubleLinkQueue)
#     suite.addTests(tests)
#     tests = unittest.defaultTestLoader.loadTestsFromTestCase(TestProcessMergeSort)
#     suite.addTests(tests)
#     return suite

if __name__ == '__main__':
    # Gather the tests of both test cases into a single suite, queue tests first.
    suite = unittest.TestSuite()
    for test_case in (TestCircularDoubleLinkQueue, TestProcessMergeSort):
        suite.addTests(unittest.defaultTestLoader.loadTestsFromTestCase(test_case))

    # verbosity=2 lists every test; buffer=True captures stdout/stderr while
    # each test runs.
    unittest.TextTestRunner(verbosity=2, buffer=True).run(suite)