In [1]:
from pathos.multiprocessing import ProcessPool
from textblob import TextBlob
from tqdm import tqdm 

# Create a process pool

Pools are a group of poccesses where you will send tasks. Inside you will define the number of processes to create. By defualt it will be number of CPU cores, however you can define more than that.

It is for limited number of tasks; computationally intensive.

Scheduling more processes than you have CPU cores can increase performance where the processes run into wait times or I/O

In [2]:
pool = ProcessPool(nodes=3)       #creates a ProcessPool object with three worker processes.

# Functions

Map methods provided:

map - blocking and ordered worker pool [returns: list]

imap - non-blocking and ordered worker pool [returns: iterator]

uimap - non-blocking and unordered worker pool [returns: iterator]

amap - asynchronous worker pool [returns: object]

Blocking: handles jobs in batches rather than 1 by 1

Ordered: Batches must be completed in order


In [3]:
#pool.map(function to run, data to run it on, other arguments )

pool.map(pow, [1,2,3,4], [5,6,7,8])

[1, 64, 2187, 65536]

In [4]:
#Iterate through the returned data using imap
for x in pool.imap(pow, [1,2,3,4], [5,6,7,8]):
    print(x)

1
64
2187
65536


In [5]:
# do an asynchronous map, then get the results
import time

results = pool.amap(pow, [1,2,3,4], [5,6,7,8])
while not results.ready():
    time.sleep(5); print(".", end=' ')
    
# Retrieve the results as a list
output = results.get()

# Print the output
print(output)

. [1, 64, 2187, 65536]


The while loop that follows checks the status of the AsyncResult object using its ready() method, which returns True if all the tasks have completed and their results are ready. If the results are not yet ready, the program waits for 5 seconds using the time.sleep() function and prints a dot to the console, indicating that the program is still running. This loop allows the program to continue executing while the tasks are being processed in the background.

Once the results are ready, the program exits the while loop and retrieves the results using the get() method of the AsyncResult object. The get() method returns the results of the parallel computation as a list, in the order that the input sequences were submitted.


# Build your function

First lets build a function that can take a line of text and produce the sentiment

In [2]:
def get_sentiment(text):
    from textblob import TextBlob
    blob = TextBlob(text)
    score = blob.sentiment.polarity
    return score

#The TextBlob class represents a text document and provides various methods for analyzing and manipulating the text.
#The sentiment polarity score is obtained using blob.sentiment.polarity. 
#The sentiment attribute of the TextBlob object provides a sentiment analysis result,
# polarity returns the sentiment polarity score between -1 and 1, 
#where negative values indicate negative sentiment, positive values indicate positive sentiment, and 0 indicates neutral sentiment.

In [3]:
#text = "I love this product! It's amazing."
text = 'He hated everyone who did not obey him'
sentiment_score = get_sentiment(text)
print("Sentiment score:", sentiment_score)

Sentiment score: -0.9


Then we will need a function that will download the poems for us

In [4]:
import urllib.request
#module provides classes and functions to work with URLs.

def download_poem(url):
    poems = []
    with urllib.request.urlopen(url) as f: 
        for line in f:
            line = line.decode("utf-8") #The line is decoded from bytes to a string
            line = line.strip()    #Leading and trailing whitespace characters are removed from the line
            if line:      #If the line is not empty, it is appended to the poems list
                poems.append(line)
    return poems

#return the poem as a list of lines.

Let's check out what one of these poems look like

In [5]:
test_url = 'https://raw.githubusercontent.com/okfn/openmilton/master/miltondata/texts/poems.txt'
poems = download_poem(test_url)

print(len(poems))
print(poems[:10])


6203
['The Poetical Works of John Milton', 'PREFACE by the Rev. H. C. Beeching, M. A.', "This edition of Milton's Poetry is a reprint, as careful as Editor", 'and Printers have been able to make it, from the earliest printed', 'copies of the several poems.  First the 1645 volume of the', 'Minor Poems has been printed entire; then follow in order the', 'poems added in the reissue of 1673; the Paradise Lost, from the', "edition of 1667; and the Paradise Regain'd and Samson", 'Agonistes from the edition of 1671.', 'The most interesting portion of the book must be reckoned the']


In [6]:
def process_poems(url):
    scores = []
    poems = download_poem(url)
 
    for line in poems:
        scores.append(get_sentiment(line))
    return scores

#the scores list containing the sentiment scores for each line of the poems is returned from the function using return scores.

In [7]:
url = test_url
sentiment_scores = process_poems(url)
for score in sentiment_scores:
    print(score)

0.0
0.0
-0.1
0.5
0.125
-0.025
0.0
0.0
0.0
0.5
0.25
-0.09999999999999999
0.1
0.0
0.0
0.0
-0.25
0.0
0.0
0.5333333333333333
0.0
0.4
-0.125
-0.25
0.0
0.05000000000000002
0.0
-0.25
0.041666666666666664
0.0
0.25
0.0
0.0
0.0
0.0
0.1875
-0.10833333333333334
0.0
0.0
-0.4
-0.4
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.2
0.35
-0.075
0.0
0.10000000000000002
0.0
0.0
0.0
-0.012499999999999997
0.0
0.0
0.0
0.1
0.03333333333333333
0.0
0.125
-0.3333333333333333
0.6
0.2
0.0
0.0
0.0
0.0
0.0
1.0
0.2
0.375
0.0
0.25
0.6
0.0
0.2
-0.05
0.08333333333333333
0.7
-0.05
0.0
0.0
-0.3
0.0
0.21428571428571427
0.35
0.0
0.6
0.0
0.0
0.0
0.0
-0.30000000000000004
0.0
0.0
0.0
0.21428571428571427
0.0
0.0
0.0
0.0
0.0
0.0
-0.5
0.0
0.0
0.0
-0.8
0.0
0.0
0.0
0.1
0.0
0.375
-0.033333333333333326
0.0
-0.0125
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.25
0.0
0.0
0.0
1.0
0.25
0.25
0.0
0.0
0.0
0.0
0.0
0.0625
0.16666666666666669
0.10000000000000002
0.0
0.0
0.05
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.1
0.25
0.25
-0.5
0.0
0.17857142857142858
0.0
-0.7
0

 Finally we will build out main function that puts the whole process together

 Let's build a hard task, like having to download and process multiple poems

In [8]:
# duplicates a list named urls to make it larger by appending its own contents multiple times

urls = ['https://raw.githubusercontent.com/okfn/openmilton/master/miltondata/texts/poems.txt']

#Duplicating the list to make it larger
for _ in range(0,3):
    urls += urls

print(len(urls))

#prints the final length of the urls list,i.e.,the original length multiplied by 2 raised to the power of 3 (i.e., multiplied by 8 in total).

8


Now let's test how long it takes to process the sentiment for each line of our poems dataset

We can use TQDM to show us the progress of any for-loop operation


In [9]:
#Serial Processing
scores = []

for urls in tqdm(urls, position=0 ): #position=0 forces the bars into the same line when printing
    scores += process_poems(url)
    #print(scores)


100%|██████████| 8/8 [00:06<00:00,  1.16it/s]


In [17]:
pool = ProcessPool(nodes=3)

In [18]:
#parallel processing
score=[]
for result in tqdm(pool.uimap(process_poem, urls), total=len(urls), position=0):
    score.extend(result)
    #result.append(score)


100%|██████████| 8/8 [00:00<00:00, 42.15it/s]


# Another example of parallel processing

In [1]:
import numpy as np
import time


In [2]:
import multiprocessing
import time

def sleepy_man():
    print('Starting to sleep')
    time.sleep(1)
    print('Done sleeping')


In [3]:
# serial processing
tic = time.time()
sleepy_man()
sleepy_man()
toc = time.time()

print('Done in {:.4f} seconds'.format(toc-tic))

Starting to sleep
Done sleeping
Starting to sleep
Done sleeping
Done in 2.0033 seconds


In [4]:
def sleepy_man():
    print('Starting to sleep')
    time.sleep(1)
    print('Done sleeping')


In [5]:
tic = time.time()
p1 =  multiprocessing.Process(target= sleepy_man)
p2 =  multiprocessing.Process(target= sleepy_man)
p1.start()
p2.start()
toc = time.time()

print('Done in {:.4f} seconds'.format(toc-tic))

Done in 0.0080 seconds
Starting to sleep
Starting to sleep
Done sleeping
Done sleeping


In [6]:
tic = time.time()
p1 =  multiprocessing.Process(target= sleepy_man)
p2 =  multiprocessing.Process(target= sleepy_man)
p1.start()
p2.start()
p1.join()
p2.join()
toc = time.time()

print('Done in {:.4f} seconds'.format(toc-tic))

#The start() method is called on both p1 and p2 to start their execution. 
#Then the join() method is called on both processes, 
#which means the program will wait for both processes to complete before proceeding to the next line of code.

Starting to sleep
Starting to sleep
Done sleeping
Done sleeping
Done in 1.0492 seconds


The join() method is called immediately after starting each process (p1 and p2). This means that the program will wait for each process to finish before proceeding to the next line of code. As a result, the execution time measured (toc - tic) will include the time taken by both processes to complete.

If the join() method calls are not present then the program will not wait for the processes to finish before proceeding to the next line of code. As a result, the execution time measured (toc - tic) will only include the time it took to start the processes, but not their actual execution time. The program will continue to execute and may exit before the processes have completed their work.

In summary, the second code ensures that the program waits for both processes to finish before measuring the total execution time, while the first code measures only the time it took to start the processes but not their execution time.

In [7]:
tic = time.time()

process_list = []
for i in range(10):
    p =  multiprocessing.Process(target= sleepy_man)
    p.start()
    process_list.append(p)

for process in process_list:
    process.join()

toc = time.time()

print('Done in {:.4f} seconds'.format(toc-tic))

#This code creates and starts 10 processes running the sleepy_man function concurrently. 
#It ensures that the program waits for all the processes to finish before measuring the total execution time.

Starting to sleep
Starting to sleep
Starting to sleep
Starting to sleep

Starting to sleepStarting to sleep
Starting to sleep
Starting to sleep
Starting to sleep
Starting to sleep
Done sleeping
Done sleepingDone sleeping

Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done in 1.1059 seconds


In [8]:
def sleepy_man(sec):
    print('Starting to sleep')
    time.sleep(sec)
    print('Done sleeping')

import multiprocessing
import time

def sleepy_man(sec):
    print('Starting to sleep')
    time.sleep(sec)
    print('Done sleeping')

tic = time.time()

process_list = []
for i in range(10):
    p =  multiprocessing.Process(target= sleepy_man, args = [2])
    p.start()
    process_list.append(p)
    
#The loop iterates 10 times and creates a new process p for each iteration 
#using the sleepy_man target function and passing 2 as the sec argument.    

for process in process_list:
    process.join()

toc = time.time()

print('Done in {:.4f} seconds'.format(toc-tic))

#here the only difference is that I can give any time in secs as argument

Starting to sleep
Starting to sleep
Starting to sleep
Starting to sleep
Starting to sleep
Starting to sleep
Starting to sleep
Starting to sleep
Starting to sleep
Starting to sleep
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleepingDone sleeping

Done in 2.0596 seconds


In [9]:
def sleepy_man(sec):
    print('Starting to sleep for {} seconds'.format(sec))
    time.sleep(sec)
    print('Done sleeping for {} seconds'.format(sec))

tic = time.time()

pool = multiprocessing.Pool(5)      #using multiple process here (5)
pool.map(sleepy_man, range(1, 11))
pool.close()     #indicate that no more tasks will be submitted to the pool.

toc = time.time()

print('Done in {:.4f} seconds'.format(toc-tic))

# The pool allows for parallel execution of the sleepy_man function on multiple inputs.
#The pool.map() function distributes the workload across the worker processes in the pool, 
#with each process handling a different value from the range. The order of execution is not guaranteed.

Starting to sleep for 3 secondsStarting to sleep for 5 secondsStarting to sleep for 2 secondsStarting to sleep for 4 secondsStarting to sleep for 1 seconds




Done sleeping for 1 seconds
Starting to sleep for 6 seconds
Done sleeping for 2 seconds
Starting to sleep for 7 seconds
Done sleeping for 3 seconds
Starting to sleep for 8 seconds
Done sleeping for 4 seconds
Starting to sleep for 9 seconds
Done sleeping for 5 seconds
Starting to sleep for 10 seconds
Done sleeping for 6 seconds
Done sleeping for 7 seconds
Done sleeping for 8 seconds
Done sleeping for 9 seconds
Done sleeping for 10 seconds
Done in 15.0553 seconds


## perfect numbers 

In [10]:
#calculates and prints all the perfect numbers within the range of 1 to 100,000.
def is_perfect(n):
    sum_factors = 0
    for i in range(1, n):
        if (n % i == 0):
            sum_factors = sum_factors + i
    if (sum_factors == n):
        print('{} is a Perfect number'.format(n))

tic = time.time()
for n in range(1,100000):
    is_perfect(n)
toc = time.time()

print('Done in {:.4f} seconds'.format(toc-tic))

6 is a Perfect number
28 is a Perfect number
496 is a Perfect number
8128 is a Perfect number
Done in 149.0317 seconds


In [11]:
def is_perfect(n):
    sum_factors = 0
    for i in range(1, n):
        if(n % i == 0):
            sum_factors = sum_factors + i
    if (sum_factors == n):
        print('{} is a Perfect number'.format(n))

tic = time.time()

processes = []
for i in range(1,1000):
    p = multiprocessing.Process(target=is_perfect, args=(i,))
    processes.append(p)
    p.start()

for process in processes:
    process.join()

toc = time.time()
print('Done in {:.4f} seconds'.format(toc-tic))

6 is a Perfect number
28 is a Perfect number
496 is a Perfect number
Done in 3.5687 seconds


In [12]:
def is_perfect(n):
    sum_factors = 0
    for i in range(1, n):
        if(n % i == 0):
            sum_factors = sum_factors + i
    if (sum_factors == n):
        print('{} is a Perfect number'.format(n))

tic = time.time()
pool = multiprocessing.Pool()
pool.map(is_perfect, range(1,10000))
pool.close()
toc = time.time()

print('Done in {:.4f} seconds'.format(toc-tic))

6 is a Perfect number
28 is a Perfect number496 is a Perfect number

8128 is a Perfect number
Done in 0.5788 seconds
