Skip to content

Commit

Permalink
- 修复组任务,其中一个子任务在获取文件长度失败后,重新恢复组合任务,组合任务状态变为完成的问题 https://github.com/A…
Browse files Browse the repository at this point in the history
  • Loading branch information
AriaLyy committed Mar 3, 2020
1 parent 54cbdb7 commit 669ac6b
Show file tree
Hide file tree
Showing 15 changed files with 131 additions and 99 deletions.
1 change: 1 addition & 0 deletions DEV_LOG.md
Expand Up @@ -4,6 +4,7 @@
- 修复40x错误,会继续重试并且无法重试成功的问题 https://github.com/AriaLyy/Aria/issues/619 - 修复40x错误,会继续重试并且无法重试成功的问题 https://github.com/AriaLyy/Aria/issues/619
- 修复wait模式下,resume(true)无效问题 - 修复wait模式下,resume(true)无效问题
- 修复now模式下的一些问题 https://github.com/AriaLyy/Aria/issues/620 - 修复now模式下的一些问题 https://github.com/AriaLyy/Aria/issues/620
- 修复组任务,其中一个子任务在获取文件长度失败后,重新恢复组合任务,组合任务状态变为完成的问题 https://github.com/AriaLyy/Aria/issues/628
+ v_3.8.6 (2020/2/17) + v_3.8.6 (2020/2/17)
- fix bug https://github.com/AriaLyy/Aria/issues/608 - fix bug https://github.com/AriaLyy/Aria/issues/608
- fix bug https://github.com/AriaLyy/Aria/issues/579#issuecomment-586665035 - fix bug https://github.com/AriaLyy/Aria/issues/579#issuecomment-586665035
Expand Down
Expand Up @@ -30,6 +30,7 @@
import com.arialyy.aria.util.CommonUtil; import com.arialyy.aria.util.CommonUtil;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;


/** /**
* 组合任务文件信息,用于获取长度未知时,组合任务的长度 * 组合任务文件信息,用于获取长度未知时,组合任务的长度
Expand All @@ -41,27 +42,27 @@ public final class HttpDGInfoTask implements IInfoTask {
private final Object LOCK = new Object(); private final Object LOCK = new Object();
private ExecutorService mPool = null; private ExecutorService mPool = null;
private boolean getLenComplete = false; private boolean getLenComplete = false;
private int count; private AtomicInteger count = new AtomicInteger();
private int failCount; private AtomicInteger failCount = new AtomicInteger();
private DownloadGroupListener listener; private DownloadGroupListener listener;


/** /**
* 子任务回调 * 子任务回调
*/ */
private Callback subCallback = new Callback() { private Callback subCallback = new Callback() {
@Override public void onSucceed(String url, CompleteInfo info) { @Override public void onSucceed(String url, CompleteInfo info) {
count++; count.getAndIncrement();
checkGetSizeComplete(count, failCount); checkGetSizeComplete(count.get(), failCount.get());
ALog.d(TAG, "获取子任务信息完成"); ALog.d(TAG, "获取子任务信息完成");
} }


@Override public void onFail(AbsEntity entity, AriaException e, boolean needRetry) { @Override public void onFail(AbsEntity entity, AriaException e, boolean needRetry) {
ALog.e(TAG, String.format("获取文件信息失败,url:%s", ((DownloadEntity) entity).getUrl())); ALog.e(TAG, String.format("获取文件信息失败,url:%s", ((DownloadEntity) entity).getUrl()));
count++; count.getAndIncrement();
failCount++; failCount.getAndIncrement();
listener.onSubFail((DownloadEntity) entity, new AriaHTTPException(TAG, listener.onSubFail((DownloadEntity) entity, new AriaHTTPException(TAG,
String.format("子任务获取文件长度失败,url:%s", ((DownloadEntity) entity).getUrl()))); String.format("子任务获取文件长度失败,url:%s", ((DownloadEntity) entity).getUrl())));
checkGetSizeComplete(count, failCount); checkGetSizeComplete(count.get(), failCount.get());
} }
}; };


Expand All @@ -79,7 +80,7 @@ public final class HttpDGInfoTask implements IInfoTask {
return; return;
} }
// 处理组合任务大小未知的情况 // 处理组合任务大小未知的情况
if (wrapper.isUnknownSize() && wrapper.getEntity().getFileSize() < 1) { if (wrapper.isUnknownSize()) {
mPool = Executors.newCachedThreadPool(); mPool = Executors.newCachedThreadPool();
getGroupSize(); getGroupSize();
try { try {
Expand All @@ -89,7 +90,7 @@ public final class HttpDGInfoTask implements IInfoTask {
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
if (!mPool.isShutdown()){ if (!mPool.isShutdown()) {
mPool.shutdown(); mPool.shutdown();
} }
} else { } else {
Expand All @@ -107,6 +108,10 @@ private void getGroupSize() {
new Thread(new Runnable() { new Thread(new Runnable() {
@Override public void run() { @Override public void run() {
for (DTaskWrapper dTaskWrapper : wrapper.getSubTaskWrapper()) { for (DTaskWrapper dTaskWrapper : wrapper.getSubTaskWrapper()) {
if (dTaskWrapper.getEntity().getFileSize() > 0) {
count.getAndIncrement();
continue;
}
cloneHeader(dTaskWrapper); cloneHeader(dTaskWrapper);
HttpDFileInfoTask infoTask = new HttpDFileInfoTask(dTaskWrapper); HttpDFileInfoTask infoTask = new HttpDFileInfoTask(dTaskWrapper);
infoTask.setCallback(subCallback); infoTask.setCallback(subCallback);
Expand Down Expand Up @@ -158,7 +163,6 @@ private void cloneHeader(DTaskWrapper taskWrapper) {
subOption.setFileNameAdapter(groupOption.getFileNameAdapter()); subOption.setFileNameAdapter(groupOption.getFileNameAdapter());
subOption.setUseServerFileName(groupOption.isUseServerFileName()); subOption.setUseServerFileName(groupOption.isUseServerFileName());



subOption.setFileNameAdapter(groupOption.getFileNameAdapter()); subOption.setFileNameAdapter(groupOption.getFileNameAdapter());
subOption.setRequestEnum(groupOption.getRequestEnum()); subOption.setRequestEnum(groupOption.getRequestEnum());
subOption.setHeaders(groupOption.getHeaders()); subOption.setHeaders(groupOption.getHeaders());
Expand Down
Expand Up @@ -26,6 +26,7 @@
import com.arialyy.aria.core.loader.IInfoTask; import com.arialyy.aria.core.loader.IInfoTask;
import com.arialyy.aria.core.wrapper.AbsTaskWrapper; import com.arialyy.aria.core.wrapper.AbsTaskWrapper;
import com.arialyy.aria.exception.AriaException; import com.arialyy.aria.exception.AriaException;
import java.io.File;


/** /**
* http 组合任务加载器 * http 组合任务加载器
Expand Down Expand Up @@ -53,6 +54,12 @@ private void startSub() {
} }
onPostStart(); onPostStart();
for (DTaskWrapper wrapper : getWrapper().getSubTaskWrapper()) { for (DTaskWrapper wrapper : getWrapper().getSubTaskWrapper()) {
File subFile = new File(wrapper.getEntity().getFilePath());
if (wrapper.getEntity().getFileSize() > 0
&& subFile.exists()
&& subFile.length() == wrapper.getEntity().getFileSize()) {
continue;
}
DownloadEntity dEntity = wrapper.getEntity(); DownloadEntity dEntity = wrapper.getEntity();
startSubLoader(createSubLoader(wrapper, dEntity.getFileSize() < 0)); startSubLoader(createSubLoader(wrapper, dEntity.getFileSize() < 0));
} }
Expand Down
Expand Up @@ -51,6 +51,7 @@
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;


Expand All @@ -75,10 +76,11 @@ final class M3U8VodLoader extends BaseM3U8Loader {
private SparseArray<ThreadRecord> mAfterPeer = new SparseArray<>(); private SparseArray<ThreadRecord> mAfterPeer = new SparseArray<>();
private PeerIndexEvent mCurrentEvent; private PeerIndexEvent mCurrentEvent;
private String mCacheDir; private String mCacheDir;
private int aIndex = 0, bIndex = 0; private AtomicInteger afterPeerIndex = new AtomicInteger();
private int mCurrentFlagSize; private AtomicInteger beforePeerIndex = new AtomicInteger();
private AtomicInteger mCompleteNum = new AtomicInteger();
private AtomicInteger mCurrentFlagSize = new AtomicInteger();
private boolean isJump = false, isDestroy = false; private boolean isJump = false, isDestroy = false;
private int mCompleteNum = 0;
private ExecutorService mJumpThreadPool; private ExecutorService mJumpThreadPool;
private Thread jumpThread = null; private Thread jumpThread = null;
private M3U8TaskOption mM3U8Option; private M3U8TaskOption mM3U8Option;
Expand All @@ -102,20 +104,20 @@ SparseArray<ThreadRecord> getBeforePeer() {
} }


int getCompleteNum() { int getCompleteNum() {
return mCompleteNum; return mCompleteNum.get();
} }


void setCompleteNum(int mCompleteNum) { void setCompleteNum(int completeNum) {
this.mCompleteNum = mCompleteNum; mCompleteNum.set(completeNum);
} }


int getCurrentFlagSize() { int getCurrentFlagSize() {
mCurrentFlagSize = mFlagQueue.size(); mCurrentFlagSize.set(mFlagQueue.size());
return mCurrentFlagSize; return mCurrentFlagSize.get();
} }


void setCurrentFlagSize(int currentFlagSize) { void setCurrentFlagSize(int currentFlagSize) {
mCurrentFlagSize = currentFlagSize; mCurrentFlagSize.set(currentFlagSize);
} }


boolean isJump() { boolean isJump() {
Expand Down Expand Up @@ -178,7 +180,7 @@ private void startThreadTask() {
try { try {
LOCK.lock(); LOCK.lock();
while (mFlagQueue.size() < EXEC_MAX_NUM && !isBreak()) { while (mFlagQueue.size() < EXEC_MAX_NUM && !isBreak()) {
if (mCompleteNum == mRecord.threadRecords.size()) { if (mCompleteNum.get() == mRecord.threadRecords.size()) {
break; break;
} }


Expand Down Expand Up @@ -214,16 +216,18 @@ private ThreadRecord getThreadRecord() {
ThreadRecord tr = null; ThreadRecord tr = null;
try { try {
// 优先下载peer指针之后的数据 // 优先下载peer指针之后的数据
if (bIndex == 0 && aIndex < mAfterPeer.size()) { if (beforePeerIndex.get() == 0 && afterPeerIndex.get() < mAfterPeer.size()) {
//ALog.d(TAG, String.format("afterArray size:%s, index:%s", mAfterPeer.size(), aIndex)); //ALog.d(TAG, String.format("afterArray size:%s, index:%s", mAfterPeer.size(), aIndex));
tr = mAfterPeer.valueAt(aIndex); tr = mAfterPeer.valueAt(afterPeerIndex.get());
aIndex++; afterPeerIndex.getAndIncrement();
} }


// 如果指针之后的数组没有切片了,则重新初始化指针位置,并获取指针之前的数组获取切片进行下载 // 如果指针之后的数组没有切片了,则重新初始化指针位置,并获取指针之前的数组获取切片进行下载
if (mBeforePeer.size() > 0 && (tr == null || bIndex != 0) && bIndex < mBeforePeer.size()) { if (mBeforePeer.size() > 0
tr = mBeforePeer.valueAt(bIndex); && (tr == null || beforePeerIndex.get() != 0)
bIndex++; && beforePeerIndex.get() < mBeforePeer.size()) {
tr = mBeforePeer.valueAt(beforePeerIndex.get());
beforePeerIndex.getAndIncrement();
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
Expand Down Expand Up @@ -256,19 +260,19 @@ private void initData() {
return; return;
} }
// 设置需要下载的切片 // 设置需要下载的切片
mCompleteNum = 0; mCompleteNum.set(0);
for (ThreadRecord tr : mRecord.threadRecords) { for (ThreadRecord tr : mRecord.threadRecords) {
if (!tr.isComplete) { if (!tr.isComplete) {
mAfterPeer.put(tr.threadId, tr); mAfterPeer.put(tr.threadId, tr);
} else { } else {
mCompleteNum++; mCompleteNum.getAndIncrement();
} }
} }
getStateManager().updateStateCount(); getStateManager().updateStateCount();
if (mCompleteNum <= 0) { if (mCompleteNum.get() <= 0) {
getListener().onStart(0); getListener().onStart(0);
} else { } else {
int percent = mCompleteNum * 100 / mRecord.threadRecords.size(); int percent = mCompleteNum.get() * 100 / mRecord.threadRecords.size();
getListener().onResume(percent); getListener().onResume(percent);
} }
} }
Expand Down Expand Up @@ -329,7 +333,7 @@ private void handleJump(PeerIndexEvent event) {


isJump = true; isJump = true;
notifyWaitLock(false); notifyWaitLock(false);
mCurrentFlagSize = mFlagQueue.size(); mCurrentFlagSize.set(mFlagQueue.size());
// 停止所有正在执行的线程任务 // 停止所有正在执行的线程任务
try { try {
TempFlag flag; TempFlag flag;
Expand Down Expand Up @@ -403,12 +407,12 @@ synchronized void resumeTask() {
mBeforePeer.clear(); mBeforePeer.clear();
mAfterPeer.clear(); mAfterPeer.clear();
mFlagQueue.clear(); mFlagQueue.clear();
aIndex = 0; afterPeerIndex.set(0);
bIndex = 0; beforePeerIndex.set(0);
mCompleteNum = 0; mCompleteNum.set(0);
for (ThreadRecord tr : mRecord.threadRecords) { for (ThreadRecord tr : mRecord.threadRecords) {
if (tr.isComplete) { if (tr.isComplete) {
mCompleteNum++; mCompleteNum.getAndIncrement();
continue; continue;
} }
if (tr.threadId < mCurrentEvent.peerIndex) { if (tr.threadId < mCurrentEvent.peerIndex) {
Expand Down
Expand Up @@ -29,8 +29,8 @@
import com.arialyy.aria.core.manager.ThreadTaskManager; import com.arialyy.aria.core.manager.ThreadTaskManager;
import com.arialyy.aria.core.processor.ITsMergeHandler; import com.arialyy.aria.core.processor.ITsMergeHandler;
import com.arialyy.aria.core.task.ThreadTask; import com.arialyy.aria.core.task.ThreadTask;
import com.arialyy.aria.exception.AriaM3U8Exception;
import com.arialyy.aria.exception.AriaException; import com.arialyy.aria.exception.AriaException;
import com.arialyy.aria.exception.AriaM3U8Exception;
import com.arialyy.aria.m3u8.BaseM3U8Loader; import com.arialyy.aria.m3u8.BaseM3U8Loader;
import com.arialyy.aria.m3u8.M3U8Listener; import com.arialyy.aria.m3u8.M3U8Listener;
import com.arialyy.aria.m3u8.M3U8TaskOption; import com.arialyy.aria.m3u8.M3U8TaskOption;
Expand All @@ -40,6 +40,7 @@
import java.io.File; import java.io.File;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;


/** /**
* m3u8 点播下载状态管理器 * m3u8 点播下载状态管理器
Expand All @@ -49,9 +50,9 @@ public final class VodStateManager implements IThreadStateManager {


private M3U8Listener listener; private M3U8Listener listener;
private int startThreadNum; // 启动的线程总数 private int startThreadNum; // 启动的线程总数
private int cancelNum = 0; // 已经取消的线程的数 private AtomicInteger cancelNum = new AtomicInteger(0); // 已经取消的线程的数
private int stopNum = 0; // 已经停止的线程数 private AtomicInteger stopNum = new AtomicInteger(0); // 已经停止的线程数
private int failNum = 0; // 失败的线程数 private AtomicInteger failNum = new AtomicInteger(0); // 失败的线程数
private long progress; private long progress;
private TaskRecord taskRecord; // 任务记录 private TaskRecord taskRecord; // 任务记录
private Looper looper; private Looper looper;
Expand All @@ -74,11 +75,11 @@ public final class VodStateManager implements IThreadStateManager {
int peerIndex = msg.getData().getInt(ISchedulers.DATA_M3U8_PEER_INDEX); int peerIndex = msg.getData().getInt(ISchedulers.DATA_M3U8_PEER_INDEX);
switch (msg.what) { switch (msg.what) {
case STATE_STOP: case STATE_STOP:
stopNum++; stopNum.getAndIncrement();
removeSignThread((ThreadTask) msg.obj); removeSignThread((ThreadTask) msg.obj);
// 处理跳转位置后,恢复任务 // 处理跳转位置后,恢复任务
if (loader.isJump() if (loader.isJump()
&& (stopNum == loader.getCurrentFlagSize() || loader.getCurrentFlagSize() == 0) && (stopNum.get() == loader.getCurrentFlagSize() || loader.getCurrentFlagSize() == 0)
&& !loader.isBreak()) { && !loader.isBreak()) {
loader.resumeTask(); loader.resumeTask();
return true; return true;
Expand All @@ -90,7 +91,7 @@ public final class VodStateManager implements IThreadStateManager {
} }
break; break;
case STATE_CANCEL: case STATE_CANCEL:
cancelNum++; cancelNum.getAndIncrement();
removeSignThread((ThreadTask) msg.obj); removeSignThread((ThreadTask) msg.obj);


if (loader.isBreak()) { if (loader.isBreak()) {
Expand All @@ -99,7 +100,7 @@ public final class VodStateManager implements IThreadStateManager {
} }
break; break;
case STATE_FAIL: case STATE_FAIL:
failNum++; failNum.getAndIncrement();
for (ThreadRecord tr : taskRecord.threadRecords) { for (ThreadRecord tr : taskRecord.threadRecords) {
if (tr.threadId == peerIndex) { if (tr.threadId == peerIndex) {
loader.getBeforePeer().put(peerIndex, tr); loader.getBeforePeer().put(peerIndex, tr);
Expand Down Expand Up @@ -175,9 +176,9 @@ public final class VodStateManager implements IThreadStateManager {
}; };


void updateStateCount() { void updateStateCount() {
cancelNum = 0; cancelNum.set(0);
stopNum = 0; stopNum.set(0);
failNum = 0; failNum.set(0);
} }


@Override public void setLooper(TaskRecord taskRecord, Looper looper) { @Override public void setLooper(TaskRecord taskRecord, Looper looper) {
Expand Down Expand Up @@ -233,12 +234,12 @@ private void handlerPercent() {


@Override public boolean isFail() { @Override public boolean isFail() {
printInfo("isFail"); printInfo("isFail");
return failNum != 0 && failNum == loader.getCurrentFlagSize() && !loader.isJump(); return failNum.get() != 0 && failNum.get() == loader.getCurrentFlagSize() && !loader.isJump();
} }


@Override public boolean isComplete() { @Override public boolean isComplete() {
if (m3U8Option.isIgnoreFailureTs()) { if (m3U8Option.isIgnoreFailureTs()) {
return loader.getCompleteNum() + failNum >= taskRecord.threadRecords.size() return loader.getCompleteNum() + failNum.get() >= taskRecord.threadRecords.size()
&& !loader.isJump(); && !loader.isJump();
} else { } else {
return loader.getCompleteNum() == taskRecord.threadRecords.size() && !loader.isJump(); return loader.getCompleteNum() == taskRecord.threadRecords.size() && !loader.isJump();
Expand Down
Expand Up @@ -137,7 +137,7 @@ public void handleBlockRecord() {
} else if (tr.startLocation != realLocation) { // 处理记录小于分块文件长度的情况 } else if (tr.startLocation != realLocation) { // 处理记录小于分块文件长度的情况
ALog.i(TAG, String.format("修正分块【%s】的进度记录为:%s", temp.getPath(), realLocation)); ALog.i(TAG, String.format("修正分块【%s】的进度记录为:%s", temp.getPath(), realLocation));
tr.startLocation = realLocation; tr.startLocation = realLocation;
}else { } else {
ALog.i(TAG, String.format("修正分块【%s】的进度记录为:%s", temp.getPath(), realLocation)); ALog.i(TAG, String.format("修正分块【%s】的进度记录为:%s", temp.getPath(), realLocation));
tr.startLocation = realLocation; tr.startLocation = realLocation;
tr.isComplete = false; tr.isComplete = false;
Expand All @@ -163,8 +163,10 @@ public void handleSingleThreadRecord() {
// 目标文件 // 目标文件
File targetFile = new File(mTaskRecord.filePath); File targetFile = new File(mTaskRecord.filePath);
// 处理组合任务其中一个子任务完成的情况 // 处理组合任务其中一个子任务完成的情况
if (tr.isComplete && targetFile.exists() && targetFile.length() == mWrapper.getEntity() if (tr.isComplete
.getFileSize()) { && targetFile.exists()
&& targetFile.length() != 0
&& targetFile.length() == mWrapper.getEntity().getFileSize()) {
tr.isComplete = true; tr.isComplete = true;
} else { } else {
ALog.w(TAG, String.format("文件【%s】不存在,任务将重新开始", file.getPath())); ALog.w(TAG, String.format("文件【%s】不存在,任务将重新开始", file.getPath()));
Expand Down
Expand Up @@ -153,7 +153,7 @@ void startSubTask(String url) {
if (!checkSubTask(url, "开始")) { if (!checkSubTask(url, "开始")) {
return; return;
} }
if (!mState.isRunning) { if (!mState.isRunning.get()) {
startTimer(); startTimer();
} }
AbsSubDLoadUtil d = getDownloader(url, false); AbsSubDLoadUtil d = getDownloader(url, false);
Expand Down Expand Up @@ -212,7 +212,7 @@ private AbsSubDLoadUtil getDownloader(String url, boolean needGetFileInfo) {
} }


@Override public boolean isRunning() { @Override public boolean isRunning() {
return mState != null && mState.isRunning; return mState != null && mState.isRunning.get();
} }


@Override public void cancel() { @Override public void cancel() {
Expand Down Expand Up @@ -275,11 +275,11 @@ protected void onPostStart() {
} }


private synchronized void startTimer() { private synchronized void startTimer() {
mState.isRunning = true; mState.isRunning.set(true);
mTimer = new ScheduledThreadPoolExecutor(1); mTimer = new ScheduledThreadPoolExecutor(1);
mTimer.scheduleWithFixedDelay(new Runnable() { mTimer.scheduleWithFixedDelay(new Runnable() {
@Override public void run() { @Override public void run() {
if (!mState.isRunning) { if (!mState.isRunning.get()) {
closeTimer(); closeTimer();
} else if (mCurrentLocation >= 0) { } else if (mCurrentLocation >= 0) {
long t = 0; long t = 0;
Expand Down Expand Up @@ -321,7 +321,7 @@ private synchronized void closeTimer() {
} }
} }


protected void fail(AriaException e, boolean needRetry){ protected void fail(AriaException e, boolean needRetry) {
closeTimer(); closeTimer();
getListener().onFail(needRetry, e); getListener().onFail(needRetry, e);
} }
Expand Down

0 comments on commit 669ac6b

Please sign in to comment.