Skip to content
Permalink
Browse files
Persisting notification and events in the database. Implementing the …
…grpc service
  • Loading branch information
DImuthuUpe committed Sep 5, 2021
1 parent a33bf64 commit 501b613f1366a7b60400f25fba5c79104c161c95
Showing 14 changed files with 611 additions and 60 deletions.
@@ -12,6 +12,8 @@ outboundEventProcessor:
drmsPort: {{ datalake_drms_grpc_port }}
mftHost: "{{ mft_api_service_host }}"
mftPort: {{ mft_api_service_grpc_port }}
notificationServiceHost: "localhost"
notificationServicePort: {{ datalake_data_orch_grpc_port }}
consumer:
brokerURL: "{{ datalake_data_orch_broker_url }}"
consumerGroup: "{{ datalake_data_orch_broker_consumer_group }}"
@@ -15,6 +15,13 @@
become: yes
become_user: "{{ user }}"

- name: open firewall ports for Workflow Engine
firewalld: port="{{ item }}/tcp"
zone=public permanent=true state=enabled immediate=yes
with_items:
- "{{ workflow_manager_grpc_port }}"
become: yes

- name: Run Datalake maven build
command: mvn clean install -Dmaven.test.skip=true chdir="{{ datalake_source_dir }}/"
environment:
@@ -22,6 +22,11 @@
<artifactId>data-orchestrator-messaging</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.airavata.data.lake</groupId>
<artifactId>data-orchestrator-api-stub</artifactId>
<version>${project.version}</version>
</dependency>

</dependencies>

@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.airavata.dataorchestrator.clients.core;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.NotificationServiceGrpc;

import java.io.Closeable;
import java.io.IOException;

public class NotificationClient implements Closeable {

private final ManagedChannel channel;

public NotificationClient(String hostName, int port) {
channel = ManagedChannelBuilder.forAddress(hostName, port).usePlaintext().build();
}

public NotificationServiceGrpc.NotificationServiceBlockingStub get() {
return NotificationServiceGrpc.newBlockingStub(channel);
}

@Override
public void close() throws IOException {
if (channel != null) {
channel.shutdown();
}
}
}
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.airavata.datalake.orchestrator.registry.persistance.entity.notification;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name = "NOTIFICATION_ENTITY")
public class NotificationEntity {

@Id
@Column(name = "NOTIFICATION_ID")
private String notificationId;

@Column(name = "RESOURCE_PATH")
private String resourcePath;

@Column(name = "RESOURCE_TYPE")
private String resourceType;

@Column(name = "OCCURED_TIME")
private long occuredTime;

@Column(name = "AUTH_TOKEN")
private String authToken;

@Column(name = "TENANT_ID")
private String tenantId;

@Column(name = "HOSTNAME")
private String hostName;

@Column(name = "BASE_PATH")
private String basePath;

@Column(name = "EVENT_TYPE")
private String eventType;

public String getNotificationId() {
return notificationId;
}

public void setNotificationId(String notificationId) {
this.notificationId = notificationId;
}

public String getResourcePath() {
return resourcePath;
}

public void setResourcePath(String resourcePath) {
this.resourcePath = resourcePath;
}

public String getResourceType() {
return resourceType;
}

public void setResourceType(String resourceType) {
this.resourceType = resourceType;
}

public long getOccuredTime() {
return occuredTime;
}

public void setOccuredTime(long occuredTime) {
this.occuredTime = occuredTime;
}

public String getAuthToken() {
return authToken;
}

public void setAuthToken(String authToken) {
this.authToken = authToken;
}

public String getTenantId() {
return tenantId;
}

public void setTenantId(String tenantId) {
this.tenantId = tenantId;
}

public String getHostName() {
return hostName;
}

public void setHostName(String hostName) {
this.hostName = hostName;
}

public String getBasePath() {
return basePath;
}

public void setBasePath(String basePath) {
this.basePath = basePath;
}

public String getEventType() {
return eventType;
}

public void setEventType(String eventType) {
this.eventType = eventType;
}
}
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.airavata.datalake.orchestrator.registry.persistance.entity.notification;

import javax.persistence.*;

@Entity
@Table(name = "NOTIFICATION_STATUS_ENTITY")
public class NotificationStatusEntity {

@Id
@Column(name = "STATUS_ID")
private String statusId;

@Column(name = "STATUS")
private String status;

@Column(name = "PUBLISHED_TIME")
private long publishedTime;

@Column(name = "DESCRIPTION", columnDefinition = "TEXT")
private String description;

@Column(name = "NOTIFICATION_ID")
private String notificationId;

@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "NOTIFICATION_ID", insertable=false, updatable=false)
private NotificationEntity notificationEntity;

public String getStatusId() {
return statusId;
}

public void setStatusId(String statusId) {
this.statusId = statusId;
}

public String getStatus() {
return status;
}

public void setStatus(String status) {
this.status = status;
}

public long getPublishedTime() {
return publishedTime;
}

public void setPublishedTime(long publishedTime) {
this.publishedTime = publishedTime;
}

public NotificationEntity getNotificationEntity() {
return notificationEntity;
}

public void setNotificationEntity(NotificationEntity notificationEntity) {
this.notificationEntity = notificationEntity;
}

public String getNotificationId() {
return notificationId;
}

public void setNotificationId(String notificationId) {
this.notificationId = notificationId;
}

public String getDescription() {
return description;
}

public void setDescription(String description) {
this.description = description;
}
}
@@ -15,14 +15,10 @@
* limitations under the License.
*/

package org.apache.airavata.datalake.orchestrator.registry.persistance.entity;
package org.apache.airavata.datalake.orchestrator.registry.persistance.repository;

public enum EventStatus {
DATA_ORCH_RECEIVED,
DISPATCHED_TO_WORFLOW_ENGING,
DATA_ORCH_PROCESSED_AND_SKIPPED,
MFT_CALLBACK_RECEIVED,
COMPLETED,
ERRORED,
import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.notification.NotificationEntity;
import org.springframework.data.jpa.repository.JpaRepository;

public interface NotificationEntityRepository extends JpaRepository<NotificationEntity, String> {
}
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.airavata.datalake.orchestrator.registry.persistance.repository;

import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.notification.NotificationStatusEntity;
import org.springframework.data.jpa.repository.JpaRepository;

public interface NotificationStatusEntityRepository extends JpaRepository<NotificationStatusEntity, String> {
}
@@ -80,6 +80,11 @@
<artifactId>data-orchestrator-messaging</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.airavata.data.lake</groupId>
<artifactId>data-orchestrator-clients-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.airavata.data.lake</groupId>
<artifactId>data-orchestrator-registry</artifactId>
@@ -165,11 +165,10 @@ public static class OutboundEventProcessorConfig {
private int pollingInterval;
private String mftHost;
private int mftPort;
private String notificationServiceHost;
private int notificationServicePort;


public OutboundEventProcessorConfig() {
}

public OutboundEventProcessorConfig() {}

public String getWorkflowEngineHost() {
return workflowEngineHost;
@@ -226,6 +225,22 @@ public int getMftPort() {
public void setMftPort(int mftPort) {
this.mftPort = mftPort;
}

public String getNotificationServiceHost() {
return notificationServiceHost;
}

public void setNotificationServiceHost(String notificationServiceHost) {
this.notificationServiceHost = notificationServiceHost;
}

public int getNotificationServicePort() {
return notificationServicePort;
}

public void setNotificationServicePort(int notificationServicePort) {
this.notificationServicePort = notificationServicePort;
}
}


0 comments on commit 501b613

Please sign in to comment.