/
AuditSparqlProcessor.java
141 lines (121 loc) · 6.12 KB
/
AuditSparqlProcessor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
/**
* Copyright 2015 DuraSpace, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.fcrepo.camel.audit.triplestore;
import static org.fcrepo.audit.AuditNamespaces.AUDIT;
import static org.fcrepo.audit.AuditNamespaces.PREMIS;
import static org.fcrepo.audit.AuditNamespaces.PROV;
import static org.fcrepo.audit.AuditNamespaces.XSD;
import static org.fcrepo.camel.RdfNamespaces.RDF;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import org.fcrepo.audit.AuditUtils;
import org.fcrepo.camel.JmsHeaders;
import org.fcrepo.camel.processor.ProcessorUtils;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.clerezza.rdf.core.Triple;
import org.apache.clerezza.rdf.core.UriRef;
import org.apache.clerezza.rdf.core.impl.SimpleMGraph;
import org.apache.clerezza.rdf.core.impl.TripleImpl;
import org.apache.clerezza.rdf.core.impl.TypedLiteralImpl;
import org.apache.clerezza.rdf.core.serializedform.SerializingProvider;
import org.apache.clerezza.rdf.jena.serializer.JenaSerializerProvider;
/**
* A processor that converts an audit message into a sparql-update
* statement for an external triplestore.
*
* @author Aaron Coburn
* @author escowles
* @since 2015-04-09
*/
public class AuditSparqlProcessor implements Processor {
/**
* Define how a message should be processed.
*
* @param exchange the current camel message exchange
*/
public void process(final Exchange exchange) throws Exception {
final Message in = exchange.getIn();
final String eventURIBase = in.getHeader(AuditHeaders.EVENT_BASE_URI, String.class);
final String UUIDString = UUID.randomUUID().toString();
final UriRef eventURI = new UriRef(eventURIBase + "/" + UUIDString);
final Set<Triple> triples = triplesForMessage(in, eventURI);
// serialize triples
final SerializingProvider serializer = new JenaSerializerProvider();
final ByteArrayOutputStream serializedGraph = new ByteArrayOutputStream();
serializer.serialize(serializedGraph, new SimpleMGraph(triples), "text/rdf+nt");
// generate SPARQL Update
final StringBuilder query = new StringBuilder("update=");
query.append(ProcessorUtils.insertData(serializedGraph.toString("UTF-8"), null));
// update exchange
in.setBody(query.toString());
in.setHeader(AuditHeaders.EVENT_URI, eventURI.toString());
in.setHeader(Exchange.CONTENT_TYPE, "application/x-www-form-urlencoded");
in.setHeader(Exchange.HTTP_METHOD, "POST");
}
// namespaces and properties
private static final UriRef INTERNAL_EVENT = new UriRef(AUDIT + "InternalEvent");
private static final UriRef PREMIS_EVENT = new UriRef(PREMIS + "Event");
private static final UriRef PROV_EVENT = new UriRef(PROV + "InstantaneousEvent");
private static final UriRef PREMIS_TIME = new UriRef(PREMIS + "hasEventDateTime");
private static final UriRef PREMIS_OBJ = new UriRef(PREMIS + "hasEventRelatedObject");
private static final UriRef PREMIS_AGENT = new UriRef(PREMIS + "hasEventRelatedAgent");
private static final UriRef PREMIS_TYPE = new UriRef(PREMIS + "hasEventType");
private static final UriRef RDF_TYPE = new UriRef(RDF + "type");
private static final UriRef XSD_DATE = new UriRef(XSD + "dateTime");
private static final UriRef XSD_STRING = new UriRef(XSD + "string");
private static final String EMPTY_STRING = "";
/**
* Convert a Camel message to audit event description.
* @param message Camel message produced by an audit event
* @param subject RDF subject of the audit description
*/
private static Set<Triple> triplesForMessage(final Message message, final UriRef subject) throws IOException {
// get info from jms message headers
final String eventType = (String) message.getHeader(JmsHeaders.EVENT_TYPE, EMPTY_STRING);
final Long timestamp = (Long) message.getHeader(JmsHeaders.TIMESTAMP, 0);
final DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
df.setTimeZone(TimeZone.getTimeZone("UTC"));
final String date = df.format(new Date(timestamp));
final String user = (String) message.getHeader(JmsHeaders.USER, EMPTY_STRING);
final String agent = (String) message.getHeader(JmsHeaders.USER_AGENT, EMPTY_STRING);
final String properties = (String) message.getHeader(JmsHeaders.PROPERTIES, EMPTY_STRING);
final String identifier = ProcessorUtils.getSubjectUri(message);
final String premisType = AuditUtils.getAuditEventType(eventType, properties);
// types
final Set<Triple> triples = new HashSet<>();
triples.add( new TripleImpl(subject, RDF_TYPE, INTERNAL_EVENT) );
triples.add( new TripleImpl(subject, RDF_TYPE, PREMIS_EVENT) );
triples.add( new TripleImpl(subject, RDF_TYPE, PROV_EVENT) );
// basic event info
triples.add( new TripleImpl(subject, PREMIS_TIME, new TypedLiteralImpl(date, XSD_DATE)) );
triples.add( new TripleImpl(subject, PREMIS_OBJ, new UriRef(identifier)) );
triples.add( new TripleImpl(subject, PREMIS_AGENT, new TypedLiteralImpl(user, XSD_STRING)) );
triples.add( new TripleImpl(subject, PREMIS_AGENT, new TypedLiteralImpl(agent, XSD_STRING)) );
if (premisType != null) {
triples.add(new TripleImpl(subject, PREMIS_TYPE, new UriRef(premisType)));
}
return triples;
}
}