forked from EvgBitWhiskey/BitWhiskey
-
Notifications
You must be signed in to change notification settings - Fork 0
/
RequestsProcessingThread.cs
154 lines (141 loc) · 5.41 KB
/
RequestsProcessingThread.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
namespace BitWhiskey
{
/// <summary>
/// Mutable holder for the outcome of a single request. Every <see cref="RequestItem"/>
/// creates one of these in its <c>result</c> field.
/// </summary>
public class RequestResult
{
// Error text; starts out as an empty string.
public string error = "";
// Exception associated with the request; starts out as null.
public Exception exception = null;
// Result payload, typed as object; starts out as an empty string.
// NOTE(review): the concrete payload type is chosen by whoever fills it in - not visible here.
public object resultData = "";
}
/// <summary>
/// Optional extra parameters carried alongside a <see cref="RequestItem"/>.
/// </summary>
public class RequestParams
{
// Ticker string attached to the request; left null unless the caller sets it.
// NOTE(review): the expected ticker format is not visible in this file - confirm with callers.
public string ticker;
}
/// <summary>
/// One unit of work inside a <see cref="RequestItemGroup"/>: a request string, its
/// optional parameters, a result holder and the callback that processes the item.
/// </summary>
public class RequestItem
{
// Request text; starts out as an empty string.
public string requestString ="";
// Extra parameters; defaults to a fresh instance and can be replaced via RequestItemGroup.AddItem.
public RequestParams reqparam = new RequestParams();
// Result holder, created together with the item.
public RequestResult result = new RequestResult();
// Callback invoked with this item by RequestConsumer.ProcessAPIRequest on a worker task.
public Action<RequestItem> ProcessResultAction;
// public int waitnext;
}
/// <summary>
/// A batch of <see cref="RequestItem"/>s that is queued and processed as one unit,
/// together with the callback that receives the whole group once processing is done.
/// </summary>
public class RequestItemGroup
{
    /// <summary>Market name; assigned by RequestConsumer.ProcessAPIRequest when the group is dequeued.</summary>
    public string market;

    /// <summary>Items belonging to this group, in the order they were added.</summary>
    public List<RequestItem> items;

    /// <summary>Callback that RequestConsumer.ProcessAPIRequest schedules on Global.uiScheduler after all items ran.</summary>
    public Action<RequestItemGroup> ProcessResultUIAction;

    // TODO: priority (placeholder note kept from the original source; nothing is implemented yet)

    /// <summary>Creates an empty group.</summary>
    /// <param name="ProcessResultUIAction_">Callback to store in <see cref="ProcessResultUIAction"/>.</param>
    public RequestItemGroup(Action<RequestItemGroup> ProcessResultUIAction_)
    {
        items = new List<RequestItem>();
        ProcessResultUIAction = ProcessResultUIAction_;
    }

    /// <summary>Appends a new item to the group.</summary>
    /// <param name="requestString">Request text stored on the item.</param>
    /// <param name="ProcessResultAction">Per-item callback stored on the item.</param>
    /// <param name="reqparam">
    /// Optional parameters. When omitted (null) the item keeps the empty
    /// <see cref="RequestParams"/> instance it was created with.
    /// </param>
    public void AddItem(string requestString, Action<RequestItem> ProcessResultAction, RequestParams reqparam=null)
    {
        RequestItem reqitem = new RequestItem();

        // Only overwrite the default when the caller supplied something; assigning the
        // null default unconditionally left reqparam null and made reqitem.reqparam.ticker
        // a NullReferenceException for every caller that omitted the argument.
        if (reqparam != null)
        {
            reqitem.reqparam = reqparam;
        }

        reqitem.requestString = requestString;
        reqitem.ProcessResultAction = ProcessResultAction;
        items.Add(reqitem);
    }
}
/// <summary>
/// Owns one blocking queue of <see cref="RequestItemGroup"/>s per market name.
/// All access to the market-to-queue map is serialized on a single shared lock.
/// </summary>
public class RequestManager
{
    // Largest number of items a single group may carry (see the message thrown by AddItemGroup).
    private const int MaxItemsPerGroup = 5;

    // Static, so the same lock object is shared by every RequestManager instance.
    static readonly object _locker = new object();

    private readonly Dictionary<string, BlockingCollection<RequestItemGroup>> requestQueue = new Dictionary<string, BlockingCollection<RequestItemGroup>>();

    /// <summary>Registers an empty queue for every market in the list.</summary>
    /// <param name="marketList">Market names to register.</param>
    /// <exception cref="ArgumentException">A market name is already registered (thrown by Dictionary.Add).</exception>
    public void Create(List<string> marketList)
    {
        // Dictionary is not safe for a write that runs concurrently with reads, and the
        // readers below already take _locker - so the writer has to take it as well.
        lock (_locker)
        {
            foreach (string market in marketList)
            {
                requestQueue.Add(market, new BlockingCollection<RequestItemGroup>());
            }
        }
    }

    /// <summary>Returns the queue registered for a market.</summary>
    /// <param name="marketName">Market name used in <see cref="Create"/>.</param>
    /// <exception cref="KeyNotFoundException">No queue was registered for the market.</exception>
    public BlockingCollection<RequestItemGroup> GetQueue(string marketName)
    {
        lock (_locker)
        {
            return requestQueue[marketName];
        }
    }

    /// <summary>Enqueues a group on the queue of the given market.</summary>
    /// <param name="marketName">Market name used in <see cref="Create"/>.</param>
    /// <param name="itemgroup">Group to enqueue; may hold at most five items.</param>
    /// <exception cref="Exception">The group holds more than five items.</exception>
    /// <exception cref="KeyNotFoundException">No queue was registered for the market.</exception>
    public void AddItemGroup(string marketName, RequestItemGroup itemgroup)
    {
        if (itemgroup.items.Count > MaxItemsPerGroup)
        {
            throw new Exception("Can't process request, Limit=5 requests in sec");
        }

        // The map lookup happens under the lock inside GetQueue; the enqueue itself runs
        // outside it because BlockingCollection does its own synchronization.
        GetQueue(marketName).Add(itemgroup);
    }
}
/// <summary>
/// Starts and runs one background consumer loop per market. Each loop takes request
/// groups from that market's queue, delays them according to recent request timestamps,
/// runs every item's callback on worker tasks, and finally runs the group's UI callback
/// on <c>Global.uiScheduler</c>.
/// </summary>
public class RequestConsumer
{
    // Request budget used by the throttle calculation.
    private const int MaxRequestsPerWindow = 5;
    // The reference timestamp must be at least this many milliseconds old before a group starts.
    private const int ThrottleWindowMs = 1100;
    // Each sleep lasts until the reference timestamp is this many milliseconds old.
    private const int ThrottleSleepTargetMs = 1170;
    // Number of request timestamps retained for the throttle calculation.
    private const int TimestampHistorySize = 10;

    /// <summary>Queue registry shared by all consumer loops.</summary>
    public static RequestManager requestManager = new RequestManager();

    /// <summary>Consumer task per market name, filled by <see cref="CreateRequestThreads"/>.</summary>
    public static Dictionary<string, Task> consumeTasks = new Dictionary<string, Task>();

    /// <summary>Starts one consumer loop per market and records its task in <see cref="consumeTasks"/>.</summary>
    /// <param name="marketList">Market names to start consumers for.</param>
    public static void CreateRequestThreads(List<string> marketList)
    {
        foreach (string market in marketList)
        {
            Task consumeTask = Task.Run(() => ProcessAPIRequest(market));
            consumeTasks.Add(market, consumeTask);
        }
    }

    /// <summary>
    /// Endless consumer loop for one market: blocks until a group is available, throttles,
    /// runs the item callbacks, then runs the group's UI callback. Never returns normally.
    /// </summary>
    /// <param name="marketName">Market whose queue is consumed.</param>
    public static void ProcessAPIRequest(string marketName)
    {
        Stopwatch stopTimer = Stopwatch.StartNew();
        // Seeded with one timestamp taken at start-up, exactly like a first request.
        List<long> requestTimes = new List<long>();
        requestTimes.Add(stopTimer.ElapsedMilliseconds);

        while (true)
        {
            BlockingCollection<RequestItemGroup> requestQueue = requestManager.GetQueue(marketName);
            RequestItemGroup itemgroup = requestQueue.Take();

            WaitForRateLimit(stopTimer, requestTimes, itemgroup.items.Count);

            itemgroup.market = marketName;
            RecordRequestTimes(stopTimer, requestTimes, itemgroup.items.Count);
            RunItemActions(itemgroup);
            DispatchToUi(itemgroup);
        }
    }

    // Sleeps until the reference timestamp for a group of the given size is old enough.
    private static void WaitForRateLimit(Stopwatch timer, List<long> requestTimes, int groupSize)
    {
        // Clamp to 1: for a full group (groupSize == MaxRequestsPerWindow) the unclamped
        // value is 0, which made the index below equal to requestTimes.Count and threw
        // ArgumentOutOfRangeException, terminating the consumer loop for this market.
        int lookback = Math.Max(1, MaxRequestsPerWindow - groupSize);
        if (requestTimes.Count <= lookback)
        {
            return;
        }

        long referenceTime = requestTimes[requestTimes.Count - lookback];
        long age = timer.ElapsedMilliseconds - referenceTime;
        while (age < ThrottleWindowMs)
        {
            // age < ThrottleWindowMs here, so the sleep duration is always positive.
            Thread.Sleep((int)(ThrottleSleepTargetMs - age));
            age = timer.ElapsedMilliseconds - referenceTime;
        }
    }

    // Appends one timestamp per item and trims the history to its fixed size.
    private static void RecordRequestTimes(Stopwatch timer, List<long> requestTimes, int groupSize)
    {
        for (int i = 0; i < groupSize; i++)
        {
            requestTimes.Add(timer.ElapsedMilliseconds);
            if (requestTimes.Count > TimestampHistorySize)
            {
                requestTimes.RemoveAt(0);
            }
        }
    }

    // Runs every item's callback on its own task and waits for all of them to finish.
    private static void RunItemActions(RequestItemGroup itemgroup)
    {
        var taskList = new List<Task>();
        foreach (RequestItem reqitem in itemgroup.items)
        {
            taskList.Add(Task.Run(() =>
            {
                try
                {
                    reqitem.ProcessResultAction(reqitem);
                }
                catch (Exception ex)
                {
                    // A throwing callback used to surface through Task.WaitAll and end the
                    // whole consumer loop; record the failure on the item instead so the
                    // remaining groups of this market are still processed.
                    if (reqitem.result == null)
                    {
                        reqitem.result = new RequestResult();
                    }
                    reqitem.result.exception = ex;
                    if (string.IsNullOrEmpty(reqitem.result.error))
                    {
                        reqitem.result.error = ex.Message;
                    }
                }
            }));
        }
        Task.WaitAll(taskList.ToArray());
    }

    // Runs the group's UI callback on Global.uiScheduler and blocks until it has completed.
    private static void DispatchToUi(RequestItemGroup itemgroup)
    {
        Task.Factory.StartNew(() =>
        {
            try
            {
                itemgroup.ProcessResultUIAction(itemgroup);
            }
            catch (ObjectDisposedException)
            {
                // Deliberately ignored, as in the original code.
                // NOTE(review): presumably the UI target was already closed - confirm.
            }
        }, CancellationToken.None, TaskCreationOptions.None, Global.uiScheduler).Wait();
    }
}
}