/
EsLocalScheme.java
188 lines (164 loc) · 7.11 KB
/
EsLocalScheme.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.cascading;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.cfg.SettingsManager;
import org.elasticsearch.hadoop.mr.WritableBytesConverter;
import org.elasticsearch.hadoop.rest.InitializationUtils;
import org.elasticsearch.hadoop.rest.RestRepository;
import org.elasticsearch.hadoop.rest.ScrollQuery;
import org.elasticsearch.hadoop.serialization.builder.JdkValueReader;
import org.elasticsearch.hadoop.util.FieldAlias;
import org.elasticsearch.hadoop.util.SettingsUtils;
import org.elasticsearch.hadoop.util.StringUtils;
import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
/**
* Cascading Scheme handling
*/
/**
 * Cascading {@link Scheme} for local, {@link Properties}-configured flows. It reads documents from
 * Elasticsearch through a {@link ScrollQuery} and writes tuples back through a {@link RestRepository}.
 */
class EsLocalScheme extends Scheme<Properties, ScrollQuery, Object, Object[], Object[]> {

    private static final long serialVersionUID = 979036202776892844L;

    private final String resource;
    private final String query;
    private final String host;
    private final int port;
    private final Properties props;

    // REST client shared by the source and sink phases. It is created lazily by initClient and
    // released by the cleanup callbacks. It is never serialized.
    private transient RestRepository client;

    // Result of SettingsUtils.isEs10 for the current settings. When true, aliased field names are
    // resolved level by level on '.'. It is recomputed in sourcePrepare, so it is not serialized.
    private transient boolean isEs10;

    /**
     * Creates the scheme.
     *
     * @param host   Elasticsearch host, handed to CascadingUtils.init
     * @param port   Elasticsearch port, handed to CascadingUtils.init
     * @param index  Elasticsearch resource to read from / write to
     * @param query  query handed to CascadingUtils.init (may be null? - TODO confirm)
     * @param fields fields used as both source and sink fields; ignored when null
     * @param props  extra properties merged over the flow configuration
     */
    EsLocalScheme(String host, int port, String index, String query, Fields fields, Properties props) {
        this.resource = index;
        this.query = query;
        this.host = host;
        this.port = port;
        if (fields != null) {
            setSinkFields(fields);
            setSourceFields(fields);
        }
        this.props = props;
    }

    /**
     * Stores the field alias mapping in a one-slot source context and caches the ES 1.0 flag.
     */
    @Override
    public void sourcePrepare(FlowProcess<Properties> flowProcess, SourceCall<Object[], ScrollQuery> sourceCall) throws IOException {
        super.sourcePrepare(flowProcess, sourceCall);

        Object[] context = new Object[1];
        Settings settings = SettingsManager.loadFrom(flowProcess.getConfigCopy()).merge(props);
        context[0] = CascadingUtils.alias(settings);
        sourceCall.setContext(context);
        isEs10 = SettingsUtils.isEs10(settings);
    }

    /**
     * Closes the scroll input, clears the context and releases the REST client.
     * The client is released even if closing the input fails or no input was set.
     */
    @Override
    public void sourceCleanup(FlowProcess<Properties> flowProcess, SourceCall<Object[], ScrollQuery> sourceCall) throws IOException {
        try {
            ScrollQuery input = sourceCall.getInput();
            // guard against a missing input so the client below is still released
            if (input != null) {
                input.close();
            }
        } finally {
            sourceCall.setContext(null);
            cleanupClient();
        }
    }

    /**
     * Releases the REST client once writing is done.
     */
    @Override
    public void sinkCleanup(FlowProcess<Properties> flowProcess, SinkCall<Object[], Object> sinkCall) throws IOException {
        cleanupClient();
    }

    /**
     * Closes and forgets the REST client, if any.
     * The field is cleared before closing, so a failing close() does not leave a stale client
     * behind for later reuse.
     */
    private void cleanupClient() throws IOException {
        RestRepository current = client;
        client = null;
        if (current != null) {
            current.close();
        }
    }

    /**
     * Stores the field-to-alias mapping for the sink fields in a one-slot sink context.
     */
    @Override
    public void sinkPrepare(FlowProcess<Properties> flowProcess, SinkCall<Object[], Object> sinkCall) throws IOException {
        super.sinkPrepare(flowProcess, sinkCall);

        Object[] context = new Object[1];
        Settings settings = SettingsManager.loadFrom(flowProcess.getConfigCopy()).merge(props);
        context[0] = CascadingUtils.fieldToAlias(settings, getSinkFields());
        sinkCall.setContext(context);
    }

    /**
     * Initializes the REST client for reading.
     */
    @Override
    public void sourceConfInit(FlowProcess<Properties> flowProcess, Tap<Properties, ScrollQuery, Object> tap, Properties conf) {
        initClient(conf, true);
    }

    /**
     * Initializes the REST client for writing and runs InitializationUtils.checkIndexExistence.
     */
    @Override
    public void sinkConfInit(FlowProcess<Properties> flowProcess, Tap<Properties, ScrollQuery, Object> tap, Properties conf) {
        initClient(conf, false);
        InitializationUtils.checkIndexExistence(SettingsManager.loadFrom(conf).merge(props), client);
    }

    /**
     * Creates the REST client if one does not exist yet.
     * <p>
     * The flow configuration is merged with this scheme's properties. Host, port, resource and
     * query are applied through CascadingUtils.init, and default serialization components
     * (value writer/reader, bytes converter, field extractor) are installed through
     * InitializationUtils.
     *
     * @param conf flow configuration (renamed from {@code props} to avoid shadowing the field)
     * @param read {@code true} when called for the source side, {@code false} for the sink side
     */
    private void initClient(Properties conf, boolean read) {
        if (client != null) {
            return;
        }
        Settings settings = SettingsManager.loadFrom(conf).merge(this.props);
        CascadingUtils.init(settings, host, port, resource, query, read);

        Log log = LogFactory.getLog(EsTap.class);
        InitializationUtils.setValueWriterIfNotSet(settings, CascadingValueWriter.class, log);
        InitializationUtils.setValueReaderIfNotSet(settings, JdkValueReader.class, log);
        InitializationUtils.setBytesConverterIfNeeded(settings, WritableBytesConverter.class, log);
        InitializationUtils.setFieldExtractorIfNotSet(settings, CascadingLocalFieldExtractor.class, log);
        client = new RestRepository(settings);
    }

    /**
     * Reads the next document from the scroll into the incoming tuple entry.
     * <p>
     * With declared fields, each field is looked up by its Elasticsearch alias. When
     * {@code isEs10} is set, dotted aliases are resolved through nested maps. Without declared
     * fields, the tuple is replaced by the document's values, with no coercion.
     *
     * @return {@code false} once the scroll is exhausted, {@code true} otherwise
     */
    @SuppressWarnings("unchecked")
    @Override
    public boolean source(FlowProcess<Properties> flowProcess, SourceCall<Object[], ScrollQuery> sourceCall) throws IOException {
        ScrollQuery scroll = sourceCall.getInput();
        if (!scroll.hasNext()) {
            return false;
        }

        TupleEntry entry = sourceCall.getIncomingEntry();
        // slot 1 of the array returned by the scroll holds the document as a map
        Map<String, ?> data = (Map<String, ?>) scroll.next()[1];
        FieldAlias alias = (FieldAlias) sourceCall.getContext()[0];

        if (entry.getFields().isDefined()) {
            // TODO: it's worth benchmarking whether using an index/offset yields significantly better performance
            for (Comparable<?> field : entry.getFields()) {
                String esName = alias.toES(field.toString());
                Object value = isEs10 ? lookupPath(data, esName) : data.get(esName);
                // NB: coercion should be applied automatically by the TupleEntry
                entry.setObject(field, value);
            }
        }
        else {
            // no definition means no coercion
            List<Object> elements = Tuple.elements(entry.getTuple());
            elements.clear();
            elements.addAll(data.values());
        }
        return true;
    }

    /**
     * Resolves a dotted path such as {@code "a.b.c"} against nested maps.
     *
     * @return the value at the path, or {@code null} when a level is missing or an
     *         intermediate value is not a map
     */
    private static Object lookupPath(Map<String, ?> data, String path) {
        Object current = data;
        for (String level : StringUtils.tokenize(path, ".")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(level);
        }
        return current;
    }

    /**
     * Writes the current sink tuple to Elasticsearch through the REST client.
     */
    @Override
    public void sink(FlowProcess<Properties> flowProcess, SinkCall<Object[], Object> sinkCall) throws IOException {
        client.writeToIndex(sinkCall);
    }
}