In [4]:
import concurrent.futures
import os
import threading

In [9]:
class SharedObject:
    def __init__(self):
        self._lock = threading.Lock()
        self._list = [
            "https://www.google.com",
            "https://www.youtube.com",
            "https://www.facebook.com",
            "https://www.instagram.com",
            "https://www.twitter.com",
            "https://www.reddit.com"
        ]
        
    def do_something(self):
        with self._lock:
            
            if len(self._list) == 0:
                print("No more urls, exiting...")
                return

            print(f'Process {os.getpid()} is using {self._list[-1]}')

            return self._list.pop()
        


In [10]:


class ParallelClass:
    def __init__(self, num_processes):
        self.NUM_PROCESSES = num_processes
        self.object = SharedObject()
        self.lock = threading.Lock()
    
    def parallel_method(self, data_list):
        with concurrent.futures.ProcessPoolExecutor(max_workers=self.NUM_PROCESSES) as executor:
            results = [executor.submit(self.process_data) for _ in data_list]
            for future in concurrent.futures.as_completed(results):
                print(f'The process {os.getpid()} is done with {future.result()}')
        return results
                
    def process_data(self):
        with self.lock:
            print(f'Calling do_something from process {os.getpid()}')
            return self.object.do_something()


In [11]:
p = ParallelClass(4)
resutlts = p.parallel_method([1,2,3,4,5,6,7,8,9,10])

# Output:
print(resutlts)


TypeError: cannot pickle '_thread.lock' object

In [12]:
import multiprocessing

class ParallelClass:
    def __init__(self, shared_obj1, shared_obj2):
        self.shared_obj1 = multiprocessing.Manager().Value(type(shared_obj1), shared_obj1)
        self.shared_obj2 = multiprocessing.Manager().Value(type(shared_obj2), shared_obj2)

    def parallel_method(self, arg_list):
        with multiprocessing.Pool() as pool:
            results = pool.map(self._method_wrapper, arg_list)
        return results

    def _method_wrapper(self, arg):
        # Access shared objects in a thread-safe way
        with self.shared_obj1.get_lock(), self.shared_obj2.get_lock():
            # Do something with shared objects
            return arg

# Example usage
if __name__ == '__main__':
    shared_obj1 = "shared_obj1"
    shared_obj2 = ["shared_obj2"]
    parallel_class = ParallelClass(shared_obj1, shared_obj2)
    results = parallel_class.parallel_method([1, 2, 3])
    print(results)


AttributeError: 'ValueProxy' object has no attribute 'get_lock'

In [9]:
import multiprocessing
import os

class SharedArray:
    def __init__(self):
        self.array = multiprocessing.Array('c', b'hello world,hi there,test this,out of ideas,programming is fun,time flies,life is short,learn everyday,stay curious,never give up')
    
    def reload_array(self, data):
        self.array = multiprocessing.Array('c', data)

    def get_array(self):
        return self.array

class Consumer:
    def __init__(self, shared_array):
        self.shared_array = shared_array
        
    def consume(self,i, results):
        with self.shared_array.get_array().get_lock():

            # Check if there is any data in the shared array
            if len(self.shared_array.get_array().value.decode()) > 0:

                word = self.shared_array.get_array().value.decode().split(',')[0]

                print(f'process {os.getpid()} has {self.shared_array.get_array().value.decode()}')

                # Remove the word from the shared array
                self.shared_array.get_array().value = self.shared_array.get_array().value[len(word)+1:]

                results.append(word)

                print(f'process {os.getpid()} is using {word}')
            else:
                print(f'Process {os.getpid()} has no more words to consume')
                print(f'Process {os.getpid()} is reloading the shared array')
                self.shared_array = SharedArray()

        
    def run_consumers(self, num_processes=5):
        results = multiprocessing.Manager().list()
        processes = []
        for i in range(num_processes):
            p = multiprocessing.Process(target=self.consume, args=(i,results))
            processes.append(p)
            p.start()
        for p in processes:
            p.join()
        return list(results)
    





In [10]:
consumer = Consumer(SharedArray())
print(consumer.run_consumers(200))

process 77659 has hello world,hi there,test this,out of ideas,programming is fun,time flies,life is short,learn everyday,stay curious,never give up
process 77659 is using hello world
process 77663 has hi there,test this,out of ideas,programming is fun,time flies,life is short,learn everyday,stay curious,never give up
process 77663 is using hi there
process 77670 has test this,out of ideas,programming is fun,time flies,life is short,learn everyday,stay curious,never give up
process 77670 is using test this
process 77677 has out of ideas,programming is fun,time flies,life is short,learn everyday,stay curious,never give up
process 77677 is using out of ideas
process 77684 has programming is fun,time flies,life is short,learn everyday,stay curious,never give up
process 77684 is using programming is fun
process 77691 has time flies,life is short,learn everyday,stay curious,never give up
process 77691 is using time flies
process 77698 has life is short,learn everyday,stay curious,never give 

In [17]:
import multiprocessing
import os

class SharedArray:
    def __init__(self, data):
        self.array = multiprocessing.Array('c', data)
    
    def get_array(self):
        return self.array

class Consumer:
    def __init__(self, shared_array):
        self.shared_array = shared_array
        
    def consume(self, i, results):
        with self.shared_array.get_array().get_lock():

            # Check if there is any data in the shared array
            if len(self.shared_array.get_array().value.decode()) > 0:

                word = self.shared_array.get_array().value.decode().split(',')[0]

                print(f'process {os.getpid()} has {self.shared_array.get_array().value.decode()}')

                # Remove the word from the shared array
                self.shared_array.get_array().value = self.shared_array.get_array().value[len(word)+1:]

                results.append(word)

                print(f'process {os.getpid()} is using {word}')
            else:

                # Manda dormir a todos los procesos
                time.sleep(5)
                


                print(f'Process {os.getpid()} has no more words to consume')
                print(f'Process {os.getpid()} is reloading the shared array')
                self.shared_array = SharedArray(b'hello world,hi there,test this,out of ideas,programming is fun,time flies,life is short,learn everyday,stay curious,never give up')
        
    def run_consumers(self, num_processes=5):
        results = multiprocessing.Manager().list()
        processes = []
        for i in range(num_processes):
            p = multiprocessing.Process(target=self.consume, args=(i, results))
            processes.append(p)
            p.start()
        for p in processes:
            p.join()
        return list(results)

if __name__ == '__main__':
    shared_array = SharedArray(b'hello world,hi there,test this,out of ideas,programming is fun,time flies,life is short,learn everyday,stay curious,never give up')
    consumer = Consumer(shared_array)
    consumer.run_consumers(20)  


process 86647 has hello world,hi there,test this,out of ideas,programming is fun,time flies,life is short,learn everyday,stay curious,never give up
process 86647 is using hello world
process 86650 has hi there,test this,out of ideas,programming is fun,time flies,life is short,learn everyday,stay curious,never give up
process 86650 is using hi there
process 86658 has test this,out of ideas,programming is fun,time flies,life is short,learn everyday,stay curious,never give up
process 86658 is using test this
process 86665 has out of ideas,programming is fun,time flies,life is short,learn everyday,stay curious,never give up
process 86665 is using out of ideas
process 86672 has programming is fun,time flies,life is short,learn everyday,stay curious,never give up
process 86672 is using programming is fun
process 86679 has time flies,life is short,learn everyday,stay curious,never give up
process 86679 is using time flies
process 86686 has life is short,learn everyday,stay curious,never give 