**Отчет по задаче Token Ring**

---



Условие задачи:

Задача состоит в построении простой модели доисторического сетевого протокола сети под названием TokenRing и исследовании его свойств.

 

1. Система состоит из N пронумерованных от 0 до N-1 узлов (потоков). Узлы упорядочены по порядковому номеру. После состояния N-1 следует узкл 0, т.е. узлы формируют кольцо. 

2. Соседние в кольце потоки могут обмениваться пакетами. Обмен возможен только по часовой стрелке. 

3. Каждый поток, получив пакет от предыдущего, отдает его следующему.

4. Пакеты не могут обгонять друг друга.

 

Необходимо исследовать пропускную способность сети (throughput) и характерное время задержки (latency) в зависимости от количества узлов N и количества пакетов P (1...N), находящихся в транзите одновременно.

Стуруктура Token ring:


*   TokenRing - Запускает потоки, связывая 1 к 1 объекты Node. Создает сообщения и распределяет все сообщения равномерно между всеми потоками, назначая получателя с заданным изначально сдвигом по кольцу.
*   Node - Берут из своей MessageQueue сообщение, куда складывают друг другие потоки. После через передают дальше или оставляет себе, если это было адресовано этом потоку.
*   Message - Передаваемы объект по кольцу. Внутри записан получатель. Время отправления и получения.



**Как происходит работа кода**

Равномерно распределяем все сообщения между всеми исполняющими узлами. Причем в каждое сообщение устанавливаем ноду получателя по сдвигу. После чего запускаем кольцо.

Если потоку пришло сообщение от другого потока при обработке ноды, то вначале поток обработает его, а потом уже начнет рассылать сообщения из своего запаса.

***Аналитические параметры***

Дополнительные классы:


*   MessageBank - Хранит ссылки на все Message, чтобы после завершения работы посчитать аналитику.
*   NodeBank - Хранит ссылка на все Node, чтобы после завершения посчитать аналитику.

Класс Statistic хранит для аналитики следующие поля:



*   numberThread - колличество стартовых потоков
*   numberMessages - колличество всех сообщений для пересылки в системе
*   shift - колличество нод через, которые покругу надо передать сообщение (берется по остатку от колличества потоков)
*   timePrecessingRing - время которое кольцо работало в миллисекундах
*   throughputRing - пропускная способность кольца за секунду
*   throughputChain - пропуская способность цепочки в среднем за секунду
*   latencyChain - среднее время затрачиваемое сообщением на прохождение цепочки в миллисекундах
*   minTimeWorkThread - минимальное время работы потока
*   maxTimeWorkThread - максимальное время работы потока
*   percentWork - отношение доставленных сообщений от всех сообщений



*LatencyChain* - расчитывается тем, что поток отправляющий сообщение в первый раз выставляет время в сообщении, а принимающий поток(которому было адресовано) выставляет время получения в сообщении

*ThroughputRing* - расчитывается, как колличество сообщений, которые успела доставить система, деленное на время работы кольца

*ThroughputThread* - расчитывается, как ThroughputRing / колличество цепочек обработки сообщений

**Характеристики ноутбука**

*   6 ядер
*   Intel Core i7
*   2.6 GHz
*   16 оперативной памяти


Проведем аналитику на следующих входных параметрах:

Зафиксируем сдвиг на цепочки по 5 вершин:
зафиксируем колличество сообщений: 10_000_000
Рассмотрим следующие колличество потоков:


1.   6
2.   10
3.   15
4.   20



Сделаем замеры при 50 подходах

In [None]:
import pandas as pd
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
import plotly
import plotly.graph_objs as go

df = pd.read_table('data1.txt',sep='\s+', engine='python')

In [None]:
df

Unnamed: 0,numberThread,numberMessages,shift,timePrecessingRing,throughputRing,throughputChain,latencyChain,minTimeWorkThread,maxTimeWorkThread,percentWork
0,6,10000000,5,1005,1.806219e+06,301036.500000,168,1001,1002,0.187201
1,6,10000000,5,3590,1.094490e+06,182415.055556,328,3588,3588,0.271803
2,6,10000000,5,4002,1.835238e+06,305872.958333,480,4000,4000,0.880912
3,6,10000000,5,6005,1.158652e+06,193108.638889,510,6003,6004,0.834229
4,10,10000000,5,1003,2.492430e+06,249243.000000,331,1002,1002,0.431421
...,...,...,...,...,...,...,...,...,...,...
1595,15,10000000,5,6006,5.555557e+05,37037.044444,208,6005,6005,1.000000
1596,20,10000000,5,1033,1.218836e+06,60941.800000,248,1002,1032,0.393733
1597,20,10000000,5,2071,1.250000e+06,62500.000000,230,2026,2070,1.000000
1598,20,10000000,5,4080,6.250000e+05,31250.000000,271,4018,4079,1.000000


Теперь вначале построим графики работы для каждого фиксированного колличества потоков

***Throughput***

In [None]:
def throughput():
  df1 = df.loc[df['numberThread'] == 6]
  df2 = df.loc[df['numberThread'] == 10]
  df3 = df.loc[df['numberThread'] == 15]
  df4 = df.loc[df['numberThread'] == 20]

  df1 = df1.sort_values(by='timePrecessingRing')
  df2 = df2.sort_values(by='timePrecessingRing')
  df3 = df3.sort_values(by='timePrecessingRing')
  df4 = df4.sort_values(by='timePrecessingRing')

  trace1 = go.Scatter(
      x=df1.timePrecessingRing,
      y=df1.throughputChain,
      name='Threads 6'
  )
  trace2 = go.Scatter(
      x=df2.timePrecessingRing,
      y=df2.throughputChain,
      name='Threads 10'
  )
  trace3 = go.Scatter(
      x=df3.timePrecessingRing,
      y=df3.throughputChain,
      name='Threads 15'
  )
  trace4 = go.Scatter(
      x=df4.timePrecessingRing,
      y=df4.throughputChain,
      name='Threads 20'
  )


  fig = go.Figure(data=[trace1, trace2, trace3, trace4])

  fig.update_layout(
      title="Throughput Chain by time processing ring",
      yaxis_title="Number accepted messanges by one sec",
      xaxis_title="Time work ring (milliseconds)"
  )

  iplot(fig, show_link=False)

In [None]:
throughput()

На графике можем заметить, что при увеличении колличества потоков падает пропускная способность каждой цепочки. Скорей всего это связано с тем, что на пк всего 6 ядер, хоть и icore7, но когда ядро разбивается на работу нескольких потоков, падает вычислительная мощность каждого потока. Хотя можно заметить, что самая большая разница, при первом разделении ядер.

Так же можем заметить по графику, что несмотря на то, что производительность цепочки почти в 2 раза больше с 6 потоками, но вот система в целом не успевает обработать все сообщения.

**Work Time**

Рассмотрим такой же график, но по проценту выполненой работы. (Но при это при конкретном времени на вход)

In [None]:
def workTime(min_time, max_time):
  dfT = df.loc[(min_time <= df['timePrecessingRing']) & (df['timePrecessingRing'] < max_time)]

  df1 = dfT.loc[dfT['numberThread'] == 6]
  df2 = dfT.loc[dfT['numberThread'] == 10]
  df3 = dfT.loc[dfT['numberThread'] == 15]
  df4 = dfT.loc[dfT['numberThread'] == 20]

  df1 = df1.sort_values(by='timePrecessingRing')
  df2 = df2.sort_values(by='timePrecessingRing')
  df3 = df3.sort_values(by='timePrecessingRing')
  df4 = df4.sort_values(by='timePrecessingRing')

  trace1 = go.Scatter(
      x=df1.timePrecessingRing,
      y=df1.percentWork,
      name='Threads 6'
  )
  trace2 = go.Scatter(
      x=df2.timePrecessingRing,
      y=df2.percentWork,
      name='Threads 10'
  )
  trace3 = go.Scatter(
      x=df3.timePrecessingRing,
      y=df3.percentWork,
      name='Threads 15'
  )
  trace4 = go.Scatter(
      x=df4.timePrecessingRing,
      y=df4.percentWork,
      name='Threads 20'
  )

  fig = go.Figure(data=[trace1, trace2, trace3, trace4])

  fig.update_layout(
      title="Percent of accepted messages by time processing ring",
      yaxis_title="Percentage of work completed",
      xaxis_title="Time work ring (milliseconds)"
  )

  iplot(fig, show_link=True)


In [None]:
workTime(0, 2000)

Можем заметить, что для обработки 10_000_000 сообщений не хватает даже 1 секунды, но при одинаковом времени работы потоков, конечно же сделают работу те кольца, у кого больше активых потоков(нод) изначально

In [None]:
workTime(2000, 4000)

Можем заметить, что в случае, если сигнал от главного потока доходит на выключение позже, то и другие потоки в целом не очень отработали в каких-то эксперементах

In [None]:
workTime(4000, 6000)

тут уже можно заметить, что с 10, 15, 20 потоками работу они давно сделали, а все остальное время стояли. **Значит нужно оптимизировать отдых системы в такие моменты.**

In [None]:
workTime(6000, 8000)

Тут уже можно заметить,что и 6 потокам хватает за 7000 секунд работы обработать 10_000_000

**Latency**

In [None]:
def latency(min_time, max_time):
  dfT = df.loc[(min_time <= df['timePrecessingRing']) & (df['timePrecessingRing'] < max_time)]

  df1 = dfT.loc[dfT['numberThread'] == 6]
  df2 = dfT.loc[dfT['numberThread'] == 10]
  df3 = dfT.loc[dfT['numberThread'] == 15]
  df4 = dfT.loc[dfT['numberThread'] == 20]

  df1 = df1.sort_values(by='timePrecessingRing')
  df2 = df2.sort_values(by='timePrecessingRing')
  df3 = df3.sort_values(by='timePrecessingRing')
  df4 = df4.sort_values(by='timePrecessingRing')

  trace1 = go.Scatter(
      x=df1.timePrecessingRing,
      y=df1.latencyChain,
      name='Threads6'
  )
  trace2 = go.Scatter(
      x=df2.timePrecessingRing,
      y=df2.latencyChain,
      name='Threads10'
  )
  trace3 = go.Scatter(
      x=df3.timePrecessingRing,
      y=df3.latencyChain,
      name='Threads15'
  )
  trace4 = go.Scatter(
      x=df4.timePrecessingRing,
      y=df4.latencyChain,
      name='Threads20'
  )

  fig = go.Figure(data=[trace1, trace2, trace3, trace4], layout={'title':'Latency Chain by time processing ring'})

  fig.update_layout(
        title="Latency Chain by time processing ring",
        yaxis_title="Time to deliver message (milliseconds)",
        xaxis_title="Time work ring (milliseconds)"
    )

  iplot(fig, show_link=True)


In [None]:
latency(0, 2000)

In [None]:
latency(2000, 4000)

In [None]:
latency(4000, 6000)

In [None]:
latency(6000, 8000)

**Вывод**



*   Некоторые замеры производительности показали, что происходит проседание производительности время от времени. Скорей всего было связано с переодическим сбросом загруженной оперативной памятью.
*   Выявлена не оптимальность в том, что потоки продолжают крутиться в цикле потребляя cpu даже если все сообщения они уже доставили или ждут другого потока. Надо оптимизировать тем, чтобы потоки засыпали, если для них нет работы.
*   При увеличении числа потоков производительность цепочек падает, что скорей всего связано с тем, что ядро начинает эмулировать работу нескольких потоков.
*   Время доставки быстрее всего происходит при большом колличестве сообщений, но отличие не существенно



**Оптимизация**

В данной оптимизации добавлен *Condition* для того, чтобы в случае завершния работы для любого потока он засыпал.

Производительность же потоков будет пересчитана в зависимости от среднего времени работы потоков.

**Рассмотрим, как оно отразится на полной загруженности**

**При неплотной загруженности же, потоки не тратят cpu засыпая, тем самым не простаивают**

In [None]:
df = pd.read_table('data2.txt',sep='\s+', engine='python')

In [None]:
throughput()

In [189]:
df_1 = pd.read_table('data1.txt',sep='\s+', engine='python')
df_22 = pd.read_table('data2.txt',sep='\s+', engine='python')

In [190]:
def throughputThreads(numThreads):
  df1 = df_1.loc[df_1['numberThread'] == numThreads]
  df2 = df_2.loc[df_2['numberThread'] == numThreads]

  df1 = df1.sort_values(by='timePrecessingRing')
  df2 = df2.sort_values(by='timePrecessingRing')

  name1= 'Threads ' + str(numThreads) + ' before opt'
  trace1 = go.Scatter(
      x=df1.timePrecessingRing,
      y=df1.throughputChain,
      name=name1
  )
  name2= 'Threads ' + str(numThreads) + ' after opt'
  trace2 = go.Scatter(
      x=df2.timePrecessingRing,
      y=df2.throughputChain,
      name=name2
  )


  fig = go.Figure(data=[trace1, trace2])

  fig.update_layout(
      title="Throughput Chain by time processing ring",
      yaxis_title="Number accepted messanges by one sec",
      xaxis_title="Time work ring (milliseconds)"
  )

  iplot(fig, show_link=False)

In [191]:
throughputThreads(6)

In [192]:
throughputThreads(10)

In [193]:
throughputThreads(15)

In [194]:
throughputThreads(20)

Мы можем заметить, что

In [163]:
workTime(0, 10000)

Заметно изменилось время работы системы в целом. Система стала мзавершать работу раньше

In [195]:
def latencyThreads(numThreads):
  df1 = df_1.loc[df_1['numberThread'] == numThreads]
  df2 = df_2.loc[df_2['numberThread'] == numThreads]

  df1 = df1.sort_values(by='timePrecessingRing')
  df2 = df2.sort_values(by='timePrecessingRing')

  name1= 'Threads ' + str(numThreads) + ' before opt'
  trace1 = go.Scatter(
      x=df1.timePrecessingRing,
      y=df1.latencyChain,
      name=name1
  )
  name2= 'Threads ' + str(numThreads) + ' after opt'
  trace2 = go.Scatter(
      x=df2.timePrecessingRing,
      y=df2.latencyChain,
      name=name2
  )


  fig = go.Figure(data=[trace1, trace2])

  fig.update_layout(
        title="Latency Chain by time processing ring",
        yaxis_title="Time to deliver message (milliseconds)",
        xaxis_title="Time work ring (milliseconds)"
    )

  iplot(fig, show_link=True)


In [196]:
latencyThreads(6)

In [197]:
latencyThreads(10)

In [198]:
latencyThreads(15)

In [199]:
latencyThreads(20)

**Вывод по оптимизации:**



*   Заметим, что пропускная способность выросла в целом при любом колличестве потоков. Единственное проседает при замере в 1 секунду, когда система перегружена сообщениями. При таком малом времени захваты локов сильно снижают.
*   Latency просела засчет колличества засыпания потоков при большом числе потоков. Скорей всего это связано с тем, что у меня каждое ядро эмулирует по 3-4 потока и некоторые потоки долго просыпаются передать письмо дальше
*   Хоть и время доставки сообщения увеличилось, но в целом система стала завершать свою работу раньше



**Измерение зависимости полной загрузки цепочки к неполной по пропускной способности:**

Рассмотрим цепочки длины 8 и 4 при колличестве потоков 9 и 12.

Такие числа были взяты из расчета, что честных потоков максимально 12 (6ядер * icore7)


И рассмотрим падает ли пропускная способность вершины при полной и не полной загруженности

In [200]:
df_3 = pd.read_table('data3.txt',sep='\s+', engine='python')

In [203]:
def throughputThreadsShift(numThread1, numThread2, shift1, shift2):
  df1 = df_3.loc[(df_3['numberThread'] == numThread1) & (df_3['shift'] == shift1)]
  df2 = df_3.loc[(df_3['numberThread'] == numThread2) & (df_3['shift'] == shift2)]

  df1 = df1.sort_values(by='timePrecessingRing')
  df2 = df2.sort_values(by='timePrecessingRing')

  name1= 'Threads ' + str(numThread1) + ' and shift ' + str(shift1)
  trace1 = go.Scatter(
      x=df1.timePrecessingRing,
      y=df1.throughputChain,
      name=name1
  )
  name2= 'Threads ' + str(numThread2) + ' and shift ' + str(shift2)
  trace2 = go.Scatter(
      x=df2.timePrecessingRing,
      y=df2.throughputChain,
      name=name2
  )


  fig = go.Figure(data=[trace1, trace2])

  fig.update_layout(
      title="Throughput Chain by time processing ring",
      yaxis_title="Number accepted messanges by one sec",
      xaxis_title="Time work ring (milliseconds)"
  )

  iplot(fig, show_link=False)

In [204]:
throughputThreadsShift(9, 12, 4, 4)

Как и из иследований выше (12, 4), чем у (12, 8) закончили раньше, но пропускная способность ниже так, как ядра распределяются по мощности больше

In [205]:
throughputThreadsShift(9, 12, 8, 8)

Как и из иследований выше (12, 4), чем у (12, 8) закончили раньше, но пропускная способность ниже так, как ядра распределяются по мощности больше

In [206]:
throughputThreadsShift(9, 9, 4, 8)

In [207]:
throughputThreadsShift(12, 12, 4, 8)

**Вывод**

При увеличении колличества потоков при стандартной цепочке падает пропускная способность каждой отдельной, но увеличивается латентность и производительность системы в целом. 

Оптимальным числом потоков при 10_000_000 сообщений и длине цепочки 5 будет 10 потоков. Так как система заканчивает работу не сильно позже других, но меньше проседает по пропускной способности.