/
FindAsyncListener.java
98 lines (82 loc) · 3.45 KB
/
FindAsyncListener.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/*
* Copyright (c) 2017 Otávio Santana and others
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
* The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
* and the Apache License v2.0 is available at http://www.opensource.org/licenses/apache2.0.php.
*
* You may elect to redistribute this code under either of these licenses.
*
* Contributors:
*
* Otavio Santana
*/
package org.jnosql.diana.elasticsearch.document;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.jnosql.diana.api.ExecuteAsyncQueryException;
import org.jnosql.diana.api.document.DocumentEntity;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Stream;
import static java.util.Collections.synchronizedList;
import static java.util.stream.StreamSupport.stream;
/**
 * Gathers the results of up to two asynchronous Elasticsearch requests (a multi-get by id
 * and a search) into a single list and hands that list to a callback once every request
 * that was registered through {@link #getIds()} / {@link #getSearch()} has responded.
 *
 * <p>The two listeners may be completed by different threads. Recording a response and
 * deciding whether all requests are finished happens atomically, so the callback receives
 * the complete list and is not invoked twice when both responses arrive at the same time.</p>
 *
 * <p>On failure the listeners throw {@link ExecuteAsyncQueryException}; the callback is not
 * invoked by the failing listener.</p>
 */
final class FindAsyncListener {

    /** Receives the accumulated entities once all registered requests have responded. */
    private final Consumer<List<DocumentEntity>> callBack;

    /** Collection name supplied by the caller; stored but not read inside this class. */
    private final String collection;

    /** Entities converted from every response received so far. */
    private final List<DocumentEntity> entities = synchronizedList(new ArrayList<>());

    /** {@code true} while no multi-get request is outstanding. */
    private final AtomicBoolean ids = new AtomicBoolean(true);

    /** {@code true} while no search request is outstanding. */
    private final AtomicBoolean query = new AtomicBoolean(true);

    /** Guards "add results + mark finished + check whether everything is finished". */
    private final Object completionLock = new Object();

    /**
     * @param callBack   consumer invoked with the accumulated entities
     * @param collection the document collection name
     */
    FindAsyncListener(Consumer<List<DocumentEntity>> callBack, String collection) {
        this.callBack = callBack;
        this.collection = collection;
    }

    /**
     * Marks a multi-get request as outstanding and returns the listener that consumes its
     * response.
     *
     * @return a listener that converts the non-empty items of a {@link MultiGetResponse}
     * into entities
     */
    ActionListener<MultiGetResponse> getIds() {
        ids.set(false);
        return new ActionListener<MultiGetResponse>() {
            @Override
            public void onResponse(MultiGetResponse multiGetItemResponses) {
                complete(ids, Stream.of(multiGetItemResponses.getResponses())
                        .map(MultiGetItemResponse::getResponse)
                        .map(ElasticsearchEntry::of)
                        .filter(ElasticsearchEntry::isNotEmpty)
                        .map(ElasticsearchEntry::toEntity));
            }

            @Override
            public void onFailure(Exception e) {
                throw new ExecuteAsyncQueryException("An error when execute query", e);
            }
        };
    }

    /**
     * Marks a search request as outstanding and returns the listener that consumes its
     * response.
     *
     * @return a listener that converts the non-empty hits of a {@link SearchResponse}
     * into entities
     */
    ActionListener<SearchResponse> getSearch() {
        query.set(false);
        return new ActionListener<SearchResponse>() {
            @Override
            public void onResponse(SearchResponse searchResponse) {
                complete(query, stream(searchResponse.getHits().spliterator(), false)
                        .map(ElasticsearchEntry::of)
                        .filter(ElasticsearchEntry::isNotEmpty)
                        .map(ElasticsearchEntry::toEntity));
            }

            @Override
            public void onFailure(Exception e) {
                throw new ExecuteAsyncQueryException("An error when execute query", e);
            }
        };
    }

    /**
     * Stores the entities of one response, marks that request as finished and, when no
     * request is outstanding any more, invokes the callback.
     *
     * @param finished the flag ({@link #ids} or {@link #query}) of the request that responded
     * @param results  the entities converted from that response
     */
    private void complete(AtomicBoolean finished, Stream<? extends DocumentEntity> results) {
        // Convert outside the lock so the critical section stays short.
        List<DocumentEntity> found = new ArrayList<>();
        results.forEach(found::add);

        boolean allFinished;
        synchronized (completionLock) {
            // Results must be visible before the flag flips; otherwise the other listener
            // could see "both finished" and deliver a list that is still being filled.
            entities.addAll(found);
            finished.set(true);
            // Evaluated under the same lock so that exactly one of two racing listeners
            // observes the transition to "everything finished".
            allFinished = ids.get() && query.get();
        }
        if (allFinished) {
            // Invoked outside the lock: the callback is foreign code.
            callBack.accept(entities);
        }
    }
}