Skip to content

Commit

Permalink
Allow to config pulsar client allocator out of memory policy (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
shoothzj authored and eolivelli committed Nov 29, 2021
1 parent 1206034 commit a7a1d1e
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 0 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1326,6 +1326,7 @@ flexible messaging model and an intuitive client API.</description>
-Dpulsar.allocator.pooled=true
-Dpulsar.allocator.leak_detection=Advanced
-Dpulsar.allocator.exit_on_oom=false
-Dpulsar.allocator.out_of_memory_policy=FallbackToHeap
-Dio.netty.tryReflectionSetAccessible=true
${test.additional.args}
</argLine>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
import org.apache.bookkeeper.common.allocator.PoolingPolicy;

/**
Expand All @@ -39,6 +40,7 @@ public class PulsarByteBufAllocator {
public static final String PULSAR_ALLOCATOR_POOLED = "pulsar.allocator.pooled";
public static final String PULSAR_ALLOCATOR_EXIT_ON_OOM = "pulsar.allocator.exit_on_oom";
public static final String PULSAR_ALLOCATOR_LEAK_DETECTION = "pulsar.allocator.leak_detection";
public static final String PULSAR_ALLOCATOR_OUT_OF_MEMORY_POLICY = "pulsar.allocator.out_of_memory_policy";

public static final ByteBufAllocator DEFAULT;

Expand All @@ -53,6 +55,7 @@ public static void registerOOMListener(Consumer<OutOfMemoryError> listener) {
static {
boolean isPooled = "true".equalsIgnoreCase(System.getProperty(PULSAR_ALLOCATOR_POOLED, "true"));
EXIT_ON_OOM = "true".equalsIgnoreCase(System.getProperty(PULSAR_ALLOCATOR_EXIT_ON_OOM, "false"));
OutOfMemoryPolicy outOfMemoryPolicy = OutOfMemoryPolicy.valueOf(System.getProperty(PULSAR_ALLOCATOR_OUT_OF_MEMORY_POLICY, "FallbackToHeap"));

LeakDetectionPolicy leakDetectionPolicy = LeakDetectionPolicy
.valueOf(System.getProperty(PULSAR_ALLOCATOR_LEAK_DETECTION, "Disabled"));
Expand Down Expand Up @@ -85,6 +88,7 @@ public static void registerOOMListener(Consumer<OutOfMemoryError> listener) {
} else {
builder.poolingPolicy(PoolingPolicy.UnpooledHeap);
}
builder.outOfMemoryPolicy(outOfMemoryPolicy);

DEFAULT = builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.allocator;

import io.netty.buffer.ByteBufAllocator;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
import org.apache.bookkeeper.common.allocator.PoolingPolicy;
import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorBuilderImpl;
import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.IObjectFactory;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;

@PrepareForTest({ByteBufAllocatorImpl.class, ByteBufAllocatorBuilderImpl.class})
@PowerMockIgnore({"javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*"})
@Slf4j
public class PulsarByteBufAllocatorDefaultTest {

@ObjectFactory
public IObjectFactory getObjectFactory() {
return new org.powermock.modules.testng.PowerMockObjectFactory();
}

@Test
public void testDefaultConfig() throws Exception {
final ByteBufAllocatorImpl mockAllocator = PowerMockito.mock(ByteBufAllocatorImpl.class);
PowerMockito.whenNew(ByteBufAllocatorImpl.class).withAnyArguments().thenReturn(mockAllocator);
final ByteBufAllocatorImpl byteBufAllocator = (ByteBufAllocatorImpl) PulsarByteBufAllocator.DEFAULT;
// use the variable, in case the compiler optimization
log.trace("{}", byteBufAllocator);
PowerMockito.verifyNew(ByteBufAllocatorImpl.class).withArguments(Mockito.any(ByteBufAllocator.class), Mockito.any(),
Mockito.eq(PoolingPolicy.PooledDirect), Mockito.any(), Mockito.eq(OutOfMemoryPolicy.FallbackToHeap),
Mockito.any(), Mockito.eq(LeakDetectionPolicy.Advanced));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.allocator;

import io.netty.buffer.ByteBufAllocator;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
import org.apache.bookkeeper.common.allocator.PoolingPolicy;
import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorBuilderImpl;
import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.IObjectFactory;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;

@PrepareForTest({ByteBufAllocatorImpl.class, ByteBufAllocatorBuilderImpl.class})
@PowerMockIgnore({"javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*"})
@Slf4j
public class PulsarByteBufAllocatorOomThrowExceptionTest {

@ObjectFactory
public IObjectFactory getObjectFactory() {
return new org.powermock.modules.testng.PowerMockObjectFactory();
}

@Test
public void testDefaultConfig() throws Exception {
try {
System.setProperty("pulsar.allocator.out_of_memory_policy", "ThrowException");
final ByteBufAllocatorImpl mockAllocator = PowerMockito.mock(ByteBufAllocatorImpl.class);
PowerMockito.whenNew(ByteBufAllocatorImpl.class).withAnyArguments().thenReturn(mockAllocator);
final ByteBufAllocatorImpl byteBufAllocator = (ByteBufAllocatorImpl) PulsarByteBufAllocator.DEFAULT;
// use the variable, in case the compiler optimization
log.trace("{}", byteBufAllocator);
PowerMockito.verifyNew(ByteBufAllocatorImpl.class).withArguments(Mockito.any(ByteBufAllocator.class), Mockito.any(),
Mockito.eq(PoolingPolicy.PooledDirect), Mockito.any(), Mockito.eq(OutOfMemoryPolicy.ThrowException),
Mockito.any(), Mockito.eq(LeakDetectionPolicy.Advanced));
} finally {
System.clearProperty("pulsar.allocator.out_of_memory_policy");
}
}

}

0 comments on commit a7a1d1e

Please sign in to comment.