From 37a88297dacf4e4213841a9ad9c588910e8637a9 Mon Sep 17 00:00:00 2001 From: David Dossot Date: Sat, 11 Sep 2010 16:24:44 -0700 Subject: [PATCH] added support for writing data streams to objects --- README.md | 8 +++++-- src/cferl_connection.erl | 8 +++---- src/cferl_object.erl | 34 ++++++++++++++++++++-------- test/cferl_integration_tests.erl | 39 ++++++++++++++++++++++++++++++-- 4 files changed, 72 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index ad26e66..7a762fc 100644 --- a/README.md +++ b/README.md @@ -104,7 +104,7 @@ The following, which is output when running the integration tests, demonstrates true=RefreshedContainer:is_public(). io:format("~s~n~n",[RefreshedContainer:cdn_url()]). - http://c0025308.cdn1.cloudfiles.rackspacecloud.com + http://c0025360.cdn1.cloudfiles.rackspacecloud.com 86400=RefreshedContainer:cdn_ttl(). true=RefreshedContainer:log_retention(). @@ -130,7 +130,7 @@ The following, which is output when running the integration tests, demonstrates {ok,[ObjectName]}=RefreshedContainer:get_objects_names(). {ok,[ObjectName]}=RefreshedContainer:get_objects_names(#cf_object_query_args{limit=1}). {ok,[ObjectDetails]}=RefreshedContainer:get_objects_details(). - ObjectDetails = #cf_object_details{name=<<"test.xml">>, bytes=8, last_modified={{2010,9,11},{2,56,48}}, content_type=application/xml, etag=4366c359d1a7b9b248fa262775613699} + ObjectDetails = #cf_object_details{name=<<"test.xml">>, bytes=8, last_modified={{2010,9,11},{23,22,58}}, content_type=application/xml, etag=4366c359d1a7b9b248fa262775613699} # Read the whole data {ok,<<"">>}=Object:read_data(). @@ -150,6 +150,10 @@ The following, which is output when running the integration tests, demonstrates # Get custom meta-data [{<<"Key123">>,<<"my123Value">>}]=RefreshedObject:metadata(). + # Data can be streamed from a generating function/0 returning {ok, Data} | eof + {ok,StreamedObject}=RefreshedContainer:create_object(<<"streamed.txt">>). + StreamedObject:write_data_stream(DataFun,<<"text/plain">>,[{<<"Content-Length">>,<<"1000">>}]). + # Delete the object ok=RefreshedObject:delete(). diff --git a/src/cferl_connection.erl b/src/cferl_connection.erl index 202f28e..91f9e64 100644 --- a/src/cferl_connection.erl +++ b/src/cferl_connection.erl @@ -208,7 +208,7 @@ send_storage_request(Method, PathAndQuery, Headers, Accept) send_request(StorageUrl, Method, PathAndQuery, Headers, <<>>, Accept). send_storage_request(Method, PathAndQuery, Headers, Body, Accept) - when is_atom(Method), is_list(Headers), is_binary(Body), is_atom(Accept) -> + when is_atom(Method), is_list(Headers), is_binary(Body); is_function(Body, 0), is_atom(Accept) -> send_request(StorageUrl, Method, PathAndQuery, Headers, Body, Accept). %% @hidden @@ -231,11 +231,11 @@ send_request(BaseUrl, Method, PathAndQuery, Headers, Body, Accept) send_request(BaseUrl, Method, binary_to_list(PathAndQuery), Headers, Body, Accept); send_request(BaseUrl, Method, PathAndQuery, Headers, Body, raw) - when is_atom(Method), is_list(PathAndQuery), is_list(Headers), is_binary(Body) -> + when is_atom(Method), is_list(PathAndQuery), is_list(Headers), is_binary(Body); is_function(Body, 0) -> do_send_request(BaseUrl, Method, PathAndQuery, Headers, Body); send_request(BaseUrl, Method, PathAndQuery, Headers, Body, json) - when is_atom(Method), is_list(PathAndQuery), is_list(Headers), is_binary(Body) -> + when is_atom(Method), is_list(PathAndQuery), is_list(Headers), is_binary(Body); is_function(Body, 0) -> do_send_request(BaseUrl, Method, build_json_query_string(PathAndQuery), @@ -243,7 +243,7 @@ send_request(BaseUrl, Method, PathAndQuery, Headers, Body, json) Body). do_send_request(BaseUrl, Method, PathAndQuery, Headers, Body) - when is_list(BaseUrl), is_list(PathAndQuery), is_list(Headers), is_atom(Method), is_binary(Body) -> + when is_list(BaseUrl), is_list(PathAndQuery), is_list(Headers), is_atom(Method), is_binary(Body); is_function(Body, 0) -> ibrowse:send_req(BaseUrl ++ PathAndQuery, [{"User-Agent", "cferl (CloudFiles Erlang API) v" ++ Version}, {"X-Auth-Token", AuthToken} | diff --git a/src/cferl_object.erl b/src/cferl_object.erl index e0a89d3..b524fea 100644 --- a/src/cferl_object.erl +++ b/src/cferl_object.erl @@ -14,7 +14,8 @@ -export([name/0, bytes/0, last_modified/0, content_type/0, etag/0, metadata/0, set_metadata/1, refresh/0, - read_data/0, read_data/2, write_data/2, write_data/3, + read_data/0, read_data/2, + write_data/2, write_data/3, write_data_stream/2, write_data_stream/3, delete/0]). %% @doc Name of the current object. @@ -92,6 +93,8 @@ read_data_result({ok, ResponseCode, _, ResponseBody}) read_data_result(Other) -> cferl_lib:error_result(Other). +% TODO add: read_data_stream read_data_stream(size/offset) + %% @doc Write data for the current object. %% @spec write_data(Data::binary(), ContentType::binary()) -> ok | Error %% Error = {error, invalid_content_length} | {error, mismatched_etag} | cferl_error() @@ -105,24 +108,37 @@ write_data(Data, ContentType) when is_binary(Data), is_binary(ContentType) -> write_data(Data, ContentType, RequestHeaders) when is_binary(Data), is_binary(ContentType), is_list(RequestHeaders) -> + do_write_data(Data, ContentType, RequestHeaders). + +% TODO comment +write_data_stream(DataFun, ContentType) + when is_function(DataFun, 0), is_binary(ContentType) -> + + write_data_stream(DataFun, ContentType, []). + +% TODO comment +write_data_stream(DataFun, ContentType, RequestHeaders) + when is_function(DataFun, 0), is_binary(ContentType), is_list(RequestHeaders) -> + + do_write_data(DataFun, ContentType, RequestHeaders). + +do_write_data(DataSource, ContentType, RequestHeaders) -> Result = Connection:send_storage_request(put, ObjectPath, [{"Content-Type", ContentType}|RequestHeaders], - Data, + DataSource, raw), - write_data_result(Result). + do_write_data_result(Result). -write_data_result({ok, "201", _, _}) -> +do_write_data_result({ok, "201", _, _}) -> ok; -write_data_result({ok, "412", _, _}) -> +do_write_data_result({ok, "412", _, _}) -> {error, invalid_content_length}; -write_data_result({ok, "422", _, _}) -> +do_write_data_result({ok, "422", _, _}) -> {error, mismatched_etag}; -write_data_result(Other) -> +do_write_data_result(Other) -> cferl_lib:error_result(Other). -% TODO add: read_data_stream read_data_stream(size/offset) write_data_stream - %% @doc Delete the current storage object. %% @spec delete() -> ok | Error %% Error = cferl_error() diff --git a/test/cferl_integration_tests.erl b/test/cferl_integration_tests.erl index 513e0cd..c8e8e1e 100644 --- a/test/cferl_integration_tests.erl +++ b/test/cferl_integration_tests.erl @@ -10,7 +10,7 @@ -author('David Dossot '). -include("cferl.hrl"). --export([start/0]). +-export([start/0, data_producer_loop/1]). -define(PRINT_CODE(Code), io:format(" ~s~n", [Code])). -define(PRTFM_CODE(Format, Data), ?PRINT_CODE(io_lib:format(Format, Data))). -define(PRINT_CALL(Call), @@ -205,8 +205,24 @@ container_tests(CloudFiles) -> ?PRINT_CALL([{<<"Key123">>, <<"my 123 Value">>}] = RefreshedObject:metadata()), ?PRINT_CODE(""), - % TODO test write_data_stream then read_data_stream + ?PRINT_CODE("# Data can be streamed from a generating function/0 returning {ok, Data} | eof"), + ?PRINT_CALL({ok, StreamedObject} = RefreshedContainer:create_object(<<"streamed.txt">>)), + + DataPid = spawn_data_producer(), + DataFun = + fun() -> + DataPid ! {self(), get_data}, + receive + Data -> Data + after 5000 -> eof + end + end, + + ?PRINT_CALL(StreamedObject:write_data_stream(DataFun, <<"text/plain">>, [{<<"Content-Length">>, <<"1000">>}])), + ?PRINT_CODE(""), + % TODO read_data_stream + ?PRINT_CODE("# Delete the object"), ?PRINT_CALL(ok = RefreshedObject:delete()), ?PRINT_CODE(""), @@ -216,6 +232,9 @@ container_tests(CloudFiles) -> ?PRINT_CALL(true = RefreshedContainer:object_exists(<<"photos">>)), ?PRINT_CALL(true = RefreshedContainer:object_exists(<<"photos/plants">>)), ?PRINT_CODE(""), + + % delete the streamed object + ok = RefreshedContainer:delete_object(<<"streamed.txt">>), % delete the path elements ok = RefreshedContainer:delete_object(<<"photos">>), @@ -250,3 +269,19 @@ make_new_container_name() -> ++ HostName, list_to_binary(ContainerName). +spawn_data_producer() -> + spawn(?MODULE, data_producer_loop, [0]). + +data_producer_loop(Index) -> + receive + {Pid, get_data} when Index < 10 -> + Pid ! {ok, string:copies(integer_to_list(Index), 100)}, + data_producer_loop(Index + 1); + + {Pid, get_data} -> + Pid ! eof; + + _ -> + ok + end. +