-
Notifications
You must be signed in to change notification settings - Fork 393
/
topic.cc
173 lines (128 loc) · 4.21 KB
/
topic.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/
#include <string>
#include <vector>
#include "src/common.h"
#include "src/connection.h"
#include "src/topic.h"
namespace NodeKafka {
/**
* @brief Producer v8 wrapped object.
*
* Wraps the RdKafka::Producer object with compositional inheritence and
* provides methods for interacting with it exposed to node.
*
* The base wrappable RdKafka::Handle deals with most of the wrapping but
* we still need to declare its prototype.
*
* @sa RdKafka::Producer
* @sa NodeKafka::Connection
*/
Topic::Topic(std::string topic_name, RdKafka::Conf* config):
m_topic_name(topic_name),
m_config(config) {
// We probably want to copy the config. May require refactoring if we do not
}
Topic::~Topic() {
if (m_config) {
delete m_config;
}
}
std::string Topic::name() {
return m_topic_name;
}
Baton Topic::toRDKafkaTopic(Connection* handle) {
if (m_config) {
return handle->CreateTopic(m_topic_name, m_config);
} else {
return handle->CreateTopic(m_topic_name);
}
}
/*
bool partition_available(int32_t partition) {
return topic_->partition_available(partition);
}
Baton offset_store (int32_t partition, int64_t offset) {
RdKafka::ErrorCode err = topic_->offset_store(partition, offset);
switch (err) {
case RdKafka::ERR_NO_ERROR:
break;
default:
break;
}
}
*/
Nan::Persistent<v8::Function> Topic::constructor;
void Topic::Init(v8::Local<v8::Object> exports) {
Nan::HandleScope scope;
v8::Local<v8::FunctionTemplate> tpl = Nan::New<v8::FunctionTemplate>(New);
tpl->SetClassName(Nan::New("Topic").ToLocalChecked());
tpl->InstanceTemplate()->SetInternalFieldCount(1);
Nan::SetPrototypeMethod(tpl, "name", NodeGetName);
// connect. disconnect. resume. pause. get meta data
constructor.Reset((tpl->GetFunction(Nan::GetCurrentContext()))
.ToLocalChecked());
Nan::Set(exports, Nan::New("Topic").ToLocalChecked(),
tpl->GetFunction(Nan::GetCurrentContext()).ToLocalChecked());
}
void Topic::New(const Nan::FunctionCallbackInfo<v8::Value>& info) {
if (!info.IsConstructCall()) {
return Nan::ThrowError("non-constructor invocation not supported");
}
if (info.Length() < 1) {
return Nan::ThrowError("topic name is required");
}
if (!info[0]->IsString()) {
return Nan::ThrowError("Topic name must be a string");
}
RdKafka::Conf* config = NULL;
if (info.Length() >= 2 && !info[1]->IsUndefined() && !info[1]->IsNull()) {
// If they gave us two parameters, or the 3rd parameter is null or
// undefined, we want to pass null in for the config
std::string errstr;
if (!info[1]->IsObject()) {
return Nan::ThrowError("Configuration data must be specified");
}
config = Conf::create(RdKafka::Conf::CONF_TOPIC, (info[1]->ToObject(Nan::GetCurrentContext())).ToLocalChecked(), errstr); // NOLINT
if (!config) {
return Nan::ThrowError(errstr.c_str());
}
}
Nan::Utf8String parameterValue(Nan::To<v8::String>(info[0]).ToLocalChecked());
std::string topic_name(*parameterValue);
Topic* topic = new Topic(topic_name, config);
// Wrap it
topic->Wrap(info.This());
// Then there is some weird initialization that happens
// basically it sets the configuration data
// we don't need to do that because we lazy load it
info.GetReturnValue().Set(info.This());
}
// handle
v8::Local<v8::Object> Topic::NewInstance(v8::Local<v8::Value> arg) {
Nan::EscapableHandleScope scope;
const unsigned argc = 1;
v8::Local<v8::Value> argv[argc] = { arg };
v8::Local<v8::Function> cons = Nan::New<v8::Function>(constructor);
v8::Local<v8::Object> instance =
Nan::NewInstance(cons, argc, argv).ToLocalChecked();
return scope.Escape(instance);
}
NAN_METHOD(Topic::NodeGetName) {
Nan::HandleScope scope;
Topic* topic = ObjectWrap::Unwrap<Topic>(info.This());
info.GetReturnValue().Set(Nan::New(topic->name()).ToLocalChecked());
}
NAN_METHOD(Topic::NodePartitionAvailable) {
// @TODO(sparente)
}
NAN_METHOD(Topic::NodeOffsetStore) {
// @TODO(sparente)
}
} // namespace NodeKafka