Skip to content

Commit

Permalink
Merge 11df84b into ebe78dc
Browse files Browse the repository at this point in the history
  • Loading branch information
kunal642 committed Jul 15, 2019
2 parents ebe78dc + 11df84b commit 5012ccc
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ public void serializeData(DataOutput out, Map<String, Short> uniqueLocation)
DataOutputStream dos = new DataOutputStream(ebos);
inputSplit.setFilePath(null);
inputSplit.setBucketId(null);
inputSplit.setWriteDeleteDelta(false);
if (inputSplit.isBlockCache()) {
inputSplit.updateFooteroffset();
inputSplit.updateBlockLength();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.hadoop;

import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -150,8 +151,6 @@ public class CarbonInputSplit extends FileSplit
*/
private int rowCount;

private boolean writeDeleteDelta = true;

public CarbonInputSplit() {
segment = null;
taskId = "0";
Expand Down Expand Up @@ -195,7 +194,13 @@ public CarbonInputSplit(int serializeLen, DataInput in, String filePath, String[
this.version = ColumnarFormatVersion.valueOf(in.readShort());
// will be removed after count(*) optmization in case of index server
this.rowCount = in.readInt();
this.writeDeleteDelta = in.readBoolean();
if (in.readBoolean()) {
int numberOfDeleteDeltaFiles = in.readInt();
deleteDeltaFiles = new String[numberOfDeleteDeltaFiles];
for (int i = 0; i < numberOfDeleteDeltaFiles; i++) {
deleteDeltaFiles[i] = in.readUTF();
}
}
// after deseralizing required field get the start position of field which will be only used
// in executor
int leftoverPosition = underlineStream.getPosition();
Expand Down Expand Up @@ -359,7 +364,13 @@ public Segment getSegment() {
this.length = in.readLong();
this.version = ColumnarFormatVersion.valueOf(in.readShort());
this.rowCount = in.readInt();
this.writeDeleteDelta = in.readBoolean();
if (in.readBoolean()) {
int numberOfDeleteDeltaFiles = in.readInt();
deleteDeltaFiles = new String[numberOfDeleteDeltaFiles];
for (int i = 0; i < numberOfDeleteDeltaFiles; i++) {
deleteDeltaFiles[i] = in.readUTF();
}
}
this.bucketId = in.readUTF();
}
this.blockletId = in.readUTF();
Expand All @@ -379,13 +390,6 @@ public Segment getSegment() {
validBlockletIds.add((int) in.readShort());
}
this.isLegacyStore = in.readBoolean();
if (writeDeleteDelta) {
int numberOfDeleteDeltaFiles = in.readInt();
deleteDeltaFiles = new String[numberOfDeleteDeltaFiles];
for (int i = 0; i < numberOfDeleteDeltaFiles; i++) {
deleteDeltaFiles[i] = in.readUTF();
}
}
}

@Override public void write(DataOutput out) throws IOException {
Expand All @@ -397,11 +401,10 @@ public Segment getSegment() {
out.writeLong(length);
out.writeShort(version.number());
out.writeInt(rowCount);
out.writeBoolean(writeDeleteDelta);
writeDeleteDeltaFile(out);
out.writeUTF(bucketId);
out.writeUTF(blockletId);
out.write(serializeData, offset, actualLen);
writeDeleteDeltaFile(out);
return;
}
// please refer writeDetailInfo doc
Expand All @@ -419,7 +422,7 @@ public Segment getSegment() {
} else {
out.writeInt(0);
}
out.writeBoolean(writeDeleteDelta);
writeDeleteDeltaFile(out);
if (null != bucketId) {
out.writeUTF(bucketId);
}
Expand All @@ -442,18 +445,19 @@ public Segment getSegment() {
out.writeShort(blockletId);
}
out.writeBoolean(isLegacyStore);
writeDeleteDeltaFile(out);
}

private void writeDeleteDeltaFile(DataOutput out) throws IOException {
if (!writeDeleteDelta) {
return;
}
out.writeInt(null != deleteDeltaFiles ? deleteDeltaFiles.length : 0);
if (null != deleteDeltaFiles) {
for (int i = 0; i < deleteDeltaFiles.length; i++) {
out.writeUTF(deleteDeltaFiles[i]);
if (deleteDeltaFiles != null) {
out.writeBoolean(true);
out.writeInt(deleteDeltaFiles.length);
if (null != deleteDeltaFiles) {
for (int i = 0; i < deleteDeltaFiles.length; i++) {
out.writeUTF(deleteDeltaFiles[i]);
}
}
} else {
out.writeBoolean(false);
}
}

Expand Down Expand Up @@ -586,7 +590,6 @@ public String[] getDeleteDeltaFiles() {
}

public void setDeleteDeltaFiles(String[] deleteDeltaFiles) {
this.writeDeleteDelta = true;
this.deleteDeltaFiles = deleteDeltaFiles;
}

Expand Down Expand Up @@ -879,7 +882,4 @@ public void setBucketId(String bucketId) {
this.bucketId = bucketId;
}

public void setWriteDeleteDelta(boolean writeDeleteDelta) {
this.writeDeleteDelta = writeDeleteDelta;
}
}

0 comments on commit 5012ccc

Please sign in to comment.