@@ -26,9 +26,18 @@ public class MultipartStreamReader {
26
26
27
27
private final BufferedSource mSource ;
28
28
private final String mBoundary ;
29
-
30
- public interface ChunkCallback {
31
- void execute (Map <String , String > headers , Buffer body , boolean done ) throws IOException ;
29
+ private long mLastProgressEvent ;
30
+
31
+ public interface ChunkListener {
32
+ /**
33
+ * Invoked when a chunk of a multipart response is fully downloaded.
34
+ */
35
+ void onChunkComplete (Map <String , String > headers , Buffer body , boolean isLastChunk ) throws IOException ;
36
+
37
+ /**
38
+ * Invoked as bytes of the current chunk are read.
39
+ */
40
+ void onChunkProgress (Map <String , String > headers , long loaded , long total ) throws IOException ;
32
41
}
33
42
34
43
public MultipartStreamReader (BufferedSource source , String boundary ) {
@@ -55,34 +64,50 @@ private Map<String, String> parseHeaders(Buffer data) {
55
64
return headers ;
56
65
}
57
66
58
- private void emitChunk (Buffer chunk , boolean done , ChunkCallback callback ) throws IOException {
67
+ private void emitChunk (Buffer chunk , boolean done , ChunkListener listener ) throws IOException {
59
68
ByteString marker = ByteString .encodeUtf8 (CRLF + CRLF );
60
69
long indexOfMarker = chunk .indexOf (marker );
61
70
if (indexOfMarker == -1 ) {
62
- callback . execute (null , chunk , done );
71
+ listener . onChunkComplete (null , chunk , done );
63
72
} else {
64
73
Buffer headers = new Buffer ();
65
74
Buffer body = new Buffer ();
66
75
chunk .read (headers , indexOfMarker );
67
76
chunk .skip (marker .size ());
68
77
chunk .readAll (body );
69
- callback .execute (parseHeaders (headers ), body , done );
78
+ listener .onChunkComplete (parseHeaders (headers ), body , done );
79
+ }
80
+ }
81
+
82
+ private void emitProgress (Map <String , String > headers , long contentLength , boolean isFinal , ChunkListener listener ) throws IOException {
83
+ if (headers == null || listener == null ) {
84
+ return ;
85
+ }
86
+
87
+ long currentTime = System .currentTimeMillis ();
88
+ if (currentTime - mLastProgressEvent > 16 || isFinal ) {
89
+ mLastProgressEvent = currentTime ;
90
+ long headersContentLength = headers .get ("Content-Length" ) != null ? Long .parseLong (headers .get ("Content-Length" )) : 0 ;
91
+ listener .onChunkProgress (headers , contentLength , headersContentLength );
70
92
}
71
93
}
72
94
73
95
/**
74
- * Reads all parts of the multipart response and execute the callback for each chunk received.
75
- * @param callback Callback executed when a chunk is received
96
+ * Reads all parts of the multipart response and execute the listener for each chunk received.
97
+ * @param listener Listener invoked when chunks are received.
76
98
* @return If the read was successful
77
99
*/
78
- public boolean readAllParts (ChunkCallback callback ) throws IOException {
100
+ public boolean readAllParts (ChunkListener listener ) throws IOException {
79
101
ByteString delimiter = ByteString .encodeUtf8 (CRLF + "--" + mBoundary + CRLF );
80
102
ByteString closeDelimiter = ByteString .encodeUtf8 (CRLF + "--" + mBoundary + "--" + CRLF );
103
+ ByteString headersDelimiter = ByteString .encodeUtf8 (CRLF + CRLF );
81
104
82
105
int bufferLen = 4 * 1024 ;
83
106
long chunkStart = 0 ;
84
107
long bytesSeen = 0 ;
85
108
Buffer content = new Buffer ();
109
+ Map <String , String > currentHeaders = null ;
110
+ long currentHeadersLength = 0 ;
86
111
87
112
while (true ) {
88
113
boolean isCloseDelimiter = false ;
@@ -98,6 +123,20 @@ public boolean readAllParts(ChunkCallback callback) throws IOException {
98
123
99
124
if (indexOfDelimiter == -1 ) {
100
125
bytesSeen = content .size ();
126
+
127
+ if (currentHeaders == null ) {
128
+ long indexOfHeaders = content .indexOf (headersDelimiter , searchStart );
129
+ if (indexOfHeaders >= 0 ) {
130
+ mSource .read (content , indexOfHeaders );
131
+ Buffer headers = new Buffer ();
132
+ content .copyTo (headers , searchStart , indexOfHeaders - searchStart );
133
+ currentHeadersLength = headers .size () + headersDelimiter .size ();
134
+ currentHeaders = parseHeaders (headers );
135
+ }
136
+ } else {
137
+ emitProgress (currentHeaders , content .size () - currentHeadersLength , false , listener );
138
+ }
139
+
101
140
long bytesRead = mSource .read (content , bufferLen );
102
141
if (bytesRead <= 0 ) {
103
142
return false ;
@@ -113,7 +152,10 @@ public boolean readAllParts(ChunkCallback callback) throws IOException {
113
152
Buffer chunk = new Buffer ();
114
153
content .skip (chunkStart );
115
154
content .read (chunk , length );
116
- emitChunk (chunk , isCloseDelimiter , callback );
155
+ emitProgress (currentHeaders , chunk .size () - currentHeadersLength , true , listener );
156
+ emitChunk (chunk , isCloseDelimiter , listener );
157
+ currentHeaders = null ;
158
+ currentHeadersLength = 0 ;
117
159
} else {
118
160
content .skip (chunkEnd );
119
161
}
0 commit comments