diff --git a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/FloridaServer.java b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/FloridaServer.java index 4eb31a96..24b684c8 100644 --- a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/FloridaServer.java +++ b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/FloridaServer.java @@ -26,14 +26,14 @@ import com.netflix.dynomitemanager.monitoring.ServoMetricsTask; import com.netflix.dynomitemanager.sidecore.IConfiguration; import com.netflix.dynomitemanager.sidecore.aws.UpdateSecuritySettings; +import com.netflix.dynomitemanager.sidecore.backup.SnapshotTask; +import com.netflix.dynomitemanager.sidecore.backup.RestoreTask; import com.netflix.dynomitemanager.sidecore.scheduler.TaskScheduler; import com.netflix.dynomitemanager.sidecore.utils.ProcessMonitorTask; import com.netflix.dynomitemanager.sidecore.utils.Sleeper; import com.netflix.dynomitemanager.sidecore.utils.ProxyAndStorageResetTask; import com.netflix.dynomitemanager.sidecore.utils.TuneTask; import com.netflix.dynomitemanager.sidecore.utils.WarmBootstrapTask; -import com.netflix.dynomitemanager.backup.RestoreFromS3Task; -import com.netflix.dynomitemanager.backup.SnapshotBackup; import com.netflix.servo.DefaultMonitorRegistry; import com.netflix.servo.monitor.Monitors; @@ -106,8 +106,8 @@ else if (UpdateSecuritySettings.firstTimeUpdated) { // Determine if we need to restore from backup else start Dynomite. if (config.isRestoreEnabled()) { logger.info("Restore is enabled."); - scheduler.runTaskNow(RestoreFromS3Task.class); //restore from the AWS - logger.info("Scheduled task " + RestoreFromS3Task.JOBNAME); + scheduler.runTaskNow(RestoreTask.class); //restore from the AWS + logger.info("Scheduled task " + RestoreTask.TaskName); } else { //no restores needed logger.info("Restore is disabled."); @@ -129,7 +129,7 @@ else if (UpdateSecuritySettings.firstTimeUpdated) { // Backup if (config.isBackupEnabled() && config.getBackupHour() >= 0) { - scheduler.addTask(SnapshotBackup.TaskName, SnapshotBackup.class, SnapshotBackup.getTimer(config)); + scheduler.addTask(SnapshotTask.TaskName, SnapshotTask.class, SnapshotTask.getTimer(config)); } // Metrics diff --git a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/InstanceState.java b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/InstanceState.java index 5545aa6f..44ba4b5c 100644 --- a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/InstanceState.java +++ b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/InstanceState.java @@ -18,20 +18,34 @@ import java.util.concurrent.atomic.AtomicBoolean; import com.google.inject.Singleton; +import org.joda.time.DateTime; + /** * Contains the state of the health of processed managed by Florida, and * maintains the isHealthy flag used for reporting discovery health check. 
+ * */ @Singleton public class InstanceState { private final AtomicBoolean isSideCarProcessAlive = new AtomicBoolean(false); private final AtomicBoolean isBootstrapping = new AtomicBoolean(false); + private final AtomicBoolean isBootstrapSuccesful = new AtomicBoolean(false); + private final AtomicBoolean firstBootstrap = new AtomicBoolean(true); private final AtomicBoolean isBackup = new AtomicBoolean(false); - private final AtomicBoolean isRestore = new AtomicBoolean (false); + private final AtomicBoolean isBackupSuccessful = new AtomicBoolean(false); + private final AtomicBoolean firstBackup = new AtomicBoolean(true); + private final AtomicBoolean isRestore = new AtomicBoolean(false); + private final AtomicBoolean isRestoreSuccessful = new AtomicBoolean(false); + private final AtomicBoolean firstRestore = new AtomicBoolean(true); private final AtomicBoolean isStorageProxyAlive = new AtomicBoolean(false); private final AtomicBoolean isStorageProxyProcessAlive = new AtomicBoolean(false); private final AtomicBoolean isStorageAlive = new AtomicBoolean(false); + + private long bootstrapTime; + private long backupTime; + private long restoreTime; + // This is true if storage proxy and storage are alive. private final AtomicBoolean isHealthy = new AtomicBoolean(false); // State of whether the rest endpoints /admin/stop or /admin/start are invoked @@ -43,6 +57,8 @@ public String toString() { return "InstanceState{" + "isSideCarProcessAlive=" + isSideCarProcessAlive + ", isBootstrapping=" + isBootstrapping + + ", isBackingup=" + isBackup + + ", isRestoring=" + isRestore + ", isStorageProxyAlive=" + isStorageProxyAlive + ", isStorageProxyProcessAlive=" + isStorageProxyProcessAlive + ", isStorageAlive=" + isStorageAlive + @@ -63,30 +79,105 @@ public void setSideCarProcessAlive(boolean isSideCarProcessAlive) { public int metricIsSideCarProcessAlive() { return isSideCarProcessAlive() ? 
1 : 0; } - + + /* Bootstrap */ public boolean isBootstrapping() { return isBootstrapping.get(); } - public boolean isBackingup() { - return isBackup.get(); + public boolean isBootstrapSuccessful() { + return isBootstrapSuccesful.get(); } - public boolean isRestoring() { - return isRestore.get(); + public boolean firstBootstrap() { + return firstBootstrap.get(); + } + + public long getBootstrapTime() { + return bootstrapTime; } - + public void setBootstrapping(boolean isBootstrapping) { this.isBootstrapping.set(isBootstrapping); } + public void setBootstrapStatus(boolean isBootstrapSuccesful) { + this.isBootstrapSuccesful.set(isBootstrapSuccesful); + } + + public void setFirstBootstrap(boolean firstBootstrap) { + this.firstBootstrap.set(firstBootstrap); + } + + public void setBootstrapTime(DateTime bootstrapTime) { + this.bootstrapTime = bootstrapTime.getMillis(); + } + + /* Backup */ + public boolean isBackingup() { + return isBackup.get(); + } + + public boolean isBackupSuccessful() { + return isBackupSuccessful.get(); + } + + public boolean firstBackup() { + return firstBackup.get(); + } + + public long getBackupTime() { + return backupTime; + } + public void setBackingup(boolean isBackup) { this.isBackup.set(isBackup); } + public void setBackUpStatus(boolean isBackupSuccessful) { + this.isBackupSuccessful.set(isBackupSuccessful); + } + + public void setFirstBackup(boolean firstBackup) { + this.firstBackup.set(firstBackup); + } + + public void setBackupTime(DateTime backupTime) { + this.backupTime = backupTime.getMillis(); + } + + /* Restore */ + public boolean isRestoring() { + return isRestore.get(); + } + + public boolean isRestoreSuccessful() { + return isRestoreSuccessful.get(); + } + + public boolean firstRestore() { + return firstRestore.get(); + } + + public long getRestoreTime() { + return restoreTime; + } + public void setRestoring(boolean isRestoring) { this.isRestore.set(isRestoring); } + + public void setRestoreStatus(boolean isRestoreSuccessful) { + this.isRestoreSuccessful.set(isRestoreSuccessful); + } + + public void setFirstRestore(boolean firstRestore) { + this.firstRestore.set(firstRestore); + } + + public void setRestoreTime(DateTime restoreTime) { + this.restoreTime = restoreTime.getMillis(); + } //@Monitor(name="bootstrapping", type=DataSourceType.GAUGE) public int metricIsBootstrapping() { @@ -141,7 +232,7 @@ public boolean isHealthy() { private void setHealthy() { this.isHealthy.set(isStorageProxyAlive() && isStorageAlive()); } - + //@Monitor(name="healthy", type=DataSourceType.GAUGE) public int metricIsHealthy() { return isHealthy() ? 1 : 0; @@ -159,4 +250,5 @@ public void setIsProcessMonitoringSuspended(boolean ipms) { public int metricIsProcessMonitoringSuspended() { return getIsProcessMonitoringSuspended() ?
1 : 0; } + } \ No newline at end of file diff --git a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/identity/CassandraInstanceFactory.java b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/identity/CassandraInstanceFactory.java index 4d92151f..179e0cfd 100644 --- a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/identity/CassandraInstanceFactory.java +++ b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/identity/CassandraInstanceFactory.java @@ -31,7 +31,7 @@ import com.netflix.dynomitemanager.sidecore.IConfiguration; /** - * Factory to use cassandra for managing instance data + * Factory to use Cassandra for managing instance data */ @Singleton @@ -58,6 +58,18 @@ public List getAllIds(String appName) sort(return_); return return_; } + + public List getLocalDCIds(String appName, String region) + { + List return_ = new ArrayList(); + for (AppsInstance instance : dao.getLocalDCInstances(appName, region)) { + return_.add(instance); + } + + sort(return_); + return return_; + } + public void sort(List return_) { diff --git a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/identity/IAppsInstanceFactory.java b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/identity/IAppsInstanceFactory.java index fbac4b02..1ddb8795 100644 --- a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/identity/IAppsInstanceFactory.java +++ b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/identity/IAppsInstanceFactory.java @@ -32,6 +32,15 @@ public interface IAppsInstanceFactory */ public List getAllIds(String appName); + + /** + * Return a list of Dynomite server nodes registered in the local datacenter. + * @param appName the cluster name + * @param region the region of the node + * @return a list of nodes in {@code appName} within the given region + */ + public List getLocalDCIds(String appName, String region); + /** * Return the Dynomite server node with the given {@code id}.
* @param appName the cluster name @@ -80,4 +89,5 @@ public AppsInstance create(String app, int id, String instanceID, String hostnam * @param device */ public void attachVolumes(AppsInstance instance, String mountPath, String device); -} + +} \ No newline at end of file diff --git a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/identity/InstanceDataDAOCassandra.java b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/identity/InstanceDataDAOCassandra.java index a0dd19a9..389d75a1 100644 --- a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/identity/InstanceDataDAOCassandra.java +++ b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/identity/InstanceDataDAOCassandra.java @@ -251,6 +251,18 @@ public AppsInstance getInstance(String app, String rack, int id) } return null; } + + public Set getLocalDCInstances(String app, String region) + { + Set set = getAllInstances(app); + Set returnSet = new HashSet(); + + for (AppsInstance ins : set) { + if (ins.getDatacenter().equals(region)) + returnSet.add(ins); + } + return returnSet; + } public Set getAllInstances(String app) { diff --git a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/resources/DynomiteAdmin.java b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/resources/DynomiteAdmin.java index a60ef1e8..74906949 100644 --- a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/resources/DynomiteAdmin.java +++ b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/resources/DynomiteAdmin.java @@ -31,24 +31,27 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.commons.lang.StringUtils; +import org.codehaus.jettison.json.JSONObject; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import com.google.inject.Inject; import com.netflix.dynomitemanager.IFloridaProcess; import com.netflix.dynomitemanager.InstanceState; import com.netflix.dynomitemanager.defaultimpl.StorageProcessManager; import com.netflix.dynomitemanager.identity.InstanceIdentity; import com.netflix.dynomitemanager.sidecore.IConfiguration; -import com.netflix.dynomitemanager.backup.RestoreFromS3Task; -import com.netflix.dynomitemanager.backup.SnapshotBackup; +import com.netflix.dynomitemanager.sidecore.backup.RestoreTask; +import com.netflix.dynomitemanager.sidecore.backup.SnapshotTask; import com.netflix.dynomitemanager.resources.DynomiteAdmin; import com.netflix.dynomitemanager.sidecore.scheduler.TaskScheduler; import com.netflix.dynomitemanager.sidecore.storage.IStorageProxy; + @Path("/v1/admin") @Produces(MediaType.APPLICATION_JSON) public class DynomiteAdmin @@ -62,8 +65,8 @@ public class DynomiteAdmin private InstanceIdentity ii; private final InstanceState instanceState; private final TaskScheduler scheduler; - private SnapshotBackup snapshotBackup; - private RestoreFromS3Task restoreBackup; + private SnapshotTask snapshotBackup; + private RestoreTask restoreBackup; private IStorageProxy storage; @@ -73,10 +76,8 @@ public class DynomiteAdmin @Inject public DynomiteAdmin(IConfiguration config, IFloridaProcess dynoProcess, InstanceIdentity ii, InstanceState instanceState, - SnapshotBackup snapshotBackup, RestoreFromS3Task restoreBackup, + SnapshotTask snapshotBackup, RestoreTask restoreBackup, TaskScheduler scheduler, IStorageProxy storage) - - { this.config = config; this.dynoProcess = dynoProcess; @@ -86,7 +87,6 @@ public DynomiteAdmin(IConfiguration config, IFloridaProcess 
dynoProcess, this.restoreBackup = restoreBackup; this.scheduler = scheduler; this.storage = storage; - } @GET @@ -221,40 +221,39 @@ public Response getClusterDescribe() } @GET - @Path("/s3backup") - public Response doS3Backup() + @Path("/backup") + public Response doBackup() { try { - logger.info("REST call: S3 backups"); + logger.info("REST call: backups"); this.snapshotBackup.execute(); return Response.ok(REST_SUCCESS, MediaType.APPLICATION_JSON).build(); } catch (Exception e) { - logger.error("Error while executing s3 backups from REST call", e); + logger.error("Error while executing backups from REST call", e); return Response.serverError().build(); } } @GET - @Path("/s3restore") - public Response doS3Restore() + @Path("/restore") + public Response doRestore() { try { - logger.info("REST call: S3 restore"); + logger.info("REST call: restore"); this.restoreBackup.execute(); return Response.ok(REST_SUCCESS, MediaType.APPLICATION_JSON).build(); } catch (Exception e) { - logger.error("Error while executing s3 restores from REST call", e); + logger.error("Error while executing restores from REST call", e); return Response.serverError().build(); } } - @GET @Path("/takesnapshot") public Response takeSnapshot() @@ -267,9 +266,98 @@ public Response takeSnapshot() } catch (Exception e) { - logger.error("Error while executing data persistence from REST call", e); + logger.error("Error executing data persistence from REST call", e); return Response.serverError().build(); } } + @GET + @Path("/Status") + public Response floridaStatus() + { + try{ + JSONObject statusJson = new JSONObject(); + + /* Warm up status */ + JSONObject warmupJson = new JSONObject(); + if (!this.instanceState.firstBootstrap()){ + if (this.instanceState.isBootstrapping()) { + warmupJson.put("status", "pending"); + } + else if (!this.instanceState.isBootstrapping() && !this.instanceState.isBootstrapSuccessful()) { + warmupJson.put("status", "unsuccessful"); + } + else if (!this.instanceState.isBootstrapping() && this.instanceState.isBootstrapSuccessful()) { + warmupJson.put("status", "completed"); + } + warmupJson.put("time",this.instanceState.getBootstrapTime()); + } + else{ + warmupJson.put("status", "not started"); + } + statusJson.put("warmup", warmupJson); + + /* backup status */ + JSONObject backupJson = new JSONObject(); + if (!this.instanceState.firstBackup()){ + if (this.instanceState.isBackingup()){ + backupJson.put("status", "pending"); + } + else if (!this.instanceState.isBackingup() && !this.instanceState.isBackupSuccessful()) { + backupJson.put("status", "unsuccessful"); + } + else if (!this.instanceState.isBackingup() && this.instanceState.isBackupSuccessful()){ + backupJson.put("status", "completed"); + } + backupJson.put("time",this.instanceState.getBackupTime()); + } + else{ + backupJson.put("status", "not started"); + } + statusJson.put("backup", backupJson); + + + /* restore status */ + JSONObject restoreJson = new JSONObject(); + if (!this.instanceState.firstRestore()){ + if (this.instanceState.isRestoring()) { + restoreJson.put("status", "pending"); + } + else if (!this.instanceState.isRestoring() && !this.instanceState.isRestoreSuccessful()) { + restoreJson.put("status", "unsuccessful"); + } + else if (!this.instanceState.isRestoring() && this.instanceState.isRestoreSuccessful()){ + restoreJson.put("status", "completed"); + } + restoreJson.put("time",this.instanceState.getRestoreTime()); + + } + else{ + restoreJson.put("status","not started"); + } + statusJson.put("restore", restoreJson); + + + /* 
Dynomite status */ + statusJson.put("dynomiteAlive", this.instanceState.isStorageProxyProcessAlive() ? true : false); + + /* Redis status */ + statusJson.put("storageAlive", this.instanceState.isStorageAlive() ? true : false); + + /* Overall status */ + statusJson.put("healthy", this.instanceState.isHealthy() ? true : false); + + + logger.info("REST call: Florida Status"); + return Response.ok(statusJson, MediaType.APPLICATION_JSON).build(); + + } + catch (Exception e) + { + logger.error("Error requesting Florida status from REST call", e); + return Response.serverError().build(); + } + + } + } \ No newline at end of file diff --git a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/backup/Backup.java b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/backup/Backup.java new file mode 100644 index 00000000..cb633508 --- /dev/null +++ b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/backup/Backup.java @@ -0,0 +1,12 @@ +package com.netflix.dynomitemanager.sidecore.backup; + +import java.io.File; + +import org.joda.time.DateTime; + +import com.netflix.dynomitemanager.sidecore.ICredential; + + +public interface Backup { + boolean upload(File file, DateTime todayStart); +} diff --git a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/backup/Restore.java b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/backup/Restore.java new file mode 100644 index 00000000..0535420d --- /dev/null +++ b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/backup/Restore.java @@ -0,0 +1,5 @@ +package com.netflix.dynomitemanager.sidecore.backup; + +public interface Restore { + boolean restoreData(String dateString); +} diff --git a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/backup/RestoreTask.java b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/backup/RestoreTask.java new file mode 100644 index 00000000..b7ec71b7 --- /dev/null +++ b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/backup/RestoreTask.java @@ -0,0 +1,119 @@ +package com.netflix.dynomitemanager.sidecore.backup; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.commons.io.IOUtils; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.S3Object; +import com.netflix.dynomitemanager.IFloridaProcess; +import com.netflix.dynomitemanager.InstanceState; +import com.netflix.dynomitemanager.defaultimpl.DynomitemanagerConfiguration; +import com.netflix.dynomitemanager.defaultimpl.StorageProcessManager; +import com.netflix.dynomitemanager.identity.InstanceIdentity; +import com.netflix.dynomitemanager.sidecore.IConfiguration; +import com.netflix.dynomitemanager.sidecore.ICredential; +import com.netflix.dynomitemanager.sidecore.scheduler.Task; +import com.netflix.dynomitemanager.sidecore.storage.IStorageProxy; +import com.netflix.dynomitemanager.sidecore.utils.JedisUtils; 
+import com.netflix.dynomitemanager.sidecore.utils.Sleeper; + + +/** + * Task for restoring snapshots from object storage + */ + +@Singleton +public class RestoreTask extends Task +{ + public static final String TaskName = "RestoreTask"; + private static final Logger logger = LoggerFactory.getLogger(RestoreTask.class); + private final ICredential cred; + private final InstanceIdentity iid; + private final InstanceState state; + private final IStorageProxy storageProxy; + private final IFloridaProcess dynProcess; + private final Sleeper sleeper; + private final Restore restore; + + + @Inject + private StorageProcessManager storageProcessMgr; + + @Inject + public RestoreTask(IConfiguration config, InstanceIdentity id, ICredential cred, InstanceState state, + IStorageProxy storageProxy, IFloridaProcess dynProcess, Sleeper sleeper, Restore restore) + { + super(config); + this.cred = cred; + this.iid = id; + this.state = state; + this.storageProxy = storageProxy; + this.dynProcess = dynProcess; + this.sleeper = sleeper; + this.restore = restore; + } + + + public void execute() throws Exception + { + this.state.setRestoring(true); + this.state.setFirstRestore(false); + /** + * Set the status of the restore to "false" every time we start a restore. + * This will ensure that prior to restore we recapture the status of the restore. + */ + this.state.setRestoreStatus(false); + + /* stop dynomite process */ + this.dynProcess.stop(); + + /* stop storage process */ + this.storageProcessMgr.stop(); + + /* restore from Object Storage */ + if(restore.restoreData(config.getRestoreDate())){ + /* start storage process and load data */ + logger.info("Restore successful: starting storage process and loading data."); + this.storageProcessMgr.start(); + if(!this.storageProxy.loadingData()){ + logger.error("Restore not successful: Restore failed because of Redis."); + } + logger.info("Restore completed, sleeping 5 seconds before starting Dynomite!"); + + sleeper.sleepQuietly(5000); + this.dynProcess.start(true); + logger.info("Dynomite started"); + this.state.setRestoreStatus(true); + } else { + /* start storage process without loading data */ + logger.error("Restore not successful: Starting storage process without loading data."); + } + this.state.setRestoring(false); + this.state.setRestoreTime(DateTime.now()); + } + + @Override + public String getName() + { + return TaskName; + } + +} + \ No newline at end of file diff --git a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/backup/S3Backup.java b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/backup/S3Backup.java new file mode 100644 index 00000000..c39a5d83 --- /dev/null +++ b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/backup/S3Backup.java @@ -0,0 +1,157 @@ +package com.netflix.dynomitemanager.sidecore.backup; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.joda.time.DateTime; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.S3ResponseMetadata; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.CreateBucketRequest; +import com.amazonaws.services.s3.model.GetBucketLocationRequest; +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.UploadPartRequest; +import
com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; + +import com.google.common.collect.Lists; +import com.google.inject.Inject; +import com.google.inject.Provider; +import com.google.inject.Singleton; +import com.google.inject.name.Named; + + +import com.netflix.dynomitemanager.sidecore.IConfiguration; +import com.netflix.dynomitemanager.sidecore.ICredential; +import com.netflix.dynomitemanager.identity.InstanceIdentity; + + +@Singleton +public class S3Backup implements Backup { + + private static final Logger logger = LoggerFactory.getLogger(S3Backup.class); + private final long initPartSize = 500 * 1024 * 1024; // we set the part size equal to 500MB. We do not want this too large + // and run out of heap space + + @Inject + private IConfiguration config; + + @Inject + private ICredential cred; + + @Inject + private InstanceIdentity iid; + + /** + * Uses the Amazon S3 API to upload the AOF/RDB to S3 + * Filename: Backup location + DC + Rack + App + Token + */ + @Override + public boolean upload(File file, DateTime todayStart) { + logger.info("Snapshot backup: sending " + file.length() + " bytes to S3"); + + /* Key name is comprised of the + * backupDir + DC + Rack + token + Date + */ + String keyName = + config.getBackupLocation() + "/" + + iid.getInstance().getDatacenter() + "/" + + iid.getInstance().getRack() + "/" + + iid.getInstance().getToken() + "/" + + todayStart.getMillis(); + + // Get bucket location. + logger.info("Key in Bucket: " + keyName); + logger.info("S3 Bucket Name:" + config.getBucketName()); + + AmazonS3Client s3Client = new AmazonS3Client(cred.getAwsCredentialProvider()); + + + try { + // Checking if the S3 bucket exists, and if does not, then we create it + + if(!(s3Client.doesBucketExist(config.getBucketName()))) { + logger.error("Bucket with name: " + config.getBucketName() + " does not exist"); + return false; + } + else { + logger.info("Uploading data to S3\n"); + // Create a list of UploadPartResponse objects. You get one of these for + // each part upload. + List partETags = new ArrayList(); + + InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest( + config.getBucketName(), keyName); + + InitiateMultipartUploadResult initResponse = + s3Client.initiateMultipartUpload(initRequest); + + long contentLength = file.length(); + long filePosition = 0; + long partSize = this.initPartSize; + + try { + for (int i = 1; filePosition < contentLength; i++) { + // Last part can be less than initPartSize (500MB). Adjust part size. + partSize = Math.min(partSize, (contentLength - filePosition)); + + // Create request to upload a part. + UploadPartRequest uploadRequest = new UploadPartRequest() + .withBucketName(config.getBucketName()).withKey(keyName) + .withUploadId(initResponse.getUploadId()).withPartNumber(i) + .withFileOffset(filePosition) + .withFile(file) + .withPartSize(partSize); + + // Upload part and add response to our list. 
+ partETags.add(s3Client.uploadPart(uploadRequest).getPartETag()); + + filePosition += partSize; + } + + CompleteMultipartUploadRequest compRequest = new + CompleteMultipartUploadRequest(config.getBucketName(), + keyName, + initResponse.getUploadId(), + partETags); + + s3Client.completeMultipartUpload(compRequest); + + } catch (Exception e) { + logger.error("Aborting multipart upload due to error"); + s3Client.abortMultipartUpload(new AbortMultipartUploadRequest( + config.getBucketName(), keyName, initResponse.getUploadId())); + } + + return true; + } + } catch (AmazonServiceException ase) { + + logger.error("AmazonServiceException;" + + " request made it to Amazon S3, but was rejected with an error "); + logger.error("Error Message: " + ase.getMessage()); + logger.error("HTTP Status Code: " + ase.getStatusCode()); + logger.error("AWS Error Code: " + ase.getErrorCode()); + logger.error("Error Type: " + ase.getErrorType()); + logger.error("Request ID: " + ase.getRequestId()); + return false; + + } catch (AmazonClientException ace) { + logger.error("AmazonClientException;"+ + " the client encountered " + + "an internal error while trying to " + + "communicate with S3, "); + logger.error("Error Message: " + ace.getMessage()); + return false; + } + } +} diff --git a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/backup/RestoreFromS3Task.java b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/backup/S3Restore.java similarity index 56% rename from dynomitemanager/src/main/java/com/netflix/dynomitemanager/backup/RestoreFromS3Task.java rename to dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/backup/S3Restore.java index 0eb08584..888695be 100644 --- a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/backup/RestoreFromS3Task.java +++ b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/backup/S3Restore.java @@ -1,19 +1,4 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package com.netflix.dynomitemanager.backup; +package com.netflix.dynomitemanager.sidecore.backup; import java.io.File; import java.io.FileOutputStream; @@ -31,99 +16,41 @@ import com.google.inject.Inject; import com.google.inject.Singleton; +import com.netflix.dynomitemanager.sidecore.IConfiguration; +import com.netflix.dynomitemanager.sidecore.ICredential; +import com.netflix.dynomitemanager.identity.InstanceIdentity; import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.S3Object; -import com.netflix.dynomitemanager.IFloridaProcess; -import com.netflix.dynomitemanager.InstanceState; -import com.netflix.dynomitemanager.defaultimpl.DynomitemanagerConfiguration; -import com.netflix.dynomitemanager.defaultimpl.StorageProcessManager; -import com.netflix.dynomitemanager.identity.InstanceIdentity; -import com.netflix.dynomitemanager.sidecore.IConfiguration; -import com.netflix.dynomitemanager.sidecore.ICredential; -import com.netflix.dynomitemanager.sidecore.scheduler.Task; -import com.netflix.dynomitemanager.sidecore.storage.IStorageProxy; -import com.netflix.dynomitemanager.sidecore.utils.JedisUtils; -import com.netflix.dynomitemanager.sidecore.utils.Sleeper; - - -/** - * Task for restoring snapshots from S3 - */ @Singleton -public class RestoreFromS3Task extends Task -{ - public static final String JOBNAME = "Restore"; - private static final Logger logger = LoggerFactory.getLogger(RestoreFromS3Task.class); - private final ICredential cred; - private final InstanceIdentity iid; - private final InstanceState state; - private final IStorageProxy storageProxy; - private final IFloridaProcess dynProcess; - private final Sleeper sleeper; +public class S3Restore implements Restore { + private static final Logger logger = LoggerFactory.getLogger(S3Restore.class); @Inject - private StorageProcessManager storageProcessMgr; + private IConfiguration config; @Inject - public RestoreFromS3Task(IConfiguration config, InstanceIdentity id, ICredential cred, InstanceState state, - IStorageProxy storageProxy, IFloridaProcess dynProcess, Sleeper sleeper) - { - super(config); - this.cred = cred; - this.iid = id; - this.state = state; - this.storageProxy = storageProxy; - this.dynProcess = dynProcess; - this.sleeper = sleeper; - } + private ICredential cred; + @Inject + private InstanceIdentity iid; - public void execute() throws Exception - { - this.state.setRestoring(true); - - /* stop dynomite process */ - this.dynProcess.stop(); - - /* stop storage process */ - this.storageProcessMgr.stop(); - - /* restore from S3 */ - if(restoreFromS3(config.getRestoreDate())){ - /* start storage process and load data */ - logger.info("S3 Restored successful: Starting storage process with loading data."); - this.storageProcessMgr.start(); - if(!this.storageProxy.loadingData()){ - logger.error("S3 Restore not successful: Restore failed because of Redis."); - } - logger.info("Restore Completed, sleeping 5 seconds before starting Dynomite!"); - - sleeper.sleepQuietly(5000); - this.dynProcess.start(true); - } else { - /* start storage process without loading data */ - logger.error("S3 Restore not successful: Starting storage process without loading data."); - } - this.state.setRestoring(false); - } - - public String getName() - { - return JOBNAME; - } - - private boolean restoreFromS3(String dateString) + 
+ /** + * Uses the Amazon S3 API to restore from S3 + */ + @Override + public boolean restoreData(String dateString) { long time = restoreTime(dateString); if (time > -1) { logger.info("Restoring data from S3."); - AmazonS3Client s3Client = new AmazonS3Client(this.cred.getAwsCredentialProvider()); + AmazonS3Client s3Client = new AmazonS3Client(cred.getAwsCredentialProvider()); try { /* construct the key for the backup data */ diff --git a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/backup/SnapshotBackup.java b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/backup/SnapshotTask.java similarity index 61% rename from dynomitemanager/src/main/java/com/netflix/dynomitemanager/backup/SnapshotBackup.java rename to dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/backup/SnapshotTask.java index 60d78fa0..a61560a2 100644 --- a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/backup/SnapshotBackup.java +++ b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/backup/SnapshotTask.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.netflix.dynomitemanager.backup; +package com.netflix.dynomitemanager.sidecore.backup; import static com.netflix.dynomitemanager.defaultimpl.DynomitemanagerConfiguration.LOCAL_ADDRESS; @@ -22,19 +22,13 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.util.ArrayList; +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.joda.time.DateTime; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3Client; -import com.amazonaws.services.s3.S3ResponseMetadata; -import com.amazonaws.services.s3.model.PutObjectRequest; -import com.amazonaws.services.s3.model.CreateBucketRequest; -import com.amazonaws.services.s3.model.GetBucketLocationRequest; -import com.amazonaws.AmazonClientException; -import com.amazonaws.AmazonServiceException; + import com.google.common.collect.Lists; import com.google.inject.Inject; import com.google.inject.Provider; @@ -44,11 +38,10 @@ import redis.clients.jedis.Jedis; import redis.clients.jedis.exceptions.JedisConnectionException; -import com.netflix.dynomitemanager.backup.SnapshotBackup; -import com.netflix.dynomitemanager.identity.InstanceIdentity; import com.netflix.dynomitemanager.InstanceState; import com.netflix.dynomitemanager.sidecore.IConfiguration; import com.netflix.dynomitemanager.sidecore.ICredential; +import com.netflix.dynomitemanager.identity.InstanceIdentity; import com.netflix.dynomitemanager.sidecore.scheduler.SimpleTimer; import com.netflix.dynomitemanager.sidecore.scheduler.Task; import com.netflix.dynomitemanager.sidecore.scheduler.TaskTimer; @@ -58,35 +51,38 @@ import com.netflix.dynomitemanager.sidecore.scheduler.CronTimer.DayOfWeek; /** - * Task for taking snapshots to S3 + * Task for taking snapshots */ @Singleton -public class SnapshotBackup extends Task +public class SnapshotTask extends Task { - public static final String TaskName = "SnapshotBackup"; - private static final Logger logger = LoggerFactory.getLogger(SnapshotBackup.class); + public static final String TaskName = "SnapshotTask"; + private static final Logger logger = LoggerFactory.getLogger(SnapshotTask.class); private final ThreadSleeper sleeper = new ThreadSleeper(); private final ICredential cred; private final InstanceIdentity iid; private final InstanceState state; private final IStorageProxy storageProxy; + 
private final Backup backup; private final int storageRetries = 5; + @Inject - public SnapshotBackup(IConfiguration config, InstanceIdentity id, ICredential cred, InstanceState state, - IStorageProxy storageProxy) + public SnapshotTask(IConfiguration config, InstanceIdentity id, ICredential cred, InstanceState state, + IStorageProxy storageProxy, Backup backup) { super(config); this.cred = cred; this.iid = id; this.state = state; this.storageProxy = storageProxy; + this.backup = backup; } public void execute() throws Exception { - + this.state.setFirstBackup(false); if(!state.isRestoring() && !state.isBootstrapping()){ /** Iterate five times until storage (Redis) is ready. * We need storage to be ready to dumb the data, @@ -102,6 +98,12 @@ public void execute() throws Exception } else{ this.state.setBackingup(true); + /** + * Set the status of the backup to false every time we start a backup. + * This will ensure that prior to backup we recapture the status of the backup. + */ + this.state.setBackUpStatus(false); + // the storage proxy takes a snapshot or compacts data boolean snapshot = this.storageProxy.takeSnapshot(); File file = null; @@ -113,7 +115,13 @@ public void execute() throws Exception } // upload the data to S3 if (file.length() > 0 && snapshot == true) { - if(uploadToS3(file)){ + DateTime now = DateTime.now(); + DateTime todayStart = now.withTimeAtStartOfDay(); + this.state.setBackupTime(todayStart); + + + if(this.backup.upload(file, todayStart)){ + this.state.setBackUpStatus(true); logger.info("S3 backup status: Completed!"); } else{ @@ -158,67 +166,7 @@ public static TaskTimer getTimer(IConfiguration config) } return new CronTimer(hour, 1, 0); - } - - - /** - * Uses the Amazon S3 API to upload the AOF/RDB to S3 - * Filename: Backup location + DC + Rack + App + Token - */ - private boolean uploadToS3(File file) - { - logger.info("Snapshot backup: sending " + file.length() + " bytes to S3"); - DateTime now = DateTime.now(); - DateTime todayStart = now.withTimeAtStartOfDay(); - - /* Key name is comprised of the - * backupDir + DC + Rack + token + Date - */ - String keyName = - config.getBackupLocation() + "/" + - iid.getInstance().getDatacenter() + "/" + - iid.getInstance().getRack() + "/" + - iid.getInstance().getToken() + "/" + - todayStart.getMillis(); - - // Get bucket location. 
- logger.info("Key in Bucket: " + keyName); - logger.info("S3 Bucket Name:" + config.getBucketName()); - - AmazonS3Client s3Client = new AmazonS3Client(this.cred.getAwsCredentialProvider()); - try { - // Checking if the S3 bucket exists, and if does not, then we create it - if(!(s3Client.doesBucketExist(config.getBucketName()))) { - logger.error("Bucket with name: " + config.getBucketName() + " does not exist"); - return false; - } - else { - logger.info("Uploading data to S3\n"); - - s3Client.putObject(new PutObjectRequest( - config.getBucketName(), keyName, file)); - return true; - } - } catch (AmazonServiceException ase) { - - logger.error("AmazonServiceException;" + - " request made it to Amazon S3, but was rejected with an error "); - logger.error("Error Message: " + ase.getMessage()); - logger.error("HTTP Status Code: " + ase.getStatusCode()); - logger.error("AWS Error Code: " + ase.getErrorCode()); - logger.error("Error Type: " + ase.getErrorType()); - logger.error("Request ID: " + ase.getRequestId()); - return false; - - } catch (AmazonClientException ace) { - logger.error("AmazonClientException;"+ - " the client encountered " + - "an internal error while trying to " + - "communicate with S3, "); - logger.error("Error Message: " + ace.getMessage()); - return false; - } - } + } } diff --git a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/storage/IStorageProxy.java b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/storage/IStorageProxy.java index 5eade715..9a93f8eb 100644 --- a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/storage/IStorageProxy.java +++ b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/storage/IStorageProxy.java @@ -15,11 +15,14 @@ */ package com.netflix.dynomitemanager.sidecore.storage; +import com.netflix.dynomitemanager.IFloridaProcess; + + public interface IStorageProxy { boolean isAlive(); long getUptime(); - boolean warmUpStorage(String[] peers); + boolean warmUpStorage(String[] peers, IFloridaProcess dynProcess); boolean resetStorage(); boolean takeSnapshot(); boolean loadingData(); diff --git a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/storage/MemcachedStorageProxy.java b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/storage/MemcachedStorageProxy.java index a5adad1d..b4afc5e7 100644 --- a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/storage/MemcachedStorageProxy.java +++ b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/storage/MemcachedStorageProxy.java @@ -15,6 +15,8 @@ */ package com.netflix.dynomitemanager.sidecore.storage; +import com.netflix.dynomitemanager.IFloridaProcess; + public class MemcachedStorageProxy implements IStorageProxy { @Override @@ -28,11 +30,11 @@ public long getUptime() { } @Override - public boolean warmUpStorage(String[] peers) { - // TODO Auto-generated method stub + public boolean warmUpStorage(String[] peers, IFloridaProcess dynProcess) { return false; } + @Override public boolean resetStorage() { return true; diff --git a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/storage/RedisStorageProxy.java b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/storage/RedisStorageProxy.java index f0dc42ac..210d1bb5 100644 --- a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/storage/RedisStorageProxy.java +++ b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/storage/RedisStorageProxy.java @@ -15,6 
+15,8 @@ */ package com.netflix.dynomitemanager.sidecore.storage; +import java.io.IOException; + import com.google.common.base.Splitter; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -25,16 +27,18 @@ import com.netflix.dynomitemanager.sidecore.IConfiguration; import com.netflix.dynomitemanager.sidecore.utils.JedisUtils; import com.netflix.dynomitemanager.sidecore.utils.Sleeper; +import com.netflix.dynomitemanager.IFloridaProcess; + import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler; import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.methods.GetMethod; import org.apache.commons.httpclient.params.HttpMethodParams; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import redis.clients.jedis.Jedis; import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.exceptions.JedisDataException; - import static com.netflix.dynomitemanager.defaultimpl.DynomitemanagerConfiguration.LOCAL_ADDRESS; import static com.netflix.dynomitemanager.defaultimpl.DynomitemanagerConfiguration.REDIS_PORT; @@ -42,449 +46,482 @@ //TODOs: we should talk to admin port (22222) instead of 8102 for both local and peer @Singleton public class RedisStorageProxy implements IStorageProxy { - + private static final Logger logger = LoggerFactory.getLogger(RedisStorageProxy.class); - private Jedis localJedis; - private final DynamicStringProperty adminUrl = - DynamicPropertyFactory.getInstance().getStringProperty("dynomitemanager.metrics.url", "http://localhost:22222"); - //private final HttpClient client = new HttpClient(); - - @Inject - private IConfiguration config; - - @Inject - private Sleeper sleeper; - - @Inject - private InstanceState instanceState; - - public RedisStorageProxy() { - //connect(); - } - - private void connect() { - try { - if (localJedis == null) - localJedis = new Jedis(LOCAL_ADDRESS, REDIS_PORT, 5000); - else localJedis.disconnect(); - - localJedis.connect(); - } catch (Exception e) { - logger.info("Unable to connect: " + e.getMessage()); - } - } - - + private Jedis localJedis; + private final DynamicStringProperty adminUrl = + DynamicPropertyFactory.getInstance().getStringProperty("florida.metrics.url", "http://localhost:22222"); + //private final HttpClient client = new HttpClient(); + + @Inject + private IConfiguration config; + + @Inject + private Sleeper sleeper; + + @Inject + private InstanceState instanceState; + + public RedisStorageProxy() { + //connect(); + } + + private void connect() { + try { + if (localJedis == null) + localJedis = new Jedis(LOCAL_ADDRESS, REDIS_PORT, 5000); + else localJedis.disconnect(); + + localJedis.connect(); + } catch (Exception e) { + logger.info("Unable to connect: " + e.getMessage()); + } + } + + /* private boolean isAlive(Jedis jedis) { - try { - jedis.ping(); - } catch (JedisConnectionException e) { - connect(); - return false; - } catch (Exception e) { - connect(); - return false; - } - return true; - }*/ - - //issue a 'slaveof peer port to local redis - private void startPeerSync(String peer, int port) { - boolean isDone = false; - connect(); - - while (!isDone) { - try { - //only sync from one peer for now - isDone = (localJedis.slaveof(peer, port) != null); - sleeper.sleepQuietly(1000); - } catch (JedisConnectionException e) { - connect(); - } catch (Exception e) { - connect(); - } - } - } - - //issue a 'slaveof no one' to local reids - //set dynomite to accept writes but no reads - private void stopPeerSync() { - boolean isDone = false; - - while 
(!isDone) { - logger.info("calling slaveof no one"); - try { - isDone = (localJedis.slaveofNoOne() != null); - sleeper.sleepQuietly(1000); - - } catch (JedisConnectionException e) { - logger.error("Error: " + e.getMessage()); - connect(); - } catch (Exception e) { - logger.error("Error: " + e.getMessage()); - connect(); - } - } - } - - @Override - public boolean takeSnapshot() - { - String scheduled = "ERR Background append only file rewriting already in progress"; - connect(); - logger.info("starting Redis BGREWRITEAOF"); - try { - - localJedis.bgrewriteaof(); - /* We want to check if a bgrewriteaof was already scheduled - * or it has started. If a bgrewriteaof was already scheduled - * then we should get an error from Redis but should continue. - * If a bgrewriteaof has started, we should also continue. - * Otherwise we may be having old data in the disk. - */ - } catch (JedisDataException e) { - if(!e.getMessage().equals(scheduled)){ - throw e; - } - logger.warn("Redis: There is already a pending BGREWRITEAOF."); - } - - String peerRedisInfo = null; - int retry = 0; - - try { - while(true) { - peerRedisInfo = localJedis.info(); - Iterable result = Splitter.on('\n').split(peerRedisInfo); - String pendingAOF = null; - - for(String line : result) { - if (line.startsWith("aof_rewrite_in_progress")) { - String[] items = line.split(":"); - pendingAOF = items[1].trim(); - if(pendingAOF.equals("0")){ - logger.info("Redis: BGREWRITEAOF completed."); - return true; - } else { - retry++; - logger.warn("Redis: BGREWRITEAOF pending. Sleeping 30 secs..."); - sleeper.sleepQuietly(30000); - - if (retry > 20) { - return false; - } - } - } - } - } - - } catch (JedisConnectionException e) { - logger.error("Cannot connect to Redis to perform BGREWRITEAOF"); - } - - logger.error("Redis BGREWRITEAOF was not succesful."); - return false; - - } - - - @Override - public boolean loadingData() - { - connect(); - logger.info("loading AOF from the drive"); - String peerRedisInfo = null; - int retry = 0; - - try { - peerRedisInfo = localJedis.info(); - Iterable result = Splitter.on('\n').split(peerRedisInfo); - String pendingAOF = null; - - for(String line : result) { - if (line.startsWith("loading")) { - String[] items = line.split(":"); - pendingAOF = items[1].trim(); - if(pendingAOF.equals("0")){ - logger.info("Redis: memory loading completed."); - return true; - } else { - retry++; - logger.warn("Redis: memory pending. Sleeping 30 secs..."); - sleeper.sleepQuietly(30000); - - if (retry > 20) { - return false; - } - } - } - } - } catch (JedisConnectionException e) { - logger.error("Cannot connect to Redis to load the AOF"); - } - - return false; - - } - - - @Override - public boolean isAlive() { - // Not using localJedis variable as it can be used by - // ProcessMonitorTask as well. - return JedisUtils.isAliveWithRetry(LOCAL_ADDRESS, REDIS_PORT); - } - - @Override - public long getUptime() { - - return 0; - } - - @Override - //probably use our Retries Util here - public boolean warmUpStorage(String[] peers) { - String alivePeer = null; - Jedis peerJedis = null; - - for(String peer : peers) { - logger.info("Peer node [" + peer + "] has the same token!" 
); - peerJedis = JedisUtils.connect(peer, config.getListenerPort()); - if (peerJedis != null && isAlive()) { - alivePeer = peer; - break; - } - } - - if (alivePeer != null && peerJedis != null) { - logger.info("Issue slaveof commnd on peer[" + alivePeer + "] and port[" + REDIS_PORT + "]"); - startPeerSync(alivePeer, REDIS_PORT); - - logger.info("Force Dynomite to be in Standby mode!"); - sendCommand("/state/standby"); - - long diff = 0; - long previousDiff = 0; - short retry = 0; - short numErrors = 0; - long startTime = System.currentTimeMillis(); - - /* - * Conditions under which warmp up will end - * 1. number of Jedis errors are 5. - * 2. number of consecutive increases of offset differences (caused when client produces high load). - * 3. the difference between offsets is very small or zero (success). - * 4. warmp up takes more than 15 minutes. - */ - - while (numErrors < 5) { - sleeper.sleepQuietly(10000); - try { - diff = canPeerSyncStop(peerJedis, startTime); - } catch (Exception e) { - numErrors++; - } - - /* - * Diff meaning: - * a. diff == 0 --> we are either in sync or close to sync. - * b. diff == -1 --> that there was an error in sync process. - * c. diff == -2 --> offset is still zero, peer syncing has not started. - */ - if (diff == 0 || diff == -1 ) { - break; - } - else if (diff == -2 ) { - startTime = System.currentTimeMillis(); - } - - - - /* - * Exit conditions: - * a. retry more than 5 time continuously and if the diff is larger than the previous diff. - */ - if (previousDiff < diff) { - logger.info("Previous diff was smaller than current diff ---> Retry effort: " + retry); - retry++; - if (retry == 5){ - logger.info("Reached 5 consecutive retries, peer syncing cannot complete"); - break; - } - } - else{ - retry = 0; - } - previousDiff = diff; - } - - logger.info("Set Dynomite to allow writes only!!!"); - sendCommand("/state/writes_only"); - - logger.info("Stop Redis' Peer syncing!!!"); - stopPeerSync(); - - logger.info("Set Dynomite to resuming state to allow writes and flush delayed writes"); - sendCommand("/state/resuming"); - - //sleep 15s for the flushing to catch up - sleeper.sleepQuietly(15000); - logger.info("Set Dynomite to normal state"); - sendCommand("/state/normal"); - - peerJedis.disconnect(); - - if (diff > 0) { - logger.error("Peer sync can't finish! Something is wrong."); - return false; - } - } - - return true; - } - /** - * Resets Redis to master if it was slave due to warm up failure. 
- */ - @Override - public boolean resetStorage() { - logger.info("Checking if Redis needs to be resetted to master"); - connect(); - String peerRedisInfo = null; - try { - peerRedisInfo = localJedis.info(); - } catch (JedisConnectionException e) { - // Try to reconnect - try { - connect(); - peerRedisInfo = localJedis.info(); - } catch (JedisConnectionException ex) { - logger.error("Cannot connect to Redis"); - return false; - } - } - Iterable result = Splitter.on('\n').split(peerRedisInfo); - - String role = null; - - for(String line : result) { - if (line.startsWith("role")) { - String[] items = line.split(":"); - // logger.info(items[0] + ": " + items[1]); - role = items[1].trim(); - if(role.equals("slave")){ - logger.info("Redis: slave ----> master"); - stopPeerSync(); - } - return true; - } - } - - return false; - - } - - - private Long canPeerSyncStop(Jedis peerJedis, long startTime) throws RedisSyncException { - - if (System.currentTimeMillis() - startTime > config.getMaxTimeToBootstrap()) { - logger.warn("Warm up takes more than 15 minutes --> moving on"); - return (long) -1; - } - - logger.info("Checking for peer syncing"); - String peerRedisInfo = peerJedis.info(); - - Long masterOffset = -1L; - Long slaveOffset = -1L; - - //get peer's repl offset - Iterable result = Splitter.on('\n').split(peerRedisInfo); - - for(String line : result) { - if (line.startsWith("master_repl_offset")) { - String[] items = line.split(":"); - logger.info(items[0] + ": " + items[1]); - masterOffset = Long.parseLong(items[1].trim()); - - } - - //slave0:ip=10.99.160.121,port=22122,state=online,offset=17279,lag=0 - if (line.startsWith("slave0")) { - String[] items = line.split(","); - for(String item : items) { - if (item.startsWith("offset")) { - String[] offset = item.split("="); - logger.info(offset[0] + ": " + offset[1]); - slaveOffset = Long.parseLong(offset[1].trim()); - } - } - } - } - - if (slaveOffset == -1) { - logger.error("Slave offset could not be parsed --> check memory overcommit configuration"); - return (long) -1; - } - else if (slaveOffset == 0) { - logger.info("Slave offset is zero ---> Redis master node still dumps data to the disk"); - return (long) -2; - } - Long diff = Math.abs(masterOffset - slaveOffset); - - logger.info("masterOffset: " + masterOffset + " slaveOffset: " + slaveOffset + - " current Diff: " + diff + - " allowable diff: " + config.getAllowableBytesSyncDiff()); - /* - * Allowable bytes sync diff can be configured by a Fast Property. - * If the difference is very small, then we return zero. 
- */ - if (diff < config.getAllowableBytesSyncDiff()) { - logger.info("master and slave are in sync!"); - return (long) 0; - } - else if (slaveOffset == 0) { - logger.info("slave has not started syncing"); - } - return diff; - } - - private class RedisSyncException extends Exception { - public RedisSyncException(String msg) { - super(msg); + try { + jedis.ping(); + } catch (JedisConnectionException e) { + connect(); + return false; + } catch (Exception e) { + connect(); + return false; + } + return true; + }*/ + + //issue a 'slaveof peer port to local redis + private void startPeerSync(String peer, int port) { + boolean isDone = false; + connect(); + + while (!isDone) { + try { + //only sync from one peer for now + isDone = (localJedis.slaveof(peer, port) != null); + sleeper.sleepQuietly(1000); + } catch (JedisConnectionException e) { + connect(); + } catch (Exception e) { + connect(); } - } - - private boolean sendCommand(String cmd) { - String url = adminUrl.get() + cmd; - HttpClient client = new HttpClient(); - client.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, - new DefaultHttpMethodRetryHandler()); - - GetMethod get = new GetMethod(url); - try { - int statusCode = client.executeMethod(get); - if (!(statusCode == 200)) { - logger.error("Got non 200 status code from " + url); - return false; - } - - String response = get.getResponseBodyAsString(); - //logger.info("Received response from " + url + "\n" + response); - - if (!response.isEmpty()) { - logger.info("Received response from " + url + "\n" + response); - } else { - logger.error("Cannot parse empty response from " + url); - return false; - } - - } catch (Exception e) { - logger.error("Failed to sendCommand and invoke url: " + url, e); - return false; - } - - return true; - } - -} \ No newline at end of file + } + } + + //issue a 'slaveof no one' to local reids + //set dynomite to accept writes but no reads + private void stopPeerSync() { + boolean isDone = false; + + while (!isDone) { + logger.info("calling slaveof no one"); + try { + isDone = (localJedis.slaveofNoOne() != null); + sleeper.sleepQuietly(1000); + + } catch (JedisConnectionException e) { + logger.error("JedisConnection Exception in Slave of None: " + e.getMessage()); + connect(); + } catch (Exception e) { + logger.error("Error: " + e.getMessage()); + connect(); + } + } + } + + @Override + public boolean takeSnapshot() + { + connect(); + try { + if(config.isAof()) { + logger.info("starting Redis BGREWRITEAOF"); + localJedis.bgrewriteaof(); + } + else { + logger.info("starting Redis BGSAVE"); + localJedis.bgsave(); + + } + /* We want to check if a bgrewriteaof was already scheduled + * or it has started. If a bgrewriteaof was already scheduled + * then we should get an error from Redis but should continue. + * If a bgrewriteaof has started, we should also continue. + * Otherwise we may be having old data in the disk. 
+             */
+        } catch (JedisDataException e) {
+            String scheduled = null;
+            if (!config.isAof()) {
+                scheduled = "ERR Background save already in progress";
+            }
+            else {
+                scheduled = "ERR Background append only file rewriting already in progress";
+            }
+
+            if(!e.getMessage().equals(scheduled)){
+                throw e;
+            }
+            logger.warn("Redis: There is already a pending BGREWRITEAOF/BGSAVE.");
+        }
+
+        String peerRedisInfo = null;
+        int retry = 0;
+
+        try {
+            while(true) {
+                peerRedisInfo = localJedis.info();
+                Iterable result = Splitter.on('\n').split(peerRedisInfo);
+                String pendingPersistence = null;
+
+                for(String line : result) {
+                    if ((line.startsWith("aof_rewrite_in_progress") && config.isAof()) ||
+                            (line.startsWith("rdb_bgsave_in_progress") && !config.isAof())) {
+                        String[] items = line.split(":");
+                        pendingPersistence = items[1].trim();
+                        if(pendingPersistence.equals("0")){
+                            logger.info("Redis: BGREWRITEAOF/BGSAVE completed.");
+                            return true;
+                        } else {
+                            retry++;
+                            logger.warn("Redis: BGREWRITEAOF/BGSAVE pending. Sleeping 30 secs...");
+                            sleeper.sleepQuietly(30000);
+
+                            if (retry > 20) {
+                                return false;
+                            }
+                        }
+                    }
+                }
+            }
+
+        } catch (JedisConnectionException e) {
+            logger.error("Cannot connect to Redis to perform BGREWRITEAOF/BGSAVE");
+        }
+
+        logger.error("Redis BGREWRITEAOF/BGSAVE was not successful.");
+        return false;
+
+    }
+
+
+    @Override
+    public boolean loadingData()
+    {
+        connect();
+        logger.info("loading AOF from the drive");
+        String peerRedisInfo = null;
+        int retry = 0;
+
+        try {
+            peerRedisInfo = localJedis.info();
+            Iterable result = Splitter.on('\n').split(peerRedisInfo);
+            String pendingAOF = null;
+
+            for(String line : result) {
+                if (line.startsWith("loading")) {
+                    String[] items = line.split(":");
+                    pendingAOF = items[1].trim();
+                    if(pendingAOF.equals("0")){
+                        logger.info("Redis: memory loading completed.");
+                        return true;
+                    } else {
+                        retry++;
+                        logger.warn("Redis: memory loading pending. Sleeping 30 secs...");
+                        sleeper.sleepQuietly(30000);
+
+                        if (retry > 20) {
+                            return false;
+                        }
+                    }
+                }
+            }
+        } catch (JedisConnectionException e) {
+            logger.error("Cannot connect to Redis to load the AOF");
+        }
+
+        return false;
+
+    }
+
+
+    @Override
+    public boolean isAlive() {
+        // Not using localJedis variable as it can be used by
+        // ProcessMonitorTask as well.
+        return JedisUtils.isAliveWithRetry(LOCAL_ADDRESS, REDIS_PORT);
+    }
+
+    @Override
+    public long getUptime() {
+
+        return 0;
+    }
+
+    @Override
+    //probably use our Retries Util here
+    public boolean warmUpStorage(String[] peers, IFloridaProcess dynProcess) {
+        String alivePeer = null;
+        Jedis peerJedis = null;
+
+        // Identify if we can connect with the peer node
+        for(String peer : peers) {
+            logger.info("Peer node [" + peer + "] has the same token!" );
+            peerJedis = JedisUtils.connect(peer, config.getListenerPort());
+            if (peerJedis != null && isAlive()) {
+                alivePeer = peer;
+                break;
+            }
+        }
+
+        // We check if the selected peer is alive and we connect to it.
+        if (alivePeer == null) {
+            logger.error("Cannot connect to peer node to bootstrap");
+            return false;
+        }
+        else {
+            logger.info("Issue slaveof command on peer[" + alivePeer + "] and port[" + REDIS_PORT + "]");
+            startPeerSync(alivePeer, REDIS_PORT);
+
+
+            long diff = 0;
+            long previousDiff = 0;
+            short retry = 0;
+            short numErrors = 0;
+            long startTime = System.currentTimeMillis();
+
+            /*
+             * Conditions under which warm up will end:
+             * 1. the number of Jedis errors reaches 5.
+             * 2. the offset difference increases consecutively (caused when the client produces high load).
+             * 3. the difference between offsets is very small or zero (success).
+             * 4. warm up takes more than 15 minutes.
+             * 5. Dynomite has started and is healthy.
+             */
+
+            while (numErrors < 5) {
+                // sleep 10 seconds in between checks
+                sleeper.sleepQuietly(10000);
+                try {
+                    diff = canPeerSyncStop(peerJedis, startTime);
+                } catch (Exception e) {
+                    numErrors++;
+                }
+
+                /*
+                 * Diff meaning:
+                 * a. diff == 0 --> we are either in sync or close to sync.
+                 * b. diff == -1 --> there was an error in the sync process.
+                 * c. diff == -2 --> offset is still zero, peer syncing has not started.
+                 */
+                if (diff == 0) {
+                    // Since we are ready let us start dynProcess.
+                    try {
+                        dynProcess.start(false);
+                    }
+                    catch (IOException ex) {
+                        logger.error("Dynomite failed to start");
+                    }
+                    // Wait for 1 second before we check dynomite status
+                    sleeper.sleepQuietly(1000);
+                    dynProcess.dynomiteCheck();
+                    break;
+                }
+                else if (diff == -1) {
+                    logger.error("There was an error in the warm up process - do NOT start Dynomite");
+                    return false;
+                }
+                else if (diff == -2 ) {
+                    startTime = System.currentTimeMillis();
+                }
+
+
+                /*
+                 * Exit conditions:
+                 * a. retry more than 5 times consecutively while the diff is larger than the previous diff.
+                 */
+                if (previousDiff < diff) {
+                    logger.info("Previous diff (" + previousDiff +") was smaller than current diff (" + diff +") ---> Retry effort: " + retry);
+                    retry++;
+                    if (retry == 5){
+                        logger.info("Reached 5 consecutive retries, peer syncing cannot complete");
+                        break;
+                    }
+                }
+                else{
+                    retry = 0;
+                }
+                previousDiff = diff;
+            }
+
+            logger.info("Set Dynomite to allow writes only!!!");
+            sendCommand("/state/writes_only");
+
+            logger.info("Stop Redis' Peer syncing!!!");
+            stopPeerSync();
+
+            logger.info("Set Dynomite to resuming state to allow writes and flush delayed writes");
+            sendCommand("/state/resuming");
+
+            //sleep 15s for the flushing to catch up
+            sleeper.sleepQuietly(15000);
+            logger.info("Set Dynomite to normal state");
+            sendCommand("/state/normal");
+
+            peerJedis.disconnect();
+
+            if (diff > 0) {
+                logger.error("Peer sync can't finish! Something is wrong.");
+                return false;
+            }
+        }
+
+        return true;
+    }
+    /**
+     * Resets Redis to master if it was slave due to warm up failure.
+     */
+    @Override
+    public boolean resetStorage() {
+        logger.info("Checking if Redis needs to be reset to master");
+        connect();
+        String peerRedisInfo = null;
+        try {
+            peerRedisInfo = localJedis.info();
+        } catch (JedisConnectionException e) {
+            // Try to reconnect
+            try {
+                connect();
+                peerRedisInfo = localJedis.info();
+            } catch (JedisConnectionException ex) {
+                logger.error("Cannot connect to Redis");
+                return false;
+            }
+        }
+        Iterable result = Splitter.on('\n').split(peerRedisInfo);
+
+        String role = null;
+
+        for(String line : result) {
+            if (line.startsWith("role")) {
+                String[] items = line.split(":");
+                // logger.info(items[0] + ": " + items[1]);
+                role = items[1].trim();
+                if(role.equals("slave")){
+                    logger.info("Redis: slave ----> master");
+                    stopPeerSync();
+                }
+                return true;
+            }
+        }
+
+        return false;
+
+    }
+
+
+    private Long canPeerSyncStop(Jedis peerJedis, long startTime) throws RedisSyncException {
+
+        if (System.currentTimeMillis() - startTime > config.getMaxTimeToBootstrap()) {
+            logger.warn("Warm up takes more than 15 minutes --> moving on");
+            return (long) -1;
+        }
+
+        logger.info("Checking for peer syncing");
+        String peerRedisInfo = peerJedis.info();
+
+        Long masterOffset = -1L;
+        Long slaveOffset = -1L;
+
+        //get peer's repl offset
+        Iterable result = Splitter.on('\n').split(peerRedisInfo);
+
+        for(String line : result) {
+            if (line.startsWith("master_repl_offset")) {
+                String[] items = line.split(":");
+                logger.info(items[0] + ": " + items[1]);
+                masterOffset = Long.parseLong(items[1].trim());
+
+            }
+
+            //slave0:ip=10.99.160.121,port=22122,state=online,offset=17279,lag=0
+            if (line.startsWith("slave0")) {
+                String[] items = line.split(",");
+                for(String item : items) {
+                    if (item.startsWith("offset")) {
+                        String[] offset = item.split("=");
+                        logger.info(offset[0] + ": " + offset[1]);
+                        slaveOffset = Long.parseLong(offset[1].trim());
+                    }
+                }
+            }
+        }
+
+        if (slaveOffset == -1) {
+            logger.error("Slave offset could not be parsed --> check memory overcommit configuration");
+            return (long) -1;
+        }
+        else if (slaveOffset == 0) {
+            logger.info("Slave offset is zero ---> Redis master node still dumps data to the disk");
+            return (long) -2;
+        }
+        Long diff = Math.abs(masterOffset - slaveOffset);
+
+        logger.info("masterOffset: " + masterOffset + " slaveOffset: " + slaveOffset +
+                " current Diff: " + diff +
+                " allowable diff: " + config.getAllowableBytesSyncDiff());
+        /*
+         * Allowable bytes sync diff can be configured by a Fast Property.
+         * If the difference is very small, then we return zero.
+         */
+        if (diff < config.getAllowableBytesSyncDiff()) {
+            logger.info("master and slave are in sync!");
+            return (long) 0;
+        }
+        else if (slaveOffset == 0) {
+            logger.info("slave has not started syncing");
+        }
+        return diff;
+    }
+
+    private class RedisSyncException extends Exception {
+        public RedisSyncException(String msg) {
+            super(msg);
+        }
+    }
+
+    private boolean sendCommand(String cmd) {
+        String url = adminUrl.get() + cmd;
+        HttpClient client = new HttpClient();
+        client.getParams().setParameter(HttpMethodParams.RETRY_HANDLER,
+                new DefaultHttpMethodRetryHandler());
+
+        GetMethod get = new GetMethod(url);
+        try {
+            int statusCode = client.executeMethod(get);
+            if (!(statusCode == 200)) {
+                logger.error("Got non 200 status code from " + url);
+                return false;
+            }
+
+            String response = get.getResponseBodyAsString();
+            //logger.info("Received response from " + url + "\n" + response);
+
+            if (!response.isEmpty()) {
+                logger.info("Received response from " + url + "\n" + response);
+            } else {
+                logger.error("Cannot parse empty response from " + url);
+                return false;
+            }
+
+        } catch (Exception e) {
+            logger.error("Failed to sendCommand and invoke url: " + url, e);
+            return false;
+        }
+
+        return true;
+    }
+
+}
diff --git a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/utils/WarmBootstrapTask.java b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/utils/WarmBootstrapTask.java
index 8b69a336..bee0ab53 100644
--- a/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/utils/WarmBootstrapTask.java
+++ b/dynomitemanager/src/main/java/com/netflix/dynomitemanager/sidecore/utils/WarmBootstrapTask.java
@@ -30,15 +30,16 @@
 import com.netflix.dynomitemanager.sidecore.storage.IStorageProxy;
 import com.netflix.dynomitemanager.sidecore.utils.Sleeper;
 import com.netflix.dynomitemanager.sidecore.utils.WarmBootstrapTask;
+import com.netflix.dynomitemanager.defaultimpl.StorageProcessManager;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.joda.time.DateTime;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import redis.clients.jedis.Jedis;
 
 @Singleton
@@ -53,6 +54,9 @@ public class WarmBootstrapTask extends Task
     private final InstanceIdentity ii;
     private final InstanceState state;
     private final Sleeper sleeper;
+
+    @Inject
+    private StorageProcessManager storageProcessMgr;
 
     @Inject
     public WarmBootstrapTask(IConfiguration config, IAppsInstanceFactory appsInstanceFactory,
@@ -71,24 +75,35 @@ public WarmBootstrapTask(IConfiguration config, IAppsInstanceFactory appsInstanc
     public void execute() throws IOException
     {
         logger.info("Running warmbootstrapping ...");
+        this.state.setFirstBootstrap(false);
+        this.state.setBootstrapTime(DateTime.now());
+
+
+        // Just to be sure testing again
         if (!state.isStorageAlive()) {
+            // starting storage
+            this.storageProcessMgr.start();
             logger.info("Redis is up ---> Starting warm bootstrap.");
+
+            // setting the status to bootstrapping
             this.state.setBootstrapping(true);
-
-            //start dynProcess if it is not running.
-            this.dynProcess.start(false);
-            //sleep to make sure Dynomite process is up, Storage process is up.
-            this.sleeper.sleepQuietly(15000);
+
+            //sleep to make sure Storage process is up.
+            this.sleeper.sleepQuietly(5000);
 
-            String[] peers = getPeersWithSameTokensRange();
+            String[] peers = getLocalPeersWithSameTokensRange();
 
             //try one node only for now
             //TODOs: if this peer is not good, try the next one until we can get the data
             if (peers != null && peers.length != 0) {
-                this.storageProxy.warmUpStorage(peers);
+
+                // if the warm up was successful set the corresponding state
+                if(this.storageProxy.warmUpStorage(peers, dynProcess) == true){
+                    this.state.setBootstrapStatus(true);
+                }
             }
             else {
-                logger.warn("Unable to find any peer for downstreaming!!!!");
+                logger.error("Unable to find any peer with the same token!");
             }
 
             /*
@@ -114,11 +129,11 @@ public static TaskTimer getTimer() {
         return new SimpleTimer(JOBNAME, 10* 60*1000);
     }
 
-    private String[] getPeersWithSameTokensRange() {
+    private String[] getLocalPeersWithSameTokensRange() {
         String tokens = ii.getTokens();
 
         logger.info("Warming up node's own token(s) : " + tokens);
-        List instances = appsInstanceFactory.getAllIds(config.getAppName());
+        List instances = appsInstanceFactory.getLocalDCIds(config.getAppName(), config.getRegion());
         List peers = new ArrayList();
 
         for(AppsInstance ins : instances) {
@@ -134,3 +149,4 @@ private String[] getPeersWithSameTokensRange() {
 }
 
 
+
diff --git a/conf/dynomitemanager.properties b/dynomitemanager/src/main/resources/dynomitemanager.properties
similarity index 91%
rename from conf/dynomitemanager.properties
rename to dynomitemanager/src/main/resources/dynomitemanager.properties
index a65fb19d..eb2d869c 100644
--- a/conf/dynomitemanager.properties
+++ b/dynomitemanager/src/main/resources/dynomitemanager.properties
@@ -1,8 +1,8 @@
 ## Application info
 netflix.datacenter=cloud
 
-netflix.appinfo.statusPageUrl=http://${netflix.appinfo.hostname}:8080/Status
+netflix.appinfo.statusPageUrl=http://${netflix.appinfo.hostname}:8080/REST/v1/admin/Status
 netflix.appinfo.homePageUrl=http://${netflix.appinfo.hostname}:8080/Status
-netflix.appinfo.healthCheckUrl=http://${netflix.appinfo.hostname}:8080/REST/healthcheck
+netflix.appinfo.healthCheckUrl=http://${netflix.appinfo.hostname}:8080/REST/healthchecksss
 
 netflix.appinfo.port=7001
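For reference, the replication-offset bookkeeping that canPeerSyncStop() performs on the peer's INFO replication output can be sketched in isolation as below. This is a minimal sketch and not part of the patch: the class name OffsetDiffSketch, the helper parseOffsetDiff, and the sample INFO text are illustrative assumptions, while the master_repl_offset and slave0 fields are the same ones the patch parses.

import com.google.common.base.Splitter;

public class OffsetDiffSketch {

    // Returns the absolute difference between master_repl_offset and the
    // first slave's offset, or -1 if either value cannot be parsed.
    static long parseOffsetDiff(String replicationInfo) {
        long masterOffset = -1L;
        long slaveOffset = -1L;
        for (String line : Splitter.on('\n').split(replicationInfo)) {
            if (line.startsWith("master_repl_offset")) {
                masterOffset = Long.parseLong(line.split(":")[1].trim());
            } else if (line.startsWith("slave0")) {
                // slave0:ip=10.0.0.1,port=22122,state=online,offset=17279,lag=0
                for (String item : line.split(",")) {
                    if (item.startsWith("offset")) {
                        slaveOffset = Long.parseLong(item.split("=")[1].trim());
                    }
                }
            }
        }
        if (masterOffset < 0 || slaveOffset < 0) {
            return -1L;
        }
        return Math.abs(masterOffset - slaveOffset);
    }

    public static void main(String[] args) {
        String sample = "master_repl_offset:17300\n"
                + "slave0:ip=10.0.0.1,port=22122,state=online,offset=17279,lag=0\n";
        // Prints "diff = 21"; warm up declares success once this value drops
        // below the configured allowable byte difference.
        System.out.println("diff = " + parseOffsetDiff(sample));
    }
}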