Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send Customer.io notifications and warnings for connections being auto-disabled #11670

Merged
merged 19 commits into from
Apr 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
},
"message_data": {
"email_title": "Connection Auto-Disabled",
"email_body": "Your connection from <b>%s</b> to <b>%s</b> was disabled because it failed consecutively 100 times or that there were only failed jobs in the past 14 days.<p>Please address the failing issues and re-enable the connection. The most recent attempted %s You can access its logs here: %s"
"email_body": "Your connection from <b>%s</b> to <b>%s</b> was disabled because it failed 100 times consecutively or that there were only failed jobs in the past 14 days.<p>Please address the failing issues and re-enable the connection. The most recent attempted %s You can access its logs here: %s"
},

"disable_message_retention": false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
},
"message_data": {
"email_title": "Connection Auto-Disabled Warning",
"email_body": "Your connection from <b>%s</b> to <b>%s</b> is about to be disabled because it failed consecutively 50 times or that there were only failed jobs in the past 7 days. Once it has failed 100 times consecutively or has been failing for 14 days in a row, the connection will be automatically disabled.<p>Please address the failing issues and re-enable the connection. The most recent attempted %s You can access its logs here: %s"
"email_body": "Your connection from <b>%s</b> to <b>%s</b> is about to be disabled because it failed 50 times consecutively or that there were only failed jobs in the past 7 days. Once it has failed 100 times consecutively or has been failing for 14 days in a row, the connection will be automatically disabled.<p>The most recent attempted %s You can access its logs here: %s"
},

"disable_message_retention": false,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.scheduler.models;

import java.util.Objects;

public class JobWithStatusAndTimestamp {

private final long id;
private final JobStatus status;
private final long createdAtInSecond;
private final long updatedAtInSecond;

public JobWithStatusAndTimestamp(final long id,
final JobStatus status,
final long createdAtInSecond,
final long updatedAtInSecond) {
this.id = id;
this.status = status;
this.createdAtInSecond = createdAtInSecond;
this.updatedAtInSecond = updatedAtInSecond;
}

public long getId() {
return id;
}

public JobStatus getStatus() {
return status;
}

public long getCreatedAtInSecond() {
return createdAtInSecond;
}

public long getUpdatedAtInSecond() {
return updatedAtInSecond;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final JobWithStatusAndTimestamp jobWithStatusAndTimestamp = (JobWithStatusAndTimestamp) o;
return id == jobWithStatusAndTimestamp.id &&
status == jobWithStatusAndTimestamp.status &&
createdAtInSecond == jobWithStatusAndTimestamp.createdAtInSecond &&
updatedAtInSecond == jobWithStatusAndTimestamp.updatedAtInSecond;
}

@Override
public int hashCode() {
return Objects.hash(id, status, createdAtInSecond, updatedAtInSecond);
}

@Override
public String toString() {
return "Job{" +
"id=" + id +
", status=" + status +
", createdAtInSecond=" + createdAtInSecond +
", updatedAtInSecond=" + updatedAtInSecond +
'}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.airbyte.scheduler.models.AttemptWithJobInfo;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
import io.airbyte.scheduler.models.JobWithStatusAndTimestamp;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.file.Path;
Expand Down Expand Up @@ -386,18 +387,24 @@ public List<Job> listJobsWithStatus(final ConfigType configType, final JobStatus
}

@Override
public List<JobStatus> listJobStatusWithConnection(final UUID connectionId, final Set<ConfigType> configTypes, final Instant jobCreatedAtTimestamp)
public List<JobWithStatusAndTimestamp> listJobStatusAndTimestampWithConnection(final UUID connectionId,
final Set<ConfigType> configTypes,
final Instant jobCreatedAtTimestamp)
throws IOException {
final LocalDateTime timeConvertedIntoLocalDateTime = LocalDateTime.ofInstant(jobCreatedAtTimestamp, ZoneOffset.UTC);

final String JobStatusSelect = "SELECT status FROM jobs ";
final String JobStatusSelect = "SELECT id, status, created_at, updated_at FROM jobs ";
return jobDatabase.query(ctx -> ctx
.fetch(JobStatusSelect + "WHERE " +
"scope = ? AND " +
"CAST(config_type AS VARCHAR) in " + Sqls.toSqlInFragment(configTypes) + " AND " +
"created_at >= ? ORDER BY created_at DESC", connectionId.toString(), timeConvertedIntoLocalDateTime))
.stream()
.map(r -> JobStatus.valueOf(r.get("status", String.class).toUpperCase()))
.map(r -> new JobWithStatusAndTimestamp(
r.get("id", Long.class),
JobStatus.valueOf(r.get("status", String.class).toUpperCase()),
r.get("created_at", Long.class) / 1000,
r.get("updated_at", Long.class) / 1000))
.toList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.google.common.collect.ImmutableMap.Builder;
import io.airbyte.analytics.TrackingClient;
import io.airbyte.commons.map.MoreMaps;
import io.airbyte.config.CustomerioNotificationConfiguration;
import io.airbyte.config.Notification;
import io.airbyte.config.Notification.NotificationType;
import io.airbyte.config.StandardDestinationDefinition;
Expand All @@ -26,6 +27,7 @@
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -77,20 +79,10 @@ private void notifyJob(final String reason,
try {
final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromConnection(connectionId);
final StandardDestinationDefinition destinationDefinition = configRepository.getDestinationDefinitionFromConnection(connectionId);
final Instant jobStartedDate = Instant.ofEpochSecond(job.getStartedAtInSecond().orElse(job.getCreatedAtInSecond()));
final DateTimeFormatter formatter = DateTimeFormatter.ofLocalizedDateTime(FormatStyle.FULL).withZone(ZoneId.systemDefault());
final Instant jobUpdatedDate = Instant.ofEpochSecond(job.getUpdatedAtInSecond());
final Instant adjustedJobUpdatedDate = jobUpdatedDate.equals(jobStartedDate) ? Instant.now() : jobUpdatedDate;
final Duration duration = Duration.between(jobStartedDate, adjustedJobUpdatedDate);
final String durationString = formatDurationPart(duration.toDaysPart(), "day")
+ formatDurationPart(duration.toHoursPart(), "hour")
+ formatDurationPart(duration.toMinutesPart(), "minute")
+ formatDurationPart(duration.toSecondsPart(), "second");
final String sourceConnector = String.format("%s version %s", sourceDefinition.getName(), sourceDefinition.getDockerImageTag());
final String destinationConnector = String.format("%s version %s", destinationDefinition.getName(), destinationDefinition.getDockerImageTag());
final String failReason = Strings.isNullOrEmpty(reason) ? "" : String.format(", as the %s", reason);
final String jobDescription =
String.format("sync started on %s, running for%s%s.", formatter.format(jobStartedDate), durationString, failReason);
final String jobDescription = getJobDescription(job, failReason);
final String logUrl = connectionPageUrl + connectionId;
final ImmutableMap<String, Object> jobMetadata = TrackingMetadata.generateJobAttemptMetadata(job);
final ImmutableMap<String, Object> sourceMetadata = TrackingMetadata.generateSourceDefinitionMetadata(sourceDefinition);
Expand All @@ -114,24 +106,31 @@ private void notifyJob(final String reason,
workspaceId,
action,
MoreMaps.merge(jobMetadata, sourceMetadata, destinationMetadata, notificationMetadata.build()));
if (FAILURE_NOTIFICATION.equals(action)) {
if (!notificationClient.notifyJobFailure(sourceConnector, destinationConnector, jobDescription, logUrl)) {
LOGGER.warn("Failed to successfully notify failure: {}", notification);
}
} else if (SUCCESS_NOTIFICATION.equals(action)) {
if (!notificationClient.notifyJobSuccess(sourceConnector, destinationConnector, jobDescription, logUrl)) {
LOGGER.warn("Failed to successfully notify success: {}", notification);
}
// alert message currently only supported by email through customer.io
} else if (CONNECTION_DISABLED_NOTIFICATION.equals(action) && notification.getNotificationType().equals(NotificationType.CUSTOMERIO)) {
if (!notificationClient.notifyConnectionDisabled(workspace.getEmail(), sourceConnector, destinationConnector, jobDescription, logUrl)) {
LOGGER.warn("Failed to successfully notify auto-disable connection: {}", notification);
}
} else if (CONNECTION_DISABLED_WARNING_NOTIFICATION.equals(action)
&& notification.getNotificationType().equals(NotificationType.CUSTOMERIO)) {
if (!notificationClient.notifyConnectionDisabled(workspace.getEmail(), sourceConnector, destinationConnector, jobDescription, logUrl)) {
LOGGER.warn("Failed to successfully notify auto-disable connection warning: {}", notification);
}

switch (action) {
case FAILURE_NOTIFICATION:
if (!notificationClient.notifyJobFailure(sourceConnector, destinationConnector, jobDescription, logUrl)) {
LOGGER.warn("Failed to successfully notify failure: {}", notification);
}
break;
case SUCCESS_NOTIFICATION:
if (!notificationClient.notifyJobSuccess(sourceConnector, destinationConnector, jobDescription, logUrl)) {
LOGGER.warn("Failed to successfully notify success: {}", notification);
}
break;
case CONNECTION_DISABLED_NOTIFICATION:
if (notification.getNotificationType().equals(NotificationType.CUSTOMERIO)
&& !notificationClient.notifyConnectionDisabled(workspace.getEmail(), sourceConnector, destinationConnector, jobDescription,
logUrl)) {
LOGGER.warn("Failed to successfully notify auto-disable connection: {}", notification);
}
break;
case CONNECTION_DISABLED_WARNING_NOTIFICATION:
if (notification.getNotificationType().equals(NotificationType.CUSTOMERIO)
&& !notificationClient.notifyConnectionDisableWarning(workspace.getEmail(), sourceConnector, destinationConnector, jobDescription,
logUrl)) {
LOGGER.warn("Failed to successfully notify auto-disable connection warning: {}", notification);
}
}
} catch (final Exception e) {
LOGGER.error("Failed to notify: {} due to an exception", notification, e);
Expand All @@ -154,6 +153,62 @@ public void notifyJobByEmail(final String reason, final String action, final Job
}
}

// This method allows for the alert to be sent without the customerio configuration set in the
// database
// This is only needed because there is no UI element to allow for users to create that
// configuration.
// Once that exists, this can be removed and we should be using `notifyJobByEmail`.
// The alert is sent to the email associated with the workspace.
public void autoDisableConnectionAlertWithoutCustomerioConfig(final String action, final Job job) {
try {
final UUID workspaceId = workspaceHelper.getWorkspaceForJobIdIgnoreExceptions(job.getId());
final StandardWorkspace workspace = configRepository.getStandardWorkspace(workspaceId, true);

final Notification customerioNotification = new Notification()
.withNotificationType(NotificationType.CUSTOMERIO)
.withCustomerioConfiguration(new CustomerioNotificationConfiguration());
final NotificationClient notificationClient = getNotificationClient(customerioNotification);

final UUID connectionId = UUID.fromString(job.getScope());
final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromConnection(connectionId);
final StandardDestinationDefinition destinationDefinition = configRepository.getDestinationDefinitionFromConnection(connectionId);
final String sourceConnector = String.format("%s version %s", sourceDefinition.getName(), sourceDefinition.getDockerImageTag());
final String destinationConnector = String.format("%s version %s", destinationDefinition.getName(), destinationDefinition.getDockerImageTag());
final String logUrl = connectionPageUrl + connectionId;
final String jobDescription = getJobDescription(job, "");

switch (action) {
case CONNECTION_DISABLED_NOTIFICATION:
if (!notificationClient.notifyConnectionDisabled(workspace.getEmail(), sourceConnector, destinationConnector, jobDescription, logUrl)) {
LOGGER.warn("Failed to successfully notify auto-disable connection: {}", customerioNotification);
}
break;
case CONNECTION_DISABLED_WARNING_NOTIFICATION:
if (!notificationClient.notifyConnectionDisableWarning(workspace.getEmail(), sourceConnector, destinationConnector, jobDescription,
logUrl)) {
LOGGER.warn("Failed to successfully notify auto-disable connection warning: {}", customerioNotification);
}
break;
default:
LOGGER.error(
"Incorrect action supplied, this method only supports Connection Disabled Notification and Connection Disabled Warning Notification.");
}
} catch (final Exception e) {
LOGGER.error("Unable to send auto disable alert:", e);
}
}

private String getJobDescription(final Job job, final String reason) {
final Instant jobStartedDate = Instant.ofEpochSecond(job.getStartedAtInSecond().orElse(job.getCreatedAtInSecond()));
final DateTimeFormatter formatter = DateTimeFormatter.ofLocalizedDateTime(FormatStyle.FULL).withZone(ZoneId.systemDefault());
final Instant jobUpdatedDate = Instant.ofEpochSecond(job.getUpdatedAtInSecond());
final Instant adjustedJobUpdatedDate = jobUpdatedDate.equals(jobStartedDate) ? Instant.now() : jobUpdatedDate;
final Duration duration = Duration.between(jobStartedDate, adjustedJobUpdatedDate);
final String durationString = DurationFormatUtils.formatDurationWords(duration.toMillis(), true, true);

return String.format("sync started on %s, running for %s%s.", formatter.format(jobStartedDate), durationString, reason);
}

public void failJob(final String reason, final Job job) {
notifyJob(reason, FAILURE_NOTIFICATION, job);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.airbyte.scheduler.models.AttemptWithJobInfo;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
import io.airbyte.scheduler.models.JobWithStatusAndTimestamp;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Instant;
Expand Down Expand Up @@ -164,11 +165,14 @@ public interface JobPersistence {
* @param connectionId The ID of the connection
* @param configTypes The types of jobs
* @param jobCreatedAtTimestamp The timestamp after which you want the jobs
* @return List of job statuses from a specific connection that have attempts after the provided
* timestamp, sorted by jobs' createAt in descending order
* @return List of jobs that only include information regarding id, status, timestamps from a
* specific connection that have attempts after the provided timestamp, sorted by jobs'
* createAt in descending order
* @throws IOException
*/
List<JobStatus> listJobStatusWithConnection(UUID connectionId, Set<JobConfig.ConfigType> configTypes, Instant jobCreatedAtTimestamp)
List<JobWithStatusAndTimestamp> listJobStatusAndTimestampWithConnection(UUID connectionId,
Set<JobConfig.ConfigType> configTypes,
Instant jobCreatedAtTimestamp)
throws IOException;

Optional<Job> getLastReplicationJob(UUID connectionId) throws IOException;
Expand Down
Loading