Skip to content

Commit

Permalink
Return HTTP 400 if content-md5 header does not match
Browse files Browse the repository at this point in the history
Fixes #596

Respect content-md5 headers on object uploads and return HTTP 400 with
appropriate error body when it does not match the calculated digest of
the object body. Also refactor the handling of content-md5 headers for
multipart part uploads so that it matches the handling for normal
object uploads.
  • Loading branch information
kellymclaughlin committed Jul 24, 2013
1 parent e346a4f commit 81d14a5
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 107 deletions.
4 changes: 2 additions & 2 deletions int_test/riak_cs_put_fsm_test.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ small_put() ->
Md5 = make_md5(Data),
Parts = [<<"0">>, <<"123">>, <<"45">>, <<"678">>, <<"9">>],
[riak_cs_put_fsm:augment_data(Pid, D) || D <- Parts],
{ok, Manifest} = riak_cs_put_fsm:finalize(Pid),
{ok, Manifest} = riak_cs_put_fsm:finalize(Pid, undefined),
?assert(Manifest?MANIFEST.state == active andalso
Manifest?MANIFEST.content_md5==Md5).

Expand All @@ -64,7 +64,7 @@ zero_byte_put() ->
60000, self(), RiakPid}),
Data = <<>>,
Md5 = make_md5(Data),
{ok, Manifest} = riak_cs_put_fsm:finalize(Pid),
{ok, Manifest} = riak_cs_put_fsm:finalize(Pid, undefined),
?assert(Manifest?MANIFEST.state == active andalso
Manifest?MANIFEST.content_md5==Md5).

Expand Down
2 changes: 1 addition & 1 deletion src/riak_cs_mp_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ upload_part(Bucket, Key, UploadId, PartNumber, Size, Caller, RiakcPidUnW) ->

upload_part_1blob(PutPid, Blob) ->
ok = riak_cs_put_fsm:augment_data(PutPid, Blob),
{ok, M} = riak_cs_put_fsm:finalize(PutPid),
{ok, M} = riak_cs_put_fsm:finalize(PutPid, undefined),
{ok, M?MANIFEST.content_md5}.

%% Once upon a time, in a naive land far away, I thought that it would
Expand Down
24 changes: 23 additions & 1 deletion src/riak_cs_oos_response.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
-export([api_error/3,
error_code_to_atom/1,
error_response/5,
invalid_digest_response/3,
respond/3,
status_code/1]).

Expand Down Expand Up @@ -70,6 +71,22 @@ api_error({error, Reason}, RD, Ctx) ->
%% @doc Halt request processing with `StatusCode'. The error code and
%% message arguments are accepted but not used; no body is set here.
error_response(StatusCode, _Code, _Message, RD, Ctx) ->
    Halt = {halt, StatusCode},
    {Halt, RD, Ctx}.

%% @TODO Figure out how this should actually be formatted
%% @doc Halt the request with the `invalid_digest' status code and an
%% XML error body that echoes the Content-MD5 value that was rejected.
invalid_digest_response(ContentMd5, RD, Ctx) ->
    ErrorFields = [{'Code', [error_code(invalid_digest)]},
                   {'Message', [error_message(invalid_digest)]},
                   {'Content-MD5', [ContentMd5]},
                   {'HostId', ["host-id"]}],
    Body = riak_cs_xml:export_xml([{'Error', ErrorFields}]),
    %% Tag the response as XML first, then attach the rendered body.
    TypedRD = wrq:set_resp_header("Content-Type", ?XML_TYPE, RD),
    BodyRD = wrq:set_resp_body(Body, TypedRD),
    {{halt, status_code(invalid_digest)}, BodyRD, Ctx}.

%% @doc Convert an error code string into its corresponding atom
-spec error_code_to_atom(string()) -> atom().
error_code_to_atom(ErrorCode) ->
Expand Down Expand Up @@ -130,7 +147,10 @@ error_message(malformed_policy_action) -> "Policy has invalid action";
error_message(malformed_policy_condition) -> "Policy has invalid condition";
error_message(no_such_bucket_policy) -> "The bucket policy does not exist";
error_message(no_such_upload) ->
"The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed.";
"The specified upload does not exist. The upload ID may be invalid, "
"or the upload may have been aborted or completed.";
error_message(invalid_digest) ->
"The Content-MD5 you specified was invalid.";
error_message(bad_request) -> "Bad Request";
error_message(invalid_argument) -> "Invalid Argument";
error_message(unresolved_grant_email) -> "The e-mail address you provided does not match any account on record.";
Expand Down Expand Up @@ -162,6 +182,7 @@ error_code(malformed_policy_action) -> "MalformedPolicy";
error_code(malformed_policy_condition) -> "MalformedPolicy";
error_code(no_such_bucket_policy) -> "NoSuchBucketPolicy";
error_code(no_such_upload) -> "NoSuchUpload";
error_code(invalid_digest) -> "InvalidDigest";
error_code(bad_request) -> "BadRequest";
error_code(invalid_argument) -> "InvalidArgument";
error_code(invalid_range) -> "InvalidRange";
Expand Down Expand Up @@ -201,6 +222,7 @@ status_code(malformed_policy_action) -> 400;
status_code(malformed_policy_condition) -> 400;
status_code(no_such_bucket_policy) -> 404;
status_code(no_such_upload) -> 404;
status_code(invalid_digest) -> 400;
status_code(bad_request) -> 400;
status_code(invalid_argument) -> 400;
status_code(unresolved_grant_email) -> 400;
Expand Down
107 changes: 64 additions & 43 deletions src/riak_cs_put_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
get_uuid/1,
augment_data/2,
block_written/2,
finalize/1,
finalize/2,
force_stop/1]).

%% gen_fsm callbacks
Expand Down Expand Up @@ -59,6 +59,7 @@
caller :: reference(),
uuid :: binary(),
md5 :: binary(),
reported_md5 :: undefined | string(),
reply_pid :: {pid(), reference()},
mani_pid :: undefined | pid(),
riakc_pid :: pid(),
Expand Down Expand Up @@ -115,8 +116,8 @@ get_uuid(Pid) ->
%% @doc Synchronously hand another chunk of object data to the put FSM
%% `Pid', waiting without a timeout for its reply.
augment_data(Pid, Data) ->
    Event = {augment_data, Data},
    gen_fsm:sync_send_event(Pid, Event, infinity).

finalize(Pid) ->
gen_fsm:sync_send_event(Pid, finalize, infinity).
%% @doc Synchronously ask the put FSM `Pid' to finalize the upload,
%% waiting without a timeout for its reply. `ContentMD5' is the digest
%% reported by the client, or `undefined' when there is none to verify.
finalize(Pid, ContentMD5) ->
    Event = {finalize, ContentMD5},
    gen_fsm:sync_send_event(Pid, Event, infinity).

%% @doc Synchronously send the `force_stop' all-state event to the put
%% FSM `Pid', waiting without a timeout for its reply.
force_stop(Pid) ->
    Event = force_stop,
    gen_fsm:sync_send_all_state_event(Pid, Event, infinity).
Expand Down Expand Up @@ -185,13 +186,15 @@ prepare(timeout, State) ->
NewState = prepare(State),
{next_state, not_full, NewState}.

prepare(finalize, From, State=#state{content_length=0}) ->
prepare({finalize, ContentMD5}, From, State=#state{content_length=0}) ->
NewState = prepare(State),
Md5 = crypto:md5_final(NewState#state.md5),
NewManifest = NewState#state.manifest?MANIFEST{content_md5=Md5,
state=active,
last_block_written_time=os:timestamp()},
done(finalize, From, NewState#state{md5=Md5, manifest=NewManifest});
done(finalize, From, NewState#state{md5=Md5,
manifest=NewManifest,
reported_md5=ContentMD5});

prepare({get_uuid}, _From, State) ->
{reply, State#state.uuid, prepare, State};
Expand Down Expand Up @@ -225,27 +228,17 @@ full({block_written, BlockID, WriterPid}, State=#state{reply_pid=Waiter}) ->

all_received({augment_data, <<>>}, State) ->
{next_state, all_received, State};
all_received({block_written, BlockID, WriterPid}, State=#state{mani_pid=ManiPid,
timer_ref=TimerRef}) ->

all_received({block_written, BlockID, WriterPid}, State) ->
NewState = state_from_block_written(BlockID, WriterPid, State),
Manifest = NewState#state.manifest?MANIFEST{state=active},
NewState2 = NewState#state{manifest=Manifest},
case ordsets:size(NewState#state.unacked_writes) of
0 ->
case State#state.reply_pid of
undefined ->
{next_state, done, NewState};
ReplyPid ->
%% reply with the final manifest
_ = erlang:cancel_timer(TimerRef),
case maybe_update_manifest_with_confirmation(ManiPid, Manifest) of
ok ->
gen_fsm:reply(ReplyPid, {ok, Manifest}),
{stop, normal, NewState};
Error ->
gen_fsm:reply(ReplyPid, {error, Error}),
{stop, Error, NewState}
end
done(finalize, ReplyPid, NewState2)
end;
_ ->
{next_state, all_received, NewState}
Expand Down Expand Up @@ -282,25 +275,53 @@ not_full({augment_data, NewData}, From,

all_received({augment_data, <<>>}, _From, State) ->
{next_state, all_received, State};
all_received(finalize, From, State) ->
all_received({finalize, ContentMD5}, From, State) ->
%% 1. stash the From pid into our
%% state so that we know to reply
%% later with the finished manifest
{next_state, all_received, State#state{reply_pid=From}}.

done(finalize, _From, State=#state{manifest=Manifest,
mani_pid=ManiPid,
timer_ref=TimerRef}) ->
%% 1. reply immediately
%% with the finished manifest
{next_state, all_received, State#state{reply_pid=From,
reported_md5=ContentMD5}}.

%% @doc Finish the upload after checking the client-reported digest.
%%
%% The first clause takes the reported digest directly from the
%% `{finalize, ReportedMD5}' event; the second reads it from the
%% `reported_md5' field of the state. Both compare it against the
%% calculated `md5' held in the state and pass the boolean result on to
%% done/4, which replies to `From'.
%%
%% `From' is forwarded to done/4, so it must not carry the `_' prefix
%% that marks a variable as intentionally unused.
done({finalize, ReportedMD5}, From, State=#state{md5=MD5}) ->
    done(finalize, is_digest_valid(MD5, ReportedMD5), From, State);
done(finalize, From, State=#state{md5=MD5,
                                  reported_md5=ReportedMD5}) ->
    done(finalize, is_digest_valid(MD5, ReportedMD5), From, State).


done(finalize, false, From, State=#state{manifest=Manifest,
bucket=Bucket,
key=Key,
mani_pid=ManiPid,
riakc_pid=RiakPid,
timer_ref=TimerRef}) ->
_ = erlang:cancel_timer(TimerRef),
%% The digest is invalid. Write the manifest and immediately
%% schedule it for gc.
_ = maybe_update_manifest_with_confirmation(ManiPid, Manifest),
_ = riak_cs_gc:gc_active_manifests(Bucket, Key, RiakPid),
gen_fsm:reply(From, {error, invalid_digest}),
{stop, invalid_digest, State};
done(finalize, true, From, State=#state{manifest=Manifest,
mani_pid=ManiPid,
timer_ref=TimerRef}) ->
%% 1. reply immediately with the finished manifest
_ = erlang:cancel_timer(TimerRef),
case maybe_update_manifest_with_confirmation(ManiPid, Manifest) of
ok ->
{stop, normal, {ok, Manifest}, State};
gen_fsm:reply(From, {ok, Manifest}),
{stop, normal, State};
Error ->
{stop, Error, {error, Error}, State}
gen_fsm:reply(From, {error, Error}),
{stop, Error, State}
end.

%% @doc Check the client-reported Content-MD5 against the calculated
%% digest. A reported value of `undefined' is always accepted; otherwise
%% the base64 encoding of the calculated digest must equal the reported
%% string exactly.
-spec is_digest_valid(binary(), undefined | string()) -> boolean().
is_digest_valid(_CalculatedMD5, undefined) ->
    true;
is_digest_valid(CalculatedMD5, ReportedMD5) ->
    EncodedDigest = base64:encode(CalculatedMD5),
    ReportedDigest = list_to_binary(ReportedMD5),
    EncodedDigest =:= ReportedDigest.

%%--------------------------------------------------------------------
%%
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -380,7 +401,7 @@ prepare(State=#state{bucket=Bucket,
[];
_ ->
riak_cs_block_server:start_block_servers(RiakPid,
riak_cs_lfs_utils:put_concurrency())
riak_cs_lfs_utils:put_concurrency())
end,
FreeWriters = ordsets:from_list(WriterPids),
MaxBufferSize = (riak_cs_lfs_utils:put_fsm_buffer_size_factor() * BlockSize),
Expand All @@ -389,17 +410,17 @@ prepare(State=#state{bucket=Bucket,
ClusterID = riak_cs_config:cluster_id(RiakPid),
Manifest =
riak_cs_lfs_utils:new_manifest(Bucket,
Key,
UUID,
ContentLength,
ContentType,
%% we don't know the md5 yet
undefined,
Metadata,
BlockSize,
Acl,
[],
ClusterID),
Key,
UUID,
ContentLength,
ContentType,
%% we don't know the md5 yet
undefined,
Metadata,
BlockSize,
Acl,
[],
ClusterID),
NewManifest = Manifest?MANIFEST{write_start_time=os:timestamp()},

%% TODO:
Expand Down Expand Up @@ -531,10 +552,10 @@ handle_accept_chunk(NewData, State=#state{buffer_queue=BufferQueue,
NewRemainderData = combine_new_and_remainder_data(NewData, RemainderData),
UpdatedBytesReceived = PreviousBytesReceived + size(NewData),
if UpdatedBytesReceived > ContentLength ->
exit({too_many_bytes_received, got, UpdatedBytesReceived,
content_length, ContentLength});
exit({too_many_bytes_received, got, UpdatedBytesReceived,
content_length, ContentLength});
true ->
ok
ok
end,
NewCurrentBufferSize = CurrentBufferSize + size(NewData),

Expand Down
2 changes: 1 addition & 1 deletion src/riak_cs_s3_auth.erl
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ calculate_signature(KeyData, RD) ->
Date,
AmazonHeaders,
Resource],
lager:debug("STS: ~p", [STS]),
_ = lager:debug("STS: ~p", [STS]),
base64:encode_to_string(
crypto:sha_mac(KeyData, STS)).

Expand Down
19 changes: 18 additions & 1 deletion src/riak_cs_s3_response.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
error_response/5,
copy_object_response/3,
no_such_upload_response/3,
invalid_digest_response/3,
error_code_to_atom/1]).

-include("riak_cs.hrl").
Expand Down Expand Up @@ -76,7 +77,10 @@ error_message(malformed_policy_condition) -> "Policy has invalid condition";
error_message(no_such_key) -> "The specified key does not exist.";
error_message(no_such_bucket_policy) -> "The specified bucket does not have a bucket policy.";
error_message(no_such_upload) ->
"The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed.";
"The specified upload does not exist. The upload ID may be invalid, "
"or the upload may have been aborted or completed.";
error_message(invalid_digest) ->
"The Content-MD5 you specified was invalid.";
error_message(bad_request) -> "Bad Request";
error_message(invalid_argument) -> "Invalid Argument";
error_message(unresolved_grant_email) -> "The e-mail address you provided does not match any account on record.";
Expand Down Expand Up @@ -109,6 +113,7 @@ error_code(malformed_policy_action) -> "MalformedPolicy";
error_code(malformed_policy_condition) -> "MalformedPolicy";
error_code(no_such_bucket_policy) -> "NoSuchBucketPolicy";
error_code(no_such_upload) -> "NoSuchUpload";
error_code(invalid_digest) -> "InvalidDigest";
error_code(bad_request) -> "BadRequest";
error_code(invalid_argument) -> "InvalidArgument";
error_code(invalid_range) -> "InvalidRange";
Expand Down Expand Up @@ -150,6 +155,7 @@ status_code(malformed_policy_action) -> 400;
status_code(malformed_policy_condition) -> 400;
status_code(no_such_bucket_policy) -> 404;
status_code(no_such_upload) -> 404;
status_code(invalid_digest) -> 400;
status_code(bad_request) -> 400;
status_code(invalid_argument) -> 400;
status_code(unresolved_grant_email) -> 400;
Expand Down Expand Up @@ -220,6 +226,17 @@ no_such_upload_response(UploadId, RD, Ctx) ->
Body = riak_cs_xml:export_xml([XmlDoc]),
respond(status_code(no_such_upload), Body, RD, Ctx).

%% @doc Respond with the `invalid_digest' status code and an XML error
%% body that echoes the Content-MD5 value that was rejected.
invalid_digest_response(ContentMd5, RD, Ctx) ->
    ErrorFields = [{'Code', [error_code(invalid_digest)]},
                   {'Message', [error_message(invalid_digest)]},
                   {'Content-MD5', [ContentMd5]},
                   {'HostId', ["host-id"]}],
    Body = riak_cs_xml:export_xml([{'Error', ErrorFields}]),
    Status = status_code(invalid_digest),
    respond(Status, Body, RD, Ctx).

%% @doc Convert an error code string into its corresponding atom
-spec error_code_to_atom(string()) -> atom().
error_code_to_atom(ErrorCode) ->
Expand Down
30 changes: 30 additions & 0 deletions src/riak_cs_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@
put_with_no_meta/3,
riak_connection/0,
riak_connection/1,
safe_base64_decode/1,
safe_base64url_decode/1,
safe_list_to_integer/1,
save_user/3,
set_bucket_acl/5,
set_object_acl/5,
Expand Down Expand Up @@ -1436,3 +1439,30 @@ chunked_md5(Data, Context, ChunkSize) ->
UpdContext = crypto:md5_update(Context, Chunk),
chunked_md5(RestData, UpdContext, ChunkSize)
end.

%% @doc Base64-decode `Str' without raising: returns `{ok, Decoded}' on
%% success and `bad' if base64:decode/1 raises any exception.
-spec safe_base64_decode(binary() | string()) -> {ok, binary()} | bad.
safe_base64_decode(Str) ->
    try base64:decode(Str) of
        Decoded ->
            {ok, Decoded}
    catch
        _:_ ->
            bad
    end.

%% @doc URL-safe base64-decode `Str' without raising: returns
%% `{ok, Decoded}' on success and `bad' if base64url:decode/1 raises
%% any exception.
-spec safe_base64url_decode(binary() | string()) -> {ok, binary()} | bad.
safe_base64url_decode(Str) ->
    try base64url:decode(Str) of
        Decoded ->
            {ok, Decoded}
    catch
        _:_ ->
            bad
    end.

%% @doc Convert `Str' to an integer without raising: returns
%% `{ok, Integer}' on success and `bad' if list_to_integer/1 raises any
%% exception.
-spec safe_list_to_integer(string()) -> {ok, integer()} | bad.
safe_list_to_integer(Str) ->
    try list_to_integer(Str) of
        Int ->
            {ok, Int}
    catch
        _:_ ->
            bad
    end.
Loading

0 comments on commit 81d14a5

Please sign in to comment.