Skip to content

Commit

Permalink
Close underlying stream after process failure in close (#182)
Browse files Browse the repository at this point in the history
  • Loading branch information
nhz2 committed Mar 25, 2024
1 parent 710dff1 commit 3be4361
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 11 deletions.
13 changes: 8 additions & 5 deletions src/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,14 @@ end

function Base.close(stream::TranscodingStream)
mode = stream.state.mode
if mode != :panic
changemode!(stream, :close)
end
if !stream.state.stop_on_end
close(stream.stream)
try
if mode != :panic
changemode!(stream, :close)
end
finally
if !stream.state.stop_on_end
close(stream.stream)
end
end
return nothing
end
Expand Down
33 changes: 27 additions & 6 deletions test/codecdoubleframe.jl
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ function TranscodingStreams.process(
codec :: DoubleFrameEncoder,
input :: TranscodingStreams.Memory,
output :: TranscodingStreams.Memory,
error :: TranscodingStreams.Error,
error_ref :: TranscodingStreams.Error,
)
if input.size == 0
codec.got_stop_msg[] = true
end

if output.size < 2
error[] = ErrorException("requires a minimum of 2 bytes of output space")
error_ref[] = ErrorException("requires a minimum of 2 bytes of output space")
return 0, 0, :error
elseif codec.stopped[]
error[] = ErrorException("cannot process after stopped")
error_ref[] = ErrorException("cannot process after stopped")
return 0, 0, :error
elseif codec.got_stop_msg[] && input.size != 0
error[] = ErrorException("cannot accept more input after getting stop message")
error_ref[] = ErrorException("cannot accept more input after getting stop message")
return 0, 0, :error
elseif !codec.opened[]
output[1] = UInt8('[')
Expand Down Expand Up @@ -95,7 +95,7 @@ function TranscodingStreams.process(
codec :: DoubleFrameDecoder,
input :: TranscodingStreams.Memory,
output :: TranscodingStreams.Memory,
error :: TranscodingStreams.Error,
error_ref :: TranscodingStreams.Error,
)
Δin::Int = 0
Δout::Int = 0
Expand Down Expand Up @@ -167,7 +167,7 @@ function TranscodingStreams.process(
catch e
codec.state[]=7
e isa ErrorException || rethrow()
error[] = e
error_ref[] = e
return Δin, Δout, :error
end
end
Expand Down Expand Up @@ -243,6 +243,27 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD
@test String(take!(sink)) == "[ ][ aabbcc ][ ddee ]"
end

@testset "Issue #160 Safely close stream after failure" begin
sink = IOBuffer()
stream = TranscodingStream(DoubleFrameDecoder(), sink)
write(stream, "abc")
@test_throws ErrorException("expected [") close(stream)
@test !isopen(stream)
@test !isopen(sink)

@testset "nested decoders" begin
sink = IOBuffer()
stream = TranscodingStream(DoubleFrameDecoder(), sink)
stream2 = TranscodingStream(DoubleFrameDecoder(), stream)
write(stream2, "abc")
# "expected byte" error with caused by "expected ["
@test_throws ErrorException("expected byte") close(stream2)
@test !isopen(stream2)
@test !isopen(stream)
@test !isopen(sink)
end
end

@testset "stop_on_end=true in nested streams" begin
s1 = DoubleFrameDecoderStream(DoubleFrameEncoderStream(
DoubleFrameDecoderStream(
Expand Down

0 comments on commit 3be4361

Please sign in to comment.