Skip to content

Commit

Permalink
fix code smells
Browse files Browse the repository at this point in the history
  • Loading branch information
jt2594838 committed Mar 3, 2020
1 parent 5044dcb commit abbd58f
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 194 deletions.
Expand Up @@ -95,6 +95,11 @@ public class StorageEngine implements IService {
.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), "Recovery-Thread-Pool");

static class InstanceHolder {

private InstanceHolder() {
// forbidding instantiation
}

private static final StorageEngine INSTANCE = new StorageEngine();
}

Expand Down
Expand Up @@ -1617,12 +1617,11 @@ private void removeFullyOverlapFiles(TsFileResource resource, Iterator<TsFileRes
while (iterator.hasNext()) {
TsFileResource seqFile = iterator.next();
if (resource.getHistoricalVersions().containsAll(seqFile.getHistoricalVersions())
&& !resource.getHistoricalVersions().equals(seqFile.getHistoricalVersions())) {
if (seqFile.getWriteQueryLock().writeLock().tryLock()) {
iterator.remove();
seqFile.remove();
seqFile.getWriteQueryLock().writeLock().unlock();
}
&& !resource.getHistoricalVersions().equals(seqFile.getHistoricalVersions())
&& seqFile.getWriteQueryLock().writeLock().tryLock()) {
iterator.remove();
seqFile.remove();
seqFile.getWriteQueryLock().writeLock().unlock();
}
}
}
Expand Down
Expand Up @@ -37,6 +37,8 @@
*/
public abstract class PhysicalPlan {

private static final String SERIALIZATION_UNIMPLEMENTED = "serialization unimplemented";

private boolean isQuery;
private Operator.OperatorType operatorType;
private static final int NULL_VALUE_LEN = -1;
Expand Down Expand Up @@ -88,15 +90,15 @@ public void setQuery(boolean query) {
}

public void serializeTo(DataOutputStream stream) throws IOException {
throw new UnsupportedOperationException("serialize of unimplemented");
throw new UnsupportedOperationException(SERIALIZATION_UNIMPLEMENTED);
}

public void serializeTo(ByteBuffer buffer) {
throw new UnsupportedOperationException("serialize of unimplemented");
throw new UnsupportedOperationException(SERIALIZATION_UNIMPLEMENTED);
}

public void deserializeFrom(ByteBuffer buffer) {
throw new UnsupportedOperationException("serialize of unimplemented");
throw new UnsupportedOperationException(SERIALIZATION_UNIMPLEMENTED);
}

protected void putString(ByteBuffer buffer, String value) {
Expand Down
Expand Up @@ -140,57 +140,66 @@ public void serializeTo(DataOutputStream stream) throws IOException {
}

if (valueBuffer == null) {
for (int i = 0; i < measurements.length; i++) {
TSDataType dataType = dataTypes[i];
switch (dataType) {
case INT32:
int[] intValues = (int[]) columns[i];
for(int loc : index){
stream.writeInt(intValues[loc]);
}
break;
case INT64:
long[] longValues = (long[]) columns[i];
for(int loc : index){
stream.writeLong(longValues[loc]);
}
break;
case FLOAT:
float[] floatValues = (float[]) columns[i];
for(int loc : index){
stream.writeFloat(floatValues[loc]);
}
break;
case DOUBLE:
double[] doubleValues = (double[]) columns[i];
for(int loc : index){
stream.writeDouble(doubleValues[loc]);
}
break;
case BOOLEAN:
boolean[] boolValues = (boolean[]) columns[i];
for(int loc : index){
stream.write(BytesUtils.boolToByte(boolValues[loc]));
}
break;
case TEXT:
Binary[] binaryValues = (Binary[]) columns[i];
for(int loc : index){
stream.writeInt(binaryValues[loc].getLength());
stream.write(binaryValues[loc].getValues());
}
break;
default:
throw new UnSupportedDataTypeException(
String.format("Data type %s is not supported.", dataType));
}
}
serializeValues(stream);
} else {
stream.write(valueBuffer.array());
valueBuffer = null;
}
}

private void serializeValues(DataOutputStream stream) throws IOException {
for (int i = 0; i < measurements.length; i++) {
serializeColumn(dataTypes[i], columns[i], stream, index);
}
}

private void serializeColumn(TSDataType dataType, Object column, DataOutputStream stream,
Set<Integer> index)
throws IOException {
switch (dataType) {
case INT32:
int[] intValues = (int[]) column;
for(int loc : index){
stream.writeInt(intValues[loc]);
}
break;
case INT64:
long[] longValues = (long[]) column;
for(int loc : index){
stream.writeLong(longValues[loc]);
}
break;
case FLOAT:
float[] floatValues = (float[]) column;
for(int loc : index){
stream.writeFloat(floatValues[loc]);
}
break;
case DOUBLE:
double[] doubleValues = (double[]) column;
for(int loc : index){
stream.writeDouble(doubleValues[loc]);
}
break;
case BOOLEAN:
boolean[] boolValues = (boolean[]) column;
for(int loc : index){
stream.write(BytesUtils.boolToByte(boolValues[loc]));
}
break;
case TEXT:
Binary[] binaryValues = (Binary[]) column;
for(int loc : index){
stream.writeInt(binaryValues[loc].getLength());
stream.write(binaryValues[loc].getValues());
}
break;
default:
throw new UnSupportedDataTypeException(
String.format("Data type %s is not supported.", dataType));
}
}

@Override
public void serializeTo(ByteBuffer buffer) {
int type = PhysicalPlanType.BATCHINSERT.ordinal();
Expand Down Expand Up @@ -219,57 +228,65 @@ public void serializeTo(ByteBuffer buffer) {
}

if (valueBuffer == null) {
for (int i = 0; i < measurements.length; i++) {
TSDataType dataType = dataTypes[i];
switch (dataType) {
case INT32:
int[] intValues = (int[]) columns[i];
for (int j = start; j < end; j++) {
buffer.putInt(intValues[j]);
}
break;
case INT64:
long[] longValues = (long[]) columns[i];
for (int j = start; j < end; j++) {
buffer.putLong(longValues[j]);
}
break;
case FLOAT:
float[] floatValues = (float[]) columns[i];
for (int j = start; j < end; j++) {
buffer.putFloat(floatValues[j]);
}
break;
case DOUBLE:
double[] doubleValues = (double[]) columns[i];
for (int j = start; j < end; j++) {
buffer.putDouble(doubleValues[j]);
}
break;
case BOOLEAN:
boolean[] boolValues = (boolean[]) columns[i];
for (int j = start; j < end; j++) {
buffer.putInt(BytesUtils.boolToByte(boolValues[j]));
}
break;
case TEXT:
Binary[] binaryValues = (Binary[]) columns[i];
for (int j = start; j < end; j++) {
buffer.putInt(binaryValues[j].getLength());
buffer.put(binaryValues[j].getValues());
}
break;
default:
throw new UnSupportedDataTypeException(
String.format("Data type %s is not supported.", dataType));
}
}
serializeValues(buffer);
} else {
buffer.put(valueBuffer.array());
valueBuffer = null;
}
}

private void serializeValues(ByteBuffer buffer) {
for (int i = 0; i < measurements.length; i++) {
serializeColumn(dataTypes[i], columns[i], buffer, start, end);
}
}

private void serializeColumn(TSDataType dataType, Object column, ByteBuffer buffer,
int start, int end) {
switch (dataType) {
case INT32:
int[] intValues = (int[]) column;
for (int j = start; j < end; j++) {
buffer.putInt(intValues[j]);
}
break;
case INT64:
long[] longValues = (long[]) column;
for (int j = start; j < end; j++) {
buffer.putLong(longValues[j]);
}
break;
case FLOAT:
float[] floatValues = (float[]) column;
for (int j = start; j < end; j++) {
buffer.putFloat(floatValues[j]);
}
break;
case DOUBLE:
double[] doubleValues = (double[]) column;
for (int j = start; j < end; j++) {
buffer.putDouble(doubleValues[j]);
}
break;
case BOOLEAN:
boolean[] boolValues = (boolean[]) column;
for (int j = start; j < end; j++) {
buffer.putInt(BytesUtils.boolToByte(boolValues[j]));
}
break;
case TEXT:
Binary[] binaryValues = (Binary[]) column;
for (int j = start; j < end; j++) {
buffer.putInt(binaryValues[j].getLength());
buffer.put(binaryValues[j].getValues());
}
break;
default:
throw new UnSupportedDataTypeException(
String.format("Data type %s is not supported.", dataType));
}
}

public void setTimeBuffer(ByteBuffer timeBuffer) {
this.timeBuffer = timeBuffer;
this.timeBuffer.position(0);
Expand Down Expand Up @@ -369,13 +386,13 @@ public long getMaxTime() {
if (maxTime != null) {
return maxTime;
}
long maxTime = Long.MIN_VALUE;
long tmpMaxTime = Long.MIN_VALUE;
for (Long time : times) {
if (time > maxTime) {
maxTime = time;
if (time > tmpMaxTime) {
tmpMaxTime = time;
}
}
return maxTime;
return tmpMaxTime;
}

public long[] getTimes() {
Expand Down
Expand Up @@ -19,9 +19,9 @@

package org.apache.iotdb.db.query.aggregation;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Objects;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
Expand All @@ -30,8 +30,6 @@
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.utils.Binary;

import java.io.IOException;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

public abstract class AggregateResult {
Expand Down
Expand Up @@ -19,10 +19,11 @@

package org.apache.iotdb.db.query.dataset.groupby;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.PathException;
import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.context.QueryContext;
Expand All @@ -35,10 +36,6 @@
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {

private List<IReaderByTimestamp> allDataReaderList;
Expand Down

0 comments on commit abbd58f

Please sign in to comment.