Skip to content

Commit

Permalink
[fix][broker] fix Update contains no change error when use `--updat…
Browse files Browse the repository at this point in the history
…e-auth-data` flag to update function/sink/source (apache#19450)

Co-authored-by: tison <wander4096@gmail.com>
  • Loading branch information
labuladong and tisonkun committed Jun 13, 2023
1 parent 63f1505 commit 2b92ed1
Show file tree
Hide file tree
Showing 8 changed files with 306 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ private static void verifyNoTopicClash(Collection<String> inputTopics, String ou
}
}

private static void doCommonChecks(FunctionConfig functionConfig) {
public static void doCommonChecks(FunctionConfig functionConfig) {
if (isEmpty(functionConfig.getTenant())) {
throw new IllegalArgumentException("Function tenant cannot be null");
}
Expand Down Expand Up @@ -890,7 +890,7 @@ private static void doCommonChecks(FunctionConfig functionConfig) {
}
}

private static Collection<String> collectAllInputTopics(FunctionConfig functionConfig) {
public static Collection<String> collectAllInputTopics(FunctionConfig functionConfig) {
List<String> retval = new LinkedList<>();
if (functionConfig.getInputs() != null) {
retval.addAll(functionConfig.getInputs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConf
return new ExtractedSinkDetails(sinkClassName, typeArg.getName(), functionClassName);
}

private static Collection<String> collectAllInputTopics(SinkConfig sinkConfig) {
public static Collection<String> collectAllInputTopics(SinkConfig sinkConfig) {
List<String> retval = new LinkedList<>();
if (sinkConfig.getInputs() != null) {
retval.addAll(sinkConfig.getInputs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,8 @@ public void updateFunction(final String tenant,
throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
}

if (existingFunctionConfig.equals(mergedConfig) && isBlank(functionPkgUrl) && uploadedInputStream == null) {
if (existingFunctionConfig.equals(mergedConfig) && isBlank(functionPkgUrl) && uploadedInputStream == null
&& (updateOptions == null || !updateOptions.isUpdateAuthData())) {
log.error("{}/{}/{} Update contains no changes", tenant, namespace, functionName);
throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ public void updateSink(final String tenant,
throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
}

if (existingSinkConfig.equals(mergedConfig) && isBlank(sinkPkgUrl) && uploadedInputStream == null) {
if (existingSinkConfig.equals(mergedConfig) && isBlank(sinkPkgUrl) && uploadedInputStream == null
&& (updateOptions == null || !updateOptions.isUpdateAuthData())) {
log.error("{}/{}/{} Update contains no changes", tenant, namespace, sinkName);
throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ public void updateSource(final String tenant,
throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
}

if (existingSourceConfig.equals(mergedConfig) && isBlank(sourcePkgUrl) && uploadedInputStream == null) {
if (existingSourceConfig.equals(mergedConfig) && isBlank(sourcePkgUrl) && uploadedInputStream == null
&& (updateOptions == null || !updateOptions.isUpdateAuthData())) {
log.error("{}/{}/{} Update contains no changes", tenant, namespace, sourceName);
throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileInputStream;
Expand Down Expand Up @@ -57,6 +59,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.UpdateOptionsImpl;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -604,6 +607,96 @@ public void testUpdateMissingFunctionConfig() {
null, null);
}

@Test
public void testUpdateSourceWithNoChange() throws ClassNotFoundException {
mockWorkerUtils();

FunctionDetails functionDetails = createDefaultFunctionDetails();
NarClassLoader mockedClassLoader = mock(NarClassLoader.class);
mockStatic(FunctionCommon.class, ctx -> {
ctx.when(() -> FunctionCommon.getFunctionTypes(any(FunctionConfig.class), any(Class.class))).thenReturn(new Class[]{String.class, String.class});
ctx.when(() -> FunctionCommon.convertRuntime(any(FunctionConfig.Runtime.class))).thenCallRealMethod();
ctx.when(() -> FunctionCommon.isFunctionCodeBuiltin(any())).thenReturn(true);
ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(),any(),any(),any())).thenCallRealMethod();
ctx.when(FunctionCommon::createPkgTempFile).thenCallRealMethod();
});

doReturn(Function.class).when(mockedClassLoader).loadClass(anyString());

FunctionsManager mockedFunctionsManager = mock(FunctionsManager.class);
FunctionArchive functionArchive = FunctionArchive.builder()
.classLoader(mockedClassLoader)
.build();
when(mockedFunctionsManager.getFunction("exclamation")).thenReturn(functionArchive);
when(mockedFunctionsManager.getFunctionArchive(any())).thenReturn(getPulsarApiExamplesNar().toPath());

when(mockedWorkerService.getFunctionsManager()).thenReturn(mockedFunctionsManager);
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);

// No change on config,
FunctionConfig funcConfig = createDefaultFunctionConfig();
mockStatic(FunctionConfigUtils.class, ctx -> {
ctx.when(() -> FunctionConfigUtils.convertFromDetails(any())).thenReturn(funcConfig);
ctx.when(() -> FunctionConfigUtils.validateUpdate(any(), any())).thenCallRealMethod();
ctx.when(() -> FunctionConfigUtils.convert(any(FunctionConfig.class), any(ClassLoader.class))).thenReturn(functionDetails);
ctx.when(() -> FunctionConfigUtils.convert(any(FunctionConfig.class), any(FunctionConfigUtils.ExtractedFunctionDetails.class))).thenReturn(functionDetails);
ctx.when(() -> FunctionConfigUtils.validateJavaFunction(any(), any())).thenCallRealMethod();
ctx.when(() -> FunctionConfigUtils.doCommonChecks(any())).thenCallRealMethod();
ctx.when(() -> FunctionConfigUtils.collectAllInputTopics(any())).thenCallRealMethod();
ctx.when(() -> FunctionConfigUtils.doJavaChecks(any(), any())).thenCallRealMethod();
});

// config has not changes and don't update auth, should fail
try {
resource.updateFunction(
funcConfig.getTenant(),
funcConfig.getNamespace(),
funcConfig.getName(),
null,
mockedFormData,
null,
funcConfig,
null,
null);
fail("Update without changes should fail");
} catch (RestException e) {
assertTrue(e.getMessage().contains("Update contains no change"));
}

try {
UpdateOptionsImpl updateOptions = new UpdateOptionsImpl();
updateOptions.setUpdateAuthData(false);
resource.updateFunction(
funcConfig.getTenant(),
funcConfig.getNamespace(),
funcConfig.getName(),
null,
mockedFormData,
null,
funcConfig,
null,
updateOptions);
fail("Update without changes should fail");
} catch (RestException e) {
assertTrue(e.getMessage().contains("Update contains no change"));
}

// no changes but set the auth-update flag to true, should not fail
UpdateOptionsImpl updateOptions = new UpdateOptionsImpl();
updateOptions.setUpdateAuthData(true);
resource.updateFunction(
funcConfig.getTenant(),
funcConfig.getNamespace(),
funcConfig.getName(),
null,
mockedFormData,
null,
funcConfig,
null,
updateOptions);
}


private void registerDefaultFunction() {
registerDefaultFunctionWithPackageUrl(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,6 @@
*/
package org.apache.pulsar.functions.worker.rest.api.v3;

import static org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATLEAST_ONCE;
import static org.apache.pulsar.functions.source.TopicSchema.DEFAULT_SERDE;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileInputStream;
Expand Down Expand Up @@ -60,6 +45,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.UpdateOptionsImpl;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
Expand Down Expand Up @@ -97,6 +83,23 @@
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATLEAST_ONCE;
import static org.apache.pulsar.functions.source.TopicSchema.DEFAULT_SERDE;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

/**
* Unit test of {@link SinksApiV3Resource}.
Expand Down Expand Up @@ -965,29 +968,7 @@ private void testUpdateSinkMissingArguments(
String className,
Integer parallelism,
String expectedError) throws Exception {
mockStatic(ConnectorUtils.class, ctx -> {
ctx.when(() -> ConnectorUtils.getIOSinkClass(any(NarClassLoader.class)))
.thenReturn(CASSANDRA_STRING_SINK);
});

mockStatic(ClassLoaderUtils.class, ctx -> {
});

mockStatic(FunctionCommon.class, ctx -> {
ctx.when(() -> FunctionCommon.createPkgTempFile()).thenCallRealMethod();
ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(), any(), any(), any())).thenCallRealMethod();
ctx.when(() -> FunctionCommon.getSinkType(any())).thenReturn(String.class);
ctx.when(() -> FunctionCommon.extractNarClassLoader(any(), any())).thenReturn(mock(NarClassLoader.class));
ctx.when(() -> FunctionCommon
.convertProcessingGuarantee(eq(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE)))
.thenReturn(ATLEAST_ONCE);
});

this.mockedFunctionMetaData =
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink))).thenReturn(mockedFunctionMetaData);

when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
mockFunctionCommon(tenant, namespace, sink);

SinkConfig sinkConfig = new SinkConfig();
if (tenant != null) {
Expand Down Expand Up @@ -1026,6 +1007,32 @@ private void testUpdateSinkMissingArguments(

}

private void mockFunctionCommon(String tenant, String namespace, String sink) throws IOException {
mockStatic(ConnectorUtils.class, ctx -> {
ctx.when(() -> ConnectorUtils.getIOSinkClass(any(NarClassLoader.class)))
.thenReturn(CASSANDRA_STRING_SINK);
});

mockStatic(ClassLoaderUtils.class, ctx -> {
});

mockStatic(FunctionCommon.class, ctx -> {
ctx.when(() -> FunctionCommon.createPkgTempFile()).thenCallRealMethod();
ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(), any(), any(), any())).thenCallRealMethod();
ctx.when(() -> FunctionCommon.getSinkType(any())).thenReturn(String.class);
ctx.when(() -> FunctionCommon.extractNarClassLoader(any(), any())).thenReturn(mock(NarClassLoader.class));
ctx.when(() -> FunctionCommon
.convertProcessingGuarantee(eq(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE)))
.thenReturn(ATLEAST_ONCE);
});

this.mockedFunctionMetaData =
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink))).thenReturn(mockedFunctionMetaData);

when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
}

private void updateDefaultSink() throws Exception {
updateDefaultSinkWithPackageUrl(null);
}
Expand Down Expand Up @@ -1848,4 +1855,72 @@ public void testRegisterSinkSuccessK8sWithUpload() throws Exception {
}
}
}

@Test
public void testUpdateSinkWithNoChange() throws IOException {
mockWorkerUtils();

// No change on config,
SinkConfig sinkConfig = createDefaultSinkConfig();

mockStatic(SinkConfigUtils.class, ctx -> {
ctx.when(() -> SinkConfigUtils.convertFromDetails(any())).thenReturn(sinkConfig);
ctx.when(() -> SinkConfigUtils.convert(any(), any())).thenCallRealMethod();
ctx.when(() -> SinkConfigUtils.validateUpdate(any(), any())).thenCallRealMethod();
ctx.when(() -> SinkConfigUtils.clone(any())).thenCallRealMethod();
ctx.when(() -> SinkConfigUtils.collectAllInputTopics(any())).thenCallRealMethod();
ctx.when(() -> SinkConfigUtils.validateAndExtractDetails(any(),any(),any(),anyBoolean())).thenCallRealMethod();
});

mockFunctionCommon(sinkConfig.getTenant(), sinkConfig.getNamespace(), sinkConfig.getName());

// config has not changes and don't update auth, should fail
try {
resource.updateSink(
sinkConfig.getTenant(),
sinkConfig.getNamespace(),
sinkConfig.getName(),
null,
mockedFormData,
null,
sinkConfig,
null,
null);
fail("Update without changes should fail");
} catch (RestException e) {
assertTrue(e.getMessage().contains("Update contains no change"));
}

try {
UpdateOptionsImpl updateOptions = new UpdateOptionsImpl();
updateOptions.setUpdateAuthData(false);
resource.updateSink(
sinkConfig.getTenant(),
sinkConfig.getNamespace(),
sinkConfig.getName(),
null,
mockedFormData,
null,
sinkConfig,
null,
updateOptions);
fail("Update without changes should fail");
} catch (RestException e) {
assertTrue(e.getMessage().contains("Update contains no change"));
}

// no changes but set the auth-update flag to true, should not fail
UpdateOptionsImpl updateOptions = new UpdateOptionsImpl();
updateOptions.setUpdateAuthData(true);
resource.updateSink(
sinkConfig.getTenant(),
sinkConfig.getNamespace(),
sinkConfig.getName(),
null,
mockedFormData,
null,
sinkConfig,
null,
updateOptions);
}
}
Loading

0 comments on commit 2b92ed1

Please sign in to comment.