Skip to content

Commit

Permalink
[ISSUE #4403] Fix the SPI extension admin-api loading of storage-plug…
Browse files Browse the repository at this point in the history
…ins (#4404)

* fix: remove knative circulate dependency

* feat: realize knative admin-api

* feat: realize mongodb admin-api

* feat: realize pulsar admin-api

* feat: realize pravega admin-api

* fix: deliver a config to client instead of null

* chore: revert startup plugin config
  • Loading branch information
Pil0tXia committed Aug 28, 2023
1 parent 3678c23 commit b55e6db
Show file tree
Hide file tree
Showing 14 changed files with 636 additions and 5 deletions.
Expand Up @@ -21,9 +21,6 @@ dependencies {
implementation project(":eventmesh-sdks:eventmesh-sdk-java")
testImplementation project(":eventmesh-sdks:eventmesh-sdk-java")

implementation project(":eventmesh-runtime")
testImplementation project(":eventmesh-runtime")

implementation project(":eventmesh-common")
testImplementation project(":eventmesh-common")

Expand Down
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.storage.knative.admin;

import org.apache.eventmesh.api.admin.AbstractAdmin;
import org.apache.eventmesh.api.admin.TopicProperties;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import io.cloudevents.CloudEvent;

public class KnativeAdmin extends AbstractAdmin {

public KnativeAdmin() {
super(new AtomicBoolean(false));
}

@Override
public List<TopicProperties> getTopic() {
// TODO implement admin functions
return new ArrayList<>();
}

@Override
public void createTopic(String topicName) {
}

@Override
public void deleteTopic(String topicName) {
}

@Override
public void publish(CloudEvent cloudEvent) throws Exception {
}

@Override
public void shutdown() {
}
}
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.storage.knative.admin;

import org.apache.eventmesh.api.admin.Admin;
import org.apache.eventmesh.api.admin.TopicProperties;

import java.util.List;
import java.util.Properties;

import io.cloudevents.CloudEvent;

public class KnativeAdminAdaptor implements Admin {

private final KnativeAdmin admin;

public KnativeAdminAdaptor() {
admin = new KnativeAdmin();
}

@Override
public boolean isStarted() {
return admin.isStarted();
}

@Override
public boolean isClosed() {
return admin.isClosed();
}

@Override
public void start() {
admin.start();
}

@Override
public void shutdown() {
admin.shutdown();
}

@Override
public void init(Properties properties) throws Exception {
admin.init(properties);
}

@Override
public List<TopicProperties> getTopic() throws Exception {
return admin.getTopic();
}

@Override
public void createTopic(String topicName) {
admin.createTopic(topicName);
}

@Override
public void deleteTopic(String topicName) {
admin.deleteTopic(topicName);
}

@Override
public List<CloudEvent> getEvent(String topicName, int offset, int length) throws Exception {
return admin.getEvent(topicName, offset, length);
}

@Override
public void publish(CloudEvent cloudEvent) throws Exception {
admin.publish(cloudEvent);
}
}
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

knative=org.apache.eventmesh.storage.knative.admin.KnativeAdminAdaptor
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.storage.mongodb.admin;

import org.apache.eventmesh.api.admin.AbstractAdmin;
import org.apache.eventmesh.api.admin.TopicProperties;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import io.cloudevents.CloudEvent;

public class MongodbAdmin extends AbstractAdmin {

public MongodbAdmin() {
super(new AtomicBoolean(false));
}

@Override
public List<TopicProperties> getTopic() {
// TODO implement admin functions
return new ArrayList<>();
}

@Override
public void createTopic(String topicName) {
}

@Override
public void deleteTopic(String topicName) {
}

@Override
public void publish(CloudEvent cloudEvent) throws Exception {
}

@Override
public void shutdown() {
}
}
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.storage.mongodb.admin;

import org.apache.eventmesh.api.admin.Admin;
import org.apache.eventmesh.api.admin.TopicProperties;

import java.util.List;
import java.util.Properties;

import io.cloudevents.CloudEvent;

public class MongodbAdminAdaptor implements Admin {

private final MongodbAdmin admin;

public MongodbAdminAdaptor() {
admin = new MongodbAdmin();
}

@Override
public boolean isStarted() {
return admin.isStarted();
}

@Override
public boolean isClosed() {
return admin.isClosed();
}

@Override
public void start() {
admin.start();
}

@Override
public void shutdown() {
admin.shutdown();
}

@Override
public void init(Properties properties) throws Exception {
admin.init(properties);
}

@Override
public List<TopicProperties> getTopic() throws Exception {
return admin.getTopic();
}

@Override
public void createTopic(String topicName) {
admin.createTopic(topicName);
}

@Override
public void deleteTopic(String topicName) {
admin.deleteTopic(topicName);
}

@Override
public List<CloudEvent> getEvent(String topicName, int offset, int length) throws Exception {
return admin.getEvent(topicName, offset, length);
}

@Override
public void publish(CloudEvent cloudEvent) throws Exception {
admin.publish(cloudEvent);
}
}
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

mongodb=org.apache.eventmesh.storage.mongodb.admin.MongodbAdminAdaptor
Expand Up @@ -19,19 +19,25 @@

import org.apache.eventmesh.api.storage.StorageResourceService;
import org.apache.eventmesh.storage.pravega.client.PravegaClient;
import org.apache.eventmesh.storage.pravega.config.PravegaStorageConfig;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class PravegaStorageResourceServiceImpl implements StorageResourceService {

/**
* Unified configuration class corresponding to pravega-storage.properties
*/
private PravegaStorageConfig pravegaConnectorConfig = new PravegaStorageConfig();

@Override
public void init() throws Exception {
PravegaClient.getInstance().start();
PravegaClient.getInstance(pravegaConnectorConfig).start();
}

@Override
public void release() throws Exception {
public void release() {
PravegaClient.getInstance().shutdown();
}
}

0 comments on commit b55e6db

Please sign in to comment.