Skip to content

Commit

Permalink
修复分块可能出现的问题
Browse files Browse the repository at this point in the history
  • Loading branch information
AriaLyy committed May 14, 2019
1 parent e6da523 commit 9212aab
Show file tree
Hide file tree
Showing 37 changed files with 597 additions and 539 deletions.
15 changes: 8 additions & 7 deletions Aria/src/main/java/com/arialyy/aria/core/AriaManager.java
Expand Up @@ -276,19 +276,20 @@ UploadReceiver upload(Object obj) {
/**
* 删除任务记录
*
* @param type 需要删除的任务类型,1、表示单任务下载。2、表示任务组下载。3、单任务上传
* @param key 下载为保存路径、任务组为任务组名、上传为上传文件路径
* @param type 需要删除的任务类型,1、普通下载任务;2、组合任务;3、普通上传任务。
* @param key type为1时,key为保存路径;type为2时,key为组合任务hash;type为3时,key为文件上传路径。
* @param removeFile {@code true} 不仅删除任务数据库记录,还会删除已经删除完成的文件;{@code false}如果任务已经完成,只删除任务数据库记录。
*/
public void delRecord(int type, String key) {
public void delRecord(int type, String key, boolean removeFile) {
switch (type) {
case 1:
DbEntity.deleteData(DownloadEntity.class, "url=? and isGroupChild='false'", key);
case 1: // 删除普通任务记录
CommonUtil.delTaskRecord(key, 1, removeFile, true);
break;
case 2:
DbEntity.deleteData(DownloadGroupEntity.class, "groupHash=?", key);
CommonUtil.delGroupTaskRecord(key, removeFile);
break;
case 3:
DbEntity.deleteData(UploadEntity.class, "filePath=?", key);
CommonUtil.delTaskRecord(key, 2);
break;
}
}
Expand Down
82 changes: 42 additions & 40 deletions Aria/src/main/java/com/arialyy/aria/core/common/AbsFileer.java
Expand Up @@ -212,6 +212,9 @@ private synchronized void startTimer() {
|| mConstance.isCancel()
|| mConstance.isFail()
|| !isRunning()) {
if (mConstance.isComplete() || mConstance.isFail()) {
ThreadTaskManager.getInstance().removeTaskThread(mTaskWrapper.getKey());
}
closeTimer();
} else if (mConstance.CURRENT_LOCATION >= 0) {
mListener.onProgress(mConstance.CURRENT_LOCATION);
Expand Down Expand Up @@ -251,9 +254,7 @@ public long getCurrentLocation() {
}

public synchronized boolean isRunning() {
boolean b = ThreadTaskManager.getInstance().taskIsRunning(mTaskWrapper.getKey());
//ALog.d(TAG, "isRunning = " + b);
return b;
return ThreadTaskManager.getInstance().taskIsRunning(mTaskWrapper.getKey());
}

public synchronized void cancel() {
Expand All @@ -271,7 +272,7 @@ public synchronized void cancel() {
task.cancel();
}
}
ThreadTaskManager.getInstance().stopTaskThread(mTaskWrapper.getKey());
ThreadTaskManager.getInstance().removeTaskThread(mTaskWrapper.getKey());
}
}).start();
}
Expand All @@ -292,7 +293,7 @@ public synchronized void stop() {
task.stop();
}
}
ThreadTaskManager.getInstance().stopTaskThread(mTaskWrapper.getKey());
ThreadTaskManager.getInstance().removeTaskThread(mTaskWrapper.getKey());
}
}).start();
}
Expand Down Expand Up @@ -382,13 +383,14 @@ private void handleNormalRecord() {
}

/**
* 处理分块任务的记录
* 处理分块任务的记录,分块文件(blockFileLen)长度必须需要小于等于线程区间(threadRectLen)的长度
*/
private void handleBlockRecord() {
final int threadNum = mRecord.threadRecords.size();
final long blockLen = mEntity.getFileSize() / threadNum;
int i = 0;
// 默认线程分块长度
long normalRectLen = mEntity.getFileSize() / mRecord.threadRecords.size();
for (ThreadRecord tr : mRecord.threadRecords) {
long threadRect = tr.blockLen;

File temp = new File(String.format(SUB_PATH, mRecord.filePath, tr.threadId));
if (!temp.exists()) {
ALog.i(TAG, String.format("分块文件【%s】不存在,该分块将重新开始", temp.getPath()));
Expand All @@ -398,33 +400,38 @@ private void handleBlockRecord() {
if (tr.isComplete) {
mCompleteThreadNum++;
} else {
long realLocation = i * blockLen + temp.length();
ALog.i(TAG, String.format(
"startLocation = %s; endLocation = %s; block = %s; tempLen = %s; i = %s",
tr.startLocation, tr.endLocation, blockLen, temp.length(), i));
if (tr.endLocation == realLocation) {
"startLocation = %s; endLocation = %s; block = %s; tempLen = %s; threadId = %s",
tr.startLocation, tr.endLocation, threadRect, temp.length(), tr.threadId));

long blockFileLen = temp.length(); // 磁盘中的分块文件长度
/*
* 检查磁盘中的分块文件
*/
if (blockFileLen > threadRect) {
ALog.i(TAG, String.format("分块【%s】错误,分块长度【%s】 > 线程区间长度【%s】,将重新开始该分块",
tr.threadId, blockFileLen, threadRect));
temp.delete();
tr.startLocation = tr.threadId * threadRect;
continue;
}

long realLocation =
tr.threadId * normalRectLen + blockFileLen; //正常情况下,该线程的startLocation的位置
/*
* 检查记录文件
*/
if (blockFileLen == threadRect) {
ALog.i(TAG, String.format("分块【%s】已完成,更新记录", temp.getPath()));
tr.startLocation = realLocation;
tr.startLocation = blockFileLen;
tr.isComplete = true;
mCompleteThreadNum++;
} else {
tr.isComplete = false;
if (realLocation == tr.startLocation) {
i++;
continue;
}
if (realLocation > tr.startLocation) {
ALog.i(TAG, String.format("修正分块【%s】的进度记录为:%s", temp.getPath(), realLocation));
tr.startLocation = realLocation;
} else {
ALog.i(TAG, String.format("分块【%s】错误,将重新开始该分块", temp.getPath()));
temp.delete();
tr.startLocation = i * blockLen;
}
} else if (tr.startLocation != realLocation) { // 处理记录小于分块文件长度的情况
ALog.i(TAG, String.format("修正分块【%s】的进度记录为:%s", temp.getPath(), realLocation));
tr.startLocation = realLocation;
}
}
}
i++;
}
mTotalThreadNum = mRecord.threadRecords.size();
mTaskWrapper.setNewTask(false);
Expand Down Expand Up @@ -517,13 +524,8 @@ private String getFilePath() {
private void saveRecord() {
mRecord.threadNum = mRecord.threadRecords.size();
mRecord.save();
for (ThreadRecord tr : mRecord.threadRecords) {
tr.save();
}
}

public TaskRecord getRecord() {
return mRecord;
ALog.d(TAG, String.format("保存记录,线程记录数:%s", mRecord.threadRecords.size()));
DbEntity.saveAll(mRecord.threadRecords);
}

/**
Expand Down Expand Up @@ -573,10 +575,10 @@ private void handleBreakpoint() {
long fileLength = mEntity.getFileSize();
long blockSize = fileLength / mTotalThreadNum;
Set<Integer> threads = new HashSet<>();
// 如果是新任务,检查下历史记录,如果有遗留的记录,删除并删除分块文件
if (mTaskWrapper.isNewTask()) {
CommonUtil.delTaskRecord(getFilePath(), 1, true);
}
//// 如果是新任务,检查下历史记录,如果有遗留的记录,删除并删除分块文件
//if (mTaskWrapper.isNewTask()) {
// CommonUtil.delTaskRecord(getFilePath(), 1, true);
//}

mRecord.fileLength = fileLength;
if (mTaskWrapper.isNewTask() && !handleNewTask()) {
Expand Down
70 changes: 48 additions & 22 deletions Aria/src/main/java/com/arialyy/aria/core/common/AbsThreadTask.java
Expand Up @@ -22,6 +22,7 @@
import com.arialyy.aria.core.config.DGroupConfig;
import com.arialyy.aria.core.config.DownloadConfig;
import com.arialyy.aria.core.config.UploadConfig;
import com.arialyy.aria.core.download.DownloadEntity;
import com.arialyy.aria.core.inf.AbsNormalEntity;
import com.arialyy.aria.core.inf.AbsTaskWrapper;
import com.arialyy.aria.core.inf.IEventListener;
Expand All @@ -31,6 +32,7 @@
import com.arialyy.aria.exception.FileException;
import com.arialyy.aria.exception.TaskException;
import com.arialyy.aria.util.ALog;
import com.arialyy.aria.util.CommonUtil;
import com.arialyy.aria.util.ErrorHelp;
import com.arialyy.aria.util.FileUtil;
import com.arialyy.aria.util.NetUtils;
Expand Down Expand Up @@ -272,8 +274,10 @@ protected boolean checkBlock() {
ThreadRecord tr = getThreadRecord();
File blockFile = getBockFile();
if (!blockFile.exists() || blockFile.length() != tr.blockLen) {
ALog.i(TAG, String.format("分块【%s】下载错误,即将重新下载该分块,开始位置:%s,结束位置:%s", blockFile.getName(),
tr.startLocation, tr.endLocation));
ALog.i(TAG,
String.format("分块【%s】错误,blockFileLen: %s, threadRect: %s; 即将重新下载该分块,开始位置:%s,结束位置:%s",
blockFile.getName(), blockFile.length(), tr.blockLen, tr.startLocation,
tr.endLocation));
if (blockFile.exists()) {
blockFile.delete();
ALog.i(TAG, String.format("删除分块【%s】成功", blockFile.getName()));
Expand Down Expand Up @@ -405,14 +409,11 @@ protected void fail(final long subCurrentLocation, BaseException ex, boolean nee
*/
private void retryThis(boolean needRetry) {
if (!NetUtils.isConnected(AriaManager.APP) && !isNotNetRetry) {
ALog.w(TAG, String.format("任务【%s】thread__%s__重试失败,网络未连接", getConfig().TEMP_FILE.getName(),
getConfig().THREAD_ID));
ALog.w(TAG, String.format("任务【%s】重试失败,网络未连接", getConfig().TEMP_FILE.getName()));
}
if (mFailTimes < RETRY_NUM && needRetry && (NetUtils.isConnected(AriaManager.APP)
|| isNotNetRetry) && !isBreak()) {
ALog.w(TAG,
String.format("任务【%s】thread__%s__正在重试", getConfig().TEMP_FILE.getName(),
getConfig().THREAD_ID));
ALog.w(TAG, String.format("任务【%s】正在重试", getConfig().TEMP_FILE.getName()));
mFailTimes++;
handleRetryRecord();
ThreadTaskManager.getInstance().retryThread(AbsThreadTask.this);
Expand All @@ -427,25 +428,40 @@ private void retryThis(boolean needRetry) {
private void handleRetryRecord() {
if (getTaskRecord().isBlock) {
ThreadRecord tr = getThreadRecord();
long block = getEntity().getFileSize() / getTaskRecord().threadRecords.size();
// 默认线程分块长度
long normalRectLen = getEntity().getFileSize() / getTaskRecord().threadRecords.size();
File temp = getBockFile();

File blockFile = getBockFile();
if (blockFile.length() > tr.blockLen) {
ALog.i(TAG, String.format("分块【%s】错误,将重新下载该分块", blockFile.getPath()));
blockFile.delete();
tr.startLocation = block * tr.threadId;
tr.isComplete = false;
getConfig().START_LOCATION = tr.startLocation;
} else if (blockFile.length() < tr.blockLen) {
tr.startLocation = block * tr.threadId + blockFile.length();
long blockFileLen = temp.length(); // 磁盘中的分块文件长度
long threadRect = tr.blockLen; // 当前线程的区间

if (!temp.exists()) {
ALog.i(TAG, String.format("分块文件【%s】不存在,该分块将重新开始", temp.getName()));
tr.isComplete = false;
getConfig().START_LOCATION = tr.startLocation;
getState().CURRENT_LOCATION = getBlockRealTotalSize();
ALog.i(TAG, String.format("修正分块【%s】,开始位置:%s,当前进度:%s", blockFile.getPath(), tr.startLocation,
getState().CURRENT_LOCATION));
} else {
getState().COMPLETE_THREAD_NUM++;
tr.isComplete = true;
/*
* 检查磁盘中的分块文件
*/
if (blockFileLen > threadRect) {
ALog.i(TAG, String.format("分块【%s】错误,将重新下载该分块", temp.getName()));
temp.delete();
tr.startLocation = normalRectLen * tr.threadId;
tr.isComplete = false;
getConfig().START_LOCATION = tr.startLocation;
} else if (blockFileLen < tr.blockLen) {
tr.startLocation = normalRectLen * tr.threadId + blockFileLen;
tr.isComplete = false;
getConfig().START_LOCATION = tr.startLocation;
getState().CURRENT_LOCATION = getBlockRealTotalSize();
ALog.i(TAG,
String.format("修正分块【%s】,开始位置:%s,当前进度:%s", temp.getName(), tr.startLocation,
getState().CURRENT_LOCATION));
} else {
ALog.i(TAG, String.format("分块【%s】已完成,更新记录", temp.getName()));
getState().COMPLETE_THREAD_NUM++;
tr.isComplete = true;
}
}
tr.update();
} else {
Expand All @@ -454,6 +470,16 @@ private void handleRetryRecord() {
}
}

/**
* 发送任务完成的消息,并删除任务记录
*/
protected synchronized void sendCompleteMsg() {
ALog.i(TAG, String.format("任务【%s】完成", mEntity.getFileName()));
CommonUtil.delTaskRecord(mEntity.getKey(), mEntity instanceof DownloadEntity ? 1 : 2, false,
false);
mListener.onComplete();
}

/**
* 获取分块文件
*
Expand Down
Expand Up @@ -24,6 +24,7 @@
* 任务的线程记录
*/
public class ThreadRecord extends DbEntity {

@Foreign(parent = TaskRecord.class, column = "filePath", onUpdate = ActionPolicy.CASCADE, onDelete = ActionPolicy.CASCADE)
public String key;

Expand All @@ -46,7 +47,7 @@ public class ThreadRecord extends DbEntity {
/**
* 线程id
*/
public int threadId = -1;
public int threadId = 0;

/**
* 分块长度
Expand Down
Expand Up @@ -17,20 +17,11 @@

import android.support.annotation.NonNull;
import android.text.TextUtils;
import com.arialyy.aria.core.AriaManager;
import com.arialyy.aria.core.download.DGTaskWrapper;
import com.arialyy.aria.core.download.DTaskWrapper;
import com.arialyy.aria.core.inf.AbsHttpFileLenAdapter;
import com.arialyy.aria.core.inf.AbsTarget;
import com.arialyy.aria.core.inf.AbsTaskWrapper;
import com.arialyy.aria.core.inf.IHttpFileLenAdapter;
import com.arialyy.aria.core.inf.IHttpHeaderDelegate;
import com.arialyy.aria.core.upload.UTaskWrapper;
import com.arialyy.aria.util.ALog;
import com.arialyy.aria.util.CommonUtil;
import java.io.File;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.net.Proxy;
import java.util.Collection;
import java.util.Map;
Expand Down Expand Up @@ -90,27 +81,11 @@ public TARGET addHeaders(@NonNull Map<String, String> headers) {
return mTarget;
}

public TARGET setFileLenAdapter(AbsHttpFileLenAdapter adapter) {
public TARGET setFileLenAdapter(IHttpFileLenAdapter adapter) {
if (adapter == null) {
throw new IllegalArgumentException("adapter为空");
}
//try {
// adapter.clone();
mTarget.getTaskWrapper().asHttp().setFileLenAdapter((IHttpFileLenAdapter) adapter);
//} catch (CloneNotSupportedException e) {
// e.printStackTrace();
//}
//// 以下代码有问题,匿名内部类不能序列化
//String objPath = String.format("%s/obj_temp/%s", AriaManager.APP.getFilesDir().getPath(),
// adapter.hashCode());
//CommonUtil.writeObjToFile(objPath, adapter);
//IHttpFileLenAdapter newAdapter = (IHttpFileLenAdapter) CommonUtil.readObjFromFile(objPath);
//File temp = new File(objPath);
//if (temp.exists()) {
// temp.delete();
//}
//mTarget.getTaskWrapper().asHttp().setFileLenAdapter(newAdapter);

mTarget.getTaskWrapper().asHttp().setFileLenAdapter(adapter);
return mTarget;
}

Expand Down
Expand Up @@ -19,6 +19,7 @@
import com.arialyy.aria.core.common.RequestEnum;
import com.arialyy.aria.core.inf.IHttpFileLenAdapter;
import com.arialyy.aria.core.inf.ITargetHeadDelegate;
import java.lang.ref.WeakReference;
import java.net.CookieManager;
import java.net.Proxy;
import java.util.HashMap;
Expand Down
Expand Up @@ -76,6 +76,7 @@ TARGET setDirPath(String dirPath) {
* @param newDirPath 新的文件夹路径
*/
void reChangeDirPath(String newDirPath) {
ALog.d(TAG, String.format("修改新路径为:%s", newDirPath));
List<DTaskWrapper> subTasks = mWrapper.getSubTaskWrapper();
if (subTasks != null && !subTasks.isEmpty()) {
List<DownloadEntity> des = new ArrayList<>();
Expand All @@ -90,7 +91,6 @@ void reChangeDirPath(String newDirPath) {
de.setDownloadPath(newPath);
des.add(de);
}
AbsEntity.saveAll(des);
}
}

Expand Down
Expand Up @@ -55,7 +55,7 @@ protected void saveData(int state, long location) {
mEntity.setState(state);

if (state == IEntity.STATE_CANCEL) {
CommonUtil.delTaskRecord(mEntity.getDownloadPath(), 1, mTaskWrapper.isRemoveFile());
CommonUtil.delTaskRecord(mEntity.getDownloadPath(), 1, mTaskWrapper.isRemoveFile(), true);
return;
} else if (state == IEntity.STATE_STOP) {
mEntity.setStopTime(System.currentTimeMillis());
Expand Down

0 comments on commit 9212aab

Please sign in to comment.