Skip to content

Commit

Permalink
Store Compression: Term Vector Vector, closes elastic#2049.
Browse files Browse the repository at this point in the history
  • Loading branch information
kimchy committed Jun 23, 2012
1 parent bdfff3d commit ce7682c
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 16 deletions.
Expand Up @@ -206,9 +206,10 @@ protected boolean readyBuffer() throws IOException {

@Override
public Object clone() {
// we clone and we need to make sure we keep the same positions!
CompressedIndexInput cloned = (CompressedIndexInput) super.clone();
cloned.position = 0;
cloned.valid = 0;
cloned.uncompressed = new byte[uncompressed.length];
System.arraycopy(uncompressed, 0, cloned.uncompressed, 0, uncompressed.length);
cloned.in = (IndexInput) cloned.in.clone();
return cloned;
}
Expand Down
Expand Up @@ -66,8 +66,6 @@ protected void doClose() throws IOException {
@Override
public Object clone() {
LZFCompressedIndexInput cloned = (LZFCompressedIndexInput) super.clone();
cloned.uncompressed = new byte[LZFChunk.MAX_CHUNK_LEN];
System.arraycopy(uncompressed, 0, cloned.uncompressed, 0, uncompressed.length);
cloned.inputBuffer = new byte[LZFChunk.MAX_CHUNK_LEN];
return cloned;
}
Expand Down
30 changes: 19 additions & 11 deletions src/main/java/org/elasticsearch/index/store/Store.java
Expand Up @@ -57,17 +57,23 @@ public class Store extends AbstractIndexShardComponent {

static {
IndexMetaData.addDynamicSettings(
"index.store.compress.stored"
"index.store.compress.stored",
"index.store.compress.tv"
);
}

class ApplySettings implements IndexSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
boolean compressedStoredFields = settings.getAsBoolean("index.store.compress.stored", Store.this.compressedStoredFields);
if (compressedStoredFields != Store.this.compressedStoredFields) {
logger.info("updating [index.store.compress.stored] from [{}] to [{}]", Store.this.compressedStoredFields, compressedStoredFields);
Store.this.compressedStoredFields = compressedStoredFields;
boolean compressStored = settings.getAsBoolean("index.store.compress.stored", Store.this.compressStored);
if (compressStored != Store.this.compressStored) {
logger.info("updating [index.store.compress.stored] from [{}] to [{}]", Store.this.compressStored, compressStored);
Store.this.compressStored = compressStored;
}
boolean compressTv = settings.getAsBoolean("index.store.compress.tv", Store.this.compressTv);
if (compressTv != Store.this.compressTv) {
logger.info("updating [index.store.compress.tv] from [{}] to [{}]", Store.this.compressTv, compressTv);
Store.this.compressTv = compressTv;
}
}
}
Expand Down Expand Up @@ -95,7 +101,8 @@ public static final boolean isChecksum(String name) {

private final boolean sync;

private volatile boolean compressedStoredFields;
private volatile boolean compressStored;
private volatile boolean compressTv;

private final ApplySettings applySettings = new ApplySettings();

Expand All @@ -109,9 +116,10 @@ public Store(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore
this.sync = componentSettings.getAsBoolean("sync", true); // TODO we don't really need to fsync when using shared gateway...
this.directory = new StoreDirectory(directoryService.build());

this.compressedStoredFields = componentSettings.getAsBoolean("compress.stored", false);
this.compressStored = componentSettings.getAsBoolean("compress.stored", false);
this.compressTv = componentSettings.getAsBoolean("compress.tv", false);

logger.debug("using compress.stored [{}]", compressedStoredFields);
logger.debug("using compress.stored [{}], compress.tv [{}]", compressStored, compressTv);

indexSettingsService.addListener(applySettings);
}
Expand Down Expand Up @@ -480,7 +488,7 @@ public IndexOutput createOutput(String name, boolean raw) throws IOException {
computeChecksum = false;
}
}
if (!raw && compressedStoredFields && name.endsWith(".fdt")) {
if (!raw && ((compressStored && name.endsWith(".fdt")) || (compressTv && name.endsWith(".tvf")))) {
if (computeChecksum) {
// with compression, there is no need for buffering when doing checksums
// since we have buffering on the compressed index output
Expand All @@ -503,7 +511,7 @@ public IndexInput openInput(String name) throws IOException {
throw new FileNotFoundException(name);
}
IndexInput in = metaData.directory().openInput(name);
if (name.endsWith(".fdt")) {
if (name.endsWith(".fdt") || name.endsWith(".tvf")) {
Compressor compressor = CompressorFactory.compressor(in);
if (compressor != null) {
in = compressor.indexInput(in);
Expand All @@ -519,7 +527,7 @@ public IndexInput openInput(String name, int bufferSize) throws IOException {
throw new FileNotFoundException(name);
}
IndexInput in = metaData.directory().openInput(name, bufferSize);
if (name.endsWith(".fdt")) {
if (name.endsWith(".fdt") || name.endsWith(".tvf")) {
Compressor compressor = CompressorFactory.compressor(in);
if (compressor != null) {
in = compressor.indexInput(in);
Expand Down
Expand Up @@ -45,6 +45,7 @@ public class LuceneCompressionBenchmark {

public static void main(String[] args) throws Exception {
final long MAX_SIZE = ByteSizeValue.parseBytesSizeValue("50mb").bytes();
final boolean WITH_TV = true;

final Compressor compressor = CompressorFactory.defaultCompressor();

Expand All @@ -61,6 +62,9 @@ public IndexOutput createOutput(String name) throws IOException {
if (name.endsWith(".fdt")) {
return compressor.indexOutput(super.createOutput(name));
}
if (WITH_TV && name.endsWith(".tvf")) {
return compressor.indexOutput(super.createOutput(name));
}
return super.createOutput(name);
}

Expand All @@ -75,12 +79,21 @@ public IndexInput openInput(String name) throws IOException {
return in;
}
}
if (WITH_TV && name.endsWith(".tvf")) {
IndexInput in = super.openInput(name);
Compressor compressor1 = CompressorFactory.compressor(in);
if (compressor1 != null) {
return compressor1.indexInput(in);
} else {
return in;
}
}
return super.openInput(name);
}

@Override
public IndexInput openInput(String name, int bufferSize) throws IOException {
if (name.endsWith(".fdt")) {
if (name.endsWith(".fdt") || name.endsWith(".tvf")) {
IndexInput in = super.openInput(name, bufferSize);
// in case the override called openInput(String)
if (in instanceof CompressedIndexInput) {
Expand Down Expand Up @@ -108,6 +121,10 @@ public IndexInput openInput(String name, int bufferSize) throws IOException {
builder.close();
Document doc = new Document();
doc.add(new Field("_source", builder.underlyingBytes(), 0, builder.underlyingBytesLength()));
if (WITH_TV) {
Field field = new Field("text", builder.string(), Field.Store.NO, Field.Index.ANALYZED, Field.TermVector.WITH_POSITIONS_OFFSETS);
doc.add(field);
}
uncompressedWriter.addDocument(doc);
compressedWriter.addDocument(doc);
}
Expand Down
Expand Up @@ -69,6 +69,10 @@ public boolean next() throws Exception {
return true;
}

public String currentText() {
return text;
}

/**
*/
public XContentBuilder current(XContentBuilder builder) throws Exception {
Expand Down

0 comments on commit ce7682c

Please sign in to comment.