Optimize built-in source/sink startup by eliminating redundant NAR unpacking and checksum calculation #9413

Merged

jerrypeng
Contributor

Motivation

Currently, when a user submits a built-in source or sink to be run, the locally stored NAR file of that connector is unpacked (or its MD5 checksum is calculated) and loaded into a classloader up to 5 times during the startup and execution process. This is a redundant and unnecessary waste of resources. We should unpack the NAR once and cache the results. I have noticed multi-second instance startup latencies because of this.

Modifications

  1. Refactored the code for function instance startup
  2. The initial scan for connectors also caches classloaders, connector definitions, etc so that they can be reused later on
  3. Fixed a bug in which updates to certain nested fields of built-in Sources and Sinks were incorrectly rejected by the worker, which claimed there was no change.
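
The caching idea in item 2 can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in this PR: `ConnectorCache`, `loaderFor`, and `loadCount` are hypothetical names, the archive path is made up, and a plain `URLClassLoader` stands in for Pulsar's `NarClassLoader`.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: create the class loader for an archive once, then reuse it.
public class ConnectorCache {
    private final Map<Path, ClassLoader> loaders = new ConcurrentHashMap<>();
    private final AtomicInteger loadCount = new AtomicInteger();

    // Returns the cached loader for this archive, creating it on first use.
    public ClassLoader loaderFor(Path archive) {
        Path key = archive.toAbsolutePath().normalize();
        return loaders.computeIfAbsent(key, this::createLoader);
    }

    // Stand-in for the expensive step (unpacking, checksumming, loading).
    private ClassLoader createLoader(Path archive) {
        loadCount.incrementAndGet();
        try {
            return new URLClassLoader(new URL[] {archive.toUri().toURL()},
                    ConnectorCache.class.getClassLoader());
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Bad archive path: " + archive, e);
        }
    }

    // Number of times the expensive step actually ran.
    public int loadCount() {
        return loadCount.get();
    }

    public static void main(String[] args) {
        ConnectorCache cache = new ConnectorCache();
        Path nar = Paths.get("connectors", "example.nar"); // hypothetical path
        ClassLoader first = cache.loaderFor(nar);
        ClassLoader second = cache.loaderFor(nar);
        System.out.println("same loader reused: " + (first == second));
        System.out.println("expensive loads: " + cache.loadCount());
    }
}
```

Keeping loaders alive for reuse trades memory for startup time, so a cache like this would also need a policy for how many loaders it retains.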

Verifying this change

Added multiple tests for built-in sources and sinks.

@jerrypeng jerrypeng added this to the 2.8.0 milestone Feb 2, 2021
@jerrypeng jerrypeng self-assigned this Feb 2, 2021
@sijie
Member

sijie commented Feb 2, 2021

@freeznet @nlu90 Can you review this pull request?

@jerrypeng jerrypeng changed the title Optimize built-in source/sink startup by eliminating redundant NAR unpacking and checksum calculation WIP Optimize built-in source/sink startup by eliminating redundant NAR unpacking and checksum calculation Feb 2, 2021
@jerrypeng jerrypeng changed the title WIP Optimize built-in source/sink startup by eliminating redundant NAR unpacking and checksum calculation Optimize built-in source/sink startup by eliminating redundant NAR unpacking and checksum calculation Feb 2, 2021
@jerrypeng
Contributor Author

/pulsarbot run-failure-checks

Contributor

@eolivelli eolivelli left a comment

great work !
I left some minor comments


@Test
@Test(timeOut = 20000)
Contributor

I am not sure we need these timeouts,
if the test breaks then it is not likely that other tests will run.

Contributor Author

The individual timeout is in place so that a hung test doesn't block for 1.5-2 hours, which is the absolute test timeout, and consume resources for that whole time.

Contributor

We centrally set a 300-second timeout for all tests that don't have a timeout of their own.

@@ -354,6 +377,8 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setAuthenticationEnabled(true);
workerConfig.setAuthorizationEnabled(true);

workerConfig.setConnectorsDirectory(Files.createTempDirectory("test").toFile().getAbsolutePath());
Contributor

what about naming the directory "tempconnectorsdir" ?

Contributor Author

sure

try {
sinkClassName = ConnectorUtils.getIOSinkClass(narClassLoader);
sinkClassName = ConnectorUtils.getIOSinkClass((NarClassLoader) sinkClassLoader);
Contributor

If we are blindly casting to NarClassLoader, can we change the type of the argument?

Contributor Author

sinkClassLoader may not be an instance of NarClassLoader. In this line of code, if sinkClassName is not specified, we are only trying to see whether we can find a class name specified in the NAR.
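
For illustration only, one way to make that assumption explicit is to guard the cast with an `instanceof` check. The sketch below is not the PR's code: `SinkClassLookup`, `resolveSinkClass`, and `ArchiveClassLoader` (a stand-in for `NarClassLoader`) are hypothetical names.

```java
import java.util.Optional;

// Hypothetical sketch: only consult the archive's metadata when the loader supports it.
public class SinkClassLookup {

    // Stand-in for a NAR-aware class loader that knows which sink class it declares.
    public static class ArchiveClassLoader extends ClassLoader {
        private final String declaredSinkClass;

        public ArchiveClassLoader(String declaredSinkClass) {
            this.declaredSinkClass = declaredSinkClass;
        }

        public String declaredSinkClass() {
            return declaredSinkClass;
        }
    }

    // Prefers the configured class name; otherwise asks the loader, if it can answer.
    public static Optional<String> resolveSinkClass(String configured, ClassLoader loader) {
        if (configured != null && !configured.isEmpty()) {
            return Optional.of(configured);
        }
        if (loader instanceof ArchiveClassLoader) {
            return Optional.ofNullable(((ArchiveClassLoader) loader).declaredSinkClass());
        }
        return Optional.empty(); // a plain loader carries no sink metadata
    }

    public static void main(String[] args) {
        ClassLoader plain = SinkClassLookup.class.getClassLoader();
        System.out.println(resolveSinkClass(null, plain));
        System.out.println(resolveSinkClass(null, new ArchiveClassLoader("com.example.MySink")));
    }
}
```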

@david-streamlio
Contributor

LGTM

Member

@nlu90 nlu90 left a comment

LGTM except some minor comments

public static List<ConfigFieldDefinition> getConnectorConfigDefinition(String narPath,
String configClassName,
String narExtractionDirectory) throws Exception {
public static List<ConfigFieldDefinition> getConnectorConfigDefinition(ClassLoader narClassLoader,
Member

Do you mean NarClassLoader narClassLoader here?

Contributor Author

We don't need a NarClassLoader. A ClassLoader should suffice. I will rename the variable.

throws IOException, ClassNotFoundException {
try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive, Collections.emptySet(), workerConfig.getNarExtractionDirectory())) {
String typeArg = getSourceType(className, ncl).getName();
private void fillSourceTypeClass(FunctionDetails.Builder functionDetails,
Member

This method and the following fillSinkTypeClass method look very similar. I wonder if we could merge them into a single method to reduce code duplication.

Contributor Author

Not sure how much code we can save here, because fillSourceTypeClass() sets the typeClassName for the source, checks whether a sinkSpec exists, and sets the typeClassName for that as well. The logic for fillSinkTypeClass() is the reverse.

Also, since this refactor is already large, I don't want to make more changes in the code. If we want to refactor more, let's do it in a subsequent PR.

}

} else {
// if connector class name is provided, we need to try to load it as a JAR and as a NAR.
Member

s/and/or

File packageFile,
String narExtractionDirectory) {
String connectorClassName = className;
ClassLoader classLoader;
Member

Instead of having this variable and returning it at the end, I suggest returning the classLoader at the end of each if/else clause. That is easier to read.

Contributor Author

sure
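
As a rough sketch of the two points above (try the package as a NAR or as a JAR, and return from each branch instead of carrying a variable to the end), the shape could look like this. Everything here is an assumption for illustration: `FallbackLoading` and `firstThatLoads` are hypothetical names, and the two `Callable` arguments stand in for whatever actually builds the NAR and JAR class loaders.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch: try one loading strategy, fall back to the other, return early.
public class FallbackLoading {

    // Returns the first class loader that can load className; fails if neither can.
    public static ClassLoader firstThatLoads(String className,
                                             Callable<ClassLoader> narAttempt,
                                             Callable<ClassLoader> jarAttempt)
            throws ClassNotFoundException {
        try {
            ClassLoader nar = narAttempt.call();
            nar.loadClass(className);
            return nar; // loaded as a NAR, nothing else to do
        } catch (Exception narError) {
            try {
                ClassLoader jar = jarAttempt.call();
                jar.loadClass(className);
                return jar; // loaded as a JAR
            } catch (Exception jarError) {
                // Keep both failures so the caller can see why each attempt failed.
                ClassNotFoundException failure = new ClassNotFoundException(
                        "Could not load " + className + " as a NAR or as a JAR", jarError);
                failure.addSuppressed(narError);
                throw failure;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader app = FallbackLoading.class.getClassLoader();
        ClassLoader chosen = firstThatLoads("java.lang.String",
                () -> { throw new java.io.IOException("not a NAR"); },
                () -> app);
        System.out.println("fell back to the second loader: " + (chosen == app));
    }
}
```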

if (jarClassLoaderException != null) {
errorMsg.append("Attempts to load it as a JAR package produced error: " + jarClassLoaderException.getMessage());
}

Member

Do you want to add a line break or delimiter between these two error messages for better readability?

Contributor Author

I can add a space
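
A minimal sketch of one way to keep the two messages separated is to collect the parts and join them with a delimiter. `LoadErrorMessage` and `build` are hypothetical names; the JAR sentence is taken from the diff above, while the opening sentence and the NAR sentence are assumed wording.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: assemble the combined error message from separate parts.
public class LoadErrorMessage {

    // Joins the header and whichever errors occurred, using the given delimiter.
    public static String build(String className, Exception narError, Exception jarError,
                               String delimiter) {
        List<String> parts = new ArrayList<>();
        parts.add("Failed to load class " + className + "."); // assumed wording
        if (narError != null) {
            // Assumed wording, mirroring the JAR message below.
            parts.add("Attempts to load it as a NAR package produced error: "
                    + narError.getMessage());
        }
        if (jarError != null) {
            parts.add("Attempts to load it as a JAR package produced error: "
                    + jarError.getMessage());
        }
        return String.join(delimiter, parts);
    }

    public static void main(String[] args) {
        System.out.println(build("com.example.Sink",
                new Exception("bad nar"), new Exception("bad jar"), " "));
    }
}
```

Passing `" "` gives the single-space separation mentioned in the reply, and `System.lineSeparator()` would give one message per line.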

@jerrypeng
Contributor Author

/pulsarbot run-failure-checks

@jerrypeng jerrypeng force-pushed the optimize_builtin_connector_startup branch from dd2082a to a5b7f41 Compare February 4, 2021 23:45
@jerrypeng jerrypeng merged commit 1ff1fcd into apache:master Feb 5, 2021
merlimat pushed a commit to merlimat/pulsar that referenced this pull request Apr 6, 2021
…packing and checksum calculation (apache#9413)

Co-authored-by: Jerry Peng <jerryp@splunk.com>
lhotari added a commit to lhotari/datastax-pulsar-helm-chart that referenced this pull request Nov 25, 2021
@lhotari
Member

lhotari commented Nov 25, 2021

I reported issue #12974, since the Functions worker now consumes more resident memory after these changes.

lhotari added a commit to datastax/pulsar-helm-chart that referenced this pull request Nov 25, 2021
* Reduce CI memory usage

* Limit MaxDirectMemorySize to 256m on bookkeeper, broker and function worker

* Reduce limits even further because of 143 exit code

* Add logging to loading of images to kind

* Use single worker node for CI kind cluster

* Use datastax/lunastreaming image for functions worker to reduce memory usage

- Pulsar loads all nar files to memory after apache/pulsar#9413

* Add solution for skipping yq installation (for local development)

* Bump k8s version used in kind

* Add instructions how to run locally

* Allow less diskspace for CI test run