Skip to content

Commit

Permalink
added support for writing data streams to objects
Browse files Browse the repository at this point in the history
  • Loading branch information
ddossot committed Sep 11, 2010
1 parent 7d00230 commit 37a8829
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 17 deletions.
8 changes: 6 additions & 2 deletions README.md
Expand Up @@ -104,7 +104,7 @@ The following, which is output when running the integration tests, demonstrates
true=RefreshedContainer:is_public().

io:format("~s~n~n",[RefreshedContainer:cdn_url()]).
http://c0025308.cdn1.cloudfiles.rackspacecloud.com
http://c0025360.cdn1.cloudfiles.rackspacecloud.com

86400=RefreshedContainer:cdn_ttl().
true=RefreshedContainer:log_retention().
Expand All @@ -130,7 +130,7 @@ The following, which is output when running the integration tests, demonstrates
{ok,[ObjectName]}=RefreshedContainer:get_objects_names().
{ok,[ObjectName]}=RefreshedContainer:get_objects_names(#cf_object_query_args{limit=1}).
{ok,[ObjectDetails]}=RefreshedContainer:get_objects_details().
ObjectDetails = #cf_object_details{name=<<"test.xml">>, bytes=8, last_modified={{2010,9,11},{2,56,48}}, content_type=application/xml, etag=4366c359d1a7b9b248fa262775613699}
ObjectDetails = #cf_object_details{name=<<"test.xml">>, bytes=8, last_modified={{2010,9,11},{23,22,58}}, content_type=application/xml, etag=4366c359d1a7b9b248fa262775613699}

# Read the whole data
{ok,<<"<test/>">>}=Object:read_data().
Expand All @@ -150,6 +150,10 @@ The following, which is output when running the integration tests, demonstrates
# Get custom meta-data
[{<<"Key123">>,<<"my123Value">>}]=RefreshedObject:metadata().

# Data can be streamed from a generating function/0 returning {ok, Data} | eof
{ok,StreamedObject}=RefreshedContainer:create_object(<<"streamed.txt">>).
StreamedObject:write_data_stream(DataFun,<<"text/plain">>,[{<<"Content-Length">>,<<"1000">>}]).

# Delete the object
ok=RefreshedObject:delete().

Expand Down
8 changes: 4 additions & 4 deletions src/cferl_connection.erl
Expand Up @@ -208,7 +208,7 @@ send_storage_request(Method, PathAndQuery, Headers, Accept)
send_request(StorageUrl, Method, PathAndQuery, Headers, <<>>, Accept).

send_storage_request(Method, PathAndQuery, Headers, Body, Accept)
when is_atom(Method), is_list(Headers), is_binary(Body), is_atom(Accept) ->
when is_atom(Method), is_list(Headers), is_binary(Body); is_function(Body, 0), is_atom(Accept) ->
send_request(StorageUrl, Method, PathAndQuery, Headers, Body, Accept).

%% @hidden
Expand All @@ -231,19 +231,19 @@ send_request(BaseUrl, Method, PathAndQuery, Headers, Body, Accept)
send_request(BaseUrl, Method, binary_to_list(PathAndQuery), Headers, Body, Accept);

send_request(BaseUrl, Method, PathAndQuery, Headers, Body, raw)
when is_atom(Method), is_list(PathAndQuery), is_list(Headers), is_binary(Body) ->
when is_atom(Method), is_list(PathAndQuery), is_list(Headers), is_binary(Body); is_function(Body, 0) ->
do_send_request(BaseUrl, Method, PathAndQuery, Headers, Body);

send_request(BaseUrl, Method, PathAndQuery, Headers, Body, json)
when is_atom(Method), is_list(PathAndQuery), is_list(Headers), is_binary(Body) ->
when is_atom(Method), is_list(PathAndQuery), is_list(Headers), is_binary(Body); is_function(Body, 0) ->
do_send_request(BaseUrl,
Method,
build_json_query_string(PathAndQuery),
Headers,
Body).

do_send_request(BaseUrl, Method, PathAndQuery, Headers, Body)
when is_list(BaseUrl), is_list(PathAndQuery), is_list(Headers), is_atom(Method), is_binary(Body) ->
when is_list(BaseUrl), is_list(PathAndQuery), is_list(Headers), is_atom(Method), is_binary(Body); is_function(Body, 0) ->
ibrowse:send_req(BaseUrl ++ PathAndQuery,
[{"User-Agent", "cferl (CloudFiles Erlang API) v" ++ Version},
{"X-Auth-Token", AuthToken} |
Expand Down
34 changes: 25 additions & 9 deletions src/cferl_object.erl
Expand Up @@ -14,7 +14,8 @@

-export([name/0, bytes/0, last_modified/0, content_type/0, etag/0,
metadata/0, set_metadata/1, refresh/0,
read_data/0, read_data/2, write_data/2, write_data/3,
read_data/0, read_data/2,
write_data/2, write_data/3, write_data_stream/2, write_data_stream/3,
delete/0]).

%% @doc Name of the current object.
Expand Down Expand Up @@ -92,6 +93,8 @@ read_data_result({ok, ResponseCode, _, ResponseBody})
read_data_result(Other) ->
cferl_lib:error_result(Other).

% TODO add: read_data_stream read_data_stream(size/offset)

%% @doc Write data for the current object.
%% @spec write_data(Data::binary(), ContentType::binary()) -> ok | Error
%% Error = {error, invalid_content_length} | {error, mismatched_etag} | cferl_error()
Expand All @@ -105,24 +108,37 @@ write_data(Data, ContentType) when is_binary(Data), is_binary(ContentType) ->
write_data(Data, ContentType, RequestHeaders)
when is_binary(Data), is_binary(ContentType), is_list(RequestHeaders) ->

do_write_data(Data, ContentType, RequestHeaders).

% TODO comment
write_data_stream(DataFun, ContentType)
when is_function(DataFun, 0), is_binary(ContentType) ->

write_data_stream(DataFun, ContentType, []).

% TODO comment
write_data_stream(DataFun, ContentType, RequestHeaders)
when is_function(DataFun, 0), is_binary(ContentType), is_list(RequestHeaders) ->

do_write_data(DataFun, ContentType, RequestHeaders).

do_write_data(DataSource, ContentType, RequestHeaders) ->
Result = Connection:send_storage_request(put,
ObjectPath,
[{"Content-Type", ContentType}|RequestHeaders],
Data,
DataSource,
raw),
write_data_result(Result).
do_write_data_result(Result).

write_data_result({ok, "201", _, _}) ->
do_write_data_result({ok, "201", _, _}) ->
ok;
write_data_result({ok, "412", _, _}) ->
do_write_data_result({ok, "412", _, _}) ->
{error, invalid_content_length};
write_data_result({ok, "422", _, _}) ->
do_write_data_result({ok, "422", _, _}) ->
{error, mismatched_etag};
write_data_result(Other) ->
do_write_data_result(Other) ->
cferl_lib:error_result(Other).

% TODO add: read_data_stream read_data_stream(size/offset) write_data_stream

%% @doc Delete the current storage object.
%% @spec delete() -> ok | Error
%% Error = cferl_error()
Expand Down
39 changes: 37 additions & 2 deletions test/cferl_integration_tests.erl
Expand Up @@ -10,7 +10,7 @@
-author('David Dossot <david@dossot.net>').
-include("cferl.hrl").

-export([start/0]).
-export([start/0, data_producer_loop/1]).
-define(PRINT_CODE(Code), io:format(" ~s~n", [Code])).
-define(PRTFM_CODE(Format, Data), ?PRINT_CODE(io_lib:format(Format, Data))).
-define(PRINT_CALL(Call),
Expand Down Expand Up @@ -205,8 +205,24 @@ container_tests(CloudFiles) ->
?PRINT_CALL([{<<"Key123">>, <<"my 123 Value">>}] = RefreshedObject:metadata()),
?PRINT_CODE(""),

% TODO test write_data_stream then read_data_stream
?PRINT_CODE("# Data can be streamed from a generating function/0 returning {ok, Data} | eof"),
?PRINT_CALL({ok, StreamedObject} = RefreshedContainer:create_object(<<"streamed.txt">>)),

DataPid = spawn_data_producer(),
DataFun =
fun() ->
DataPid ! {self(), get_data},
receive
Data -> Data
after 5000 -> eof
end
end,

?PRINT_CALL(StreamedObject:write_data_stream(DataFun, <<"text/plain">>, [{<<"Content-Length">>, <<"1000">>}])),
?PRINT_CODE(""),

% TODO read_data_stream

?PRINT_CODE("# Delete the object"),
?PRINT_CALL(ok = RefreshedObject:delete()),
?PRINT_CODE(""),
Expand All @@ -216,6 +232,9 @@ container_tests(CloudFiles) ->
?PRINT_CALL(true = RefreshedContainer:object_exists(<<"photos">>)),
?PRINT_CALL(true = RefreshedContainer:object_exists(<<"photos/plants">>)),
?PRINT_CODE(""),

% delete the streamed object
ok = RefreshedContainer:delete_object(<<"streamed.txt">>),

% delete the path elements
ok = RefreshedContainer:delete_object(<<"photos">>),
Expand Down Expand Up @@ -250,3 +269,19 @@ make_new_container_name() ->
++ HostName,
list_to_binary(ContainerName).

spawn_data_producer() ->
spawn(?MODULE, data_producer_loop, [0]).

data_producer_loop(Index) ->
receive
{Pid, get_data} when Index < 10 ->
Pid ! {ok, string:copies(integer_to_list(Index), 100)},
data_producer_loop(Index + 1);

{Pid, get_data} ->
Pid ! eof;

_ ->
ok
end.

0 comments on commit 37a8829

Please sign in to comment.