Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RESP (Redis serialization protocol) #147

Merged
merged 6 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions aggregator/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,10 @@ func (a *Aggregator) processL7(ctx context.Context, d *l7_req.L7Event) {
_, path, _, reqHostHeader = parseHttpPayload(string(d.Payload[0:d.PayloadSize]))
}

if d.Protocol == l7_req.L7_PROTOCOL_REDIS {
path = string(d.Payload[0:d.PayloadSize])
}

err := a.setFromTo(skInfo, d, &reqDto, reqHostHeader)
if err != nil {
return
Expand All @@ -1054,11 +1058,12 @@ func (a *Aggregator) processL7(ctx context.Context, d *l7_req.L7Event) {
reqDto.Path = path
reqDto.Completed = !d.Failed

// In AMQP-DELIVER event, we are capturing from read syscall,
// In AMQP-DELIVER or REDIS-PUSHED_EVENT event, we are capturing from read syscall,
// exchange sockets
// In Alaz context, From is always the one that makes the write
// and To is the one that makes the read
if d.Protocol == l7_req.L7_PROTOCOL_AMQP && d.Method == l7_req.DELIVER {
if (d.Protocol == l7_req.L7_PROTOCOL_AMQP && d.Method == l7_req.DELIVER) ||
(d.Protocol == l7_req.L7_PROTOCOL_REDIS && d.Method == l7_req.REDIS_PUSHED_EVENT) {
reqDto.FromIP, reqDto.ToIP = reqDto.ToIP, reqDto.FromIP
reqDto.FromPort, reqDto.ToPort = reqDto.ToPort, reqDto.FromPort
reqDto.FromUID, reqDto.ToUID = reqDto.ToUID, reqDto.FromUID
Expand Down
1 change: 1 addition & 0 deletions ebpf/c/bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "http.c"
#include "amqp.c"
#include "postgres.c"
#include "redis.c"
#include "openssl.c"
#include "http2.c"
#include "tcp_sock.c"
Expand Down
6 changes: 6 additions & 0 deletions ebpf/c/bpf_bpfeb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified ebpf/c/bpf_bpfeb.o
Binary file not shown.
6 changes: 6 additions & 0 deletions ebpf/c/bpf_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified ebpf/c/bpf_bpfel.o
Binary file not shown.
73 changes: 71 additions & 2 deletions ebpf/c/l7.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#define PROTOCOL_AMQP 2
#define PROTOCOL_POSTGRES 3
#define PROTOCOL_HTTP2 4
#define PROTOCOL_REDIS 5

#define MAX_PAYLOAD_SIZE 1024
#define PAYLOAD_PREFIX_SIZE 16
Expand Down Expand Up @@ -228,6 +229,12 @@ int process_enter_of_syscalls_write_sendto(void* ctx, __u64 fd, __u8 is_tls, cha
bpf_map_update_elem(&active_writes, &id, &args, BPF_ANY);
}
req->protocol = PROTOCOL_POSTGRES;
}else if (is_redis_ping(buf, count)){
req->protocol = PROTOCOL_REDIS;
req->method = METHOD_REDIS_PING;
}else if (!is_redis_pong(buf,count) && is_redis_command(buf,count)){
req->protocol = PROTOCOL_REDIS;
req->method = METHOD_UNKNOWN;
}else if (is_rabbitmq_publish(buf,count)){
req->protocol = PROTOCOL_AMQP;
req->method = METHOD_PUBLISH;
Expand Down Expand Up @@ -561,6 +568,37 @@ int process_exit_of_syscalls_read_recvfrom(void* ctx, __u64 id, __u32 pid, __s64
}
bpf_map_delete_elem(&active_reads, &id);
return 0;
}else if (is_redis_pushed_event(read_info->buf, ret)){
// reset payload
for (int i = 0; i < MAX_PAYLOAD_SIZE; i++) {
e->payload[i] = 0;
}
e->protocol = PROTOCOL_REDIS;
e->method = METHOD_REDIS_PUSHED_EVENT;
e->duration = timestamp - read_info->read_start_ns;
e->write_time_ns = read_info->read_start_ns; // TODO: it is not write time, but start of read time

bpf_probe_read(e->payload, MAX_PAYLOAD_SIZE, read_info->buf);
if (ret > MAX_PAYLOAD_SIZE){
e->payload_size = MAX_PAYLOAD_SIZE;
e->payload_read_complete = 0;
}else{
e->payload_size = ret;
e->payload_read_complete = 1;
}
e->failed = 0; // success
e->status = 0;
e->fd = k.fd;
e->pid = k.pid;

// for distributed tracing
e->seq = 0; // default value
e->tid = bpf_get_current_pid_tgid() & 0xFFFFFFFF;

bpf_map_delete_elem(&active_reads, &id);

bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
return 0;
}

bpf_map_delete_elem(&active_reads, &id);
Expand Down Expand Up @@ -591,8 +629,8 @@ int process_exit_of_syscalls_read_recvfrom(void* ctx, __u64 id, __u32 pid, __s64
e->tid = active_req->tid;

e->status = 0;
if(read_info->buf && ret > PAYLOAD_PREFIX_SIZE){
if(e->protocol==PROTOCOL_HTTP){ // if http, try to parse status code
if(read_info->buf){
if(e->protocol==PROTOCOL_HTTP && ret > PAYLOAD_PREFIX_SIZE){ // if http, try to parse status code
// read first 16 bytes of read buffer
char buf_prefix[PAYLOAD_PREFIX_SIZE];
long r = bpf_probe_read(&buf_prefix, sizeof(buf_prefix), (void *)(read_info->buf)) ;
Expand Down Expand Up @@ -626,6 +664,13 @@ int process_exit_of_syscalls_read_recvfrom(void* ctx, __u64 id, __u32 pid, __s64
}else if (active_req->request_type == POSTGRES_MESSAGE_PARSE || active_req->request_type == POSTGRES_MESSAGE_BIND){
e->method = METHOD_EXTENDED_QUERY;
}
}else if (e->protocol == PROTOCOL_REDIS){
if (e->method == METHOD_REDIS_PING){
e->status = is_redis_pong(read_info->buf, ret);
}else{
e->status = parse_redis_response(read_info->buf, ret);
e->method = METHOD_REDIS_COMMAND;
}
}
}else{
bpf_map_delete_elem(&active_reads, &id);
Expand Down Expand Up @@ -858,6 +903,25 @@ int sys_enter_write(struct trace_event_raw_sys_enter_write* ctx) {
return process_enter_of_syscalls_write_sendto(ctx, ctx->fd, 0, ctx->buf, ctx->count);
}

// SEC("tracepoint/syscalls/sys_enter_writev")
// int sys_enter_writev(struct trace_event_raw_sys_enter_write* ctx) {
// return process_enter_of_syscalls_write_sendto(ctx, ctx->fd, 0, ctx->buf, ctx->count);
// }


struct iov {
char* buf;
__u64 size;
};
SEC("tracepoint/syscalls/sys_enter_writev")
int sys_enter_writev(struct trace_event_raw_sys_enter_writev* ctx) {
struct iov iov0 = {};
if (bpf_probe_read(&iov0, sizeof(struct iov), (void *)ctx->vec) < 0) {
return 0;
}
return process_enter_of_syscalls_write_sendto(ctx, ctx->fd, 0, iov0.buf, iov0.size);
}

SEC("tracepoint/syscalls/sys_enter_sendto")
int sys_enter_sendto(struct trace_event_raw_sys_enter_sendto* ctx) {
return process_enter_of_syscalls_write_sendto(ctx, ctx->fd, 0 ,ctx->buff, ctx->len);
Expand All @@ -868,6 +932,11 @@ int sys_exit_write(struct trace_event_raw_sys_exit_write* ctx) {
return process_exit_of_syscalls_write_sendto(ctx, ctx->ret);
}

SEC("tracepoint/syscalls/sys_exit_writev")
int sys_exit_writev(struct trace_event_raw_sys_exit_writev* ctx) {
return process_exit_of_syscalls_write_sendto(ctx, ctx->ret);
}

SEC("tracepoint/syscalls/sys_exit_sendto")
int sys_exit_sendto(struct trace_event_raw_sys_exit_sendto* ctx) {
return process_exit_of_syscalls_write_sendto(ctx, ctx->ret);
Expand Down
Loading