Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Core] Support event listener for job #6419

Merged
merged 12 commits into from
Mar 28, 2024
116 changes: 116 additions & 0 deletions docs/en/concept/event-listener.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Event Listener

## Introduction

The SeaTunnel provides a rich event listening feature that allows you to manage the status at which data is synchronized.
This functionality is crucial when you need to listen job running status(`org.apache.seatunnel.api.event`).
This document will guide you through the usage of these parameters and how to leverage them effectively.

## Support Those Engines

> SeaTunnel Zeta<br/>
> Flink<br/>
> Spark<br/>

## API

The event API is defined in the `org.apache.seatunnel.api.event` package.

### Event Data API

- `org.apache.seatunnel.api.event.Event` - The interface for event data.
- `org.apache.seatunnel.api.event.EventType` - The enum for event type.

### Event Listener API

You can customize event handler, such as sending events to external systems

- `org.apache.seatunnel.api.event.EventHandler` - The interface for event handler, SPI will automatically load subclass from the classpath.

### Event Collect API

- `org.apache.seatunnel.api.source.SourceSplitEnumerator` - Attached event listener API to report events from `SourceSplitEnumerator`.

```java
package org.apache.seatunnel.api.source;

public interface SourceSplitEnumerator {

interface Context {

/**
* Get the {@link org.apache.seatunnel.api.event.EventListener} of this enumerator.
*
* @return
*/
EventListener getEventListener();
}
}
```

- `org.apache.seatunnel.api.source.SourceReader` - Attached event listener API to report events from `SourceReader`.

```java
package org.apache.seatunnel.api.source;

public interface SourceReader {

interface Context {

/**
* Get the {@link org.apache.seatunnel.api.event.EventListener} of this reader.
*
* @return
*/
EventListener getEventListener();
}
}
```

- `org.apache.seatunnel.api.sink.SinkWriter` - Attached event listener API to report events from `SinkWriter`.

```java
package org.apache.seatunnel.api.sink;

public interface SinkWriter {

interface Context {

/**
* Get the {@link org.apache.seatunnel.api.event.EventListener} of this writer.
*
* @return
*/
EventListener getEventListener();
}
}
```

## Configuration Listener

To use the event listening feature, you need to configure engine config.

### Zeta Engine

Example config in your config file(seatunnel.yaml):

```
seatunnel:
engine:
event-report-http:
url: "http://example.com:1024/event/report"
headers:
Content-Type: application/json
```

### Flink Engine

You can define the implementation class of `org.apache.seatunnel.api.event.EventHandler` interface and add to the classpath to automatically load it through SPI.

Support flink version: 1.14.0+

Example: `org.apache.seatunnel.api.event.LoggingEventHandler`

### Spark Engine

You can define the implementation class of `org.apache.seatunnel.api.event.EventHandler` interface and add to the classpath to automatically load it through SPI.
3 changes: 2 additions & 1 deletion docs/sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ const sidebars = {
"concept/connector-v2-features",
'concept/schema-feature',
'concept/JobEnvConfig',
'concept/speed-limit'
'concept/speed-limit',
'concept/event-listener'
]
},
"Connector-v2-release-state",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.event;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

@Slf4j
@AllArgsConstructor
public class DefaultEventProcessor implements EventListener, EventProcessor {
private final String jobId;
private final List<EventHandler> handlers;

public DefaultEventProcessor() {
this(DefaultEventProcessor.class.getClassLoader());
}

public DefaultEventProcessor(String jobId) {
this(jobId, EventProcessor.loadEventHandlers(DefaultEventProcessor.class.getClassLoader()));
}

public DefaultEventProcessor(ClassLoader classLoader) {
this(null, EventProcessor.loadEventHandlers(classLoader));
}

@Override
public void process(Event event) {
handlers.forEach(listener -> listener.handle(event));
}

@Override
public void onEvent(Event event) {
if (jobId != null) {
event.setJobId(jobId);
}
process(event);
}

@Override
public void close() throws Exception {
log.info("Closing event handlers.");
EventProcessor.close(handlers);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.event;

import java.io.Serializable;

public interface Event extends Serializable {

long getCreatedTime();

void setJobId(String jobId);

String getJobId();

EventType getEventType();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.event;

import java.io.Serializable;

public interface EventHandler extends Serializable, AutoCloseable {

/**
* Receive and handle the event data.
*
* @param event
*/
void handle(Event event);

@Override
default void close() throws Exception {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.event;

import java.io.Serializable;

public interface EventListener extends Serializable {
void onEvent(Event event);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.event;

import java.util.LinkedList;
import java.util.List;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;

public interface EventProcessor extends AutoCloseable {
void process(Event event);

static List<EventHandler> loadEventHandlers(ClassLoader classLoader) {
try {
List<EventHandler> result = new LinkedList<>();
ServiceLoader.load(EventHandler.class, classLoader)
.iterator()
.forEachRemaining(result::add);
return result;
} catch (ServiceConfigurationError e) {
throw new RuntimeException("Could not load service provider for event handlers.", e);
}
}

static void close(List<EventHandler> handlers) throws Exception {
if (handlers != null) {
for (EventHandler handler : handlers) {
handler.close();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.event;

public enum EventType {
SCHEMA_CHANGE_ADD_COLUMN,
SCHEMA_CHANGE_DROP_COLUMN,
SCHEMA_CHANGE_MODIFY_COLUMN,
SCHEMA_CHANGE_CHANGE_COLUMN,
SCHEMA_CHANGE_UPDATE_COLUMNS,
SCHEMA_CHANGE_RENAME_TABLE,
LIFECYCLE_ENUMERATOR_OPEN,
LIFECYCLE_ENUMERATOR_CLOSE,
LIFECYCLE_READER_OPEN,
LIFECYCLE_READER_CLOSE,
LIFECYCLE_WRITER_CLOSE,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.event;

public interface LifecycleEvent extends Event {}
Loading
Loading