Skip to content

Commit

Permalink
SPARK-24252: Add v2 data source mix-in for catalog support.
Browse files Browse the repository at this point in the history
  • Loading branch information
rdblue committed Aug 14, 2018
1 parent b81e303 commit 21acda2
Show file tree
Hide file tree
Showing 9 changed files with 872 additions and 0 deletions.
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalog.v2;

import java.util.Collection;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

/**
* Case-insensitive map of string keys to string values.
* <p>
* This is used to pass options to v2 implementations to ensure consistent case insensitivity.
* <p>
* Methods that return keys in this map, like {@link #entrySet()} and {@link #keySet()}, return
* keys converted to lower case.
*/
public class CaseInsensitiveStringMap implements Map<String, String> {

public static CaseInsensitiveStringMap empty() {
return new CaseInsensitiveStringMap();
}

private final Map<String, String> delegate;

private CaseInsensitiveStringMap() {
this.delegate = new HashMap<>();
}

@Override
public int size() {
return delegate.size();
}

@Override
public boolean isEmpty() {
return delegate.isEmpty();
}

@Override
public boolean containsKey(Object key) {
return delegate.containsKey(key.toString().toLowerCase(Locale.ROOT));
}

@Override
public boolean containsValue(Object value) {
return delegate.containsValue(value);
}

@Override
public String get(Object key) {
return delegate.get(key.toString().toLowerCase(Locale.ROOT));
}

@Override
public String put(String key, String value) {
return delegate.put(key.toLowerCase(Locale.ROOT), value);
}

@Override
public String remove(Object key) {
return delegate.remove(key.toString().toLowerCase(Locale.ROOT));
}

@Override
public void putAll(Map<? extends String, ? extends String> m) {
for (Map.Entry<? extends String, ? extends String> entry : m.entrySet()) {
delegate.put(entry.getKey().toLowerCase(Locale.ROOT), entry.getValue());
}
}

@Override
public void clear() {
delegate.clear();
}

@Override
public Set<String> keySet() {
return delegate.keySet();
}

@Override
public Collection<String> values() {
return delegate.values();
}

@Override
public Set<Map.Entry<String, String>> entrySet() {
return delegate.entrySet();
}
}
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalog.v2;

import org.apache.spark.sql.internal.SQLConf;

/**
* A marker interface to provide a catalog implementation for Spark.
* <p>
* Implementations can provide catalog functions by implementing additional interfaces, like
* {@link TableCatalog} to expose table operations.
* <p>
* Catalog implementations must implement this marker interface to be loaded by
* {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog classes using the
* required public no-arg constructor. After creating an instance, it will be configured by calling
* {@link #initialize(CaseInsensitiveStringMap)}.
* <p>
* Catalog implementations are registered to a name by adding a configuration option to Spark:
* {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. All configuration properties
* in the Spark configuration that share the catalog name prefix,
* {@code spark.sql.catalog.catalog-name.(key)=(value)} will be passed in the case insensitive
* string map of options in initialization with the prefix removed. An additional property,
* {@code name}, is also added to the options and will contain the catalog's name; in this case,
* "catalog-name".
*/
public interface CatalogProvider {
/**
* Called to initialize configuration.
* <p>
* This method is called once, just after the provider is instantiated.
*
* @param options a case-insensitive string map of configuration
*/
void initialize(CaseInsensitiveStringMap options);
}
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalog.v2;

import org.apache.spark.SparkException;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.util.Utils;

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static scala.collection.JavaConverters.mapAsJavaMapConverter;

public class Catalogs {
private Catalogs() {
}

/**
* Load and configure a catalog by name.
* <p>
* This loads, instantiates, and initializes the catalog provider for each call; it does not
* cache or reuse instances.
*
* @param name a String catalog name
* @param conf a SQLConf
* @return an initialized CatalogProvider
* @throws SparkException If the provider class cannot be found or instantiated
*/
public static CatalogProvider load(String name, SQLConf conf) throws SparkException {
String providerClassName = conf.getConfString("spark.sql.catalog." + name, null);
if (providerClassName == null) {
throw new SparkException(String.format(
"Catalog '%s' provider not found: spark.sql.catalog.%s is not defined", name, name));
}

ClassLoader loader = Utils.getContextOrSparkClassLoader();

try {
Class<?> providerClass = loader.loadClass(providerClassName);

if (!CatalogProvider.class.isAssignableFrom(providerClass)) {
throw new SparkException(String.format(
"Provider class for catalog '%s' does not implement CatalogProvider: %s",
name, providerClassName));
}

CatalogProvider provider = CatalogProvider.class.cast(providerClass.newInstance());

provider.initialize(catalogOptions(name, conf));

return provider;

} catch (ClassNotFoundException e) {
throw new SparkException(String.format(
"Cannot find catalog provider class for catalog '%s': %s", name, providerClassName));

} catch (IllegalAccessException e) {
throw new SparkException(String.format(
"Failed to call public no-arg constructor for catalog '%s': %s", name, providerClassName),
e);

} catch (InstantiationException e) {
throw new SparkException(String.format(
"Failed while instantiating provider for catalog '%s': %s", name, providerClassName),
e.getCause());
}
}

/**
* Extracts a named catalog's configuration from a SQLConf.
*
* @param name a catalog name
* @param conf a SQLConf
* @return a case insensitive string map of options starting with spark.sql.catalog.(name).
*/
private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf) {
Map<String, String> allConfs = mapAsJavaMapConverter(conf.getAllConfs()).asJava();
Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)");

CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
for (Map.Entry<String, String> entry : allConfs.entrySet()) {
Matcher matcher = prefix.matcher(entry.getKey());
if (matcher.matches() && matcher.groupCount() > 0) {
options.put(matcher.group(1), entry.getValue());
}
}

// add name last to ensure it overwrites any conflicting options
options.put("name", name);

return options;
}
}
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalog.v2;

import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.types.StructType;

import java.util.List;
import java.util.Map;

/**
* Represents table metadata from a {@link TableCatalog} or other table sources.
*/
public interface Table {
/**
* Return the table properties.
* @return this table's map of string properties
*/
Map<String, String> properties();

/**
* Return the table schema.
* @return this table's schema as a struct type
*/
StructType schema();

/**
* Return the table partitioning expressions.
* @return this table's partitioning expressions
*/
List<Expression> partitionExpressions();
}

0 comments on commit 21acda2

Please sign in to comment.