In [None]:
def factorize(number):
    """Lazily yield every positive divisor of ``number`` in ascending order.

    Each integer from 1 through ``number`` (inclusive) is tried in turn, so
    the amount of work grows linearly with ``number``.
    """
    yield from (
        divisor
        for divisor in range(1, number + 1)
        if number % divisor == 0
    )

In [None]:
import time
# Inputs shared by the serial and threaded timing experiments.
numbers = [2139079, 1214759, 1516637, 1852285]
# Serial baseline: factorize one number after another on the main thread.
start = time.time()
for number in numbers:
    # list() drains the generator so the divisors are actually computed.
    list(factorize(number))
# Elapsed wall-clock seconds for the whole batch.
print(time.time() - start)

In [None]:
from threading import Thread

class FactThread(Thread):
    """Thread that factorizes a single number and stores the divisors on itself.

    Attributes:
        number: the value passed to the constructor.
        factors: ``None`` until ``run`` finishes, then the list of divisors.
    """

    def __init__(self, number):
        super().__init__()
        self.factors = None
        self.number = number

    def run(self):
        # Materialize the generator so the whole computation happens in this thread.
        divisors = [*factorize(self.number)]
        self.factors = divisors

# Threaded variant of the serial timing: one FactThread per number.
start = time.time()
threads = []

for n in numbers:
    thread = FactThread(n)
    thread.start()
    threads.append(thread)

# Wait for every worker before reading the clock.
for thread in threads:
    thread.join()

# Elapsed wall-clock seconds for the threaded run.
print(time.time() - start)

In [None]:
import select
import socket

# Single-threaded TCP server: one select() loop watches the listening socket
# and every accepted client connection. Received data is read and discarded.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

server_address = ('localhost', 10001)

# Every socket select() should watch; starts with just the listener.
inputs = [sock]

try:
    sock.bind(server_address)
    sock.listen(5)

    while True:
        # Blocks until at least one watched socket is readable.
        readable, writable, exceptional = select.select(inputs, [], [])

        print(readable)
        for s in readable:
            if s is sock:
                # The listener is readable: a new client is waiting.
                connection, client_address = s.accept()
                inputs.append(connection)
            else:
                data = s.recv(100)
                if data:
                    # print(f'received {data} from {s.getpeername()}')
                    pass
                else:
                    # Empty read means the peer closed the connection.
                    inputs.remove(s)
                    s.close()
finally:
    # Reached when the cell is interrupted or a socket call fails. Closing
    # everything releases the port so the cell can be run again.
    for s in inputs:
        s.close()


In [None]:
import select, time
import socket
from threading import Thread

def slow_systemcall():
    """Stand in for a blocking system call.

    Creates a throwaway, unconnected socket and asks ``select`` to wait for
    it to become readable with a timeout of 0.1 seconds. The socket is closed
    before returning so repeated calls do not leak file descriptors.
    """
    with socket.socket() as idle_socket:
        select.select([idle_socket], [], [], 0.1)

def compute(index):
    """Cheap stand-in for CPU work done between system calls."""
    # Defined here because this cell calls it; without a definition at this
    # point a fresh kernel raises NameError.
    return index * 10000


# Serial baseline: each blocking call is followed by a bit of computation,
# all on the main thread.
start = time.time()

for i in range(5):
    slow_systemcall()
    compute(i)

end = time.time()
print(end - start)

In [None]:
# Threaded variant: run the five blocking calls on their own threads.
start = time.time()

threads = []
for _ in range(5):
    thread = Thread(target=slow_systemcall)
    thread.start()
    threads.append(thread)

def compute(index):
    """Return ``index`` scaled by 10000; a cheap stand-in for CPU work."""
    scaled = index * 10000
    return scaled

# Main-thread computation, executed after the worker threads were started.
for i in range(5):
    compute(i)

# Wait for every worker, then report seconds elapsed since `start`.
for thread in threads:
    thread.join()
print(f"Took {time.time() - start}")

In [None]:
class Counter:
    """Plain running total with no synchronization of any kind."""

    def __init__(self):
        self.count = 0

    def increment(self, offset):
        """Add ``offset`` to the running total."""
        # Separate read, add and write steps; nothing guards them.
        self.count = self.count + offset

def worker(sensor_index, how_many, counter:Counter):
    """Bump ``counter`` by one, ``how_many`` times.

    ``sensor_index`` is accepted but not used. ``time.sleep(0)`` is called
    before every increment.
    """
    for _reading in range(how_many):
        time.sleep(0)
        counter.increment(1)

from threading import Thread
# Five threads each add `how_many` increments to one shared, unsynchronized
# Counter; the final line compares the total they should reach with the
# total actually recorded.
counter = Counter()

how_many = 10 ** 5
threads = []

for i in range(5):
    thread = Thread(target=worker, args=(i, how_many, counter))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

# The first number is the target, the second is what the counter holds.
print(f"Expected {how_many * len(threads)}, got {counter.count}")


In [None]:
import threading

# Shared resource: a module-level integer that every thread increments.
counter = 0

# Lock created to protect the shared counter.
lock = threading.Lock()

# a function that performs an I/O-bound task
def do_io():
    """Increment the shared module-level ``counter`` and report completion.

    The increment runs while holding the module-level ``lock`` so the
    read-modify-write of ``counter`` cannot interleave between threads.
    """
    global counter
    with lock:
        # access the shared resource in a thread-safe manner
        counter += 1
    print('Thread', threading.current_thread().name, 'finished I/O-bound task')

# Create and start ten threads that each run do_io once.
threads = []
start = time.time()

for i in range(10):
    # This sleep runs on the main thread, before the thread is created, so
    # successive threads are launched about one second apart.
    time.sleep(1)
    t = threading.Thread(target=do_io, name='Thread {}'.format(i))
    threads.append(t)
    t.start()

# Wait for all threads to finish.
for t in threads:
    t.join()

# Print the final value of the shared counter and the elapsed seconds.
print('Final counter value:', counter)
print(time.time() - start)


In [None]:
import sys
# Shared module-level integer updated by both threads below.
counter = 0

# Created here but not used: the `with lock:` line inside increment() is
# commented out.
lock = threading.Lock()

def increment():
    """Add 1 to the module-level ``counter`` 100000 times.

    The counter is read and written through ``getattr``/``setattr`` on this
    module object, as separate steps, without holding ``lock``.
    """
    # global counter
    for i in range(100000):
        # with lock:
        main = sys.modules[__name__]  # the module this code runs in
        value = getattr(main, 'counter')  # read
        result =value + 1  # modify
        setattr(main, 'counter', result)  # write back
# Two threads run increment() concurrently.
t1 = threading.Thread(target=increment, name='T1')
t2 = threading.Thread(target=increment, name='T2')

t1.start()
t2.start()

t1.join()
t2.join()

# Final value after both threads finished.
print(counter)


In [None]:
from collections import deque
from queue import Queue
from threading import Thread, Lock
import time
from dataclasses import dataclass, field

@dataclass
class MyQueue:
    """Minimal FIFO queue: a deque whose append/popleft are guarded by a lock.

    ``get`` does not block; it raises ``IndexError`` when the queue is empty.
    """

    # Underlying storage; a fresh deque per instance.
    items: deque = field(default_factory=deque, compare=False)
    # One lock per instance. A plain ``Lock()`` default would be created once
    # at class-definition time and shared by every queue. ``compare=False``
    # keeps the generated ``__eq__`` independent of lock identity.
    lock: Lock = field(default_factory=Lock, compare=False)

    def put(self, item):
        """Append ``item`` at the tail of the queue."""
        with self.lock:
            self.items.append(item)

    def get(self):
        """Remove and return the head item.

        Raises:
            IndexError: if the queue is empty.
        """
        with self.lock:
            return self.items.popleft()

    def __len__(self):
        """Return the number of queued items (read without taking the lock)."""
        return len(self.items)

class Wroker(Thread):
    """Pipeline stage that polls ``in_queue`` and feeds ``out_queue``.

    Attributes:
        polled_count: how many times the loop tried to fetch an item.
        work_done: how many items were processed and forwarded.
    """

    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0
        self.work_done = 0

    def run(self) -> None:
        """Loop forever: fetch an item, apply ``func``, forward the result."""
        while True:
            self.polled_count += 1
            try:
                item = self.in_queue.get()
            except IndexError:
                # Input queue was empty: back off briefly, then poll again.
                time.sleep(0.1)
                continue
            self.out_queue.put(self.func(item))
            self.work_done += 1

def _simulate_stage(label, item):
    """Sleep one second, print ``label`` followed by ``item``, return ``item``."""
    time.sleep(1)
    print(label, item)
    return item


def download(item):
    """Simulated download stage; returns ``item`` unchanged after one second."""
    return _simulate_stage("Downloading ... ", item)


def resize(item):
    """Simulated resize stage; returns ``item`` unchanged after one second."""
    return _simulate_stage("Resizing ... ", item)


def upload(item):
    """Simulated upload stage; returns ``item`` unchanged after one second."""
    return _simulate_stage("Uploading ... ", item)


In [None]:
# One queue between each pair of stages, plus a final queue for finished items.
download_queue = MyQueue()
resize_queue = MyQueue()
upload_queue = MyQueue()
done_queue = MyQueue()

# One polling worker per stage: download -> resize -> upload -> done.
threads = [
    Wroker(download, download_queue, resize_queue),
    Wroker(resize, resize_queue, upload_queue),
    Wroker(upload, upload_queue, done_queue)
]

for thread in threads:
    thread.start()

# Feed 1000 placeholder items into the first stage.
for _ in range(1000):
    download_queue.put(object())

# The progress lines report three of the four queues (upload_queue is omitted).
print("Outside", len(download_queue), len(resize_queue), len(done_queue))

# Poll once a second until 100 items have reached done_queue. The workers are
# never joined or stopped here; Wroker.run loops indefinitely.
while len(done_queue) < 100:
    print("Inside", len(download_queue), len(resize_queue), len(done_queue))
    time.sleep(1)

print("Done", len(download_queue), len(resize_queue), len(done_queue))


In [None]:
# Per-thread `polled_count` values, displayed as the cell's output.
[ t.polled_count for t in threads]

In [None]:
from queue import Queue

# Unbounded queue shared by the consumer thread and the main thread.
my_queue = Queue()

def consumer():
    """Take one item from ``my_queue``, printing before and after the wait."""
    print("Consumer waiting")
    my_queue.get()  # blocks until an item is available
    print("Consumer Done")

# The consumer is started first; the put() below supplies the item its get() waits for.
thread = Thread(target=consumer)
thread.start()
my_queue.put(10)
thread.join()

In [None]:
# Queue with room for a single item.
my_queue = Queue(1)

def consumer():
    """Take two items from ``my_queue``, announcing each one."""
    # time.sleep(0.1)
    my_queue.get()
    print("Consumer got 1")
    my_queue.get()
    print("Consumer got 2")
    print("Consumer Done")

# Start the consumer; the items are produced by a later cell.
thread = Thread(target=consumer)
thread.start()

In [None]:
# Producer side: put two items on my_queue, then wait for the consumer thread.
my_queue.put(10)
print("Producer put 1")
my_queue.put(10)
print("Producer put 2")
print("Producer Done")
thread.join()

In [None]:
from queue import Queue
class ClosableQueue(Queue):
    """Queue that can be iterated and told, via a sentinel, that input is over."""

    # Unique marker object; seeing it ends one iteration over the queue.
    SENTINEL = object()

    def close(self):
        """Enqueue the sentinel so one consumer stops iterating."""
        self.put(self.SENTINEL)

    def __iter__(self):
        """Yield items until the sentinel arrives.

        ``task_done`` is called once for every item fetched, the sentinel
        included.
        """
        reached_end = False
        while not reached_end:
            item = self.get()
            try:
                reached_end = item is self.SENTINEL
                if not reached_end:
                    yield item
            finally:
                self.task_done()

class StoppableWorker(Thread):
    """Thread that applies ``func`` to each item of ``in_queue``.

    Results go to ``out_queue``. The thread ends when iteration over
    ``in_queue`` ends.
    """

    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self) -> None:
        # map() is lazy, so each item is fetched, processed and forwarded
        # before the next one is requested.
        for result in map(self.func, self.in_queue):
            self.out_queue.put(result)

# Stage queues hold at most one item each; done_queue is unbounded.
download_queue = ClosableQueue(maxsize=1)
resize_queue = ClosableQueue(maxsize=1)
upload_queue = ClosableQueue(maxsize=1)
done_queue = ClosableQueue()

# One worker per stage: download -> resize -> upload -> done.
threads = [
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue)
]

for thread in threads:
    thread.start()

# Feed the integers 0..9 into the first stage.
for _ in range(10):
    download_queue.put(_)

# Shut the stages down in order: close a queue, wait until all of its items
# are marked done, then move to the next queue.
download_queue.close()
download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()

In [None]:
import threading

# Re-entrant lock: the thread that holds it may acquire it again.
lock = threading.RLock()

def bar():
    """Acquire ``lock`` and print a message."""
    with lock:
        print("Acquired Lock Again")


def foo():
    """Acquire ``lock``, print a message, then call ``bar`` while holding it."""
    with lock:
        print("Lock Acquired")
        bar()  # acquires the same lock a second time on this thread

foo()

In [None]:
def start_thread(count, *args):
    """Create ``count`` StoppableWorker threads, start them, return the list.

    ``*args`` is forwarded unchanged to each ``StoppableWorker`` constructor.
    All workers are constructed before any of them is started.
    """
    threads = []
    for _ in range(count):
        threads.append(StoppableWorker(*args))
    for worker in threads:
        worker.start()
    return threads

def stop_threads(closable_queue, threads):
    """Close ``closable_queue`` once per thread, then wait for everything.

    After the close calls, the queue is joined and then each thread in
    ``threads`` is joined.
    """
    for _worker in threads:
        print('closable_queue', type(closable_queue))
        closable_queue.close()

    closable_queue.join()

    for worker in threads:
        worker.join()

In [None]:
# Stage queues hold at most one item each; done_queue is unbounded.
download_queue = ClosableQueue(maxsize=1)
resize_queue = ClosableQueue(maxsize=1)
upload_queue = ClosableQueue(maxsize=1)
done_queue = ClosableQueue()

# Every stage needs consumers. With only the download workers running, the
# one-slot resize_queue fills up, the workers block on put(), and the
# producer loop below then blocks forever on download_queue.put().
download_threads = start_thread(3, download, download_queue, resize_queue)
resize_threads = start_thread(4, resize, resize_queue, upload_queue)
upload_threads = start_thread(5, upload, upload_queue, done_queue)

for item in range(10):
    download_queue.put(item)

# Shut the stages down front to back so each one drains before the next closes.
stop_threads(download_queue, download_threads)
stop_threads(resize_queue, resize_threads)
stop_threads(upload_queue, upload_threads)

print(done_queue.qsize(), 'items finished')


In [None]:
from threading import Lock

# Cell states stored in a Grid.
ALIVE = "*"
EMPTY = "_"


class Grid:
    """Rectangular board of cell states whose coordinates wrap around."""

    def __init__(self, height, width):
        self.height = height
        self.width = width
        # One independent list per row, every cell starting out EMPTY.
        self.rows = [[EMPTY] * self.width for _ in range(self.height)]

    def get(self, y, x):
        """Return the state at row ``y``, column ``x`` (both taken modulo the size)."""
        row = self.rows[y % self.height]
        return row[x % self.width]

    def set(self, y, x, state):
        """Store ``state`` at row ``y``, column ``x`` (both taken modulo the size)."""
        row = self.rows[y % self.height]
        row[x % self.width] = state

    def __str__(self):
        """Render one line per row, cells separated by single spaces."""
        return "".join(" ".join(row) + "\n" for row in self.rows)


class LockingGrid(Grid):
    """Grid whose ``get``, ``set`` and ``__str__`` each run while holding a lock."""

    def __init__(self, height, width):
        super().__init__(height, width)
        self.lock = Lock()

    def __str__(self):
        with self.lock:
            rendered = super().__str__()
        return rendered

    def get(self, y, x):
        with self.lock:
            state = super().get(y, x)
        return state

    def set(self, y, x, state):
        with self.lock:
            outcome = super().set(y, x, state)
        return outcome

In [None]:
# Build a 5-row by 9-column grid and mark five cells as alive, then print it.
grid = Grid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)
print(grid)

In [None]:
import asyncio

In [None]:
def count_neighbors(y, x, get):
    """Count how many of the eight cells around ``(y, x)`` are ALIVE.

    ``get`` is called as ``get(row, column)`` once per neighbour, clockwise
    starting from the cell directly above.
    """
    offsets = (
        (-1, 0), (-1, 1), (0, 1), (1, 1),
        (1, 0), (1, -1), (0, -1), (-1, -1),
    )
    neighbour_states = [get(y + dy, x + dx) for dy, dx in offsets]
    return sum(1 for state in neighbour_states if state == ALIVE)

async def game_logic(state, neighbours):
    """Return a cell's next state from its current state and live-neighbour count.

    Awaits ``asyncio.sleep(0.1)`` first. A live cell with fewer than two or
    more than three live neighbours becomes EMPTY; a non-live cell with
    exactly three becomes ALIVE; otherwise the state is returned unchanged.
    """
    await asyncio.sleep(0.1)

    if state == ALIVE:
        survives = 2 <= neighbours <= 3
        return state if survives else EMPTY
    if neighbours == 3:
        return ALIVE
    return state


In [None]:
async def step_cell(y, x, get, set):
    """Compute the next state of cell ``(y, x)`` and store it.

    The current state and neighbours are read through ``get``; the result is
    written through ``set``.
    """
    current = get(y, x)
    live_neighbors = count_neighbors(y, x, get)
    set(y, x, await game_logic(current, live_neighbors))

async def simulate(grid: Grid):
    """Advance ``grid`` by one generation and return the result as a new Grid.

    Every cell is stepped concurrently. States are read from ``grid`` and
    written to a fresh grid of the same size, so ``grid`` is left unmodified
    and updates cannot influence neighbouring cells within the same step.
    """
    next_grid = Grid(grid.height, grid.width)
    tasks = [
        # Read from the current generation, write into the next one.
        step_cell(y, x, grid.get, next_grid.set)
        for y in range(grid.height)
        for x in range(grid.width)
    ]
    await asyncio.gather(*tasks)
    return next_grid

from threading import Thread

def simulate_threaded(grid):
    """Advance ``grid`` by one generation using one thread per cell.

    States are read from ``grid`` and written to a new ``LockingGrid`` of the
    same size, which is returned once every thread has finished.
    """
    next_grid = LockingGrid(grid.height, grid.width)

    def run_cell(y, x, get, set):
        # step_cell is a coroutine function: calling it only creates a
        # coroutine object. asyncio.run drives it to completion on an event
        # loop owned by this worker thread.
        asyncio.run(step_cell(y, x, get, set))

    threads = []

    for y in range(grid.height):
        for x in range(grid.width):
            args = (y, x, grid.get, next_grid.set)
            thread = Thread(target=run_cell, args=args)
            thread.start()
            threads.append(thread)

    for thread in threads:
        thread.join()

    return next_grid


In [None]:
# Seed a 5x9 locking grid with five live cells.
grid = LockingGrid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

# Run five generations, replacing `grid` with each result and printing it.
# NOTE(review): asyncio.run() raises RuntimeError when an event loop is already
# running in this thread; under such a kernel `await simulate(grid)` would be
# needed instead. Confirm against the kernel in use.
for i in range(5):
    grid = asyncio.run(simulate(grid)) # run the event loop
    print(grid)


In [None]:
def simple_func():
    """Always fail with ``OSError("Problem with I/O")``."""
    failure = OSError("Problem with I/O")
    raise failure

import contextlib
import io

# Run simple_func on a thread while stderr is redirected into an in-memory
# buffer, then print whatever was written there.
fake_stderr = io.StringIO()
with contextlib.redirect_stderr(fake_stderr):
    thread = Thread(target=simple_func)
    thread.start()
    thread.join()

print(fake_stderr.getvalue())

In [None]:
from queue import Queue
from threading import Thread


class ClosableQueue(Queue):
    """Queue that can be iterated and told, via a sentinel, that input is over."""

    # A unique instance, not the builtin ``object`` type itself. Otherwise a
    # producer that legitimately enqueues ``object`` would end the consumer's
    # iteration early.
    SENTINEL = object()

    def close(self):
        """Enqueue the sentinel so one consumer stops iterating."""
        self.put(self.SENTINEL)

    def __iter__(self):
        """Yield items until the sentinel arrives.

        ``task_done`` is called once for every item fetched, the sentinel
        included, so ``join`` can be used to wait for the queue to drain.
        """
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return
                yield item
            finally:
                self.task_done()

class StoppableWorker(Thread):
    """Worker thread: applies ``func`` to every item iterated from ``in_queue``.

    Each result is put on ``out_queue``; the thread finishes when the
    iteration over ``in_queue`` finishes.
    """

    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.out_queue = out_queue
        self.in_queue = in_queue
        self.func = func

    def run(self):
        source = iter(self.in_queue)
        for item in source:
            self.out_queue.put(self.func(item))


In [None]:
# Write same program with Thread, ThreadPoolExecutor and asyncio

import threading
import requests
import sys

def download_url(url):
    """GET ``url`` with ``requests`` and print a one-line summary.

    The line contains the URL, the HTTP status code, ``sys.getsizeof`` of the
    response body, and the elapsed seconds.
    """
    import time
    started = time.time()
    response = requests.get(url)
    reported_size = sys.getsizeof(response.content)
    print(url, response.status_code, reported_size, time.time() - started)

# URLs fetched by each of the download variants.
urls = ["https://www.google.com", "https://www.github.com", "https://www.python.org"]
threads = []

# Variant 1: one thread per URL.
for url in urls:
    thread = threading.Thread(target=download_url, args=(url,))
    thread.start()
    threads.append(thread)

# Wait for every download to finish.
for thread in threads:
    thread.join()

In [None]:
from concurrent.futures import ThreadPoolExecutor
import requests

# Variant 2: submit every URL to a pool of three worker threads. Leaving the
# `with` block waits for the submitted work to finish. The futures returned
# by submit() are discarded, so nothing inspects their results or exceptions.
with ThreadPoolExecutor(max_workers=3) as executor:
    for url in urls:
        executor.submit(download_url, url)

In [None]:
import asyncio
import aiohttp

async def download(url):
    """GET ``url`` with aiohttp and print a one-line summary.

    The line contains the URL, the HTTP status, ``sys.getsizeof`` of the
    decoded response text, and the elapsed seconds. A new client session is
    opened for each call.
    """
    import time
    s = time.time()
    async with aiohttp.ClientSession() as session, session.get(url) as response:
        body = await response.text()
        print(url, response.status, sys.getsizeof(body), time.time() - s)

async def main():
    """Download every URL in ``urls`` concurrently on the event loop."""
    tasks = []

    for url in urls:
        # create_task needs a coroutine, so schedule the async `download`.
        # `download_url` is the blocking requests-based function and returns None.
        task = asyncio.create_task(download(url))
        tasks.append(task)
    await asyncio.gather(*tasks)

asyncio.run(main())

In [None]:
# NOTE: this class has the same name as the builtin EOFError. Once this cell
# has run, the name `EOFError` at module level refers to this class instead.
class EOFError(Exception):
    """Raised by the connection classes when the peer has closed the stream."""
    pass

class ConnectionBase:
    """Newline-delimited text protocol on top of a connected socket."""

    def __init__(self, connection):
        self.connection = connection
        # Buffered binary reader used to pull one line at a time.
        self.file = connection.makefile('rb')

    def send(self, command):
        """Send ``command`` followed by a newline."""
        line = command + '\n'
        data = line.encode()
        # sendall keeps writing until every byte is sent; send() may stop early.
        self.connection.sendall(data)

    def receive(self):
        """Read one line and return it as text without its trailing newline.

        Raises:
            EOFError: if the peer closed the connection.
        """
        line = self.file.readline()
        if not line:
            raise EOFError('Connection Closed')
        # Drop the line terminator so callers can compare the text directly
        # against command words such as 'NUMBER'.
        if line.endswith(b'\n'):
            line = line[:-1]
        return line.decode()

import random

# Decision words exchanged in REPORT messages.
WARMER = "Warmer"
COLDER = "Colder"
UNSURE = "Unsure"
CORRECT = "Correct"

class UnknownCommandError(Exception):
    """Raised by Session.loop for a command word it does not recognize."""
    pass


class Session(ConnectionBase):
    """Server side of the number-guessing protocol for one connection.

    Understands three commands: ``PARAMS <lower> <upper>`` starts a new game,
    ``NUMBER`` asks for the next guess, and ``REPORT <decision>`` tells the
    server how its last guess fared.
    """

    def __init__(self, *args):
        super().__init__(*args)
        self._clear_state(None, None)

    def _clear_state(self, lower, upper):
        """Begin a fresh game over the inclusive range ``lower``..``upper``."""
        self.lower = lower
        self.upper = upper
        self.secret = None
        self.guesses = []

    def loop(self):
        """Dispatch commands until ``receive`` raises or returns an empty string.

        Raises:
            UnknownCommandError: for a blank line or an unrecognized command.
        """
        while command := self.receive():
            # split() with no argument ignores surrounding whitespace, so a
            # trailing line terminator never becomes part of a token.
            parts = command.split()
            if not parts:
                raise UnknownCommandError(command)
            if parts[0] == 'PARAMS':
                self.set_params(parts)
            elif parts[0] == 'NUMBER':
                self.send_number()
            elif parts[0] == 'REPORT':
                self.receive_report(parts)
            else:
                raise UnknownCommandError(command)

    def set_params(self, parts):
        """Handle ``PARAMS <lower> <upper>`` by resetting the game state."""
        assert len(parts) == 3
        lower = int(parts[1])
        upper = int(parts[2])
        self._clear_state(lower, upper)

    def next_guess(self):
        """Return the known secret, or a random not-yet-tried number in range."""
        # Once a guess has been confirmed correct, keep answering with it.
        if self.secret is not None:
            return self.secret

        while True:
            guess = random.randint(self.lower, self.upper)
            if guess not in self.guesses:
                return guess

    def send_number(self):
        """Pick the next guess, remember it, and send it to the client."""
        guess = self.next_guess()
        self.guesses.append(guess)
        self.send(format(guess))

    def receive_report(self, parts):
        """Handle ``REPORT <decision>`` for the most recent guess."""
        assert len(parts) == 2
        decision = parts[1]
        last = self.guesses[-1]
        if decision == CORRECT:
            self.secret = last
        print(f"Server: {last} is {decision}")



In [None]:
import contextlib
import math
import asyncio

# Two bare calls, asyncio.start_server() and asyncio.open_connection(), were
# removed from this spot. start_server() requires a client_connected_cb
# argument, so calling it with none raised TypeError and stopped the cell
# before Client was defined; open_connection() only created a coroutine that
# was never awaited.
class Client(ConnectionBase):
    """Client side of the number-guessing protocol.

    The client knows the secret, asks the server for guesses, and reports
    whether each guess is closer to or further from the secret than the last.
    """

    def __init__(self, *args):
        super().__init__(*args)
        self._clear_state()

    def _clear_state(self):
        """Forget the secret and the distance of the previous guess."""
        self.secret = None
        self.last_distance = None

    @contextlib.contextmanager
    def session(self, lower, upper, secret):
        """Context manager for one game over ``lower``..``upper`` with ``secret``.

        Sends ``PARAMS lower upper`` on entry. On exit the local state is
        cleared and ``PARAMS 0 -1`` is sent.
        """
        print(f"Guess a number between {lower} and {upper}. Secret is "
              f"{secret}")
        self.secret = secret
        self.send(f"PARAMS {lower} {upper}")
        try:
            yield
        finally:
            self._clear_state()
            self.send('PARAMS 0 -1')

    def request_number(self, count):
        """Yield up to ``count`` guesses requested from the server.

        Stops early once a reported guess had distance 0 from the secret.
        """
        for _ in range(count):
            self.send("NUMBER")
            data = self.receive()
            yield int(data)
            if self.last_distance == 0:
                return

    def report_outcome(self, number):
        """Compare ``number`` with the secret, send the verdict, and return it."""
        # Absolute distance between the guess and the secret.
        new_distance = abs(number - self.secret)
        decision = UNSURE

        if new_distance == 0:
            decision = CORRECT
        elif self.last_distance is None:
            # First guess of the game: nothing to compare against yet.
            pass
        elif new_distance < self.last_distance:
            decision = WARMER
        elif new_distance > self.last_distance:
            decision = COLDER

        self.last_distance = new_distance
        self.send(f'REPORT {decision}')
        return decision





In [None]:
import socket
from threading import Thread


def handle_connection(connection):
    """Serve one client connection until it reaches end of stream.

    Does nothing for a falsy ``connection``. ``EOFError`` raised by the
    session loop is treated as a normal disconnect.
    """
    if not connection:
        return
    session = Session(connection)
    try:
        session.loop()
    except EOFError:
        pass

def run_server(address):
    """Listen on ``address`` and serve each accepted connection on a daemon thread.

    The accept loop has no exit condition.
    """
    with socket.socket() as listener:
        listener.bind(address)
        listener.listen()
        while True:
            connection = listener.accept()[0]
            Thread(
                target=handle_connection,
                args=(connection,),
                daemon=True,
            ).start()




In [None]:
def run_client(address):
    """Play two guessing games against the server at ``address``.

    Returns:
        A list of ``(number, outcome)`` tuples, one per guess, covering both
        games in the order the guesses were made.
    """
    with socket.create_connection(address) as connection:
        client = Client(connection)

        with client.session(1, 5, 3):
            results = [(x, client.report_outcome(x))
                       for x in client.request_number(3)]

        with client.session(10, 15, 12):
            for number in client.request_number(5):
                outcome = client.report_outcome(number)
                results.append((number, outcome))

    # Callers iterate over the collected guesses, so hand them back.
    return results


def main():
    """Start the server on a daemon thread, run the client, print its results."""
    address = ('127.0.0.1', 8006)
    Thread(target=run_server, args=(address,), daemon=True).start()

    for number, outcome in run_client(address):
        print(f"{number} is {outcome}")

main()

In [None]:
class AsyncConnectionBase:
    """Newline-delimited text protocol over a reader/writer stream pair."""

    def __init__(self, reader, writer):
        self.reader = reader
        self.writer = writer

    async def send(self, command):
        """Write ``command`` plus a newline and wait for the writer to drain."""
        payload = (command + '\n').encode()
        self.writer.write(payload)
        await self.writer.drain()

    async def receive(self):
        """Read one line and return it decoded, minus its final byte.

        Raises:
            EOFError: if the reader returns no data.
        """
        raw = await self.reader.readline()
        if raw:
            return raw[:-1].decode()
        raise EOFError()


In [None]:
import socket
import threading

# Address the chat server binds to.
server_host = '0.0.0.0'
server_port = 1234

# Listening TCP socket; SO_REUSEADDR is set before binding.
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((server_host, server_port))
server_socket.listen()

# Sockets of the currently connected clients, shared by all handler threads.
clients = []

def handle_client(client_socket, client_address):
    """Relay everything one client sends to all other connected clients.

    Runs until the client disconnects or a socket call fails. In every case
    the client is removed from ``clients`` and its socket is closed.
    ``client_address`` is accepted but not used.
    """
    try:
        while True:
            # Forward the raw bytes. Decoding here could fail when a
            # multi-byte character is split across two recv() calls.
            data = client_socket.recv(1024)

            if not data:
                # Empty read: the client closed its side.
                break

            # Iterate over a snapshot because other handler threads add and
            # remove entries while this loop runs.
            for client in list(clients):
                if client != client_socket:
                    client.sendall(data)
    finally:
        if client_socket in clients:
            clients.remove(client_socket)
        client_socket.close()

# Accept clients until the cell is interrupted or accept() fails. The
# listening socket is closed once, after the loop ends. Closing it inside the
# loop would make the second accept() fail on a closed socket.
try:
    while True:
        client_socket, client_address = server_socket.accept()
        clients.append(client_socket)
        client_thread = threading.Thread(
            target=handle_client, args=(client_socket, client_address))
        client_thread.start()
finally:
    server_socket.close()



In [None]:
import socket
import threading


# Address of the chat server to connect to.
server_host = 'localhost'
server_port = 1234

# Connect over TCP; this socket is used for both sending and receiving.
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect((server_host, server_port))

def receive_message():
    """Print every message received from the server until it disconnects."""
    while True:
        data = client_socket.recv(1024)
        if not data:
            # recv() returns b'' once the server has closed the connection.
            # Without this check the loop would spin, printing empty lines.
            break
        print(data.decode())

# Print incoming messages on a background thread.
receive_thread = threading.Thread(target=receive_message)
receive_thread.start()

# Read lines from the user and send each one to the server; no exit condition.
while True:
    message = input('> ')
    client_socket.send(message.encode())


In [1]:
class NoNewData(Exception):
    """Raised by ``readline`` when the handle has nothing new to read."""
    pass

def readline(handle):
    """Return the next unread line from ``handle``.

    Raises:
        NoNewData: if the current position is already at the end of the file.
    """
    offset = handle.tell()
    handle.seek(0, 2)  # jump to the end to learn the current length
    length = handle.tell()

    if length == offset:
        raise NoNewData()
    # Go back to where reading left off. Seeking to 0 would return the first
    # line of the file on every call.
    handle.seek(offset, 0)

    return handle.readline()

import time

def tail_file(handle, interval, write_func):
    """Pass each new line of ``handle`` to ``write_func`` until it is closed.

    Sleeps ``interval`` seconds whenever there is nothing new to read.
    """
    while not handle.closed:
        try:
            line = readline(handle)
        except NoNewData:
            time.sleep(interval)
            continue
        write_func(line)

async def tail_async(handle, interval, write_func):
    """Coroutine version of tailing: await ``write_func`` for each new line.

    ``readline`` runs in the loop's default executor. When it reports no new
    data the coroutine sleeps ``interval`` seconds.
    """
    loop = asyncio.get_event_loop()

    while not handle.closed:
        try:
            line = await loop.run_in_executor(None, readline, handle)
        except NoNewData:
            await asyncio.sleep(interval)
            continue
        await write_func(line)


from threading import Thread, Lock

def run_thread(handles, interval, output_path):
    """Tail every handle on its own thread, merging lines into ``output_path``.

    The output file is opened in text mode. Writes from the threads are
    serialized with a lock. Returns after all threads have finished.
    """
    with open(output_path, 'w') as output:
        write_lock = Lock()

        def write(data):
            with write_lock:
                output.write(data)

        workers = []
        for handle in handles:
            worker = Thread(target=tail_file, args=(handle, interval, write))
            worker.start()
            workers.append(worker)

        for worker in workers:
            worker.join()

SyntaxError: expected ':' (4043112099.py, line 31)

In [None]:
import asyncio

async def run_tasks_mixed(handles, interval, output_path):
    """Tail ``handles`` with blocking ``tail_file`` calls run in executor threads.

    All writes to ``output_path`` are executed as coroutines on the event
    loop obtained at the start of this coroutine.
    """
    loop = asyncio.get_event_loop()

    with open(output_path, 'w') as output:
        async def write_async(data):
            output.write(data)

        def write(data):
            # Called from executor threads: schedule the write on `loop` and
            # block this thread until it has completed.
            coro = write_async(data)
            future = asyncio.run_coroutine_threadsafe(coro, loop)
            future.result()

        tasks = []
        for handle in handles:
            # None selects the loop's default executor.
            task = loop.run_in_executor(None, tail_file, handle, interval,
                                        write)
            tasks.append(task)

        await asyncio.gather(*tasks)
# NOTE(review): `handles` is not defined in the code visible here; confirm
# where it comes from before running this cell.
asyncio.run(run_tasks_mixed(handles, 0.1, 'output.txt'))

In [None]:
async def run_tasks(handles, interval, output_path):
    """Tail every handle as an asyncio task, writing lines to ``output_path``.

    The output file is opened in text mode.
    """
    with open(output_path, "w") as f:
        async def write_async(data):
            f.write(data)

        tasks = [
            asyncio.create_task(tail_async(handle, interval, write_async))
            for handle in handles
        ]
        await asyncio.gather(*tasks)

In [None]:
def tail_file(handle, interval, write_func):
    """Blocking wrapper that runs ``tail_async`` on a private event loop.

    ``write_func`` is a plain synchronous callable; it is wrapped in a
    coroutine so ``tail_async`` can await it.
    """
    # Create a loop for this call. get_event_loop() raises in a worker thread
    # that has no loop set, and in a thread with a running loop it would
    # return that loop, which cannot be driven with run_until_complete.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    async def write_async(data):
        write_func(data)

    coro = tail_async(handle, interval, write_async)
    try:
        loop.run_until_complete(coro)
    finally:
        # Release the loop's resources once tailing has finished or failed.
        loop.close()


In [None]:
async def run_tasks(handles, interval, output_path):
    """Tail every handle as an asyncio task, writing lines to ``output_path``.

    The output file is opened in binary mode.
    """
    with open(output_path, "wb") as f:
        async def write_async(data):
            f.write(data)

        pending = []
        for handle in handles:
            pending.append(
                asyncio.create_task(tail_async(handle, interval, write_async))
            )

        await asyncio.gather(*pending)

In [None]:
class WriteThread(Thread):
    """Thread that owns an event loop on which all file writes are executed.

    Usable as an async context manager: entering starts the thread, leaving
    stops the thread's event loop.
    """

    def __init__(self, output_path):
        super().__init__()
        self.output_path = output_path
        self.output = None  # set to the open file object inside run()
        self.loop = asyncio.new_event_loop()  # loop driven by this thread

    def run(self):
        asyncio.set_event_loop(self.loop)

        # The file stays open for as long as the loop runs.
        with open(self.output_path, 'wb') as self.output:
            self.loop.run_forever()
        # After the loop was stopped, run it for one more iteration.
        self.loop.run_until_complete(asyncio.sleep(0))

    async def real_write(self, data):
        """Write ``data`` to the output file."""
        self.output.write(data)

    async def write(self, data):
        """Schedule ``real_write`` on this thread's loop and await its completion."""
        coro = self.real_write(data)
        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        # Adapt the concurrent future so the calling coroutine can await it.
        await asyncio.wrap_future(future)

    async def real_stop(self):
        """Stop this thread's event loop."""
        self.loop.stop()

    async def stop(self):
        """Schedule ``real_stop`` on this thread's loop and await its completion."""
        coro = self.real_stop()
        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        await asyncio.wrap_future(future)

    async def __aenter__(self):
        # Thread.start is invoked through the default executor.
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, self.start)
        return self

    async def __aexit__(self, *_):
        await self.stop()



In [None]:
async def run_fully_async(handles, interval, output_path):
    """Tail every handle as an asyncio task, writing through a WriteThread.

    The WriteThread is opened on ``output_path`` for the duration of the call.
    """
    async with WriteThread(output_path) as output:
        tasks = [
            asyncio.create_task(tail_async(handle, interval, output.write))
            for handle in handles
        ]
        await asyncio.gather(*tasks)