Skip to content

Commit

Permalink
Fix deadlock with Jet classloader and Compact GenericRecord [HZ-3065]…
Browse files Browse the repository at this point in the history
… [5.3.z] (#25459)

This PR fixes usage of `computeIfAbsent` in `CompactStreamSerializer`
for complex classloaders (like `JetClassLoader`) which may invoke
operations on different threads or members and lead to deadlock.

Jet job classloader uses IMap to store classes.
CompactStreamSerializer checks if the schema name represents existing
class name by loading the class before registering the mapping.
`CompactStreamSerializer` used to load the class inside a
`ConcurrentHashMap.computeIfAbsent` invocation, which could lead to a
deadlock, for example when a Jet job with custom classes and
`IMap.executeOnEntries` tried at the same time to process compact
`GenericRecord` data stored in the same IMap.

Fixes HZ-3065
Backport of #25379
Slack report:
https://hazelcast.slack.com/archives/C034YQBTG/p1692628725643179
  • Loading branch information
k-jamroz committed Sep 18, 2023
1 parent 4fa8055 commit 33467ff
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.FieldKind;
import com.hazelcast.nio.serialization.genericrecord.GenericRecord;
import com.hazelcast.nio.serialization.HazelcastSerializationException;
import com.hazelcast.nio.serialization.StreamSerializer;
import com.hazelcast.nio.serialization.compact.CompactSerializer;
import com.hazelcast.nio.serialization.genericrecord.GenericRecord;

import javax.annotation.Nonnull;
import java.io.IOException;
Expand Down Expand Up @@ -222,24 +222,38 @@ private CompactSerializableRegistration getOrCreateRegistration(Class clazz) {
}

private CompactSerializableRegistration getOrCreateRegistration(String typeName) {
return typeNameToRegistrationMap.computeIfAbsent(typeName, s -> {
Class<?> clazz;
try {
// When the registration does not exist, we treat typeName as className
// to check if there is a class with the given name in the classpath.
clazz = ClassLoaderUtil.loadClass(classLoader, typeName);
} catch (Exception e) {
// There is no such class that has typeName as its name.
// We should try to read this as GenericRecord. We are
// returning this registration here to remember that we
// should read instances of this typeName as GenericRecords,
// instead of trying to load a class with that name over
// and over.
return CompactSerializableRegistration.GENERIC_RECORD_REGISTRATION;
}
CompactSerializableRegistration currentRegistration = typeNameToRegistrationMap.get(typeName);
if (currentRegistration != null) {
return currentRegistration;
}
// Execute potentially long-lasting operation outside CHM lock in computeIfAbsent.
// Some special classloaders (eg. JetClassLoader) may try to access external resources
// and require other threads.
// We might try to load the same class multiple times in parallel but this is not a problem.
CompactSerializableRegistration newRegistration = getOrCreateRegistration0(typeName);

// Registration might have been created by a concurrent thread.
// If so, use that one instead.
return typeNameToRegistrationMap.computeIfAbsent(typeName, k -> newRegistration);
}

return getOrCreateRegistration(clazz);
});
/**
 * Resolves the registration for {@code typeName} by treating the type name
 * as a class name and trying to load it with the configured class loader.
 *
 * @param typeName the compact type name to resolve
 * @return the registration of the loaded class, or
 *         {@link CompactSerializableRegistration#GENERIC_RECORD_REGISTRATION}
 *         when the class could not be loaded
 */
private CompactSerializableRegistration getOrCreateRegistration0(String typeName) {
    final Class<?> resolvedClass;
    try {
        // No registration is known for this type name yet, so interpret it
        // as a class name and look it up through the class loader.
        resolvedClass = ClassLoaderUtil.loadClass(classLoader, typeName);
    } catch (Exception ignored) {
        // Loading failed: no class is available under this name. Fall back
        // to the GenericRecord registration so that the caller can remember
        // the outcome and instances of this type name are read as
        // GenericRecords, rather than retrying the class lookup every time.
        return CompactSerializableRegistration.GENERIC_RECORD_REGISTRATION;
    }

    // Kept outside the try block on purpose: failures while creating the
    // class-based registration must propagate instead of being treated as
    // "class not found".
    return getOrCreateRegistration(resolvedClass);
}

private GenericRecord readGenericRecord(BufferObjectDataInput input, Schema schema, boolean schemaIncludedInBinary) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.jet.impl.deployment;

import com.hazelcast.config.Config;
import com.hazelcast.jet.SimpleTestInClusterSupport;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.map.IMap;
import com.hazelcast.nio.serialization.genericrecord.GenericRecordBuilder;
import com.hazelcast.test.annotation.NightlyTest;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.Repeat;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.Timeout;

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.ThreadLocalRandom;


/**
 * Regression test for HZ-3065: runs a Jet job that has custom classes attached
 * (so that a job-specific class loader is created) in parallel with
 * {@code IMap.executeOnEntries} over an IMap holding compact
 * {@code GenericRecord} values, and expects both to finish without deadlock.
 */
@Category({NightlyTest.class, ParallelJVMTest.class})
public class JetClassloaderCompactGenericRecordTest extends SimpleTestInClusterSupport {

    public static final String MAP_NAME = "various_compact";
    // test is very fast unless it hits a deadlock,
    // deadlock also causes instance shutdown to timeout.
    @Rule
    public final Timeout timeoutRule = Timeout.seconds(30);

    @BeforeClass
    public static void beforeClass() {
        Config config = smallInstanceConfig();
        // resource upload is required for JobConfig.addClass() used by the test
        config.getJetConfig().setResourceUploadEnabled(true);
        initializeWithClient(1, config, null);
    }

    @Repeat(1000)
    @Test(timeout = 15_000)
    public void whenCompactGenericRecordInImap_thenShouldNotDeadlock() throws Throwable {
        // prepare IMap
        IMap<Object, Object> map = instance().getMap(MAP_NAME);
        map.clear();
        for (int i = 0; i < 100; ++i) {
            // randomness increases likelihood of deadlock
            map.put("key" + ThreadLocalRandom.current().nextInt(),
                    GenericRecordBuilder.compact("key" + ThreadLocalRandom.current().nextInt())
                            .setString("hello", "world" + i)
                            .build());
        }

        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(Sources.map(MAP_NAME))
                // logger triggers deserialization of Compact GenericRecord
                .writeTo(Sinks.logger());

        // prepare entry processor
        // Single-slot holder for a failure raised on the helper thread. It is written
        // only by that thread and read only after Thread.join(), which guarantees visibility.
        Throwable[] entryProcessorFailure = new Throwable[1];
        Thread asyncExecuteOnEntries = new Thread(() -> {
            try {
                map.executeOnEntries(e -> {
                    // Give some time for Jet job to start.
                    // We want this entry processor to be executed in parallel with Jet job execution.
                    sleepMillis(1);
                    return e.getValue().toString();
                });
            } catch (Throwable t) {
                // Without this the failure would only reach the thread's uncaught
                // exception handler and the test would still pass.
                entryProcessorFailure[0] = t;
            }
        });

        // prepare Jet job with custom classes so JetClassLoader is created.
        // The class does not have to be used in the job.
        URL classUrl = new File(AbstractDeploymentTest.CLASS_DIRECTORY).toURI().toURL();
        // The loader is closed after the job has finished so that repeated runs do not leak it.
        try (URLClassLoader urlClassLoader = new URLClassLoader(new URL[]{classUrl}, null)) {
            JobConfig jobConfig = new JobConfig();
            Class<?> appearance = urlClassLoader.loadClass("com.sample.pojo.person.Person$Appereance");
            jobConfig.addClass(appearance);

            // execute in parallel
            asyncExecuteOnEntries.start();
            client().getJet().newJob(pipeline, jobConfig).join();
            asyncExecuteOnEntries.join();
        }

        if (entryProcessorFailure[0] != null) {
            throw entryProcessorFailure[0];
        }
    }
}

0 comments on commit 33467ff

Please sign in to comment.