Skip to content
Permalink
Browse files
Get a set of messages by id.
  • Loading branch information
Everett Toews committed Nov 25, 2013
1 parent 2e4c666 commit 3e54454a2a9d2437fa5836e8209c18c414f8554e
Showing 13 changed files with 226 additions and 41 deletions.
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.openstack.marconi.v1.binders;

import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import org.jclouds.http.HttpRequest;
import org.jclouds.rest.Binder;

import javax.inject.Singleton;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

/**
* @author Everett Toews
*/
@Singleton
public class BindIdsToQueryParam implements Binder {

@SuppressWarnings("unchecked")
@Override
public <R extends HttpRequest> R bindToRequest(R request, Object input) {
checkArgument(input instanceof Iterable<?>, "This binder is only valid for Iterable");
Iterable<String> ids = (Iterable<String>) checkNotNull(input, "Iterable of Strings");
checkArgument(Iterables.size(ids) > 0, "You must specify at least one id");

return (R) request.toBuilder().replaceQueryParam("ids", Joiner.on(',').join(ids)).build();
}
}
@@ -21,7 +21,7 @@
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import org.jclouds.openstack.marconi.v1.options.StreamOptions;
import org.jclouds.openstack.marconi.v1.options.StreamMessagesOptions;
import org.jclouds.openstack.v2_0.domain.Link;
import org.jclouds.openstack.v2_0.domain.PaginatedCollection;

@@ -37,8 +37,8 @@ protected MessageStream(Iterable<Message> resources, Iterable<Link> links) {
*
* @return The options necessary to get the next page of messages.
*/
public StreamOptions nextStreamOptions() {
return StreamOptions.class.cast(nextMarker().get());
public StreamMessagesOptions nextStreamOptions() {
return StreamMessagesOptions.class.cast(nextMarker().get());
}

@Override
@@ -58,7 +58,7 @@ public boolean apply(Link link) {
@Override
public Object apply(Link link) {
Multimap<String, String> queryParams = queryParser().apply(link.getHref().getRawQuery());
StreamOptions paginationOptions = StreamOptions.Builder.queryParameters(queryParams);
StreamMessagesOptions paginationOptions = StreamMessagesOptions.Builder.queryParameters(queryParams);

return paginationOptions;
}
@@ -23,7 +23,6 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import org.jclouds.openstack.marconi.v1.options.ListQueuesOptions;
import org.jclouds.openstack.marconi.v1.options.StreamOptions;
import org.jclouds.openstack.v2_0.domain.Link;
import org.jclouds.openstack.v2_0.domain.PaginatedCollection;

@@ -17,14 +17,16 @@
package org.jclouds.openstack.marconi.v1.features;

import org.jclouds.Fallbacks;
import org.jclouds.openstack.keystone.v2_0.KeystoneFallbacks;
import org.jclouds.openstack.keystone.v2_0.filters.AuthenticateRequest;
import org.jclouds.openstack.marconi.v1.binders.BindIdsToQueryParam;
import org.jclouds.openstack.marconi.v1.domain.CreateMessage;
import org.jclouds.openstack.marconi.v1.domain.MessagesCreated;
import org.jclouds.openstack.marconi.v1.domain.Message;
import org.jclouds.openstack.marconi.v1.domain.MessageStream;
import org.jclouds.openstack.marconi.v1.functions.ParseMessages;
import org.jclouds.openstack.marconi.v1.domain.MessagesCreated;
import org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream;
import org.jclouds.openstack.marconi.v1.functions.ParseMessagesCreated;
import org.jclouds.openstack.marconi.v1.options.StreamOptions;
import org.jclouds.openstack.marconi.v1.functions.ParseMessagesToList;
import org.jclouds.openstack.marconi.v1.options.StreamMessagesOptions;
import org.jclouds.rest.annotations.BinderParam;
import org.jclouds.rest.annotations.Fallback;
import org.jclouds.rest.annotations.RequestFilters;
@@ -38,10 +40,14 @@
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import java.util.List;
import java.util.UUID;

import static org.jclouds.Fallbacks.EmptyListOnNotFoundOr404;
import static org.jclouds.openstack.keystone.v2_0.KeystoneFallbacks.EmptyPaginatedCollectionOnNotFoundOr404;

/**
* Provides access to Messages via their REST API.
*
@@ -69,12 +75,36 @@ public interface MessageApi {
MessagesCreated create(@HeaderParam("Client-ID") UUID clientId,
@BinderParam(BindToJsonPayload.class) List<CreateMessage> messages);

/**
* Streams the messages off of a queue. In a very active queue it's possible that you could continuously stream
* messages indefinitely.
*
* @param clientId A UUID for each client instance.
* @param options Options for streaming messages to your client.
*/
@Named("message:stream")
@GET
@ResponseParser(ParseMessages.class)
@ResponseParser(ParseMessagesToStream.class)
@Consumes(MediaType.APPLICATION_JSON)
@Fallback(KeystoneFallbacks.EmptyPaginatedCollectionOnNotFoundOr404.class)
@Fallback(EmptyPaginatedCollectionOnNotFoundOr404.class)
@Path("/messages")
MessageStream stream(@HeaderParam("Client-ID") UUID clientId,
StreamOptions... options);
StreamMessagesOptions... options);

/**
* List specific messages. Unlike the stream method, a client's own messages are always returned in this operation.
*
* @param clientId A UUID for each client instance.
* @param ids Specifies the IDs of the messages to get.
*/
@Named("message:list")
@GET
@ResponseParser(ParseMessagesToList.class)
@Consumes(MediaType.APPLICATION_JSON)
@Path("/messages")
@Fallback(EmptyListOnNotFoundOr404.class)
List<Message> list(@HeaderParam("Client-ID") UUID clientId,
@BinderParam(BindIdsToQueryParam.class) Iterable<String> ids);

// TODO: list by claim id when claim API done
}
@@ -110,7 +110,7 @@ public interface QueueApi {
/**
* Use this method to manually page through the list of queues.
*/
@Named("record:list")
@Named("queue:list")
@GET
@ResponseParser(ParseQueues.class)
@Consumes(MediaType.APPLICATION_JSON)
@@ -26,7 +26,7 @@
import java.util.List;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.jclouds.openstack.marconi.v1.functions.ParseMessages.TO_MESSAGE_ID;
import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_MESSAGE_ID;

/**
* This parses the messages created on a queue.
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.openstack.marconi.v1.functions;

import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.jclouds.http.HttpResponse;
import org.jclouds.http.functions.ParseJson;
import org.jclouds.openstack.marconi.v1.domain.Message;

import javax.inject.Inject;
import java.util.List;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Lists.newArrayList;
import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.MessageWithHref;
import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_MESSAGE;

/**
* @author Everett Toews
*/
public class ParseMessagesToList implements Function<HttpResponse, List<Message>> {

private final ParseJson<List<MessageWithHref>> json;

@Inject
ParseMessagesToList(ParseJson<List<MessageWithHref>> json) {
this.json = checkNotNull(json, "json");
}

@Override
public List<Message> apply(HttpResponse response) {
// An empty message stream has a 204 response code
if (response.getStatusCode() == 204) {
return ImmutableList.of();
}

List<MessageWithHref> messagesWithHref = json.apply(response);
return Lists.newArrayList(transform(messagesWithHref, TO_MESSAGE));
}
}
@@ -19,7 +19,6 @@
import com.google.common.base.Function;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.jclouds.http.HttpResponse;
import org.jclouds.http.functions.ParseJson;
import org.jclouds.openstack.marconi.v1.domain.Message;
@@ -29,19 +28,18 @@

import javax.inject.Inject;
import java.beans.ConstructorProperties;
import java.util.List;

import static com.google.common.base.Preconditions.checkNotNull;

/**
* @author Everett Toews
*/
public class ParseMessages implements Function<HttpResponse, MessageStream> {
public class ParseMessagesToStream implements Function<HttpResponse, MessageStream> {

private final ParseJson<MessagesWithHref> json;

@Inject
ParseMessages(ParseJson<MessagesWithHref> json) {
ParseMessagesToStream(ParseJson<MessagesWithHref> json) {
this.json = checkNotNull(json, "json");
}

@@ -93,7 +91,7 @@ protected MessagesWithHref(Iterable<MessageWithHref> messagesWithHref, Iterable<
}
}

private static class MessageWithHref extends Message {
protected static class MessageWithHref extends Message {

@ConstructorProperties({ "href", "ttl", "body", "age" })
protected MessageWithHref(String href, int ttl, String body, int age) {
@@ -25,7 +25,7 @@
import org.jclouds.openstack.marconi.v1.domain.QueueStats;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.jclouds.openstack.marconi.v1.functions.ParseMessages.TO_MESSAGE_ID;
import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_MESSAGE_ID;

/**
* This parses the stats of a queue.
@@ -67,7 +67,7 @@ public ListQueuesOptions detailed(boolean detailed) {
}

/**
* @return The String representation of the marker for these StreamOptions.
* @return The String representation of the marker for these StreamMessagesOptions.
*/
public String getMarker() {
return Iterables.getOnlyElement(queryParameters.get("marker"));
@@ -25,15 +25,15 @@
/**
* Options used to control the messages returned in the response.
*/
public class StreamOptions extends PaginationOptions {
public class StreamMessagesOptions extends PaginationOptions {

public static final StreamOptions NONE = new StreamOptions();
public static final StreamMessagesOptions NONE = new StreamMessagesOptions();

/**
* {@inheritDoc}
*/
@Override
public StreamOptions queryParameters(Multimap<String, String> queryParams) {
public StreamMessagesOptions queryParameters(Multimap<String, String> queryParams) {
checkNotNull(queryParams, "queryParams");
queryParameters.putAll(queryParams);
return this;
@@ -43,7 +43,7 @@ public StreamOptions queryParameters(Multimap<String, String> queryParams) {
* @see Builder#marker(String)
*/
@Override
public StreamOptions marker(String marker) {
public StreamMessagesOptions marker(String marker) {
super.marker(marker);
return this;
}
@@ -52,7 +52,7 @@ public StreamOptions marker(String marker) {
* @see Builder#limit(int)
*/
@Override
public StreamOptions limit(int limit) {
public StreamMessagesOptions limit(int limit) {
super.limit(limit);
return this;

@@ -61,13 +61,13 @@ public StreamOptions limit(int limit) {
/**
* @see Builder#echo(boolean)
*/
public StreamOptions echo(boolean echo) {
public StreamMessagesOptions echo(boolean echo) {
queryParameters.put("echo", Boolean.toString(echo));
return this;
}

/**
* @return The String representation of the marker for these StreamOptions.
* @return The String representation of the marker for these StreamMessagesOptions.
*/
public String getMarker() {
return Iterables.getOnlyElement(queryParameters.get("marker"));
@@ -77,8 +77,8 @@ public static class Builder {
/**
* @see PaginationOptions#queryParameters(Multimap)
*/
public static StreamOptions queryParameters(Multimap<String, String> queryParams) {
StreamOptions options = new StreamOptions();
public static StreamMessagesOptions queryParameters(Multimap<String, String> queryParams) {
StreamMessagesOptions options = new StreamMessagesOptions();
return options.queryParameters(queryParams);
}

@@ -90,8 +90,8 @@ public static StreamOptions queryParameters(Multimap<String, String> queryParams
* Clients should make no assumptions about the format or length of the marker. Furthermore, clients should assume
* that there is no relationship between markers and message IDs.
*/
public static StreamOptions marker(String marker) {
StreamOptions options = new StreamOptions();
public static StreamMessagesOptions marker(String marker) {
StreamMessagesOptions options = new StreamMessagesOptions();
return options.marker(marker);
}

@@ -101,8 +101,8 @@ public static StreamOptions marker(String marker) {
* MessageStream#nextStreamOptions()}. Specifies up to 10 messages (the default value) to return. If you do not
* specify a value for the limit parameter, the default value of 10 is used.
*/
public static StreamOptions limit(int limit) {
StreamOptions options = new StreamOptions();
public static StreamMessagesOptions limit(int limit) {
StreamMessagesOptions options = new StreamMessagesOptions();
return options.limit(limit);
}

@@ -111,8 +111,8 @@ public static StreamOptions limit(int limit) {
* (UUID) portion of the client. If you do not specify a value, echo uses the default value of false. If you are
* experimenting with the API, you might want to set echo=true in order to see the messages that you posted.
*/
public static StreamOptions echo(boolean echo) {
StreamOptions options = new StreamOptions();
public static StreamMessagesOptions echo(boolean echo) {
StreamMessagesOptions options = new StreamMessagesOptions();
return options.echo(echo);
}
}

0 comments on commit 3e54454

Please sign in to comment.