Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spring Cloud produced messages fail parsing #46

Closed
grigorigoldman opened this issue Apr 23, 2015 · 7 comments
Closed

Spring Cloud produced messages fail parsing #46

grigorigoldman opened this issue Apr 23, 2015 · 7 comments

Comments

@grigorigoldman
Copy link

I'm trying to use an embedded elasticmq in my Spring Boot/Spring Cloud application but getting exceptions when sending a message:

2015-04-23 22:41:53.312 ERROR 14025 --- [t-dispatcher-13] o.e.r.s.TheSQSRestServerBuilder$$anon$1  : Exception when running routes

java.lang.Exception: Currently only handles String typed attributes
    at org.elasticmq.rest.sqs.SendMessageDirectives$$anonfun$getMessageAttributes$1.apply(SendMessageDirectives.scala:62)
    at org.elasticmq.rest.sqs.SendMessageDirectives$$anonfun$getMessageAttributes$1.apply(SendMessageDirectives.scala:53)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.immutable.Range.foreach(Range.scala:166)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.elasticmq.rest.sqs.SendMessageDirectives$class.getMessageAttributes(SendMessageDirectives.scala:53)
    at org.elasticmq.rest.sqs.TheSQSRestServerBuilder$$anon$1.getMessageAttributes(SQSRestServerBuilder.scala:89)
    at org.elasticmq.rest.sqs.SendMessageDirectives$class.doSendMessage(SendMessageDirectives.scala:72)
    at org.elasticmq.rest.sqs.TheSQSRestServerBuilder$$anon$1.doSendMessage(SQSRestServerBuilder.scala:89)
    at org.elasticmq.rest.sqs.SendMessageBatchDirectives$$anonfun$1$$anonfun$apply$1$$anonfun$2.apply(SendMessageBatchDirectives.scala:16)
    at org.elasticmq.rest.sqs.SendMessageBatchDirectives$$anonfun$1$$anonfun$apply$1$$anonfun$2.apply(SendMessageBatchDirectives.scala:15)
    at org.elasticmq.rest.sqs.BatchRequestsModule$$anonfun$2.apply(BatchRequestsModule.scala:35)
    at org.elasticmq.rest.sqs.BatchRequestsModule$$anonfun$2.apply(BatchRequestsModule.scala:31)
    at scala.collection.immutable.List.map(List.scala:273)
    at org.elasticmq.rest.sqs.BatchRequestsModule$class.batchRequest(BatchRequestsModule.scala:31)
    at org.elasticmq.rest.sqs.TheSQSRestServerBuilder$$anon$1.batchRequest(SQSRestServerBuilder.scala:89)
    at org.elasticmq.rest.sqs.SendMessageBatchDirectives$$anonfun$1$$anonfun$apply$1.apply(SendMessageBatchDirectives.scala:15)
    at org.elasticmq.rest.sqs.SendMessageBatchDirectives$$anonfun$1$$anonfun$apply$1.apply(SendMessageBatchDirectives.scala:12)
    at org.elasticmq.rest.sqs.directives.AnyParamDirectives2$$anonfun$anyParamsMap$1$$anonfun$apply$1.apply(AnyParamDirectives2.scala:13)
    at org.elasticmq.rest.sqs.directives.AnyParamDirectives2$$anonfun$anyParamsMap$1$$anonfun$apply$1.apply(AnyParamDirectives2.scala:11)
    at spray.routing.ApplyConverterInstances$$anon$22$$anonfun$apply$1.apply(ApplyConverterInstances.scala:25)
    at spray.routing.ApplyConverterInstances$$anon$22$$anonfun$apply$1.apply(ApplyConverterInstances.scala:24)
    at spray.routing.ConjunctionMagnet$$anon$1$$anon$2$$anonfun$happly$1$$anonfun$apply$1.apply(Directive.scala:38)
    at spray.routing.ConjunctionMagnet$$anon$1$$anon$2$$anonfun$happly$1$$anonfun$apply$1.apply(Directive.scala:37)
    at spray.routing.directives.BasicDirectives$$anon$1.happly(BasicDirectives.scala:26)
    at spray.routing.ConjunctionMagnet$$anon$1$$anon$2$$anonfun$happly$1.apply(Directive.scala:37)
    at spray.routing.ConjunctionMagnet$$anon$1$$anon$2$$anonfun$happly$1.apply(Directive.scala:36)
    at spray.routing.directives.BasicDirectives$$anon$2.happly(BasicDirectives.scala:79)
    at spray.routing.Directive$$anon$7$$anonfun$happly$4.apply(Directive.scala:86)
    at spray.routing.Directive$$anon$7$$anonfun$happly$4.apply(Directive.scala:86)
    at spray.routing.directives.BasicDirectives$$anon$3$$anonfun$happly$1.apply(BasicDirectives.scala:92)
    at spray.routing.directives.BasicDirectives$$anon$3$$anonfun$happly$1.apply(BasicDirectives.scala:92)
    at spray.routing.directives.BasicDirectives$$anon$3$$anonfun$happly$1.apply(BasicDirectives.scala:92)
    at spray.routing.directives.BasicDirectives$$anon$3$$anonfun$happly$1.apply(BasicDirectives.scala:92)
    at spray.routing.directives.ExecutionDirectives$$anonfun$handleExceptions$1$$anonfun$apply$4.apply(ExecutionDirectives.scala:35)
    at spray.routing.directives.ExecutionDirectives$$anonfun$handleExceptions$1$$anonfun$apply$4.apply(ExecutionDirectives.scala:33)
    at org.elasticmq.rest.sqs.directives.FutureDirectives$$anonfun$futureRouteToRoute$1$$anonfun$apply$1.apply(FutureDirectives.scala:14)
    at org.elasticmq.rest.sqs.directives.FutureDirectives$$anonfun$futureRouteToRoute$1$$anonfun$apply$1.apply(FutureDirectives.scala:11)
    at scala.util.Success$$anonfun$map$1.apply(Try.scala:236)
    at scala.util.Try$.apply(Try.scala:191)
    at scala.util.Success.map(Try.scala:236)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[Fatal Error] :1:1: Content is not allowed in prolog.
[Fatal Error] :1:1: Content is not allowed in prolog.

I originally thought that it was Spring incorrectly creating the message, which elasticMq was then failing to process, however, when running the same code against a real SQS service, both sending and receiving messages works.

My project code looks like this:

@SpringBootApplication
@Import(MessagingConfig.class)
public class App {
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}
@Configuration
public class MessagingConfig implements InitializingBean, DisposableBean {

    private SQSRestServer sqsRestServer;

    @Value("${elasticmq.enabled:false}")
    private boolean elasticMqEnabled;

    @Autowired
    private AmazonSQSAsync client;

    @Bean
    public QueueMessagingTemplate queueMessagingTemplate(AmazonSQS amazonSqs, ResourceIdResolver resourceIdResolver) {
        return new QueueMessagingTemplate(amazonSqs, resourceIdResolver);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        if (elasticMqEnabled) {
            sqsRestServer = SQSRestServerBuilder.start();
            sqsRestServer.waitUntilStarted();

            client.setEndpoint("http://localhost:9324");
        }
        client.createQueue("TestQueue");
    }

    @Override
    public void destroy() throws Exception {
        String queueUrl = client.getQueueUrl("TestQueue").getQueueUrl();
        client.deleteQueue(queueUrl);

        if (elasticMqEnabled) {
            sqsRestServer.stopAndWait();
        }
    }
}
@Component
public class Sender {

    @Autowired
    private QueueMessagingTemplate queueMessagingTemplate;

    public void send(MyMessage message) {
        queueMessagingTemplate.convertAndSend("TestQueue", message);
    }
}
@Component
public class GreetingEventReceiver {
    @MessageMapping("TestQueue")
    public void receiveGreetingEvent(Greeting greeting) {
        System.out.println(greeting.getMessage());
    }
}
@adamw
Copy link
Member

adamw commented May 11, 2015

Are you setting any non-String attributes on the message? (I suspect that might happen in queueMessagingTemplate.convertAndSend) Currently only String-typed attributes are supported

@sanjaybhatol
Copy link

Hi Adam,
I am passing JSON message and facing the same issue. Is there any plan in near future to provide support for JSON type ?

@sf-git
Copy link

sf-git commented Sep 1, 2015

I had the same issue. Spring Cloud uses spring-messaging which automatically adds UUID (string) and timestamp(long) to a message's headers. And those headers will be transformed into SQS message attributes. Timestamp causes this issue. As a workaround you can override doSend method in QueueMessagingTemplate and remove timestamp header from the message.

@adamw
Copy link
Member

adamw commented Sep 2, 2015

I think adding support for all message attribute types shouldn't be too hard, if somebody would like to attempt a PR :)

@sf-git
Copy link

sf-git commented Sep 2, 2015

It wasn't ) I am currently working on it. Just need to write some more tests for SQSStrict mode to check that number attribute's value is inside -10^128 .. 10^126. So expect PR really soon )

@adamw
Copy link
Member

adamw commented Sep 2, 2015

Awesome! :)

adamw added a commit that referenced this issue Sep 3, 2015
Issue #46. Spring Cloud produced messages fail parsing. Support Number MessageAttributes
@adamw
Copy link
Member

adamw commented Sep 3, 2015

Merged & released 0.8.10 - thanks!

@adamw adamw closed this as completed Sep 3, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants