Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Feature Notification per Connection #10999

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 100 additions & 5 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/Notification"
$ref: "#/components/schemas/NotificationLegacy"
required: true
responses:
"200":
Expand All @@ -248,6 +248,50 @@ paths:
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/notifications/create:
post:
tags:
- notifications
summary: >
Create a new Notification
operationId: createNotification
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/NotificationCreate"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/NotificationRead"
"404":
$ref: "#/components/responses/NotFoundResponse"
/v1/notifications/add_connection:
post:
tags:
- notifications
summary: >
Create a link between a connection and a notification
operationId: createNotificationConnection
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/NotificationConnectionCreate"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/NotificationRead"
"404":
$ref: "#/components/responses/NotFoundResponse"
/v1/source_definitions/create:
post:
tags:
Expand Down Expand Up @@ -1940,10 +1984,61 @@ components:
notifications:
type: array
items:
$ref: "#/components/schemas/Notification"
$ref: "#/components/schemas/NotificationLegacy"
displaySetupWizard:
type: boolean
Notification:
NotificationCreate:
type: object
required:
- workspaceId
- name
- webhookUrl
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This webhookUrl requirement seems to be only a requirement if the notificationType is slack, or anything else that uses a webhook. For example, it shouldn't be required if we add a notification that utilizes customer.io for emails.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@terencecho what do you think remove webhookUrl and add a config which stores the config for Slack or other notification system?

- notificationType
properties:
workspaceId:
type: string
format: uuid
name:
type: string
webhookUrl:
type: string
defaultNotification:
type: boolean
default: false
notificationType:
$ref: "#/components/schemas/NotificationType"
tombstone:
type: boolean
default: false
NotificationConnectionCreate:
type: object
required:
- workspaceId
- notificationId
- connectionId
- sendOnSuccess
- sendOnFailure
- tombstone
properties:
workspaceId:
type: string
format: uuid
notificationId:
type: string
format: uuid
connectionId:
type: string
format: uuid
sendOnSuccess:
type: boolean
default: false
sendOnFailure:
type: boolean
default: true
tombstone:
type: boolean
default: false
NotificationLegacy:
type: object
required:
- notificationType
Expand Down Expand Up @@ -2036,7 +2131,7 @@ components:
notifications:
type: array
items:
$ref: "#/components/schemas/Notification"
$ref: "#/components/schemas/NotificationLegacy"
firstCompletedSync:
type: boolean
feedbackDone:
Expand Down Expand Up @@ -2078,7 +2173,7 @@ components:
notifications:
type: array
items:
$ref: "#/components/schemas/Notification"
$ref: "#/components/schemas/NotificationLegacy"
WorkspaceGiveFeedback:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,15 @@ void testBootloaderAppBlankDb() throws Exception {
container.getPassword(),
container.getJdbcUrl()).getInitialized();
val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, this.getClass().getName());
assertEquals("0.35.40.001", jobsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.35.5.001", jobsMigrator.getLatestMigration().getVersion().getVersion());

val configDatabase = new ConfigsDatabaseInstance(
mockedConfigs.getConfigDatabaseUser(),
mockedConfigs.getConfigDatabasePassword(),
mockedConfigs.getConfigDatabaseUrl())
.getAndInitialize();
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, this.getClass().getName());
assertEquals("0.35.56.001", configsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.35.50.001", configsMigrator.getLatestMigration().getVersion().getVersion());

val jobsPersistence = new DefaultJobPersistence(jobDatabase);
assertEquals(version, jobsPersistence.getVersion().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ public enum ConfigSchema implements AirbyteConfig {
standardWorkspace -> standardWorkspace.getWorkspaceId().toString(),
"workspaceId"),

// notification
NOTIFICATION("Notification.yaml",
Notification.class,
Notification -> Notification.getNotificationId().toString(),
"notificationId"),

// notification connection
NOTIFICATION_CONNECTION("NotificationConnection.yaml",
NotificationConnection.class,
NotificationConnection -> NotificationConnection.getNotificationConnectionId().toString(),
"notificationConnectionId"),

// source
STANDARD_SOURCE_DEFINITION("StandardSourceDefinition.yaml",
StandardSourceDefinition.class,
Expand Down
31 changes: 21 additions & 10 deletions airbyte-config/models/src/main/resources/types/Notification.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,32 @@
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/Notification.yaml
title: Notification
description: Notification Settings
description: notification configuration
type: object
required:
- workspaceId
- name
- webhookUrl
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as above, regarding if we should be requiring webhooks for configs

- notificationType
additionalProperties: true
additionalProperties: false
properties:
# Instead of this type field, we would prefer a json schema "oneOf" but unfortunately,
# the jsonschema2pojo does not seem to support it yet: https://github.com/joelittlejohn/jsonschema2pojo/issues/392
workspaceId:
type: string
format: uuid
notificationId:
type: string
format: uuid
name:
type: string
webhookUrl:
type: string
notificationType:
"$ref": NotificationType.yaml
sendOnSuccess:
defaultNotification:
type: boolean
default: false
sendOnFailure:
description: if set true the webhook will be triggered to all connections
tombstone:
description:
if not set or false, the configuration is active. if true, then this
configuration is permanently off.
type: boolean
default: true
slackConfiguration:
"$ref": SlackNotificationConfiguration.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/NotificationConnection.yaml
title: NotificationConnection
description: notification connection link
type: object
required:
- workspaceId
- notificationId
- sendOnSuccess
- sendOnFailure
- tombstone
additionalProperties: false
properties:
notificationConnectionId:
type: string
format: uuid
workspaceId:
type: string
format: uuid
notificationId:
type: string
format: uuid
connectionId:
type: string
format: uuid
sendOnSuccess:
type: boolean
sendOnFailure:
type: boolean
tombstone:
description:
if not set or false, the configuration is active. if true, then this
configuration is permanently off.
type: boolean
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/NotificationLegacy.yaml
title: NotificationLegacy
description: Notification Settings
type: object
required:
- notificationType
additionalProperties: true
properties:
# Instead of this type field, we would prefer a json schema "oneOf" but unfortunately,
# the jsonschema2pojo does not seem to support it yet: https://github.com/joelittlejohn/jsonschema2pojo/issues/392
notificationType:
"$ref": NotificationType.yaml
sendOnSuccess:
type: boolean
default: false
sendOnFailure:
type: boolean
default: true
slackConfiguration:
"$ref": SlackNotificationConfiguration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ properties:
notifications:
type: array
items:
"$ref": Notification.yaml
"$ref": NotificationLegacy.yaml
firstCompletedSync:
type: boolean
feedbackDone:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.DestinationOAuthParameter;
import io.airbyte.config.Notification;
import io.airbyte.config.NotificationConnection;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.SourceOAuthParameter;
import io.airbyte.config.StandardDestinationDefinition;
Expand Down Expand Up @@ -137,6 +139,28 @@ public void setFeedback(final UUID workflowId) throws JsonValidationException, C
persistence.writeConfig(ConfigSchema.STANDARD_WORKSPACE, workspace.getWorkspaceId().toString(), workspace);
}

public List<Notification> listNotifications(final boolean includeTombstone) throws JsonValidationException, IOException {

final List<Notification> notifications = new ArrayList<>();

for (final Notification notification : persistence.listConfigs(ConfigSchema.NOTIFICATION, Notification.class)) {
if (!MoreBooleans.isTruthy(notification.getTombstone()) || includeTombstone) {
notifications.add(notification);
}
}

return notifications;
}

public void writeNotification(final Notification notification) throws JsonValidationException, IOException {
persistence.writeConfig(ConfigSchema.NOTIFICATION, notification.getNotificationId().toString(), notification);
}

public void writeNotificationConnection(final NotificationConnection notificationConnection) throws JsonValidationException, IOException {
persistence.writeConfig(ConfigSchema.NOTIFICATION_CONNECTION, notificationConnection.getNotificationConnectionId().toString(),
notificationConnection);
}

public StandardSourceDefinition getStandardSourceDefinition(final UUID sourceDefinitionId)
throws JsonValidationException, IOException, ConfigNotFoundException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_OAUTH_PARAMETER;
import static io.airbyte.db.instance.configs.jooq.Tables.CONNECTION;
import static io.airbyte.db.instance.configs.jooq.Tables.CONNECTION_OPERATION;
import static io.airbyte.db.instance.configs.jooq.Tables.NOTIFICATION_CONFIG;
import static io.airbyte.db.instance.configs.jooq.Tables.NOTIFICATION_CONNECTION;
import static io.airbyte.db.instance.configs.jooq.Tables.OPERATION;
import static io.airbyte.db.instance.configs.jooq.Tables.STATE;
import static io.airbyte.db.instance.configs.jooq.Tables.WORKSPACE;
Expand All @@ -33,6 +35,8 @@
import io.airbyte.config.ConfigWithMetadata;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.DestinationOAuthParameter;
import io.airbyte.config.Notification;
import io.airbyte.config.NotificationConnection;
import io.airbyte.config.OperatorDbt;
import io.airbyte.config.OperatorNormalization;
import io.airbyte.config.SourceConnection;
Expand Down Expand Up @@ -762,6 +766,48 @@ private void writeStandardWorkspace(final List<StandardWorkspace> configs) throw
});
}

private void writeNotification(final List<Notification> configs) throws IOException {
database.transaction(ctx -> {
writeNotification(configs, ctx);
return null;
});
}

private void writeNotificationConnection(final List<NotificationConnection> configs) throws IOException {
database.transaction(ctx -> {
writeNotificationConnection(configs, ctx);
return null;
});
}

private void writeNotification(final List<Notification> configs, final DSLContext ctx) {
configs.forEach((Notification) -> {
ctx.insertInto(NOTIFICATION_CONFIG)
.set(NOTIFICATION_CONFIG.ID, Notification.getNotificationId())
.set(NOTIFICATION_CONFIG.WORKSPACE_ID, Notification.getWorkspaceId())
.set(NOTIFICATION_CONFIG.NAME, Notification.getName())
.set(NOTIFICATION_CONFIG.WEBHOOK_URL, Notification.getWebhookUrl())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we don't require webhooks, we'd need to add in something more specific similar to the current SlackNotificationConfig. Not the nicest solution since it embeds another config into this config, but would add flexibility in adding required information for new Notification_types

.set(NOTIFICATION_CONFIG.DEFAULT_NOTIFICATION, Notification.getDefaultNotification())
.set(NOTIFICATION_CONFIG.NOTIFICATION_TYPE, Notification.getNotificationType().value())
.set(NOTIFICATION_CONFIG.TOMBSTONE, Notification.getTombstone())
.execute();
});
}

private void writeNotificationConnection(final List<NotificationConnection> configs, final DSLContext ctx) {
configs.forEach((NotificationConnection) -> {
ctx.insertInto(NOTIFICATION_CONNECTION)
.set(NOTIFICATION_CONNECTION.NOTIFICATION_CONNECTION_ID, NotificationConnection.getNotificationConnectionId())
.set(NOTIFICATION_CONNECTION.WORKSPACE_ID, NotificationConnection.getWorkspaceId())
.set(NOTIFICATION_CONNECTION.NOTIFICATION_ID, NotificationConnection.getNotificationId())
.set(NOTIFICATION_CONNECTION.CONNECTION_ID, NotificationConnection.getConnectionId())
.set(NOTIFICATION_CONNECTION.SEND_ON_SUCCESS, NotificationConnection.getSendOnSuccess())
.set(NOTIFICATION_CONNECTION.SEND_ON_FAILURE, NotificationConnection.getSendOnFailure())
.set(NOTIFICATION_CONNECTION.TOMBSTONE, NotificationConnection.getTombstone())
.execute();
});
}

private void writeStandardWorkspace(final List<StandardWorkspace> configs, final DSLContext ctx) {
final OffsetDateTime timestamp = OffsetDateTime.now();
configs.forEach((standardWorkspace) -> {
Expand Down
Loading