diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java index 0547fe686253..d868f625ffb3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java @@ -19,6 +19,7 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; +import org.apache.kafka.connect.util.SafeObjectInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,7 +29,6 @@ import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; -import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.nio.ByteBuffer; import java.util.HashMap; @@ -69,7 +69,7 @@ public synchronized void stop() { @SuppressWarnings("unchecked") private void load() { - try (ObjectInputStream is = new ObjectInputStream(new FileInputStream(file))) { + try (SafeObjectInputStream is = new SafeObjectInputStream(new FileInputStream(file))) { Object obj = is.readObject(); if (!(obj instanceof HashMap)) throw new ConnectException("Expected HashMap but found " + obj.getClass()); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/SafeObjectInputStream.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SafeObjectInputStream.java new file mode 100644 index 000000000000..b5c4c2559a4d --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SafeObjectInputStream.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectStreamClass; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +public class SafeObjectInputStream extends ObjectInputStream { + + protected static final Set DEFAULT_NO_DESERIALIZE_CLASS_NAMES; + + static { + + Set s = new HashSet<>(); + s.add("org.apache.commons.collections.functors.InvokerTransformer"); + s.add("org.apache.commons.collections.functors.InstantiateTransformer"); + s.add("org.apache.commons.collections4.functors.InvokerTransformer"); + s.add("org.apache.commons.collections4.functors.InstantiateTransformer"); + s.add("org.codehaus.groovy.runtime.ConvertedClosure"); + s.add("org.codehaus.groovy.runtime.MethodClosure"); + s.add("org.springframework.beans.factory.ObjectFactory"); + s.add("com.sun.org.apache.xalan.internal.xsltc.trax.TemplatesImpl"); + s.add("org.apache.xalan.xsltc.trax.TemplatesImpl"); + DEFAULT_NO_DESERIALIZE_CLASS_NAMES = Collections.unmodifiableSet(s); + } + + + public SafeObjectInputStream(InputStream in) throws IOException { + super(in); + } + + @Override + protected Class resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException { + String name = desc.getName(); + + if (isBlacklisted(name)) { + throw new SecurityException("Illegal type to deserialize: prevented for security reasons"); + } + + return super.resolveClass(desc); + } + + private boolean isBlacklisted(String name) { + for (String list : DEFAULT_NO_DESERIALIZE_CLASS_NAMES) { + if (name.endsWith(list)) { + return true; + } + } + + return false; + } +}