Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close underlying stream after process failure in close #182

Merged
merged 2 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions src/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,14 @@ end

function Base.close(stream::TranscodingStream)
mode = stream.state.mode
if mode != :panic
changemode!(stream, :close)
end
if !stream.state.stop_on_end
close(stream.stream)
try
if mode != :panic
changemode!(stream, :close)
end
finally
if !stream.state.stop_on_end
close(stream.stream)
end
end
return nothing
end
Expand Down
33 changes: 27 additions & 6 deletions test/codecdoubleframe.jl
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ function TranscodingStreams.process(
codec :: DoubleFrameEncoder,
input :: TranscodingStreams.Memory,
output :: TranscodingStreams.Memory,
error :: TranscodingStreams.Error,
error_ref :: TranscodingStreams.Error,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused by this name change although I see it brings some clarity that this is actually a reference-like object. How is it related to the rest of the commit?

If you are going to keep this, please maintain the formatting alignment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed it to avoid confusion with the Base.error function I had in the other process method. I'll fix up the formatting.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could change the docstring for process as well to be consistent but maybe there is a better name for this argument?

)
if input.size == 0
codec.got_stop_msg[] = true
end

if output.size < 2
error[] = ErrorException("requires a minimum of 2 bytes of output space")
error_ref[] = ErrorException("requires a minimum of 2 bytes of output space")
return 0, 0, :error
elseif codec.stopped[]
error[] = ErrorException("cannot process after stopped")
error_ref[] = ErrorException("cannot process after stopped")
return 0, 0, :error
elseif codec.got_stop_msg[] && input.size != 0
error[] = ErrorException("cannot accept more input after getting stop message")
error_ref[] = ErrorException("cannot accept more input after getting stop message")
return 0, 0, :error
elseif !codec.opened[]
output[1] = UInt8('[')
Expand Down Expand Up @@ -95,7 +95,7 @@ function TranscodingStreams.process(
codec :: DoubleFrameDecoder,
input :: TranscodingStreams.Memory,
output :: TranscodingStreams.Memory,
error :: TranscodingStreams.Error,
error_ref :: TranscodingStreams.Error,
)
Δin::Int = 0
Δout::Int = 0
Expand Down Expand Up @@ -167,7 +167,7 @@ function TranscodingStreams.process(
catch e
codec.state[]=7
e isa ErrorException || rethrow()
error[] = e
error_ref[] = e
return Δin, Δout, :error
end
end
Expand Down Expand Up @@ -243,6 +243,27 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD
@test String(take!(sink)) == "[ ][ aabbcc ][ ddee ]"
end

@testset "Issue #160 Safely close stream after failure" begin
sink = IOBuffer()
stream = TranscodingStream(DoubleFrameDecoder(), sink)
write(stream, "abc")
@test_throws ErrorException("expected [") close(stream)
@test !isopen(stream)
@test !isopen(sink)

@testset "nested decoders" begin
sink = IOBuffer()
stream = TranscodingStream(DoubleFrameDecoder(), sink)
stream2 = TranscodingStream(DoubleFrameDecoder(), stream)
write(stream2, "abc")
# "expected byte" error with caused by "expected ["
@test_throws ErrorException("expected byte") close(stream2)
@test !isopen(stream2)
@test !isopen(stream)
@test !isopen(sink)
end
end

test_roundtrip_read(DoubleFrameEncoderStream, DoubleFrameDecoderStream)
test_roundtrip_write(DoubleFrameEncoderStream, DoubleFrameDecoderStream)
test_roundtrip_lines(DoubleFrameEncoderStream, DoubleFrameDecoderStream)
Expand Down
Loading