Skip to content

Commit

Permalink
feat: Adds a way to find a cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
bric3 committed Oct 23, 2022
1 parent 276aa66 commit 402d088
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
* </code></pre>
* </p>
*
* <p>
* Note this implementation is not thread safe.
* </p>
*
* @author brice.dutheil@gmail.com
* @modifiedBy david.ohana@ibm.com, moshikh@il.ibm.com
* @originalAuthor LogPAI team
Expand Down Expand Up @@ -134,6 +138,22 @@ public void parseLogMessage(@Nonnull String message) {
}
}

/**
* Search a matching log cluster given a log message.
*
* @param message The log message content
* @return The matching log cluster or null if no match
*/
public LogCluster searchLogMessage(@Nonnull String message) {
// sprint message by delimiter / whitespaces
List<String> contentTokens = Tokenizer.tokenize(message, delimiters);

// Search the prefix tree
LogCluster matchCluster = treeSearch(contentTokens);
return matchCluster;
}


private @Nullable
InternalLogCluster treeSearch(@Nonnull List<String> logTokens) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -35,16 +36,16 @@ void smokeTest() throws IOException {
AtomicInteger lineCounter = new AtomicInteger();

Stopwatch stopwatch = Stopwatch.createStarted();
Files.lines(TestPaths.get("SSH.log"),
StandardCharsets.UTF_8)
.peek(__ -> lineCounter.incrementAndGet())
.map(l -> l.substring(l.indexOf("]: ") + 3)) // removes this part: "Dec 10 06:55:46 LabSZ sshd[24200]: "
.forEach(content -> {
drain.parseLogMessage(content);
if (lineCounter.get() % 10000 == 0) {
System.out.printf("%4d clusters so far%n", drain.clusters().size());
}
});
try (Stream<String> lines = Files.lines(TestPaths.get("SSH.log"), StandardCharsets.UTF_8)) {
lines.peek(__ -> lineCounter.incrementAndGet())
.map(l -> l.substring(l.indexOf("]: ") + 3)) // removes this part: "Dec 10 06:55:46 LabSZ sshd[24200]: "
.forEach(content -> {
drain.parseLogMessage(content);
if (lineCounter.get() % 10000 == 0) {
System.out.printf("%4d clusters so far%n", drain.clusters().size());
}
});
}


System.out.printf("---- Done processing file. Total of %d lines, done in %s, %d clusters%n",
Expand Down Expand Up @@ -115,9 +116,29 @@ void smokeTest() throws IOException {
assertCluster(sortedClusters, 50, 1, "syslogin perform logout: logout() returned an error");
}

private void assertCluster(List<LogCluster> sortedClusters, int i, int sigthings, String tokens) {
assertThat(sortedClusters.get(i).sightings()).isEqualTo(sigthings);
assertThat(String.join(" ", sortedClusters.get(i).tokens())).isEqualTo(tokens);
@Test
void can_find_a_log() throws IOException {
Drain drain = Drain.drainBuilder()
.additionalDelimiters("_")
.depth(4)
.build();

try (Stream<String> lines = Files.lines(TestPaths.get("SSH.log"), StandardCharsets.UTF_8)) {
lines.map(l -> l.substring(l.indexOf("]: ") + 3)) // removes this part: "Dec 10 06:55:46 LabSZ sshd[24200]: "
.forEach(drain::parseLogMessage);
}

LogCluster logCluster = drain.searchLogMessage("Received disconnect from 202.100.179.208: 11: Bye Bye [preauth]");
assertCluster(logCluster, 46642, "Received disconnect from <*> 11: <*> <*> <*>");
}

private void assertCluster(List<LogCluster> sortedClusters, int i, int sightings, String tokens) {
assertCluster(sortedClusters.get(i), sightings, tokens);
}

private static void assertCluster(LogCluster logCluster, int sightings, String tokens) {
assertThat(logCluster.sightings()).isEqualTo(sightings);
assertThat(String.join(" ", logCluster.tokens())).isEqualTo(tokens);
}
}
/*
Expand Down

0 comments on commit 402d088

Please sign in to comment.