From 7c689dda9d626da18af7e0a706201b3f67a96643 Mon Sep 17 00:00:00 2001
From: phrocker
Date: Wed, 24 Feb 2016 15:40:08 -0500
Subject: [PATCH] ACCUMULO-4153: Update the getCodec method to no longer be
 synchronized and to use an atomic reference.

---
 .../core/file/rfile/bcfile/Compression.java    |  23 ++--
 .../file/rfile/bcfile/CompressionTest.java     | 124 ++++++++++++++++++
 2 files changed, 137 insertions(+), 10 deletions(-)
 create mode 100644 core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java

diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
index 9defa1c639b..3cf586f4be3 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
@@ -23,6 +23,7 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -147,25 +148,27 @@ public synchronized OutputStream createCompressionStream(OutputStream downStream
     },
 
     GZ(COMPRESSION_GZ) {
 
-      private transient DefaultCodec codec;
+      private transient AtomicReference<DefaultCodec> codec = new AtomicReference<>();
 
       @Override
-      synchronized CompressionCodec getCodec() {
-        if (codec == null) {
-          codec = new DefaultCodec();
-          codec.setConf(conf);
+      CompressionCodec getCodec() {
+        DefaultCodec resultCodec = codec.get();
+        if (null == resultCodec) {
+          DefaultCodec newCodec = new DefaultCodec();
+          newCodec.setConf(conf);
+          codec.compareAndSet(null, newCodec);
         }
-        return codec;
+        return codec.get();
       }
 
       @Override
       public synchronized InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor, int downStreamBufferSize) throws IOException {
         // Set the internal buffer size to read from down stream.
         if (downStreamBufferSize > 0) {
-          codec.getConf().setInt("io.file.buffer.size", downStreamBufferSize);
+          codec.get().getConf().setInt("io.file.buffer.size", downStreamBufferSize);
         }
-        CompressionInputStream cis = codec.createInputStream(downStream, decompressor);
+        CompressionInputStream cis = codec.get().createInputStream(downStream, decompressor);
         BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
         return bis2;
       }
@@ -178,8 +181,8 @@ public synchronized OutputStream createCompressionStream(OutputStream downStream
         } else {
           bos1 = downStream;
         }
-        codec.getConf().setInt("io.file.buffer.size", 32 * 1024);
-        CompressionOutputStream cos = codec.createOutputStream(bos1, compressor);
+        codec.get().getConf().setInt("io.file.buffer.size", 32 * 1024);
+        CompressionOutputStream cos = codec.get().createOutputStream(bos1, compressor);
         BufferedOutputStream bos2 = new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
         return bos2;
       }
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java
new file mode 100644
index 00000000000..f9f9b2c491b
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.file.rfile.bcfile;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class CompressionTest {
+
+  @Test
+  public void testSingle() throws IOException {
+    Assert.assertNotNull(Compression.Algorithm.GZ.getCodec());
+
+    Assert.assertNotNull(Compression.Algorithm.GZ.getCodec());
+  }
+
+  @Test
+  public void testManyStartNotNull() throws IOException {
+    final CompressionCodec codec = Algorithm.GZ.getCodec();
+
+    ExecutorService service = Executors.newFixedThreadPool(10);
+
+    for (int i = 0; i < 30; i++) {
+      service.submit(new Callable<Boolean>()
+
+      {
+
+        @Override
+        public Boolean call() throws Exception {
+          Assert.assertNotNull(Compression.Algorithm.GZ.getCodec());
+          return true;
+        }
+
+      });
+    }
+
+    service.shutdown();
+
+    Assert.assertNotNull(codec);
+
+  }
+
+  // don't start until we have created the codec
+  @Test
+  public void testManyDontStartUntilThread() throws IOException {
+
+    ExecutorService service = Executors.newFixedThreadPool(10);
+
+    for (int i = 0; i < 30; i++) {
+
+      service.submit(new Callable<Boolean>() {
+
+        @Override
+        public Boolean call() throws Exception {
+          Assert.assertNotNull(Compression.Algorithm.GZ.getCodec());
+          return true;
+        }
+
+      });
+    }
+
+    service.shutdown();
+
+  }
+
+  // don't start until we have created the codec
+  @Test
+  public void testThereCanBeOnlyOne() throws IOException, InterruptedException {
+
+    ExecutorService service = Executors.newFixedThreadPool(20);
+
+    Collection<Callable<Boolean>> list = Lists.newArrayList();
+
+    // keep track of the system's identity hashcodes.
+    final Set<Integer> testSet = Sets.newHashSet();
+
+    for (int i = 0; i < 40; i++) {
+      list.add(new Callable<Boolean>() {
+
+        @Override
+        public Boolean call() throws Exception {
+          CompressionCodec codec = Compression.Algorithm.GZ.getCodec();
+          Assert.assertNotNull(codec);
+          // add the identity hashcode to the set.
+          testSet.add(System.identityHashCode(codec));
+          return true;
+        }
+      });
+    }
+
+    service.invokeAll(list);
+    // ensure that we only ever created one codec instance
+    Assert.assertEquals(1, testSet.size());
+    service.shutdown();
+
+  }
+
+}
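
Note for reviewers: the getCodec() change above swaps method-level synchronization for lock-free lazy initialization on an AtomicReference. Each caller first reads the reference; a caller that observes null builds a candidate DefaultCodec and tries to publish it with compareAndSet(null, candidate); every caller then re-reads the reference and returns whichever instance actually won the race, so losing candidates are simply discarded. testThereCanBeOnlyOne exercises exactly that single-instance guarantee by collecting identity hash codes across 40 concurrent callers. The sketch below restates the pattern in isolation; it is a minimal illustration, not Accumulo code, and the class name LazyCodecHolder and the getCodec(Configuration) signature are invented for the example.

import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;

/**
 * Minimal sketch of the lock-free lazy-initialization pattern used by the patched getCodec():
 * read the reference, build a candidate if it is null, publish it with compareAndSet, and
 * always return whatever reference actually won the race.
 */
final class LazyCodecHolder {

  // Holds the single published codec; stays null until the first successful compareAndSet.
  private static final AtomicReference<DefaultCodec> CODEC = new AtomicReference<>();

  static CompressionCodec getCodec(Configuration conf) {
    DefaultCodec existing = CODEC.get();
    if (existing == null) {
      // Several threads may build a candidate concurrently; only one is ever published.
      DefaultCodec candidate = new DefaultCodec();
      candidate.setConf(conf);
      CODEC.compareAndSet(null, candidate);
    }
    // Re-read so that winners and losers alike return the single published instance.
    return CODEC.get();
  }
}

Any number of threads may call LazyCodecHolder.getCodec(conf) and all of them observe the same codec instance. The usual cost of this pattern is that several first callers can each construct a short-lived candidate that is then thrown away, which is acceptable as long as codec construction is cheap and side-effect free; in exchange, steady-state callers no longer contend on a monitor for every block they read or write.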