SAMZA-2657: Blob Store as backend for Samza State backup and restore #1501

Merged
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -191,6 +191,7 @@ project(":samza-core_$scalaSuffix") {
compile "org.apache.commons:commons-lang3:$commonsLang3Version"
compile "commons-io:commons-io:$commonsIoVersion"
compile "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion"
compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion"
compile "org.eclipse.jetty:jetty-webapp:$jettyVersion"
compile "org.scala-lang:scala-library:$scalaVersion"
compile "org.slf4j:slf4j-api:$slf4jVersion"
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.storage;

import org.apache.samza.config.Config;
import org.apache.samza.job.model.JobModel;


/**
* Factory to create instances of {@link StateBackendAdmin}. It must be implemented for every
* state backend.
*/
public interface BlobStoreAdminFactory {
/**
* Returns an instance of {@link StateBackendAdmin}
* @param config job configuration
* @param jobModel Job Model
*/
StateBackendAdmin getStateBackendAdmin(Config config, JobModel jobModel);
}
Original file line number Diff line number Diff line change
@@ -20,11 +20,16 @@
package org.apache.samza.storage;

/**
* Creates and validate resources for the StateBackendFactory
* Admin responsible for loading any resources related to state backend
*/
public interface TaskStorageAdmin {

public interface StateBackendAdmin {
/**
* Create all the resources required per job per store state backend
*/
void createResources();

/**
* Validate all resources required per job per state for state backend
*/
void validateResources();
}
Original file line number Diff line number Diff line change
@@ -24,18 +24,20 @@
import org.apache.samza.config.Config;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.util.Clock;


/**
* Factory to build the Samza {@link TaskBackupManager}, {@link TaskRestoreManager} and {@link TaskStorageAdmin}
* Factory to build the Samza {@link TaskBackupManager}, {@link TaskRestoreManager} and {@link StateBackendAdmin}
* for a particular state storage backend, which are used to durably backup the Samza task state.
*/
public interface StateBackendFactory {
TaskBackupManager getBackupManager(JobContext jobContext,
ContainerContext containerContext,
ContainerModel containerModel,
TaskModel taskModel,
ExecutorService backupExecutor,
MetricsRegistry taskInstanceMetricsRegistry,
@@ -55,5 +57,5 @@ TaskRestoreManager getRestoreManager(JobContext jobContext,
File nonLoggedStoreBaseDir,
KafkaChangelogRestoreParams kafkaChangelogRestoreParams);

TaskStorageAdmin getAdmin();
StateBackendAdmin getAdmin(JobModel jobModel, Config config);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.storage.blobstore;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.CompletionStage;
import org.apache.samza.annotation.InterfaceStability;


/**
* Provides interface for common blob store operations: GET, PUT and DELETE
*/
@InterfaceStability.Unstable
public interface BlobStoreManager {
/**
* Initialize underlying blob store client, if necessary.
*
*/
void init();

/**
* Non-blocking PUT call to remote blob store with supplied metadata
* @param inputStream InputStream to read the file
* @param metadata user supplied {@link Metadata} of the request
* @return a future containing the blob ID of the uploaded blob if the upload is successful.
*/
CompletionStage<String> put(InputStream inputStream, Metadata metadata);

/**
* Non-blocking GET call to remote blob store
* @param id Blob ID of the blob to get
* @param outputStream OutputStream to write the downloaded blob
* @param metadata User supplied {@link Metadata} of the request
* @return A future that completes when all the chunks are downloaded and written successfully to the OutputStream
* @throws org.apache.samza.storage.blobstore.exceptions.DeletedException returned future should complete
* exceptionally with DeletedException on failure with the blob already deleted error.
*/
CompletionStage<Void> get(String id, OutputStream outputStream, Metadata metadata);

/**
* Non-blocking call to mark a blob for deletion in the remote blob store
* @param id Blob ID of the blob to delete
* @param metadata User supplied {@link Metadata} of the request
* @return A future that completes when the blob is successfully deleted from the blob store.
* @throws org.apache.samza.storage.blobstore.exceptions.DeletedException returned future should complete
* exceptionally with DeletedException on failure with the blob already deleted error. This exception is
* caught and ignored by the caller of the delete method during initial cleanup and SnapshotIndex read.
*/
CompletionStage<Void> delete(String id, Metadata metadata);

/**
* Non-blocking call to remove the Time-To-Live (TTL) for a blob and make it permanent.
* @param blobId Blob ID of blob to remove TTL for.
* @param metadata User supplied {@link Metadata} of the request
* @return a future that completes when the TTL for the blob is removed.
* @throws org.apache.samza.storage.blobstore.exceptions.DeletedException returned future should complete
* exceptionally with DeletedException on failure with the blob already deleted error.
*/
CompletionStage<Void> removeTTL(String blobId, Metadata metadata);

/**
* Cleanly close resources like blob store client
*/
void close();
}
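To make the asynchronous contract in the Javadocs above concrete, here is a minimal in-memory sketch. It is not part of this PR: `InMemoryBlobStoreSketch` is a hypothetical class, the `Metadata` argument is omitted for brevity, the nested `DeletedException` is a local stand-in for the one added in this change, and the handling of never-seen blob IDs (an `IllegalArgumentException`) is an assumption. The point it illustrates is that errors are reported by completing the returned future exceptionally rather than by throwing synchronously.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for a blob store manager; for illustration only. */
class InMemoryBlobStoreSketch {
  /** Local stand-in for org.apache.samza.storage.blobstore.exceptions.DeletedException. */
  static class DeletedException extends RuntimeException {
    DeletedException(String message) {
      super(message);
    }
  }

  private final Map<String, byte[]> blobs = new ConcurrentHashMap<>();
  private final Set<String> deleted = ConcurrentHashMap.newKeySet();

  /** Reads the stream fully on another thread and completes with a generated blob ID. */
  CompletionStage<String> put(InputStream inputStream) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        int read;
        while ((read = inputStream.read(chunk)) != -1) {
          buffer.write(chunk, 0, read);
        }
        String id = UUID.randomUUID().toString();
        blobs.put(id, buffer.toByteArray());
        return id;
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    });
  }

  /** Writes the blob to the stream, or fails the future with DeletedException if it was deleted. */
  CompletionStage<Void> get(String id, OutputStream outputStream) {
    CompletableFuture<Void> result = new CompletableFuture<>();
    byte[] data = blobs.get(id);
    if (deleted.contains(id)) {
      result.completeExceptionally(new DeletedException("Blob " + id + " was deleted"));
    } else if (data == null) {
      // Assumption: a never-seen ID is reported differently from a deleted one.
      result.completeExceptionally(new IllegalArgumentException("Unknown blob " + id));
    } else {
      try {
        outputStream.write(data);
        result.complete(null);
      } catch (IOException e) {
        result.completeExceptionally(e);
      }
    }
    return result;
  }

  /** Deletes the blob, or fails the future with DeletedException if it is already gone. */
  CompletionStage<Void> delete(String id) {
    CompletableFuture<Void> result = new CompletableFuture<>();
    if (deleted.contains(id)) {
      result.completeExceptionally(new DeletedException("Blob " + id + " was already deleted"));
    } else if (blobs.remove(id) == null) {
      result.completeExceptionally(new IllegalArgumentException("Unknown blob " + id));
    } else {
      deleted.add(id);
      result.complete(null);
    }
    return result;
  }
}
```

A real implementation would also accept the request `Metadata` and would typically run its I/O on the executor supplied through `BlobStoreManagerFactory`.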
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.storage.blobstore;

import java.util.concurrent.ExecutorService;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.config.Config;

@InterfaceStability.Unstable
public interface BlobStoreManagerFactory {
BlobStoreManager getBackupBlobStoreManager(Config config, ExecutorService backupExecutor);

BlobStoreManager getRestoreBlobStoreManager(Config config, ExecutorService restoreExecutor);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.storage.blobstore;

import java.util.Optional;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;


/**
* Metadata associated with every BlobStore request. This class is used to trace a request and to determine the
* bucket/container information of the blob.
*/
public class Metadata {
Contributor

Docs on what this metadata is for? is BlobMetadata a better name, Metadata seems too generic

public static final String SNAPSHOT_INDEX_PAYLOAD_PATH = "snapshot-index";

private final String payloadPath;
private final long payloadSize;
private final String jobName;
private final String jobId;
private final String taskName;
private final String storeName;

public Metadata(String payloadPath, Optional<Long> payloadSize,
String jobName, String jobId, String taskName, String storeName) {
this.payloadPath = payloadPath;
Contributor

No Preconditions check here ?

// Payload size may not be known in advance for requests like getSnapshotIndex, where only blob ID is known. Set -1.
this.payloadSize = payloadSize.orElse(-1L);
this.jobName = jobName;
this.jobId = jobId;
this.taskName = taskName;
this.storeName = storeName;
}

public String getPayloadPath() {
return payloadPath;
}

public long getPayloadSize() {
return payloadSize;
}

public String getJobName() {
return jobName;
}

public String getJobId() {
return jobId;
}

public String getTaskName() {
return taskName;
}

public String getStoreName() {
return storeName;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

if (!(o instanceof Metadata)) {
return false;
}

Metadata that = (Metadata) o;

return new EqualsBuilder().append(getPayloadPath(), that.getPayloadPath())
.append(getPayloadSize(), that.getPayloadSize())
.append(getJobName(), that.getJobName())
.append(getJobId(), that.getJobId())
.append(getTaskName(), that.getTaskName())
.append(getStoreName(), that.getStoreName())
.isEquals();
}

@Override
public int hashCode() {
return new HashCodeBuilder(17, 37).append(getPayloadPath())
.append(getPayloadSize())
.append(getJobName())
.append(getJobId())
.append(getTaskName())
.append(getStoreName())
.toHashCode();
}

@Override
public String toString() {
return "Metadata{" + "payloadPath='" + payloadPath + '\'' + ", payloadSize='" + payloadSize + '\''
+ ", jobName='" + jobName + '\'' + ", jobId='" + jobId + '\'' + ", taskName='" + taskName + '\''
+ ", storeName='" + storeName + '\'' + '}';
}
}
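On the review question about precondition checks in the constructor: one possible shape, sketched below, uses `java.util.Objects.requireNonNull` so that it stays dependency-free. `ValidatedMetadataSketch` is a hypothetical name, and which fields must be non-null is an assumption here, not something this PR specifies.

```java
import java.util.Objects;
import java.util.Optional;

/** Hypothetical variant of Metadata with fail-fast argument checks; for illustration only. */
class ValidatedMetadataSketch {
  private final String payloadPath;
  private final long payloadSize;
  private final String jobName;
  private final String jobId;
  private final String taskName;
  private final String storeName;

  ValidatedMetadataSketch(String payloadPath, Optional<Long> payloadSize,
      String jobName, String jobId, String taskName, String storeName) {
    // Assumption: every field is required; relax individual checks if some may be absent.
    this.payloadPath = Objects.requireNonNull(payloadPath, "payloadPath must not be null");
    Objects.requireNonNull(payloadSize, "payloadSize must not be null; pass Optional.empty() when unknown");
    // As in the original class, -1 marks a payload size that is not known in advance.
    this.payloadSize = payloadSize.orElse(-1L);
    this.jobName = Objects.requireNonNull(jobName, "jobName must not be null");
    this.jobId = Objects.requireNonNull(jobId, "jobId must not be null");
    this.taskName = Objects.requireNonNull(taskName, "taskName must not be null");
    this.storeName = Objects.requireNonNull(storeName, "storeName must not be null");
  }

  String getPayloadPath() {
    return payloadPath;
  }

  long getPayloadSize() {
    return payloadSize;
  }
}
```

Guava's `Preconditions.checkNotNull` would read the same way if the project prefers it.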
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.storage.blobstore.exceptions;

/**
* Future should complete with this exception to indicate that the exception occurred due to the request for an
* already deleted blob. This exception is caught and ignored by caller of the DELETE request during initial cleanup
* and snapshot index read.
*
*/
public class DeletedException extends RuntimeException {
Member

If this is used to abstract out the specific blob store client, we need to make sure the contract for blob store impls requires wrapping such errors in this exception (and make sure it is documented in the interface class).

Also, how do we know if it is deleted vs missing/not present? Would the behaviour for MissingException be different?

Contributor Author

An instance of this class is passed to the caller from the Ambry-specific impl of the blob store manager if a blob was explicitly deleted (Ambry returns 410 for this event), not when the blob is merely missing. Other managers can implement their own logic to handle this kind of error. Do you think we should let other impls of BlobStoreManager decide how to use/handle this, or should I document something here, given that this will depend on the service?

Contributor

Should this not extend SamzaException

Contributor

Follow Up: Document that blob store managers should use these exceptions for the relevant errors, because the blob store agnostic error handling and retry logic in backup and restore managers depends on these. RetriableException is obvious. For DeletedException, document what the error-handling behavior is (e.g. ignores deleted blobs during initial cleanup / snapshot index read etc.)

Member

P2: We would need to document in both BlobStoreManager as well as in this class exactly the expectation of how this class would be used in the BlobStoreManager so that implementers of that and this interface could have that background

Contributor

+1 to Dan's comment - let's add this to at-throws sections in the BlobStoreManager javadocs instead. Also rephrase: this exception should not be "thrown" synchronously by the manager. Futures should be completed exceptionally with this exception on failure.


private static final long serialVersionUID = 1L;

public DeletedException(String message, Throwable cause) {
super(message, cause);
}

public DeletedException(String message) {
super(message);
}

public DeletedException(Throwable cause) {
super(cause);
}

public DeletedException() {
}
}
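The review thread above asks for the caller-side behavior to be spelled out. A minimal sketch of what "caught and ignored during cleanup" could look like follows. `DeletedBlobHandlingSketch`, `unwrap`, and `ignoreAlreadyDeleted` are hypothetical names, and the nested `DeletedException` is a local stand-in; the actual handling in the backup and restore managers may differ.

```java
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

/** Hypothetical caller-side helper; for illustration only. */
class DeletedBlobHandlingSketch {
  /** Local stand-in for org.apache.samza.storage.blobstore.exceptions.DeletedException. */
  static class DeletedException extends RuntimeException {
    DeletedException(String message) {
      super(message);
    }
  }

  /** Strips CompletionException wrappers that dependent stages add around the real cause. */
  static Throwable unwrap(Throwable error) {
    Throwable current = error;
    while (current instanceof CompletionException && current.getCause() != null) {
      current = current.getCause();
    }
    return current;
  }

  /**
   * Treats "already deleted" as success for a delete request and propagates
   * every other failure unchanged.
   */
  static CompletionStage<Void> ignoreAlreadyDeleted(CompletionStage<Void> deleteFuture) {
    return deleteFuture.handle((ignored, error) -> {
      if (error == null || unwrap(error) instanceof DeletedException) {
        return null;
      }
      throw new CompletionException(unwrap(error));
    });
  }
}
```

Unwrapping matters because a failure that passes through `thenApply` or `thenCompose` reaches later stages wrapped in a `CompletionException`, so a plain `instanceof DeletedException` check on the raw error would miss it.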