Skip to content

Commit

Permalink
Refactoring cache streams
Browse files Browse the repository at this point in the history
  • Loading branch information
QuantumBadger committed Nov 21, 2020
1 parent c6eb35c commit b794f91
Show file tree
Hide file tree
Showing 18 changed files with 794 additions and 330 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1160,7 +1160,7 @@ public void onProgress(
}

@Override
public void onSuccess(
public void onCacheFileWritten(
@NonNull final CacheManager.ReadableCacheFile cacheFile,
final long timestamp,
@NonNull final UUID session,
Expand Down Expand Up @@ -1229,7 +1229,7 @@ public void onFailure(
}

@Override
public void onSuccess(
public void onCacheFileWritten(
@NonNull final CacheManager.ReadableCacheFile cacheFile,
final long timestamp,
@NonNull final UUID session,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public void onFailure(
}

@Override
public void onSuccess(
public void onCacheFileWritten(
@NonNull final CacheManager.ReadableCacheFile cacheFile,
final long timestamp,
@NonNull final UUID session,
Expand Down
155 changes: 90 additions & 65 deletions src/main/java/org/quantumbadger/redreader/cache/CacheDownload.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@
import org.quantumbadger.redreader.common.Priority;
import org.quantumbadger.redreader.common.RRTime;
import org.quantumbadger.redreader.common.TorCommon;
import org.quantumbadger.redreader.common.datastream.MemoryDataStream;
import org.quantumbadger.redreader.common.datastream.MemoryDataStreamInputStream;
import org.quantumbadger.redreader.http.HTTPBackend;
import org.quantumbadger.redreader.reddit.api.RedditOAuth;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -176,16 +177,71 @@ public void onSuccess(
final Long bodyBytes,
final InputStream is) {

final ArrayList<CacheDataStreamChunkConsumer> consumers = new ArrayList<>();
final MemoryDataStream stream = new MemoryDataStream(64 * 1024);

mInitiator.notifyDataStreamAvailable(
stream::getInputStream,
RRTime.utcCurrentTimeMillis(),
session,
false,
mimetype);

// Download the file into memory

try {

final byte[] buf = new byte[64 * 1024];

int bytesRead;
long totalBytesRead = 0;

while((bytesRead = is.read(buf)) > 0) {

totalBytesRead += bytesRead;

stream.writeBytes(buf, 0, bytesRead);

if(bodyBytes != null) {
mInitiator.notifyProgress(
false,
totalBytesRead,
bodyBytes);
}
}

stream.setComplete();

mInitiator.notifyDataStreamComplete(
stream::getInputStream,
RRTime.utcCurrentTimeMillis(),
session,
false,
mimetype);

} catch(final Throwable t) {

stream.setFailed(t instanceof IOException
? (IOException)t
: new IOException("Got exception during download", t));

mInitiator.notifyFailure(
CacheRequest.REQUEST_FAILURE_CONNECTION,
t,
null,
"The connection was interrupted");

return;
}

// Save it to the cache

@Nullable CacheManager.WritableCacheFile writableCacheFile = null;

if(mInitiator.cache) {
try {
writableCacheFile
= manager.openNewCacheFile(mInitiator, session, mimetype);

consumers.add(writableCacheFile);

} catch(final IOException e) {

Log.e(TAG, "Exception opening cache file for write", e);
Expand All @@ -207,78 +263,47 @@ public void onSuccess(

return;
}
}

{
final CacheDataStreamChunkConsumer consumer
= mInitiator.notifyDataStreamAvailable();

if(consumer != null) {
consumers.add(consumer);
}
}

try {
final byte[] buf = new byte[128 * 1024];
final MemoryDataStreamInputStream inputStream = stream.getInputStream();

final byte[] buf = new byte[64 * 1024];
int bytesRead;
long totalBytesRead = 0;

while((bytesRead = is.read(buf)) > 0) {

totalBytesRead += bytesRead;

for(int i = 0; i < consumers.size(); i++) {
consumers.get(i).onDataStreamChunk(buf, 0, bytesRead);
}

if(bodyBytes != null) {
mInitiator.notifyProgress(
false,
totalBytesRead,
bodyBytes);
try {
while((bytesRead = inputStream.read(buf)) > 0) {
writableCacheFile.writeChunk(buf, 0, bytesRead);
}
}

for(int i = 0; i < consumers.size(); i++) {
consumers.get(i).onDataStreamSuccess();
}

mInitiator.notifySuccess(
writableCacheFile == null
? null
: writableCacheFile.getReadableCacheFile(),
RRTime.utcCurrentTimeMillis(),
session,
false,
mimetype);
writableCacheFile.onWriteFinished();

} catch(final IOException e) {
} catch(final IOException e) {

if(e.getMessage() != null && e.getMessage().contains("ENOSPC")) {
mInitiator.notifyFailure(
CacheRequest.REQUEST_FAILURE_STORAGE,
e,
null,
"Out of disk space");
writableCacheFile.onWriteCancelled();

} else {
e.printStackTrace();
mInitiator.notifyFailure(
CacheRequest.REQUEST_FAILURE_CONNECTION,
e,
null,
"The connection was interrupted");
if(e.getMessage() != null && e.getMessage().contains("ENOSPC")) {
mInitiator.notifyFailure(
CacheRequest.REQUEST_FAILURE_STORAGE,
e,
null,
"Out of disk space");
} else {
mInitiator.notifyFailure(
CacheRequest.REQUEST_FAILURE_STORAGE,
e,
null,
"Failed to write to cache");
}
}

} catch(final Throwable t) {
t.printStackTrace();
mInitiator.notifyFailure(
CacheRequest.REQUEST_FAILURE_CONNECTION,
t,
null,
"The connection was interrupted");
}

mInitiator.notifyCacheFileWritten(
writableCacheFile == null
? null
: writableCacheFile.getReadableCacheFile(),
RRTime.utcCurrentTimeMillis(),
session,
false,
mimetype);
}
});
}
Expand Down
84 changes: 32 additions & 52 deletions src/main/java/org/quantumbadger/redreader/cache/CacheManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@
import androidx.annotation.Nullable;
import org.quantumbadger.redreader.account.RedditAccount;
import org.quantumbadger.redreader.activities.BugReportActivity;
import org.quantumbadger.redreader.common.Factory;
import org.quantumbadger.redreader.common.FileUtils;
import org.quantumbadger.redreader.common.General;
import org.quantumbadger.redreader.common.Optional;
import org.quantumbadger.redreader.common.PrefsUtility;
import org.quantumbadger.redreader.common.PrioritisedCachedThreadPool;
import org.quantumbadger.redreader.common.Priority;
import org.quantumbadger.redreader.common.datastream.SeekableFileInputStream;
import org.quantumbadger.redreader.common.datastream.SeekableInputStream;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -290,7 +290,7 @@ public ReadableCacheFile getExistingCacheFileById(final long cacheId) {
return new ReadableCacheFile(cacheId);
}

public class WritableCacheFile implements CacheDataStreamChunkConsumer {
public class WritableCacheFile {

private final OutputStream mOutStream;
private ReadableCacheFile readableCacheFile = null;
Expand All @@ -314,25 +314,23 @@ private WritableCacheFile(
location = getPreferredCacheLocation();
mTmpFile = new File(location, UUID.randomUUID().toString() + tempExt);

mOutStream = new BufferedOutputStream(new FileOutputStream(mTmpFile), 64 * 1024);
mOutStream = new FileOutputStream(mTmpFile);
}

@NonNull
public ReadableCacheFile getReadableCacheFile() {
return Objects.requireNonNull(readableCacheFile);
}

@Override
public void onDataStreamChunk(
public void writeChunk(
@NonNull final byte[] dataReused,
final int offset,
final int length) throws IOException {

mOutStream.write(dataReused, offset, length);
}

@Override
public void onDataStreamSuccess() throws IOException {
public void onWriteFinished() throws IOException {

final long cacheFileId = dbManager.newEntry(
mRequest.url,
Expand All @@ -352,8 +350,7 @@ public void onDataStreamSuccess() throws IOException {
readableCacheFile = new ReadableCacheFile(cacheFileId);
}

@Override
public void onDataStreamCancel() {
public void onWriteCancelled() {

try {
mOutStream.close();
Expand Down Expand Up @@ -447,15 +444,15 @@ private File getExistingCacheFile(final long id) {
}

@Nullable
private InputStream getCacheFileInputStream(final long id) throws IOException {
private SeekableFileInputStream getCacheFileInputStream(final long id) throws IOException {

final File cacheFile = getExistingCacheFile(id);

if(cacheFile == null) {
return null;
}

return new BufferedInputStream(new FileInputStream(cacheFile), 8 * 1024);
return new SeekableFileInputStream(cacheFile);
}

@Nullable
Expand Down Expand Up @@ -597,50 +594,33 @@ public Priority getPriority() {
@Override
public void run() {

final CacheDataStreamChunkConsumer consumer
= request.notifyDataStreamAvailable();

if(consumer != null) {

try {
try(InputStream cfis = getCacheFileInputStream(entry.id)) {

if(cfis == null) {
request.notifyFailure(
CacheRequest.REQUEST_FAILURE_CACHE_MISS,
null,
null,
"Couldn't retrieve cache file");
return;
}

final byte[] buf = new byte[128 * 1024];
int bytesRead;
while((bytesRead = cfis.read(buf)) > 0) {
consumer.onDataStreamChunk(buf, 0, bytesRead);
}
}

consumer.onDataStreamSuccess();

} catch(final Exception e) {
final Factory<SeekableInputStream, IOException> streamFactory = () -> {
final SeekableFileInputStream stream
= getCacheFileInputStream(entry.id);

if(stream == null) {
dbManager.delete(entry.id);
throw new IOException("Failed to open file");
}

final File existingCacheFile = getExistingCacheFile(entry.id);
if(existingCacheFile != null) {
existingCacheFile.delete();
}
return stream;
};

request.notifyFailure(
CacheRequest.REQUEST_FAILURE_PARSE,
e,
null,
"Error parsing the stream");
}
}
request.notifyDataStreamAvailable(
streamFactory,
entry.timestamp,
entry.session,
true,
entry.mimetype);

request.notifyDataStreamComplete(
streamFactory,
entry.timestamp,
entry.session,
true,
entry.mimetype);

request.notifySuccess(
request.notifyCacheFileWritten(
new ReadableCacheFile(entry.id),
entry.timestamp,
entry.session,
Expand Down

0 comments on commit b794f91

Please sign in to comment.