# MultiThreading

In [2]:
# Simple thread
import threading
import time

def print_number() -> None:
    for i in range(5):
        print(f"Thread: {i}")
        time.sleep(1)

Thread=threading.Thread(target=print_number)
Thread.start()
print("Main thread is running...")

Thread.join()
print("Thread Execution finished")



Thread: 0Main thread is running...

Thread: 1
Thread: 2
Thread: 3
Thread: 4
Thread Execution finished


In [None]:
# Multiple Threads

def worker(task_id: int) -> None:
    """Worker function that prints task ID."""
    print(f"Task {task_id} started")
    time.sleep(1)
    print(f"Task {task_id} completed")

# Creating multiple threads
threads = []
for i in range(5):
    thread = threading.Thread(target=worker, args=(i,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print("All tasks completed.")


Task 0 started
Task 1 started
Task 2 started
Task 3 started
Task 4 started
Task 0 completed
Task 1 completed
Task 2 completed
Task 3 completed
Task 4 completed
All tasks completed.


In [None]:
# Using ThreadPool Executor

from concurrent.futures import ThreadPoolExecutor

def task(n: int) -> str:
    """A sample task that takes time to execute."""
    time.sleep(1)
    return f"Task {n+1} completed"

# Using ThreadPoolExecutor for thread management
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(task, i) for i in range(10)]
    for future in futures:
        print(future.result())


Task 1 completed
Task 2 completed
Task 3 completed
Task 4 completed
Task 5 completed
Task 6 completed
Task 7 completed
Task 8 completed
Task 9 completed
Task 10 completed


In [3]:
# Data fetching using Multi-Threading

import requests
from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict
import pandas as pd
from IPython.display import display

def fetch_url(url: str) -> Dict[str, str]:
    """Fetches the status code and response time for a URL."""
    start_time = time.time()
    response = requests.get(url, timeout=5)
    end_time = time.time()
    return {
        "URL": url,
        "Status Code": response.status_code,
        "Response Time (s)": round(end_time - start_time, 2)
    }

def threaded_scraper(urls: List[str]) -> pd.DataFrame:
    """Fetches multiple URLs concurrently and returns results as a DataFrame."""
    with ThreadPoolExecutor(max_workers=5) as executor:
        results = list(executor.map(fetch_url, urls))
    return pd.DataFrame(results)

urls_list = [
    "https://www.amrita.edu/campus/amritapuri/",
    "https://www.github.com",
    "https://www.stackoverflow.com",
    "https://www.reddit.com",
    "https://www.wikipedia.org"
]

df_results = threaded_scraper(urls_list)
display(df_results)


Unnamed: 0,URL,Status Code,Response Time (s)
0,https://www.amrita.edu/campus/amritapuri/,200,0.55
1,https://www.github.com,200,0.41
2,https://www.stackoverflow.com,200,1.9
3,https://www.reddit.com,403,0.09
4,https://www.wikipedia.org,200,0.19


In [6]:
# I/O bound Stock Price Fetcher

import random
from typing import List


def fetch_stock_price(stock: str) -> None:
    """Fetches a simulated stock price."""
    while True:
        price = round(random.uniform(100, 500), 2)
        print(f"{stock}: ${price}")
        time.sleep(1)


def main() -> None:
    """Fetches multiple stock prices concurrently using threads."""
    stocks: List[str] = ["AAPL", "GOOG", "AMZN", "MSFT", "TSLA"]

    threads: List[threading.Thread] = []
    for stock in stocks:
        thread = threading.Thread(target=fetch_stock_price, args=(stock,))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()


if __name__ == "__main__":
    main()


AAPL: $451.2
GOOG: $194.68
AMZN: $148.77
MSFT: $415.58
TSLA: $299.51
AAPL: $196.11
GOOG: $192.58
AMZN: $257.7
MSFT: $410.37
TSLA: $228.4
AAPL: $483.65GOOG: $203.32

AMZN: $385.91
MSFT: $147.49
TSLA: $488.99
GOOG: $158.38
AAPL: $401.04
AMZN: $473.11
MSFT: $346.26
TSLA: $265.0
GOOG: $292.15AAPL: $456.5
AMZN: $192.92

TSLA: $352.43
MSFT: $177.59
AAPL: $250.07GOOG: $131.95
AMZN: $106.61

MSFT: $467.71
TSLA: $243.18
AAPL: $402.25GOOG: $453.42
AMZN: $156.35
MSFT: $354.9
TSLA: $288.2

GOOG: $487.48
AMZN: $292.69
MSFT: $232.99
TSLA: $379.69
AAPL: $139.49
GOOG: $310.81
AMZN: $481.2
MSFT: $236.06
TSLA: $249.96
AAPL: $249.96
GOOG: $156.38
AMZN: $485.37
MSFT: $327.94
TSLA: $376.02
AAPL: $486.31
GOOG: $248.0
AMZN: $237.07
MSFT: $332.75
TSLA: $307.81
AAPL: $434.88
GOOG: $388.35
TSLA: $305.69
MSFT: $373.66
AAPL: $252.96
AMZN: $288.43
GOOG: $338.34
AMZN: $495.86
AAPL: $112.62
MSFT: $268.34
TSLA: $428.24
GOOG: $441.29
AMZN: $221.35
TSLA: $375.53
AAPL: $326.81
MSFT: $280.62
GOOG: $334.99
TSLA: $134.01
A

KeyboardInterrupt: 

# Multi-Processing

In [None]:
# Simple Process

import multiprocessing

def worker():
    print("Worker process started")
    time.sleep(2)
    print("Worker process finished")

# Process Creation
process = multiprocessing.Process(target=worker)
process.start()

# Main process execution
print("Main process running...")

# Waiting for the process to finish
process.join()
print("Worker process completed.")


Worker process started
Main process running...
Worker process finished
Worker process completed.


In [None]:
# Multiple Processes running

def worker(task_id: int) -> None:
    """Function executed in a separate process."""
    print(f"Process {task_id} started")
    time.sleep(1)
    print(f"Process {task_id} completed")

# Creating multiple processes
processes = []
for i in range(5):
    process = multiprocessing.Process(target=worker, args=(i,))
    processes.append(process)
    process.start()

# Waiting for all processes to complete
for process in processes:
    process.join()

print("All processes completed.")


Process 0 started
Process 1 startedProcess 2 started

Process 3 startedProcess 4 started

Process 0 completed
Process 1 completed
Process 2 completed
Process 4 completedProcess 3 completed

All processes completed.


In [10]:
# Parallel Computation

from multiprocessing import Array, Value, Lock
from typing import List


def square_list(mylist: List[int], result: Array, square_sum: Value, lock: Lock) -> None:
    """
    Computes the square of each element in the given list and stores the results
    in a shared memory array. Also calculates and stores the sum of squares.
    """
    with lock:
        for idx, num in enumerate(mylist):
            result[idx] = num * num


        square_sum.value = sum(result)

    print(f"Result (in process p1): {list(result)}")
    print(f"Sum of squares (in process p1): {square_sum.value}")


if __name__ == "__main__":

    mylist: List[int] = [1, 2, 3, 4, 5, 6]

    result: Array = multiprocessing.Array('i', len(mylist))

    square_sum: Value = multiprocessing.Value('i')

    lock = multiprocessing.Lock()

    p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum, lock))
    p1.start()

    p1.join()

    print(f"Result (in main program): {list(result)}")
    print(f"Sum of squares (in main program): {square_sum.value}")


Result (in process p1): [1, 4, 9, 16, 25, 36]
Sum of squares (in process p1): 91
Result (in main program): [1, 4, 9, 16, 25, 36]
Sum of squares (in main program): 91


In [9]:
# Pipes

"""
Inter-Process Communication using Multiprocessing Pipe in Python
"""

import multiprocessing
from multiprocessing.connection import Connection
from typing import List


def sender(conn: Connection, msgs: List[str]) -> None:
    """
    Sends messages through a multiprocessing Pipe.
    """
    for msg in msgs:
        conn.send(msg)
        print(f"Sent the message: {msg}")
    conn.close()


def receiver(conn: Connection) -> None:
    """
    Receives and prints messages from a multiprocessing Pipe.
    """
    while True:
        msg = conn.recv()
        if msg == "END":
            break
        print(f"Received the message: {msg}")


if __name__ == "__main__":

    msgs: List[str] = ["hello", "hey", "hru?", "END"]

    parent_conn, child_conn = multiprocessing.Pipe()

    p1 = multiprocessing.Process(target=sender, args=(parent_conn, msgs))
    p2 = multiprocessing.Process(target=receiver, args=(child_conn,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()


Sent the message: hello
Received the message: helloSent the message: hey

Sent the message: hru?Received the message: hey

Sent the message: ENDReceived the message: hru?



In [None]:
# Using ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import time
import os

def compute(n: int) -> int:
    """Simulate a CPU-intensive task."""
    try:
        time.sleep(1)
        return n * n
    except Exception as e:
        print(f"Error in process {os.getpid()}: {e}")
        return -1

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=3) as executor:
        results = executor.map(compute, range(5))

    for result in results:
        print(f"Computed result: {result}")


Computed result: 0
Computed result: 1
Computed result: 4
Computed result: 9
Computed result: 16


In [None]:
# Large Dataset processing using Multi-Processing

import multiprocessing
import numpy as np
from typing import List
import pandas as pd
from IPython.display import display

def compute_square(numbers: List[int]) -> List[int]:
    """Computes the square of each number."""
    return [n ** 2 for n in numbers]

data = np.random.randint(1, 1000, size=(1000000,)).tolist()
num_chunks = 4
chunk_size = len(data) // num_chunks
chunks = [data[i * chunk_size:(i + 1) * chunk_size] for i in range(num_chunks)]

with multiprocessing.Pool(processes=num_chunks) as pool:
    results = pool.map(compute_square, chunks)

final_result = [item for sublist in results for item in sublist]  # Flatten the list

df_numbers = pd.DataFrame({"Original": data[:100], "Squared": final_result[:100]})  # Display sample

# Display DataFrame
display(df_numbers)


Unnamed: 0,Original,Squared
0,661,436921
1,718,515524
2,407,165649
3,521,271441
4,684,467856
...,...,...
95,307,94249
96,22,484
97,51,2601
98,639,408321


In [16]:
# Parallel Matrix Multiplication using MultiProcessing

import numpy as np
from typing import Tuple


def multiply_matrices(matrix_pair: Tuple[np.ndarray, np.ndarray]) -> np.ndarray:
    """Multiplies two matrices and returns the result."""
    result = np.dot(matrix_pair[0], matrix_pair[1])
    return result


def main() -> None:
    """Performs matrix multiplications in parallel using multiprocessing."""
    matrix_size = 500
    matrices = [(np.random.rand(matrix_size, matrix_size), np.random.rand(matrix_size, matrix_size)) for _ in range(4)]

    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(multiply_matrices, matrices)
    print(matrices)
    print("Matrix multiplications completed!")


if __name__ == "__main__":
    main()


[(array([[0.00947059, 0.44952426, 0.07270465, ..., 0.65719253, 0.58796879,
        0.5369756 ],
       [0.55604966, 0.73506827, 0.39608992, ..., 0.13041695, 0.82334157,
        0.8709132 ],
       [0.4313739 , 0.84368145, 0.21744749, ..., 0.31109976, 0.42925803,
        0.34584949],
       ...,
       [0.38126616, 0.8279023 , 0.15682749, ..., 0.06828662, 0.39980329,
        0.3588295 ],
       [0.37283848, 0.72144981, 0.95697941, ..., 0.05682561, 0.5206711 ,
        0.39648392],
       [0.91675098, 0.21097322, 0.66057149, ..., 0.11623475, 0.0328943 ,
        0.302729  ]]), array([[0.69452804, 0.98258903, 0.21188427, ..., 0.4174917 , 0.4506397 ,
        0.4070561 ],
       [0.45540656, 0.2465734 , 0.77011173, ..., 0.86552761, 0.58642087,
        0.3186346 ],
       [0.96014363, 0.93229906, 0.3562054 , ..., 0.68716243, 0.93957157,
        0.35837471],
       ...,
       [0.6010581 , 0.41761139, 0.20395978, ..., 0.12201246, 0.87006648,
        0.14929756],
       [0.34559551, 0.00831616, 

# Async Processing

In [None]:
# Running a async function

import asyncio

async def say_hello():
    await asyncio.sleep(1)
    print("Hello, Async!")

asyncio.run(say_hello())


Hello, Async!


In [None]:
# Running Multiple Coroutines

async def task(n: int) -> str:
    """Simulate an async task."""
    await asyncio.sleep(1)
    return f"Task {n} completed"

async def main():
    tasks = [task(i) for i in range(5)]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result)

asyncio.run(main())


Task 0 completed
Task 1 completed
Task 2 completed
Task 3 completed
Task 4 completed


In [None]:
# Using asyncio with HTTP Requests
import asyncio
import aiohttp

async def fetch(url: str) -> str:
    """Fetch data from a URL asynchronously."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    urls = [
        "https://www.google.com",
        "https://www.python.org",
        "https://www.github.com"
    ]
    tasks = [fetch(url) for url in urls]
    responses = await asyncio.gather(*tasks)

    for url, response in zip(urls, responses):
        print(f"Fetched {len(response)} characters from {url}")

asyncio.run(main())


Fetched 19119 characters from https://www.google.com
Fetched 50261 characters from https://www.python.org
Fetched 280685 characters from https://www.github.com


In [None]:
# Async Web Scraper with AIOHTTP
from typing import List, Dict
import nest_asyncio

nest_asyncio.apply()

async def fetch(url: str, session: aiohttp.ClientSession) -> Dict[str, str]:
    """Fetches the status code and response time of a URL asynchronously."""
    async with session.get(url) as response:
        return {"URL": url, "Status Code": response.status, "Content Length": len(await response.text())}

async def async_scraper(urls: List[str]) -> pd.DataFrame:
    """Scrapes multiple URLs asynchronously using aiohttp."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(url, session) for url in urls]
        responses = await asyncio.gather(*tasks)
        return pd.DataFrame(responses)

urls_list = [
    "https://www.python.org",
    "https://www.github.com",
    "https://www.stackoverflow.com",
    "https://www.reddit.com",
    "https://www.wikipedia.org"
]

df_async_results = await async_scraper(urls_list)

# Display DataFrame
df_async_results


Unnamed: 0,URL,Status Code,Content Length
0,https://www.python.org,200,50261
1,https://www.github.com,200,280706
2,https://www.stackoverflow.com,200,218628
3,https://www.reddit.com,403,190238
4,https://www.wikipedia.org,200,87449
