-
Notifications
You must be signed in to change notification settings - Fork 0
/
Main.java
137 lines (124 loc) · 5.2 KB
/
Main.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package test;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Polls the {@code source} directory for {@code .txt} files, reads their lines, moves the files
 * to the {@code target} directory, prints the lines to the console and publishes every line as a
 * JMS text message on the queue {@code TestQueue}.
 */
public class Main {

    /**
     * Runs the poll loop forever, sleeping five seconds between passes.
     *
     * @param args ignored
     * @throws IOException if listing, reading or moving files fails
     * @throws InterruptedException if the sleep between two passes is interrupted
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        String source = "source";
        String target = "target";
        while (true) {
            List<Path> filePaths = filePathsList(source); // Step 1: get all files from a directory
            List<Path> filteredFilePaths = filter(filePaths); // Step 2: filter by ".txt"
            SortedMap<Path, List<String>> contentOfFiles = getContentOfFiles(filteredFilePaths); // Step 3: get content of files
            // NOTE(review): files are moved before their content is sent, and sendMessages only
            // reports JMS failures, so a failed send is not retried on the next pass.
            move(filteredFilePaths, target); // Step 4: move files to destination
            printToConsole(contentOfFiles);
            sendMessages(contentOfFiles);
            Thread.sleep(5000L);
        }
    }

    /**
     * Lists the entries of a directory.
     *
     * @param directory path of the directory to list
     * @return the entries of {@code directory}, in the order the directory stream yields them
     * @throws IOException if the directory cannot be opened
     */
    public static List<Path> filePathsList(String directory) throws IOException {
        List<Path> filePaths = new ArrayList<>();
        // try-with-resources: the stream holds an open directory handle that must be released,
        // otherwise every pass of the poll loop leaks one.
        try (DirectoryStream<Path> directoryStream =
                Files.newDirectoryStream(FileSystems.getDefault().getPath(directory))) {
            for (Path path : directoryStream) {
                filePaths.add(path);
            }
        }
        return filePaths;
    }

    /**
     * Keeps only regular files whose name ends with {@code .txt}.
     *
     * @param filePaths candidate paths
     * @return the matching paths, in their original order
     */
    private static List<Path> filter(List<Path> filePaths) {
        List<Path> filteredFilePaths = new ArrayList<>();
        for (Path filePath : filePaths) {
            // A directory named "*.txt" cannot be read as lines, so only regular files qualify.
            if (filePath.getFileName().toString().endsWith(".txt") && Files.isRegularFile(filePath)) {
                filteredFilePaths.add(filePath);
            }
        }
        return filteredFilePaths;
    }

    /**
     * Reads all lines of every given file.
     *
     * @param filePaths files to read
     * @return a map from file to its lines, ordered by last-modified time (oldest first) and,
     *         for equal timestamps, by path
     * @throws IOException if a file cannot be read
     */
    private static SortedMap<Path, List<String>> getContentOfFiles(List<Path> filePaths) throws IOException {
        // Capture each timestamp once so the ordering stays stable even after the files are moved.
        Map<Path, Long> lastModified = new HashMap<>();
        for (Path filePath : filePaths) {
            lastModified.put(filePath, filePath.toFile().lastModified());
        }
        SortedMap<Path, List<String>> contentOfFiles = new TreeMap<>((path1, path2) -> {
            int byTime = Long.compare(
                    lastModified.getOrDefault(path1, 0L), lastModified.getOrDefault(path2, 0L));
            // Tie-break on the path: a comparator returning 0 for two different files would make
            // the map treat them as one key and drop one file's content.
            return byTime != 0 ? byTime : path1.compareTo(path2);
        });
        for (Path filePath : filePaths) {
            contentOfFiles.put(filePath, new ArrayList<>(Files.readAllLines(filePath)));
        }
        return contentOfFiles;
    }

    /**
     * Moves the given files into the target directory, creating it if necessary.
     *
     * @param filePaths files to move
     * @param target path of the destination directory
     * @throws IOException if the directory cannot be created or a file cannot be moved
     */
    private static void move(List<Path> filePaths, String target) throws IOException {
        Path targetDir = FileSystems.getDefault().getPath(target);
        if (!Files.isDirectory(targetDir)) {
            targetDir = Files.createDirectories(Paths.get(target));
        }
        for (Path filePath : filePaths) {
            System.out.println("Moving " + filePath.getFileName() + " to " + targetDir.toAbsolutePath());
            Files.move(filePath, Paths.get(target, filePath.getFileName().toString()), StandardCopyOption.ATOMIC_MOVE);
        }
    }

    /**
     * Prints every line of every file, in map order, after a header line.
     *
     * @param contentOfFiles lines per file
     */
    private static void printToConsole(SortedMap<Path, List<String>> contentOfFiles) {
        System.out.println("Content of files:");
        contentOfFiles.forEach((k, v) -> v.forEach(System.out::println));
    }

    /**
     * Sends every line of every file as a non-persistent text message to the queue
     * {@code TestQueue} inside a single JMS transaction.
     *
     * <p>JMS failures are reported on {@code System.err} and the transaction is rolled back;
     * no exception is propagated to the caller.
     *
     * @param contentOfFiles lines per file; sent in map order
     */
    private static void sendMessages(SortedMap<Path, List<String>> contentOfFiles) {
        Connection connection = null;
        Session session = null;
        try {
            ActiveMQConnectionFactory factory =
                    new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
            connection = factory.createConnection();
            connection.start();
            // The session must be transacted: commit()/rollback() are illegal on a
            // non-transacted session.
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Queue destination = session.createQueue("TestQueue");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // Plain loops so that a JMSException reaches the catch block below and triggers
            // the rollback instead of escaping wrapped in a RuntimeException.
            for (List<String> lines : contentOfFiles.values()) {
                for (String text : lines) {
                    producer.send(session.createTextMessage(text));
                }
            }
            session.commit();
        } catch (JMSException e) {
            System.err.println("Sending messages failed: " + e);
            try {
                if (session != null) {
                    session.rollback();
                }
            } catch (JMSException ex) {
                System.err.println("Rollback failed: " + ex);
            }
        } finally {
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException ignored) {
                    // best effort: nothing useful can be done if closing fails
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ignored) {
                    // best effort: nothing useful can be done if closing fails
                }
            }
        }
    }

    /**
     * Creates a text message from {@code text} and sends it with the given producer.
     *
     * @param session session used to create the message
     * @param producer producer used to send the message
     * @param text message body
     * @throws RuntimeException wrapping the {@link JMSException} if creating or sending fails
     */
    public static void sendMessage(Session session, MessageProducer producer, String text) {
        try {
            TextMessage message = session.createTextMessage(text);
            producer.send(message);
        } catch (JMSException ex) {
            throw new RuntimeException(ex);
        }
    }
}