Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,13 @@ void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration conf
* @throws Exception the user can throw errors if necessary
*/
default void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
for (final TabletInsertionEvent tabletInsertionEvent :
tsFileInsertionEvent.toTabletInsertionEvents()) {
transfer(tabletInsertionEvent);
try {
for (final TabletInsertionEvent tabletInsertionEvent :
tsFileInsertionEvent.toTabletInsertionEvents()) {
transfer(tabletInsertionEvent);
}
} finally {
tsFileInsertionEvent.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,13 @@ void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventColl
*/
default void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
throws Exception {
for (final TabletInsertionEvent tabletInsertionEvent :
tsFileInsertionEvent.toTabletInsertionEvents()) {
process(tabletInsertionEvent, eventCollector);
try {
for (final TabletInsertionEvent tabletInsertionEvent :
tsFileInsertionEvent.toTabletInsertionEvents()) {
process(tabletInsertionEvent, eventCollector);
}
} finally {
tsFileInsertionEvent.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* {@link TsFileInsertionEvent} is used to define the event of writing TsFile. Event data stores in
* disks, which is compressed and encoded, and requires IO cost for computational processing.
*/
public interface TsFileInsertionEvent extends Event {
public interface TsFileInsertionEvent extends Event, AutoCloseable {

/**
* The method is used to convert the TsFileInsertionEvent into several TabletInsertionEvents.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,12 @@ public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception
}

if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) {
for (final TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) {
transfer(event);
try {
for (final TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) {
transfer(event);
}
} finally {
tsFileInsertionEvent.close();
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,12 @@ public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception
}

if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) {
for (final TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) {
transfer(event);
try {
for (final TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) {
transfer(event);
}
} finally {
tsFileInsertionEvent.close();
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,14 @@ public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception
tsFileInsertionEvent);
return;
}
for (TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) {
long commitId = commitIdGenerator.incrementAndGet();
((EnrichedEvent) event).increaseReferenceCount(WebSocketConnector.class.getName());
server.addEvent(new Pair<>(commitId, event));
try {
for (TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) {
long commitId = commitIdGenerator.incrementAndGet();
((EnrichedEvent) event).increaseReferenceCount(WebSocketConnector.class.getName());
server.addEvent(new Pair<>(commitId, event));
}
} finally {
tsFileInsertionEvent.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,19 +185,32 @@ public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
}
return dataContainer.toTabletInsertionEvents();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
close();

final String errorMsg =
String.format(
"Interrupted when waiting for closing TsFile %s.", resource.getTsFilePath());
LOGGER.warn(errorMsg, e);
Thread.currentThread().interrupt();
throw new PipeException(errorMsg);
} catch (IOException e) {
close();

final String errorMsg = String.format("Read TsFile %s error.", resource.getTsFilePath());
LOGGER.warn(errorMsg, e);
throw new PipeException(errorMsg);
}
}

/** Release the resource of data container. */
@Override
public void close() {
if (dataContainer != null) {
dataContainer.close();
dataContainer = null;
}
}

/////////////////////////// Object ///////////////////////////

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
public boolean hasNext() {
while (tabletIterator == null || !tabletIterator.hasNext()) {
if (!deviceMeasurementsMapIterator.hasNext()) {
close();
return false;
}

Expand Down