Implement kafka_connection class #1

Closed
avelanarius opened this issue Nov 19, 2019 · 0 comments

The class handles a single TCP connection to a specified Kafka broker.

Later it could store some data about the broker, such as supported API versions.

Initial idea for the class:

class kafka_connection {
public:
    seastar::future<> connect(const std::string& host, uint16_t port);

    // payload could be char*, std::string or seastar::temporary_buffer<char>
    seastar::future<> write(char *payload);

    seastar::future<seastar::temporary_buffer<char>> read(size_t how_many_bytes);

    seastar::future<> close();
};

Test that it works by connecting to a Kafka broker, sending a hardcoded payload, and receiving a response payload back.

Please test it against a Kafka broker started as described in https://github.com/avelanarius/seastar/blob/avelanarius-kafkadev-local-final/scripts/kafkadev_local/README.md.
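
As a rough illustration of what a hardcoded payload for such a test could look like, the sketch below builds a size-prefixed ApiVersions request (API key 18, version 0) following the Kafka wire format: a 4-byte big-endian length, then the request header. Choosing ApiVersions is an assumption, not something this issue prescribes, and the helper names (`make_api_versions_request`, `read_frame_size`) are hypothetical. It uses only the standard library, so the resulting bytes could be handed to `write()` however the class ends up being implemented.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Appends a 16-bit integer in network (big-endian) byte order.
static void put_int16(std::vector<uint8_t>& out, uint16_t v) {
    out.push_back(static_cast<uint8_t>(v >> 8));
    out.push_back(static_cast<uint8_t>(v & 0xff));
}

// Appends a 32-bit integer in network (big-endian) byte order.
static void put_int32(std::vector<uint8_t>& out, uint32_t v) {
    for (int shift = 24; shift >= 0; shift -= 8) {
        out.push_back(static_cast<uint8_t>((v >> shift) & 0xff));
    }
}

// Hypothetical helper: builds a complete ApiVersions v0 request frame.
// Layout: size (int32) | api_key (int16) | api_version (int16) |
//         correlation_id (int32) | client_id (int16 length + bytes).
// ApiVersions v0 has an empty request body, so the header is the whole message.
std::vector<uint8_t> make_api_versions_request(uint32_t correlation_id,
                                               const std::string& client_id) {
    assert(client_id.size() <= 0x7fff);  // length must fit in a signed int16

    std::vector<uint8_t> message;
    put_int16(message, 18);  // api_key: ApiVersions
    put_int16(message, 0);   // api_version
    put_int32(message, correlation_id);
    put_int16(message, static_cast<uint16_t>(client_id.size()));
    message.insert(message.end(), client_id.begin(), client_id.end());

    std::vector<uint8_t> frame;
    put_int32(frame, static_cast<uint32_t>(message.size()));
    frame.insert(frame.end(), message.begin(), message.end());
    return frame;
}

// Decodes the 4-byte big-endian size prefix that starts every Kafka
// request and response; p must point to at least 4 readable bytes.
uint32_t read_frame_size(const uint8_t* p) {
    return (static_cast<uint32_t>(p[0]) << 24) | (static_cast<uint32_t>(p[1]) << 16) |
           (static_cast<uint32_t>(p[2]) << 8) | static_cast<uint32_t>(p[3]);
}
```

On the receiving side, a test would likely call `read(4)` first, decode the length with something like `read_frame_size`, and then read that many bytes; the response begins with the same correlation id that was sent.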

Later versions should expose a higher-level API such as "send message" (not a blob of chars).

avelanarius changed the title from "Write kafka_connection class" to "Implement kafka_connection class" Nov 19, 2019
avelanarius pushed a commit that referenced this issue Dec 1, 2019
Merge changes from origin into the fork (branch zpp_kafka)
avelanarius pushed a commit that referenced this issue Sep 4, 2020
This reverts commit 33406cf. It
introduces memory leaks:

Direct leak of 24 byte(s) in 1 object(s) allocated from:
    #0 0x7fb773b389d7 in operator new(unsigned long) (/lib64/libasan.so.5+0x10f9d7)
    #1 0x108f0d4 in seastar::reactor::poller::~poller() ../src/core/reactor.cc:2879
    #2 0x11c1e59 in std::experimental::fundamentals_v1::_Optional_base<seastar::reactor::poller, true>::~_Optional_base() /usr/include/c++/9/experimental/optional:288
    #3 0x118f2d7 in std::experimental::fundamentals_v1::optional<seastar::reactor::poller>::~optional() /usr/include/c++/9/experimental/optional:491
    #4 0x108c5a5 in seastar::reactor::run() ../src/core/reactor.cc:2587
    #5 0xf1a822 in seastar::app_template::run_deprecated(int, char**, std::function<void ()>&&) ../src/core/app-template.cc:199
    #6 0xf1885d in seastar::app_template::run(int, char**, std::function<seastar::future<int> ()>&&) ../src/core/app-template.cc:115
    #7 0xeb2735 in operator() ../src/testing/test_runner.cc:72
    #8 0xebb342 in _M_invoke /usr/include/c++/9/bits/std_function.h:300
    #9 0xf3d8b0 in std::function<void ()>::operator()() const /usr/include/c++/9/bits/std_function.h:690
    #10 0x1034c72 in seastar::posix_thread::start_routine(void*) ../src/core/posix.cc:52
    #11 0x7fb7738804e1 in start_thread /usr/src/debug/glibc-2.30-13-g919af705ee/nptl/pthread_create.c:479

Reported-by: Rafael Avila de Espindola <espindola@scylladb.com>
avelanarius pushed a commit that referenced this issue Sep 4, 2020
"
This series enhances seastar backtraces to contain information not only about
current task's call stack, but also information about tasks which are blocked
on the current task.

For example, when a thread is waiting for I/O operation to complete, the
continuation chain will contain the continuation which handles ready I/O, then
a continuation which wakes the thread, then the continuation which waits for
the thread to finish. This gives much more context in the backtrace, similar
to the one we would get in a synchronous programming model.

Presenting only the current backtrace of the reactor thread is in many
cases not enough.

This is how extended backtraces will be logged:

INFO  2020-05-06 11:39:32,362 [shard 0] seastar - backtrace:    0x5b7ed9
   0x5b80e2
   0x5b8599
   0x437d76
   0x43e88b
   0x4c9617
   0x4c990d
   0x4f65c5
   0x4a962c
   0x4aa245
   0x49e785
   0x4c288d
   /lib64/libpthread.so.0+0x94e1
   /lib64/libc.so.6+0x1016a2
   --------
   seastar::continuation<seastar::internal::promise_base_with_type<>, seastar::future<>::then_impl_nrvo<func4()::{lambda()#1}, seastar::future<> >(func4()::{lambda()#1}&&)::{lambda()#1}::operator()() const::{lambda(seastar::internal::promise_base_with_type<>&, seastar::future_state<>&&)#1}>
   --------
   seastar::future<>::thread_wake_task

The -------- separator lines separate the backtraces of different tasks in the chain.

A side benefit of the series is that all backtraces will now contain some meaningful
information (the current continuation name) without the need to resolve addresses.

Another use case is heap profiles. Currently measuring memory
allocations for one large operation is hard if it starts many
allocating continuations. Tasks for those continuations will
be spread in the profile and not correlated with the grand
operation. Backtracing across tasks solves that because different
paths are joined by a common base.

The task class is extended with a virtual method:

   virtual task* waiting_task() noexcept = 0;

which allows one to walk the continuation chain. It returns the next task
blocked on the current one.

When a backtrace is collected, the type_info of each task is pushed,
which allows us to obtain the name of the continuation.

Optionally, when the build is configured with --enable-task-backtrace, a full backtrace
is attached to continuation tasks when they're created (at deferring points)
so that we get more context.

perf_simple_query shows no regression (@ 120k tps).
"

* tag 'backtracing-across-tasks-v5.1' of github.com:tgrabiec/seastar:
  util/backtrace: Cache hash of the backtrace
  core/task, util/backtrace: Allow capturing backtrace at preemption points
  core/memory: Move disable_backtrace_temporarily declaration to the header
  tests: future: Add backtracing test
  addr2line: Ignore separator lines
  core/task, util/backtrace: Collect backtraces across continuation chains
  util/backtrace: Extract operator<<(std::ostream&, frame&)
  core/make_task: Implement lambda_task::waiting_task()
  core/task: Store promise inside lambda_task
  core/task: Extrack make_task() to a separate header
  core/reactor: Expose a pointer to currently running task
  core/future: Add ability to walk continuation chains
  util/backtrace: Capture current scheduling group in the backtrace
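
The chain walk that the merge message above describes can be sketched with a toy model. This is not Seastar's code: `task` here is a simplified stand-in that models only the `waiting_task()` hook, and the derived types and `collect_chain` are hypothetical names chosen to mirror the example in the message (I/O continuation, thread wake-up, final waiter).

```cpp
#include <cassert>
#include <cstddef>
#include <typeinfo>
#include <vector>

// Simplified stand-in for a task: only the chain-walking hook is modelled.
struct task {
    virtual ~task() = default;
    // Returns the next task blocked on this one, or nullptr at the end.
    virtual task* waiting_task() noexcept = 0;
};

// Hypothetical base that stores the link to the waiting task.
struct chained_task : task {
    task* next = nullptr;
    task* waiting_task() noexcept override { return next; }
};

// Hypothetical concrete task types, named after the example in the text.
struct io_ready_task : chained_task {};
struct thread_wake_task : chained_task {};
struct thread_finished_task : chained_task {};

// Walks the chain starting at `current` and records the dynamic type of
// every task, which is what makes continuation names recoverable later.
std::vector<const std::type_info*> collect_chain(task* current) {
    constexpr std::size_t max_depth = 1024;  // toy guard against cycles
    std::vector<const std::type_info*> types;
    for (task* t = current; t != nullptr; t = t->waiting_task()) {
        assert(types.size() < max_depth);
        types.push_back(&typeid(*t));
    }
    return types;
}
```

Calling `name()` on each collected `type_info` yields an implementation-defined (usually mangled) string, so a real logger would still need to demangle it before printing.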
avelanarius pushed a commit that referenced this issue Sep 4, 2020
The iotune tool measures disk throughput and IOPS by doing four
sequential measurements:

1. sequentially writes into a big file
2. sequentially reads from the same file
3. randomly writes into this file again
4. randomly reads from the same file

It's important that measurement #1 comes first. On start,
the test file is created and truncated to its size, and this
first measurement fills it with data, which is then read by steps
2 and 4. Accordingly, after the 1st measurement the size of
the file should be updated to reflect the real amount of data
written into it.

The latter is done by taking the number of bytes written into
the file. But in reality the first test may wrap around the initial
file size and overwrite some data in it. After this, the file
size can appear larger than it actually is, even several times larger.

Subsequently, the next tests will read from and write to
random holes in this area. For the read tests this becomes quite
problematic, as the kernel will not submit real IO requests for
reads from blocks that are missing due to holes. As a result, the
reported bandwidth and IOPS will be some average of disk IOPS and
kernel "reads-from-holes-per-second".

Fix this by getting the maximum position at which the first test
writes and limiting the next tests to this value, instead of
the amount of (over-)written bytes.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
Message-Id: <20200623185120.17694-1-xemul@scylladb.com>
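
The difference between the two quantities in the commit message above can be shown with a simplified model. This is not iotune's code: `write_stats` and `simulate_sequential_writes` are hypothetical names, and the simulation assumes fixed-size writes that restart at offset 0 once the next write would cross the initial file size.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

struct write_stats {
    uint64_t bytes_written = 0;   // total bytes issued; overwrites count again
    uint64_t max_end_offset = 0;  // highest file position covered by any write
};

// Simulates `count` sequential writes of `chunk` bytes into a file of
// `file_size` bytes, wrapping back to offset 0 when the end is reached.
write_stats simulate_sequential_writes(uint64_t file_size, uint64_t chunk,
                                       uint64_t count) {
    assert(chunk > 0 && chunk <= file_size);
    write_stats stats;
    uint64_t pos = 0;
    for (uint64_t i = 0; i < count; ++i) {
        if (pos + chunk > file_size) {
            pos = 0;  // wrap around and start overwriting existing data
        }
        stats.bytes_written += chunk;
        stats.max_end_offset = std::max(stats.max_end_offset, pos + chunk);
        pos += chunk;
    }
    return stats;
}
```

With a 100-byte file, 10-byte writes, and 25 writes, `bytes_written` is 250 while `max_end_offset` stays at 100. Sizing the later tests by the first value would send them into 150 bytes of holes, whereas the second value matches the data actually on disk.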
avelanarius pushed a commit that referenced this issue Apr 29, 2021
…o_with

Fixes failures in debug mode:
```
$ build/debug/tests/unit/closeable_test -l all -t deferred_close_test
WARNING: debug mode. Not for benchmarking or production
random-seed=3064133628
Running 1 test case...
Entering test module "../../tests/unit/closeable_test.cc"
../../tests/unit/closeable_test.cc(0): Entering test case "deferred_close_test"
../../src/testing/seastar_test.cc(43): info: check true has passed
==9449==WARNING: ASan doesn't fully support makecontext/swapcontext functions and may produce false positives in some cases!
terminate called after throwing an instance of 'seastar::broken_promise'
  what():  broken promise
==9449==WARNING: ASan is ignoring requested __asan_handle_no_return: stack top: 0x7fbf1f49f000; bottom 0x7fbf40971000; size: 0xffffffffdeb2e000 (-558702592)
False positive error reports may follow
For details see google/sanitizers#189
=================================================================
==9449==AddressSanitizer CHECK failed: ../../../../libsanitizer/asan/asan_thread.cpp:356 "((ptr[0] == kCurrentStackFrameMagic)) != (0)" (0x0, 0x0)
    #0 0x7fbf45f39d0b  (/lib64/libasan.so.6+0xb3d0b)
    #1 0x7fbf45f57d4e  (/lib64/libasan.so.6+0xd1d4e)
    #2 0x7fbf45f3e724  (/lib64/libasan.so.6+0xb8724)
    #3 0x7fbf45eb3e5b  (/lib64/libasan.so.6+0x2de5b)
    #4 0x7fbf45eb51e8  (/lib64/libasan.so.6+0x2f1e8)
    #5 0x7fbf45eb7694  (/lib64/libasan.so.6+0x31694)
    #6 0x7fbf45f39398  (/lib64/libasan.so.6+0xb3398)
    #7 0x7fbf45f3a00b in __asan_report_load8 (/lib64/libasan.so.6+0xb400b)
    #8 0xfe6d52 in bool __gnu_cxx::operator!=<dl_phdr_info*, std::vector<dl_phdr_info, std::allocator<dl_phdr_info> > >(__gnu_cxx::__normal_iterator<dl_phdr_info*, std::vector<dl_phdr_info, std::allocator<dl_phdr_info> > > const&, __gnu_cxx::__normal_iterator<dl_phdr_info*, std::vector<dl_phdr_info, std::allocator<dl_phdr_info> > > const&) /usr/include/c++/10/bits/stl_iterator.h:1116
    #9 0xfe615c in dl_iterate_phdr ../../src/core/exception_hacks.cc:121
    #10 0x7fbf44bd1810 in _Unwind_Find_FDE (/lib64/libgcc_s.so.1+0x13810)
    #11 0x7fbf44bcd897  (/lib64/libgcc_s.so.1+0xf897)
    #12 0x7fbf44bcea5f  (/lib64/libgcc_s.so.1+0x10a5f)
    #13 0x7fbf44bcefd8 in _Unwind_RaiseException (/lib64/libgcc_s.so.1+0x10fd8)
    #14 0xfe6281 in _Unwind_RaiseException ../../src/core/exception_hacks.cc:148
    #15 0x7fbf457364bb in __cxa_throw (/lib64/libstdc++.so.6+0xaa4bb)
    #16 0x7fbf45e10a21  (/lib64/libboost_unit_test_framework.so.1.73.0+0x1aa21)
    #17 0x7fbf45e20fe0 in boost::execution_monitor::execute(boost::function<int ()> const&) (/lib64/libboost_unit_test_framework.so.1.73.0+0x2afe0)
    #18 0x7fbf45e21094 in boost::execution_monitor::vexecute(boost::function<void ()> const&) (/lib64/libboost_unit_test_framework.so.1.73.0+0x2b094)
    #19 0x7fbf45e43921 in boost::unit_test::unit_test_monitor_t::execute_and_translate(boost::function<void ()> const&, unsigned long) (/lib64/libboost_unit_test_framework.so.1.73.0+0x4d921)
    #20 0x7fbf45e5eae1  (/lib64/libboost_unit_test_framework.so.1.73.0+0x68ae1)
    #21 0x7fbf45e5ed31  (/lib64/libboost_unit_test_framework.so.1.73.0+0x68d31)
    #22 0x7fbf45e2e547 in boost::unit_test::framework::run(unsigned long, bool) (/lib64/libboost_unit_test_framework.so.1.73.0+0x38547)
    #23 0x7fbf45e43618 in boost::unit_test::unit_test_main(bool (*)(), int, char**) (/lib64/libboost_unit_test_framework.so.1.73.0+0x4d618)
    #24 0x44798d in seastar::testing::entry_point(int, char**) ../../src/testing/entry_point.cc:77
    #25 0x4134b5 in main ../../include/seastar/testing/seastar_test.hh:65
    #26 0x7fbf44a1b1e1 in __libc_start_main (/lib64/libc.so.6+0x281e1)
    #27 0x4133dd in _start (/home/bhalevy/dev/seastar/build/debug/tests/unit/closeable_test+0x4133dd)
```

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20210406100911.12278-1-bhalevy@scylladb.com>