Permalink
Browse files

Starting work on splitting this into a sane library

  • Loading branch information...
1 parent d20da1d commit 1086a477197a84400583a869a5e9288180452d4b @Roguelazer Roguelazer committed Sep 22, 2011
Showing with 318 additions and 43 deletions.
  1. +24 −0 debugs.h
  2. +206 −0 libybinlogp.c
  3. +39 −43 ybinlogp.h
  4. +49 −0 ybinlogp_test.c
View
@@ -0,0 +1,24 @@
+#ifndef DEBUGS_H
+#define DEBUGS_H
+
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+
+#define Jperror(lbl, format) { fprintf(stdout, format " [%s:%d] : %s\n", __FILE__, __LINE__, strerror(errno)) ; goto lbl; }
+#define Jperrori(lbl, format, ...) { fprintf(stdout, format " [%s:%d] : %s\n", ##__VA_ARGS__, __FILE__, __LINE__, strerror(errno)) ; goto lbl; }
+#if DEBUG
+#define Dprintf(...) fprintf(stderr, __VA_ARGS__)
+#define Dperror(format) fprintf(stdout, format " [%s:%d] : %s\n", __FILE__, __LINE__, strerror(errno))
+#define Dperrori(format, ...) fprintf(stdout, format " [%s:%d] : %s\n", ##__VA_ARGS__, __FILE__, __LINE__, strerror(errno))
+#define DJperror(lbl, format) { fprintf(stdout, format " [%s:%d] : %s\n", __FILE__, __LINE__, strerror(errno)) ; goto lbl; }
+#define DJperrori(lbl, format, ...) { fprintf(stdout, format " [%s:%d] : %s\n", ##__VA_ARGS__, __FILE__, __LINE__, strerror(errno)) ; goto lbl; }
+#else
+#define Dprintf(...) (void) 0
+#define Dperror(...) (void) 0
+#define Dperrori(...) (void) 0
+#define DJperror(lbl, ...) goto lbl
+#define DJperrori(lbl, ...) goto lbl
+#endif /* DEBUG */
+
+#endif /* DEBUGS_H */
View
@@ -0,0 +1,206 @@
+/*
+ * ybinlogp: A mysql binary log parser and query tool
+ *
+ * (C) 2010 Yelp, Inc.
+ *
+ * This work is licensed under the ISC/OpenBSD License. The full
+ * contents of that license can be found under license.txt
+ */
+
+/* Conventions used in this file:
+ *
+ * Functions starting with ybp_ will be in the .h file and will be exported
+ * Functions starting with ybpi_ are internal-only and should be static
+ */
+
+#define _XOPEN_SOURCE 600
+#define _GNU_SOURCE
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdbool.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+#include <assert.h>
+
+#include "debugs.h"
+#include "ybinlogp.h"
+
+/* binlog parameters */
+#define MIN_TYPE_CODE 0
+#define MAX_TYPE_CODE 27
+#define MIN_EVENT_LENGTH 19
+#define MAX_EVENT_LENGTH 16*1048576 // Max statement len is generally 16MB
+#define MAX_SERVER_ID 4294967295 // 0 <= server_id <= 2**32
+
+static int ybpi_read_fde(struct ybp_binlog_parser*);
+static int ybpi_read_event(struct ybp_binlog_parser*, off_t, struct ybp_event*);
+static bool ybpi_check_event(struct ybp_event*, struct ybp_binlog_parser*);
+static off64_t ybpi_next_after(struct ybp_event* restrict);
+
+int ybp_init_binlog_parser(int fd, struct ybp_binlog_parser** restrict out) {
+ struct ybp_binlog_parser* result;
+ if ((result = malloc(sizeof(struct binlog_parser*))) == NULL) {
+ return -1;
+ }
+ if (*out == NULL) {
+ return -1;
+ }
+ result->fd = fd;
+ result->offset = 4;
+ result->enforce_server_id = 0;
+ result->slave_server_id = 0;
+ result->master_server_id = 0;
+ result->min_timestamp = 0;
+ result->max_timestamp = time(NULL);
+ result->has_read_fde = false;
+ *out = result;
+ return 0;
+}
+
+void ybp_init_event(struct ybp_event* evbuf) {
+ memset(evbuf, 0, sizeof(struct ybp_event*));
+}
+
+void ybp_dispose_event(struct ybp_event* evbuf)
+{
+ Dprintf(stderr, "About to dispose_event 0x%p\n", evbuf);
+ if (evbuf->data != 0) {
+ free(evbuf->data);
+ evbuf->data = 0;
+ }
+ free(evbuf);
+}
+
+void ybp_reset_event(struct ybp_event* evbuf)
+{
+ Dprintf(stderr, "Resetting event\n");
+ if (evbuf->data != 0) {
+ free(evbuf->data);
+ evbuf->data = 0;
+ }
+ ybp_init_event(evbuf);
+}
+
+/**
+ * check if an event "looks" valid. returns true if it does and false if it
+ * doesn't
+ **/
+static bool ybpi_check_event(struct ybp_event* e, struct ybp_binlog_parser* p)
+{
+ if ((!p->enforce_server_id || (e->server_id == p->slave_server_id) || (e->server_id == p->master_server_id)) &&
+ e->type_code > MIN_TYPE_CODE &&
+ e->type_code < MAX_TYPE_CODE &&
+ e->length > MIN_EVENT_LENGTH &&
+ e->length < MAX_EVENT_LENGTH &&
+ e->timestamp >= p->min_timestamp &&
+ e->timestamp <= p->max_timestamp) {
+ return true;
+ } else {
+ return false;
+ }
+}
+
+/**
+ * Find the offset of the next event after the one passed in.
+ * Uses the built-in event chaining.
+ **/
+static off64_t ybpi_next_after(struct ybp_event *evbuf) {
+ /* Can't actually use next_position, because it will vary between
+ * messages that are from master and messages that are from slave.
+ * Usually, only the FDE is from the slave. But, still...
+ */
+ return evbuf->offset + evbuf->length;
+}
+
+/**
+ * Read an event from the parser parser, at offset offet, storing it in
+ * event evbuf (which should be already init'd)
+ */
+static int ybpi_read_event(struct ybp_binlog_parser* p, off_t offset, struct ybp_event* evbuf)
+{
+ ssize_t amt_read;
+ p->max_timestamp = time(NULL);
+ if ((lseek(p->fd, offset, SEEK_SET) < 0)) {
+ perror("Error seeking");
+ return -1;
+ }
+ amt_read = read(p->fd, (void*)evbuf, EVENT_HEADER_SIZE);
+ evbuf->offset = offset;
+ evbuf->data = NULL;
+ if (amt_read < 0) {
+ fprintf(stderr, "Error reading event at %lld: %s\n", (long long) offset, strerror(errno));
+ return -1;
+ } else if ((size_t)amt_read != EVENT_HEADER_SIZE) {
+ return -1;
+ }
+ if (ybpi_check_event(evbuf, p)) {
+#if DEBUG
+ fprintf(stdout, "mallocing %d bytes\n", evbuf->length - EVENT_HEADER_SIZE);
+#endif
+ if ((evbuf->data = malloc(evbuf->length - EVENT_HEADER_SIZE)) == NULL) {
+ perror("malloc:");
+ return -1;
+ }
+#if DEBUG
+ fprintf(stderr, "malloced %d bytes at 0x%p for a %s\n", evbuf->length - EVENT_HEADER_SIZE, evbuf->data, event_types[evbuf->type_code]);
+#endif
+ if (read(p->fd, evbuf->data, evbuf->length - EVENT_HEADER_SIZE) < 0) {
+ perror("reading extra data:");
+ return -1;
+ }
+ }
+ return 0;
+}
+
+/**
+ * Read the FDE. It's the first record in ALL binlogs
+ **/
+static int ybpi_read_fde(struct ybp_binlog_parser* p)
+{
+ struct ybp_event* evbuf;
+ off64_t offset;
+ int esi = p->enforce_server_id;
+ int fd = p->fd;
+
+ if ((evbuf = malloc(sizeof(struct ybp_event))) == NULL) {
+ return -1;
+ }
+ ybp_init_event(evbuf);
+
+ if (ybpi_read_event(p, 4, evbuf) < 0) {
+ return -1;
+ }
+ p->enforce_server_id = esi;
+ struct format_description_event *f = (struct format_description_event*) evbuf->data;
+ if (f->format_version != BINLOG_VERSION) {
+ fprintf(stderr, "Invalid binlog! Expected version %d, got %d\n", BINLOG_VERSION, f->format_version);
+ exit(1);
+ }
+ p->min_timestamp = evbuf->timestamp;
+ p->slave_server_id = evbuf->server_id;
+
+ offset = ybpi_next_after(evbuf);
+ p->offset = offset;
+ ybp_reset_event(evbuf);
+ ybpi_read_event(p, offset, evbuf);
+
+ p->master_server_id = evbuf->server_id;
+ ybp_dispose_event(evbuf);
+
+ lseek(fd, 4, SEEK_SET);
+ p->has_read_fde = true;
+ return 0;
+}
+
+int ybp_next_event(struct ybp_binlog_parser* parser) {
+ if (!parser->has_read_fde) {
+ ybpi_read_fde(parser);
+ }
+ (void) parser;
+ return 0;
+}
View
@@ -10,16 +10,33 @@
#ifndef _YBINLOGP_H_
#define _YBINLOGP_H_
+#include <stdbool.h>
#include <stdint.h>
#include <sys/types.h>
#define BINLOG_VERSION 4
#define EVENT_HEADER_SIZE 19 /* we tack on extra stuff at the end */
+struct ybp_binlog_parser;
+struct ybp_event;
+struct format_description_event;
+struct query_event;
+
+struct ybp_binlog_parser {
+ int fd;
+ ssize_t offset;
+ bool enforce_server_id;
+ bool has_read_fde;
+ uint32_t slave_server_id;
+ uint32_t master_server_id;
+ time_t min_timestamp;
+ time_t max_timestamp;
+};
+
#pragma pack(push)
#pragma pack(1) /* force byte alignment */
-struct event {
+struct ybp_event {
uint32_t timestamp;
uint8_t type_code;
uint32_t server_id;
@@ -78,63 +95,42 @@ struct rotate_event {
#pragma pack(pop)
/**
- * Initialize an event object. Event objects must live on the heap
- * and must be destroyed with dispose_event().
+ * Initialize a ybp_binlog_parser. Returns 0 on success, non-zero otherwise.
*
- * Just sets everything to 0 for now.
+ * Arguments:
+ * fd: A file descriptor open in reading mode to a binlog file
+ * out: a pointer to a *ybp_binlog_parser. will be filled with a malloced
+ * ybp_binlog_parser
**/
-void init_event(struct event*);
+int ybp_init_binlog_parser(int fd, struct ybp_binlog_parser** out);
/**
- * Reset an event object, making it re-fillable
+ * Advance a ybp_binlog_parser structure to the next event.
*
- * Deletes the extra data and re-inits the object
+ * Returns 0 if the current event is the last event, <0 on error, and >0
+ * otherwise.
*/
-void reset_event(struct event *);
+int ybp_next_event(struct ybp_binlog_parser* parser);
/**
- * Copy an event object, including any extra data
- */
-int copy_event(struct event *, struct event *);
-
-/**
- * Destroy an event object and any associated data
- **/
-void dispose_event(struct event *);
-
-/**
- * Print out only statement events, and only the statement
- **/
-void print_statement_event(struct event *e);
-
-/**
- * Print out an event
+ * Initialize an event object. Event objects must live on the heap
+ * and must be destroyed with dispose_event().
+ *
+ * Just sets everything to 0 for now.
**/
-void print_event(struct event *e);
+void ybp_init_event(struct ybp_event*);
/**
- * Read an event from the specified offset into the given event buffer.
+ * Reset an event object, making it re-fillable
*
- * Will also malloc() space for any dynamic portions of the event, if the
- * event passes check_event.
- **/
-int read_event(int, struct event *, off64_t);
+ * Deletes the extra data and re-inits the object
+ */
+void ybp_reset_event(struct ybp_event *);
/**
- * Check to see if an event looks valid.
+ * Destroy an event object and any associated data
**/
-int check_event(struct event *);
+void ybp_dispose_event(struct ybp_event *);
-/**
- * Find the offset of the next event after the one passed in.
- * Uses the built-in event chaining.
- *
- * Usage:
- * struct event *next;
- * struct event *evbuf = ...
- * off_t next_offset = next_after(evbuf);
- * read_event(fd, next, next_offset);
- **/
-off64_t next_after(struct event *evbuf);
#endif /* _YBINLOGP_H_ */
Oops, something went wrong.

0 comments on commit 1086a47

Please sign in to comment.