Skip to content

Commit

Permalink
Job Failure Notification (#2706)
Browse files Browse the repository at this point in the history
* Add webhook to workspace for notifications when sync fail
* API to test notifications webhook (#2809)
  • Loading branch information
ChristopheDuong committed Apr 9, 2021
1 parent 838d85d commit 03fdd65
Show file tree
Hide file tree
Showing 26 changed files with 769 additions and 19 deletions.
2 changes: 2 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ TRACKING_STRATEGY=segment
# already exist on the host filesystem and MUST be parents of *_ROOT.
# Issue: https://github.com/airbytehq/airbyte/issues/577
HACK_LOCAL_ROOT_PARENT=/tmp
WEBAPP_URL=http://localhost:8000/
API_URL=http://localhost:8001/api/v1/
TEMPORAL_HOST=airbyte-temporal:7233
2 changes: 2 additions & 0 deletions .env.dev
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ TRACKING_STRATEGY=logging
# already exist on the host filesystem and MUST be parents of *_ROOT.
# Issue: https://github.com/airbytehq/airbyte/issues/577
HACK_LOCAL_ROOT_PARENT=/tmp
WEBAPP_URL=http://localhost:8000/
API_URL=http://localhost:8001/api/v1/
35 changes: 35 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,29 @@ paths:
description: Workspace not found
"422":
$ref: "#/components/responses/InvalidInput"
/v1/notifications/try:
post:
tags:
- notifications
summary: Try sending a notifications
operationId: tryNotificationConfig
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/Notification"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/NotificationRead"
"404":
description: Notification Client not found
"422":
$ref: "#/components/responses/InvalidInput"
/v1/source_definitions/create:
post:
tags:
Expand Down Expand Up @@ -1265,6 +1288,18 @@ components:
- slack
# - email
# - webhook
NotificationRead:
type: object
required:
- status
properties:
status:
type: string
enum:
- succeeded
- failed
message:
type: string
WorkspaceIdRequestBody:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public interface Configs {

String getDatabaseUrl();

String getWebappUrl();

String getWorkspaceDockerMount();

String getLocalDockerMount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class EnvConfigs implements Configs {
public static final String DATABASE_USER = "DATABASE_USER";
public static final String DATABASE_PASSWORD = "DATABASE_PASSWORD";
public static final String DATABASE_URL = "DATABASE_URL";
public static final String WEBAPP_URL = "WEBAPP_URL";
private static final String MINIMUM_WORKSPACE_RETENTION_DAYS = "MINIMUM_WORKSPACE_RETENTION_DAYS";
private static final String MAXIMUM_WORKSPACE_RETENTION_DAYS = "MAXIMUM_WORKSPACE_RETENTION_DAYS";
private static final String MAXIMUM_WORKSPACE_SIZE_MB = "MAXIMUM_WORKSPACE_SIZE_MB";
Expand Down Expand Up @@ -114,6 +115,11 @@ public String getDatabaseUrl() {
return getEnsureEnv(DATABASE_URL);
}

@Override
public String getWebappUrl() {
return getEnsureEnv(WEBAPP_URL);
}

@Override
public String getWorkspaceDockerMount() {
return getEnvOrDefault(WORKSPACE_DOCKER_MOUNT, getWorkspaceRoot().toString());
Expand Down
8 changes: 8 additions & 0 deletions airbyte-notification/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
plugins {
id "java-library"
}

dependencies {
implementation project(':airbyte-config:models')
implementation project(':airbyte-json-validation')
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.notification;

import io.airbyte.config.Notification;
import io.airbyte.config.Notification.NotificationType;
import java.io.IOException;

public interface NotificationClient {

boolean notifyJobFailure(
String sourceConnector,
String destinationConnector,
String jobDescription,
String logUrl)
throws IOException, InterruptedException;

boolean notify(String message) throws IOException, InterruptedException;

static NotificationClient createNotificationClient(final Notification notification) {
if (notification.getNotificationType() == NotificationType.SLACK) {
return new SlackNotificationClient(notification.getSlackConfiguration());
} else {
throw new IllegalArgumentException("Unknown notification type:" + notification.getNotificationType());
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.notification;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.SlackNotificationConfiguration;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import org.apache.logging.log4j.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Notification client that uses Slack API for Incoming Webhook to send messages.
*
* This class also reads a resource YAML file that defines the template message to send.
*
* It is stored as a YAML so that we can easily change the structure of the JSON data expected by
* the API that we are posting to (and we can write multi-line strings more easily).
*
* For example, slack API expects some text message in the { "text" : "Hello World" } field...
*/
public class SlackNotificationClient implements NotificationClient {

private static final Logger LOGGER = LoggerFactory.getLogger(SlackNotificationClient.class);

private final HttpClient httpClient = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_2)
.build();
private final SlackNotificationConfiguration config;

public SlackNotificationClient(final SlackNotificationConfiguration config) {
this.config = config;
}

@Override
public boolean notifyJobFailure(String sourceConnector, String destinationConnector, String jobDescription, String logUrl)
throws IOException, InterruptedException {
return notify(renderJobData(
"failure_slack_notification_template.txt",
sourceConnector,
destinationConnector,
jobDescription,
logUrl));
}

private String renderJobData(String templateFile, String sourceConnector, String destinationConnector, String jobDescription, String logUrl)
throws IOException {
final String template = MoreResources.readResource(templateFile);
return String.format(template, sourceConnector, destinationConnector, jobDescription, logUrl);
}

@Override
public boolean notify(final String message) throws IOException, InterruptedException {
final String webhookUrl = config.getWebhook();
if (!Strings.isEmpty(webhookUrl)) {
final ImmutableMap<String, String> body = new Builder<String, String>()
.put("text", message)
.build();
final HttpRequest request = HttpRequest.newBuilder()
.POST(HttpRequest.BodyPublishers.ofString(Jsons.serialize(body)))
.uri(URI.create(webhookUrl))
.header("Content-Type", "application/json")
.build();
final HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
if (isSuccessfulHttpResponse(response.statusCode())) {
LOGGER.info("Successful notification ({}): {}", response.statusCode(), response.body());
return true;
} else {
final String errorMessage = String.format("Failed to deliver notification (%s): %s", response.statusCode(), response.body());
throw new IOException(errorMessage);
}
}
return false;
}

/**
* Use an integer division to check successful HTTP status codes (i.e., those from 200-299), not
* just 200. https://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
*/
private static boolean isSuccessfulHttpResponse(int httpStatusCode) {
return httpStatusCode / 100 == 2;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Your connection from %s to %s just failed...
This happened with %s

You can access its logs here: %s
Loading

0 comments on commit 03fdd65

Please sign in to comment.