# **Постановка задачи**

Необходимо сделать следующее:
1. Написать клиент-серверное приложение.
2. Реализовать Клиент 1, в нем 3 процесса по 3 нити в каждом методом PIPE, QUEUE, добавить ещё один фильтр.
3. Реализовать Клиент 2, в нем 2 процесса и 3 нити методом PIPE, QUEUE, добавить ещё один фильтр.
4. Реализовать соединение клиент-сервер методом сокетов

## **Этапы реализации работы**

1. Написать сервер
2. Написать два клиента, которые подключаются к серверу по порту
3. Собрать необходимые данные о выполнении с каждого клиента
4. Отослать данные на сервер и нарисовать графики для каждого клиента

## **Фильтры, взятые для реализации**
### **Фильтр**, разделяющий слова в строке на строки
### **Фильтр**, нумерующий слова в строке


 В итоге получается нумерация слов в тексте, при этом слова будут на разных строках


### **Сервер**

Отвечает за прослушивание портов к которым должны подключаться клиенты. После отработки клиентов и пересылки их данных на сервер, он обрабатывает входные данные и строит два графика, для первого и для второго клиента, причем в разных окнах

In [None]:
import multiprocessing
import socket
import logging
import sys
from multiprocessing import Queue
from threading import Thread
from json import loads, dumps
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation


HOST = "127.0.0.1"
PORT_1 = 65430
PORT_2 = 65429


class Server:
    def __init__(self, queue: multiprocessing.Queue, host: str) -> None:
        self.q, self.host = queue, host

    def __str__(self) -> str:
        return f'(Server {self.host})'

    def listner(self, port: int) -> None:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind((self.host, port))
            s.listen()
            conn, addr = s.accept()
            with conn:
                print(f"Connected by {addr}")
                data = ' '
                while data != b'':
                    data = conn.recv(1024)
                    if data == b'':
                        break
                    self.q.put(data)

    def client_server(self, port1: int, port2: int) -> None:
        client1 = Thread(target=self.listner, args=(port1,))
        client2 = Thread(target=self.listner, args=(port2,))
        client1.start()
        client2.start()
        client1.join()
        client2.join()
        self.serialize_data()

    def logger(self, data: dict, client: str) -> None:
        logger = logging.getLogger()
        if logger.hasHandlers():
            logger.handlers.clear()
        handler = logging.StreamHandler(sys.stdout)
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
        logger.info(f"Recieved '{client} started.'")
        for k in data.keys():
            info = data[k]['info']
            for i in info:
                logger.info(f"Recieved '{i}'")
        logger.info(f"Recieved '{client} ended.'")

    def serialize_data(self) -> None:
        data = ''
        while not self.q.empty():
            data = f'{data}{self.q.get().decode("utf-8")}'
        sp = [loads(i) for i in data.split('###')[:-1]]
        self.logger(sp[0], "Client 1")
        self.logger(sp[1], "Client 2")
        self.plot_data(sp)

    def fracture(self, lst: list[list[list[float], list[float]]]) -> None:
        # print(lst)
        for i in range(len(lst)):
            for j in range(len(lst[0])):
                lst[i][j] = np.linspace(lst[i][j][0], lst[i][j][-1], 50)

    def plot_data(self, data_clients: list[dict, dict]):
        p1 = [data_clients[0]['Process-1'][f'Thread-{i}'] for i in range(1, 4)] + \
             [data_clients[0]['Process-1'][f'Process-1_t']]
        p2 = [data_clients[0]['Process-2'][f'Thread-{i}'] for i in range(1, 4)] + \
             [data_clients[0]['Process-2'][f'Process-2_t']]
        p3 = [data_clients[0]['Process-3'][f'Thread-{i}'] for i in range(1, 4)] + \
             [data_clients[0]['Process-3'][f'Process-3_t']]
        p4 = [data_clients[1]['Process-1'][f'Thread-{i}'] for i in range(1, 4)] + \
             [data_clients[1]['Process-1'][f'Process-1_t']]
        p5 = [data_clients[1]['Process-2'][f'Thread-{i}'] for i in range(1, 4)] + \
             [data_clients[1]['Process-2'][f'Process-2_t']]
        mn1 = min([min(i) for i in [p1[-1][0], p2[-1][0], p3[-1][0]]])
        mn2 = min([min(i) for i in [p4[-1][0], p5[-1][0]]])

        self.fracture(p1)
        self.fracture(p2)
        self.fracture(p3)
        self.fracture(p4)
        self.fracture(p5)

        def animate(i, p1, p2, p3, left_border, title):
            axes.clear()
            plt.title(title, color="black")
            x1_11, x1_21, x1_31 = (p1[0][0][i] - p1[0][0][0]) * 10, (p1[1][0][i] - p1[1][0][0]) * 10, (
                        p1[2][0][i] - p1[2][0][0]) * 10
            x1_12, x1_22, x1_32 = (p1[0][1][i] - p1[0][1][0]) * 10, (p1[1][1][i] - p1[1][1][0]) * 10, (
                        p1[2][1][i] - p1[2][1][0]) * 10
            x11, x12 = (p1[3][0][i] - p1[3][0][0]) * 10, (p1[3][1][i] - p1[3][1][0]) * 10

            axes.barh(0, x1_11 / 1e-4, color=palette[0], left=(p1[0][0][0] - left_border) / 1e-4)
            axes.barh(0, x1_12 / 1e-4, color=palette[0], left=(p1[0][1][0] - left_border) / 1e-4)

            axes.barh(1, x1_21 / 1e-4, color=palette[0], left=(p1[1][0][0] - left_border) / 1e-4)
            axes.barh(1, x1_22 / 1e-4, color=palette[0], left=(p1[1][1][0] - left_border) / 1e-4)

            axes.barh(2, x1_31 / 1e-4, color=palette[0], left=(p1[2][0][0] - left_border) / 1e-4)
            axes.barh(2, x1_32 / 1e-4, color=palette[0], left=(p1[2][1][0] - left_border) / 1e-4)

            axes.barh(3, x11 / 1e-4, color=palette[0], left=(p1[3][0][0] - left_border) / 1e-4)
            axes.barh(3, x12 / 1e-4, color=palette[0], left=(p1[3][1][0] - left_border) / 1e-4)

            x2_11, x2_21, x2_31 = (p2[0][0][i] - p2[0][0][0]) * 10, (p2[1][0][i] - p2[1][0][0]) * 10, (
                        p2[2][0][i] - p2[2][0][0]) * 10
            x2_12, x2_22, x2_32 = (p2[0][1][i] - p2[0][1][0]) * 10, (p2[1][1][i] - p2[1][1][0]) * 10, (
                        p2[2][1][i] - p2[2][1][0]) * 10
            x21, x22 = (p2[3][0][i] - p2[3][0][0]) * 10, (p2[3][1][i] - p2[3][1][0]) * 10

            axes.barh(4, x2_11 / 1e-4, color=palette[1], left=(p2[0][0][0] - left_border) / 1e-4)
            axes.barh(4, x2_12 / 1e-4, color=palette[1], left=(p2[0][1][0] - left_border) / 1e-4)

            axes.barh(5, x2_21 / 1e-4, color=palette[1], left=(p2[1][0][0] - left_border) / 1e-4)
            axes.barh(5, x2_22 / 1e-4, color=palette[1], left=(p2[1][1][0] - left_border) / 1e-4)

            axes.barh(6, x2_31 / 1e-4, color=palette[1], left=(p2[2][0][0] - left_border) / 1e-4)
            axes.barh(6, x2_32 / 1e-4, color=palette[1], left=(p2[2][1][0] - left_border) / 1e-4)

            axes.barh(7, x21 / 1e-4, color=palette[1], left=(p2[3][0][0] - left_border) / 1e-4)
            axes.barh(7, x22 / 1e-4, color=palette[1], left=(p2[3][1][0] - left_border) / 1e-4)

            if p3:
                x3_11, x3_21, x3_31 = (p3[0][0][i] - p3[0][0][0]) * 10, (p3[1][0][i] - p3[1][0][0]) * 10, (
                        p3[2][0][i] - p3[2][0][0]) * 10
                x3_12, x3_22, x3_32 = (p3[0][1][i] - p3[0][1][0]) * 10, (p3[1][1][i] - p3[1][1][0]) * 10, (
                        p3[2][1][i] - p3[2][1][0]) * 10
                x31, x32 = (p3[3][0][i] - p3[3][0][0]) * 10, (p3[3][1][i] - p3[3][1][0]) * 10

                axes.barh(8, x3_11 / 1e-4, color=palette[2], left=(p3[0][0][0] - left_border) / 1e-4)
                axes.barh(8, x3_12 / 1e-4, color=palette[2], left=(p3[0][1][0] - left_border) / 1e-4)

                axes.barh(9, x3_21 / 1e-4, color=palette[2], left=(p3[1][0][0] - left_border) / 1e-4)
                axes.barh(9, x3_22 / 1e-4, color=palette[2], left=(p3[1][1][0] - left_border) / 1e-4)

                axes.barh(10, x3_31 / 1e-4, color=palette[2], left=(p3[2][0][0] - left_border) / 1e-4)
                axes.barh(10, x3_32 / 1e-4, color=palette[2], left=(p3[2][1][0] - left_border) / 1e-4)

                axes.barh(11, x31 / 1e-4, color=palette[2], left=(p3[3][0][0] - left_border) / 1e-4)
                axes.barh(11, x32 / 1e-4, color=palette[2], left=(p3[3][1][0] - left_border) / 1e-4)

                tick_lst = ['Thread-1', 'Thread-2', 'Thread-3', 'Process-1',
                            'Thread-1', 'Thread-2', 'Thread-3', 'Process-2',
                            'Thread-1', 'Thread-2', 'Thread-3', 'Process-3']
                plt.yticks(np.arange(12), tick_lst)
            else:
                tick_lst = ['Thread-1', 'Thread-2', 'Thread-3', 'Process-1',
                            'Thread-1', 'Thread-2', 'Thread-3', 'Process-2']
                plt.yticks(np.arange(8), tick_lst)

        palette = ('red', 'royalblue', 'darkgray')
        fig, axes = plt.subplots(figsize=(12, 8))

        anim = FuncAnimation(fig, animate, interval=100, frames=50, repeat=False,
                             fargs=(p1, p2, p3, mn1, "Client 1\nprocesses & threads animation"))
        plt.show()
        fig, axes = plt.subplots(figsize=(12, 8))
        anim2 = FuncAnimation(fig, animate, interval=100, frames=50, repeat=False,
                              fargs=(p4, p5, [], mn2, "Client 2\nprocesses & threads animation"))
        plt.show()


if __name__ == '__main__':
    queue = Queue()
    serv = Server(queue, host=HOST)
    print(serv)
    serv.client_server(PORT_1, PORT_2)


### **Клиент 1**

In [None]:
import socket
import random
from time import sleep, perf_counter
from threading import current_thread, Thread
from multiprocessing import current_process, Process, Queue, Pipe
from json import dumps


HOST = "127.0.0.1"
PORT = 65430


def thread_task_1(strok, queue, input, output):
    timeit_1 = [perf_counter()]
    thread_name = current_thread().name
    timeit_1.append(perf_counter())
    process_name = current_process().name
    timeit_1.append(perf_counter())
    # delay = 1e-4
    timeit_1.append(perf_counter())
    # sleep(delay)
    timeit_2 = [perf_counter()]
    timeit_2.append(perf_counter())
    additive = ''
    if input != 0:
        additive = input.recv()
    timeit_2.append(perf_counter())
    strok = strok.split()
    for i, val in enumerate(strok):
        timeit_2.append(perf_counter())
        strok[i] = f'{val}\\n'
    timeit_2.append(perf_counter())
    strok = ' '.join(strok)
    timeit_2.append(perf_counter())
    data = f'{additive} {strok}'
    timeit_2.append(perf_counter())
    timeit = [process_name, thread_name] + [[timeit_1, timeit_2]]
    timeit.append(f'Thread {thread_name} in process {process_name} done with result {data}')
    queue.put(timeit)
    output.send(data)
    # queue.put(f'Sent {data} to next thread')
    # [proc_name, thread_name, [list[float], list[float]], data: str]


def thread_task_2(queue, input, output):
    timeit_1 = [perf_counter()]
    thread_name = current_thread().name
    timeit_1.append(perf_counter())
    process_name = current_process().name
    timeit_1.append(perf_counter())
    # delay = 1e-4
    timeit_1.append(perf_counter())
    # sleep(delay)
    timeit_2 = [perf_counter()]
    timeit_2.append(perf_counter())
    strok = input.recv()
    timeit_2.append(perf_counter())
    strok = strok.split()
    timeit_2.append(perf_counter())
    for i, val in enumerate(strok):
        timeit_2.append(perf_counter())
        strok[i] = f'{i+1}: {val}'
    timeit_2.append(perf_counter())
    data = ' '.join(strok)
    timeit_2.append(perf_counter())
    timeit = [process_name, thread_name]
    timeit.append([timeit_1])
    timeit_2.append(perf_counter())
    timeit[2].append(timeit_2)
    timeit += [f'Thread {thread_name} in process {process_name} done with result {data}']
    queue.put(timeit)
    output.send(data)
    # queue.put(f'Sent {data} to next thread')


def process_task(queue):
    timeit_1 = [perf_counter()]
    conn1, conn2 = Pipe(duplex=True)
    timeit_1.append(perf_counter())
    conn3, conn4 = Pipe()
    timeit_1.append(perf_counter())
    conn5, conn6 = Pipe()
    timeit_1.append(perf_counter())
    stringa = 'Do if u want'
    timeit_1.append(perf_counter())
    stringb = 'did if u need'
    timeit_1.append(perf_counter())
    thread1 = Thread(target=thread_task_1, args=([stringa, queue, 0, conn1]))
    timeit_1.append(perf_counter())
    thread2 = Thread(target=thread_task_1, args=([stringb, queue, conn2, conn3]))
    timeit_1.append(perf_counter())
    thread3 = Thread(target=thread_task_2, args=([queue, conn4, conn5]))
    timeit_1.append(perf_counter())
    thread1.start()
    timeit_1.append(perf_counter())
    thread2.start()
    timeit_1.append(perf_counter())
    thread3.start()
    timeit_2 = [perf_counter()]
    thread1.join()
    timeit_2.append(perf_counter())
    thread2.join()
    timeit_2.append(perf_counter())
    thread3.join()
    timeit_2.append(perf_counter())
    item = conn6.recv()
    timeit_2.append(perf_counter())
    # queue.put(f'Finally {item}')
    process_name = current_process().name
    timeit = [process_name, f'{process_name}_t'] + [[timeit_1, timeit_2]] + \
             [f"Process {process_name} done with result {item}."]
    queue.put(timeit)


if __name__ == '__main__':
    queue = Queue()
    glob_keys = ['Process-1', 'Process-2', 'Process-3']
    inner_keys = ['Thread-1', 'Thread-2', 'Thread-3', 'info', '_t']
    info_dict = {val: {j if j != '_t' else f'{val}{j}': [] for j in inner_keys} for i, val in enumerate(glob_keys)}
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((HOST, PORT))
        # s.sendall(bytes(dumps(['Client 1 started.', 0, 0, 0]) + '###', encoding='utf-8'))
        processes = [Process(target=process_task, args=(queue,)) for i in range(3)]
        for process in processes:
            process.start()
        for process in processes:
            process.join()
        while not queue.empty():
            data = queue.get()
            proc_name, thread_name, *times, info = data
            info_dict[proc_name][thread_name] = times[0]
            info_dict[proc_name]['info'].append(info)
            print(info)

        b = bytes(dumps(info_dict, indent=6) + '###', encoding='utf-8')
        s.sendall(b)
        # s.sendall(bytes(dumps(['Client 1 done.', 0, 0, 0]) + '###', encoding='utf-8'))



### **Клиент 2**

In [None]:
import socket
import random
from time import sleep, perf_counter
from threading import current_thread, Thread
from multiprocessing import current_process, Process, Queue, Pipe
from json import dumps



HOST = "127.0.0.1"
PORT = 65429


def thread_task_1(strok, queue, input, output):
    timeit_1 = [perf_counter()]
    thread_name = current_thread().name
    timeit_1.append(perf_counter())
    process_name = current_process().name
    timeit_1.append(perf_counter())
    # delay = 1e-4
    timeit_1.append(perf_counter())
    # sleep(delay)
    timeit_2 = [perf_counter()]
    timeit_2.append(perf_counter())
    additive = ''
    if input != 0:
        additive = input.recv()
    timeit_2.append(perf_counter())
    strok = strok.split()
    for i, val in enumerate(strok):
        timeit_2.append(perf_counter())
        strok[i] = f'{val}\\n'
    timeit_2.append(perf_counter())
    strok = ' '.join(strok)
    timeit_2.append(perf_counter())
    data = f'{additive} {strok}'
    timeit_2.append(perf_counter())
    timeit = [process_name, thread_name] + [[timeit_1, timeit_2]]
    timeit.append(f'Thread {thread_name} in process {process_name} done with result {data}')
    queue.put(timeit)
    output.send(data)
    # queue.put(f'Sent {data} to next thread')


def thread_task_2(queue, input, output):
    timeit_1 = [perf_counter()]
    thread_name = current_thread().name
    timeit_1.append(perf_counter())
    process_name = current_process().name
    timeit_1.append(perf_counter())
    # delay = 1e-4
    timeit_1.append(perf_counter())
    # sleep(delay)
    timeit_2 = [perf_counter()]
    timeit_2.append(perf_counter())
    strok = input.recv()
    timeit_2.append(perf_counter())
    strok = strok.split()
    timeit_2.append(perf_counter())
    for i, val in enumerate(strok):
        timeit_2.append(perf_counter())
        strok[i] = f'{i + 1}: {val}'
    timeit_2.append(perf_counter())
    data = ' '.join(strok)
    timeit_2.append(perf_counter())
    timeit = [process_name, thread_name]
    timeit.append([timeit_1])
    timeit_2.append(perf_counter())
    timeit[2].append(timeit_2)
    timeit += [f'Thread {thread_name} in process {process_name} done with result {data}']
    queue.put(timeit)
    output.send(data)
    # queue.put(f'Sent {data} to next thread')


def process_task(queue):
    timeit_1 = [perf_counter()]
    conn1, conn2 = Pipe(duplex=True)
    timeit_1.append(perf_counter())
    conn3, conn4 = Pipe()
    timeit_1.append(perf_counter())
    conn5, conn6 = Pipe()
    timeit_1.append(perf_counter())
    stringa = 'Do if u want'
    timeit_1.append(perf_counter())
    stringb = 'did if u need'
    timeit_1.append(perf_counter())
    thread1 = Thread(target=thread_task_1, args=([stringa, queue, 0, conn1]))
    timeit_1.append(perf_counter())
    thread2 = Thread(target=thread_task_1, args=([stringb, queue, conn2, conn3]))
    timeit_1.append(perf_counter())
    thread3 = Thread(target=thread_task_2, args=([queue, conn4, conn5]))
    timeit_1.append(perf_counter())
    thread1.start()
    timeit_1.append(perf_counter())
    thread2.start()
    timeit_1.append(perf_counter())
    thread3.start()
    timeit_2 = [perf_counter()]
    thread1.join()
    timeit_2.append(perf_counter())
    thread2.join()
    timeit_2.append(perf_counter())
    thread3.join()
    timeit_2.append(perf_counter())
    item = conn6.recv()
    timeit_2.append(perf_counter())
    # queue.put(f'Finally {item}')
    process_name = current_process().name
    timeit = [process_name, f'{process_name}_t'] + [[timeit_1, timeit_2]] + [f"Process {process_name} done."]
    queue.put(timeit)


if __name__ == '__main__':
    queue = Queue()
    glob_keys = ['Process-1', 'Process-2']
    inner_keys = ['Thread-1', 'Thread-2', 'Thread-3', 'info', '_t']
    info_dict = {val: {j if j != '_t' else f'{val}{j}': [] for j in inner_keys}for i, val in enumerate(glob_keys)}
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((HOST, PORT))
        # s.sendall(bytes(dumps(['Client 1 started.', 0, 0, 0]) + '###', encoding='utf-8'))
        processes = [Process(target=process_task, args=(queue,)) for i in range(2)]
        for process in processes:
            process.start()
        for process in processes:
            process.join()
        while not queue.empty():
            data = queue.get()
            proc_name, thread_name, *times, info = data
            info_dict[proc_name][thread_name] = times[0]
            info_dict[proc_name]['info'].append(info)
            print(info)

        b = bytes(dumps(info_dict, indent=8) + '###', encoding='utf-8')
        s.sendall(b)
        # s.sendall(bytes(dumps(['Client 2 done.', 0, 0, 0]) + '###', encoding='utf-8'))


#### **График для 1 Клиента**

![](Figure_11.png)

График  1 показывает время выполнения нитей и процессов в милисекундах

#### **График для 1 Клиента**

![](Figure_12.png)

График  2 показывает время выполнения нитей и процессов в милисекундах

# **Вывод:**
В результате выполнения работы было написано клиент-сервеное приложение, где клиент подключается к серверу, выполняет некоторую задачу с помощью процессов и нитей и пересылает данные о выполнении работы на сервер, который, в свою очередь, собирает полученные данные, обрабатывает их и строит график по данным, собранным ранее. В процессе написания кода я научился работать с процессами и нитями, пересылать данные между клиентами, а также обрабатывать большие объемы данных и строить графики работы.