emitter plugins
Signed-off-by: Sahil Suneja <sahilsuneja@gmail.com>
sahilsuneja1 committed Jan 6, 2017
1 parent d74c595 commit e536c55
Showing 17 changed files with 311 additions and 245 deletions.
3 changes: 2 additions & 1 deletion crawler/crawler.py
@@ -198,7 +198,8 @@ def main():
emitters = EmittersManager(urls=args.url,
format=args.format,
compress=args.compress,
extra_metadata=args.extraMetadata)
extra_metadata=args.extraMetadata,
plugin_places=args.plugin_places)

if args.crawlmode == 'OUTCONTAINER':
crawler = ContainersCrawler(
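The plugin_places argument passed above implies a matching command-line option, which is not part of this hunk. The following is only a hedged sketch of how a --plugin_places flag could be declared with argparse so that args.plugin_places reaches EmittersManager; the option name, defaults, and help text are assumptions.

    # Hypothetical sketch, not shown in this diff: wiring a --plugin_places
    # option into the crawler's argument parser.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--plugin_places', nargs='+', default=['plugins'],
                        help='directories to scan for emitter plugins')
    args = parser.parse_args(['--plugin_places', 'plugins', 'my_plugins'])
    assert args.plugin_places == ['plugins', 'my_plugins']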
104 changes: 14 additions & 90 deletions crawler/emitters_manager.py
@@ -1,25 +1,16 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
import cStringIO
import logging
import urlparse

import plugins_manager
from base_crawler import BaseFrame
from formatters import (write_in_csv_format,
write_in_json_format,
write_in_graphite_format)
from plugins.emitters.file_emitter import FileEmitter
from plugins.emitters.http_emitter import HttpEmitter
from plugins.emitters.kafka_emitter import KafkaEmitter
from plugins.emitters.mtgraphite_emitter import MtGraphiteEmitter
from plugins.emitters.stdout_emitter import StdoutEmitter
from utils.crawler_exceptions import (EmitterUnsupportedFormat,
EmitterUnsupportedProtocol)
from utils.crawler_exceptions import EmitterUnsupportedProtocol

logger = logging.getLogger('crawlutils')


class EmittersManager:

"""
Class that manages a list of formatter and emitter objects, one per url.
The formatter takes a frame and writes it into an iostream, and the
@@ -29,52 +20,13 @@ class EmittersManager:
and emit() should be called for each frame.
"""

"""
This maps url-protocols to emitters and formatters. For example,
when writing to stdout in csv format, this should use the
write_in_csv_format formatter and the StdoutEmitter.
"""
proto_to_class = {
'stdout': {'csv': {'class': StdoutEmitter, 'per_line': False,
'formatter': write_in_csv_format},
'graphite': {'class': StdoutEmitter, 'per_line': False,
'formatter': write_in_graphite_format},
'json': {'class': StdoutEmitter, 'per_line': False,
'formatter': write_in_json_format},
},
'file': {'csv': {'class': FileEmitter, 'per_line': False,
'formatter': write_in_csv_format},
'graphite': {'class': FileEmitter, 'per_line': False,
'formatter': write_in_graphite_format},
'json': {'class': FileEmitter, 'per_line': False,
'formatter': write_in_json_format},
},
'http': {'csv': {'class': HttpEmitter, 'per_line': False,
'formatter': write_in_csv_format},
'graphite': {'class': HttpEmitter, 'per_line': False,
'formatter': write_in_graphite_format},
'json': {'class': HttpEmitter, 'per_line': True,
'formatter': write_in_json_format},
},
'kafka': {'csv': {'class': KafkaEmitter, 'per_line': False,
'formatter': write_in_csv_format},
'graphite': {'class': KafkaEmitter, 'per_line': False,
'formatter': write_in_graphite_format},
'json': {'class': KafkaEmitter, 'per_line': True,
'formatter': write_in_json_format},
},
'mtgraphite': {'graphite': {'class': MtGraphiteEmitter,
'per_line': True,
'formatter': write_in_graphite_format},
},
}

def __init__(
self,
urls,
format='csv',
compress=False,
extra_metadata={}
extra_metadata={},
plugin_places=['plugins']
):
"""
Initializes a list of emitter objects; also stores all the args.
@@ -85,35 +37,15 @@ def __init__(
:param extra_metadata: dict added to the metadata of each frame
"""
self.extra_metadata = extra_metadata
self.urls = urls
self.compress = compress
self.format = format

# Create a list of Emitter objects based on the list of passed urls
self.emitters = []
for url in self.urls:
self.allocate_emitter(url)

def allocate_emitter(self, url):
"""
Allocate a formatter and an emitter object based on the
self.proto_to_class mapping. The formatter takes a frame and writes
it into an iostream. The emitter takes the iostream and emits.
:param url:
:return:
"""
parsed = urlparse.urlparse(url)
proto = parsed.scheme
if proto not in self.proto_to_class:
raise EmitterUnsupportedProtocol('Not supported: %s' % proto)
if self.format not in self.proto_to_class[proto]:
raise EmitterUnsupportedFormat('Not supported: %s' % self.format)
emitter_class = self.proto_to_class[proto][self.format]['class']
emit_per_line = self.proto_to_class[proto][self.format]['per_line']
emitter = emitter_class(url, emit_per_line=emit_per_line)
formatter = self.proto_to_class[proto][self.format]['formatter']
self.emitters.append((formatter, emitter))
# Create a list of Emitter objects based on urls
self.emitters = plugins_manager.get_emitter_plugins(
urls,
format,
plugin_places)
if not self.emitters:
raise EmitterUnsupportedProtocol('Emit protocols not supported')

def emit(self, frame, snapshot_num=0):
"""
@@ -129,14 +61,6 @@ def emit(self, frame, snapshot_num=0):

metadata = frame.metadata
metadata.update(self.extra_metadata)

iostream = cStringIO.StringIO()

        # Pass iostream to the emitters so they can send its content to their
# respective url
for formatter, emitter in self.emitters:
# this writes the frame metadata and data into iostream
formatter(iostream, frame)
# this emits the iostream data
emitter.emit(iostream, self.compress,
for emitter in self.emitters:
emitter.emit(frame, self.compress,
metadata, snapshot_num)
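EmittersManager now delegates emitter discovery to plugins_manager.get_emitter_plugins(urls, format, plugin_places), which is not shown in this diff. The sketch below is one plausible implementation on top of yapsy, matching the IEmitter interface and the .plugin descriptors added in this commit; the category name and constructor details are assumptions, not the repository's actual code.

    # A plausible (not actual) implementation of get_emitter_plugins() using
    # yapsy; the real plugins_manager module is outside this diff.
    import urlparse

    from yapsy.PluginManager import PluginManager

    from iemit_plugin import IEmitter


    def get_emitter_plugins(urls, format='csv', plugin_places=['plugins']):
        # Scan the plugin directories for *.plugin descriptors whose modules
        # define an IEmitter subclass.
        pm = PluginManager(categories_filter={'emitter': IEmitter},
                           plugin_info_ext='plugin')
        pm.setPluginPlaces(plugin_places)
        pm.collectPlugins()

        emitters = []
        for url in urls:
            proto = urlparse.urlparse(url).scheme
            for plugin in pm.getPluginsOfCategory('emitter'):
                emitter = plugin.plugin_object
                if emitter.get_emitter_protocol() == proto:
                    # init() binds the url and picks the formatter for format
                    emitter.init(url, emit_format=format)
                    emitters.append(emitter)
        return emitters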
55 changes: 55 additions & 0 deletions crawler/iemit_plugin.py
@@ -0,0 +1,55 @@
import cStringIO
from yapsy.IPlugin import IPlugin
from formatters import (write_in_csv_format,
write_in_json_format,
write_in_graphite_format)
from utils.crawler_exceptions import (EmitterUnsupportedFormat)


class IEmitter(IPlugin):

"""
Base emitter class from which emitters like FileEmitter, StdoutEmitter
should inherit. The main idea is that all emitters get a url, and should
implement an emit() function given an iostream (a buffer with the features
to emit).
"""

def init(self, url, timeout=1, max_retries=5, emit_format='csv'):
self.url = url
self.timeout = timeout
self.max_retries = max_retries
self.emit_per_line = False

self.supported_formats = {'csv': write_in_csv_format,
'graphite': write_in_graphite_format,
'json': write_in_json_format}

if emit_format in self.supported_formats:
self.formatter = self.supported_formats[emit_format]
else:
raise EmitterUnsupportedFormat('Not supported: %s' % emit_format)

def get_emitter_protocol(self):
raise NotImplementedError()

def format(self, frame):
# this writes the frame metadata and data into iostream
# Pass iostream to the emitters so they can send its content to their
# respective url
iostream = cStringIO.StringIO()
self.formatter(iostream, frame)
return iostream

def emit(self, frame, compress=False,
metadata={}, snapshot_num=0):
"""
        :param frame: a frame containing extracted features
:param compress:
:param metadata:
:param snapshot_num:
:return:
"""
# this formats and emits an input frame
raise NotImplementedError()
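To show how the new interface is meant to be used, here is a hedged sketch of a third-party emitter: a hypothetical LogEmitter (not part of this commit) that routes each formatted frame through the logging module. It would also need a log_emitter.plugin descriptor alongside it, analogous to the ones added below for file, http, and kafka.

    # Hypothetical example following the IEmitter contract from
    # iemit_plugin.py; names here are invented for illustration.
    import logging

    from iemit_plugin import IEmitter

    logger = logging.getLogger('crawlutils')


    class LogEmitter(IEmitter):

        def get_emitter_protocol(self):
            # matches urls of the form log://...
            return 'log'

        def emit(self, frame, compress=False, metadata={}, snapshot_num=0):
            if compress:
                raise NotImplementedError('log emitter does not support gzip.')
            # format() comes from IEmitter: it runs the configured formatter
            # (csv/json/graphite) over the frame and returns a cStringIO buffer
            iostream = self.format(frame)
            if self.emit_per_line:
                iostream.seek(0)
                for line in iostream.readlines():
                    logger.info(line.strip())
            else:
                logger.info(iostream.getvalue())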
8 changes: 8 additions & 0 deletions crawler/plugins/emitters/file_emitter.plugin
@@ -0,0 +1,8 @@
[Core]
Name = File Emitter
Module = file_emitter

[Documentation]
Author = IBM
Version = 0.1
Description = Plugin to emit frame to file
11 changes: 8 additions & 3 deletions crawler/plugins/emitters/file_emitter.py
@@ -1,18 +1,22 @@
import gzip
import shutil

from plugins.emitters.base_emitter import BaseEmitter
from iemit_plugin import IEmitter


class FileEmitter(BaseEmitter):
class FileEmitter(IEmitter):

"""
Emitter to file. This creates one file per frame. The file names
are the ones in the url. For example: for file:///tmp/a the file for
the first frame would be /tmp/a.0 for a host, and /tmp/a.xyz.0 for a
container with id xyz.
"""

def emit(self, iostream, compress=False,
def get_emitter_protocol(self):
return 'file'

def emit(self, frame, compress=False,
metadata={}, snapshot_num=0):
"""
@@ -22,6 +26,7 @@ def emit(self, iostream, compress=False,
:param snapshot_num:
:return:
"""
iostream = self.format(frame)
output_path = self.url[len('file://'):]
short_name = metadata.get('emit_shortname', None)
if not short_name:
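The remainder of FileEmitter.emit() is collapsed in this diff. As a hedged illustration of what the gzip/shutil imports suggest, a write step along these lines would dump the formatted buffer to a per-frame file, gzipping it when compress=True; the helper name below is invented and this is not the repository's actual code.

    # Illustrative only - the rest of FileEmitter.emit() is not shown here.
    import gzip
    import shutil


    def _write_iostream_to_file(iostream, output_path, compress=False):
        iostream.seek(0)
        if compress:
            # write <output_path>.gz with the gzipped frame contents
            with gzip.open(output_path + '.gz', 'w') as fd:
                shutil.copyfileobj(iostream, fd)
        else:
            with open(output_path, 'w') as fd:
                shutil.copyfileobj(iostream, fd)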
8 changes: 8 additions & 0 deletions crawler/plugins/emitters/http_emitter.plugin
@@ -0,0 +1,8 @@
[Core]
Name = Http Emitter
Module = http_emitter

[Documentation]
Author = IBM
Version = 0.1
Description = Plugin to post frame data to http server
25 changes: 15 additions & 10 deletions crawler/plugins/emitters/http_emitter.py
@@ -3,30 +3,35 @@

import requests

from plugins.emitters.base_emitter import BaseEmitter
from iemit_plugin import IEmitter

logger = logging.getLogger('crawlutils')


class HttpEmitter(BaseEmitter):
class HttpEmitter(IEmitter):

def __init__(self, url, timeout=1, max_retries=5,
emit_per_line=False):
BaseEmitter.__init__(self, url,
timeout=timeout,
max_retries=max_retries,
emit_per_line=emit_per_line)
def get_emitter_protocol(self):
return 'http'

def emit(self, iostream, compress=False,
def init(self, url, timeout=1, max_retries=5, emit_format='csv'):
IEmitter.init(self, url,
timeout=timeout,
max_retries=max_retries,
emit_format=emit_format)
if emit_format == 'json':
self.emit_per_line = True

def emit(self, frame, compress=False,
metadata={}, snapshot_num=0):
"""
:param iostream: a CStringIO used to buffer the formatted features.
:param frame: a frame containing extracted features
:param compress:
:param metadata:
:param snapshot_num:
:return: None
"""
iostream = self.format(frame)
if compress:
raise NotImplementedError('http emitter does not support gzip.')
if self.emit_per_line:
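The body of the POST loop is also collapsed here. The helpers below are only an illustrative sketch of how such an emitter might send the buffer with requests, posting either line by line (json) or as one payload, with a simple retry loop; the function names and retry policy are assumptions.

    # Illustrative only - not the collapsed code from http_emitter.py.
    import requests


    def _post_with_retries(url, content, max_retries=5, timeout=1):
        for _ in range(max_retries):
            try:
                response = requests.post(url, data=content, timeout=timeout)
                if response.status_code == requests.codes.ok:
                    return
            except requests.exceptions.RequestException:
                pass  # retry on connection errors and timeouts
        raise RuntimeError('failed to POST frame to %s' % url)


    def _emit_buffer(url, iostream, emit_per_line=False):
        iostream.seek(0)
        if emit_per_line:
            # one POST per formatted line, e.g. one json document at a time
            for line in iostream.readlines():
                _post_with_retries(url, line)
        else:
            _post_with_retries(url, iostream.getvalue())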
8 changes: 8 additions & 0 deletions crawler/plugins/emitters/kafka_emitter.plugin
@@ -0,0 +1,8 @@
[Core]
Name = Kafka Emitter
Module = kafka_emitter

[Documentation]
Author = IBM
Version = 0.1
Description = Plugin to emit frame over kafka
25 changes: 15 additions & 10 deletions crawler/plugins/emitters/kafka_emitter.py
@@ -3,22 +3,27 @@
import kafka as kafka_python
import pykafka

from plugins.emitters.base_emitter import BaseEmitter
from iemit_plugin import IEmitter
from utils.misc import (NullHandler, call_with_retries)

logger = logging.getLogger('crawlutils')
# Kafka logs too much
logging.getLogger('kafka').addHandler(NullHandler())


class KafkaEmitter(BaseEmitter):
class KafkaEmitter(IEmitter):

def __init__(self, url, timeout=1, max_retries=10,
emit_per_line=False):
BaseEmitter.__init__(self, url,
timeout=timeout,
max_retries=max_retries,
emit_per_line=emit_per_line)
def get_emitter_protocol(self):
return 'kafka'

def init(self, url, timeout=1, max_retries=10, emit_format='csv'):
IEmitter.init(self, url,
timeout=timeout,
max_retries=max_retries,
emit_format=emit_format)

if emit_format == 'json':
self.emit_per_line = True

try:
broker, topic = url[len('kafka://'):].split('/')
@@ -40,16 +45,16 @@ def connect_to_broker(self, broker, topic):
self.client = pykafka.KafkaClient(hosts=broker)
self.producer = self.client.topics[topic].get_producer()

def emit(self, iostream, compress=False,
def emit(self, frame, compress=False,
metadata={}, snapshot_num=0):
"""
        :param frame: a frame containing extracted features
:param compress:
:param metadata:
:param snapshot_num:
:return:
"""
iostream = self.format(frame)
if compress:
raise NotImplementedError('Compress not implemented.')

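As with the other emitters, the actual send loop is collapsed. The sketch below is an assumption rather than the repository's code: it shows how the pykafka producer created in connect_to_broker() above could be fed, one message per formatted line when emit_per_line is set.

    # Illustrative sketch of the emission step, assuming the pykafka
    # producer built in connect_to_broker(); the helper name is invented.
    def _produce(producer, iostream, emit_per_line=False):
        iostream.seek(0)
        if emit_per_line:
            # one kafka message per formatted line (e.g. per json document)
            for line in iostream.readlines():
                producer.produce(line.strip())
        else:
            # one kafka message carrying the whole formatted frame
            producer.produce(iostream.getvalue())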
8 changes: 8 additions & 0 deletions crawler/plugins/emitters/mtgraphite_emitter.plugin
@@ -0,0 +1,8 @@
[Core]
Name = MTGraphite Emitter
Module = mtgraphite_emitter

[Documentation]
Author = IBM
Version = 0.1
Description = Plugin to emit frame to MTGraphite server