From 71a95b98f37d268eeafd2fb653ee1b2158416eaf Mon Sep 17 00:00:00 2001 From: Kevin Ratnasekera Date: Wed, 12 Jul 2017 03:08:24 +0530 Subject: [PATCH 1/3] GORA-513 Add initial OrientDB datastore impl --- gora-orientdb/pom.xml | 184 ++++ .../apache/gora/orientdb/package-info.java | 23 + .../gora/orientdb/query/OrientDBQuery.java | 113 +++ .../gora/orientdb/query/OrientDBResult.java | 103 ++ .../gora/orientdb/query/package-info.java | 23 + .../gora/orientdb/store/OrientDBMapping.java | 177 ++++ .../store/OrientDBMappingBuilder.java | 123 +++ .../gora/orientdb/store/OrientDBStore.java | 922 ++++++++++++++++++ .../store/OrientDBStoreParameters.java | 157 +++ .../gora/orientdb/store/package-info.java | 23 + .../gora/orientdb/GoraOrientDBTestDriver.java | 75 ++ .../mapreduce/OrientDBStoreMapReduceTest.java | 66 ++ .../gora/orientdb/mapreduce/package-info.java | 22 + .../apache/gora/orientdb/package-info.java | 23 + .../store/OrientDBGoraDataStoreTest.java | 65 ++ .../gora/orientdb/store/package-info.java | 22 + .../test/resources/gora-orientdb-mapping.xml | 45 + .../src/test/resources/gora.properties | 23 + .../test/resources/orientdb-server-config.xml | 35 + pom.xml | 36 + 20 files changed, 2260 insertions(+) create mode 100644 gora-orientdb/pom.xml create mode 100644 gora-orientdb/src/main/java/org/apache/gora/orientdb/package-info.java create mode 100644 gora-orientdb/src/main/java/org/apache/gora/orientdb/query/OrientDBQuery.java create mode 100644 gora-orientdb/src/main/java/org/apache/gora/orientdb/query/OrientDBResult.java create mode 100644 gora-orientdb/src/main/java/org/apache/gora/orientdb/query/package-info.java create mode 100644 gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBMapping.java create mode 100644 gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBMappingBuilder.java create mode 100644 gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java create mode 100644 gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStoreParameters.java create mode 100644 gora-orientdb/src/main/java/org/apache/gora/orientdb/store/package-info.java create mode 100644 gora-orientdb/src/test/java/org/apache/gora/orientdb/GoraOrientDBTestDriver.java create mode 100644 gora-orientdb/src/test/java/org/apache/gora/orientdb/mapreduce/OrientDBStoreMapReduceTest.java create mode 100644 gora-orientdb/src/test/java/org/apache/gora/orientdb/mapreduce/package-info.java create mode 100644 gora-orientdb/src/test/java/org/apache/gora/orientdb/package-info.java create mode 100644 gora-orientdb/src/test/java/org/apache/gora/orientdb/store/OrientDBGoraDataStoreTest.java create mode 100644 gora-orientdb/src/test/java/org/apache/gora/orientdb/store/package-info.java create mode 100644 gora-orientdb/src/test/resources/gora-orientdb-mapping.xml create mode 100644 gora-orientdb/src/test/resources/gora.properties create mode 100644 gora-orientdb/src/test/resources/orientdb-server-config.xml diff --git a/gora-orientdb/pom.xml b/gora-orientdb/pom.xml new file mode 100644 index 000000000..50a84dd17 --- /dev/null +++ b/gora-orientdb/pom.xml @@ -0,0 +1,184 @@ + + + + 4.0.0 + + + org.apache.gora + gora + 0.8-SNAPSHOT + ../ + + gora-orientdb + bundle + + Apache Gora :: OrientDB + http://gora.apache.org + The Apache Gora open source framework provides an in-memory data model and + persistence for big data. Gora supports persisting to column stores, key value stores, + document stores and RDBMSs, and analyzing the data with extensive Apache Hadoop MapReduce + support. + 2010 + + The Apache Software Foundation + http://www.apache.org/ + + + JIRA + https://issues.apache.org/jira/browse/GORA + + + Jenkins + https://builds.apache.org/job/Gora-trunk/ + + + + * + org.apache.gora.orientdb*;version="${project.version}";-noimport:=true + + + + target + target/classes + ${project.artifactId}-${project.version} + target/test-classes + src/test/java + src/main/java + + + ${project.basedir}/src/test/resources + + **/* + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + ${build-helper-maven-plugin.version} + + + generate-sources + + add-source + + + + src/examples/java + + + + + + + + + + + + org.apache.gora + gora-core + + + + org.apache.gora + gora-core + test-jar + test + + + + org.apache.avro + avro + + + + + org.slf4j + slf4j-log4j12 + + + + log4j + log4j + + + javax.jms + jms + + + + + + + junit + junit + + + + org.apache.hadoop + hadoop-minicluster + test + + + + + com.orientechnologies + orientdb-server + + + + com.orientechnologies + orientdb-core + + + + com.orientechnologies + orientdb-graphdb + + + + com.orientechnologies + orientdb-client + + + + com.github.raymanrt + orientqb + + + + org.jdom + jdom + compile + + + + + + org.apache.hadoop + hadoop-client + + + + + diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/package-info.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/package-info.java new file mode 100644 index 000000000..1eecd09f2 --- /dev/null +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package encapsulates all OrientDB dataStore related class implementations. + * + */ +package org.apache.gora.orientdb; \ No newline at end of file diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/OrientDBQuery.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/OrientDBQuery.java new file mode 100644 index 000000000..209549a27 --- /dev/null +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/OrientDBQuery.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.orientdb.query; + +import com.github.raymanrt.orientqb.query.Parameter; +import com.github.raymanrt.orientqb.query.Projection; +import com.orientechnologies.orient.core.record.impl.ODocument; +import com.orientechnologies.orient.core.sql.query.OSQLSynchQuery; +import org.apache.gora.orientdb.store.OrientDBMapping; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.impl.QueryBase; +import org.apache.gora.store.DataStore; +import com.github.raymanrt.orientqb.query.Query; + +import java.util.HashMap; +import java.util.Map; + +import static com.github.raymanrt.orientqb.query.Projection.projection; + + +/** + * OrientDB specific implementation of the {@link org.apache.gora.query.Query} interface. + */ +public class OrientDBQuery extends QueryBase { + + private OSQLSynchQuery dbQuery; + private Map params; + + public OrientDBQuery() { + super(null); + } + + public OrientDBQuery(DataStore dataStore) { + super(dataStore); + } + + /** + * Return populated {@link OSQLSynchQuery} Orient DB query. + * + * @return a {@link OSQLSynchQuery} query executable over Orient DB. + */ + public OSQLSynchQuery getOrientDBQuery() { + return dbQuery; + } + + /** + * Dynamic parameters for {@link OSQLSynchQuery} Orient DB query. + * + * @return a param map related to {@link OSQLSynchQuery} Orient DB query. + */ + public Map getParams() { + return params; + } + + /** + * Convert Gora query to Orient DB specific query which underline API understands. + * And maintain it s state encapsulated to Gora implementation of the {@link org.apache.gora.query.Query}. + * + * @return a {@link OSQLSynchQuery} query executable over Orient DB. + */ + public OSQLSynchQuery populateOrientDBQuery(final OrientDBMapping orientDBMapping, + final String[] fields, + final String[] schemaFields) { + params = new HashMap(); + Query selectQuery = new Query(); + selectQuery.from(orientDBMapping.getDocumentClass()); + if ((this.getStartKey() != null) && (this.getEndKey() != null) + && this.getStartKey().equals(this.getEndKey())) { + selectQuery.where(projection("_id").eq(Parameter.parameter("key"))); + params.put("key", this.getStartKey()); + } else if (this.getStartKey() != null || this.getEndKey() != null) { + if (this.getStartKey() != null) { + selectQuery.where(projection("_id").ge(Parameter.parameter("key_lower"))); + params.put("key_lower", this.getStartKey()); + } + if (this.getEndKey() != null) { + selectQuery.where(projection("_id").le(Parameter.parameter("key_upper"))); + params.put("key_upper", this.getEndKey()); + } + } + + if (fields.length == schemaFields.length) { + selectQuery.select(Projection.ALL); + } else { + for (String k : fields) { + String dbFieldName = orientDBMapping.getDocumentField(k); + if (dbFieldName != null && dbFieldName.length() > 0) { + selectQuery.select(dbFieldName); + } + } + selectQuery.select("_id"); + } + dbQuery = new OSQLSynchQuery(selectQuery.toString()); + return dbQuery; + } + +} diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/OrientDBResult.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/OrientDBResult.java new file mode 100644 index 000000000..23df5fa9b --- /dev/null +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/OrientDBResult.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.orientdb.query; + +import java.io.IOException; +import java.util.Iterator; + +import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx; +import com.orientechnologies.orient.core.record.impl.ODocument; +import com.orientechnologies.orient.core.sql.query.OConcurrentResultSet; +import org.apache.gora.orientdb.store.OrientDBStore; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.Query; +import org.apache.gora.query.impl.ResultBase; +import org.apache.gora.store.DataStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * OrientDB specific implementation of the {@link org.apache.gora.query.Result} interface. + * + */ +public class OrientDBResult extends ResultBase { + + /** + * Reference to the OrientDB Results set. + */ + private OConcurrentResultSet resultSet; + private int size; + private static final Logger log = LoggerFactory.getLogger(OrientDBResult.class); + private Iterator resultSetIterator; + + public OrientDBResult(DataStore dataStore, Query query) { + super(dataStore, query); + } + + public OrientDBResult(DataStore dataStore, + Query query, + OConcurrentResultSet resultSet) { + super(dataStore, query); + this.resultSet = resultSet; + this.size = resultSet.size(); + this.resultSetIterator = resultSet.iterator(); + } + + public OrientDBStore getDataStore() { + return (OrientDBStore) super.getDataStore(); + } + + @Override + public float getProgress() throws IOException { + if (resultSet == null) { + return 0; + } else if (size == 0) { + return 1; + } else { + return offset / (float) size; + } + } + + @Override + public void close() throws IOException { + resultSet.clear(); + } + + @Override + protected boolean nextInner() throws IOException { + ODatabaseDocumentTx loadTx = ((OrientDBStore) getDataStore()) + .getConnectionPool().acquire(); + loadTx.activateOnCurrentThread(); + try { + + if (!resultSetIterator.hasNext()) { + return false; + } + + ODocument obj = resultSetIterator.next(); + key = (K) obj.field("_id"); + persistent = ((OrientDBStore) getDataStore()) + .convertOrientDocToAvroBean(obj, getQuery().getFields()); + return persistent != null; + } finally { + loadTx.close(); + } + } + +} diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/package-info.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/package-info.java new file mode 100644 index 000000000..0812a12d8 --- /dev/null +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains all the OrientDB dataStore query representation class as well as Result set representing class + * when query is executed over the OrientDB dataStore. + */ +package org.apache.gora.orientdb.query; \ No newline at end of file diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBMapping.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBMapping.java new file mode 100644 index 000000000..a37c3d84d --- /dev/null +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBMapping.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.orientdb.store; + +import com.orientechnologies.orient.core.metadata.schema.OType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.orientechnologies.orient.core.metadata.schema.OSchemaShared; +import com.orientechnologies.orient.core.record.impl.ODocument; +import org.apache.avro.specific.SpecificRecord; + +import java.util.HashMap; +import java.util.Locale; + +/** + * Maintains mapping between AVRO data bean and OrientDB document. + */ +public class OrientDBMapping { + + public static final Logger log = LoggerFactory.getLogger(OrientDBMapping.class); + + private String documentClass; + private HashMap classToDocument = new HashMap<>(); + private HashMap documentToClass = new HashMap<>(); + private HashMap documentFields = new HashMap<>(); + + /** + * Returns main OrientDB document class which matches to persistent bean. + * + * @return a {@link ODocument} class name. + */ + public String getDocumentClass() { + return documentClass; + } + + /** + * Setter for main OrientDB document class which matches for persistent bean. + * + * @param documentClass {@link ODocument} class name. + */ + public void setDocumentClass(String documentClass) { + this.documentClass = documentClass; + } + + /** + * Register mapping {@link com.orientechnologies.orient.core.record.impl.ODocument} + * field name to it s data type {@link OType} + * + * @param name {@link ODocument} class name. + * @param type {@link DocumentFieldType} field data type. + */ + private void registerDocumentField(String name, DocumentFieldType type) { + if (OSchemaShared.checkClassNameIfValid(name) != null) { + throw new IllegalArgumentException("'" + name + + "' is an invalid field name for a OrientDB document. '" + + OSchemaShared.checkClassNameIfValid(name) + "' is not a valid character."); + } + if (documentFields.containsKey(name) && (documentFields.get(name) != type)) + throw new IllegalStateException("The field '" + name + "' is already " + + "registered with a different type."); + documentFields.put(name, type); + } + + /** + * Register mapping between AVRO {@link SpecificRecord} + * record field, ODocument field and it's type. + * + * @param classFieldName {@link SpecificRecord} field name. + * @param docFieldName {@link ODocument} field name. + * @param fieldType {@link DocumentFieldType} field data type as string. + */ + public void registerClassField(String classFieldName, + String docFieldName, String fieldType) { + try { + registerDocumentField(docFieldName, + DocumentFieldType.valueOf(fieldType.toUpperCase(Locale.getDefault()))); + } catch (final IllegalArgumentException e) { + throw new IllegalStateException("Declared '" + fieldType + + "' for class field '" + classFieldName + + "' is not supported by OrientDBMapping"); + } + + if (classToDocument.containsKey(classFieldName)) { + if (!classToDocument.get(classFieldName).equals(docFieldName)) { + throw new IllegalStateException("The class field '" + classFieldName + + "' is already registered in the mapping" + + " with the document field '" + + classToDocument.get(classFieldName) + + " which differs from the new one '" + docFieldName + "'."); + } + } else { + classToDocument.put(classFieldName, docFieldName); + documentToClass.put(docFieldName, classFieldName); + } + } + + /** + * Returns all fields in AVRO {@link org.apache.hadoop.io.serializer.avro.Record} record. + * + * @return array of fields in string. + */ + public String[] getDocumentFields() { + return documentToClass.keySet().toArray(new String[documentToClass.keySet().size()]); + } + + /** + * Return ODocument name given it's mapped AVRO {@link SpecificRecord} + * record field name. + * + * @param field AVRO record field name in string + * @return matching ODocument {@link ODocument} field name in string. + */ + public String getDocumentField(String field) { + return classToDocument.get(field); + } + + /** + * Return ODocument name given it's mapped AVRO {@link SpecificRecord} + * record field name. + * + * @param field AVRO record field name in string + * @return matching ODocument {@link ODocument} field name in string. + */ + protected DocumentFieldType getDocumentFieldType(String field) { + return documentFields.get(field); + } + + /** + * Currently supporting data types from OrientDB data types {@link OType} + */ + public static enum DocumentFieldType { + + INTEGER(OType.INTEGER.name()), + LONG(OType.LONG.name()), + FLOAT(OType.FLOAT.name()), + SHORT(OType.SHORT.name()), + DOUBLE(OType.DOUBLE.name()), + STRING(OType.STRING.name()), + ANY(OType.ANY.name()), + TRANSIENT(OType.TRANSIENT.name()), + BINARY(OType.BINARY.name()), + DATE(OType.DATE.name()), + DATETIME(OType.DATETIME.name()), + EMBEDDED(OType.EMBEDDED.name()), + EMBEDDEDLIST(OType.EMBEDDEDLIST.name()), + EMBEDDEDSET(OType.EMBEDDEDSET.name()), + EMBEDDEDMAP(OType.EMBEDDEDMAP.name()); + + private final String stringValue; + + DocumentFieldType(final String s) { + stringValue = s; + } + + public String toString() { + return stringValue; + } + } + +} diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBMappingBuilder.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBMappingBuilder.java new file mode 100644 index 000000000..4c2d68c4a --- /dev/null +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBMappingBuilder.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.orientdb.store; + +import org.apache.gora.persistency.impl.PersistentBase; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import org.jdom.Document; +import org.jdom.Element; +import org.jdom.input.SAXBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility builder for create OrientDB mapping from gora-orientdb-mapping.xml. + */ +public class OrientDBMappingBuilder { + + public static final String ATT_NAME = "name"; + public static final String ATT_TYPE = "type"; + public static final String TAG_CLASS = "class"; + public static final String ATT_KEYCLASS = "keyClass"; + public static final String ATT_DOCUMENT = "document"; + public static final String TAG_FIELD = "field"; + public static final String ATT_FIELD = "docfield"; + public static final Logger log = LoggerFactory.getLogger(OrientDBMapping.class); + + private final OrientDBStore dataStore; + + private OrientDBMapping mapping; + + public OrientDBMappingBuilder(final OrientDBStore store) { + this.dataStore = store; + this.mapping = new OrientDBMapping(); + } + + /** + * Build OrientDB dataStore mapping from gora-orientdb-mapping.xml given from class path + * or file system location. + */ + public OrientDBMapping build() { + if (mapping.getDocumentClass() == null) + throw new IllegalStateException("Document Class is not specified."); + return mapping; + } + + protected OrientDBMappingBuilder fromFile(String uri) throws IOException { + try { + SAXBuilder saxBuilder = new SAXBuilder(); + InputStream is = getClass().getResourceAsStream(uri); + if (is == null) { + String msg = "Unable to load the mapping from classpath resource '" + uri + + "' Re-trying local from local file system location."; + log.warn(msg); + is = new FileInputStream(uri); + } + Document doc = saxBuilder.build(is); + Element root = doc.getRootElement(); + List classElements = root.getChildren(TAG_CLASS); + for (Element classElement : classElements) { + final Class persistentClass = dataStore.getPersistentClass(); + final Class keyClass = dataStore.getKeyClass(); + if (matchesKeyClassWithMapping(keyClass, classElement) + && matchesPersistentClassWithMapping(persistentClass, classElement)) { + loadPersistentClass(classElement, persistentClass); + break; + } + } + } catch (IOException ex) { + throw ex; + } catch (Exception ex) { + throw new IOException(ex); + } + return this; + } + + private boolean matchesPersistentClassWithMapping(final Class persistentClass, + final Element classElement) { + return classElement.getAttributeValue(ATT_NAME).equals(persistentClass.getName()); + } + + private boolean matchesKeyClassWithMapping(final Class keyClass, + final Element classElement) { + return classElement.getAttributeValue(ATT_KEYCLASS).equals(keyClass.getName()); + } + + private void loadPersistentClass(Element classElement, + Class pPersistentClass) { + + String docClassFromMapping = classElement.getAttributeValue(ATT_DOCUMENT); + String resolvedDocClass = dataStore.getSchemaName(docClassFromMapping, + pPersistentClass); + mapping.setDocumentClass(resolvedDocClass); + + List fields = classElement.getChildren(TAG_FIELD); + for (Element field : fields) { + mapping.registerClassField(field.getAttributeValue(ATT_NAME), + field.getAttributeValue(ATT_FIELD), + field.getAttributeValue(ATT_TYPE)); + } + } + +} \ No newline at end of file diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java new file mode 100644 index 000000000..d0b901fbc --- /dev/null +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java @@ -0,0 +1,922 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.orientdb.store; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Properties; +import java.util.List; +import java.util.HashMap; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.Date; +import java.util.Calendar; +import java.util.Collection; +import java.util.TimeZone; +import java.util.Locale; + +import com.github.raymanrt.orientqb.query.Parameter; +import com.gitub.raymanrt.orientqb.delete.Delete; +import com.orientechnologies.orient.client.remote.OServerAdmin; +import com.orientechnologies.orient.core.db.OPartitionedDatabasePool; +import com.orientechnologies.orient.core.db.OPartitionedDatabasePoolFactory; +import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx; +import com.orientechnologies.orient.core.db.record.OTrackedList; +import com.orientechnologies.orient.core.db.record.OTrackedMap; +import com.orientechnologies.orient.core.db.record.OTrackedSet; +import com.orientechnologies.orient.core.metadata.schema.OClass; +import com.orientechnologies.orient.core.metadata.schema.OType; +import com.orientechnologies.orient.core.record.impl.ODocument; +import com.orientechnologies.orient.core.sql.OCommandSQL; +import com.orientechnologies.orient.core.sql.query.OConcurrentResultSet; +import com.orientechnologies.orient.core.sql.query.OSQLSynchQuery; +import org.apache.avro.Schema; +import org.apache.avro.util.Utf8; +import org.apache.gora.orientdb.query.OrientDBQuery; +import org.apache.gora.orientdb.query.OrientDBResult; +import org.apache.gora.persistency.impl.BeanFactoryImpl; +import org.apache.gora.persistency.impl.DirtyListWrapper; +import org.apache.gora.persistency.impl.DirtyMapWrapper; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.PartitionQuery; +import org.apache.gora.query.Query; +import org.apache.gora.query.Result; +import org.apache.gora.query.impl.PartitionQueryImpl; +import org.apache.gora.store.impl.DataStoreBase; +import org.apache.gora.util.AvroUtils; +import org.apache.gora.util.ClassLoadingUtils; + +import javax.xml.bind.DatatypeConverter; + +import static com.github.raymanrt.orientqb.query.Projection.projection; + +/** + * {@link org.apache.gora.orientdb.store.OrientDBStore} is the primary class + * responsible for facilitating GORA CRUD operations on OrientDB documents. + */ +public class OrientDBStore extends DataStoreBase { + + public static final String DEFAULT_MAPPING_FILE = "/gora-orientdb-mapping.xml"; + private String ROOT_URL; + private String ROOT_DATABASE_URL; + private OrientDBStoreParameters orientDbStoreParams; + private OrientDBMapping orientDBMapping; + private OServerAdmin remoteServerAdmin; + private OPartitionedDatabasePool connectionPool; + private List docBatch = new ArrayList<>(); + + /** + * Initialize the OrientDB dataStore by {@link Properties} parameters. + * + * @param keyClass key class type for dataStore. + * @param persistentClass persistent class type for dataStore. + * @param properties OrientDB dataStore properties EG:- OrientDB client credentials. + */ + @Override + public void initialize(Class keyClass, Class persistentClass, Properties properties) { + super.initialize(keyClass, persistentClass, properties); + try { + orientDbStoreParams = OrientDBStoreParameters.load(properties); + ROOT_URL = "remote:".concat(orientDbStoreParams.getServerHost()).concat(":") + .concat(orientDbStoreParams.getServerPort()); + ROOT_DATABASE_URL = ROOT_URL.concat("/").concat(orientDbStoreParams.getDatabaseName()); + remoteServerAdmin = new OServerAdmin(ROOT_URL).connect(orientDbStoreParams.getUserName(), + orientDbStoreParams.getUserPassword()); + if (!remoteServerAdmin.existsDatabase(orientDbStoreParams.getDatabaseName(), "memory")) { + remoteServerAdmin.createDatabase(orientDbStoreParams.getDatabaseName(), "document", "memory"); + } + + if (orientDbStoreParams.getConnectionPoolSize() != null) { + int connPoolSize = Integer.valueOf(orientDbStoreParams.getConnectionPoolSize()); + connectionPool = new OPartitionedDatabasePoolFactory(connPoolSize) + .get(ROOT_DATABASE_URL, orientDbStoreParams.getUserName(), + orientDbStoreParams.getUserPassword()); + } else { + connectionPool = new OPartitionedDatabasePoolFactory().get(ROOT_DATABASE_URL, + orientDbStoreParams.getUserName(), orientDbStoreParams.getUserPassword()); + } + + OrientDBMappingBuilder builder = new OrientDBMappingBuilder<>(this); + orientDBMapping = builder.fromFile(orientDbStoreParams.getMappingFile()).build(); + + if (!schemaExists()) { + createSchema(); + } + } catch (Exception e) { + LOG.error("Error while initializing OrientDB dataStore: {}", + new Object[]{e.getMessage()}); + throw new RuntimeException(e); + } + } + + @Override + public String getSchemaName(final String mappingSchemaName, + final Class persistentClass) { + return super.getSchemaName(mappingSchemaName, persistentClass); + } + + @Override + public String getSchemaName() { + return orientDBMapping.getDocumentClass(); + } + + /** + * Create a new class of OrientDB documents if necessary. Enforce specified schema over the document class. + * + */ + @Override + public void createSchema() { + if (schemaExists()) { + return; + } + + ODatabaseDocumentTx schemaTx = connectionPool.acquire(); + schemaTx.activateOnCurrentThread(); + try { + + OClass documentClass = schemaTx.getMetadata().getSchema().createClass(orientDBMapping.getDocumentClass()); + documentClass.createProperty("_id", + OType.getTypeByClass(super.getKeyClass())).createIndex(OClass.INDEX_TYPE.UNIQUE); + for (String docField : orientDBMapping.getDocumentFields()) { + documentClass.createProperty(docField, + OType.valueOf(orientDBMapping.getDocumentFieldType(docField).name())); + } + schemaTx.getMetadata().getSchema().reload(); + } finally { + schemaTx.close(); + } + } + + /** + * Deletes enforced schema over OrientDB Document class. + * + */ + @Override + public void deleteSchema() { + ODatabaseDocumentTx schemaTx = connectionPool.acquire(); + schemaTx.activateOnCurrentThread(); + try { + schemaTx.getMetadata().getSchema().dropClass(orientDBMapping.getDocumentClass()); + } finally { + schemaTx.close(); + } + } + + /** + * Check whether there exist a schema enforced over OrientDB document class. + * + */ + @Override + public boolean schemaExists() { + ODatabaseDocumentTx schemaTx = connectionPool.acquire(); + schemaTx.activateOnCurrentThread(); + try { + return schemaTx.getMetadata().getSchema() + .existsClass(orientDBMapping.getDocumentClass()); + } finally { + schemaTx.close(); + } + } + + @Override + public T get(K key, String[] fields) { + String[] dbFields = getFieldsToQuery(fields); + com.github.raymanrt.orientqb.query.Query selectQuery = new com.github.raymanrt.orientqb.query.Query(); + for (String k : dbFields) { + String dbFieldName = orientDBMapping.getDocumentField(k); + if (dbFieldName != null && dbFieldName.length() > 0) { + selectQuery.select(dbFieldName); + } + } + selectQuery.from(orientDBMapping.getDocumentClass()) + .where(projection("_id").eq(Parameter.parameter("key"))); + Map params = new HashMap(); + params.put("key", key); + OSQLSynchQuery query = new OSQLSynchQuery(selectQuery.toString()); + ODatabaseDocumentTx selectTx = connectionPool.acquire(); + selectTx.activateOnCurrentThread(); + try { + List result = selectTx.command(query).execute(params); + if (result.size() == 1) { + return convertOrientDocToAvroBean(result.get(0), dbFields); + } else { + return null; + } + } finally { + selectTx.close(); + } + } + + @Override + public void put(K key, T val) { + if (val.isDirty()) { + OrientDBQuery dataStoreQuery = new OrientDBQuery<>(this); + dataStoreQuery.setStartKey(key); + dataStoreQuery.setEndKey(key); + dataStoreQuery.populateOrientDBQuery(orientDBMapping, getFieldsToQuery(null), getFields()); + + ODatabaseDocumentTx selectTx = connectionPool.acquire(); + selectTx.activateOnCurrentThread(); + try { + List result = selectTx.command(dataStoreQuery.getOrientDBQuery()) + .execute(dataStoreQuery.getParams()); + if (result.size() == 1) { + ODocument document = updateOrientDocFromAvroBean(key, val, result.get(0)); + docBatch.add(document); + } else { + ODocument document = convertAvroBeanToOrientDoc(key, val); + docBatch.add(document); + } + } finally { + selectTx.close(); + } + } else { + LOG.info("Ignored putting persistent bean {} in the store as it is neither " + + "new, neither dirty.", new Object[]{val}); + } + } + + @Override + public boolean delete(K key) { + Delete delete = new Delete(); + delete.from(orientDBMapping.getDocumentClass()) + .where(projection("_id").eq(Parameter.parameter("key"))); + Map params = new HashMap(); + params.put("key", key); + OCommandSQL query = new OCommandSQL(delete.toString().replace("DELETE", "DELETE FROM")); + ODatabaseDocumentTx deleteTx = connectionPool.acquire(); + deleteTx.activateOnCurrentThread(); + try { + int deleteCount = deleteTx.command(query).execute(params); + if (deleteCount == 1) { + return true; + } else { + return false; + } + } finally { + deleteTx.close(); + } + } + + @Override + public long deleteByQuery(Query query) { + Delete delete = new Delete(); + delete.from(orientDBMapping.getDocumentClass()); + Map params = new HashMap(); + if (query.getFields() == null || (query.getFields().length == getFields().length)) { + if (query.getStartKey() != null) { + delete.where(projection("_id").ge(Parameter.parameter("start"))); + params.put("start", query.getStartKey()); + } + if (query.getEndKey() != null) { + delete.where(projection("_id").le(Parameter.parameter("end"))); + params.put("end", query.getEndKey()); + } + + OCommandSQL dbQuery = new OCommandSQL(delete.toString().replace("DELETE", "DELETE FROM")); + ODatabaseDocumentTx deleteTx = connectionPool.acquire(); + deleteTx.activateOnCurrentThread(); + try { + int deleteCount; + if (params.isEmpty()) { + deleteCount = deleteTx.command(dbQuery).execute(); + } else { + deleteCount = deleteTx.command(dbQuery).execute(params); + } + if (deleteCount > 0) { + return deleteCount; + } else { + return 0; + } + } finally { + deleteTx.close(); + } + } else { + + OrientDBQuery dataStoreQuery = new OrientDBQuery<>(this); + dataStoreQuery.setStartKey(query.getStartKey()); + dataStoreQuery.setEndKey(query.getEndKey()); + dataStoreQuery.populateOrientDBQuery(orientDBMapping, getFieldsToQuery(null), getFields()); + + ODatabaseDocumentTx selectTx = connectionPool.acquire(); + selectTx.activateOnCurrentThread(); + try { + List result = selectTx.command(dataStoreQuery.getOrientDBQuery()) + .execute(dataStoreQuery.getParams()); + if (result != null && result.isEmpty()) { + return 0; + } else { + for (ODocument doc : result) { + for (String docField : query.getFields()) { + if (doc.containsField(orientDBMapping.getDocumentField(docField))) { + doc.removeField(orientDBMapping.getDocumentField(docField)); + } + } + doc.save(); + } + return result.size(); + } + } finally { + selectTx.close(); + } + } + } + + @Override + public Result execute(Query query) { + String[] fields = getFieldsToQuery(query.getFields()); + OrientDBQuery dataStoreQuery; + if (query instanceof OrientDBQuery) { + dataStoreQuery = ((OrientDBQuery) query); + } else { + dataStoreQuery = (OrientDBQuery) ((PartitionQueryImpl) query).getBaseQuery(); + } + dataStoreQuery.populateOrientDBQuery(orientDBMapping, fields, getFields()); + ODatabaseDocumentTx selectTx = connectionPool.acquire(); + selectTx.activateOnCurrentThread(); + try { + OConcurrentResultSet result = selectTx.command(dataStoreQuery.getOrientDBQuery()) + .execute(dataStoreQuery.getParams()); + result.setLimit((int) query.getLimit()); + return new OrientDBResult(this, query, result); + } finally { + selectTx.close(); + } + } + + @Override + public Query newQuery() { + OrientDBQuery query = new OrientDBQuery(this); + query.setFields(getFieldsToQuery(null)); + return new OrientDBQuery(this); + } + + @Override + public List> getPartitions(Query query) throws IOException { + // TODO : Improve code on OrientDB clusters + List> partitions = new ArrayList<>(); + PartitionQueryImpl partitionQuery = new PartitionQueryImpl<>( + query); + partitionQuery.setConf(this.getConf()); + partitions.add(partitionQuery); + return partitions; + } + + /** + * Flushes locally cached to content in memory to remote OrientDB server. + * + */ + @Override + public void flush() { + ODatabaseDocumentTx updateTx = connectionPool.acquire(); + updateTx.activateOnCurrentThread(); + try { + for (ODocument document : docBatch) { + updateTx.save(document); + } + } finally { + updateTx.close(); + docBatch.clear(); + } + } + + /** + * Releases resources which have been used dataStore. Eg:- OrientDB Client connection pool. + * + */ + @Override + public void close() { + docBatch.clear(); + remoteServerAdmin.close(); + connectionPool.close(); + } + + /** + * Returns OrientDB client connection pool maintained at Gora dataStore. + * + * @return {@link OPartitionedDatabasePool} OrientDB client connection pool. + */ + public OPartitionedDatabasePool getConnectionPool() { + return connectionPool; + } + + public T convertOrientDocToAvroBean(final ODocument obj, final String[] fields) { + T persistent = newPersistent(); + String[] dbFields = getFieldsToQuery(fields); + for (String f : dbFields) { + String docf = orientDBMapping.getDocumentField(f); + if (docf == null || !obj.containsField(docf)) + continue; + + OrientDBMapping.DocumentFieldType storeType = orientDBMapping.getDocumentFieldType(docf); + Schema.Field field = fieldMap.get(f); + Schema fieldSchema = field.schema(); + + LOG.debug("Load from ODocument, field:{}, schemaType:{}, docField:{}, storeType:{}", + new Object[]{field.name(), fieldSchema.getType(), docf, storeType}); + Object result = convertDocFieldToAvroField(fieldSchema, storeType, field, docf, obj); + persistent.put(field.pos(), result); + } + persistent.clearDirty(); + return persistent; + } + + private Object convertDocFieldToAvroField(final Schema fieldSchema, + final OrientDBMapping.DocumentFieldType storeType, + final Schema.Field field, + final String docf, + final ODocument obj) { + Object result = null; + switch (fieldSchema.getType()) { + case MAP: + result = convertDocFieldToAvroMap(docf, fieldSchema, obj, field, storeType); + break; + case ARRAY: + result = convertDocFieldToAvroList(docf, fieldSchema, obj, field, storeType); + break; + case RECORD: + ODocument record = obj.field(docf); + if (record == null) { + result = null; + break; + } + result = convertAvroBeanToOrientDoc(fieldSchema, record); + break; + case BOOLEAN: + result = OType.convert(obj.field(docf), Boolean.class); + break; + case DOUBLE: + result = OType.convert(obj.field(docf), Double.class); + break; + case FLOAT: + result = OType.convert(obj.field(docf), Float.class); + break; + case INT: + result = OType.convert(obj.field(docf), Integer.class); + break; + case LONG: + result = OType.convert(obj.field(docf), Long.class); + break; + case STRING: + result = convertDocFieldToAvroString(storeType, docf, obj); + break; + case ENUM: + result = AvroUtils.getEnumValue(fieldSchema, obj.field(docf)); + break; + case BYTES: + case FIXED: + if (obj.field(docf) == null) { + result = null; + break; + } + result = ByteBuffer.wrap((byte[]) obj.field(docf)); + break; + case NULL: + result = null; + break; + case UNION: + result = convertDocFieldToAvroUnion(fieldSchema, storeType, field, docf, obj); + break; + default: + LOG.warn("Unable to read {}", docf); + break; + } + return result; + } + + private Object convertDocFieldToAvroList(final String docf, + final Schema fieldSchema, + final ODocument doc, + final Schema.Field f, + final OrientDBMapping.DocumentFieldType storeType) { + if (storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDSET) { + OTrackedSet set = doc.field(docf); + List rlist = new ArrayList<>(); + if (set == null) { + return new DirtyListWrapper(rlist); + } + + for (Object item : set) { + Object o = convertDocFieldToAvroField(fieldSchema.getElementType(), storeType, f, + "item", new ODocument("item", item)); + rlist.add(o); + } + return new DirtyListWrapper<>(rlist); + + } else { + OTrackedList list = doc.field(docf); + List rlist = new ArrayList<>(); + if (list == null) { + return new DirtyListWrapper(rlist); + } + + for (Object item : list) { + Object o = convertDocFieldToAvroField(fieldSchema.getElementType(), storeType, f, + "item", new ODocument("item", item)); + rlist.add(o); + } + return new DirtyListWrapper<>(rlist); + } + } + + private Object convertAvroListToDocField(final String docf, final Collection array, + final Schema fieldSchema, final Schema.Type fieldType, + final OrientDBMapping.DocumentFieldType storeType) { + if (storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDLIST) { + ArrayList list; + list = new ArrayList(); + if (array == null) + return list; + for (Object item : array) { + OrientDBMapping.DocumentFieldType fieldStoreType = orientDBMapping.getDocumentFieldType(docf); + Object result = convertAvroFieldToOrientField(docf, fieldSchema, fieldType, fieldStoreType, item); + list.add(result); + } + return list; + } else if (storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDSET) { + HashSet set; + set = new HashSet(); + if (array == null) + return set; + for (Object item : array) { + OrientDBMapping.DocumentFieldType fieldStoreType = orientDBMapping.getDocumentFieldType(docf); + Object result = convertAvroFieldToOrientField(docf, fieldSchema, fieldType, fieldStoreType, item); + set.add(result); + } + return set; + } + return null; + } + + private Object convertDocFieldToAvroMap(final String docf, final Schema fieldSchema, + final ODocument doc, final Schema.Field f, + final OrientDBMapping.DocumentFieldType storeType) { + if (storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDMAP) { + OTrackedMap map = doc.field(docf); + Map rmap = new HashMap<>(); + if (map == null) { + return new DirtyMapWrapper(rmap); + } + + for (Map.Entry entry : map.entrySet()) { + String mapKey = decodeFieldKey((String) entry.getKey()); + Object o = convertDocFieldToAvroField(fieldSchema.getValueType(), storeType, f, mapKey, + decorateOTrackedMapToODoc(map)); + rmap.put(new Utf8(mapKey), o); + } + return new DirtyMapWrapper<>(rmap); + } else { + ODocument innerDoc = doc.field(docf); + Map rmap = new HashMap<>(); + if (innerDoc == null) { + return new DirtyMapWrapper(rmap); + } + + for (String fieldName : innerDoc.fieldNames()) { + String mapKey = decodeFieldKey(fieldName); + Object o = convertDocFieldToAvroField(fieldSchema.getValueType(), storeType, f, mapKey, + innerDoc); + rmap.put(new Utf8(mapKey), o); + } + return new DirtyMapWrapper<>(rmap); + } + } + + private ODocument decorateOTrackedMapToODoc(OTrackedMap map) { + ODocument doc = new ODocument(); + for (Map.Entry entry : map.entrySet()) { + doc.field((String) entry.getKey(), entry.getValue()); + } + return doc; + } + + private Object convertAvroMapToDocField(final String docf, + final Map value, final Schema fieldSchema, + final Schema.Type fieldType, + final OrientDBMapping.DocumentFieldType storeType) { + if (storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDMAP) { + HashMap map = new HashMap(); + if (value == null) + return map; + + for (Map.Entry e : value.entrySet()) { + String mapKey = encodeFieldKey(e.getKey().toString()); + Object mapValue = e.getValue(); + + OrientDBMapping.DocumentFieldType fieldStoreType = orientDBMapping.getDocumentFieldType(docf); + Object result = convertAvroFieldToOrientField(docf, fieldSchema, fieldType, fieldStoreType, + mapValue); + map.put(mapKey, result); + } + return map; + } else { + ODocument doc = new ODocument(); + if (value == null) + return doc; + for (Map.Entry e : value.entrySet()) { + String mapKey = encodeFieldKey(e.getKey().toString()); + Object mapValue = e.getValue(); + + OrientDBMapping.DocumentFieldType fieldStoreType = orientDBMapping.getDocumentFieldType(docf); + Object result = convertAvroFieldToOrientField(docf, fieldSchema, fieldType, fieldStoreType, + mapValue); + doc.field(mapKey, result); + } + return doc; + } + } + + private Object convertAvroBeanToOrientDoc(final Schema fieldSchema, + final ODocument doc) { + Object result; + Class clazz = null; + try { + clazz = ClassLoadingUtils.loadClass(fieldSchema.getFullName()); + } catch (ClassNotFoundException e) { + //Ignore + } + PersistentBase record = (PersistentBase) new BeanFactoryImpl(keyClass, clazz).newPersistent(); + for (Schema.Field recField : fieldSchema.getFields()) { + Schema innerSchema = recField.schema(); + OrientDBMapping.DocumentFieldType innerStoreType = orientDBMapping + .getDocumentFieldType(recField.name()); + String innerDocField = orientDBMapping.getDocumentField(recField.name()) != null ? orientDBMapping + .getDocumentField(recField.name()) : recField.name(); + LOG.debug("Load from ODocument (RECORD), field:{}, schemaType:{}, docField:{}, storeType:{}", + new Object[]{recField.name(), innerSchema.getType(), innerDocField, + innerStoreType}); + record.put(recField.pos(), + convertDocFieldToAvroField(innerSchema, innerStoreType, recField, innerDocField, + doc)); + } + result = record; + return result; + } + + private Object convertDocFieldToAvroString(final OrientDBMapping.DocumentFieldType storeType, + final String docf, final ODocument doc) { + Object result; + if (storeType == OrientDBMapping.DocumentFieldType.DATE || + storeType == OrientDBMapping.DocumentFieldType.DATETIME) { + Date dateTime = doc.field(docf); + Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.getDefault()); + calendar.setTime(dateTime); + result = new Utf8(DatatypeConverter.printDateTime(calendar)); + } else { + result = new Utf8((String) doc.field(encodeFieldKey(docf))); + } + return result; + } + + private Object convertDocFieldToAvroUnion(final Schema fieldSchema, + final OrientDBMapping.DocumentFieldType storeType, + final Schema.Field field, + final String docf, + final ODocument doc) { + Object result; + Schema.Type type0 = fieldSchema.getTypes().get(0).getType(); + Schema.Type type1 = fieldSchema.getTypes().get(1).getType(); + + if (!type0.equals(type1) + && (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL))) { + Schema innerSchema = null; + if (type0.equals(Schema.Type.NULL)) { + innerSchema = fieldSchema.getTypes().get(1); + } else { + innerSchema = fieldSchema.getTypes().get(0); + } + + LOG.debug("Load from ODocument (UNION), schemaType:{}, docField:{}, storeType:{}", + new Object[]{innerSchema.getType(), docf, storeType}); + + result = convertDocFieldToAvroField(innerSchema, storeType, field, docf, doc); + } else { + throw new IllegalStateException("OrientDBStore only supports Union of two types field."); + } + return result; + } + + private Object convertAvroUnionToOrientDBField(final String docf, final Schema fieldSchema, + final OrientDBMapping.DocumentFieldType storeType, + final Object value) { + Object result; + Schema.Type type0 = fieldSchema.getTypes().get(0).getType(); + Schema.Type type1 = fieldSchema.getTypes().get(1).getType(); + + if (!type0.equals(type1) + && (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL))) { + Schema innerSchema = null; + if (type0.equals(Schema.Type.NULL)) { + innerSchema = fieldSchema.getTypes().get(1); + } else { + innerSchema = fieldSchema.getTypes().get(0); + } + + LOG.debug("Transform value to ODocument (UNION), type:{}, storeType:{}", + new Object[]{innerSchema.getType(), type1, storeType}); + + result = convertAvroFieldToOrientField(docf, innerSchema, innerSchema.getType(), storeType, value); + } else { + throw new IllegalStateException("OrientDBStore only supports Union of two types field."); + } + return result; + } + + private ODocument convertAvroBeanToOrientDoc(final K key, final T persistent) { + ODocument result = new ODocument(orientDBMapping.getDocumentClass()); + for (Schema.Field f : persistent.getSchema().getFields()) { + if (persistent.isDirty(f.pos()) && (persistent.get(f.pos()) != null)) { + String docf = orientDBMapping.getDocumentField(f.name()); + Object value = persistent.get(f.pos()); + OrientDBMapping.DocumentFieldType storeType = orientDBMapping.getDocumentFieldType(docf); + LOG.debug("Transform value to ODocument, docField:{}, schemaType:{}, storeType:{}", + new Object[]{docf, f.schema().getType(), storeType}); + Object o = convertAvroFieldToOrientField(docf, f.schema(), f.schema().getType(), + storeType, value); + result.field(docf, o); + } + } + result.field("_id", key); + return result; + } + + private ODocument updateOrientDocFromAvroBean(final K key, final T persistent, final ODocument result) { + for (Schema.Field f : persistent.getSchema().getFields()) { + if (persistent.isDirty(f.pos()) /*&& (persistent.get(f.pos()) != null)*/) { + String docf = orientDBMapping.getDocumentField(f.name()); + if (persistent.get(f.pos()) == null) { + result.removeField(docf); + continue; + } + Object value = persistent.get(f.pos()); + OrientDBMapping.DocumentFieldType storeType = orientDBMapping.getDocumentFieldType(docf); + LOG.debug("Transform value to ODocument, docField:{}, schemaType:{}, storeType:{}", + new Object[]{docf, f.schema().getType(), storeType}); + Object o = convertAvroFieldToOrientField(docf, f.schema(), f.schema().getType(), + storeType, value); + result.field(docf, o); + } + } + return result; + } + + private Object convertAvroFieldToOrientField(final String docf, final Schema fieldSchema, + final Schema.Type fieldType, + final OrientDBMapping.DocumentFieldType storeType, + final Object value) { + Object result = null; + switch (fieldType) { + case MAP: + if (storeType != null && !(storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDMAP || + storeType == OrientDBMapping.DocumentFieldType.EMBEDDED)) { + throw new IllegalStateException( + "Field " + fieldSchema.getName() + + ": to store a AVRO 'map', target OrientDB mapping have to be of type 'EmbeddedMap'" + + "| 'Embedded'"); + } + Schema valueSchema = fieldSchema.getValueType(); + result = convertAvroMapToDocField(docf, (Map) value, valueSchema, + valueSchema.getType(), storeType); + break; + case ARRAY: + if (storeType != null && !(storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDLIST || + storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDSET)) { + throw new IllegalStateException("Field " + fieldSchema.getName() + + ": To store a AVRO 'array', target Mongo mapping have to be of type 'EmbeddedMap'" + + "|'EmbeddedList'"); + } + Schema elementSchema = fieldSchema.getElementType(); + result = convertAvroListToDocField(docf, (List) value, elementSchema, + elementSchema.getType(), storeType); + break; + case BYTES: + if (value != null) { + result = ((ByteBuffer) value).array(); + } + break; + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case BOOLEAN: + result = value; + break; + case STRING: + result = convertAvroStringToDocField(fieldSchema, storeType, value); + break; + case ENUM: + if (value != null) + result = value.toString(); + break; + case RECORD: + if (value == null) + break; + result = convertAvroBeanToOrientDoc(docf, fieldSchema, value); + break; + case UNION: + result = convertAvroUnionToOrientDBField(docf, fieldSchema, storeType, value); + break; + case FIXED: + result = value; + break; + default: + LOG.error("Unknown field type: {}", fieldSchema.getType()); + break; + } + return result; + } + + private Object convertAvroStringToDocField(final Schema fieldSchema, + final OrientDBMapping.DocumentFieldType storeType, + final Object value) { + Object result = null; + if (storeType == OrientDBMapping.DocumentFieldType.DATETIME) { + if (value != null) { + Calendar dateTime = null; + try { + dateTime = DatatypeConverter.parseDateTime(value.toString()); + } catch (IllegalArgumentException e) { + throw new IllegalStateException("Field " + fieldSchema.getType() + + ": Invalid date and time format '" + value + "'", e); + + } + result = dateTime.getTime(); + } + } else if (storeType == OrientDBMapping.DocumentFieldType.DATE) { + Calendar date = null; + try { + date = DatatypeConverter.parseDate(value.toString()); + } catch (IllegalArgumentException e) { + throw new IllegalStateException("Field " + fieldSchema.getType() + + ": Invalid date format '" + value + "'", e); + } + result = date.getTime(); + } else { + if (value != null) { + result = value.toString(); + } + } + return result; + } + + private ODocument convertAvroBeanToOrientDoc(final String docf, + final Schema fieldSchema, + final Object value) { + ODocument record = new ODocument(); + for (Schema.Field member : fieldSchema.getFields()) { + Object innerValue = ((PersistentBase) value).get(member.pos()); + String innerDoc = orientDBMapping.getDocumentField(member.name()); + Schema.Type innerType = member.schema().getType(); + OrientDBMapping.DocumentFieldType innerStoreType = orientDBMapping.getDocumentFieldType(innerDoc); + LOG.debug("Transform value to ODocument , docField:{}, schemaType:{}, storeType:{}", + new Object[]{member.name(), member.schema().getType(), + innerStoreType}); + Object fieldValue = convertAvroFieldToOrientField(docf, member.schema() + , innerType, innerStoreType, innerValue); + record.field(member.name(), fieldValue); + } + return record; + } + + private String encodeFieldKey(final String key) { + if (key == null) { + return null; + } + return key.replace(".", "\u00B7") + .replace(":", "\u00FF") + .replace(";", "\u00FE") + .replace(" ", "\u00FD") + .replace("%", "\u00FC") + .replace("=", "\u00FB"); + } + + private String decodeFieldKey(final String key) { + if (key == null) { + return null; + } + return key.replace("\u00B7", ".") + .replace("\u00FF", ":") + .replace("\u00FE", ";") + .replace("\u00FD", " ") + .replace("\u00FC", "%") + .replace("\u00FB", "="); + } + +} \ No newline at end of file diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStoreParameters.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStoreParameters.java new file mode 100644 index 000000000..f6e18bc33 --- /dev/null +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStoreParameters.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.orientdb.store; + +import java.util.Properties; + +/** + * Maintains OrientDB client related properties parsed from gora.properties. + */ +public class OrientDBStoreParameters { + + public static final String ORIENT_DB_MAPPING_FILE = "gora.orientdb.mapping.file"; + public static final String ORIENT_DB_SERVER_HOST = "gora.orientdb.server.host"; + public static final String ORIENT_DB_SERVER_PORT = "gora.orientdb.server.port"; + public static final String ORIENT_DB_USER_USERNAME = "gora.orientdb.user.username"; + public static final String ORIENT_DB_USER_PASSWORD = "gora.orientdb.user.password"; + public static final String ORIENT_DB_DB_NAME = "gora.orientdb.database.name"; + public static final String ORIENT_DB_CONNECTION_POOL_SIZE = "gora.orientdb.con.pool.size"; + public static final String ORIENT_DB_STORAGE_TYPE = "gora.orientdb.storage.type"; + + + private String mappingFile; + private String serverHost; + private String serverPort; + private String userName; + private String userPassword; + private String databaseName; + private String connPoolSize; + private String storageType; + + + /** + * Return classpath or file system location for OrientDB mapping file. Eg:- /gora-orientdb-mapping.xml + * + * @return OrientDB Mapping file Location as string. + */ + public String getMappingFile() { + return this.mappingFile; + } + + /** + * Return remote OrientDB server host name. Eg:- localhost + * + * @return OrientDB remote server host as string. + */ + public String getServerHost() { + return this.serverHost; + } + + /** + * Return remote OrientDB server port number. Eg:- 2424 + * + * @return OrientDB remote server port number as string. + */ + public String getServerPort() { + return this.serverPort; + } + + /** + * Return remote OrientDB server client connecting user username. Eg:- admin + * + * @return OrientDB remote server client connecting user username as string. + */ + public String getUserName() { + return this.userName; + } + + /** + * Return remote OrientDB server client connecting user password. Eg:- admin + * + * @return OrientDB remote server client connecting user pass as string. + */ + public String getUserPassword() { + return this.userPassword; + } + + /** + * Return remote OrientDB server pointing database name. Eg:- gora + * + * @return OrientDB remote server pointing database name as string. + */ + public String getDatabaseName() { + return this.databaseName; + } + + /** + * Return remote OrientDB client connections pool size. Eg:- 80 + * + * @return OrientDB remote server client connections pool size as string. + */ + public String getConnectionPoolSize() { + return this.connPoolSize; + } + + /** + * Return remote OrientDB server storage type of pointing database. Eg:- plocal, memory + * + * @return OrientDB remote server storage type of pointing database. + */ + public String getStorageType() { + return this.storageType; + } + + public OrientDBStoreParameters(String mappingFile, + String serverHost, + String serverPort, + String userName, + String userPassword, + String databaseName, + String connPoolSize, + String storageType) { + this.mappingFile = mappingFile; + this.serverHost = serverHost; + this.serverPort = serverPort; + this.userName = userName; + this.userPassword = userPassword; + this.databaseName = databaseName; + this.connPoolSize = connPoolSize; + this.storageType = storageType; + } + + /** + * Extraction OrientDB dataStore properties from {@link Properties} gora.properties file. + * + * @return OrientDB client properties encapsulated inside instance of {@link OrientDBStoreParameters} + */ + public static OrientDBStoreParameters load(Properties properties) { + String propMappingFile = properties.getProperty(ORIENT_DB_MAPPING_FILE, + OrientDBStore.DEFAULT_MAPPING_FILE); + String propServerHost = properties.getProperty(ORIENT_DB_SERVER_HOST); + String propServerPort = properties.getProperty(ORIENT_DB_SERVER_PORT); + String propUserName = properties.getProperty(ORIENT_DB_USER_USERNAME); + String propUserPassword = properties.getProperty(ORIENT_DB_USER_PASSWORD); + String propDatabaseName = properties.getProperty(ORIENT_DB_DB_NAME); + String propConnPoolSize = properties.getProperty(ORIENT_DB_CONNECTION_POOL_SIZE); + String propStorageType = properties.getProperty(ORIENT_DB_STORAGE_TYPE); + return new OrientDBStoreParameters(propMappingFile, + propServerHost, propServerPort, propUserName, + propUserPassword, propDatabaseName, propConnPoolSize, propStorageType); + } +} diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/package-info.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/package-info.java new file mode 100644 index 000000000..60871b0c3 --- /dev/null +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains main OrientDB dataStore class and dataStore mapping representing class, and + * utility classes to build dataStore specific mappings. + */ +package org.apache.gora.orientdb.store; \ No newline at end of file diff --git a/gora-orientdb/src/test/java/org/apache/gora/orientdb/GoraOrientDBTestDriver.java b/gora-orientdb/src/test/java/org/apache/gora/orientdb/GoraOrientDBTestDriver.java new file mode 100644 index 000000000..a3f024ca0 --- /dev/null +++ b/gora-orientdb/src/test/java/org/apache/gora/orientdb/GoraOrientDBTestDriver.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.orientdb; + +import com.orientechnologies.orient.server.OServer; +import com.orientechnologies.orient.server.OServerMain; +import org.apache.gora.GoraTestDriver; +import org.apache.gora.orientdb.store.OrientDBStore; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.store.DataStore; +import org.apache.gora.util.GoraException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Driver to set up an embedded OrientDB database instance for Gora + * dataStore specific integration tests. + */ +public class GoraOrientDBTestDriver extends GoraTestDriver { + + private static Logger log = LoggerFactory.getLogger(GoraOrientDBTestDriver.class); + private static final String SERVER_DIRECTORY = "./target/db"; + private static final String SERVER_CONFIGURATION = "/orientdb-server-config.xml"; + private OServer server; + + public GoraOrientDBTestDriver() { + super(OrientDBStore.class); + } + + /** + * Initialize embedded OrientDB server instance as per the gora-orientdb-mapping.xml + * server configuration file. + */ + @Override + public void setUpClass() throws Exception { + server = OServerMain.create(); + server.setServerRootDirectory(SERVER_DIRECTORY); + server.startup(getClass().getResourceAsStream(SERVER_CONFIGURATION)); + server.activate(); + log.info("OrientDB Embedded Server started successfully."); + } + + /** + * Terminate embedded OrientDB server. + */ + @Override + public void tearDownClass() throws Exception { + server.shutdown(); + log.info("OrientDB Embedded Server terminated successfully."); + } + + @Override + public DataStore + createDataStore(Class keyClass, Class persistentClass) throws GoraException { + OrientDBStore store = (OrientDBStore) super.createDataStore(keyClass, persistentClass); + return store; + } + +} diff --git a/gora-orientdb/src/test/java/org/apache/gora/orientdb/mapreduce/OrientDBStoreMapReduceTest.java b/gora-orientdb/src/test/java/org/apache/gora/orientdb/mapreduce/OrientDBStoreMapReduceTest.java new file mode 100644 index 000000000..7819d387a --- /dev/null +++ b/gora-orientdb/src/test/java/org/apache/gora/orientdb/mapreduce/OrientDBStoreMapReduceTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.orientdb.mapreduce; + +import org.apache.gora.mapreduce.DataStoreMapReduceTestBase; +import org.apache.gora.examples.generated.WebPage; +import org.apache.gora.orientdb.GoraOrientDBTestDriver; +import org.apache.gora.store.DataStore; +import org.apache.gora.store.DataStoreFactory; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; + +/** + * Executes tests for MR jobs over OrientDB dataStore. + */ +public class OrientDBStoreMapReduceTest extends DataStoreMapReduceTestBase { + + private GoraOrientDBTestDriver driver; + + public OrientDBStoreMapReduceTest() throws IOException { + super(); + driver = new GoraOrientDBTestDriver(); + } + + @Override + @Before + public void setUp() throws Exception { + driver.setUpClass(); + super.setUp(); + } + + @Override + @After + public void tearDown() throws Exception { + super.tearDown(); + driver.tearDownClass(); + } + + @Override + protected DataStore createWebPageDataStore() throws IOException { + try { + return DataStoreFactory.getDataStore(String.class, WebPage.class, driver.getConfiguration()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +} \ No newline at end of file diff --git a/gora-orientdb/src/test/java/org/apache/gora/orientdb/mapreduce/package-info.java b/gora-orientdb/src/test/java/org/apache/gora/orientdb/mapreduce/package-info.java new file mode 100644 index 000000000..71c5962b8 --- /dev/null +++ b/gora-orientdb/src/test/java/org/apache/gora/orientdb/mapreduce/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains tests for MR jobs over OrientDB dataStore. + */ +package org.apache.gora.orientdb.mapreduce; \ No newline at end of file diff --git a/gora-orientdb/src/test/java/org/apache/gora/orientdb/package-info.java b/gora-orientdb/src/test/java/org/apache/gora/orientdb/package-info.java new file mode 100644 index 000000000..f7ac49dc1 --- /dev/null +++ b/gora-orientdb/src/test/java/org/apache/gora/orientdb/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains all the dataStore specific tests for OrientDB dataStore. + * + */ +package org.apache.gora.orientdb; \ No newline at end of file diff --git a/gora-orientdb/src/test/java/org/apache/gora/orientdb/store/OrientDBGoraDataStoreTest.java b/gora-orientdb/src/test/java/org/apache/gora/orientdb/store/OrientDBGoraDataStoreTest.java new file mode 100644 index 000000000..fd3d15bc8 --- /dev/null +++ b/gora-orientdb/src/test/java/org/apache/gora/orientdb/store/OrientDBGoraDataStoreTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.orientdb.store; + +import org.apache.gora.orientdb.GoraOrientDBTestDriver; +import org.apache.gora.store.DataStoreTestBase; +import org.junit.BeforeClass; +import org.junit.AfterClass; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Executes all the dataStore specific integration tests for OrientDB dataStore. + */ +public class OrientDBGoraDataStoreTest extends DataStoreTestBase { + + private static final Logger log = LoggerFactory.getLogger(OrientDBGoraDataStoreTest.class); + + @BeforeClass + public static void setUpClass() throws Exception { + setTestDriver(new GoraOrientDBTestDriver()); + DataStoreTestBase.setUpClass(); + } + + @AfterClass + public static void tearDownClass() throws Exception { + DataStoreTestBase.tearDownClass(); + } + + @Before + public void setUp() throws Exception { + super.setUp(); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + } + + @Ignore("3 types union field is not supported by OrientDBStore.") + @Override + public void testGet3UnionField() throws Exception { + //3 types union field is not supported by OrientDBStore. + } + +} diff --git a/gora-orientdb/src/test/java/org/apache/gora/orientdb/store/package-info.java b/gora-orientdb/src/test/java/org/apache/gora/orientdb/store/package-info.java new file mode 100644 index 000000000..368af458f --- /dev/null +++ b/gora-orientdb/src/test/java/org/apache/gora/orientdb/store/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains tests for OrientDB dataStore CRUD operations. + */ +package org.apache.gora.orientdb.store; \ No newline at end of file diff --git a/gora-orientdb/src/test/resources/gora-orientdb-mapping.xml b/gora-orientdb/src/test/resources/gora-orientdb-mapping.xml new file mode 100644 index 000000000..515ed98c5 --- /dev/null +++ b/gora-orientdb/src/test/resources/gora-orientdb-mapping.xml @@ -0,0 +1,45 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/gora-orientdb/src/test/resources/gora.properties b/gora-orientdb/src/test/resources/gora.properties new file mode 100644 index 000000000..5a24e76d3 --- /dev/null +++ b/gora-orientdb/src/test/resources/gora.properties @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +gora.datastore.default=org.apache.gora.orientdb.store.OrientDBStore +gora.orientdb.server.host=localhost +gora.orientdb.server.port=2424 +gora.orientdb.user.username=root +gora.orientdb.user.password=root +gora.orientdb.database.name=gora +gora.orientdb.con.pool.size=80 +gora.orientdb.storage.type=memory \ No newline at end of file diff --git a/gora-orientdb/src/test/resources/orientdb-server-config.xml b/gora-orientdb/src/test/resources/orientdb-server-config.xml new file mode 100644 index 000000000..3cc00d67e --- /dev/null +++ b/gora-orientdb/src/test/resources/orientdb-server-config.xml @@ -0,0 +1,35 @@ + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index 2c2e20a7e..57a7f5a64 100644 --- a/pom.xml +++ b/pom.xml @@ -723,6 +723,7 @@ gora-hbase gora-infinispan gora-jcache + gora-orientdb gora-dynamodb gora-couchdb @@ -776,6 +777,10 @@ 1.0.0 3.6.4 + + 2.2.22 + 0.2.0 + 4.10 @@ -1506,6 +1511,37 @@ ${jsr107.api.version} + + + com.orientechnologies + orientdb-client + ${orientdb.version} + + + + com.orientechnologies + orientdb-server + ${orientdb.version} + + + + com.orientechnologies + orientdb-core + ${orientdb.version} + + + + com.orientechnologies + orientdb-graphdb + ${orientdb.version} + + + + com.github.raymanrt + orientqb + ${orientqb.version} + + org.apache.hadoop From 2303e85c5ba9e2aaa72b0e3ea88d589e312df104 Mon Sep 17 00:00:00 2001 From: Kevin Ratnasekera Date: Mon, 11 Sep 2017 01:04:40 +0530 Subject: [PATCH 2/3] Make batch arraylist thread safe --- .../apache/gora/orientdb/store/OrientDBStore.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java index d0b901fbc..95617011e 100644 --- a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java @@ -29,6 +29,8 @@ import java.util.Date; import java.util.Calendar; import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.locks.ReentrantLock; import java.util.TimeZone; import java.util.Locale; @@ -80,7 +82,8 @@ public class OrientDBStore extends DataStoreBase docBatch = new ArrayList<>(); + private List docBatch = Collections.synchronizedList(new ArrayList<>()); + private ReentrantLock flushLock = new ReentrantLock(); /** * Initialize the OrientDB dataStore by {@link Properties} parameters. @@ -248,8 +251,10 @@ public void put(K key, T val) { selectTx.close(); } } else { - LOG.info("Ignored putting persistent bean {} in the store as it is neither " - + "new, neither dirty.", new Object[]{val}); + if (LOG.isDebugEnabled()) { + LOG.info("Ignored putting persistent bean {} in the store as it is neither " + + "new, neither dirty.", new Object[]{val}); + } } } @@ -388,12 +393,14 @@ public void flush() { ODatabaseDocumentTx updateTx = connectionPool.acquire(); updateTx.activateOnCurrentThread(); try { + flushLock.lock(); for (ODocument document : docBatch) { updateTx.save(document); } } finally { updateTx.close(); docBatch.clear(); + flushLock.unlock(); } } From 96b9e96df165757f766e414633ae7ad05dd9d05e Mon Sep 17 00:00:00 2001 From: Kevin Ratnasekera Date: Mon, 11 Sep 2017 01:40:01 +0530 Subject: [PATCH 3/3] Fix java doc issues --- .../gora/orientdb/query/OrientDBQuery.java | 3 ++ .../gora/orientdb/store/OrientDBMapping.java | 2 +- .../store/OrientDBMappingBuilder.java | 2 + .../gora/orientdb/store/OrientDBStore.java | 43 ++++++++++++++++--- .../store/OrientDBStoreParameters.java | 1 + 5 files changed, 44 insertions(+), 7 deletions(-) diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/OrientDBQuery.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/OrientDBQuery.java index 209549a27..596390bef 100644 --- a/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/OrientDBQuery.java +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/OrientDBQuery.java @@ -72,6 +72,9 @@ public Map getParams() { * Convert Gora query to Orient DB specific query which underline API understands. * And maintain it s state encapsulated to Gora implementation of the {@link org.apache.gora.query.Query}. * + * @param orientDBMapping OrientDB mapping file. + * @param fields OrientDB query fields. + * @param schemaFields AVRO schema fields for persistent bean. * @return a {@link OSQLSynchQuery} query executable over Orient DB. */ public OSQLSynchQuery populateOrientDBQuery(final OrientDBMapping orientDBMapping, diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBMapping.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBMapping.java index a37c3d84d..ac06f0844 100644 --- a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBMapping.java +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBMapping.java @@ -112,7 +112,7 @@ public void registerClassField(String classFieldName, } /** - * Returns all fields in AVRO {@link org.apache.hadoop.io.serializer.avro.Record} record. + * Returns all fields in AVRO {@link SpecificRecord} record. * * @return array of fields in string. */ diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBMappingBuilder.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBMappingBuilder.java index 4c2d68c4a..18965f970 100644 --- a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBMappingBuilder.java +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBMappingBuilder.java @@ -57,6 +57,8 @@ public OrientDBMappingBuilder(final OrientDBStore store) { /** * Build OrientDB dataStore mapping from gora-orientdb-mapping.xml given from class path * or file system location. + * + * @return mapping file {@link OrientDBMapping} */ public OrientDBMapping build() { if (mapping.getDocumentClass() == null) diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java index 95617011e..000f1fae7 100644 --- a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java @@ -70,6 +70,7 @@ import static com.github.raymanrt.orientqb.query.Projection.projection; /** + * {@inheritDoc} * {@link org.apache.gora.orientdb.store.OrientDBStore} is the primary class * responsible for facilitating GORA CRUD operations on OrientDB documents. */ @@ -86,6 +87,7 @@ public class OrientDBStore extends DataStoreBase keyClass, Class persistentClass, Properties p } } + /** + * {@inheritDoc} + */ @Override public String getSchemaName(final String mappingSchemaName, final Class persistentClass) { return super.getSchemaName(mappingSchemaName, persistentClass); } + /** + * {@inheritDoc} + */ @Override public String getSchemaName() { return orientDBMapping.getDocumentClass(); } /** - * Create a new class of OrientDB documents if necessary. Enforce specified schema over the document class. - * + * {@inheritDoc} + * Create a new class of OrientDB documents if necessary. Enforce specified schema over the document class. * */ @Override public void createSchema() { @@ -168,8 +176,8 @@ public void createSchema() { } /** + * {@inheritDoc} * Deletes enforced schema over OrientDB Document class. - * */ @Override public void deleteSchema() { @@ -183,8 +191,8 @@ public void deleteSchema() { } /** + * {@inheritDoc} * Check whether there exist a schema enforced over OrientDB document class. - * */ @Override public boolean schemaExists() { @@ -198,6 +206,9 @@ public boolean schemaExists() { } } + /** + * {@inheritDoc} + */ @Override public T get(K key, String[] fields) { String[] dbFields = getFieldsToQuery(fields); @@ -227,6 +238,9 @@ public T get(K key, String[] fields) { } } + /** + * {@inheritDoc} + */ @Override public void put(K key, T val) { if (val.isDirty()) { @@ -238,6 +252,8 @@ public void put(K key, T val) { ODatabaseDocumentTx selectTx = connectionPool.acquire(); selectTx.activateOnCurrentThread(); try { + // TODO : further optimize for queries to separate cases update / insert == get rid of select all query + // TODO : for update List result = selectTx.command(dataStoreQuery.getOrientDBQuery()) .execute(dataStoreQuery.getParams()); if (result.size() == 1) { @@ -258,6 +274,9 @@ public void put(K key, T val) { } } + /** + * {@inheritDoc} + */ @Override public boolean delete(K key) { Delete delete = new Delete(); @@ -280,6 +299,9 @@ public boolean delete(K key) { } } + /** + * {@inheritDoc} + */ @Override public long deleteByQuery(Query query) { Delete delete = new Delete(); @@ -344,6 +366,9 @@ public long deleteByQuery(Query query) { } } + /** + * {@inheritDoc} + */ @Override public Result execute(Query query) { String[] fields = getFieldsToQuery(query.getFields()); @@ -366,6 +391,9 @@ public Result execute(Query query) { } } + /** + * {@inheritDoc} + */ @Override public Query newQuery() { OrientDBQuery query = new OrientDBQuery(this); @@ -373,6 +401,9 @@ public Query newQuery() { return new OrientDBQuery(this); } + /** + * {@inheritDoc} + */ @Override public List> getPartitions(Query query) throws IOException { // TODO : Improve code on OrientDB clusters @@ -385,8 +416,8 @@ public List> getPartitions(Query query) throws IOExce } /** + * {@inheritDoc} * Flushes locally cached to content in memory to remote OrientDB server. - * */ @Override public void flush() { @@ -405,8 +436,8 @@ public void flush() { } /** + * {@inheritDoc} * Releases resources which have been used dataStore. Eg:- OrientDB Client connection pool. - * */ @Override public void close() { diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStoreParameters.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStoreParameters.java index f6e18bc33..8eebcc5d4 100644 --- a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStoreParameters.java +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStoreParameters.java @@ -138,6 +138,7 @@ public OrientDBStoreParameters(String mappingFile, /** * Extraction OrientDB dataStore properties from {@link Properties} gora.properties file. * + * @param properties gora.properties properties related to datastore client. * @return OrientDB client properties encapsulated inside instance of {@link OrientDBStoreParameters} */ public static OrientDBStoreParameters load(Properties properties) {