Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the DTP framework call the appropriate metric hooks #689

Merged
merged 1 commit into from
Apr 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,6 @@ public void exportPageFinished(
duration);
}

@Override
public void exportFinished(String dataType, String service, boolean success, Duration duration) {
monitor.debug(
() -> "Metric: exportFinished, data type: %s, service: %s, success: %s, duration: %s",
dataType,
service,
success,
duration);
}

@Override
public void importPageAttemptFinished(
String dataType,
Expand Down Expand Up @@ -107,16 +97,6 @@ public void importPageFinished(
duration);
}

@Override
public void importFinished(String dataType, String service, boolean success, Duration duration) {
monitor.debug(
() -> "Metric: importFinished, data type: %s, service: %s, success: %s, duration: %s",
dataType,
service,
success,
duration);
}

@Override
public void finishedJob(
String dataType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,6 @@ public void exportPageFinished(
duration);
}

@Override
public void exportFinished(String dataType, String service, boolean success, Duration duration) {
monitor.debug(
() -> "Metric: exportFinished, data type: %s, service: %s, success: %s, duration: %s",
dataType,
service,
success,
duration);
}

@Override
public void importPageAttemptFinished(
String dataType,
Expand Down Expand Up @@ -105,16 +95,6 @@ public void importPageFinished(
duration);
}

@Override
public void importFinished(String dataType, String service, boolean success, Duration duration) {
monitor.debug(
() -> "Metric: importFinished, data type: %s, service: %s, success: %s, duration: %s",
dataType,
service,
success,
duration);
}

@Override
public void finishedJob(
String dataType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,6 @@ public void exportPageFinished(
duration);
}

@Override
public void exportFinished(String dataType, String service, boolean success, Duration duration) {
monitor.debug(
() -> "Metric: exportFinished, data type: %s, service: %s, success: %s, duration: %s",
dataType,
service,
success,
duration);
}

@Override
public void importPageAttemptFinished(
String dataType,
Expand Down Expand Up @@ -106,16 +96,6 @@ public void importPageFinished(
duration);
}

@Override
public void importFinished(String dataType, String service, boolean success, Duration duration) {
monitor.debug(
() -> "Metric: importFinished, data type: %s, service: %s, success: %s, duration: %s",
dataType,
service,
success,
duration);
}

@Override
public void finishedJob(
String dataType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ void exportPageAttemptFinished(
/** An attempt to export a page of data finished including all retires. **/
void exportPageFinished(String dataType, String service, boolean success, Duration duration);

/** The export of all data from a service finished. **/
void exportFinished(String dataType, String service, boolean success, Duration duration);

/** An single attempt to import a page of data finished. **/
void importPageAttemptFinished(
String dataType,
Expand All @@ -62,9 +59,6 @@ void importPageAttemptFinished(
/** An attempt to import a page of data finished including all retires. **/
void importPageFinished(String dataType, String service, boolean success, Duration duration);

/** The import of all data from a service finished. **/
void importFinished(String dataType, String service, boolean success, Duration duration);

// Metrics from {@link MetricRecorder}
void recordGenericMetric(String dataType, String service, String tag);
void recordGenericMetric(String dataType, String service, String tag, boolean bool);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.datatransferproject.api.action.transfer.ReserveWorkerAction;
import org.datatransferproject.api.action.transfer.StartTransferJobAction;
import org.datatransferproject.api.auth.PortabilityAuthServiceProviderRegistry;
import org.datatransferproject.api.launcher.DtpInternalMetricRecorder;
import org.datatransferproject.api.launcher.ExtensionContext;
import org.datatransferproject.api.launcher.Monitor;
import org.datatransferproject.api.launcher.TypeManager;
Expand Down Expand Up @@ -87,6 +88,8 @@ protected void configure() {
bind(TypeManager.class).toInstance(typeManager);
bind(JobStore.class).toInstance(jobStore);
bind(TokenManager.class).toInstance(tokenManager);
bind(DtpInternalMetricRecorder.class)
.toInstance(context.getService(DtpInternalMetricRecorder.class));

if (trustManagerFactory != null) {
bind(TrustManagerFactory.class).toInstance(trustManagerFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@

package org.datatransferproject.transfer;

import com.google.common.base.Stopwatch;
import com.google.inject.Provider;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;
import org.datatransferproject.api.launcher.DtpInternalMetricRecorder;
import org.datatransferproject.spi.transfer.provider.ExportResult;
import org.datatransferproject.spi.transfer.provider.Exporter;
import org.datatransferproject.types.common.ExportInformation;
import org.datatransferproject.types.transfer.auth.AuthData;

import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;

/**
* Callable around an {@link Exporter}.
*/
Expand All @@ -34,19 +37,36 @@ public class CallableExporter implements Callable<ExportResult> {
private UUID jobId;
private AuthData authData;
private Optional<ExportInformation> exportInformation;
private final DtpInternalMetricRecorder metricRecorder;

public CallableExporter(Provider<Exporter> exporterProvider, UUID jobId, AuthData authData,
Optional<ExportInformation> exportInformation) {
public CallableExporter(
Provider<Exporter> exporterProvider,
UUID jobId,
AuthData authData,
Optional<ExportInformation> exportInformation,
DtpInternalMetricRecorder metricRecorder) {
this.exporterProvider = exporterProvider;

this.jobId = jobId;
this.authData = authData;
this.exportInformation = exportInformation;
this.metricRecorder = metricRecorder;
}

@Override
public ExportResult call() throws Exception {
return exporterProvider.get()
.export(jobId, authData, exportInformation);
boolean success = false;
Stopwatch stopwatch = Stopwatch.createStarted();
try {
ExportResult result = exporterProvider.get().export(jobId, authData, exportInformation);
success = result.getType() != ExportResult.ResultType.ERROR;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency with the importer, we should probably use == ResultType.OK for the success condition here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, for export there are 3 conditions: END, CONTINUE, and ERROR, so I think we have to do != ERROR, or == with an or, so I think the != is cleaner.

return result;
} finally{
metricRecorder.exportPageAttemptFinished(
JobMetadata.getDataType(),
JobMetadata.getExportService(),
success,
stopwatch.elapsed());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package org.datatransferproject.transfer;

import com.google.common.base.Stopwatch;
import com.google.inject.Provider;
import org.datatransferproject.api.launcher.DtpInternalMetricRecorder;
import org.datatransferproject.spi.transfer.provider.IdempotentImportExecutor;
import org.datatransferproject.spi.transfer.provider.ImportResult;
import org.datatransferproject.spi.transfer.provider.Importer;
Expand All @@ -36,21 +38,38 @@ public class CallableImporter implements Callable<ImportResult> {
private final IdempotentImportExecutor idempotentImportExecutor;
private final AuthData authData;
private final DataModel data;
private final DtpInternalMetricRecorder metricRecorder;

public CallableImporter(Provider<Importer> importerProvider,
public CallableImporter(
Provider<Importer> importerProvider,
UUID jobId,
IdempotentImportExecutor idempotentImportExecutor,
AuthData authData,
DataModel data) {
DataModel data,
DtpInternalMetricRecorder metricRecorder) {
this.importerProvider = importerProvider;
this.jobId = jobId;
this.idempotentImportExecutor = idempotentImportExecutor;
this.authData = authData;
this.data = data;
this.metricRecorder = metricRecorder;
}

@Override
public ImportResult call() throws Exception {
return importerProvider.get().importItem(jobId, idempotentImportExecutor, authData, data);
boolean success = false;
Stopwatch stopwatch = Stopwatch.createStarted();
try {
ImportResult result = importerProvider.get()
.importItem(jobId, idempotentImportExecutor, authData, data);
success = result.getType() == ImportResult.ResultType.OK;
return result;
} finally{
metricRecorder.importPageAttemptFinished(
JobMetadata.getDataType(),
JobMetadata.getImportService(),
success,
stopwatch.elapsed());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,11 @@
*/
package org.datatransferproject.transfer;

import static java.lang.String.format;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.inject.Inject;
import java.io.IOException;
import java.security.PrivateKey;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import javax.annotation.Nullable;
import org.datatransferproject.api.launcher.DtpInternalMetricRecorder;
import org.datatransferproject.api.launcher.Monitor;
import org.datatransferproject.launcher.monitor.events.EventCode;
import org.datatransferproject.spi.cloud.storage.JobStore;
Expand All @@ -38,6 +32,15 @@
import org.datatransferproject.types.common.ExportInformation;
import org.datatransferproject.types.transfer.auth.AuthData;
import org.datatransferproject.types.transfer.auth.AuthDataPair;

import javax.annotation.Nullable;
import java.io.IOException;
import java.security.PrivateKey;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;

import static java.lang.String.format;
/**
* Process a job in two steps: <br>
* (1) Decrypt the stored credentials, which have been encrypted with this transfer worker's public
Expand All @@ -52,6 +55,7 @@ final class JobProcessor {
private final InMemoryDataCopier copier;
private final Set<AuthDataDecryptService> decryptServices;
private final Monitor monitor;
private final DtpInternalMetricRecorder dtpInternalMetricRecorder;

@Inject
JobProcessor(
Expand All @@ -60,13 +64,15 @@ final class JobProcessor {
ObjectMapper objectMapper,
InMemoryDataCopier copier,
Set<AuthDataDecryptService> decryptServices,
Monitor monitor) {
Monitor monitor,
DtpInternalMetricRecorder dtpInternalMetricRecorder) {
this.store = store;
this.hooks = hooks;
this.objectMapper = objectMapper;
this.copier = copier;
this.decryptServices = decryptServices;
this.monitor = monitor;
this.dtpInternalMetricRecorder = dtpInternalMetricRecorder;
}

/** Process our job, whose metadata is available via {@link JobMetadata}. */
Expand All @@ -79,6 +85,7 @@ void processJob() {

PortabilityJob job = store.findJob(jobId);
JobAuthorization jobAuthorization = job.jobAuthorization();
Stopwatch stopwatch = Stopwatch.createUnstarted();

try {
monitor.debug(
Expand Down Expand Up @@ -107,6 +114,11 @@ void processJob() {
Optional<ExportInformation> exportInfo = Optional.ofNullable(job.exportInformation());

// Copy the data
dtpInternalMetricRecorder.startedJob(
JobMetadata.getDataType(),
JobMetadata.getExportService(),
JobMetadata.getImportService());
stopwatch.start();
copier.copy(exportAuthData, importAuthData, jobId, exportInfo);
monitor.debug(() -> "Finished copy for jobId: " + jobId);
success = true;
Expand All @@ -116,6 +128,12 @@ void processJob() {
monitor.debug(() -> "Finished processing jobId: " + jobId, EventCode.WORKER_JOB_FINISHED);
markJobFinished(jobId, success);
hooks.jobFinished(jobId, success);
dtpInternalMetricRecorder.finishedJob(
JobMetadata.getDataType(),
JobMetadata.getExportService(),
JobMetadata.getImportService(),
success,
stopwatch.elapsed());
JobMetadata.reset();
}
}
Expand Down