Skip to content

Commit b6fda22

Browse files
author
priteela
committed
Refactored to support multiple listeners
1 parent 6637f2e commit b6fda22

14 files changed

+262
-21
lines changed

src/main/java/mu/integration/consumer/rabbitmq/DemoApplication.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44
import org.springframework.boot.SpringApplication;
55
import org.springframework.boot.autoconfigure.SpringBootApplication;
66
import org.springframework.cloud.stream.annotation.EnableBinding;
7-
import org.springframework.cloud.stream.messaging.Processor;
87

9-
@EnableBinding(Processor.class)
8+
import mu.integration.consumer.rabbitmq.binder.TransportProcessor;
9+
10+
@EnableBinding(TransportProcessor.class)
1011
@SpringBootApplication
1112
public class DemoApplication {
1213

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package mu.integration.consumer.rabbitmq.binder;
2+
3+
import org.springframework.cloud.stream.annotation.Input;
4+
import org.springframework.cloud.stream.annotation.Output;
5+
import org.springframework.messaging.MessageChannel;
6+
import org.springframework.messaging.SubscribableChannel;
7+
8+
/**
9+
* Bindable interface with separate input and output channels for cruise.
10+
*
11+
* @author Priteela
12+
* @see org.springframework.cloud.stream.annotation.EnableBinding
13+
*/
14+
public interface CruiseProcessor {
15+
16+
String CRUISE_OUTPUT = "cruiseOutput";
17+
String CRUISE_INPUT = "cruiseInput";
18+
19+
@Output(CRUISE_OUTPUT)
20+
MessageChannel cruiseOutput();
21+
22+
@Input(CRUISE_INPUT)
23+
SubscribableChannel cruiseInput();
24+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package mu.integration.consumer.rabbitmq.binder;
2+
3+
import org.springframework.cloud.stream.annotation.Input;
4+
import org.springframework.cloud.stream.annotation.Output;
5+
import org.springframework.messaging.MessageChannel;
6+
import org.springframework.messaging.SubscribableChannel;
7+
8+
/**
9+
* Bindable interface with separate input and output channels for flight
10+
*
11+
* @author Priteela
12+
* @see org.springframework.cloud.stream.annotation.EnableBinding
13+
*/
14+
public interface FlightProcessor {
15+
16+
String FLIGHT_OUTPUT = "flightOutput";
17+
String FLIGHT_INPUT = "flightInput";
18+
19+
@Output(FLIGHT_OUTPUT)
20+
MessageChannel flightOutput();
21+
22+
@Input(FLIGHT_INPUT)
23+
SubscribableChannel flightInput();
24+
25+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package mu.integration.consumer.rabbitmq.binder;
2+
3+
import org.springframework.cloud.stream.annotation.Input;
4+
import org.springframework.cloud.stream.annotation.Output;
5+
import org.springframework.messaging.MessageChannel;
6+
import org.springframework.messaging.SubscribableChannel;
7+
8+
/**
9+
* Bindable interface with separate input and output channels for flight and cruise respectively.
10+
*
11+
* @author Priteela
12+
* @see org.springframework.cloud.stream.annotation.EnableBinding
13+
*/
14+
public interface TransportProcessor extends CruiseProcessor, FlightProcessor {
15+
String INTEGRATION_RESPONSE_OUTPUT = "integrationResponseOutput";
16+
String INTEGRATION_RESPONSE_INPUT = "integrationResponseInput";
17+
18+
@Output(INTEGRATION_RESPONSE_OUTPUT)
19+
MessageChannel integrationResponseOutput();
20+
21+
@Input(INTEGRATION_RESPONSE_INPUT)
22+
SubscribableChannel integrationResponseInput();
23+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package mu.integration.consumer.rabbitmq.constant;
2+
3+
/**
4+
* Enumerator to indicate the status of the file import
5+
*
6+
* @author Priteela ,ganesh
7+
*/
8+
public enum FileStatus {
9+
10+
/**
11+
* SUCCESS fileStatus enum
12+
*/
13+
SUCCESS,
14+
15+
/**
16+
* FAILURE fileStatus enum
17+
*/
18+
FAILURE,
19+
20+
/**
21+
* PENDING VALIDATION fileStatus enum
22+
*/
23+
PENDING_VALIDATION;
24+
25+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package mu.integration.consumer.rabbitmq.constant;
2+
3+
/**
4+
* Enumerator to indicate the possible transport types
5+
*
6+
* @author Priteela
7+
*/
8+
public enum TransportType {
9+
10+
FLIGHT,
11+
12+
CRUISE;
13+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package mu.integration.consumer.rabbitmq.dto;
2+
3+
import java.io.Serializable;
4+
5+
import lombok.Data;
6+
import lombok.NoArgsConstructor;
7+
8+
9+
/**
10+
* View object for CSV Import Status
11+
*
12+
* @author Priteela
13+
*/
14+
@Data
15+
@NoArgsConstructor
16+
public class TransportIntegrationResultVO implements Serializable {
17+
18+
private String fileName;
19+
// private List<Set<String>> errorMessageList;
20+
private long numberRecordUpload;
21+
private long numberRecordSuccess;
22+
private long numberRecordFail;
23+
}
Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package mu.integration.consumer.rabbitmq.dto;
22

33
import java.io.Serializable;
4+
import java.util.Set;
45

56
import lombok.AllArgsConstructor;
67
import lombok.Data;
78
import lombok.NoArgsConstructor;
9+
import mu.integration.consumer.rabbitmq.constant.FileStatus;
10+
import mu.integration.consumer.rabbitmq.constant.TransportType;
811

912
/**
1013
*
@@ -14,12 +17,20 @@
1417
@AllArgsConstructor
1518
@NoArgsConstructor
1619
public class TransportIntegrationVO implements Serializable {
20+
1721
private String id;
1822
private String line;
1923
private String fileName;
20-
private String status;
24+
private FileStatus status;
25+
private TransportType transportType;
26+
private Set<String> errorMessage;
2127

28+
/**
29+
* Method to concat line and list of error message
30+
*
31+
* @return string
32+
*/
2233
public String toCsv() {
23-
return this.id + "," + this.line + "," + this.status + "," + this.fileName;
34+
return this.line + "," + this.errorMessage;
2435
}
2536
}

src/main/java/mu/integration/consumer/rabbitmq/listener/TransportConsumer.java renamed to src/main/java/mu/integration/consumer/rabbitmq/listener/TransportCruiseConsumer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package mu.integration.consumer.rabbitmq.listener;
22

33
import org.springframework.cloud.stream.annotation.StreamListener;
4-
import org.springframework.cloud.stream.messaging.Sink;
54
import org.springframework.messaging.Message;
65
import org.springframework.messaging.handler.annotation.Header;
76
import org.springframework.messaging.handler.annotation.Payload;
@@ -12,6 +11,7 @@
1211

1312
import lombok.RequiredArgsConstructor;
1413
import lombok.extern.slf4j.Slf4j;
14+
import mu.integration.consumer.rabbitmq.binder.TransportProcessor;
1515
import mu.integration.consumer.rabbitmq.dto.TransportIntegrationVO;
1616
import mu.integration.consumer.rabbitmq.service.TransportIntegrationSender;
1717
import mu.integration.consumer.rabbitmq.service.TransportIntegrationService;
@@ -24,14 +24,14 @@
2424
@RequiredArgsConstructor
2525
@Slf4j
2626
@Component
27-
public class TransportConsumer {
27+
public class TransportCruiseConsumer {
2828

2929
private final TransportIntegrationService transportIntegrationService;
3030
private final TransportIntegrationSender transportIntegrationSender;
3131
private final ObjectMapper mapper;
3232

33-
@StreamListener(Sink.INPUT)
34-
public void receiveOrder(@Header Message<String> header, @Payload TransportIntegrationVO transportIntegrationVO)
33+
@StreamListener(TransportProcessor.CRUISE_INPUT)
34+
public void processCruiseIntegration(@Header Message<String> header, @Payload TransportIntegrationVO transportIntegrationVO)
3535
throws JsonProcessingException {
3636

3737
log.info("\n\n payload received: {}", mapper.writeValueAsString(transportIntegrationVO));
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package mu.integration.consumer.rabbitmq.listener;
2+
3+
import org.springframework.cloud.stream.annotation.StreamListener;
4+
import org.springframework.messaging.Message;
5+
import org.springframework.messaging.handler.annotation.Header;
6+
import org.springframework.messaging.handler.annotation.Payload;
7+
import org.springframework.stereotype.Component;
8+
9+
import com.fasterxml.jackson.core.JsonProcessingException;
10+
import com.fasterxml.jackson.databind.ObjectMapper;
11+
12+
import lombok.RequiredArgsConstructor;
13+
import lombok.extern.slf4j.Slf4j;
14+
import mu.integration.consumer.rabbitmq.binder.TransportProcessor;
15+
import mu.integration.consumer.rabbitmq.dto.TransportIntegrationVO;
16+
import mu.integration.consumer.rabbitmq.service.TransportIntegrationSender;
17+
import mu.integration.consumer.rabbitmq.service.TransportIntegrationService;
18+
19+
/**
20+
* Receives a message from topic exchange.
21+
*
22+
* @author Priteela
23+
*/
24+
@RequiredArgsConstructor
25+
@Slf4j
26+
@Component
27+
public class TransportFlightConsumer {
28+
29+
private final TransportIntegrationService transportIntegrationService;
30+
private final TransportIntegrationSender transportIntegrationSender;
31+
private final ObjectMapper mapper;
32+
33+
@StreamListener(TransportProcessor.FLIGHT_INPUT)
34+
public void processFlightIntegration(@Header Message<String> header, @Payload TransportIntegrationVO transportIntegrationVO)
35+
throws JsonProcessingException {
36+
37+
log.info("\n\n payload received: {}", mapper.writeValueAsString(transportIntegrationVO));
38+
39+
//update csv line information
40+
TransportIntegrationVO updatedTransportIntegrationVO = transportIntegrationService.validate(transportIntegrationVO);
41+
42+
transportIntegrationSender.send(header, updatedTransportIntegrationVO);
43+
44+
}
45+
}

0 commit comments

Comments
 (0)