Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1207] Clear references to potentially large objects in Fork,… #3052

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -349,6 +349,9 @@ public class ConfigurationKeys {
public static final String DEFAULT_FORK_RECORD_QUEUE_TIMEOUT_UNIT = TimeUnit.MILLISECONDS.name();
public static final String FORK_MAX_WAIT_MININUTES = "fork.max.wait.minutes";
public static final long DEFAULT_FORK_MAX_WAIT_MININUTES = 60;
public static final String FORK_CLOSE_WRITER_ON_COMPLETION = "fork.closeWriterOnCompletion";
public static final boolean DEFAULT_FORK_CLOSE_WRITER_ON_COMPLETION = true;


/**
* Writer configuration properties.
Expand Down
Expand Up @@ -160,6 +160,8 @@ private void getNextFileToRead() throws IOException {
if (this.currentFile != null && this.currentFileItr != null) {
closeCurrentFile();
incrementBytesReadCounter();
// release the reference to allow garbage collection
this.currentFileItr = null;
}

while (!this.hasNext && !this.filesToPull.isEmpty()) {
Expand Down
Expand Up @@ -42,7 +42,7 @@
*/
public class HiveWritableHdfsDataWriter extends FsDataWriter<Writable> {

protected final RecordWriter writer;
protected RecordWriter writer;
protected final AtomicLong count = new AtomicLong(0);
// the close method may be invoked multiple times, but the underlying writer only supports close being called once
private boolean closed = false;
Expand Down Expand Up @@ -101,24 +101,25 @@ public long bytesWritten() throws IOException {

@Override
public void close() throws IOException {
// close the underlying writer if not already closed. The close can only be called once for the underlying writer,
// so remember the state
if (!this.closed) {
this.writer.close(false);
this.closed = true;
}

closeInternal();
super.close();
}

@Override
public void commit() throws IOException {
closeInternal();
super.commit();
}

private void closeInternal() throws IOException {
// close the underlying writer if not already closed. The close can only be called once for the underlying writer,
// so remember the state
if (!this.closed) {
this.writer.close(false);
// release reference to allow GC since this writer can hold onto large buffers for some formats like ORC.
this.writer = null;
this.closed = true;
}

super.commit();
}

@Override
Expand Down
Expand Up @@ -248,6 +248,15 @@ public void run() {
compareAndSetForkState(ForkState.PENDING, ForkState.RUNNING);
try {
processRecords();

// Close the writer now if configured. One case where this is set is to release memory from ORC writers that can
// have large buffers. Making this an opt-in option to avoid breaking anything that relies on keeping the writer
// open until commit.
if (this.writer.isPresent() && taskContext.getTaskState().getPropAsBoolean(
ConfigurationKeys.FORK_CLOSE_WRITER_ON_COMPLETION, ConfigurationKeys.DEFAULT_FORK_CLOSE_WRITER_ON_COMPLETION)) {
this.writer.get().close();
}

compareAndSetForkState(ForkState.RUNNING, ForkState.SUCCEEDED);
} catch (Throwable t) {
// Set throwable to holder first because AsynchronousFork::putRecord can pull the throwable when it detects ForkState.FAILED status.
Expand Down
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.runtime.fork;

import java.io.IOException;
import lombok.Getter;
import lombok.Setter;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.DataConversionException;
import org.apache.gobblin.runtime.ExecutionModel;
import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.writer.DataWriter;
import org.apache.gobblin.writer.DataWriterBuilder;
import org.apache.gobblin.writer.RetryWriter;
import org.junit.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;


public class ForkTest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intention in this file doesn't seem to be right. Can you fix it ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

@DataProvider(name = "closeConfigProvider")
public static Object[][] closeConfigProvider() {
// {close on done, expected count}
return new Object[][]{{"true", 1}, {"false", 0}};
}

@Test(dataProvider = "closeConfigProvider")
public void TestForCloseWriterTrue(String closeConfig, int expectedCloseCount) throws Exception {
WorkUnitState wus = new WorkUnitState();
wus.setProp(ConfigurationKeys.FORK_CLOSE_WRITER_ON_COMPLETION, closeConfig);
wus.setProp(ConfigurationKeys.JOB_ID_KEY, "job2");
wus.setProp(ConfigurationKeys.TASK_ID_KEY, "task1");
wus.setProp(RetryWriter.RETRY_WRITER_ENABLED, "false");
wus.setProp(ConfigurationKeys.WRITER_EAGER_INITIALIZATION_KEY, "true");
wus.setProp(ConfigurationKeys.WRITER_BUILDER_CLASS, DummyDataWriterBuilder.class.getName());

TaskContext taskContext = new TaskContext(wus);
Fork testFork = new TestFork(taskContext, null, 0, 0, ExecutionModel.BATCH);

Assert.assertNotNull(testFork.getWriter());

testFork.run();

Assert.assertEquals(expectedCloseCount, DummyDataWriterBuilder.getWriter().getCloseCount());
}

private static class TestFork extends Fork {

public TestFork(TaskContext taskContext, Object schema, int branches, int index, ExecutionModel executionModel)
throws Exception {
super(taskContext, schema, branches, index, executionModel);
}

@Override
protected void processRecords() throws IOException, DataConversionException {
}

@Override
protected boolean putRecordImpl(Object record) throws InterruptedException {
return true;
}
}

public static class DummyDataWriterBuilder extends DataWriterBuilder<String, Integer> {
private static ThreadLocal<DummyWriter> myThreadLocal = ThreadLocal.withInitial(() -> new DummyWriter());

@Override
public DataWriter<Integer> build() throws IOException {
getWriter().setCloseCount(0);
return getWriter();
}

public static DummyWriter getWriter() {
return myThreadLocal.get();
}
}

private static class DummyWriter implements DataWriter<Integer> {
@Getter
@Setter
private int closeCount = 0;

DummyWriter() {
}

@Override
public void write(Integer record) throws IOException {
}

@Override
public void commit() throws IOException {
}

@Override
public void cleanup() throws IOException {
}

@Override
public long recordsWritten() {
return 0;
}

@Override
public long bytesWritten() throws IOException {
return 0;
}

@Override
public void close() throws IOException {
this.closeCount++;
}
}
}