/
client.h
188 lines (139 loc) · 4.97 KB
/
client.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
#ifndef CLIENT_H
#define CLIENT_H
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstring>
#include <ctime>
#include <iostream>
#include <map>
#include <math.h>
#include <memory>
#include <mutex>
#include <queue>
#include <signal.h>
#include <sstream>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <uv.h>
#include <vector>
#include "parse_deployment.h"
#include "v8worker.h"
// Sizing constants used by the worker declared in this header.

// Buffer size limit: 64 * 1024.
constexpr size_t MAX_BUF_SIZE = 65536;

// Width of the header-size and payload-size fragments; each is a uint32.
constexpr int HEADER_FRAGMENT_SIZE = 4;
constexpr int PAYLOAD_FRAGMENT_SIZE = 4;

// Width of a uint32 expressed as an int.
constexpr int SIZEOF_UINT32 = 4;

// 1.4 * 2^30, computed in double and truncated toward zero (1503238553).
constexpr size_t MAX_V8_HEAP_SIZE =
    static_cast<size_t>(1.4 * 1024 * 1024 * 1024);

// Mutable namespace-scope counter; starts at zero.
// NOTE(review): this is a definition, not a declaration, and it lives in a
// header. Including this header from more than one translation unit would
// yield duplicate definitions -- confirm it is only included once, or move
// the definition into a source file.
int64_t timer_context_size = 0;
// A response record: a string payload plus two one-byte tags.
//
// The two tag fields carry in-class initializers so that a default-initialized
// instance (`resp_msg_t m;` or `new resp_msg_t`) never exposes indeterminate
// bytes. Under C++14 and later the struct is still an aggregate, so
// `resp_msg_t{"payload", type, opcode}` keeps working.
typedef struct resp_msg_s {
  std::string msg;     // payload text; empty by default
  uint8_t msg_type{0}; // message-type tag; 0 until assigned
  uint8_t opcode{0};   // opcode tag; 0 until assigned
} resp_msg_t;
// Overlapping storage for a socket address in either its IPv4 (sockaddr_in)
// or IPv6 (sockaddr_in6) form. As with any union, only the member written
// last should be read.
union sockaddr_in46 {
  sockaddr_in sock4;  // IPv4 view
  sockaddr_in6 sock6; // IPv6 view
};
// AppWorker groups the state declared below: two libuv event loops (a main
// data channel and an out-of-band feedback channel), the threads that service
// them, and a map of V8Worker instances.
//
// The constructor and destructor are private; the only visible way to obtain
// an instance is the static GetAppWorker() accessor (presumably a single
// process-wide instance -- confirm against the out-of-line definition).
//
// Scalar, pointer, atomic and duration members carry in-class initializers so
// that none of them is ever indeterminate, whichever subset the out-of-line
// constructor assigns. A constructor mem-initializer for the same member
// still takes precedence. Member declaration order is deliberately unchanged.
class AppWorker {
public:
  // Copying is already impossible because of the std::mutex / std::atomic
  // members; spelling it out documents that instances are only ever shared
  // by pointer.
  AppWorker(const AppWorker &) = delete;
  AppWorker &operator=(const AppWorker &) = delete;

  // Returns a pointer to the AppWorker instance.
  static AppWorker *GetAppWorker();

  // Accessors returning pointers to byte buffers (presumably
  // read_buffer_main_ and read_buffer_feedback_ respectively -- confirm).
  std::vector<char> *GetReadBufferMain();
  std::vector<char> *GetReadBufferFeedback();

  // Takes a stream plus a raw buffer and its length in bytes.
  void FlushToConn(uv_stream_t *stream, char *buffer, int length);

  // Set up the TCP variant of the two channels. `port` and `feedback_port`
  // identify the main and feedback endpoints; the remaining arguments carry
  // function/app identity and the per-channel batch sizes.
  void InitTcpSock(const std::string &function_name,
                   const std::string &function_id,
                   const std::string &user_prefix, const std::string &appname,
                   const std::string &addr, const std::string &worker_id,
                   int batch_size, int feedback_batch_size, int feedback_port,
                   int port);

  // Same as InitTcpSock but for Unix domain sockets: the two ports are
  // replaced by socket paths. The path strings are taken by value, unlike the
  // other string parameters; kept as-is to match the out-of-line definition.
  void InitUDS(const std::string &function_name, const std::string &function_id,
               const std::string &user_prefix, const std::string &appname,
               const std::string &addr, const std::string &worker_id,
               int batch_size, int feedback_batch_size,
               std::string feedback_sock_path, std::string uds_sock_path);

  // Connection handlers for the main and feedback channels. The parameter
  // list mirrors libuv's uv_connect_cb.
  void OnConnect(uv_connect_t *conn, int status);
  void OnFeedbackConnect(uv_connect_t *conn, int status);

  // Read handler; the parameter list mirrors libuv's uv_read_cb.
  void OnRead(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf);

  // Builds a WorkerMessage from `msg` given the encoded header and payload
  // sizes. The bool in the returned pair presumably reports success --
  // confirm against the definition.
  static std::pair<bool, std::unique_ptr<WorkerMessage>>
  GetWorkerMessage(int encoded_header_size, int encoded_payload_size,
                   const std::string &msg);

  // Consumes `nread` bytes starting at `buf` that arrived on `stream`.
  void ParseValidChunk(uv_stream_t *stream, int nread, const char *buf);

  // Takes ownership of a decoded message.
  void RouteMessageWithResponse(std::unique_ptr<WorkerMessage> worker_msg);

  // Entry points for the long-running loops; presumably each runs on the
  // matching std::thread member declared below -- confirm in the .cc file.
  void StartFeedbackUVLoop();
  void StartMainUVLoop();
  void WriteResponses();
  void ReadStdinLoop();
  void EventGenLoop();

  // Static so it can be handed to libuv as a uv_async_cb.
  static void StopUvLoop(uv_async_t *);

  void SendFilterAck(int opcode, int msgtype, int vb_no, int64_t seq_no,
                     bool skip_ack);

  // Stores the ns_server port string.
  void SetNsServerPort(const std::string &port) { ns_server_port_ = port; }

  std::thread main_uv_loop_thr_;
  std::thread feedback_uv_loop_thr_;
  std::thread stdin_read_thr_;
  std::thread event_gen_thr_;
  size_t memory_quota_{0};

protected:
  // `messages` is taken by value; `batch_size` bounds how many are handled
  // per write (presumably -- confirm against the definition).
  void WriteResponseWithRetry(uv_stream_t *handle,
                              std::vector<uv_buf_t> messages,
                              size_t batch_size);
  std::string GetInsight();

private:
  AppWorker();
  ~AppWorker();

  // Splits the given vbucket ids into a list of sets.
  std::vector<std::unordered_set<int64_t>>
  PartitionVbuckets(const std::vector<int64_t> &vbuckets) const;

  void SendPauseAck(const std::unordered_map<int64_t, uint64_t> &lps_map);

  std::thread write_responses_thr_;
  // Non-owning or owning is not visible from this header -- confirm in the
  // destructor before changing the pointee type.
  std::map<int16_t, V8Worker *> workers_;
  std::chrono::milliseconds checkpoint_interval_{0};
  Histogram latency_stats_;
  Histogram curl_latency_stats_;

  // Socket handles for out of band data channel to pipeline data to parent
  // eventing-producer
  int feedback_batch_size_{0};
  uv_connect_t feedback_conn_;
  uv_stream_t *feedback_conn_handle_{nullptr};
  uv_loop_t feedback_loop_;
  uv_async_t feedback_loop_async_;
  bool feedback_loop_running_{false};
  sockaddr_in46 feedback_server_sock_;
  uv_tcp_t feedback_tcp_sock_;
  uv_pipe_t feedback_uds_sock_;

  // Socket handles for data channel to pipeline messages from parent
  // eventing-producer to cpp workers
  int batch_size_{0};
  uv_connect_t conn_;
  uv_stream_t *conn_handle_{nullptr};
  uv_loop_t main_loop_;
  uv_async_t main_loop_async_;
  bool main_loop_running_{false};
  sockaddr_in46 server_sock_;
  uv_tcp_t tcp_sock_;
  uv_pipe_t uds_sock_;

  std::string app_name_;
  std::string function_name_;
  std::string function_id_;
  std::string user_prefix_;
  std::string next_message_;
  std::string ns_server_port_;
  std::map<int16_t, int16_t> partition_thr_map_;

  // Controls the number of virtual partitions, in order to shard work among
  // worker threads
  int16_t partition_count_{0};

  // Controls the size of thread pool, each thread executing user supplied
  // handler code against dcp/timer events
  int16_t thr_count_{0};

  // Captures the config message that will be written by C++ worker
  // to the tcp socket in order to communicate message to Go world
  resp_msg_t *resp_msg_{nullptr};

  bool msg_priority_{false};
  bool using_timer_{false};
  std::vector<char> read_buffer_main_;
  std::vector<char> read_buffer_feedback_;
  std::atomic<bool> thread_exit_cond_{false};
  std::atomic<bool> pause_consumer_{false};
  bool v8worker_init_done_{false};
  std::mutex workers_map_mutex_;
};
#endif