Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PLUGIN-698] Add wildcard support for copy and move action #1418

Merged
merged 1 commit into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/GCSCopy-action.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Properties
It can be found on the Dashboard in the Google Cloud Platform Console.

**Source Path**: Path to a source object or directory.
> Use `*` to copy multiple files. For example, `gs://demo0/prod/reports/*.csv` will copy all CSV files in the `reports` directory.

**Destination Path**: Path to the destination. The bucket will be created if it does not exist.

Expand Down
1 change: 1 addition & 0 deletions docs/GCSMove-action.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Properties
It can be found on the Dashboard in the Google Cloud Platform Console.

**Source Path**: Path to a source object or directory.
> Use `*` to move multiple files. For example, `gs://demo0/prod/reports/*.csv` will move all CSV files in the `reports` directory.

**Destination Path**: Path to the destination. The bucket will be created if it does not exist.

Expand Down
44 changes: 43 additions & 1 deletion src/main/java/io/cdap/plugin/gcp/gcs/StorageClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,17 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -111,7 +117,7 @@ public void mapMetaDataForAllBlobs(String path, Consumer<Map<String, String>> fu
/**
* Creates the given bucket if it does not exists.
*
* @param path the path of the bucket
* @param path the path of the bucket
* @param location the location of bucket
* @param cmekKeyName the name of the cmek key
*/
Expand Down Expand Up @@ -163,6 +169,42 @@ public void move(GCSPath sourcePath, GCSPath destPath, boolean recursive, boolea
pairTraverse(sourcePath, destPath, recursive, overwrite, BlobPair::move);
}

/**
* Get all the matching wildcard paths given the regex input.
*/
public List<GCSPath> getMatchedPaths(GCSPath sourcePath, boolean recursive, Pattern wildcardRegex) {
Page<Blob> blobPage = storage.list(sourcePath.getBucket(), Storage.BlobListOption.prefix(
getWildcardPathPrefix(sourcePath, wildcardRegex)
));
List<String> blobPageNames = new ArrayList<>();
blobPage.getValues().forEach(blob -> blobPageNames.add(blob.getName()));
return getFilterMatchedPaths(sourcePath, blobPageNames, recursive);
}

static String getWildcardPathPrefix(GCSPath sourcePath, Pattern wildcardRegex) {
String pattern = sourcePath.getName();
String[] patternSplits = pattern.split(wildcardRegex.pattern());
// prefix may be empty
return patternSplits.length >= 1 ? patternSplits[0] : "";
}

static List<GCSPath> getFilterMatchedPaths(GCSPath sourcePath, List<String> blobPageNames, boolean recursive) {
Set<GCSPath> matchedPaths = new HashSet<>();
String globPattern = "glob:" + sourcePath.getName();
PathMatcher matcher = FileSystems.getDefault().getPathMatcher(globPattern);
for (String blobName : blobPageNames) {
if (matcher.matches(Paths.get(blobName))) {
LOG.debug("Blob name {} matches the glob pattern {}", blobName, globPattern);
String gcsPath = String.format("gs://%s/%s", sourcePath.getBucket(), blobName);
matchedPaths.add(GCSPath.from(gcsPath));
}
}
if (!recursive) {
matchedPaths.removeIf(path -> path.getName().endsWith("/"));
}
return new ArrayList<>(matchedPaths);
}

/**
* Gets source and destination pairs by traversing the source path. Consumes each pair after the directory structure
* is completely traversed.
Expand Down
16 changes: 13 additions & 3 deletions src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSCopy.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import io.cdap.plugin.gcp.gcs.StorageClient;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;


Expand Down Expand Up @@ -76,9 +78,17 @@ public void run(ActionContext context) throws IOException {
// create the destination bucket if not exist
storageClient.createBucketIfNotExists(destPath, config.location, cmekKeyName);

//noinspection ConstantConditions
storageClient.copy(config.getSourcePath(), config.getDestPath(), config.recursive, config.shouldOverwrite());

List<GCSPath> matchedPaths = new ArrayList<>();
if (SourceDestConfig.WILDCARD_REGEX.matcher(config.getSourcePath().getName()).find()) {
matchedPaths = storageClient.getMatchedPaths(config.getSourcePath(), config.recursive,
SourceDestConfig.WILDCARD_REGEX);
} else {
matchedPaths.add(config.getSourcePath());
}
for (GCSPath sourcePath : matchedPaths) {
//noinspection ConstantConditions
storageClient.copy(sourcePath, config.getDestPath(), config.recursive, config.shouldOverwrite());
}
}

/**
Expand Down
15 changes: 13 additions & 2 deletions src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSMove.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import io.cdap.plugin.gcp.gcs.StorageClient;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;


Expand Down Expand Up @@ -75,8 +77,17 @@ public void run(ActionContext context) throws IOException {
// create the destination bucket if not exist
storageClient.createBucketIfNotExists(destPath, config.location, cmekKeyName);

//noinspection ConstantConditions
storageClient.move(config.getSourcePath(), config.getDestPath(), config.recursive, config.shouldOverwrite());
List<GCSPath> matchedPaths = new ArrayList<>();
if (SourceDestConfig.WILDCARD_REGEX.matcher(config.getSourcePath().getName()).find()) {
matchedPaths = storageClient.getMatchedPaths(config.getSourcePath(), config.recursive,
SourceDestConfig.WILDCARD_REGEX);
} else {
matchedPaths.add(config.getSourcePath());
}
for (GCSPath sourcePath : matchedPaths) {
//noinspection ConstantConditions
storageClient.move(sourcePath, config.getDestPath(), config.recursive, config.shouldOverwrite());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import io.cdap.plugin.gcp.gcs.GCSPath;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import javax.annotation.Nullable;

/**
Expand All @@ -44,6 +46,7 @@ public class SourceDestConfig extends GCPConfig {
public static final String NAME_DEST_PATH = "destPath";
public static final String NAME_LOCATION = "location";
public static final String READ_TIMEOUT = "readTimeout";
public static final Pattern WILDCARD_REGEX = Pattern.compile("[*]");

@Name(NAME_SOURCE_PATH)
@Macro
Expand Down Expand Up @@ -132,6 +135,11 @@ public void validate(FailureCollector collector, Map<String, String> arguments)
} catch (IllegalArgumentException e) {
collector.addFailure(e.getMessage(), null).withConfigProperty(NAME_DEST_PATH);
}
if (WILDCARD_REGEX.matcher(destPath).find()) {
collector.addFailure("Destination path should not contain wildcard characters.",
"Please remove the wildcard characters from the destination path.")
.withConfigProperty(NAME_DEST_PATH);
}
}
if (!containsMacro(NAME_CMEK_KEY)) {
validateCmekKey(collector, arguments);
Expand Down
42 changes: 41 additions & 1 deletion src/test/java/io/cdap/plugin/gcp/gcs/StorageClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import io.cdap.plugin.gcp.gcs.actions.SourceDestConfig;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.slf4j.Logger;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.times;
Expand All @@ -50,12 +53,20 @@ public class StorageClientTest {
private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();

private final PrintStream originalOut = System.out;
private final List<String> blobPageNames = new ArrayList<>();

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
storageClient = new StorageClient(storage);
System.setOut(new PrintStream(outContent));
// Setup blobPageNames
blobPageNames.add("mydir/test_web1/report.html");
blobPageNames.add("mydir/test_web2/report.html");
blobPageNames.add("mydir/test_web2/css/");
blobPageNames.add("mydir/test_web2/css/foo.css");
blobPageNames.add("mydir/test_mob1/report.html");
blobPageNames.add("mydir/test_mob2/report.html");
}

@After
Expand Down Expand Up @@ -144,4 +155,33 @@ public void testCreateBucketIfNotExists() {
}
Assert.fail("Test for detecting bucket creation failure did not succeed. No exception caught");
}

@Test
public void testGetWildcardPathPrefix() {
Assert.assertEquals("mydir/test_web", StorageClient.getWildcardPathPrefix(
GCSPath.from("gs://my-bucket/mydir/test_web*/"), SourceDestConfig.WILDCARD_REGEX));
Assert.assertEquals("", StorageClient.getWildcardPathPrefix(
GCSPath.from("gs://my-bucket/*"), SourceDestConfig.WILDCARD_REGEX));
}

@Test
public void testFilterMatchedPaths() {
GCSPath sourcePath = GCSPath.from("gs://foobucket/mydir/test_web*/*");
List<GCSPath> filterMatchedPaths = StorageClient.getFilterMatchedPaths(sourcePath, blobPageNames, false);
filterMatchedPaths.sort(Comparator.comparing(GCSPath::getUri));
Assert.assertEquals(2, filterMatchedPaths.size());
Assert.assertEquals(GCSPath.from("gs://foobucket/mydir/test_web1/report.html"), filterMatchedPaths.get(0));
Assert.assertEquals(GCSPath.from("gs://foobucket/mydir/test_web2/report.html"), filterMatchedPaths.get(1));
}

@Test
public void testFilterMatchedPathsWithRecursive() {
GCSPath sourcePath = GCSPath.from("gs://foobucket/mydir/test_web*/*");
List<GCSPath> filterMatchedPaths = StorageClient.getFilterMatchedPaths(sourcePath, blobPageNames, true);
Assert.assertEquals(3, filterMatchedPaths.size());
sahusanket marked this conversation as resolved.
Show resolved Hide resolved
filterMatchedPaths.sort(Comparator.comparing(GCSPath::getUri));
Assert.assertEquals(GCSPath.from("gs://foobucket/mydir/test_web1/report.html"), filterMatchedPaths.get(0));
Assert.assertEquals(GCSPath.from("gs://foobucket/mydir/test_web2/css/"), filterMatchedPaths.get(1));
Assert.assertEquals(GCSPath.from("gs://foobucket/mydir/test_web2/report.html"), filterMatchedPaths.get(2));
}
}
Loading