Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@
public class FileSystems {

public static void initialize(Path path, Configuration configuration) {
// install if there is kerberos keytab
SecurityConfiguration securityConf = new SecurityConfiguration(configuration);
if (securityConf.getKeytab() != null) {
try {
SecurityUtils.install(securityConf);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

// 1. Try to load file system
try {
// check can obtain
Expand All @@ -63,12 +73,6 @@ public <P> Iterator<P> load(Class<P> service) {
return (Iterator<P>) discoverFactories().iterator();
}
});

try {
SecurityUtils.install(new SecurityConfiguration(configuration));
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static List<FileSystemFactory> discoverFactories() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.store.utils;

/**
* Similar to a {@link Runnable}, this interface is used to capture a block of code to be executed.
* In contrast to {@code Runnable}, this interface allows throwing checked exceptions.
*/
@FunctionalInterface
public interface RunnableWithException {

/**
* The work method.
*
* @throws Exception Exceptions may be thrown.
*/
void run() throws Exception;
}
33 changes: 16 additions & 17 deletions flink-table-store-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,27 +74,10 @@ under the License.
<scope>provided</scope>
</dependency>

<!-- test dependencies -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-store-format</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
Expand All @@ -111,6 +94,22 @@ under the License.
</exclusions>
</dependency>

<!-- test dependencies -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-store-format</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@
package org.apache.flink.table.store.file.catalog;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.filesystem.FileSystems;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.flink.table.store.CatalogOptions.METASTORE;
Expand Down Expand Up @@ -87,8 +90,13 @@ static Catalog createCatalog(Configuration options, ClassLoader classLoader) {
.collect(Collectors.joining("\n")));
}

Path warehousePath = new Path(warehouse);

// initialize to ensure filesystem and kerberos, kerberos is not only for filesystem, but
// also for HiveCatalog.
FileSystems.initialize(warehousePath, options);

try {
Path warehousePath = new Path(warehouse);
FileSystem fs = warehousePath.getFileSystem();
if (fs.exists(warehousePath)) {
checkArgument(
Expand All @@ -103,6 +111,11 @@ static Catalog createCatalog(Configuration options, ClassLoader classLoader) {
throw new UncheckedIOException(e);
}

return factories.get(0).create(warehouse, options);
Supplier<Catalog> supplier = () -> factories.get(0).create(warehouse, options);
if (options.contains(SecurityOptions.KERBEROS_LOGIN_KEYTAB)) {
return new UGIWrappedCatalog(supplier);
} else {
return supplier.get();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.store.file.catalog;

import org.apache.flink.core.fs.Path;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.store.file.schema.SchemaChange;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.table.Table;
import org.apache.flink.table.store.utils.RunnableWithException;

import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

/** A {@link Catalog} wrapped hadoop {@link UserGroupInformation}. */
public class UGIWrappedCatalog implements Catalog {

private final Catalog catalog;

public UGIWrappedCatalog(Supplier<Catalog> supplier) {
this.catalog = doAsUgiCatchEx(supplier::get);
}

@Override
public Optional<CatalogLock.Factory> lockFactory() {
return doAsUgiCatchEx(catalog::lockFactory);
}

@Override
public List<String> listDatabases() {
return doAsUgiCatchEx(catalog::listDatabases);
}

@Override
public boolean databaseExists(String databaseName) {
return doAsUgiCatchEx(() -> catalog.databaseExists(databaseName));
}

@Override
public void createDatabase(String name, boolean ignoreIfExists)
throws DatabaseAlreadyExistException {
try {
doAsUgi(() -> catalog.createDatabase(name, ignoreIfExists));
} catch (Exception e) {
if (e instanceof DatabaseAlreadyExistException) {
throw (DatabaseAlreadyExistException) e;
}
throw new RuntimeException(e);
}
}

@Override
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
throws DatabaseNotExistException, DatabaseNotEmptyException {
try {
doAsUgi(() -> catalog.dropDatabase(name, ignoreIfNotExists, cascade));
} catch (Exception e) {
if (e instanceof DatabaseNotExistException) {
throw (DatabaseNotExistException) e;
}
if (e instanceof DatabaseNotEmptyException) {
throw (DatabaseNotEmptyException) e;
}
throw new RuntimeException(e);
}
}

@Override
public List<String> listTables(String databaseName) throws DatabaseNotExistException {
try {
return doAsUgi(() -> catalog.listTables(databaseName));
} catch (Exception e) {
if (e instanceof DatabaseNotExistException) {
throw (DatabaseNotExistException) e;
}
throw new RuntimeException(e);
}
}

@Override
public Path getTableLocation(ObjectPath tablePath) {
return doAsUgiCatchEx(() -> catalog.getTableLocation(tablePath));
}

@Override
public TableSchema getTableSchema(ObjectPath tablePath) throws TableNotExistException {
try {
return doAsUgi(() -> catalog.getTableSchema(tablePath));
} catch (Exception e) {
if (e instanceof TableNotExistException) {
throw (TableNotExistException) e;
}
throw new RuntimeException(e);
}
}

@Override
public Table getTable(ObjectPath tablePath) throws TableNotExistException {
try {
return doAsUgi(() -> catalog.getTable(tablePath));
} catch (Exception e) {
if (e instanceof TableNotExistException) {
throw (TableNotExistException) e;
}
throw new RuntimeException(e);
}
}

@Override
public boolean tableExists(ObjectPath tablePath) {
return doAsUgiCatchEx(() -> catalog.tableExists(tablePath));
}

@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException {
try {
doAsUgi(() -> catalog.dropTable(tablePath, ignoreIfNotExists));
} catch (Exception e) {
if (e instanceof TableNotExistException) {
throw (TableNotExistException) e;
}
throw new RuntimeException(e);
}
}

@Override
public void createTable(ObjectPath tablePath, UpdateSchema tableSchema, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException {
try {
doAsUgi(() -> catalog.createTable(tablePath, tableSchema, ignoreIfExists));
} catch (Exception e) {
if (e instanceof TableAlreadyExistException) {
throw (TableAlreadyExistException) e;
}
if (e instanceof DatabaseNotExistException) {
throw (DatabaseNotExistException) e;
}
throw new RuntimeException(e);
}
}

@Override
public void alterTable(
ObjectPath tablePath, List<SchemaChange> changes, boolean ignoreIfNotExists)
throws TableNotExistException {
try {
doAsUgi(() -> catalog.alterTable(tablePath, changes, ignoreIfNotExists));
} catch (Exception e) {
if (e instanceof TableNotExistException) {
throw (TableNotExistException) e;
}
throw new RuntimeException(e);
}
}

@Override
public void close() throws Exception {
doAsUgi(catalog::close);
}

private static void doAsUgi(RunnableWithException action) throws Exception {
doAsUgi(
(PrivilegedExceptionAction<Void>)
() -> {
action.run();
return null;
});
}

private static <T> T doAsUgiCatchEx(PrivilegedExceptionAction<T> action) {
try {
return doAsUgi(action);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static <T> T doAsUgi(PrivilegedExceptionAction<T> action) throws Exception {
try {
return UserGroupInformation.getLoginUser().doAs(action);
} catch (UndeclaredThrowableException undeclared) {
Throwable throwable = undeclared.getUndeclaredThrowable();
if (throwable instanceof Exception) {
throw (Exception) throwable;
} else {
throw new RuntimeException(throwable);
}
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Loading