From 11df84b210a88a3c078db218a10152752a5a8e97 Mon Sep 17 00:00:00 2001 From: kunal642 Date: Wed, 3 Jul 2019 10:54:08 +0530 Subject: [PATCH] Fixed EOFException in CarbonScanRDD --- .../core/indexstore/ExtendedBlocklet.java | 1 - .../carbondata/hadoop/CarbonInputSplit.java | 52 +++++++++---------- 2 files changed, 26 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java index d97148d15d0..a85423babed 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java @@ -177,7 +177,6 @@ public void serializeData(DataOutput out, Map uniqueLocation) DataOutputStream dos = new DataOutputStream(ebos); inputSplit.setFilePath(null); inputSplit.setBucketId(null); - inputSplit.setWriteDeleteDelta(false); if (inputSplit.isBlockCache()) { inputSplit.updateFooteroffset(); inputSplit.updateBlockLength(); diff --git a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java index da1bc2c6c4a..edbfcfe6e9b 100644 --- a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java +++ b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.carbondata.hadoop; import java.io.ByteArrayInputStream; @@ -150,8 +151,6 @@ public class CarbonInputSplit extends FileSplit */ private int rowCount; - private boolean writeDeleteDelta = true; - public CarbonInputSplit() { segment = null; taskId = "0"; @@ -195,7 +194,13 @@ public CarbonInputSplit(int serializeLen, DataInput in, String filePath, String[ this.version = ColumnarFormatVersion.valueOf(in.readShort()); // will be removed after count(*) optmization in case of index server this.rowCount = in.readInt(); - this.writeDeleteDelta = in.readBoolean(); + if (in.readBoolean()) { + int numberOfDeleteDeltaFiles = in.readInt(); + deleteDeltaFiles = new String[numberOfDeleteDeltaFiles]; + for (int i = 0; i < numberOfDeleteDeltaFiles; i++) { + deleteDeltaFiles[i] = in.readUTF(); + } + } // after deseralizing required field get the start position of field which will be only used // in executor int leftoverPosition = underlineStream.getPosition(); @@ -359,7 +364,13 @@ public Segment getSegment() { this.length = in.readLong(); this.version = ColumnarFormatVersion.valueOf(in.readShort()); this.rowCount = in.readInt(); - this.writeDeleteDelta = in.readBoolean(); + if (in.readBoolean()) { + int numberOfDeleteDeltaFiles = in.readInt(); + deleteDeltaFiles = new String[numberOfDeleteDeltaFiles]; + for (int i = 0; i < numberOfDeleteDeltaFiles; i++) { + deleteDeltaFiles[i] = in.readUTF(); + } + } this.bucketId = in.readUTF(); } this.blockletId = in.readUTF(); @@ -379,13 +390,6 @@ public Segment getSegment() { validBlockletIds.add((int) in.readShort()); } this.isLegacyStore = in.readBoolean(); - if (writeDeleteDelta) { - int numberOfDeleteDeltaFiles = in.readInt(); - deleteDeltaFiles = new String[numberOfDeleteDeltaFiles]; - for (int i = 0; i < numberOfDeleteDeltaFiles; i++) { - deleteDeltaFiles[i] = in.readUTF(); - } - } } @Override public void write(DataOutput out) throws IOException { @@ -397,11 +401,10 @@ public Segment getSegment() { out.writeLong(length); out.writeShort(version.number()); out.writeInt(rowCount); - out.writeBoolean(writeDeleteDelta); + writeDeleteDeltaFile(out); out.writeUTF(bucketId); out.writeUTF(blockletId); out.write(serializeData, offset, actualLen); - writeDeleteDeltaFile(out); return; } // please refer writeDetailInfo doc @@ -419,7 +422,7 @@ public Segment getSegment() { } else { out.writeInt(0); } - out.writeBoolean(writeDeleteDelta); + writeDeleteDeltaFile(out); if (null != bucketId) { out.writeUTF(bucketId); } @@ -442,18 +445,19 @@ public Segment getSegment() { out.writeShort(blockletId); } out.writeBoolean(isLegacyStore); - writeDeleteDeltaFile(out); } private void writeDeleteDeltaFile(DataOutput out) throws IOException { - if (!writeDeleteDelta) { - return; - } - out.writeInt(null != deleteDeltaFiles ? deleteDeltaFiles.length : 0); - if (null != deleteDeltaFiles) { - for (int i = 0; i < deleteDeltaFiles.length; i++) { - out.writeUTF(deleteDeltaFiles[i]); + if (deleteDeltaFiles != null) { + out.writeBoolean(true); + out.writeInt(deleteDeltaFiles.length); + if (null != deleteDeltaFiles) { + for (int i = 0; i < deleteDeltaFiles.length; i++) { + out.writeUTF(deleteDeltaFiles[i]); + } } + } else { + out.writeBoolean(false); } } @@ -586,7 +590,6 @@ public String[] getDeleteDeltaFiles() { } public void setDeleteDeltaFiles(String[] deleteDeltaFiles) { - this.writeDeleteDelta = true; this.deleteDeltaFiles = deleteDeltaFiles; } @@ -879,7 +882,4 @@ public void setBucketId(String bucketId) { this.bucketId = bucketId; } - public void setWriteDeleteDelta(boolean writeDeleteDelta) { - this.writeDeleteDelta = writeDeleteDelta; - } }