Skip to content

Commit

Permalink
Encode subject names for Maven download plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
rayokota committed Nov 7, 2023
1 parent 43df8bb commit d81d5a5
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 14 deletions.
Expand Up @@ -22,6 +22,7 @@

import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import java.net.URLEncoder;
import org.apache.maven.plugin.MojoExecutionException;
import org.apache.maven.plugin.MojoFailureException;
import org.apache.maven.plugins.annotations.Mojo;
Expand All @@ -46,6 +47,8 @@
@Mojo(name = "download")
public class DownloadSchemaRegistryMojo extends SchemaRegistryMojo {

public static final String PERCENT_REPLACEMENT = "_x";

@Parameter(required = false)
String schemaExtension;

Expand All @@ -55,6 +58,9 @@ public class DownloadSchemaRegistryMojo extends SchemaRegistryMojo {
@Parameter(required = true)
File outputDirectory;

@Parameter(required = false)
boolean encodeSubject = true;

Map<String, ParsedSchema> downloadSchemas(Collection<String> subjects)
throws MojoExecutionException {
Map<String, ParsedSchema> results = new LinkedHashMap<>();
Expand Down Expand Up @@ -157,11 +163,13 @@ public void execute() throws MojoExecutionException, MojoFailureException {
Map<String, ParsedSchema> subjectToSchema = downloadSchemas(subjectsToDownload);

for (Map.Entry<String, ParsedSchema> kvp : subjectToSchema.entrySet()) {
String fileName = String.format("%s%s", kvp.getKey(), getExtension(kvp.getValue()));
String subject = kvp.getKey();
String encodedSubject = encodeSubject ? encode(subject) : subject;
String fileName = String.format("%s%s", encodedSubject, getExtension(kvp.getValue()));
File outputFile = new File(this.outputDirectory, fileName);

getLog().info(
String.format("Writing schema for Subject(%s) to %s.", kvp.getKey(), outputFile)
String.format("Writing schema for Subject(%s) to %s.", subject, outputFile)
);

try (OutputStreamWriter writer = new OutputStreamWriter(
Expand All @@ -170,7 +178,7 @@ public void execute() throws MojoExecutionException, MojoFailureException {
writer.write(kvp.getValue().toString());
} catch (Exception ex) {
throw new MojoExecutionException(
String.format("Exception thrown while writing subject('%s') schema to %s", kvp.getKey(),
String.format("Exception thrown while writing subject('%s') schema to %s", subject,
outputFile),
ex
);
Expand Down Expand Up @@ -198,4 +206,13 @@ private String getExtension(ParsedSchema parsedSchema) {
return ".txt";
}
}
}

protected static String encode(String subject) {
try {
String newSubject = URLEncoder.encode(subject, "UTF-8");
return newSubject.replaceAll("%", PERCENT_REPLACEMENT);
} catch (Exception e) {
return subject;
}
}
}
Expand Up @@ -15,9 +15,11 @@
*/
package io.confluent.kafka.schemaregistry.maven;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.avro.Schema;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -32,16 +34,14 @@ public class DownloadSchemaRegistryMojoTest extends SchemaRegistryTest {
DownloadSchemaRegistryMojo mojo;

@Before
public void createMojo() {
public void createMojo() throws IOException {
this.mojo = new DownloadSchemaRegistryMojo();
this.mojo.client(new MockSchemaRegistryClient());
this.mojo.outputDirectory = tempDirectory;
}

@Test
public void specificSubjects() throws IOException, RestClientException {
int version = 1;

List<File> files = new ArrayList<>();
public void specificSubjects() throws Exception {
this.mojo.subjectPatterns.clear();

for (int i = 0; i < 100; i++) {
Expand All @@ -51,16 +51,62 @@ public void specificSubjects() throws IOException, RestClientException {
Schema valueSchema = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)));
this.mojo.client().register(keySubject, new AvroSchema(keySchema));
this.mojo.client().register(valueSubject, new AvroSchema(valueSchema));

if (i % 10 == 0) {
String subjectPattern = String.format("^TestSubject%03d-(key|value)$", i);
this.mojo.subjectPatterns.add(subjectPattern);
}
}

this.mojo.execute();

for (int i = 0; i < 100; i++) {
String keySubject = String.format("TestSubject%03d-key", i);
String valueSubject = String.format("TestSubject%03d-value", i);
File keySchemaFile = new File(this.tempDirectory, keySubject + ".avsc");
File valueSchemaFile = new File(this.tempDirectory, valueSubject + ".avsc");
if (i % 10 == 0) {
assertTrue(keySchemaFile.exists());
assertTrue(valueSchemaFile.exists());
} else {
assertFalse(keySchemaFile.exists());
assertFalse(valueSchemaFile.exists());
}
}
}

@Test
public void specificContexts() throws Exception {
this.mojo.subjectPatterns.clear();

for (int i = 0; i < 100; i++) {
String keySubject = String.format(":.ctx:TestSubject%03d-key", i);
String valueSubject = String.format(":.ctx:TestSubject%03d-value", i);
Schema keySchema = Schema.create(Schema.Type.STRING);
Schema valueSchema = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)));
this.mojo.client().register(keySubject, new AvroSchema(keySchema));
this.mojo.client().register(valueSubject, new AvroSchema(valueSchema));

if (i % 10 == 0) {
String subjectPattern = String.format("^TestSubject%03d-(Key|Value)$", i);
files.add(keySchemaFile);
files.add(valueSchemaFile);
String subjectPattern = String.format("^:.ctx:TestSubject%03d-(key|value)$", i);
this.mojo.subjectPatterns.add(subjectPattern);
}
}
}

this.mojo.execute();

for (int i = 0; i < 100; i++) {
String keySubject = String.format("_x3A.ctx_x3ATestSubject%03d-key", i);
String valueSubject = String.format("_x3A.ctx_x3ATestSubject%03d-value", i);
File keySchemaFile = new File(this.tempDirectory, keySubject + ".avsc");
File valueSchemaFile = new File(this.tempDirectory, valueSubject + ".avsc");
if (i % 10 == 0) {
assertTrue(keySchemaFile.exists());
assertTrue(valueSchemaFile.exists());
} else {
assertFalse(keySchemaFile.exists());
assertFalse(valueSchemaFile.exists());
}
}
}
}

0 comments on commit d81d5a5

Please sign in to comment.