-
Notifications
You must be signed in to change notification settings - Fork 9
/
sink.py
135 lines (90 loc) · 3.21 KB
/
sink.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import logging
from ..abc.sink import Sink
from .connection import ElasticSearchBulk
from .data_feeder import data_feeder_create_or_index
import asab
#
L = logging.getLogger(__name__)
#
class ElasticSearchSink(Sink):
	"""
	ElasticSearchSink inserts events into ElasticSearch via bulk requests.

	The following attributes can be passed in the context and thus override
	the default behavior of the sink:

		es_index (str): ElasticSearch index name.

	``data_feeder`` accepts the event (and its popped ``_id``) and yields the
	bulk-API payload as a Python generator. The default implementation is::

		def data_feeder_create_or_index(event):
			_id = event.pop("_id", None)
			if _id is None:
				yield b'{"create":{}}\\n'
			else:
				yield orjson.dumps(
					{"index": {"_id": _id}}, option=orjson.OPT_APPEND_NEWLINE
				)
			yield orjson.dumps(event, option=orjson.OPT_APPEND_NEWLINE)
	"""

	ConfigDefaults = {
		"index_prefix": "bspump_",  # Obsolete, use 'index'
		"index": "bspump_",
	}

	def __init__(self, app, pipeline, connection, id=None, config=None, bulk_class=ElasticSearchBulk, data_feeder=data_feeder_create_or_index):
		"""
		Initialize the sink and subscribe to connection throttling events.

		**Parameters**

		app : Application
			The application instance.
		pipeline : Pipeline
			The pipeline this sink belongs to.
		connection : str or Connection
			Name (or instance) of the ElasticSearch connection to use.
		id : str, default=None
			Optional component ID.
		config : dict, default=None
			Configuration with additional information.
		bulk_class : class, default=ElasticSearchBulk
			Class used by the connection to assemble bulk requests.
		data_feeder : callable, default=data_feeder_create_or_index
			Generator function producing the bulk payload for each event.
			Must not be None.

		Raises
		------
		RuntimeError
			If ``data_feeder`` is None.
		"""
		super().__init__(app, pipeline, id=id, config=config)
		self.Connection = pipeline.locate_connection(app, connection)
		self.BulkClass = bulk_class

		self.Index = self.Config.get('index')

		# 'index_prefix' is obsolete. It is currently supported to ensure
		# backward compatibility: if 'index' was left at its default but a
		# non-default, non-empty 'index_prefix' was configured, honor it.
		index_prefix = self.Config.get('index_prefix')
		if self.Index == "bspump_" and index_prefix != "bspump_" and len(index_prefix) > 0:
			asab.LogObsolete.warning("The 'index_prefix' has been renamed to 'index', please adjust the configuration.")
			self.Index = index_prefix

		if data_feeder is None:
			raise RuntimeError("data_feeder must not be None.")
		self.__data_feeder = data_feeder

		app.PubSub.subscribe("ElasticSearchConnection.pause!", self._connection_throttle)
		app.PubSub.subscribe("ElasticSearchConnection.unpause!", self._connection_throttle)

	def process(self, context, event):
		"""
		Feed a single event into the ElasticSearch connection's bulk buffer.

		**Parameters**

		context : dict
			Processing context; ``es_index`` (if present) overrides the
			configured target index.
		event : dict
			The event to index. Only dict events are supported; a popped
			``_id`` key (if any) is forwarded to the data feeder.
		"""
		try:
			_id = event.pop("_id", None)
		except TypeError:
			# Non-dict events (e.g. str, bytes) have no pop() with a default;
			# log a helpful message for that case, then re-raise either way.
			if not isinstance(event, dict):
				L.error("You are trying to pass event of type: {} to ElasticSearchSink, but only dict is supported".format(type(event)))
			raise

		self.Connection.consume(
			context.get("es_index", self.Index),
			self.__data_feeder(event, _id),
			bulk_class=self.BulkClass,
		)

	def _connection_throttle(self, event_name, connection):
		"""
		Throttle or unthrottle the pipeline in response to connection events.
		"""
		# Ignore pause/unpause events coming from other connections.
		if connection != self.Connection:
			return

		if event_name == "ElasticSearchConnection.pause!":
			self.Pipeline.throttle(self, True)
		elif event_name == "ElasticSearchConnection.unpause!":
			self.Pipeline.throttle(self, False)
		else:
			raise RuntimeError("Unexpected event name '{}'".format(event_name))