5858import org .apache .pulsar .common .naming .TopicName ;
5959import org .apache .pulsar .common .partition .PartitionedTopicMetadata ;
6060import org .apache .pulsar .common .policies .data .ClusterData ;
61+ import org .apache .pulsar .common .policies .data .SchemaCompatibilityStrategy ;
6162import org .apache .pulsar .common .policies .data .TenantInfoImpl ;
6263import org .apache .pulsar .common .policies .data .TopicPolicies ;
6364import org .apache .pulsar .common .policies .data .TopicType ;
@@ -74,6 +75,7 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport {
7475 protected final String defaultTenant = "public" ;
7576 protected final String replicatedNamespace = defaultTenant + "/default" ;
7677 protected final String nonReplicatedNamespace = defaultTenant + "/ns1" ;
78+ protected final String sourceClusterAlwaysSchemaCompatibleNamespace = defaultTenant + "/always-compatible" ;
7779
7880 protected final String cluster1 = "r1" ;
7981
@@ -167,6 +169,10 @@ protected void createDefaultTenantsAndClustersAndNamespace() throws Exception {
167169 Sets .newHashSet (cluster1 , cluster2 )));
168170 admin1 .namespaces ().createNamespace (replicatedNamespace , Sets .newHashSet (cluster1 , cluster2 ));
169171 admin1 .namespaces ().createNamespace (nonReplicatedNamespace );
172+ admin1 .namespaces ().createNamespace (
173+ sourceClusterAlwaysSchemaCompatibleNamespace , Sets .newHashSet (cluster1 , cluster2 ));
174+ admin1 .namespaces ().setSchemaCompatibilityStrategy (sourceClusterAlwaysSchemaCompatibleNamespace ,
175+ SchemaCompatibilityStrategy .ALWAYS_COMPATIBLE );
170176
171177 if (!usingGlobalZK ) {
172178 admin2 .clusters ().createCluster (cluster1 , ClusterData .builder ()
@@ -187,6 +193,9 @@ protected void createDefaultTenantsAndClustersAndNamespace() throws Exception {
187193 Sets .newHashSet (cluster1 , cluster2 )));
188194 admin2 .namespaces ().createNamespace (replicatedNamespace );
189195 admin2 .namespaces ().createNamespace (nonReplicatedNamespace );
196+ admin2 .namespaces ().createNamespace (sourceClusterAlwaysSchemaCompatibleNamespace );
197+ admin2 .namespaces ().setSchemaCompatibilityStrategy (sourceClusterAlwaysSchemaCompatibleNamespace ,
198+ SchemaCompatibilityStrategy .FORWARD );
190199 }
191200
192201 }
0 commit comments