### Подключение библиотек

In [None]:
import asyncio
import datetime
import time
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning) 

### Код производителя и потребителя

In [None]:
async def producer(queue, data, p_id):
    num = 1
    for item in data.split():
        task_name = "producer " + str(p_id) + " iteration " + str(num)
        start_time = str(datetime.datetime.now())
        await queue.put((start_time, (str(num) + ":" + item)))
        await asyncio.sleep(0.1)
        await queue.put((str(datetime.datetime.now()), task_name + " done"))
        num += 1
    await queue.put(None)

async def consumer(queue, c_id):
    while True:
        item = await queue.get()
        await asyncio.sleep(0.1)
        if item is None:
            break
        print("consumer " + str(c_id) + ": " + str(item))

### Код с gather() и await()

In [None]:
async def main():
    print("Program started at " + str(datetime.datetime.now()))
    data = ["sad fsdds ooo cmcmcm\nttttt", "ijsi\niwdje uuuu vvvv", "iiiii nnnnn ppppp aaaaa"]
    queue = asyncio.Queue()

    producers = [asyncio.create_task(producer(queue, data[i], i+1)) for i in range(3)]
    consumers = [consumer(queue, i+1) for i in range(3)]

    await asyncio.gather(*producers)
    await asyncio.wait(consumers)
    print("Program ended at " + str(datetime.datetime.now()))
    
await main()

### Код с gather() и wait_for()

In [None]:
async def main():
    print("Program started at " + str(time.time()))
    data = ["sad fsdds ooo cmcmcm\nttttt", "ijsi\niwdje uuuu vvvv", "iiiii nnnnn ppppp aaaaa"]
    queue = asyncio.Queue()

    producers = [producer(queue, data[i], i+1) for i in range(3)]
    consumers = [asyncio.create_task(consumer(queue, i+1)) for i in range(3)]

    await asyncio.wait(producers)
    try:
        await asyncio.wait_for(queue.join(), timeout=5)
    except asyncio.TimeoutError:
        print("Queue join timed out")
        print("Program ended at " + str(time.time()))

await main()

### Вывод

**Использование asyncio позволяет выполнять вместе несколько задач/корутин: когда одна из запущенных вместе задач простаивает(находится в ожидании), следующая задача запускается. В случае если asyncio не используется, во время простаивания(ожидания) одной задачи, другие не выполняются, время остается не занятым**









---

In [None]:
# пример использования функции asyncio.shield() для защиты задачи от отмены
import asyncio
import string

# мой фильтр
async def my_filter(input_string):
    output_string = input_string.replace("Program 1", "Successful!")
    # блокировка на некоторое время
    await asyncio.sleep(1)
    return output_string

# корутина, отменяющая переданную ей задачу через некоторое время
async def cancel_task(task):
    await asyncio.sleep(0.2)
    was_cancelled = task.cancel()
    print(f'cancelled: {was_cancelled}')

async def main():
    # создание корутины
    coro = my_filter("Program 1")
    # создание задачи
    task = asyncio.create_task(coro)
    # создание защищённой задачи
    shielded = asyncio.shield(task)
    # создание задачи, отменяющей переданную ей задачу
    asyncio.create_task(cancel_task(shielded))
    # обработка отмены
    try:
        # ожидание завершения работы защищённой задачи
        result = await shielded
        # вывод сведений о полученных результатах
        print(f'>got: {result}')
    except asyncio.CancelledError:
        print('shielded was cancelled')
    # ожидание
    await asyncio.sleep(1)
    print(f'shielded: {shielded}')
    print(f'task: {task}')

asyncio.run(main())

In [None]:
# пример запуска блокированной задачи в asyncio
import asyncio

async def my_filter(input_string):
    output_string = input_string.replace("Program 2", "Successful!")
    yield output_string

async def main():
    result = await asyncio.to_thread(my_filter, "Program 2")
    print(f'>got: {result}')
    async for i in result:
        print(i)

asyncio.run(main())

In [None]:
# пример выполнения блокирующей задачи, зависящей от подсистемы ввода/вывода, в asyncio
import asyncio
import time

# мой фильтр
def my_filter(input_string):
    print('Task starting')
    output_string = input_string.replace("Program 3", "Successful!")
    print("Result = " + output_string)
    print('Task done')
 
async def main():
    print('Main running the blocking task')
    # создание корутины для блокирующей задачи
    coro = asyncio.to_thread(my_filter, "Program 3")
    # планирование задачи
    task = asyncio.create_task(coro)
    print('Main doing other things')
    # позволяем запланированной задаче запуститься
    await asyncio.sleep(0)
    # ожидание завершения задачи
    await task
 
asyncio.run(main())

## Программа 4  
### Асинхронные итераторы

In [None]:
# пример работы с асинхронным итератором и применением цикла async for
import asyncio
import random
import string

# мой фильтр
async def my_filter(str):
    strbuff=str
    flag = 0
    i = 0
    brackets = 0
    while (i != len(str)):
        if (47 < ord(str[i]) < 58) and flag == 0:
            strbuff = strbuff[:i + brackets] + '(' + strbuff[i + brackets:] 
            brackets += 1
            flag = 1
        if (ord(str[i]) < 48 or ord(str[i]) > 57) and  flag == 1:
            strbuff = strbuff[:i + brackets] + ')' + strbuff[i + brackets:] 
            brackets += 1
            flag = 0
        i += 1
        if i == len(str) and flag == 1:
            strbuff = strbuff[:i + brackets] + ')' + strbuff[i + brackets:] 
            brackets += 1
            flag = 0
    return strbuff

# определение асинхронного итератора
class AsyncIterator():
    # конструктор, объявление некоторых данных, относящихся к состоянию объекта
    def init(self):
        self.counter = 0
 
    # создание экземпляра итератора
    def aiter(self):
        return self
 
    # возврат следующего объекта, допускающего ожидание
    async def anext(self):
        # проверка на то, есть ли ещё невозвращённые элементы
        if self.counter >= 10:
            raise StopAsyncIteration
        # инкрементирование счётчика
        self.counter += 1
        # имитация работы
        await asyncio.sleep(0.05)
        # возврат значения счётчика
        return self.counter

async def main():
    # обход асинхронного итератора с помощью цикла async for
    async for item in AsyncIterator():
        print(item + "\n")
        random_string = ""
        async for j in AsyncIterator():
            random_string += random.choice(string.ascii_letters + string.digits)
        print("Input string: " + random_string + " -> Output string: " + await my_filter(random_string))
 
asyncio.run(main())

## Программа 5
### Асинхронные генераторы

In [None]:
# пример использования асинхронного генератора в цикле async for
import asyncio
import random
import string

# мой фильтр
async def my_filter(input_string):
    output_string = input_string.replace("a", "#")
    output_string = output_string.replace("A", "*")
    return output_string

# определение асинхронного генератора
async def async_generator(count):
    for i in range(count):
        # блокировка, имитирующая выполнение работы
        await asyncio.sleep(0.1)
        # выдача результата
        yield i

async def main():
    # обход асинхронного генератора с помощью цикла async for
    async for i in async_generator(5):
        random_string = ""
        async for j in async_generator(5):
            random_string += random.choice(string.ascii_letters)
        print("Input string: " + random_string + " -> Output string: " + await my_filter(random_string))

asyncio.run(main())

## Программа 6
### Асинхронные менеджеры контекста

In [None]:
# пример работы с асинхронным менеджером контекста и с async with
import asyncio

async def my_filter(input_string):
    output_string = input_string.replace("Program 6", "Successful!")
    return output_string

# определение асинхронного менеджера контекста
class AsyncContextManager:
    # вход в асинхронный менеджер контекста
    async def __aenter__(self):
        print('Entering the context manager')
        # блокировка на некоторое время
        await asyncio.sleep(0.5)
 
    # выход из асинхронного менеджера контекста
    async def __aexit__(self, exc_type, exc, tb):
        print('Exiting the context manager')
        # блокировка на некоторое время
        await asyncio.sleep(0.5)

async def custom_coroutine():
    # создание и использование асинхронного менеджера контекста
    async with AsyncContextManager() as manager:
        print(await my_filter("Program 6"))
 
asyncio.run(custom_coroutine())

## Подключение бибилиотек

In [1]:
import asyncio
import time
from urllib.parse import urlsplit

## Определение состояния сайтов 

In [None]:
async def get_status(url):
    url_parsed = urlsplit(url)
    if url_parsed.scheme == 'https':
        reader, writer = await asyncio.open_connection(url_parsed.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(url_parsed.hostname, 80)
    # send GET request
    query = f'GET {url_parsed.path} HTTP/1.1\r\nHost: {url_parsed.hostname}\r\n\r\n'
    writer.write(query.encode())
    await writer.drain()
    response = await reader.readline()
    writer.close()
    status = response.decode().strip()
    return status

async def main():
    start = time.time()
    sites = ['https://www.google.com/',
        'https://www.youtube.com/',
        'https://www.vk.com/',
        'https://telegram.org/',
        'https://discord.com/',
        'https://www.baidu.com/',
        'https://www.wikipedia.org/',
        'https://yandex.ru/',
        'https://yahoo.com/',
        'https://www.ozon.ru/'
        ]
    for url in sites:
        status = await get_status(url)
        print(f'{url:30}:\t{status}')
    end = time.time()
    print(f'Running time is {(end-start):4.2f} seconds')

await main()

## Одновременное определение состояния сайтов 

In [None]:
async def get_status(url):
    url_parsed = urlsplit(url)
    if url_parsed.scheme == 'https':
        reader, writer = await asyncio.open_connection(url_parsed.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(url_parsed.hostname, 80)
    query = f'GET {url_parsed.path} HTTP/1.1\r\nHost: {url_parsed.hostname}\r\n\r\n'
    writer.write(query.encode())
    await writer.drain()
    response = await reader.readline()
    writer.close()
    status = response.decode().strip()
    return status

async def main():
    start = time.time()
    sites = ['https://www.google.com/',
        'https://www.youtube.com/',
        'https://www.vk.com/',
        'https://telegram.org/',
        'https://discord.com/',
        'https://www.baidu.com/',
        'https://www.wikipedia.org/',
        'https://yandex.ru/',
        'https://yahoo.com/',
        'https://www.ozon.ru/'
        ]
    coros = [get_status(url) for url in sites]
    results = await asyncio.gather(*coros)
    for url, status in zip(sites, results):
        print(f'{url:30}:\t{status}')
    end = time.time()
    print(f'Running time is {(end-start):4.2f} seconds')

await main()

### Вывод

**Использование асинхронных функций ускоряет работу программы, позволяя делать запросы к новым серверам во время ожидания подключения от старых, тем самым обходя блокирование цикла событий. В случае недоступности одного из серверов, одновременный запуск нескольких подключений также облегчает обработку возможных ошибок, переходя к работе со следующим подключением, а не прерывая всю основную программу полученной ошибкой**

---

## Программа 1

In [None]:
import threading
import time

# класс производителя
class Producer(threading.Thread):
    # конструктор класса (private, с защищенными полями)
    def __init__(self, idd, queue, lock, event, barrier):
        super().__init__()
        self.id = idd
        self.queue = queue
        self.lock = lock
        self.event = event
        self.barrier = barrier

    # основная функция  	
    def run(self):
        for i in range(4):
            self.lock.acquire() # блокируем с помощью обычной блокировки
            if len(self.queue) < 3:
                self.queue.append(i) # кладем в очередь
                print(f'Producer {self.id}: {i} added to queue')
                self.event.set() # ставим флаг
            self.lock.release() # снимаем блокировку
            self.barrier.wait() # блокируем временно процесс 
        print('All elements processed by consumers')

# класс потребителя
class Consumer(threading.Thread):
    # конструктор класса (private, с защищенными полями)
    def __init__(self, idd, queue, lock, event, barrier):
        super().__init__()
        self.id = idd
        self.queue = queue
        self.lock = lock
        self.event = event
        self.barrier = barrier

    # основная функция
    def run(self):
        while True:
            self.event.wait() # потребитель ожидает установки флага
            self.lock.acquire() # блокируем с помощью обычной блокировки
            if len(self.queue) > 0: # в очереди есть данные
                item = self.queue.pop(0)
                print(f'Consumer {self.id}: {item} removed from queue')
            if len(self.queue) == 0: # извлекли всё из очереди
                self.event.clear() # убираем флаг
            self.lock.release() # снимаем обычную блокировку
            self.barrier.wait() # блокируем временно процесс

if __name__ == '__main__':
    queue = []
    lock = threading.Lock() # простые блокировки потоков (они же mutex)
    event = threading.Event() # блокировка с помощью событий (флаги)
    barrier = threading.Barrier(6) # блокировка барьерами

    producers = [Producer(i+1, queue, lock, event, barrier) for i in range(3)]
    consumers = [Consumer(i+1, queue, lock, event, barrier) for i in range(3)]

    # запускаем
    for producer in producers:
        producer.start()
    for consumer in consumers:
        consumer.start()
    
    # обЪединяем
    for producer in producers:
        producer.join()
    for consumer in consumers:
        consumer.join()

## Использование блокировок и переменных условий

In [None]:
import threading
import time

class Producer(threading.Thread):
    def __init__(self, idd, queue, mutex, empty, full):
        super().__init__()
        self.id = idd
        self.queue = queue
        self.mutex = mutex
        self.empty = empty
        self.full = full

    def run(self):
        for i in range(6):
            with self.mutex:
                while len(self.queue) == 3:
                    self.empty.wait()
                self.queue.append(i)
                print(f'Producer {self.id}: {i} added to queue')
                self.full.notify()
        print(f'Producer {self.id} done')

class Consumer(threading.Thread):
    def __init__(self, idd, queue, mutex, empty, full):
        super().__init__()
        self.id = idd
        self.queue = queue
        self.mutex = mutex
        self.empty = empty
        self.full = full
        
    def run(self):
        while True:
            with self.mutex:
                while len(self.queue) == 0:
                    self.full.wait()
                item = self.queue.pop(0)
                print(f'Consumer {self.id}: {item} removed from queue')
                self.empty.notify()
                
if __name__ == '__main__':
    queue = []
    mutex = threading.Lock()
    empty = threading.Condition(mutex)
    full = threading.Condition(mutex)
    
    producers = [Producer(i+1, queue, mutex, empty, full) for i in range(3)]
    consumers = [Consumer(i+1, queue, mutex, empty, full) for i in range(3)]
    
    for producer in producers:
        producer.start()
    for consumer in consumers:
        consumer.start()
    
    for producer in producers:
        producer.join()
    for consumer in consumers:
        consumer.join()

## Код использующий семафоры

In [None]:
import threading
import time

class Producer(threading.Thread):
    def __init__(self, idd, queue, lock, empty, full, event, barrier):
        super().__init__()
        self.id = idd
        self.queue = queue
        self.lock = lock
        self.empty = empty
        self.full = full
        self.event = event
        self.barrier = barrier

    def run(self):
        for i in range(6):
            self.empty.acquire()
            with self.lock:
                self.queue.append(i)
                print(f'Producer {self.id}: {i} added to queue')
                self.full.release()
                self.event.set()
            self.barrier.wait()
        print('All elements processed by consumers')

class Consumer(threading.Thread):
    def __init__(self, idd, queue, lock, empty, full, event, barrier):
        super().__init__()
        self.id = idd
        self.queue = queue
        self.lock = lock
        self.empty = empty
        self.full = full
        self.event = event
        self.barrier = barrier

    def run(self):
        while True:
            self.event.wait()
            self.full.acquire()
            with self.lock:
                item = self.queue.pop(0)
                print(f'Consumer {self.id}: {item} removed from queue')
                if len(self.queue) == 0:
                    self.event.clear()
                    self.empty.release()
            self.barrier.wait()

if __name__ == '__main__':
    queue = []
    lock = threading.Lock()
    empty = threading.Semaphore(3)
    full = threading.Semaphore(0)
    event = threading.Event()
    barrier = threading.Barrier(6)

    producers = [Producer(i+1, queue, lock, empty, full, event, barrier) for i in range(3)]
    consumers = [Consumer(i+1, queue, lock, empty, full, event, barrier) for i in range(3)]

    for producer in producers:
        producer.start()
    for consumer in consumers:
        consumer.start()

    for producer in producers:
        producer.join()
    for consumer in consumers:
        consumer.join()

**Были изучены основные средства контроля доступа к общим ресурсам и синхронизации потоков**

**Среди них:**

**1. Мьютекс - объект, который позволяет защитить определенный участок кода от параллельного доступа нескольких потоков. При использовании мьютекса, только один поток может захватить его, а все остальные потоки, которые попытаются захватить мьютекс, будут заблокированы до тех пор, пока мьютекс не будет освобожден.**

**2. Блокировки - то же, что и мьютексы, но применяемые с контекстным менеджером.**

**3. События - механизм, который позволяет потокам сигнализировать друг другу о каких-то изменениях. Событие может быть установлено (значение True) или сброшено (значение False). Поток может ожидать установки события и продолжать работу только после того, как событие было установлено другим потоком.**

**4. Барьеры - используются для того, чтобы остановить выполнение всех потоков до тех пор, пока все они не достигнут определенной точки в программе. Как только все потоки достигнут барьера, они могут продолжить выполнение.**

**5. Переменные условия - используются для того, чтобы потоки могли ожидать определенных условий. У переменной условия есть методы wait() и notify(), которые позволяют потокам ожидать, когда условие будет выполнено, и уведомлять другие потоки о том, что условие выполнено.**

**6. Семафоры - используются для того, чтобы ограничить доступ к общим ресурсам определенным количеством потоков. Семафор имеет определенное количество доступных ресурсов. Когда поток хочет получить доступ к ресурсу, он должен сначала захватить семафор. Если все ресурсы заняты, поток будет заблокирован до тех пор, пока один из ресурсов не будет освобождено.**

In [1]:
class Handle:
    def __init__(self):
        self.__id = 0

    def getID(self):
        return self.__id

    def setID(self, ID):
        self.__id = ID


class ArchitectureCPU:
    pass


class State:
    NotRunning = 0
    Running = 1
    Swapped = 2
    New = 3
    Ready = 4
    Blocked = 5
    ExitProcess = 6

class Memory:
    def __init__(self, size):
        self.__data = [0] * size

    def read(self, addr):
        return self.__data[addr]

    def write(self, addr, value):
        self.__data[addr] = value


class VirtualMemory:
    def __init__(self, size):
        self.__data = [0] * size

    def read(self, addr):
        return self.__data[addr]

    def write(self, addr, value):
        self.__data[addr] = value


class Scheduler:
    def __init__(self):
        self.__processes = []
        self.__running_process = None

    def add_process(self, process):
        self.__processes.append(process)

    def remove_process(self, process):
        self.__processes.remove(process)

    def schedule(self):
        if self.__running_process is not None:
            self.__running_process.setState(State.Ready)
            self.__processes.append(self.__running_process)

        if len(self.__processes) > 0:
            index = random.randint(0, len(self.__processes) - 1)
            self.__running_process = self.__processes[index]
            self.__running_process.setState(State.Running)
            self.__processes.remove(self.__running_process)
        else:
            self.__running_process = None


class Process(Handle):
    def __init__(self):
        super().__init__()
        self.__user = " "
        self.__state = State.NotRunning
        self.__pc = 0
        self.__running_time = 0
        self.__total_time = 0

    def getUser(self):
        return self.__user

    def setUser(self, user):
        self.__user = user

    def getState(self):
        return self.__state

    def setState(self, state):
        self.__state = state

    def get_pc(self):
        return self.__pc

    def set_pc(self, pc):
        self.__pc = pc

    def start(self):
        self.__state = State.Running

    def stop(self):
        self.__state = State.NotRunning

    def read_memory(self, addr):
        return self.get_memory().read(addr)

    def write_memory(self, addr, value):
        self.get_memory().write(addr, value)


class PCB(Process):
    def __init__(self, memory_size, virtual_memory_size):
        super().__init__()
        self.__memory = Memory(memory_size)
        self.__virtual_memory = VirtualMemory(virtual_memory_size)
        self.__sysReg = None
        self.__addr = -1
        self.__virtualAddr = -1
        self.__timeSlice = -1
        self.__priority = -1

    def get_memory(self):
        return self.__memory

    def get_virtual_memory(self):
        return self.__virtual_memory

    def getSysReg(self):
        return self.__sysReg

    def setSysReg(self, sysReg):
        self.__sysReg = sysReg

    def getAddr(self):
        return self.__addr

    def setAddr(self, addr):
        self.__addr = addr

    def getVirtualAddr(self):
        return self.__virtualAddr

    def setVirtualAddr(self, virtualAddr):
        self.__virtualAddr = virtualAddr

    def getPriority(self):
        return self.__priority

    def setPriority(self, priority):
        self.__priority = priority

    def getTimeSlice(self):
        return self.__timeSlice

    def setTimeSlice(self, timeSlice):
        self.__timeSlice = timeSlice

    def NameOf(self, state):
        return {
            State.NotRunning: "NotRunning",
            State.Running: "Running",
            State.Swapped: "Swapped",
            State.New: "New",
            State.Ready: "Ready",
            State.Blocked: "Blocked",
            State.ExitProcess: "ExitProcess"
        }.get(state, "Unknown")


class ProcessImage(PCB):
    def __init__(self):
        super().__init__()

    def Debug(self):
        print(" begin Debug Processlmage ")
        print(f"user: {self.getUser()}")
        print(f"ID: {self.getID()}")
        print(f"addr: {self.getAddr()}")
        print(f"VirtualAddr: {self.getVirtualAddr()}")
        print(f"State: {self.NameOf(self.getState())}")
        print(f"priority: {self.getPriority()}")
        print(f"timeSlice: {self.getTimeSlice()}")
        print(f"status: {self.__status}")
        print(f"flag: {self.__flag}")
        print("SysReg: ")
        sysReg = self.getSysReg()
        if sysReg is None:
            print("SysReg: NULL")
        else:
            sysReg.Debug()
        print("Code: ")