<a href="https://colab.research.google.com/github/SachinScaler/Aug23AdvancedPython/blob/main/Problem_Solving_3_Concurrent_Programming.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

class MyModule(nn.Module):

    # Initialize the parameter
    def __init__(self, num_inputs, num_outputs, hidden_size):
        super(MyModule, self).__init__() # old method of calling  # super().__init__()
        self.linear1 = nn.Linear(num_inputs, hidden_size)
        self.linear2 = nn.Linear(hidden_size, num_outputs)

    # Forward pass
    def forward(self, input):
        lin    = self.linear1(input)
        output = nn.functional.relu(lin)
        pred   = self.linear2(output)
        return pred

m1 = MyModule(3,4,4)
#m1(data)# __call__

### Concurrent Programming:
- Running Multiple tasks at same time
- Speed and Efficiency


In [1]:
import time

def my_func(*args):
    time.sleep(args[0])
    print(f"Slept for {args[0]} seconds")

s = time.monotonic()
my_func(3)
my_func(5)
e = time.monotonic()
print(f"Total Time {e-s:.2f}")

Slept for 3 seconds
Slept for 5 seconds
Total Time 8.01


Multi-threading
- A thread is a lightweight process and shares memory with parent(main) Process
- Thread creation is very simple:
Example

In [2]:
import threading

def my_func(*args):
    time.sleep(args[0])
    print(f"Slept for {args[0]} seconds")

t1 = threading.Thread(target = my_func,args = (5,))
t2 = threading.Thread(target = my_func,args = (3,))

# start thread
s = time.monotonic()
t1.start()
t2.start()

# wait for both threads to finish
t1.join()
t2.join()

e = time.monotonic()
print(f"Total Time {e-s:.2f}")

Slept for 3 seconds
Slept for 5 seconds
Total Time 5.01


### Example:

In [4]:
import threading
import requests

dict1 = {}
def scrape_url(url):
    print(f"Scraping URL {url}")
    response = requests.get(url) # io bound: network
    dict1[url]=(response, response.text)
urls = ['http://google.com',"http://linkedin.com", "http://facebook.com","http://example.net"]
scrape_url(urls[0])
print(dict1)


list_t = []
for url in urls:
    t = threading.Thread(target = scrape_url,args = (url,))
    list_t.append(t)
    t.start()

# just waiting for all threads to complete
for t in list_t:
    t.join()

print(dict1)

Scraping URL http://google.com
{'http://google.com': (<Response [200]>, '<!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content="Search the world\'s information, including webpages, images, videos and more. Google has many special features to help you find exactly what you\'re looking for." name="description"><meta content="noodp" name="robots"><meta content="text/html; charset=UTF-8" http-equiv="Content-Type"><meta content="/images/branding/googleg/1x/googleg_standard_color_128dp.png" itemprop="image"><title>Google</title><script nonce="Hy5Ojo710npXrItWQDrazQ">(function(){var _g={kEI:\'vmwMZcKQL-jgkPIP8NGe6A4\',kEXPI:\'0,18168,1342950,4350,206,4804,2316,383,246,5,1129120,1197786,615,380089,16115,28684,22431,1361,12313,2821,1930,12835,4998,15065,2010,38444,2872,2891,3926,4423,3405,606,30668,27619,2403,15324,2025,1,16916,2652,4,32894,26723,2980,1457,22583,6654,7596,1,42160,2,16389,342,23024,5679,1021,31121,4569,6258,23418,1252,30151,2913,2,2

In [6]:
for k,v in dict1.items():
    print(k,v)

http://google.com (<Response [200]>, '<!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content="Search the world\'s information, including webpages, images, videos and more. Google has many special features to help you find exactly what you\'re looking for." name="description"><meta content="noodp" name="robots"><meta content="text/html; charset=UTF-8" http-equiv="Content-Type"><meta content="/images/branding/googleg/1x/googleg_standard_color_128dp.png" itemprop="image"><title>Google</title><script nonce="8aPSg86uWczUlraoWgd_0A">(function(){var _g={kEI:\'vmwMZf6iNMbEkPIP-8q0oAk\',kEXPI:\'0,793110,572357,207,4804,2316,383,246,5,1129120,1197761,380730,16114,28684,22431,1361,12311,17588,4998,17075,38444,887,1985,2891,3926,213,4209,3406,606,30668,27619,2402,15325,2025,1,16916,2652,4,32894,26723,2980,24070,6627,7593,1,42154,2,16395,342,23024,5679,1020,31123,4567,6256,23419,1254,33064,2,2,1,26632,8155,8861,14490,873,9625,10008,8,1921,9779,42459,201

### Multi Processing
- A process is Heavyweight
- Process have thier memory space and CPU Resources
- Create Process:

In [7]:
import multiprocessing

print(multiprocessing.cpu_count())

2


In [8]:
import multiprocessing

def worker(n):
    print(f"Worker {n} Started")
    print("Do Some Cpu Intensive")
    print(f"Worker {n} Finished")

p1 = multiprocessing.Process(target= worker, args=(1,))
p2 = multiprocessing.Process(target= worker, args=(2,))

# start the process
p1.start()
p2.start()

# wait for both process to finishes
p1.join()
p2.join()

print("All Workers finished")


Worker 1 Started
Worker 2 StartedDo Some Cpu Intensive

Do Some Cpu IntensiveWorker 1 Finished

Worker 2 Finished
All Workers finished


In [9]:
import threading
import requests

dict1 = {}
def scrape_url(url):
    print(f"Scraping URL {url}")
    response = requests.get(url) # io bound: network
    dict1[url]=(response, response.text)
urls = ['http://google.com',"http://linkedin.com", "http://facebook.com","http://example.net"]
scrape_url(urls[0])
print(dict1)


list_t = []
for url in urls:
    t = multiprocessing.Process(target = scrape_url,args = (url,))
    list_t.append(t)
    t.start()

# just waiting for all threads to complete
for t in list_t:
    t.join()

print(dict1)

Scraping URL http://google.com
{'http://google.com': (<Response [200]>, '<!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content="Search the world\'s information, including webpages, images, videos and more. Google has many special features to help you find exactly what you\'re looking for." name="description"><meta content="noodp" name="robots"><meta content="text/html; charset=UTF-8" http-equiv="Content-Type"><meta content="/images/branding/googleg/1x/googleg_standard_color_128dp.png" itemprop="image"><title>Google</title><script nonce="0S9rEuhypxKj39GvQnNLOw">(function(){var _g={kEI:\'V3YMZZPeF_3xkPIPjJyG4AY\',kEXPI:\'0,1365468,206,4804,2316,383,246,5,1129120,1806,1195947,637,380101,16114,28684,22430,1362,283,12036,4745,12835,4998,17075,38444,889,1983,2891,3926,7828,606,30668,19390,8228,2404,15324,781,1244,1,16916,2652,4,32894,26723,2980,1457,22589,6648,7596,1,42154,2,16395,342,21266,1758,5679,1020,14866,14084,2173,4567,6256,23421,1252,33

In [10]:
print(len(dict1))

1


# Explore queue, pipe

### Example:

In [13]:
def is_prime(n):
    """Check if a number is prime."""
    if n <= 1:
        return False
    elif n <= 3:
        return True
    elif n % 2 == 0 or n % 3 == 0:
        return False
    i = 5
    while i * i <= n:
        if n % i == 0 or n % (i + 2) == 0:
            return False
        i += 6
    return True

def find_primes(numbers):
    """Find all prime numbers in a list."""
    primes = []
    for number in numbers:
        if is_prime(number):
            primes.append(number)
    return primes

# find all prime in below range
numbers = list(range(100_000_000, 101_000_001))
print(len(numbers))

1000001


In [14]:
processes = 4
chunk_size = len(numbers)//processes
chunks = [numbers[i:i+chunk_size] for i in range(0, len(numbers),chunk_size )]
print(len(chunks))

5


In [16]:
# pool: makes creating pricess super easy
pool = multiprocessing.Pool(processes=processes)


s = time.monotonic()
results = pool.map(find_primes, chunks)


all_primes = []
for result in results:
    all_primes +=result

pool.close()
pool.join()
e = time.monotonic()
print(f"Total Time {e-s:.2f}")

Total Time 31.97


In [18]:
print(
    f"Found {len(all_primes)} prime numbers "
    f"between {numbers[0]} and {numbers[-1]} "
    f"in {(e - s):.2f} seconds."
)

Found 54208 prime numbers between 100000000 and 101000000 in 31.97 seconds.


### Concurrent.futures:
Can be used to spawn threads as well as process.

https://docs.python.org/3/library/concurrent.futures.html

In [32]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def func_b():
    time.sleep(5)
    return 5

def func_a():
    time.sleep(5)
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(func_a)
b = executor.submit(func_b)
print(a,b)

<Future at 0x7de3e870e470 state=running> <Future at 0x7de3e870d090 state=running>


In [33]:
print(a)

<Future at 0x7de3e870e470 state=running>


In [26]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# deadlock example
def wait_on_b():

    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

- Coroutines
- async functions
- threads: acquire and release lock, Deadlock Scenario

In [21]:
[1]+[2]

[1, 2]