/
rivers.py
136 lines (115 loc) · 3.73 KB
/
rivers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# -*- coding: utf-8 -*-
from __future__ import absolute_import
try:
# For Python < 2.6 or people using a newer version of simplejson
import simplejson
json = simplejson
except ImportError:
# For Python >= 2.6
import json
from .es import ES
class River(object):
def __init__(self, index_name=None, index_type=None, bulk_size=100, bulk_timeout=None):
self.name = index_name
self.index_name = index_name
self.index_type = index_type
self.bulk_size = bulk_size
self.bulk_timeout = bulk_timeout
@property
def q(self):
res = self.serialize()
index = {}
if self.name:
index['name'] = self.name
if self.index_name:
index['index'] = self.index_name
if self.index_type:
index['type'] = self.index_type
if self.bulk_size:
index['bulk_size'] = self.bulk_size
if self.bulk_timeout:
index['bulk_timeout'] = self.bulk_timeout
if index:
res['index'] = index
return res
def __repr__(self):
return str(self.q)
def to_json(self):
return json.dumps(self.q, cls=ES.encoder)
def serialize(self):
raise NotImplementedError
class RabbitMQRiver(River):
type = "rabbitmq"
def __init__(self, host="localhost", port=5672, user="guest",
password="guest", vhost="/", queue="es", exchange="es",
routing_key="es", **kwargs):
super(RabbitMQRiver, self).__init__(**kwargs)
self.host = host
self.port = port
self.user = user
self.password = password
self.vhost = vhost
self.queue = queue
self.exchange = exchange
self.routing_key = routing_key
def serialize(self):
return {
"type": self.type,
self.type: {
"host": self.host,
"port": self.port,
"user": self.user,
"pass": self.password,
"vhost": self.vhost,
"queue": self.queue,
"exchange": self.exchange,
"routing_key": self.routing_key
}
}
class TwitterRiver(River):
type = "twitter"
def __init__(self, user, password, **kwargs):
super(TwitterRiver, self).__init__(**kwargs)
self.user = user
self.password = password
def serialize(self):
return {
"type": self.type,
self.type: {
"user": self.user,
"password": self.password,
}
}
class CouchDBRiver(River):
type = "couchdb"
def __init__(self, host="localhost", port=5984, db="mydb", filter=None,
filter_params=None, script=None, user=None, password=None,
**kwargs):
super(CouchDBRiver, self).__init__(**kwargs)
self.host = host
self.port = port
self.db = db
self.filter = filter
self.filter_params = filter_params
self.script = script
self.user = user
self.password = password
def serialize(self):
result = {
"type": self.type,
self.type: {
"host": self.host,
"port": self.port,
"db": self.db,
"filter": self.filter,
}
}
if self.filter_params is not None:
result[self.type]["filter_params"] = self.filter_params
if self.script is not None:
result[self.type]["script"] = self.script
if self.user is not None:
result[self.type]["user"] = self.user
if self.password is not None:
result[self.type]["password"] = self.password
return result