Skip to content

Commit

Permalink
[FLINK-11838][flink-gs-fs-hadoop] Add utility functions
Browse files Browse the repository at this point in the history
Add some utility functions used by the recoverable writer. Includes unit tests.
  • Loading branch information
galenwarren committed Apr 13, 2021
1 parent f8bf558 commit f2399cd
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.gs.utils;

import com.google.cloud.storage.BlobId;

import java.net.URI;

/** Utility functions related to blobs. */
public class BlobUtils {

/** The maximum number of blobs that can be composed in a single operation. */
public static final int COMPOSE_MAX_BLOBS = 32;

/**
* Normalizes a blob id, ensuring that the generation is null.
*
* @param blobId The blob id
* @return The blob id with the generation set to null
*/
public static BlobId normalizeBlobId(BlobId blobId) {
return BlobId.of(blobId.getBucket(), blobId.getName());
}

/**
* Parses a blob id from a Google storage uri, i.e. gs://bucket/foo/bar yields a blob with
* bucket name "bucket" and object name "foo/bar".
*
* @param uri The gs uri
* @return The blob id
*/
public static BlobId parseUri(URI uri) {
String finalBucketName = uri.getAuthority();
if (finalBucketName == null) {
throw new IllegalArgumentException(String.format("Bucket name in %s is invalid", uri));
}
String path = uri.getPath();
if (path == null) {
throw new IllegalArgumentException(String.format("Object name in %s is invalid", uri));
}
String finalObjectName = path.substring(1); // remove leading slash from path
if (finalObjectName.isEmpty()) {
throw new IllegalArgumentException(String.format("Object name in %s is invalid", uri));
}
return BlobId.of(finalBucketName, finalObjectName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.gs.utils;

import org.apache.flink.shaded.guava18.com.google.common.hash.HashFunction;
import org.apache.flink.shaded.guava18.com.google.common.hash.Hashing;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Base64;

/** Utility class related to checksums. */
public class ChecksumUtils {

/** THe crc hash function used by Google storage. */
public static final HashFunction CRC_HASH_FUNCTION = Hashing.crc32c();

/** The encoder use to construct string checksums, as done by Google storage. */
private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();

/**
* Converts an int crc32 checksum to the string format used by Google storage, which is the
* base64 string for the int in big-endian format.
*
* @param checksum The int checksum
* @return The string checksum
*/
public static String convertChecksumToString(int checksum) {
ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
buffer.order(ByteOrder.BIG_ENDIAN);
buffer.putInt(checksum);
return BASE64_ENCODER.encodeToString(buffer.array());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.gs.utils;

import com.google.cloud.storage.BlobId;
import org.junit.Test;

import java.net.URI;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

/** Test blob utilities. */
public class BlobUtilsTest {

@Test
public void shouldNormalizeBlobId() {
BlobId normalizedBlobId = BlobUtils.normalizeBlobId(BlobId.of("bucket", "foo/bar", 12L));
assertEquals("bucket", normalizedBlobId.getBucket());
assertEquals("foo/bar", normalizedBlobId.getName());
assertNull(normalizedBlobId.getGeneration());
}

@Test
public void shouldParseValidUri() {
BlobId blobId = BlobUtils.parseUri(URI.create("gs://bucket/foo/bar"));
assertEquals("bucket", blobId.getBucket());
assertEquals("foo/bar", blobId.getName());
assertNull(blobId.getGeneration());
}

@Test(expected = IllegalArgumentException.class)
public void shouldFailToParseUriMissingBucketName() {
BlobUtils.parseUri(URI.create("gs:///foo/bar"));
}

@Test(expected = IllegalArgumentException.class)
public void shouldFailToParseUriMissingObjectName() {
BlobUtils.parseUri(URI.create("gs://bucket/"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.gs.utils;

import org.junit.Test;

import static org.junit.Assert.assertEquals;

/** Test checksum utilities. */
public class ChecksumUtilsTest {

@Test
public void shouldConvertToStringChecksum() {
assertEquals("AAAwOQ==", ChecksumUtils.convertChecksumToString(12345));
assertEquals("AADUMQ==", ChecksumUtils.convertChecksumToString(54321));
}
}

0 comments on commit f2399cd

Please sign in to comment.