MB-13092: Add auditing support for tools
Change-Id: I29da85f1baaba390560c59e75fb124cc23704d22
Reviewed-on: http://review.couchbase.org/45302
Reviewed-by: Bin Cui <bin.cui@gmail.com>
Tested-by: Bin Cui <bin.cui@gmail.com>
bcui6611 committed Jan 14, 2015
1 parent 81ab883 commit ccfe9af
Showing 8 changed files with 328 additions and 18 deletions.
7 changes: 7 additions & 0 deletions cb_bin_client.py
@@ -14,6 +14,7 @@
from couchbaseConstants import REQ_MAGIC_BYTE, RES_MAGIC_BYTE
from couchbaseConstants import REQ_PKT_FMT, RES_PKT_FMT, MIN_RECV_PACKET
from couchbaseConstants import SET_PKT_FMT, INCRDECR_RES_FMT
from couchbaseConstants import AUDIT_PKT_FMT
import couchbaseConstants

class MemcachedError(exceptions.Exception):
@@ -425,3 +426,9 @@ def deregister_tap_client(self, tap_name):
def reset_replication_chain(self):
"""Reset the replication chain."""
return self._doCmd(couchbaseConstants.CMD_RESET_REPLICATION_CHAIN, '', '', '', 0)

def audit(self, auditid, event):
print couchbaseConstants.CMD_AUDIT_PUT, auditid, event

return self._doCmd(couchbaseConstants.CMD_AUDIT_PUT, '', event, \
struct.pack(AUDIT_PKT_FMT, auditid))
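
A minimal usage sketch of the new audit() helper (illustrative only, not part of this commit; host, port, bucket name, and credentials are assumptions): it packs the audit event id into the request extras and sends the JSON event body as the value of a CMD_AUDIT_PUT request.

import json

import cb_bin_client
import couchbaseConstants

# Assumed data-service endpoint and bucket credentials, for illustration only.
conn = cb_bin_client.MemcachedClient("127.0.0.1", 11210)
conn.sasl_auth_plain("default", "")

event = {"timestamp": "2015-01-14T12:00:00.000Z",
         "real_userid": {"source": "internal", "user": "default"},
         "mode": "diff"}

# The event id goes into the packed extras; the JSON body goes into the value.
conn.audit(couchbaseConstants.AUDIT_EVENT_BACKUP_START, json.dumps(event))
conn.close()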
12 changes: 12 additions & 0 deletions couchbaseConstants.py
@@ -37,6 +37,18 @@
CMD_SASL_AUTH = 0x21
CMD_SASL_STEP = 0x22

# AUDIT
CMD_AUDIT_PUT = 0x27
CMD_AUDIT_CONFIG_RELOAD = 0x28
AUDIT_PKT_FMT=">I"

AUDIT_EVENT_BACKUP_START = 0x3000
AUDIT_EVENT_BACKUP_STOP = 0x3001
AUDIT_EVENT_RESTORE_SOURCE_START = 0x3002
AUDIT_EVENT_RESTORE_SOURCE_STOP = 0x3003
AUDIT_EVENT_RESTORE_SINK_START = 0x3004
AUDIT_EVENT_RESTORE_SINK_STOP = 0x3005

# Bucket extension
CMD_CREATE_BUCKET = 0x85
CMD_DELETE_BUCKET = 0x86
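
For reference, AUDIT_PKT_FMT (">I") above packs the audit event id as a 4-byte big-endian unsigned integer for the extras of a CMD_AUDIT_PUT request, and the hex event constants correspond to the decimal event ids used in etc/tools_descriptor.json below (for example, 0x3000 == 12288, the "backup start" event). A small hedged illustration, not part of the commit:

import struct

import couchbaseConstants

# ">I" packs one unsigned 32-bit integer, big-endian; audit() sends this as extras.
extras = struct.pack(couchbaseConstants.AUDIT_PKT_FMT,
                     couchbaseConstants.AUDIT_EVENT_BACKUP_START)
assert extras == "\x00\x00\x30\x00"   # Python 2 str of 4 bytes

# The hex constants line up with the decimal ids in the JSON descriptor.
assert couchbaseConstants.AUDIT_EVENT_BACKUP_START == 12288   # 0x3000
assert couchbaseConstants.AUDIT_EVENT_BACKUP_STOP == 12289    # 0x3001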
130 changes: 130 additions & 0 deletions etc/tools_descriptor.json
@@ -0,0 +1,130 @@
{
"version" : 1,
"module" : "tools",
"events" : [
{ "id" : 12288,
"name" : "backup start",
"description" : "Start up a backup task",
"sync" : false,
"enabled" : true,
"mandatory_fields" : {
"timestamp" : "",
"real_userid" : {"source" : "", "user" : ""},
"mode" : ""
},
"optional_fields" : {
"source_bucket" : "",
"source_node" : "",
}
},
{ "id" : 12289,
"name" : "backup stop",
"description" : "Backup task stops",
"sync" : false,
"enabled" : true,
"mandatory_fields" : {
"timestamp" : "",
"real_userid" : {"source" : "", "user" : ""},
"status" : ""
},
"optional_fields" : {
"source_bucket" : "",
"source_node" : "",
}
},
{ "id" : 12290,
"name" : "restore start",
"description" : "Start up a restore task",
"sync" : false,
"enabled" : true,
"mandatory_fields" : {
"timestamp" : "",
"real_userid" : {"source" : "", "user" : ""},
"mode" : ""
},
"optional_fields" : {
"source_bucket" : "",
"source_node" : "",
"target_bucket" : "",
}
},
{ "id" : 12291,
"name" : "restore stop",
"description" : "Restore task stops",
"sync" : false,
"enabled" : true,
"mandatory_fields" : {
"timestamp" : "",
"real_userid" : {"source" : "", "user" : ""},
"status" : ""
},
"optional_fields" : {
"source_bucket" : "",
"source_node" : "",
"target_bucket" : "",
}
},
{ "id" : 12292,
"name" : "recovery source start",
"description" : "Start up a recovery source task",
"sync" : false,
"enabled" : true,
"mandatory_fields" : {
"timestamp" : "",
"real_userid" : {"source" : "", "user" : ""},
"mode" : ""
},
"optional_fields" : {
"source_bucket" : "",
"source_node" : "",
}
},
{ "id" : 12293,
"name" : "recovery source stop",
"description" : "Recovery source task stops",
"sync" : false,
"enabled" : true,
"mandatory_fields" : {
"timestamp" : "",
"real_userid" : {"source" : "", "user" : ""},
"status" : ""
},
"optional_fields" : {
"source_bucket" : "",
"source_node" : "",
"target_bucket" : "",
}
},
{ "id" : 12294,
"name" : "recovery sink start",
"description" : "Start up a recovery sink task",
"sync" : false,
"enabled" : true,
"mandatory_fields" : {
"timestamp" : "",
"real_userid" : {"source" : "", "user" : ""},
"mode" : ""
},
"optional_fields" : {
"source_bucket" : "",
"source_node" : "",
}
},
{ "id" : 12295,
"name" : "recovery sink stop",
"description" : "Recovery task sink stops",
"sync" : false,
"enabled" : true,
"mandatory_fields" : {
"timestamp" : "",
"real_userid" : {"source" : "", "user" : ""},
"status" : ""
},
"optional_fields" : {
"source_bucket" : "",
"source_node" : "",
"target_bucket" : "",
}
}
]
}
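
To illustrate the descriptor above (values are made up, not part of the commit): a "backup start" event (id 12288) must carry timestamp, real_userid, and mode, and may add the optional source_bucket and source_node fields, which is exactly the shape the tools build before calling audit().

import json

backup_start = {
    # mandatory fields
    "timestamp": "2015-01-14T12:00:00.000Z",
    "real_userid": {"source": "internal", "user": "default"},
    "mode": "diff",
    # optional fields
    "source_bucket": "default",
    "source_node": "10.0.0.1:8091",
}
payload = json.dumps(backup_start)   # handed to MemcachedClient.audit()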
9 changes: 9 additions & 0 deletions pump.py
@@ -457,9 +457,18 @@ def skip(self, key, vbucket_id):

return False

def get_timestamp(self):
# ISO-8601 UTC timestamp with millisecond precision, e.g. 2015-01-14T12:00:00.000Z
now = time.time()
return time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(now)) + ".%03dZ" % int((now % 1) * 1000)

def add_counter(self, key, val=1):
self.cur[key] = self.cur.get(key, 0.0) + val

def add_start_event(self, conn):
return 0

def add_stop_event(self, conn):
return 0

class Source(EndPoint):
"""Base class for all data sources."""
39 changes: 39 additions & 0 deletions pump_cb.py
@@ -5,6 +5,7 @@
import time
import urllib

import couchbaseConstants
import pump
import pump_mc

@@ -20,6 +21,41 @@ def __init__(self, opts, spec, source_bucket, source_node,

self.rehash = opts.extra.get("rehash", 0)

def add_start_event(self, conn):
sasl_user = str(self.source_bucket.get("name", pump.get_username(self.opts.username)))
event = {"timestamp": self.get_timestamp(),
"real_userid": {"source": "internal",
"user": sasl_user,
},
"mode": getattr(self.opts, "mode", "diff"),
"source_bucket": self.source_bucket['name'],
"source_node": self.source_node['hostname'],
"target_bucket": self.sink_map['buckets'][0]['name']
}
if conn:
try:
conn.audit(couchbaseConstants.AUDIT_EVENT_RESTORE_SINK_START, json.dumps(event))
except Exception, e:
logging.warn("auditing error: %s" % e)
return 0

def add_stop_event(self, conn):
sasl_user = str(self.source_bucket.get("name", pump.get_username(self.opts.username)))
event = {"timestamp": self.get_timestamp(),
"real_userid": {"source": "internal",
"user": sasl_user
},
"source_bucket": self.source_bucket['name'],
"source_node": self.source_node['hostname'],
"target_bucket": self.sink_map['buckets'][0]['name']
}
if conn:
try:
conn.audit(couchbaseConstants.AUDIT_EVENT_RESTORE_SINK_STOP, json.dumps(event))
except Exception, e:
logging.warn("auditing error: %s" % e)
return 0

def scatter_gather(self, mconns, batch):
sink_map_buckets = self.sink_map['buckets']
if len(sink_map_buckets) != 1:
@@ -224,6 +260,7 @@ def find_conn(self, mconns, vbucket_id, msgs):
logging.error("error: CBSink.connect() for send: " + rv)
return rv, None
mconns[host_port] = conn

#check if we need to call the hello command
for i, msg in enumerate(msgs):
msg_format_length = len(msg)
@@ -234,4 +271,6 @@ def find_conn(self, mconns, vbucket_id, msgs):
logging.warn("fail to call hello command, maybe it is not supported")
pass
break

self.add_start_event(conn)
return 0, conn
74 changes: 56 additions & 18 deletions pump_dcp.py
@@ -80,7 +80,40 @@ def check(opts, spec):

@staticmethod
def provide_design(opts, source_spec, source_bucket, source_map):
return pump_tap.TAPDumpSource.provide_design(opts, source_spec, source_bucket, source_map);
return pump_tap.TAPDumpSource.provide_design(opts, source_spec, source_bucket, source_map)

def add_start_event(self, conn):
sasl_user = str(self.source_bucket.get("name", pump.get_username(self.opts.username)))
event = {"timestamp": self.get_timestamp(),
"real_userid": {"source": "internal",
"user": sasl_user,
},
"mode": getattr(self.opts, "mode", "diff"),
"source_bucket": self.source_bucket['name'],
"source_node": self.source_node['hostname']
}
if conn:
try:
conn.audit(couchbaseConstants.AUDIT_EVENT_BACKUP_START, json.dumps(event))
except Exception, e:
logging.warn("auditing error: %s" % e)
return 0

def add_stop_event(self, conn):
sasl_user = str(self.source_bucket.get("name", pump.get_username(self.opts.username)))
event = {"timestamp": self.get_timestamp(),
"real_userid": {"source": "internal",
"user": sasl_user
},
"source_bucket": self.source_bucket['name'],
"source_node": self.source_node['hostname']
}
if conn:
try:
conn.audit(couchbaseConstants.AUDIT_EVENT_BACKUP_STOP, json.dumps(event))
except Exception, e:
logging.warn("auditing error: %s" % e)
return 0

def build_node_vbucket_map(self):
if self.source_bucket.has_key("vBucketServerMap"):
@@ -113,16 +146,20 @@ def provide_batch(self):

while True:
if self.dcp_done:
if self.dcp_conn:
self.add_stop_event(self.dcp_conn)
self.dcp_conn.close()
self.dcp_conn = None
return 0, None

rv, dcp_conn = self.get_dcp_conn()
rv = self.get_dcp_conn()
if rv != 0:
self.dcp_done = True
return rv, None

rv, batch = self.provide_dcp_batch_actual(dcp_conn)
rv, batch = self.provide_dcp_batch_actual()
if rv == 0:
return rv, batch
return 0, batch

if self.dcp_conn:
self.dcp_conn.close()
@@ -138,7 +175,7 @@ def provide_batch(self):
cur_sleep = min(cur_sleep * 2, 20) # Max backoff sleep 20 seconds.
cur_retry = cur_retry + 1

def provide_dcp_batch_actual(self, dcp_conn):
def provide_dcp_batch_actual(self):
batch = pump.Batch(self)

batch_max_size = self.opts.extra['batch_max_size']
@@ -294,7 +331,7 @@ def provide_dcp_batch_actual(self, dcp_conn):
if need_ack:
self.ack_last = True
try:
dcp_conn._sendMsg(cmd, '', '', opaque, vbucketId=0,
self.dcp_conn._sendMsg(cmd, '', '', opaque, vbucketId=0,
fmt=couchbaseConstants.RES_PKT_FMT,
magic=couchbaseConstants.RES_MAGIC_BYTE)
except socket.error:
@@ -343,11 +380,11 @@ def get_dcp_conn(self):
self.dcp_conn = cb_bin_client.MemcachedClient(host, port)
if not self.dcp_conn:
return "error: could not connect to memcached: " + \
host + ":" + str(port), None
host + ":" + str(port)
self.mem_conn = cb_bin_client.MemcachedClient(host, port)
if not self.dcp_conn:
if not self.mem_conn:
return "error: could not connect to memcached: " + \
host + ":" + str(port), None
host + ":" + str(port)
sasl_user = str(self.source_bucket.get("name", pump.get_username(self.opts.username)))
sasl_pswd = str(self.source_bucket.get("saslPassword", pump.get_password(self.opts.password)))
if sasl_user:
@@ -360,19 +397,19 @@ def get_dcp_conn(self):
self.mem_conn.sasl_auth_plain(sasl_user, sasl_pswd)
except EOFError:
return "error: SASL auth error: %s:%s, user: %s" % \
(host, port, sasl_user), None
(host, port, sasl_user)
except cb_bin_client.MemcachedError:
return "error: SASL auth failed: %s:%s, user: %s" % \
(host, port, sasl_user), None
(host, port, sasl_user)
except socket.error:
return "error: SASL auth socket error: %s:%s, user: %s" % \
(host, port, sasl_user), None
(host, port, sasl_user)
except EOFError:
return "error: SASL auth error: %s:%s, user: %s" % \
(host, port, sasl_user), None
(host, port, sasl_user)
except socket.error:
return "error: SASL auth socket error: %s:%s, user: %s" % \
(host, port, sasl_user), None
(host, port, sasl_user)
extra = struct.pack(couchbaseConstants.DCP_CONNECT_PKT_FMT, 0, \
couchbaseConstants.FLAG_DCP_PRODUCER)
try:
@@ -390,16 +427,17 @@ def get_dcp_conn(self):
str(self.batch_max_bytes * 10), opaque)
self.dcp_conn._handleSingleResponse(opaque)
except EOFError:
return "error: Fail to set up DCP connection", None
return "error: Fail to set up DCP connection"
except cb_bin_client.MemcachedError:
return "error: DCP connect memcached error", None
return "error: DCP connect memcached error"
except socket.error:
return "error: DCP connection error", None
return "error: DCP connection error"
self.running = True
self.start()

self.add_start_event(self.dcp_conn)
self.setup_dcp_streams()
return 0, self.dcp_conn
return 0

def ack_buffer_size(self, buf_size):
if self.flow_control: