Minificpp 1007 - ECU C2 integration. #674
7af6d5c to ae170d8
There are some improvements that need to be made:
1. WIP.
2. Logging is needed.
3. Task scheduling has to be separated from the thread pool.
c4d4c7f to e8932ca
nanofi/include/ecu_api/ecuapi.h
Outdated
properties_t * ecu_configuration;
void * input_processor_ctx;
void * output_processor_ctx;
threadpool_t * thread_pool;
Is the thread pool something that is configurable? Ideally we create no threads and are entirely synchronous if desired. I think the default should be async, but we should also support synchronous interactions.
We have a synchronous api for ingesting data from within the application. However, that is not yet integrated into c2. Threadpool is not configurable yet. I will include it with the c2 configuration parameters when I am working on it.
pthread_mutex_unlock(&ctx->stop_mutex);
}

void free_s2s_output_context(site2site_output_context_t * ctx) {
I think we should avoid s2s and site2site and keep it sitetosite throughout. MiNiFi C++ has some vestigial references to site2site, but this should be a no-op change that improves consistency. It may be confusing, especially for structs, to have 2 in the name when I think the convention from NiFi is to use 'to'.
Got it!
e8932ca to c2db9f5
I don't think we should implement ringbuffer and threadpool.
Did you check for solutions already available on GitHub (considering licensing as well)?
@msharee9 Just started the review, I am releasing what I have so far, but I have fundamental questions before continuing the C2 part of the review.
What's the purpose of using CBOR for C2 payload serialization? Is this minifi's C2 CoAP reimplemented? If so, I don't believe that uses CBOR, but a custom binary format, so I don't believe it is compatible. Is this purposely a new serialization format? How does it relate to https://cwiki.apache.org/confluence/display/MINIFI/C2+Design ?
ExternalProject_Add(
libcbor-external
GIT_REPOSITORY "https://github.com/PJK/libcbor.git"
GIT_TAG "82512d851205fbc7f65d96a0b4a8e1bad2e4f3c6"
I dislike not using released versions of third parties, but I see that there have been a lot of commits on master since the last release, so this could be an exception.
Please add a comment about this here and create an issue to update this to a released version when they do release one.
The latest release was almost 3 years ago, but it looks like the project has commits as recent as 5 months ago on master with some improvements. I will open a ticket to make sure we check out a released version when one comes out.
nanofi/src/coap/c2protocol.c
Outdated
return 0;
}

uint16_t endian_check_uint16(uint16_t value, int is_little_endian) {
libcbor already seems to take care of any byte order issues on decoding: https://github.com/PJK/libcbor/blob/master/src/cbor/internal/loaders.c (it would be very surprising to me, if it did not), so these are unnecessary.
I know these have been copied from MiNiFi code, but the runtime check for endianness and manually writing byte order transformations are horrific.
There are compile-time checks for endianness and the ntoh* and hton* family of functions that automatically take care of these conversions.
If we compile the code on a little-endian machine and run the executable on a big-endian one, wouldn't that be problematic?
Only the target platform matters. You can cross-compile, e.g. from amd64 (little-endian host platform) to a big-endian arm target; in that case the generated binary will only work on that arm target, and the compile-time check for endianness will report big endian, i.e. the endianness of the target platform rather than the host.
The hton* and ntoh* functions conveniently hide this and their API is flexible enough to even allow the hypothetical scenario of a CPU changing its endianness at runtime. The point is: the developer doesn't need to care how they work when correctly using the functions.
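As a minimal sketch of the point above, the manual byte-order helpers could be replaced by thin wrappers around htonl/ntohl. The helper names put_u32_be and get_u32_be are hypothetical and do not appear in the PR; the sketch assumes a POSIX platform (on Windows these functions come from winsock2.h instead).

```c
#include <arpa/inet.h> /* htonl, ntohl (POSIX) */
#include <stdint.h>
#include <string.h>

/* Hypothetical helper: store a 32-bit value in buf in network
 * (big-endian) byte order, whatever the target's endianness is. */
static void put_u32_be(uint8_t *buf, uint32_t host_value) {
  uint32_t wire = htonl(host_value);
  memcpy(buf, &wire, sizeof wire);
}

/* Hypothetical helper: read a 32-bit network-order value from buf
 * and return it in host byte order. */
static uint32_t get_u32_be(const uint8_t *buf) {
  uint32_t wire;
  memcpy(&wire, buf, sizeof wire);
  return ntohl(wire);
}
```

Neither helper checks endianness itself; the conversion is a no-op on big-endian targets and a byte swap on little-endian ones.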
nanofi/src/coap/c2protocol.c
Outdated
return buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24;
}

uint16_t endian_check_uint64(uint64_t value, int is_little_endian) {
The return value should be uint64_t (but as per my previous comment, it shouldn't really exist at all).
nanofi/src/coap/c2protocol.c
Outdated
case 7:
return STOP;
}
return ACKNOWLEDGE;
I don't think this is good from a forward-compatibility standpoint.
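One possible reading of this concern is that operation codes the agent does not recognize silently decode as ACKNOWLEDGE. A hedged sketch of an alternative is to map unknown codes to an explicit value the caller can reject or log. The enum, the OP_UNKNOWN member, the function name, and the wire value 0 are assumptions for illustration; only the mapping of 7 to STOP comes from the snippet.

```c
#include <stdint.h>

/* Hypothetical operation enum; OP_UNKNOWN is not in the PR. */
typedef enum {
  OP_ACKNOWLEDGE,
  OP_STOP,
  OP_UNKNOWN
} c2_operation_sketch_t;

/* Decode a wire-level operation code. Codes added by a newer server
 * are reported as OP_UNKNOWN instead of being treated as ACKNOWLEDGE. */
static c2_operation_sketch_t decode_operation(uint8_t wire_code) {
  switch (wire_code) {
    case 0:
      return OP_ACKNOWLEDGE; /* assumed wire value */
    case 7:
      return OP_STOP; /* as in the reviewed snippet */
    default:
      return OP_UNKNOWN;
  }
}
```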
endif()

ExternalProject_Add(
libcbor-external
What are we using CBOR for? I see that it is being used for encoding C2 protocol payloads, but as far as I know our CoAP C2 payload is a custom format, not CBOR.
What have you tested this with?
I have tested this with coap_server.c and c2_client.c in nanofi/ecu.
The coap_server.c stands as a C2 server listening for heartbeats from ecus. c2_client.c stands as a c2 client requesting operations on ecu via coap_server.
I will write unit tests for testing the serde of c2 protocol including heartbeats and c2 operations.
Next batch of review, not nearly ready yet.
nanofi/src/ecu_api/ecuapi.c
Outdated
#include <core/threadpool.h>
#include <uuid/uuid.h>

void initialize_ecu(const char * name, ecu_context_t * ecu, io_type_t ip, void * ip_ctx, io_type_t op, void * op_ctx) {
As this is kind of a "member function", I would prefer that ecu_context_t be the first parameter.
Ok.
@@ -0,0 +1,835 @@
/**
The concept of having a fixed set of input-output modules and switch-casing through them in all of these functions is not very extensible or generic, and the code is not very clean this way either.
I think it is reasonable to expect that we, or the user, might want to expand on these capabilities.
Therefore I think it would be better to have an input-output module interface (practically a struct of function pointers) defined for these and a global map to which they can be registered.
Each input-output module can then implement these functions and register itself in the map, which we can access from these functions.
Could you please elaborate? I did not get it.
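To make the suggestion concrete, here is a minimal sketch of an input-output module interface as a struct of function pointers plus a global registry. All names (io_module_t, register_io_module, find_io_module, the "file_input" module) are hypothetical, and a fixed-size array stands in for the "global map" to keep the example self-contained.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical interface: one struct of function pointers per module. */
typedef struct io_module {
  const char *name;
  void *(*create_context)(void);
  int (*start)(void *ctx);
  void (*free_context)(void *ctx);
} io_module_t;

#define MAX_IO_MODULES 16
static const io_module_t *io_registry[MAX_IO_MODULES];
static size_t io_registry_size = 0;

/* Register a module; returns 0 on success, -1 if invalid or full. */
int register_io_module(const io_module_t *module) {
  if (module == NULL || module->name == NULL ||
      io_registry_size >= MAX_IO_MODULES) {
    return -1;
  }
  io_registry[io_registry_size++] = module;
  return 0;
}

/* Look a module up by name; returns NULL if it is not registered. */
const io_module_t *find_io_module(const char *name) {
  if (name == NULL) return NULL;
  for (size_t i = 0; i < io_registry_size; ++i) {
    if (strcmp(io_registry[i]->name, name) == 0) return io_registry[i];
  }
  return NULL;
}

/* Example module: trivial callbacks standing in for real file input. */
static void *file_create(void) { static int dummy; return &dummy; }
static int file_start(void *ctx) { return ctx != NULL ? 0 : -1; }
static void file_free(void *ctx) { (void)ctx; }
static const io_module_t file_input_module = {
  "file_input", file_create, file_start, file_free
};
```

With this shape, a function that currently switch-cases over io_type_t would instead call find_io_module(...) and invoke the returned callbacks, so adding a new input or output type only requires registering another io_module_t.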
nanofi/src/processors/file_input.c
Outdated
data[ctx->chunk_size] = '\0';
data_buff_t buff;
buff.data = (char *)malloc(strlen(data) + 1);
memcpy(buff.data, data, strlen(data) + 1);
If there is a \0 in the file, and in this case, where it is perfectly valid to have binary data, there will be, you skip the part of the chunk after the \0, which is not good.
I don't see the point of treating this as a C string at all. It is chunk sized data.
You are right. Will correct it.
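A minimal sketch of the fix discussed here is to carry an explicit length and copy that many bytes, rather than relying on strlen. The chunk_buff_t type and chunk_copy function are hypothetical; the PR's data_buff_t may be laid out differently.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical buffer type that stores the chunk length explicitly. */
typedef struct {
  char *data;
  size_t len;
} chunk_buff_t;

/* Copy exactly len bytes from src, including any embedded '\0' bytes.
 * Returns 0 on success, -1 if allocation fails. */
static int chunk_copy(chunk_buff_t *out, const char *src, size_t len) {
  out->data = malloc(len > 0 ? len : 1);
  if (out->data == NULL) {
    out->len = 0;
    return -1;
  }
  memcpy(out->data, src, len);
  out->len = len;
  return 0;
}
```

The caller would pass the number of bytes actually read from the file as len, so a chunk containing binary data is forwarded intact.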
Next batch of review.
A general observation:
The new mass of code uses a huge number of pthread functions. This means that it will be very hard to port to Windows, which is, as far as I understand, something we want to do sooner or later.
I would suggest choosing a platform-independent threading library or implementing platform-specific threading utilities now, and replace pthread functions, or this will become exponentially harder in the future.
f117cbf to 3937f3f
Yes, checked for solutions online for threadpool, but I don't think we will get an out of the box solution for threadpool in C that is platform independent. Plus I had some special requirements for the threadpool to be able to run repeated tasks that are scheduled by the threadpool itself. Planning to add a time based scheduler so as to separate scheduling from the threadpool. For the ring buffer, yes there are solutions on github but the api we have is very simple with just two functions, read and write.
Added platform independent thread utils. I was working on it as part of MINIFICPP-777. Cherry picked them here.
We are using CBOR for serde of C2 protocol in nanofi. CBOR has an out of the box api for decoding and encoding information without having to have a schema. It is tedious to write but it is smart enough to pack object types concisely.
3937f3f to fc2b828
Besides the comments added my main concern is that I still don't feel convinced about the need of implementing thread pools, message queues, ringbuffers and all the related stuff.
ExternalProject_Add(
libcoap-external
GIT_REPOSITORY "https://github.com/obgm/libcoap.git"
GIT_TAG "d6c25a9a0757af9d13842b7aae9713136e720567"
A version please!
nanofi/ecu/coap_server.c
Outdated
@@ -0,0 +1,138 @@
#include <nanofi/coap_server.h>
Multiple issues here:
- License
- What is the goal of this file? Are we trying to emulate a C2 server? Some comments would be nice
- A server including the header of the agent is a bad smell
This file was initially not supposed to be pushed to the repo. I wrote it to test the c2 operations on ecus without having to have any other external tool but realized that it could be handy for anyone to test their ecus if they write one. Same goes for c2_client.c.
I will mention the purpose of each in the respective file and make the code much cleaner.
int ret = mkdir(path, S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
char path_sep = '/';
Why don't you simply use "get_separator()"?
fc2b828 to c06cd2c
The main reason for having a thread pool is to let the user automatically start input and output processing when commanded by the c2 agent. After the ecu is started, the application using the sdk won't block, and the data ingestion and forwarding is started in the background in the threadpool. Of course, the api is not limited to this: the user can opt to call the functions individually to manually ingest data (some work might be needed here) and forward it if they wish. The message queue is implemented to run the input and output processors concurrently, and the ring buffer to limit the size of buffering. The size of the buffer can be made configurable (some work is needed for that), though. One of the other goals of this sdk is to let the user integrate an ecu into their application. We want to make that easier by keeping the number of api calls small.
343f229 to 06cb2bf
06cb2bf to df0071c
68ce755 to 1e7cacd
988a2d0 to 30023c5
62dc000 to f0cecb0
I see some drawbacks and performance concerns with the ring buffer approach. I am revisiting this design and documenting a revised ecu core design/approach. Will send it out for review once I wrap up other work related to light weight heartbeats.
f0cecb0 to 0f10b7a
.value = cbor_move(list_item)});
break;
}
case PROP_TYPE: {
All keys in the C2 protocol are unsigned ints except the keys of properties, which happen to be strings. Use semantic tagging here to identify properties (string key and string value pairs). While decoding CBOR-serialized data, the tag will help determine whether the incoming payload is a regular CBOR map or a property.
}
struct cbor_pair * kvps = cbor_map_handle(item);
size_t sz = cbor_map_size(item);
//check if we are parsing properties
Tagging will help better.
About half of the issues are newly introduced compiler warnings.
In addition to the issues, I suggest a reformatting of the new files to eliminate indentation inconsistencies.
int is_little_endian() {
const uint16_t x = 1;
uint8_t * y = (uint8_t *)&x;
return *y == 1;
}
- CPU byte ordering can be determined at compile-time. This is how abseil (core utilities by google) does it: https://github.com/abseil/abseil-cpp/blob/master/absl/base/config.h#L433
- The above way of type punning violates the aliasing rules of C and invokes undefined behavior. Use union-based type punning (C only, might not work on MSVC) or memcpy to char[].
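Both suggestions can be sketched together: a compile-time check where the compiler provides one, and a memcpy-based runtime fallback otherwise. The macro name IS_LITTLE_ENDIAN and the function names are hypothetical. The __BYTE_ORDER__ macros are predefined by GCC and Clang; treating _WIN32 as little-endian is an assumption that holds for current Windows targets.

```c
#include <stdint.h>
#include <string.h>

#if defined(__BYTE_ORDER__) && defined(__ORDER_LITTLE_ENDIAN__)
/* GCC/Clang: describes the target platform, not the build host. */
#define IS_LITTLE_ENDIAN (__BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__)
#elif defined(_WIN32)
/* Assumption: supported Windows targets are little-endian. */
#define IS_LITTLE_ENDIAN 1
#endif

/* Runtime fallback that inspects the bytes through memcpy instead of
 * casting the address of x to another pointer type. */
static int is_little_endian_runtime(void) {
  const uint16_t x = 1;
  unsigned char bytes[sizeof x];
  memcpy(bytes, &x, sizeof x);
  return bytes[0] == 1;
}

/* Prefer the compile-time answer when it is available. */
static int is_little_endian(void) {
#ifdef IS_LITTLE_ENDIAN
  return IS_LITTLE_ENDIAN;
#else
  return is_little_endian_runtime();
#endif
}
```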
}

void setup_ecu_signal_action() {
#ifdef _WIN32
Why do we mostly use WIN32 but sometimes _WIN32?
add_dependencies(minifi-coap minifi-http-curl)
endif()
add_dependencies(minifi-coap minifi-http-curl)
endif()
The file is indented with tabs. Why was this changed on these lines but not on the rest of the file?
* @attention the source string is expected to be
* null terminated
*/
void copystr(const char * source, char ** dest);
We could use the return value to return the result like strdup does.
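A sketch of what that signature could look like, assuming the copy is heap-allocated and owned by the caller. The name copystr_dup is hypothetical, chosen so it does not clash with the existing copystr.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical strdup-style variant: returns a newly allocated copy of
 * the null-terminated source, or NULL if source is NULL or allocation
 * fails. The caller releases the result with free(). */
char *copystr_dup(const char *source) {
  if (source == NULL) return NULL;
  size_t len = strlen(source) + 1; /* include the terminator */
  char *copy = malloc(len);
  if (copy != NULL) memcpy(copy, source, len);
  return copy;
}
```

A call site such as copystr(value, &val.v_str) would then read val.v_str = copystr_dup(value), which also makes allocation failure visible to the caller.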
if (bytes_sent % (1 << (szx + 4)) == 0) {
num = block.num = (bytes_sent >> (szx + 4)) - 1;
szx + 4 looks like a magic number that deserves a name. Could you give it a name?
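For context, in CoAP block-wise transfer (RFC 7959) the block size is 2^(SZX + 4), so szx + 4 is the shift that converts between bytes and block numbers. A minimal sketch of naming it follows; the macro and function names are hypothetical, not taken from libcoap or the PR.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical name for the offset: block size = 2^(SZX + 4). */
#define COAP_BLOCK_SZX_OFFSET 4

/* Number of bits to shift by to convert a byte count to a block count. */
static unsigned coap_block_shift(uint8_t szx) {
  return (unsigned)szx + COAP_BLOCK_SZX_OFFSET;
}

/* Block size in bytes for a given SZX value (valid SZX range is 0..6). */
static size_t coap_block_size(uint8_t szx) {
  return (size_t)1 << coap_block_shift(szx);
}
```

The reviewed condition could then be written as bytes_sent % coap_block_size(szx) == 0, with the block number derived from bytes_sent >> coap_block_shift(szx).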
void thread_sleep_ms(uint64_t millis) {
#ifndef WIN32
usleep(millis * 1000L);
Missing #include <unistd.h>
#endif

#include <c2_api/c2api.h>
#include "nanofi/coap_message.h"
I've got a "No such file or directory" for this header. Please adapt the include path or change to #include "coap_message.h"
value_t value_string(const char * value) {
value_t val;
memset(&val, 0, sizeof(value_t));
copystr(value, &val.v_str);
copystr is undeclared. Add #include "core/string_utils.h"
}

int extract_input_manifest(const c2_payload_list_t * inputs, io_manifest * io_mnfst) {
c2_payload_list_t * ip;
The iterator variable (ip) should be const if the collection variable (inputs) is const.
c2_payload_list_t * op;
LL_FOREACH(outputs, op) {
The iterator variable (op) should be const if the collection variable (outputs) is const.
0f10b7a to bf10350
UT_hash_handle hh;
} c2_payload_map_t;

c2_payload_map_t * c2_payload_heartbeat(c2heartbeat_t hb);
Function headers would be useful
c2_payload_map_t * c2_payload_agent_response(c2_response_t * ap);
void free_c2_payload(c2_payload_map_t * c2payload);

c2heartbeat_t * extract_c2_heartbeat(const c2_payload_map_t * c2_payload);
A payload parser utility would be necessary for extensibility.
bf10350 to 1b3f859
Closing this PR in favor of MINIFICPP-1008. Incorporated ECU core related comments there. Will revisit the C2 part of nanofi from this PR in the near future.
Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
For all changes:
Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
Has your PR been rebased against the latest commit within the target branch (typically master)?
Is your initial contribution a single, squashed commit?
For code changes:
For documentation related changes:
Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.