Skip to content

Commit

Permalink
[CARBONDATA-3889] Cleanup code for carbondata-streaming module
Browse files Browse the repository at this point in the history
Why is this PR needed?
The code in the carbondata-streaming module needs to be cleaned up.

What changes were proposed in this PR?
Clean up the code in the carbondata-streaming module.

Does this PR introduce any user interface change?
No
Yes. (please explain the change and update document)

Is any new testcase added?
No
Yes

This closes #3826
  • Loading branch information
QiangCai authored and kevinjmh committed Jul 19, 2020
1 parent 6def83f commit 23e2760
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 155 deletions.
19 changes: 18 additions & 1 deletion streaming/pom.xml
@@ -1,3 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>carbondata-parent</artifactId>
Expand Down Expand Up @@ -95,7 +112,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18</version>
<!-- Note config is repeated in scalatest config -->
<!-- Note config is repeated in scala test config -->
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
Expand Down
Expand Up @@ -82,7 +82,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
private RowParser rowParser;
private BadRecordsLogger badRecordLogger;
private RowConverter converter;
private CarbonRow currentRow = new CarbonRow(null);
private final CarbonRow currentRow = new CarbonRow(null);

// encoder
private DataField[] dataFields;
Expand Down Expand Up @@ -174,7 +174,7 @@ private void initializeAtFirstRow() throws IOException {
if (carbonFile.exists()) {
// if the file is existed, use the append api
outputStream = FileFactory.getDataOutputStreamUsingAppend(filePath);
// get the compressor from the fileheader. In legacy store,
// get the compressor from the file header. In legacy store,
// the compressor name is not set and it use snappy compressor
FileHeader header = new CarbonHeaderReader(filePath).readHeader();
if (header.isSetCompressor_name()) {
Expand Down Expand Up @@ -329,7 +329,7 @@ private void appendBlockletToDataFile() throws IOException {
if (output.getRowIndex() == -1) {
return;
}
output.apppendBlocklet(outputStream);
output.appendBlocklet(outputStream);
outputStream.flush();
if (!isClosed) {
batchMinMaxIndex = StreamSegment.mergeBlockletMinMax(
Expand Down
Expand Up @@ -44,14 +44,14 @@ public class StreamBlockletWriter {
private byte[] buffer;
private int maxSize;
private int maxRowNum;
private int rowSize;
private final int rowSize;
private int count = 0;
private int rowIndex = -1;
private Compressor compressor;
private final Compressor compressor;

private int dimCountWithoutComplex;
private int measureCount;
private DataType[] measureDataTypes;
private final int dimCountWithoutComplex;
private final int measureCount;
private final DataType[] measureDataTypes;

// blocklet level stats
ColumnPageStatsCollector[] dimStatsCollectors;
Expand Down Expand Up @@ -93,11 +93,11 @@ private void initializeStatsCollector() {
}

private void ensureCapacity(int space) {
int newcount = space + count;
if (newcount > buffer.length) {
byte[] newbuf = new byte[Math.max(newcount, buffer.length + rowSize)];
System.arraycopy(buffer, 0, newbuf, 0, count);
buffer = newbuf;
int newCount = space + count;
if (newCount > buffer.length) {
byte[] newBuffer = new byte[Math.max(newCount, buffer.length + rowSize)];
System.arraycopy(buffer, 0, newBuffer, 0, count);
buffer = newBuffer;
}
}

Expand Down Expand Up @@ -212,7 +212,7 @@ BlockletMinMaxIndex generateBlockletMinMax() {
return blockletMinMaxIndex;
}

void apppendBlocklet(DataOutputStream outputStream) throws IOException {
void appendBlocklet(DataOutputStream outputStream) throws IOException {
outputStream.write(CarbonStreamOutputFormat.CARBON_SYNC_MARKER);

BlockletInfo blockletInfo = new BlockletInfo();
Expand Down

0 comments on commit 23e2760

Please sign in to comment.