Skip to content

Commit

Permalink
Processing pipeline support (dadoonet#1003).
Browse files Browse the repository at this point in the history
  • Loading branch information
janhoy committed Sep 4, 2020
1 parent 6e82ba0 commit 7c53731
Show file tree
Hide file tree
Showing 10 changed files with 540 additions and 51 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to David Pilato (the "Author") under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Author licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package fr.pilato.elasticsearch.crawler.fs;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import static fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil.isIndexable;

/**
* The processing pipeline that will be used if not overridden.
*
*/
/**
 * The processing pipeline that will be used if not overridden.
 *
 * Runs two fixed stages over the {@link FsCrawlerContext}: content extraction
 * via Tika, then (when the extracted content passes the configured filters)
 * indexing into Elasticsearch.
 */
public class DefaultProcessingPipeline implements ProcessingPipeline {
    private static final Logger logger = LogManager.getLogger(DefaultProcessingPipeline.class);

    // Fixed stage instances; the pipeline is stateless apart from these.
    private final TikaProcessor tika = new TikaProcessor();
    private final EsIndexProcessor es = new EsIndexProcessor();

    /**
     * Extracts content from the file in {@code ctx}, then indexes the
     * resulting doc unless it is filtered out by the fs filter patterns.
     *
     * @param ctx context carrying the file, settings and document being built
     */
    @Override
    public void processFile(FsCrawlerContext ctx) {
        // Stage 1: let Tika fill the context's doc with extracted content
        tika.process(ctx);

        // Guard: skip indexing when the content does not match all filter patterns
        if (!isIndexable(ctx.getDoc().getContent(), ctx.getFsSettings().getFs().getFilters())) {
            logger.debug("We ignore file [{}] because it does not match all the patterns {}",
                    ctx.getFile().getName(), ctx.getFsSettings().getFs().getFilters());
            return;
        }

        // Stage 2: push the document to Elasticsearch
        es.process(ctx);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to David Pilato (the "Author") under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Author licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package fr.pilato.elasticsearch.crawler.fs;

import com.fasterxml.jackson.core.JsonProcessingException;
import fr.pilato.elasticsearch.crawler.fs.beans.DocParser;
import fr.pilato.elasticsearch.crawler.fs.framework.SignTool;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.File;
import java.lang.invoke.MethodHandles;
import java.security.NoSuchAlgorithmException;

/**
* Pulls {@link fr.pilato.elasticsearch.crawler.fs.beans.Doc} from context and indexes it to Elasticsearch
*/
/**
 * Pulls {@link fr.pilato.elasticsearch.crawler.fs.beans.Doc} from context and indexes it to Elasticsearch
 */
public class EsIndexProcessor implements Processor {
    // Use the class literal for consistency with DefaultProcessingPipeline's logger declaration
    private static final Logger logger = LogManager.getLogger(EsIndexProcessor.class);

    /**
     * Serializes the context's doc to JSON and sends it to Elasticsearch.
     *
     * @param ctx context carrying the doc, the settings and the ES client
     * @throws ProcessingException if the doc cannot be serialized to JSON
     */
    @Override
    public void process(FsCrawlerContext ctx) throws ProcessingException {
        try {
            long startTime = System.currentTimeMillis();
            String index = ctx.getFsSettings().getElasticsearch().getIndex();
            String id = generateId(ctx);
            String pipeline = ctx.getFsSettings().getElasticsearch().getPipeline();
            String json = DocParser.toJson(ctx.getDoc());
            ctx.getEsClient().index(index, id, json, pipeline);
            logger.debug("Indexed {}/{}?pipeline={} in {}ms", index, id, pipeline,
                    System.currentTimeMillis() - startTime);
            logger.trace("JSon indexed : {}", json);
        } catch (JsonProcessingException e) {
            throw new ProcessingException(e);
        }
    }

    /**
     * Computes the document id: the bare filename when {@code filename_as_id}
     * is enabled, otherwise a hash of the file's full path.
     */
    String generateId(FsCrawlerContext ctx) {
        try {
            return ctx.getFsSettings().getFs().isFilenameAsId() ?
                    ctx.getFile().getName() :
                    // File(parent, child): directory first, then filename. The arguments
                    // were previously reversed ("name/dir"), which hashed a wrong path
                    // and produced ids inconsistent with the rest of the crawler.
                    SignTool.sign(new File(ctx.getFilepath(), ctx.getFile().getName()).toString());
        } catch (NoSuchAlgorithmException e) {
            // SignTool uses a JDK-provided digest; its absence means a broken JVM,
            // so surface it as an unchecked error rather than forcing callers to handle it
            throw new RuntimeException(e);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to David Pilato (the "Author") under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Author licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package fr.pilato.elasticsearch.crawler.fs;

import fr.pilato.elasticsearch.crawler.fs.beans.Doc;
import fr.pilato.elasticsearch.crawler.fs.client.ElasticsearchClient;
import fr.pilato.elasticsearch.crawler.fs.crawler.FileAbstractModel;
import fr.pilato.elasticsearch.crawler.fs.settings.FsSettings;

import java.io.InputStream;
import java.security.MessageDigest;

/**
* A class that keeps necessary context for the processing pipeline.
* Each processor will typically update the 'doc' as needed.
*/
/**
 * A class that keeps necessary context for the processing pipeline.
 * Each processor will typically update the 'doc' as needed.
 *
 * Instances are created through the nested {@link Builder}; all fields except
 * {@code doc} are immutable once built.
 */
public class FsCrawlerContext {
    private final FileAbstractModel file;
    private final FsSettings fsSettings;
    private final String filepath;
    private final ElasticsearchClient esClient;
    private final InputStream inputStream;
    private final String fullFilename;
    private final MessageDigest messageDigest;
    // Mutable: processors replace/enrich the doc as the pipeline advances
    private Doc doc;

    public FsCrawlerContext(Builder builder) {
        file = builder.file;
        fsSettings = builder.fsSettings;
        filepath = builder.filepath;
        esClient = builder.esClient;
        inputStream = builder.inputStream;
        fullFilename = builder.fullFilename;
        messageDigest = builder.messageDigest;
        doc = builder.doc;
    }

    /** @return the file model being crawled */
    public FileAbstractModel getFile() { return file; }

    /** @return the crawler settings in effect */
    public FsSettings getFsSettings() { return fsSettings; }

    /** @return the directory path of the file */
    public String getFilepath() { return filepath; }

    /** Resets the document to a fresh, empty {@link Doc}. */
    public void setDoc() { this.doc = new Doc(); }

    /** @return the document being built by the pipeline */
    public Doc getDoc() { return doc; }

    /** @return the digest used for checksums, may be null */
    public MessageDigest getMessageDigest() { return messageDigest; }

    /** @return the Elasticsearch client */
    public ElasticsearchClient getEsClient() { return esClient; }

    /** @return the stream over the file's content */
    public InputStream getInputStream() { return inputStream; }

    /** @return the complete filename */
    public String getFullFilename() { return fullFilename; }

    /** Fluent builder for {@link FsCrawlerContext}. */
    public static class Builder {
        private FileAbstractModel file;
        private FsSettings fsSettings;
        private String filepath;
        private MessageDigest messageDigest;
        private ElasticsearchClient esClient;
        private InputStream inputStream;
        private String fullFilename;
        // Default to an empty doc so the context never starts with a null doc
        private Doc doc = new Doc();

        public Builder withFileModel(FileAbstractModel file) { this.file = file; return this; }

        public Builder withFsSettings(FsSettings fsSettings) { this.fsSettings = fsSettings; return this; }

        public Builder withFilePath(String filepath) { this.filepath = filepath; return this; }

        public Builder withMessageDigest(MessageDigest messageDigest) { this.messageDigest = messageDigest; return this; }

        public Builder withEsClient(ElasticsearchClient esClient) { this.esClient = esClient; return this; }

        public Builder withInputStream(InputStream inputStream) { this.inputStream = inputStream; return this; }

        public Builder withFullFilename(String fullFilename) { this.fullFilename = fullFilename; return this; }

        public Builder withDoc(Doc doc) { this.doc = doc; return this; }

        public FsCrawlerContext build() { return new FsCrawlerContext(this); }
    }
}
Loading

0 comments on commit 7c53731

Please sign in to comment.