This repository has been archived by the owner on May 5, 2020. It is now read-only.

Commit a21974a
Elasticsearch 0.90.5 compatibility
Justin Zhu committed Oct 15, 2013
1 parent b2cfde6
Showing 10 changed files with 52 additions and 63 deletions.
pom.xml (1 addition, 1 deletion)
@@ -31,7 +31,7 @@
</parent>

<properties>
<elasticsearch.version>0.90.3</elasticsearch.version>
<elasticsearch.version>0.90.5</elasticsearch.version>
</properties>

<distributionManagement>
InternalDistinctDateHistogramFacet.java
@@ -1,6 +1,7 @@
package crate.elasticsearch.facet.distinct;

import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.trove.ExtTLongObjectHashMap;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
@@ -17,18 +18,13 @@ public abstract class InternalDistinctDateHistogramFacet extends InternalDateHis
public static final String TYPE = "distinct_date_histogram";
protected ComparatorType comparatorType;

ExtTLongObjectHashMap<InternalDistinctDateHistogramFacet.DistinctEntry> tEntries;
boolean cachedEntries;
Collection<DistinctEntry> entries = null;
protected final CacheRecycler cacheRecycler;

public InternalDistinctDateHistogramFacet() {
cacheRecycler = null;
}

public InternalDistinctDateHistogramFacet(String facetName,final CacheRecycler cacheRecycler) {
public InternalDistinctDateHistogramFacet(String facetName) {
super(facetName);
this.cacheRecycler = cacheRecycler;
}

public static void registerStreams() {
@@ -103,15 +99,6 @@ public Iterator<Entry> iterator() {
return (Iterator) entries().iterator();
}

void releaseCache() {
if (cachedEntries) {
cacheRecycler.pushLongObjectMap(tEntries);
cachedEntries = false;
tEntries = null;
}
}


static final class Fields {
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
static final XContentBuilderString ENTRIES = new XContentBuilderString("entries");
@@ -128,38 +115,36 @@ public Facet reduce(ReduceContext context) {
InternalDistinctDateHistogramFacet internalFacet = (InternalDistinctDateHistogramFacet) facets.get(0);
List<DistinctEntry> entries = internalFacet.entries();
Collections.sort(entries, comparatorType.comparator());
internalFacet.releaseCache();
return internalFacet;
}

ExtTLongObjectHashMap<DistinctEntry> map = context.cacheRecycler().popLongObjectMap();
Recycler.V<ExtTLongObjectHashMap<DistinctEntry>> map = context.cacheRecycler().longObjectMap(-1);
for (Facet facet : facets) {
InternalDistinctDateHistogramFacet histoFacet = (InternalDistinctDateHistogramFacet) facet;
for (DistinctEntry fullEntry : histoFacet.entries) {
DistinctEntry current = map.get(fullEntry.getTime());
DistinctEntry current = map.v().get(fullEntry.getTime());
if (current != null) {
current.getValues().addAll(fullEntry.getValues());

} else {
map.put(fullEntry.getTime(), fullEntry);
map.v().put(fullEntry.getTime(), fullEntry);
}
}
histoFacet.releaseCache();
}

// sort
Object[] values = map.internalValues();
Object[] values = map.v().internalValues();
Arrays.sort(values, (Comparator) comparatorType.comparator());
List<DistinctEntry> ordered = new ArrayList<DistinctEntry>(map.size());
for (int i = 0; i < map.size(); i++) {
List<DistinctEntry> ordered = new ArrayList<DistinctEntry>(map.v().size());
for (int i = 0; i < map.v().size(); i++) {
DistinctEntry value = (DistinctEntry) values[i];
if (value == null) {
break;
}
ordered.add(value);
}

context.cacheRecycler().pushLongObjectMap(map);
map.release();

// just initialize it as already ordered facet
InternalDistinctDateHistogramFacet ret = newFacet();
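
The recycler API is the heart of this migration: against Elasticsearch 0.90.3 the facet code borrowed a pooled map with popLongObjectMap() and handed it back with pushLongObjectMap(), while 0.90.5 returns a Recycler.V handle whose payload is reached through v() and returned to the pool with release(). A minimal before/after sketch of the pattern used in reduce() above (the two fragments target different Elasticsearch versions and would not compile against the same jar; cacheRecycler, time and entry stand in for values available at the call site):

    // 0.90.3: borrow the raw map, push it back when finished
    ExtTLongObjectHashMap<DistinctEntry> map = cacheRecycler.popLongObjectMap();
    map.put(time, entry);                  // work directly on the pooled map
    cacheRecycler.pushLongObjectMap(map);  // return it to the pool

    // 0.90.5: the recycler hands out a Recycler.V wrapper instead
    Recycler.V<ExtTLongObjectHashMap<DistinctEntry>> map = cacheRecycler.longObjectMap(-1);
    map.v().put(time, entry);              // same map, reached through v()
    map.release();                         // release the handle instead of pushing it back
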
LongDistinctDateHistogramFacetExecutor.java
@@ -3,6 +3,7 @@
import org.apache.lucene.index.AtomicReaderContext;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.common.joda.time.MutableDateTime;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.trove.ExtTLongObjectHashMap;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.fielddata.LongValues;
@@ -12,6 +13,7 @@
import org.elasticsearch.search.facet.datehistogram.DateHistogramFacet;

import java.io.IOException;
import java.util.ArrayList;

/**
* Collect the distinct values per time interval.
@@ -25,7 +27,7 @@ public class LongDistinctDateHistogramFacetExecutor extends FacetExecutor {
private MutableDateTime dateTime;
private final long interval;
private final DateHistogramFacet.ComparatorType comparatorType;
final ExtTLongObjectHashMap<InternalDistinctDateHistogramFacet.DistinctEntry> entries;
final Recycler.V<ExtTLongObjectHashMap<InternalDistinctDateHistogramFacet.DistinctEntry>> entries;
private final CacheRecycler cacheRecycler;

public LongDistinctDateHistogramFacetExecutor(IndexNumericFieldData keyIndexFieldData,
@@ -35,7 +37,7 @@ public LongDistinctDateHistogramFacetExecutor(IndexNumericFieldData keyIndexFiel
this.comparatorType = comparatorType;
this.keyIndexFieldData = keyIndexFieldData;
this.distinctIndexFieldData = distinctIndexFieldData;
this.entries = cacheRecycler.popLongObjectMap();
this.entries = cacheRecycler.longObjectMap(-1);
this.dateTime = dateTime;
this.interval = interval;
this.cacheRecycler = cacheRecycler;
@@ -48,7 +50,10 @@ public Collector collector() {

@Override
public InternalFacet buildFacet(String facetName) {
return new LongInternalDistinctDateHistogramFacet(facetName, comparatorType, entries, true, cacheRecycler);
ArrayList<LongInternalDistinctDateHistogramFacet.DistinctEntry> entries1 = new ArrayList<LongInternalDistinctDateHistogramFacet.DistinctEntry>(entries.v().valueCollection());

entries.release();
return new LongInternalDistinctDateHistogramFacet(facetName, comparatorType, entries1);
}

/*
@@ -62,7 +67,7 @@ class Collector extends FacetExecutor.Collector {
private final DateHistogramProc histoProc;

public Collector() {
this.histoProc = new DateHistogramProc(entries, dateTime, interval);
this.histoProc = new DateHistogramProc(entries.v(), dateTime, interval);
}

@Override
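
Because the internal facets no longer hold a CacheRecycler, the executor now copies the distinct entries out of the recycled map and releases the handle inside buildFacet(), so nothing pooled outlives the executor. A sketch of that hand-off, assuming the types and field names shown in the diff above:

    // Copy the pooled map's values into an ordinary list, then release the handle;
    // the facet receives a plain List and never touches the recycler.
    ArrayList<DistinctEntry> copied =
            new ArrayList<DistinctEntry>(entries.v().valueCollection());
    entries.release();
    return new LongInternalDistinctDateHistogramFacet(facetName, comparatorType, copied);
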
LongInternalDistinctDateHistogramFacet.java
@@ -22,16 +22,14 @@ public static void registerStreams() {
LongInternalDistinctDateHistogramFacet() {
}

LongInternalDistinctDateHistogramFacet(String name, final CacheRecycler cacheRecycler) {
super(name, cacheRecycler);
LongInternalDistinctDateHistogramFacet(String name) {
super(name);
}

public LongInternalDistinctDateHistogramFacet(String name, ComparatorType comparatorType, ExtTLongObjectHashMap<DistinctEntry> entries, boolean cachedEntries,final CacheRecycler cacheRecycler) {
super(name, cacheRecycler);
public LongInternalDistinctDateHistogramFacet(String name, ComparatorType comparatorType, List<DistinctEntry> entries) {
super(name);
this.comparatorType = comparatorType;
this.tEntries = entries;
this.cachedEntries = cachedEntries;
this.entries = entries.valueCollection();
this.entries = entries;
}

static Stream STREAM = new Stream() {
@@ -54,7 +52,7 @@ public BytesReference streamType() {

@Override
protected LongInternalDistinctDateHistogramFacet newFacet() {
return new LongInternalDistinctDateHistogramFacet(getName(), cacheRecycler);
return new LongInternalDistinctDateHistogramFacet(getName());
}

/**
@@ -65,7 +63,6 @@ public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
comparatorType = ComparatorType.fromId(in.readByte());

cachedEntries = false;
int size = in.readVInt();
entries = new ArrayList<DistinctEntry>(size);
for (int i = 0; i < size; i++) {
@@ -95,6 +92,5 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong((Long) name);
}
}
releaseCache();
}
}
StringDistinctDateHistogramFacetExecutor.java
@@ -4,6 +4,7 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.common.joda.time.MutableDateTime;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.trove.ExtTLongObjectHashMap;
import org.elasticsearch.index.fielddata.BytesValues;
import org.elasticsearch.index.fielddata.LongValues;
@@ -15,6 +16,7 @@
import org.elasticsearch.search.facet.terms.strings.HashedAggregator;

import java.io.IOException;
import java.util.ArrayList;

/**
* Collect the distinct values per time interval.
@@ -28,8 +30,7 @@ public class StringDistinctDateHistogramFacetExecutor extends FacetExecutor {
private MutableDateTime dateTime;
private final long interval;
private final DateHistogramFacet.ComparatorType comparatorType;
final ExtTLongObjectHashMap<InternalDistinctDateHistogramFacet.DistinctEntry> entries;
final CacheRecycler cacheRecycler;
final Recycler.V<ExtTLongObjectHashMap<InternalDistinctDateHistogramFacet.DistinctEntry>> entries;

public StringDistinctDateHistogramFacetExecutor(PackedArrayIndexFieldData keyIndexFieldData,
PagedBytesIndexFieldData distinctIndexFieldData,
@@ -38,10 +39,9 @@ public StringDistinctDateHistogramFacetExecutor(PackedArrayIndexFieldData keyInd
this.comparatorType = comparatorType;
this.keyIndexFieldData = keyIndexFieldData;
this.distinctIndexFieldData = distinctIndexFieldData;
this.entries = cacheRecycler.popLongObjectMap();
this.entries = cacheRecycler.longObjectMap(-1);
this.dateTime = dateTime;
this.interval = interval;
this.cacheRecycler = cacheRecycler;
}

@Override
@@ -51,7 +51,9 @@ public Collector collector() {

@Override
public InternalFacet buildFacet(String facetName) {
return new StringInternalDistinctDateHistogramFacet(facetName, comparatorType, entries, true, cacheRecycler);
ArrayList<InternalDistinctDateHistogramFacet.DistinctEntry> entries1 = new ArrayList<InternalDistinctDateHistogramFacet.DistinctEntry>(entries.v().valueCollection());
entries.release();
return new StringInternalDistinctDateHistogramFacet(facetName, comparatorType, entries1);
}

/*
@@ -96,11 +98,11 @@ public static class DateHistogramProc {
BytesValues.WithOrdinals valueValues;
private final long interval;
private MutableDateTime dateTime;
final ExtTLongObjectHashMap<InternalDistinctDateHistogramFacet.DistinctEntry> entries;
final Recycler.V<ExtTLongObjectHashMap<InternalDistinctDateHistogramFacet.DistinctEntry>> entries;

final ValueAggregator valueAggregator = new ValueAggregator();

public DateHistogramProc(ExtTLongObjectHashMap<InternalDistinctDateHistogramFacet.DistinctEntry> entries, MutableDateTime dateTime, long interval) {
public DateHistogramProc(Recycler.V<ExtTLongObjectHashMap<InternalDistinctDateHistogramFacet.DistinctEntry>> entries, MutableDateTime dateTime, long interval) {
this.dateTime = dateTime;
this.entries = entries;
this.interval = interval;
@@ -135,10 +137,10 @@ protected void onValue(int docId, long time) {
time = ((time / interval) * interval);
}

InternalDistinctDateHistogramFacet.DistinctEntry entry = entries.get(time);
InternalDistinctDateHistogramFacet.DistinctEntry entry = entries.v().get(time);
if (entry == null) {
entry = new InternalDistinctDateHistogramFacet.DistinctEntry(time);
entries.put(time, entry);
entries.v().put(time, entry);
}
valueAggregator.entry = entry;
valueAggregator.onDoc(docId, valueValues);
StringInternalDistinctDateHistogramFacet.java
@@ -4,6 +4,7 @@
import org.elasticsearch.common.bytes.HashedBytesArray;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.trove.ExtTLongObjectHashMap;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.InternalFacet;
@@ -12,6 +13,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.List;
import org.elasticsearch.cache.recycler.CacheRecycler;

/*
@@ -26,21 +28,19 @@ public static void registerStreams() {
InternalFacet.Streams.registerStream(STREAM, STREAM_TYPE);
}

StringInternalDistinctDateHistogramFacet(String name, final CacheRecycler cacheRecycler) {
super(name, cacheRecycler);
StringInternalDistinctDateHistogramFacet(String name) {
super(name);
}

public StringInternalDistinctDateHistogramFacet(String name, ComparatorType comparatorType, ExtTLongObjectHashMap<DistinctEntry> entries, boolean cachedEntries, final CacheRecycler cacheRecycler) {
super(name, cacheRecycler);
public StringInternalDistinctDateHistogramFacet(String name, ComparatorType comparatorType, List<DistinctEntry> entries) {
super(name);
this.comparatorType = comparatorType;
this.tEntries = entries;
this.cachedEntries = cachedEntries;
this.entries = entries.valueCollection();
this.entries = entries;
}

@Override
protected InternalDistinctDateHistogramFacet newFacet() {
return new StringInternalDistinctDateHistogramFacet(getName(),cacheRecycler);
return new StringInternalDistinctDateHistogramFacet(getName());
}

static InternalFacet.Stream STREAM = new Stream() {
@@ -71,7 +71,6 @@ public static StringInternalDistinctDateHistogramFacet readHistogramFacet(Stream
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
comparatorType = ComparatorType.fromId(in.readByte());
cachedEntries = false;
int size = in.readVInt();
entries = new ArrayList<DistinctEntry>(size);
for (int i = 0; i < size; i++) {
@@ -99,6 +98,5 @@ public static StringInternalDistinctDateHistogramFacet readHistogramFacet(Stream
out.writeString((String) name);
}
}
releaseCache();
}
}
LatestFacetExecutor.java
@@ -2,6 +2,7 @@

import org.apache.lucene.index.AtomicReaderContext;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.trove.ExtTLongObjectHashMap;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
@@ -26,6 +27,8 @@ public class LatestFacetExecutor extends FacetExecutor {
protected int size = 10;
protected int start = 0;

final Recycler.V<ExtTLongObjectHashMap<InternalLatestFacet.Entry>> entries;

public LatestFacetExecutor(IndexNumericFieldData keyField, IndexNumericFieldData valueField,
IndexNumericFieldData tsField, int size, int start, CacheRecycler cacheRecycler) {
super();
@@ -35,8 +38,8 @@ public LatestFacetExecutor(IndexNumericFieldData keyField, IndexNumericFieldData
this.keyFieldName = keyField;
this.valueFieldName = valueField;
this.tsFieldName = tsField;
final ExtTLongObjectHashMap<InternalLatestFacet.Entry> entries = cacheRecycler.popLongObjectMap();
this.aggregator = new Aggregator(entries);
entries = cacheRecycler.longObjectMap(-1);
this.aggregator = new Aggregator(entries.v());
}

@Override
(next changed file; file name not shown in this view)
@@ -71,7 +71,7 @@ public void setupTemplates(Client client) throws Exception {
}

public void flush() {
client.admin().indices().prepareFlush().setRefresh(true).execute()
client.admin().indices().prepareFlush().execute()
.actionGet();
client.admin().indices().prepareRefresh().execute().actionGet();
}
(next changed file; file name not shown in this view)
@@ -69,7 +69,7 @@ public void setupTemplates(Client client) throws Exception {
}

public void flush() {
client.admin().indices().prepareFlush().setRefresh(true).execute()
client.admin().indices().prepareFlush().execute()
.actionGet();
client.admin().indices().prepareRefresh().execute().actionGet();
}
(next changed file; file name not shown in this view)
@@ -79,7 +79,7 @@ public void setupTemplates(Client client) throws Exception {
}

public void flush() {
client.admin().indices().prepareFlush().setRefresh(true).execute().actionGet();
client.admin().indices().prepareFlush().execute().actionGet();
client.admin().indices().prepareRefresh().execute().actionGet();
try {
Thread.sleep(300); // sleep a bit here...
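
The three test helpers drop setRefresh(true) from the flush request, presumably because the 0.90.5 flush API no longer exposes that option; the explicit refresh that was already issued afterwards keeps newly indexed documents visible to the subsequent searches. The resulting pattern, as used in the helpers above:

    // flush, then refresh explicitly so freshly indexed documents are searchable
    client.admin().indices().prepareFlush().execute().actionGet();
    client.admin().indices().prepareRefresh().execute().actionGet();
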
