/
AbstractBigFileProcessorTest.java
executable file
·221 lines (197 loc) · 7.67 KB
/
AbstractBigFileProcessorTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
package com.charlie.bigfile;
import com.charlie.utils.FileUtils;
import lombok.Data;
import lombok.extern.log4j.Log4j;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
/**
 * Manual integration test for chunked big-file upload.
 *
 * <p>The test splits a local file into chunks, hands every chunk to its own
 * {@code DefaultBigFileProcessor} on a dedicated thread, and polls the shared
 * {@code MemoryChunkHandler} for upload progress.
 *
 * <p>NOTE(review): the paths used by {@link #process()} are hard-coded to a
 * developer machine, so the test can only run where those files exist.
 */
@Data
@Log4j
public class AbstractBigFileProcessorTest
{
    // Both fields keep their original (non-final, instance) declarations so that
    // whatever Lombok generates for this class stays exactly as before.
    private Logger logger = LoggerFactory.getLogger(AbstractBigFileProcessorTest.class);
    private MemoryChunkHandler memoryChunkHandler = new MemoryChunkHandler();

    /** Emits one message at debug, info and error level to check the logging setup. */
    @Test
    public void t()
    {
        logger.debug("1");
        logger.info("1233");
        logger.error("gg");
    }

    /**
     * Splits the source file, uploads every chunk on its own thread and then
     * polls the progress of the first registered chunk until it completes.
     *
     * @throws IOException          if splitting fails or the chunk directory cannot be listed
     * @throws InterruptedException if the calling thread is interrupted while waiting for
     *                              the worker threads to register their chunk ids
     */
    @Test
    public void process() throws IOException, InterruptedException
    {
        // 1. Split the source file first (about 1.46 GB).
        String originlName = "阳光电影www.ygdy8.com.扎克·施奈德版正义联盟.2021.BD.1080P.中英双字.mkv";
        // Directory the chunks are written to.
        String chunkOutPutDirectroy = "/Users/joker/Desktop/split";
        // Location handed to the processor as its store path (merged file).
        String mergeFile = "/Users/joker/Desktop/split";
        String originFile = "/Users/joker/Downloads/zylm/阳光电影www.ygdy8.com.扎克·施奈德版正义联盟.2021.BD.1080P.中英双字.mkv";

        logger.debug("1. 拆分文件");
        // NOTE(review): the original comment said "100m per chunk" but the value passed is
        // 400*1024; the unit is defined by FileUtils.splitFiles - confirm which is intended.
        FileUtils.splitFiles(originFile, 400*1024, chunkOutPutDirectroy, false);

        // One thread per chunk; progress is queried while they upload.
        logger.info("根据文件数量创建多线程,开始分片上传碎片");
        File[] dirFiles = new File(chunkOutPutDirectroy).listFiles();
        if (dirFiles == null)
        {
            // listFiles() returns null when the path is not a directory or cannot be read.
            throw new IOException("Cannot list chunk directory: " + chunkOutPutDirectroy);
        }
        List<File> files = new ArrayList<>();
        for (File file : dirFiles)
        {
            if (StringUtils.contains(file.getName(), originlName))
            {
                files.add(file);
            }
        }
        if (files.isEmpty())
        {
            // Fail with a clear message instead of the IllegalArgumentException that
            // ArrayBlockingQueue throws for a zero capacity.
            throw new IllegalStateException(
                    "No chunk files matching '" + originlName + "' found in " + chunkOutPutDirectroy);
        }

        String processId = createProcessId();
        ArrayBlockingQueue<String> chunkMd5List = new ArrayBlockingQueue<>(files.size());
        // Released once every worker has published its chunk id; the workers also wait
        // on it so that all uploads start together.
        CountDownLatch countDownLatch = new CountDownLatch(files.size());
        for (int i = 0; i < files.size(); i++)
        {
            final int index = i;
            final File chunkFile = files.get(i);
            new Thread(() ->
            {
                DefaultBigFileProcessor processor = new DefaultBigFileProcessor(memoryChunkHandler);
                processor.setStorePath(mergeFile);

                String chunkMd5 = getChunkMd5();
                chunkMd5List.add(chunkMd5);
                countDownLatch.countDown();

                BigFileUploadProcessInfo info = new BigFileUploadProcessInfo();
                info.setChunk(index);
                info.setChunkMd5(chunkMd5);
                info.setChunkSize((int) chunkFile.length());
                info.setFileOriginName(originlName);
                info.setProcessId(processId);
                info.setTotalChunk(files.size());

                // try-with-resources closes the chunk stream even when processing fails.
                // NOTE(review): assumes processor.process(info) consumes the stream before
                // it returns - confirm against DefaultBigFileProcessor.
                try (FileInputStream fileInputStream = new FileInputStream(chunkFile))
                {
                    info.setInputStream(fileInputStream);
                    countDownLatch.await();
                    processor.process(info);
                } catch (IOException e)
                {
                    logger.error("Failed to upload chunk {} ({})", index, chunkFile, e);
                } catch (InterruptedException e)
                {
                    // Restore the flag so the thread's owner can observe the interruption.
                    Thread.currentThread().interrupt();
                    logger.error("Interrupted before uploading chunk {}", index, e);
                }
            }).start();
        }
        countDownLatch.await();

        logger.info("开始进度条查询");
        List<String> collect = new ArrayList<>(chunkMd5List);
        // queryPercentage(...) is the multi-threaded alternative that polls every chunk;
        // only the first registered chunk is polled here.
        // NOTE(review): the worker threads are not joined, so this method can return
        // while other chunks are still uploading.
        this.querySingle(memoryChunkHandler, processId, collect);
        System.out.println("程序结束");
    }

    /**
     * Polls the progress of the first chunk id in {@code collect} until it reaches
     * 100 percent or the current thread is interrupted, then prints {@code end}.
     *
     * @param memoryChunkHandler handler that is asked for the chunk progress
     * @param processId          id of the upload the chunk belongs to
     * @param collect            chunk ids; only the first element is used
     * @throws IllegalArgumentException if {@code collect} is empty
     */
    private void querySingle(MemoryChunkHandler memoryChunkHandler, String processId, List<String> collect)
    {
        if (collect.isEmpty())
        {
            throw new IllegalArgumentException("No chunk id available to query");
        }
        pollUntilComplete(memoryChunkHandler, processId, collect.get(0), false);
        System.out.println("end");
    }

    /** Returns a random UUID string that serves as the chunk identifier. */
    private String getChunkMd5()
    {
        return UUID.randomUUID().toString();
    }

    /** Returns the fixed process id used by this test. */
    private String createProcessId()
    {
        return "123456";
    }

    /**
     * Starts one thread per chunk id; each thread prints the progress of its chunk
     * until it reaches 100 percent and then counts {@code stop} down.
     *
     * <p>{@code stop} is counted down in every case (completion, interruption or
     * failure) so that a caller blocked in {@code stop.await()} cannot hang.
     *
     * @param stop               latch counted down once per finished polling thread
     * @param memoryChunkHandler handler that is asked for the chunk progress
     * @param processId          id of the upload the chunks belong to
     * @param chunkMd5List       ids of the chunks to poll
     */
    private void queryPercentage(CountDownLatch stop, MemoryChunkHandler memoryChunkHandler, String processId, List<String> chunkMd5List)
    {
        // Start barrier: every polling thread waits until all of them are ready.
        CountDownLatch countDownLatch = new CountDownLatch(chunkMd5List.size());
        for (int i = 0; i < chunkMd5List.size(); i++)
        {
            final String md5 = chunkMd5List.get(i);
            new Thread(() ->
            {
                try
                {
                    logger.debug("获取到md5:{},剩余:{}", md5, chunkMd5List.size());
                    countDownLatch.countDown();
                    countDownLatch.await();
                    logger.debug("开始查询");
                    pollUntilComplete(memoryChunkHandler, processId, md5, true);
                } catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt();
                    logger.warn("Interrupted while waiting to poll chunk {}", md5, e);
                } finally
                {
                    stop.countDown();
                }
            }).start();
        }
    }

    /**
     * Polls the handler for one chunk until its progress reaches 100 percent or the
     * current thread is interrupted.
     *
     * @param handler       handler that is asked for the chunk progress
     * @param processId     id of the upload the chunk belongs to
     * @param md5           id of the chunk to poll
     * @param printProgress whether every observed progress value is printed to stdout
     */
    private void pollUntilComplete(MemoryChunkHandler handler, String processId, String md5, boolean printProgress)
    {
        Random random = new Random();
        float percentage = 0.0f;
        // Strictly "< 100": progress never exceeds 100, so "<= 100" would loop forever.
        while (percentage < 100)
        {
            ChunkInfo info = handler.getPercentage(processId, md5);
            long pauseMillis;
            if (info == null)
            {
                // Chunk not registered with the handler yet; retry shortly.
                pauseMillis = 10;
            } else
            {
                int uploadedSize = info.getUploadedSize();
                int chunkSize = info.getChunkSize();
                float per = (float) uploadedSize / chunkSize;
                if (printProgress)
                {
                    System.out.println(String.format("chunk={%s} 进度为{%s}, {%s}", md5, per * 100 + "%", StringUtils.repeat(">", (int) (per * 100))));
                }
                percentage = per * 100;
                pauseMillis = random.nextInt(100);
            }
            if (!pause(pauseMillis))
            {
                return;
            }
        }
    }

    /**
     * Sleeps for the given time.
     *
     * @param millis time to sleep in milliseconds
     * @return {@code true} if the sleep completed, {@code false} if the thread was
     *         interrupted (the interrupt flag is restored in that case)
     */
    private boolean pause(long millis)
    {
        try
        {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while polling upload progress", e);
            return false;
        }
    }
}