Skip to content

Commit

Permalink
Introduce attributes, new example client
Browse files Browse the repository at this point in the history
Also updates protobuf protocol version to newer version.
  • Loading branch information
openglx committed Aug 18, 2016
1 parent cd9f67d commit f5699e8
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 69 deletions.
10 changes: 10 additions & 0 deletions ChangeLog
@@ -0,0 +1,10 @@
Version 0.2 - Aug 18, 2016
- Created Makefile to add --require protobuf-python when building RPMs
Dependency met in upstream RHEL7 repositories
- Updated Protobuf protocol definition with later Riemann additions
- Support attributes on messages
- Introduced send-riemann-events.py as example, works out-of-the-box

Version 0.1 - Mar 4-May 3, 2012
Initial version by Gleicon Moraes (gleicon) with patches Steve Losh (sjl).

7 changes: 7 additions & 0 deletions Makefile
@@ -0,0 +1,7 @@
rpm:
python setup.py bdist_rpm --requires protobuf-python --build_requires protobuf-compiler

clean:
python setup.py clean


61 changes: 61 additions & 0 deletions examples/send-riemann-event.py
@@ -0,0 +1,61 @@
#!/usr/bin/env python
"""
Send events to Riemann from CLI
"""

from riemann import RiemannClient
import argparse
import socket
import time

def main():
parser = argparse.ArgumentParser(description='Send events to Riemann.')
parser.add_argument('-r', '--riemann', dest='rie_host', action='store',
type=str, required=True,
help='IP/hostname for Riemann server')
parser.add_argument('-R', '--riemann-port', dest='rie_port', action='store',
type=int, default=5555,
help='IP/hostname for Riemann server')

parser.add_argument('-H', '--host', dest='evt_host', action='store',
type=str, default=socket.getfqdn(),
help='Hostname for event')
parser.add_argument('-s', '--service', dest='evt_service', action='store',
type=str, help='Service for event')
parser.add_argument('-S', '--state', dest='evt_state', action='store',
type=str, help='Current state for event')
parser.add_argument('-T', '--time', dest='evt_time', action='store',
type=int, default=int(time.time()),
help='Timestamp for event')

parser.add_argument('-a', '--attribute', dest='evt_attrib', action='append',
type=str, help='Event attribute (key=value), multiple allowed')
parser.add_argument('-d', '--description', dest='evt_description',
action='store', type=str, help='Event description')
parser.add_argument('-t', '--tags', dest='evt_tags', action='append',
type=str, help='Event tags, multiple allowed')
parser.add_argument('-m', '--metric', dest='evt_metric', action='store',
type=float, help='Event metric')
parser.add_argument('--ttl', dest='evt_ttl', action='store', type=int,
default=60, help='Event TTL')
args = parser.parse_args()


event = {}
for i in ['host', 'service', 'time', 'description', 'tags', 'metric', 'ttl']:
if getattr(args, 'evt_' + i):
event[i] = getattr(args, 'evt_' + i)

if getattr(args, 'evt_attrib', None):
event['attributes'] = dict(item.split('=') for item in getattr(args, 'evt_attrib'))

print(repr(event))

rc = RiemannClient(host=args.rie_host, port=args.rie_port)
rc.send(event)

if __name__ == '__main__':
main()


2 changes: 1 addition & 1 deletion protobuf/Makefile
@@ -1,5 +1,5 @@

PROTOC=/usr/local/bin/protoc
PROTOC=/usr/bin/protoc
MV = /bin/mv

protobuf:
Expand Down
17 changes: 15 additions & 2 deletions protobuf/proto.proto
@@ -1,6 +1,11 @@
option java_package = "riemann";
// Verbatim copy from riemann-java-client/src/main/proto/riemann/proto.proto
// as of commit-id bc94e4512e10e63eacaa3b512560988e6763f801

option java_package = "io.riemann.riemann";
option java_outer_classname = "Proto";

// Deprecated; state was used by early versions of the protocol, but not any
// more.
message State {
optional int64 time = 1;
optional string state = 2;
Expand All @@ -10,7 +15,6 @@ message State {
optional bool once = 6;
repeated string tags = 7;
optional float ttl = 8;
optional float metric_f = 15;
}

message Event {
Expand All @@ -21,6 +25,10 @@ message Event {
optional string description = 5;
repeated string tags = 7;
optional float ttl = 8;
repeated Attribute attributes = 9;

optional sint64 metric_sint64 = 13;
optional double metric_d = 14;
optional float metric_f = 15;
}

Expand All @@ -35,3 +43,8 @@ message Msg {
optional Query query = 5;
repeated Event events = 6;
}

message Attribute {
required string key = 1;
optional string value = 2;
}
26 changes: 19 additions & 7 deletions riemann/__init__.py
Expand Up @@ -6,7 +6,7 @@ class RiemannUDPTransport():
def __init__(self, host='127.0.0.1', port=5555):
self._host = host
self._port = port

def write(self, buffer): # needs cleanup, conn pool
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(buffer, (self._host, self._port))
Expand All @@ -16,31 +16,31 @@ class RiemannTCPTransport():
def __init__(self, host='127.0.0.1', port=5555):
self._host = host
self._port = port

def write(self, buffer):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((self._host,self._port))
l = len(buffer)
hdr = struct.pack('!I', l)
sock.send(hdr)
sock.send(buffer)
r = sock.recv(4)
r = sock.recv(4)
hl = struct.unpack('!I', r)
res = sock.recv(hl[0])
c = pb.Msg().FromString(res)
sock.close()
sock.close()
return c

class RiemannClient():
"""
Client to Riemann.
Client to Riemann.
RiemannClient constructor can receive the following parameters:
- host
- port
- transport (RiemannUDPTransport, RiemannTCPTransport or any class that
implements the write() method and receive both host and port on __init__())
Example:
Example:
from riemann import RiemannClient
rc = RiemannClient()
Expand All @@ -59,8 +59,19 @@ def send(self, edict):
ev = pb.Event()
for k in self._fields:
if edict.has_key(k): setattr(ev, k, edict[k])
if 'attributes' in edict:
attrs = []
for k in edict['attributes']:
attrib = pb.Attribute()
setattr(attrib, 'key', k)
setattr(attrib, 'value', edict['attributes'][k])
attrs.append(attrib)
print(repr(attrs))
ev.attributes.extend(attrs)
print(repr(ev))
if 'tags' in edict:
ev.tags.extend(edict['tags'])
print(repr(ev))
if 'metric' in edict:
ev.metric_f = float(edict['metric'])
msg = pb.Msg()
Expand All @@ -72,3 +83,4 @@ def query(self, query):
msg = pb.Msg()
msg.query.string = str(query)
return self._transport.write(msg.SerializeToString())

0 comments on commit f5699e8

Please sign in to comment.