
Flink/Azure job graph serialization fails when used with storage account shared key authentication #10245

Open
ms1111 opened this issue Apr 28, 2024 · 1 comment
Labels: bug

ms1111 commented Apr 28, 2024

Apache Iceberg version

1.5.1 (latest release)

Query engine

Flink

Please describe the bug 🐞

ADLSFileIO holds an AzureProperties object. When ADLS_SHARED_KEY_ACCOUNT_NAME or ADLS_SHARED_KEY_ACCOUNT_KEY is set, AzureProperties creates a StorageSharedKeyCredential in its constructor. StorageSharedKeyCredential is not Serializable, so Flink fails to serialize the job graph during job startup.
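
The non-serializability can be confirmed in isolation. A minimal sketch (the account name and base64 key below are dummies, and CredentialSerializationCheck is just an illustrative class name) that throws the same NotSerializableException:

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import com.azure.storage.common.StorageSharedKeyCredential;

public class CredentialSerializationCheck {
    public static void main(String[] args) throws Exception {
        // Dummy account name and base64-encoded key; nothing contacts Azure here.
        StorageSharedKeyCredential credential =
                new StorageSharedKeyCredential("myaccount", "a2V5");
        try (ObjectOutputStream out =
                new ObjectOutputStream(new ByteArrayOutputStream())) {
            // Throws java.io.NotSerializableException:
            // com.azure.storage.common.StorageSharedKeyCredential
            out.writeObject(credential);
        }
    }
}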

If the storage account key is not supplied, DefaultAzureCredential will try to get credentials from the Azure CLI or another source like workload identity. That appears to work, but some environments may require shared key authentication.
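
For environments that do require shared key authentication, the usual pattern for this kind of failure is to carry only the serializable configuration strings and rebuild the credential lazily after deserialization. A hypothetical sketch of that pattern (not the actual Iceberg code):

import java.io.Serializable;
import com.azure.storage.common.StorageSharedKeyCredential;

public class LazySharedKeyCredential implements Serializable {
    private static final long serialVersionUID = 1L;

    // Only these two strings travel with the serialized job graph.
    private final String accountName;
    private final String accountKey;

    // Not serialized; rebuilt on first use after deserialization.
    private transient volatile StorageSharedKeyCredential credential;

    public LazySharedKeyCredential(String accountName, String accountKey) {
        this.accountName = accountName;
        this.accountKey = accountKey;
    }

    public StorageSharedKeyCredential get() {
        if (credential == null) {
            credential = new StorageSharedKeyCredential(accountName, accountKey);
        }
        return credential;
    }
}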

The serialization error is below:

Caused by: java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not serialize object for key serializedUDF.
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:322)
	... 13 more
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not serialize object for key serializedUDF.
	at org.apache.flink.streaming.api.graph.StreamConfig.lambda$serializeAllConfigs$1(StreamConfig.java:203)
	at java.base/java.util.HashMap.forEach(HashMap.java:1421)
	at org.apache.flink.streaming.api.graph.StreamConfig.serializeAllConfigs(StreamConfig.java:197)
	at org.apache.flink.streaming.api.graph.StreamConfig.lambda$triggerSerializationAndReturnFuture$0(StreamConfig.java:174)
	at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718)
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.NotSerializableException: com.azure.storage.common.StorageSharedKeyCredential

A sample app that triggers the failure is below.

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.azure.AzureProperties;
import org.apache.iceberg.azure.adlsv2.ADLSFileIO;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.types.Types;

/**
 * Run with environment variables:
 * <ul>
 *     <li>STORAGE_ACCOUNT=storage account name</li>
 *     <li>STORAGE_ACCOUNT_KEY=key</li>
 *     <li>CONTAINER=name of storage container</li>
 * </ul>
 */
public class ADLSSharedKeyAuthIssue {
    public static void main(String[] args) throws Exception {
        final String storageAccount = System.getenv("STORAGE_ACCOUNT");
        final String storageAccountKey = System.getenv("STORAGE_ACCOUNT_KEY");
        final String container = System.getenv("CONTAINER");

        Map<String, String> options = new HashMap<>();
        options.put("warehouse", "abfss://" + container + "@" + storageAccount + ".dfs.core.windows.net");
        options.put("uri", "http://localhost:19120/api/v1");
        options.put(CatalogProperties.FILE_IO_IMPL, ADLSFileIO.class.getCanonicalName());
        options.put(AzureProperties.ADLS_SHARED_KEY_ACCOUNT_NAME, storageAccount);
        options.put(AzureProperties.ADLS_SHARED_KEY_ACCOUNT_KEY, storageAccountKey);

        CatalogLoader catalogLoader = CatalogLoader.custom(
                "flink",
                options,
                new Configuration(),
                CatalogUtil.ICEBERG_CATALOG_NESSIE);

        Catalog catalog = catalogLoader.loadCatalog();

        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()));

        PartitionSpec spec = PartitionSpec.builderFor(schema).build();
        Namespace namespace = Namespace.of("nsname_" + UUID.randomUUID().toString().substring(0, 4));
        ((SupportsNamespaces) catalog).createNamespace(namespace);
        TableIdentifier tableIdentifier = TableIdentifier.of(namespace, "t1");
        Table table = catalog.createTable(tableIdentifier, schema, spec);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Row> source = env.fromElements(1)
                .map(data -> {
                    Row row = new Row(1);
                    row.setField(0, data);
                    return row;
                });

        FlinkSink.forRow(source, FlinkSchemaUtil.toSchema(schema))
                .tableLoader(TableLoader.fromCatalog(catalogLoader, tableIdentifier))
                .overwrite(true)
                .append();

        env.execute();
    }
}
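
If shared key authentication is not strictly required in a given environment, a variant of the options block in the sample above (same variables) sidesteps the failure by leaving the shared-key properties unset, so the client falls back to DefaultAzureCredential as described earlier:

// Replacement for the options block in the sample above.
Map<String, String> options = new HashMap<>();
options.put("warehouse", "abfss://" + container + "@" + storageAccount + ".dfs.core.windows.net");
options.put("uri", "http://localhost:19120/api/v1");
options.put(CatalogProperties.FILE_IO_IMPL, ADLSFileIO.class.getCanonicalName());
// ADLS_SHARED_KEY_ACCOUNT_NAME / ADLS_SHARED_KEY_ACCOUNT_KEY intentionally omitted:
// AzureProperties then creates no StorageSharedKeyCredential, and credentials are
// resolved via DefaultAzureCredential (Azure CLI, workload identity, ...).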

POM:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>adls-shared-key-auth-issue</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <iceberg.version>1.5.1</iceberg.version>
        <flink.version>1.18.1</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-core</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-api</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-parquet</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <!-- Needed to load the Nessie catalog -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-nessie</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-azure</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-data</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-identity</artifactId>
        </dependency>

        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-storage-file-datalake</artifactId>
        </dependency>

        <!-- Needed to create Parquet files -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.11.3</version>
        </dependency>

        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.14.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-column</artifactId>
            <version>1.13.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-1.18</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-metrics-dropwizard</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.azure</groupId>
                <artifactId>azure-sdk-bom</artifactId>
                <version>1.2.22</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>
ms1111 added the bug label Apr 28, 2024
nastra (Contributor) commented Apr 29, 2024

This will be fixed by #10045
