
Committing project

0 parents commit 20236b10c1cad9bae41676328e1d79fbfaaeab06 Pedro Gomes committed May 20, 2010
191 LICENSE.txt
@@ -0,0 +1,191 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ Copyright 2010-2010 Universidade do Minho
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
11 META-INF/MANIFEST.MF
@@ -0,0 +1,11 @@
+Manifest-Version: 1.0
+Bundle-ManifestVersion: 2
+Bundle-Name: Datanucleus_cassandra
+Bundle-SymbolicName: org.datanucleus.cassandra;singleton:=true
+Bundle-Version: 1.0.0.qualifier
+Require-Bundle: org.datanucleus;bundle-version="2.1.0"
+Bundle-RequiredExecutionEnvironment: JavaSE-1.6
+Bundle-ClassPath: lib/apache-cassandra-0.6.1.jar,
+ lib/libthrift-r917130.jar,
+ lib/datanucleus-core-2.1.0-m2.jar,
+ .
53 README.mkd
@@ -0,0 +1,53 @@
+# Purpose:
+
+A simple Cassandra plugin for the Java persistence platform DataNucleus.
+http://www.datanucleus.org/project/download.html
+
+# About:
+
+This plugin simplifies the persistence of Java objects into Cassandra, following the specification that can be found on the DataNucleus site.
+
+-It allows, at the moment, the persistence of simple attributes, one-to-one relations, and one-to-many and many-to-many relations through collections.
+
+-It does not support JDOQL, and it has not been tested with JPA.
+
+-It creates a datanucleus.schema file whenever some element is missing from the Cassandra schema, until Cassandra allows dynamic changes to it.
+
+-It is made to work with Cassandra 0.6, through the Thrift interface.
+Datastore code isolation will be improved, however, to allow other versions to be easily plugged in.
+
+# Required Libs:
+
+-apache-cassandra-0.6.1.jar
+-libthrift-r917130.jar
+-datanucleus-core-2.1.0-m2.jar (*)
+
+(*) Due to API issues, I chose to use the new core version of DataNucleus; this, however, demands that you compile and generate a jar for the new JDO API when using the plugin. Sources under:
+http://www.datanucleus.org/downloads/maven2/javax/jdo/jdo2-api/2.3-ec/
+
+# Use example:
+
+Following the JDO sample provided by DataNucleus:
+http://sourceforge.net/projects/datanucleus/files/
+
+-Remove the JDOQL tests from Main.java.
+-In datanucleus.properties, add or edit:
+ javax.jdo.option.ConnectionURL=cassandra:ORMTest://localhost:9160 (*)
+
+ datanucleus.cassandra.replicaPlacementStrategy=org.apache.cassandra.locator.RackUnawareStrategy (**)
+ datanucleus.cassandra.replicationFactor=1 (**)
+	datanucleus.cassandra.endPointSnitch=org.apache.cassandra.locator.EndPointSnitch (**)
+ datanucleus.metadata.validate=false
+ datanucleus.autoCreateSchema=true
+
+Then enhance and run the tutorial, and it will function as expected, I think…
+
+(*) The connection URL follows the pattern: cassandra:keySpace://hostname:port,hostname:port…
+	cassandra:keySpace://hostname:port,ring:default_port can also be used, in which case the plugin is responsible for the discovery of the other nodes, using the default port.
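+
+For example (the hostnames here are illustrative), both of the following are accepted:
+
+	cassandra:ORMTest://node1:9160,node2:9160
+	cassandra:ORMTest://node1:9160,ring:9160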
+
+(**) Used to generate the schema, which for now only means generating a file, as explained above.
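+
+For reference, a minimal sketch of what the tutorial's Main boils down to once the properties above are in place (the class name CassandraTutorial is illustrative, and Product is the persistable class from the DataNucleus tutorial):
+
+	import javax.jdo.JDOHelper;
+	import javax.jdo.PersistenceManager;
+	import javax.jdo.PersistenceManagerFactory;
+	import javax.jdo.Transaction;
+
+	public class CassandraTutorial {
+		public static void main(String[] args) {
+			// Reads the datanucleus.properties edited above, including the cassandra: ConnectionURL
+			PersistenceManagerFactory pmf =
+					JDOHelper.getPersistenceManagerFactory("datanucleus.properties");
+			PersistenceManager pm = pmf.getPersistenceManager();
+			Transaction tx = pm.currentTransaction();
+			try {
+				tx.begin();
+				// Product comes from the DataNucleus tutorial sources
+				pm.makePersistent(new Product("Sony Discman", "A standard discman from Sony", 49.90));
+				tx.commit();
+			} finally {
+				if (tx.isActive()) {
+					tx.rollback();
+				}
+				pm.close();
+			}
+		}
+	}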
+
+
+
+
+
83 build.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<project name="datanucleus-cassandra" default="dist" basedir=".">
+ <description>
+    A Cassandra plugin for the Java persistence tool DataNucleus.
+ </description>
+ <!-- global properties for this build -->
+ <property name="src.dir" location="src/"/>
+ <property name="dist" location="target"/>
+ <property name="lib.dir" value="lib"/>
+ <!-- compile properties -->
+ <property name="classes.dir" value="target/classes"/>
+
+<!--================================================================
+ Classpath properties
+=================================================================-->
+
+ <!-- the classpath for running -->
+ <path id="lib.classpath">
+ <fileset dir="${lib.dir}">
+ <include name="**/*.jar"/>
+ </fileset>
+ <pathelement location="${classes.dir}"/>
+ <pathelement location="${basedir}"/>
+ </path>
+
+ <!-- the classpath for the compile -->
+ <path id="compile.classpath">
+ <pathelement location="${classes.dir}"/>
+ <path refid="lib.classpath"/>
+ </path>
+
+ <path id="build.files">
+ <pathelement location="${classes.dir}"/>
+
+ </path>
+
+
+ <target name="prepare">
+ <!-- Create the time stamp -->
+ <tstamp/>
+ <mkdir dir="${classes.dir}"/>
+ </target>
+
+ <target name="compile" depends="clean,prepare">
+ <echo message="==================================================================="/>
+ <echo message="Compile configuration:"/>
+ <echo message="java.dir = ${src.dir}"/>
+ <echo message="classes.dir = ${classes.dir}"/>
+ <echo message="==================================================================="/>
+ <javac srcdir="${src.dir}" destdir="${classes.dir}" classpathref="compile.classpath">
+ <include name="**/*.java"/>
+ </javac>
+ <copy todir="${classes.dir}">
+ <fileset dir="${src.dir}" includes="**/*.properties"/>
+ </copy>
+ <copy todir="${classes.dir}/META-INF">
+ <fileset dir="${basedir}" includes="LICENSE.txt"/>
+ </copy>
+ </target>
+
+ <target name="dist" depends="compile"
+ description="generate the distribution" >
+ <!-- Create the distribution directory -->
+ <mkdir dir="${dist}"/>
+
+ <jar jarfile="${dist}/datanucleus-cassandra-0.1.jar" manifest="${basedir}/META-INF/MANIFEST.MF">
+
+ <fileset dir="${classes.dir}"/>
+ <fileset dir= "${basedir}">
+ <include name="plugin.xml"/>
+ </fileset>
+
+
+ </jar>
+
+ </target>
+
+
+ <target name="clean">
+ <delete includeEmptyDirs="true" dir="${dist}"/>
+ </target>
+
+</project>
31 plugin.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?eclipse version="3.4"?>
+<plugin id="org.datanucleus.cassandra" >
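+	<!-- Registers the store manager that handles "cassandra" URLs, plus a
+	     transactional and a non-transactional connection factory. -->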
+ <extension
+ point="org.datanucleus.store_manager">
+ <store-manager
+ class-name="org.datanucleus.store.cassandra.CassandraStoreManager"
+ key="cassandra"
+ url-key="cassandra">
+ </store-manager>
+ </extension>
+ <extension
+ point="org.datanucleus.store_connectionfactory">
+ <connectionfactory
+ class-name="org.datanucleus.store.cassandra.CassandraConnectionFactory"
+ datastore="cassandra"
+ name="cassandra"
+ transactional="true">
+ </connectionfactory>
+ </extension>
+ <extension
+ point="org.datanucleus.store_connectionfactory">
+ <connectionfactory
+ class-name="org.datanucleus.store.cassandra.CassandraConnectionFactory"
+ datastore="cassandra"
+ name="cassandra"
+ transactional="false">
+ </connectionfactory>
+ </extension>
+</plugin>
+
60 src/org/datanucleus/store/cassandra/CassandraConnectionFactory.java
@@ -0,0 +1,60 @@
+/**********************************************************************
+Copyright (c) 2010 Pedro Gomes and Minho University. All rights reserved.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+**********************************************************************/
+
+
+package org.datanucleus.store.cassandra;
+
+import java.util.Map;
+
+import org.datanucleus.OMFContext;
+import org.datanucleus.store.connection.AbstractConnectionFactory;
+import org.datanucleus.store.connection.ManagedConnection;
+
+
+
+public class CassandraConnectionFactory extends AbstractConnectionFactory{
+
+ CassandraConnectionPool connectionPool;
+ CassandraConnectionInfo connectionInfo;
+
+ public CassandraConnectionFactory(OMFContext omfContext, String resourceType) {
+ super(omfContext, resourceType);
+ CassandraStoreManager storeManager = (CassandraStoreManager) omfContext.getStoreManager();
+ //TODO get eviction time
+ connectionPool = new CassandraConnectionPool();
+ connectionInfo = storeManager.getConnectionInfo();
+ connectionPool.setTimeBetweenEvictionRunsMillis(storeManager.getPoolTimeBetweenEvictionRunsMillis());
+
+ }
+
+ @Override
+ public ManagedConnection createManagedConnection(Object poolKey, Map arg1) {
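+		// Reuse this thread's pooled connection when one is available;
+		// otherwise open a new managed connection and register it with the pool.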
+ CassandraStoreManager storeManager = (CassandraStoreManager) omfContext.getStoreManager();
+
+ CassandraManagedConnection managedConnection = connectionPool.getPooledConnection();
+ if (managedConnection == null)
+ {
+ managedConnection = new CassandraManagedConnection(connectionInfo);
+ managedConnection.setIdleTimeoutMills(storeManager.getPoolMinEvictableIdleTimeMillis());
+ connectionPool.registerConnection(managedConnection);
+ }
+ managedConnection.incrementReferenceCount();
+ return managedConnection;
+ }
+
+
+
+}
192 src/org/datanucleus/store/cassandra/CassandraConnectionInfo.java
@@ -0,0 +1,192 @@
+/**********************************************************************
+Copyright (c) 2010 Pedro Gomes and Minho University. All rights reserved.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+**********************************************************************/
+
+package org.datanucleus.store.cassandra;
+
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.cassandra.thrift.TokenRange;
+import org.apache.cassandra.thrift.Cassandra.Client;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.datanucleus.PersistenceConfiguration;
+import org.datanucleus.exceptions.NucleusDataStoreException;
+
+public class CassandraConnectionInfo {
+
+ private Map<String, Integer> ring_connections;
+ private String keyspace;
+
+ public CassandraConnectionInfo(PersistenceConfiguration conf) {
+
+ ring_connections = new TreeMap<String, Integer>();
+
+ String url = conf.getStringProperty("datanucleus.ConnectionURL");
+ String[] url_info = url.split("://");
+
+ if (url_info.length < 2) {
+ throw new NucleusDataStoreException(
+ "Malformed URL : Example -> "
+ + "cassandra:KeySpace://connectionURL:port,connectionURL:port...");
+ }
+
+ String[] dataStore_info = url_info[0].split(":"); // datastore:Keyspace
+ String[] connection_info = url_info[1].split(","); // host:port,host:port...
+
+ if (dataStore_info.length < 2) {
+ throw new NucleusDataStoreException(
+ "Malformed URL : No defined Keyspace -> "
+ + "cassandra:KeySpace://connectionURL:port,connectionURL:port...");
+ }
+
+
+ String datastore = dataStore_info[0]; // datastore name, ignore
+
+ // get Keyspace
+ if (dataStore_info[1] != null && !dataStore_info[1].isEmpty()) {
+ keyspace = dataStore_info[1];
+ } else {
+ throw new NucleusDataStoreException(
+ "Malformed URL : No defined Keyspace -> "
+ + "cassandra:KeySpace://connectionURL:port,connectionURL:port...");
+ }
+
+ // get first connection
+ String main_connection;
+
+ if (connection_info[0] != null && !connection_info[0].isEmpty()) {
+ main_connection = connection_info[0];
+ } else {
+ throw new NucleusDataStoreException(
+ "Malformed URL : No main connection -> "
+ + "cassandra:KeySpace://connectionURL:port,connectionURL:port...");
+ }
+ String[] main_connection_info = main_connection.split(":");
+ String main_host;
+ String main_port;
+ if (main_connection_info[0] != null
+ && !main_connection_info[0].isEmpty()) {
+ main_host = main_connection_info[0];
+ } else {
+ throw new NucleusDataStoreException(
+ "Malformed URL : Malformed main connection -> "
+ + "cassandra:KeySpace://connectionURL:port,connectionURL:port...");
+ }
+		if (main_connection_info.length > 1
+				&& !main_connection_info[1].isEmpty()) {
+ main_port = main_connection_info[1];
+ } else {
+ throw new NucleusDataStoreException(
+ "Malformed URL : Malformed main connection -> "
+ + "cassandra:KeySpace://connectionURL:port,connectionURL:port...");
+ }
+ int port;
+ try {
+ port = Integer.parseInt(main_port);
+ } catch (Exception e) {
+ throw new NucleusDataStoreException(
+ "Malformed URL : Error when extracting connection port -> "
+ + "cassandra:KeySpace://connectionURL:port,connectionURL:port...");
+ }
+ ring_connections.put(main_host, port);
+
+ // get other connections, the term ring activates ring lookup.
+ int num_endpoints = connection_info.length - 1;
+
+ boolean other_endpoints = num_endpoints > 0;
+ boolean get_ring = false;
+ int endpoint_it = 1;
+
+ String endpoint_host;
+ int endpoint_port = 0;
+
+ while (other_endpoints && endpoint_it <= num_endpoints) {
+
+ if (connection_info[endpoint_it] != null
+ && !connection_info[endpoint_it].isEmpty()) {
+				String[] endpoint_info = connection_info[endpoint_it].split(":");
+				if (endpoint_info[0] != null && !endpoint_info[0].isEmpty()) {
+					endpoint_host = endpoint_info[0];
+				} else {
+					throw new NucleusDataStoreException(
+							"Malformed URL : Error when extracting connection host -> "
+									+ "cassandra:KeySpace://connectionURL:port,connectionURL:port...");
+				}
+
+				String end_port;
+				if (endpoint_info.length > 1 && !endpoint_info[1].isEmpty()) {
+					end_port = endpoint_info[1];
+ try {
+ endpoint_port = Integer.parseInt(end_port);
+ } catch (Exception e) {
+ throw new NucleusDataStoreException(
+ "Malformed URL : Error when extracting connection port -> "
+ + "cassandra:KeySpace://connectionURL:port,connectionURL:port...");
+ }
+
+ } else {
+ throw new NucleusDataStoreException(
+ "Malformed URL : Error when extracting connection port -> "
+ + "cassandra:KeySpace://connectionURL:port,connectionURL:port...");
+ }
+
+ if (endpoint_host.equalsIgnoreCase("ring")) {
+ get_ring = true;
+ other_endpoints = false;
+ } else {
+ ring_connections.put(endpoint_host, endpoint_port);
+ }
+ endpoint_it++;
+ }
+
+ }
+
+ if (get_ring) {
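+			// Ask the seed node for the ring topology and register every
+			// discovered endpoint, assuming they all listen on the "ring" port.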
+ try {
+ TSocket socket = new TSocket(main_host, port);
+ TProtocol protocol = new TBinaryProtocol(socket);
+ Client cassandraClient = new Client(protocol);
+ socket.open();
+
+ List<TokenRange> ring = cassandraClient
+ .describe_ring(keyspace);
+ for (TokenRange tr : ring) {
+ List<String> endpoints = tr.endpoints;
+					for (String endpoint : endpoints) {
+						ring_connections.put(endpoint, endpoint_port);
+ }
+ }
+ } catch (TException e) {
+ throw new NucleusDataStoreException(e.getMessage(), e);
+ }
+
+ }
+
+ }
+
+ public Map<String, Integer> getConnections() {
+ return ring_connections;
+ }
+
+ public String getKeyspace() {
+ return keyspace;
+ }
+
+}
114 src/org/datanucleus/store/cassandra/CassandraConnectionPool.java
@@ -0,0 +1,114 @@
+/**********************************************************************
+Copyright (c) 2010 Pedro Gomes and Minho University. All rights reserved.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+**********************************************************************/
+
+package org.datanucleus.store.cassandra;
+
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+public class CassandraConnectionPool
+{
+
+ private final List<CassandraManagedConnection> connections;
+
+ private final ThreadLocal<WeakReference<CassandraManagedConnection>> connectionForCurrentThread;
+
+ private final Timer evictorThread;
+
+ private int timeBetweenEvictionRunsMillis = 15 * 1000; // default, 15 secs
+
+ public CassandraConnectionPool()
+ {
+ connectionForCurrentThread = new ThreadLocal<WeakReference<CassandraManagedConnection>>();
+ connections = new CopyOnWriteArrayList<CassandraManagedConnection>();
+
+ evictorThread = new Timer("Cassandra Connection Evictor", true);
+ startConnectionEvictorThread(evictorThread);
+ }
+
+ public void registerConnection(CassandraManagedConnection managedConnection)
+ {
+ connections.add(managedConnection);
+ connectionForCurrentThread.set(new WeakReference<CassandraManagedConnection>(managedConnection));
+ }
+
+ public CassandraManagedConnection getPooledConnection()
+ {
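+        // Connections are cached per thread through a WeakReference, so a
+        // connection nobody references anymore can still be evicted and collected.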
+ WeakReference<CassandraManagedConnection> ref = connectionForCurrentThread.get();
+
+ if (ref == null)
+ {
+ return null;
+ }
+ else
+ {
+ CassandraManagedConnection managedConnection = ref.get();
+
+ if (managedConnection != null && !managedConnection.isDisposed())
+ {
+ return managedConnection;
+ }
+ else
+ {
+ return null;
+ }
+ }
+ }
+
+ public void setTimeBetweenEvictionRunsMillis(int timeBetweenEvictionRunsMillis)
+ {
+ this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
+ }
+
+ private void disposeTimedOutConnections()
+ {
+ List<CassandraManagedConnection> timedOutConnections = new ArrayList<CassandraManagedConnection>();
+
+ for (CassandraManagedConnection managedConnection : connections)
+ {
+ if (managedConnection.isExpired())
+ {
+ timedOutConnections.add(managedConnection);
+ }
+ }
+
+ for (CassandraManagedConnection managedConnection : timedOutConnections)
+ {
+ managedConnection.dispose();
+ connections.remove(managedConnection);
+ }
+
+ }
+
+ private void startConnectionEvictorThread(Timer connectionTimeoutThread)
+ {
+ TimerTask timeoutTask = new TimerTask()
+ {
+
+ public void run()
+ {
+ disposeTimedOutConnections();
+ }
+ };
+
+        connectionTimeoutThread.schedule(timeoutTask, timeBetweenEvictionRunsMillis, timeBetweenEvictionRunsMillis);
+ }
+
+}
278 src/org/datanucleus/store/cassandra/CassandraFetchFieldManager.java
@@ -0,0 +1,278 @@
+/**********************************************************************
+Copyright (c) 2010 Pedro Gomes and Minho University. All rights reserved.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+**********************************************************************/
+
+package org.datanucleus.store.cassandra;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.datanucleus.ClassLoaderResolver;
+import org.datanucleus.exceptions.NucleusException;
+import org.datanucleus.metadata.AbstractClassMetaData;
+import org.datanucleus.metadata.AbstractMemberMetaData;
+import org.datanucleus.metadata.MetaDataManager;
+import org.datanucleus.metadata.Relation;
+import org.datanucleus.store.ExecutionContext;
+import org.datanucleus.store.ObjectProvider;
+import org.datanucleus.store.fieldmanager.AbstractFieldManager;
+
+public class CassandraFetchFieldManager extends AbstractFieldManager {
+
+ private AbstractClassMetaData acmd;
+ private ObjectProvider objectProvider;
+
+ private Map<String, byte[]> result_map;
+
+ public CassandraFetchFieldManager(AbstractClassMetaData acmd,
+ ObjectProvider objcp, List<ColumnOrSuperColumn> result) {
+ this.acmd = acmd;
+ this.objectProvider = objcp;
+
+ result_map = new TreeMap<String, byte[]>();
+
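+		// Index the returned columns by name so that each fetchXXXField
+		// call becomes a simple map lookup.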
+ for (int index = 0; index < result.size(); index++) {
+ ColumnOrSuperColumn columnOrSuperColumn = result.get(index);
+ String name = new String(columnOrSuperColumn.getColumn().name);
+ result_map.put(name, columnOrSuperColumn.getColumn().value);
+ }
+
+ }
+
+ public boolean fetchBooleanField(int fieldNumber) {
+ String columnName = CassandraUtils.getQualifierName(acmd, fieldNumber);
+ boolean value;
+ try {
+ byte[] bytes = result_map.get(columnName);
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ ObjectInputStream ois = new ObjectInputStream(bis);
+ value = ois.readBoolean();
+ ois.close();
+ bis.close();
+ } catch (IOException e) {
+ throw new NucleusException(e.getMessage(), e);
+ }
+ return value;
+ }
+
+ public byte fetchByteField(int fieldNumber) {
+ String columnName = CassandraUtils.getQualifierName(acmd, fieldNumber);
+ byte value;
+ try {
+ byte[] bytes = result_map.get(columnName);
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ ObjectInputStream ois = new ObjectInputStream(bis);
+ value = ois.readByte();
+ ois.close();
+ bis.close();
+ } catch (IOException e) {
+ throw new NucleusException(e.getMessage(), e);
+ }
+ return value;
+ }
+
+ public char fetchCharField(int fieldNumber) {
+ String columnName = CassandraUtils.getQualifierName(acmd, fieldNumber);
+ char value;
+ try {
+ byte[] bytes = result_map.get(columnName);
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ ObjectInputStream ois = new ObjectInputStream(bis);
+ value = ois.readChar();
+ ois.close();
+ bis.close();
+ } catch (IOException e) {
+ throw new NucleusException(e.getMessage(), e);
+ }
+ return value;
+ }
+
+ public double fetchDoubleField(int fieldNumber) {
+ String columnName = CassandraUtils.getQualifierName(acmd, fieldNumber);
+ double value;
+ try {
+ byte[] bytes = result_map.get(columnName);
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ ObjectInputStream ois = new ObjectInputStream(bis);
+ value = ois.readDouble();
+ ois.close();
+ bis.close();
+ } catch (IOException e) {
+ throw new NucleusException(e.getMessage(), e);
+ }
+ return value;
+ }
+
+ public float fetchFloatField(int fieldNumber) {
+ String columnName = CassandraUtils.getQualifierName(acmd, fieldNumber);
+
+ float value;
+ try {
+ byte[] bytes = result_map.get(columnName);
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ ObjectInputStream ois = new ObjectInputStream(bis);
+ value = ois.readFloat();
+ ois.close();
+ bis.close();
+ } catch (IOException e) {
+ throw new NucleusException(e.getMessage(), e);
+ }
+ return value;
+ }
+
+ public int fetchIntField(int fieldNumber) {
+ String columnName = CassandraUtils.getQualifierName(acmd, fieldNumber);
+
+ int value;
+ try {
+ byte[] bytes = result_map.get(columnName);
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ ObjectInputStream ois = new ObjectInputStream(bis);
+ value = ois.readInt();
+ ois.close();
+ bis.close();
+ } catch (IOException e) {
+ throw new NucleusException(e.getMessage(), e);
+ }
+ return value;
+ }
+
+ public long fetchLongField(int fieldNumber) {
+ String columnName = CassandraUtils.getQualifierName(acmd, fieldNumber);
+
+ long value;
+ try {
+ byte[] bytes = result_map.get(columnName);
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ ObjectInputStream ois = new ObjectInputStream(bis);
+ value = ois.readLong();
+ ois.close();
+ bis.close();
+ } catch (IOException e) {
+ throw new NucleusException(e.getMessage(), e);
+ }
+ return value;
+ }
+
+ public short fetchShortField(int fieldNumber) {
+ String columnName = CassandraUtils.getQualifierName(acmd, fieldNumber);
+
+ short value;
+ try {
+ byte[] bytes = result_map.get(columnName);
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ ObjectInputStream ois = new ObjectInputStream(bis);
+ value = ois.readShort();
+ ois.close();
+ bis.close();
+ } catch (IOException e) {
+ throw new NucleusException(e.getMessage(), e);
+ }
+ return value;
+ }
+
+ public String fetchStringField(int fieldNumber) {
+ String columnName = CassandraUtils.getQualifierName(acmd, fieldNumber);
+
+ String value;
+ try {
+ try {
+ byte[] bytes = result_map.get(columnName);
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ ObjectInputStream ois = new ObjectInputStream(bis);
+ value = (String) ois.readObject();
+ ois.close();
+ bis.close();
+ } catch (NullPointerException ex) {
+ return null;
+ }
+ } catch (IOException e) {
+ throw new NucleusException(e.getMessage(), e);
+ } catch (ClassNotFoundException e) {
+ throw new NucleusException(e.getMessage(), e);
+ }
+ return value;
+ }
+
+ public Object fetchObjectField(int fieldNumber) {
+ String columnName = CassandraUtils.getQualifierName(acmd, fieldNumber);
+ ExecutionContext context = objectProvider.getExecutionContext();
+ ClassLoaderResolver clr = context.getClassLoaderResolver();
+ AbstractMemberMetaData fieldMetaData = acmd
+ .getMetaDataForManagedMemberAtAbsolutePosition(fieldNumber);
+
+ // get object
+ Object value;
+ try {
+ try {
+ byte[] bytes = result_map.get(columnName);
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ ObjectInputStream ois = new ObjectInputStream(bis);
+ value = ois.readObject();
+ ois.close();
+ bis.close();
+ } catch (NullPointerException ex) {
+ return null;
+ }
+ } catch (IOException e) {
+ throw new NucleusException(e.getMessage(), e);
+ } catch (ClassNotFoundException e) {
+ throw new NucleusException(e.getMessage(), e);
+ }
+
+ // handle relations
+ int relationType = fieldMetaData.getRelationType(clr);
+
+ if (relationType == Relation.ONE_TO_ONE_BI
+ || relationType == Relation.ONE_TO_ONE_UNI
+ || relationType == Relation.MANY_TO_ONE_BI) {
+
+ Object id = value;
+ String class_name = fieldMetaData.getClassName();
+ value = context.findObject(id, true, false, class_name);
+
+ } else if (relationType == Relation.MANY_TO_MANY_BI
+ || relationType == Relation.ONE_TO_MANY_BI
+ || relationType == Relation.ONE_TO_MANY_UNI) {
+
+ MetaDataManager mmgr = context.getMetaDataManager();
+ String elementClassName = fieldMetaData.getCollection()
+ .getElementClassMetaData(clr, mmgr).getFullClassName();
+
+ if (fieldMetaData.hasCollection()) {
+
+ List<Object> mapping = (List<Object>) value;
+ Collection<Object> collection = new ArrayList<Object>();
+ for (Object id : mapping) {
+
+ Object element = context.findObject(id, true, false,
+ elementClassName);
+ collection.add(element);
+ }
+ value = collection;
+ }
+ }
+
+ return value;
+ }
+}
414 src/org/datanucleus/store/cassandra/CassandraInsertFieldManager.java
@@ -0,0 +1,414 @@
+/**********************************************************************
+Copyright (c) 2010 Pedro Gomes and Minho University. All rights reserved.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+**********************************************************************/
+
+package org.datanucleus.store.cassandra;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.Deletion;
+import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.datanucleus.ClassLoaderResolver;
+import org.datanucleus.StateManager;
+import org.datanucleus.exceptions.NucleusException;
+import org.datanucleus.metadata.AbstractClassMetaData;
+import org.datanucleus.metadata.AbstractMemberMetaData;
+import org.datanucleus.metadata.Relation;
+import org.datanucleus.store.ExecutionContext;
+import org.datanucleus.store.ObjectProvider;
+import org.datanucleus.store.fieldmanager.AbstractFieldManager;
+
+//TODO isolate cassandra operations...
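+// Translates field writes into Thrift Mutations: non-null values are
+// Java-serialized into Columns, while null values are collected into a single Deletion.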
+public class CassandraInsertFieldManager extends AbstractFieldManager {
+
+ private AbstractClassMetaData acmd;
+ private ObjectProvider objectProvider;
+
+ private List<Mutation> mutations;
+ private Deletion deletion;
+ private String column_family;
+ private String row_key;
+
+ public CassandraInsertFieldManager(AbstractClassMetaData acmd,
+			ObjectProvider objp, String key, String columnFamily) {
+ this.acmd = acmd;
+ this.objectProvider = objp;
+
+ this.mutations = new ArrayList<Mutation>();
+		this.column_family = columnFamily;
+ this.row_key = key;
+ }
+
+ public void storeBooleanField(int fieldNumber, boolean value) {
+ String columnName = CassandraUtils.getQualifierName(acmd, fieldNumber);
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeBoolean(value);
+ oos.flush();
+ Mutation mutation = new Mutation();
+ ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
+ Column column = new Column(columnName.getBytes(),
+ bos.toByteArray(), System.currentTimeMillis());
+ columnOrSuperColumn.setColumn(column);
+ mutation.setColumn_or_supercolumn(columnOrSuperColumn);
+ mutations.add(mutation);
+ oos.close();
+ bos.close();
+ } catch (IOException e) {
+ throw new NucleusException(e.getMessage(), e);
+ }
+ }
+
+ public void storeByteField(int fieldNumber, byte value) {
+ String columnName = CassandraUtils.getQualifierName(acmd, fieldNumber);
+
+ Mutation mutation = new Mutation();
+ ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
+ Column column = new Column(columnName.getBytes(), new byte[] { value },
+ System.currentTimeMillis());
+ columnOrSuperColumn.setColumn(column);
+ mutation.setColumn_or_supercolumn(columnOrSuperColumn);
+ mutations.add(mutation);
+
+ }
+
+ public void storeCharField(int fieldNumber, char value) {
+ String columnName = CassandraUtils.getQualifierName(acmd, fieldNumber);
+
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeChar(value);
+ oos.flush();
+
+ Mutation mutation = new Mutation();
+ ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
+ Column column = new Column(columnName.getBytes(),
+ bos.toByteArray(), System.currentTimeMillis());
+ columnOrSuperColumn.setColumn(column);
+ mutation.setColumn_or_supercolumn(columnOrSuperColumn);
+ mutations.add(mutation);
+
+ oos.close();
+ bos.close();
+ } catch (IOException e) {
+ throw new NucleusException(e.getMessage(), e);
+ }
+ }
+
+ public void storeDoubleField(int fieldNumber, double value) {
+ String columnName = CassandraUtils.getQualifierName(acmd, fieldNumber);
+
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeDouble(value);
+ oos.flush();
+
+ Mutation mutation = new Mutation();
+ ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
+ Column column = new Column(columnName.getBytes(),
+ bos.toByteArray(), System.currentTimeMillis());
+ columnOrSuperColumn.setColumn(column);
+ mutation.setColumn_or_supercolumn(columnOrSuperColumn);
+ mutations.add(mutation);
+
+ oos.close();
+ bos.close();
+ } catch (IOException e) {
+ throw new NucleusException(e.getMessage(), e);
+ }
+ }
+
+ public void storeFloatField(int fieldNumber, float value) {
+ String columnName = CassandraUtils.getQualifierName(acmd, fieldNumber);
+
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeFloat(value);
+ oos.flush();
+
+ Mutation mutation = new Mutation();
+ ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
+ Column column = new Column(columnName.getBytes(),
+ bos.toByteArray(), System.currentTimeMillis());
+ columnOrSuperColumn.setColumn(column);
+ mutation.setColumn_or_supercolumn(columnOrSuperColumn);
+ mutations.add(mutation);
+
+ oos.close();
+ bos.close();
+ } catch (IOException e) {
+ throw new NucleusException(e.getMessage(), e);
+ }
+ }
+
+ public void storeIntField(int fieldNumber, int value) {
+ String columnName = CassandraUtils.getQualifierName(acmd, fieldNumber);
+
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeInt(value);
+ oos.flush();
+
+ Mutation mutation = new Mutation();
+ ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
+ Column column = new Column(columnName.getBytes(),
+ bos.toByteArray(), System.currentTimeMillis());
+ columnOrSuperColumn.setColumn(column);
+ mutation.setColumn_or_supercolumn(columnOrSuperColumn);
+ mutations.add(mutation);
+
+ oos.close();
+ bos.close();
+ } catch (IOException e) {
+ throw new NucleusException(e.getMessage(), e);
+ }
+ }
+
+ public void storeLongField(int fieldNumber, long value) {
+ String columnName = CassandraUtils.getQualifierName(acmd, fieldNumber);
+
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeLong(value);
+ oos.flush();
+
+ Mutation mutation = new Mutation();
+ ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
+ Column column = new Column(columnName.getBytes(),
+ bos.toByteArray(), System.currentTimeMillis());
+ columnOrSuperColumn.setColumn(column);
+ mutation.setColumn_or_supercolumn(columnOrSuperColumn);
+ mutations.add(mutation);
+
+ oos.close();
+ bos.close();
+ } catch (IOException e) {
+ throw new NucleusException(e.getMessage(), e);
+ }
+ }
+
+ public void storeShortField(int fieldNumber, short value) {
+ String columnName = CassandraUtils.getQualifierName(acmd, fieldNumber);
+
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeShort(value);
+ oos.flush();
+
+ Mutation mutation = new Mutation();
+ ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
+ Column column = new Column(columnName.getBytes(),
+ bos.toByteArray(), System.currentTimeMillis());
+ columnOrSuperColumn.setColumn(column);
+ mutation.setColumn_or_supercolumn(columnOrSuperColumn);
+ mutations.add(mutation);
+
+ oos.close();
+ bos.close();
+ } catch (IOException e) {
+ throw new NucleusException(e.getMessage(), e);
+ }
+ }
+
+ public void storeStringField(int fieldNumber, String value) {
+ String columnName = CassandraUtils.getQualifierName(acmd, fieldNumber);
+
+ if (value == null) {
+
+ if (deletion == null) { // No deletes yet, create a new Deletion.
+ deletion = new Deletion();
+ SlicePredicate predicate = new SlicePredicate();
+ List<byte[]> column_names = new ArrayList<byte[]>();
+ column_names.add(columnName.getBytes());
+ predicate.setColumn_names(column_names);
+ deletion.setPredicate(predicate);
+ } else {// add the column to the ones to be deleted
+ deletion.getPredicate().getColumn_names().add(
+ columnName.getBytes());
+ }
+ } else {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+				oos.writeObject(value);
+				oos.flush();
+
+ Mutation mutation = new Mutation();
+ ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
+ Column column = new Column(columnName.getBytes(), bos
+ .toByteArray(), System.currentTimeMillis());
+ columnOrSuperColumn.setColumn(column);
+ mutation.setColumn_or_supercolumn(columnOrSuperColumn);
+ mutations.add(mutation);
+
+ oos.close();
+ bos.close();
+ } catch (IOException e) {
+ throw new NucleusException(e.getMessage(), e);
+ }
+ }
+ }
+
+ public void storeObjectField(int fieldNumber, Object value) {
+ String columnName = CassandraUtils.getQualifierName(acmd, fieldNumber);
+
+ if (value == null) {
+
+ if (deletion == null) { // No deletes yet, create a new Deletion.
+ deletion = new Deletion();
+ SlicePredicate predicate = new SlicePredicate();
+ List<byte[]> column_names = new ArrayList<byte[]>();
+ column_names.add(columnName.getBytes());
+ predicate.setColumn_names(column_names);
+ deletion.setPredicate(predicate);
+ } else {// add the column to the ones to be deleted
+ deletion.getPredicate().getColumn_names().add(
+ columnName.getBytes());
+ }
+ } else {
+
+ ExecutionContext context = objectProvider.getExecutionContext();
+ ClassLoaderResolver clr = context.getClassLoaderResolver();
+ AbstractMemberMetaData fieldMetaData = acmd
+ .getMetaDataForManagedMemberAtAbsolutePosition(fieldNumber);
+ int relationType = fieldMetaData.getRelationType(clr);
+
+ if (relationType == Relation.ONE_TO_ONE_BI
+ || relationType == Relation.ONE_TO_ONE_UNI
+ || relationType == Relation.MANY_TO_ONE_BI) {
+
+ Object persisted = context.persistObjectInternal(value,
+ objectProvider, -1, StateManager.PC);
+
+ Object valueId = context.getApiAdapter().getIdForObject(
+ persisted);
+
+ try {
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+					oos.writeObject(valueId);
+					oos.flush();
+
+ Mutation mutation = new Mutation();
+ ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
+ Column column = new Column(columnName.getBytes(), bos
+ .toByteArray(), System.currentTimeMillis());
+ columnOrSuperColumn.setColumn(column);
+ mutation.setColumn_or_supercolumn(columnOrSuperColumn);
+ mutations.add(mutation);
+
+ oos.close();
+ bos.close();
+ } catch (IOException e) {
+ throw new NucleusException(e.getMessage(), e);
+ }
+
+ return;
+
+ } else if (relationType == Relation.MANY_TO_MANY_BI
+ || relationType == Relation.ONE_TO_MANY_BI
+ || relationType == Relation.ONE_TO_MANY_UNI) {
+
+ if (value instanceof Collection) {
+
+ List<Object> mapping = new ArrayList<Object>();
+
+ for (Object c : (Collection) value) {
+ Object persisted = context.persistObjectInternal(c,
+ objectProvider, -1, StateManager.PC);
+ Object valueId = context.getApiAdapter()
+ .getIdForObject(persisted);
+ mapping.add(valueId);
+ }
+
+ try {
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+						oos.writeObject(mapping);
+						oos.flush();
+
+ Mutation mutation = new Mutation();
+ ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
+ Column column = new Column(columnName.getBytes(), bos
+ .toByteArray(), System.currentTimeMillis());
+ columnOrSuperColumn.setColumn(column);
+ mutation.setColumn_or_supercolumn(columnOrSuperColumn);
+ mutations.add(mutation);
+
+ oos.close();
+ bos.close();
+ } catch (IOException e) {
+ throw new NucleusException(e.getMessage(), e);
+ }
+ }
+ return;
+ }
+
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+				oos.writeObject(value);
+				oos.flush();
+
+ Mutation mutation = new Mutation();
+ ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
+ Column column = new Column(columnName.getBytes(), bos
+ .toByteArray(), System.currentTimeMillis());
+ columnOrSuperColumn.setColumn(column);
+ mutation.setColumn_or_supercolumn(columnOrSuperColumn);
+ mutations.add(mutation);
+
+ oos.close();
+ bos.close();
+ } catch (IOException e) {
+ throw new NucleusException(e.getMessage(), e);
+ }
+ }
+
+ }
+
+ public Map<String, Map<String, List<Mutation>>> getMutation() {
+
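+		// Merge any pending deletion, then wrap everything in the nested map
+		// shape expected by Cassandra's batch_mutate:
+		// row key -> column family -> list of mutations.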
+ if (deletion != null) {
+ Mutation mutation = new Mutation();
+ mutation.setDeletion(deletion);
+ mutations.add(mutation);
+ }
+
+ Map<String, List<Mutation>> columnFamily_mutations = new TreeMap<String, List<Mutation>>();
+ columnFamily_mutations.put(column_family, mutations);
+ Map<String, Map<String, List<Mutation>>> mutation_map = new TreeMap<String, Map<String, List<Mutation>>>();
+ mutation_map.put(row_key, columnFamily_mutations);
+
+ return mutation_map;
+
+ }
+
+}
193 src/org/datanucleus/store/cassandra/CassandraManagedConnection.java
@@ -0,0 +1,193 @@
+/**********************************************************************
+Copyright (c) 2010 Pedro Gomes and Minho University. All rights reserved.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+**********************************************************************/
+
+package org.datanucleus.store.cassandra;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import javax.transaction.xa.XAResource;
+
+import org.apache.cassandra.thrift.Cassandra.Client;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.datanucleus.exceptions.NucleusDataStoreException;
+import org.datanucleus.store.connection.AbstractManagedConnection;
+import org.datanucleus.store.connection.ManagedConnectionResourceListener;
+
+public class CassandraManagedConnection extends AbstractManagedConnection {
+
+ private CassandraConnectionInfo connectionInfo;
+
+ private List<Client> cassandraClients;
+
+ private int referenceCount = 0;
+
+ private int idleTimeoutMills = 30 * 1000; // 30 secs
+
+ private long expirationTime;
+
+ private int last_client = 0;
+
+ private boolean isDisposed = false;
+
+ public CassandraManagedConnection(CassandraConnectionInfo info) {
+ connectionInfo = info;
+ disableExpirationTime();
+ cassandraClients = new ArrayList<Client>();
+ }
+
+ @Override
+ public void close() {
+
+ for (int i = 0; i < listeners.size(); i++) {
+ ((ManagedConnectionResourceListener) listeners.get(i))
+ .managedConnectionPreClose();
+ }
+ try {
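+			// Thrift transports are intentionally left open here so the pooled
+			// connection can be reused; they are only closed in dispose().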
+// for (Client client : cassandraClients) {
+// client.getOutputProtocol().getTransport().close();
+// }
+ } finally {
+ for (int i = 0; i < listeners.size(); i++) {
+ ((ManagedConnectionResourceListener) listeners.get(i))
+ .managedConnectionPostClose();
+ }
+ }
+
+ }
+
+ @Override
+ public Object getConnection() {
+
+ if (cassandraClients.isEmpty()) {
+ establishNodeConnection();
+ }
+ Client client = getCassandraClient();
+ if (client == null) {
+ throw new NucleusDataStoreException("Connection error, no available nodes");
+ }
+
+ return client;
+ }
+
+ public Client getCassandraClient() {
+
+ boolean openClient = false;
+ Client cl = null;
+
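+		// Round-robin over the open clients, dropping any whose transport has
+		// been closed; returns null when no open client remains.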
+ while (!openClient) { // until there is no one open
+
+ if (!cassandraClients.isEmpty()) { // if none, then null...
+ cl = cassandraClients.get(last_client);
+ if (!cl.getInputProtocol().getTransport().isOpen()) {
+ cassandraClients.remove(last_client);
+ } else {
+ openClient = true;
+ }
+ last_client++;
+ last_client = last_client >= cassandraClients.size() ? 0
+ : last_client;
+
+ } else {
+ openClient = true;
+ }
+ }
+ return cl;
+
+ }
+
+ /**
+ * Establish connections to all nodes.
+ * TODO maybe it would be best if connections were made only when necessary, i.e., established in the round robin section.
+ * Trade-off : N connections vs connection establishment time.
+ * */
+ public void establishNodeConnection() {
+
+ Map<String,Integer> connections = connectionInfo.getConnections();
+ for (String host : connections.keySet()) {
+ int port = -1;
+ try {
+ port = connections.get(host);
+ TSocket socket = new TSocket(host, port);
+ TProtocol prot = new TBinaryProtocol(socket);
+ Client c = new Client(prot);
+ socket.open();
+ cassandraClients.add(c);
+ } catch (TTransportException ex) {
+ throw new NucleusDataStoreException("Error when connecting to client: "+
+ host+":"+port);
+ }
+ }
+
+ }
+
+ @Override
+ public XAResource getXAResource() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ void incrementReferenceCount() {
+ ++referenceCount;
+ disableExpirationTime();
+ }
+
+ public void release() {
+ --referenceCount;
+
+ if (referenceCount == 0) {
+ close();
+ enableExpirationTime();
+ } else if (referenceCount < 0) {
+ throw new NucleusDataStoreException("Too many calls on release(): "
+ + this);
+ }
+ }
+
+ private void enableExpirationTime() {
+ this.expirationTime = System.currentTimeMillis() + idleTimeoutMills;
+ }
+
+ private void disableExpirationTime() {
+ this.expirationTime = -1;
+ }
+
+ public void setIdleTimeoutMills(int mills) {
+ this.idleTimeoutMills = mills;
+ }
+
+ public boolean isExpired() {
+ return expirationTime > 0
+				&& expirationTime < System.currentTimeMillis();
+ }
+
+ public void dispose() {
+ isDisposed = true;
+ for (Client client : cassandraClients) {
+ client.getOutputProtocol().getTransport().close();
+ }
+ }
+
+ public boolean isDisposed() {
+ return isDisposed;
+ }
+
+
+}
62 src/org/datanucleus/store/cassandra/CassandraMetaDataListener.java
@@ -0,0 +1,62 @@
+/**********************************************************************
+Copyright (c) 2010 Pedro Gomes and Minho University. All rights reserved.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+**********************************************************************/
+
+package org.datanucleus.store.cassandra;
+
+import java.io.IOException;
+
+import org.datanucleus.exceptions.NucleusException;
+import org.datanucleus.metadata.AbstractClassMetaData;
+import org.datanucleus.metadata.IdentityType;
+import org.datanucleus.metadata.InvalidMetaDataException;
+import org.datanucleus.metadata.MetaDataListener;
+
+import org.datanucleus.util.Localiser;
+
+public class CassandraMetaDataListener implements MetaDataListener{
+
+ /** Localiser for messages. */
+ protected static final Localiser LOCALISER = Localiser.getInstance(
+			"org.datanucleus.store.cassandra.Localisation", CassandraStoreManager.class.getClassLoader());
+
+ private CassandraStoreManager storeManager;
+
+ CassandraMetaDataListener(CassandraStoreManager storeManager)
+ {
+ this.storeManager = storeManager;
+ }
+
+
+ @Override
+ public void loaded(AbstractClassMetaData cmd) {
+
+ if (cmd.getIdentityType() == IdentityType.DATASTORE && !cmd.isEmbeddedOnly())
+ {
+ // Datastore id not supported
+ throw new InvalidMetaDataException(LOCALISER, "Cassandra.DatastoreID", cmd.getFullClassName());
+ }
+ if (storeManager.isAutoCreateSchema()){
+ try {
+ CassandraUtils.createSchema(cmd,storeManager);
+ } catch (IOException e) {
+ throw new NucleusException(e.getMessage(), e);
+ }
+ }
+ }
+
+
+
+}
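
Because loaded() rejects datastore identity, persistable classes must use application identity, i.e., declare their own primary-key field. For illustration, a minimal class that passes the check above (hypothetical, using standard JDO annotations):

    import javax.jdo.annotations.PersistenceCapable;
    import javax.jdo.annotations.PrimaryKey;

    @PersistenceCapable // application identity is implied by the @PrimaryKey field
    public class Person {
        @PrimaryKey
        private String globalNum; // its serialized form becomes the Cassandra row key

        private String firstName;
        private String lastName;
    }
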
296 src/org/datanucleus/store/cassandra/CassandraPersistenceHandler.java
@@ -0,0 +1,296 @@
+/**********************************************************************
+Copyright (c) 2010 Pedro Gomes and Minho University. All rights reserved.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+**********************************************************************/
+
+package org.datanucleus.store.cassandra;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.ColumnPath;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.thrift.TimedOutException;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.thrift.Cassandra.Client;
+import org.apache.thrift.TException;
+import org.datanucleus.exceptions.NucleusDataStoreException;
+import org.datanucleus.exceptions.NucleusObjectNotFoundException;
+import org.datanucleus.exceptions.NucleusUserException;
+import org.datanucleus.metadata.AbstractClassMetaData;
+import org.datanucleus.store.AbstractPersistenceHandler;
+import org.datanucleus.store.ExecutionContext;
+import org.datanucleus.store.ObjectProvider;
+import org.datanucleus.store.StoreManager;
+import org.datanucleus.util.Localiser;
+
+
+public class CassandraPersistenceHandler extends AbstractPersistenceHandler{
+
+ /** Localiser for messages. */
+ Localiser LOCALISER;
+
+ protected final CassandraStoreManager storeManager;
+ private String keyspace;
+
+ public CassandraPersistenceHandler(StoreManager stm) {
+
+ LOCALISER = Localiser.getInstance(
+ "org.datanucleus.store.cassandra.Localisation", CassandraPersistenceHandler.class.getClassLoader());
+ storeManager = (CassandraStoreManager) stm;
+ keyspace = storeManager.getConnectionInfo().getKeyspace();
+ }
+
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public void deleteObject(ObjectProvider objp) {
+
+ // Check if read-only so update not permitted
+ storeManager.assertReadOnlyForUpdateOfObject(objp);
+
+ CassandraManagedConnection managedConnection = (CassandraManagedConnection) storeManager.getConnection(objp.getExecutionContext());
+ try
+ {
+ AbstractClassMetaData acmd = objp.getClassMetaData();
+
+ Object pkValue = objp.provideField(acmd.getPKMemberPositions()[0]);
+ String key = CassandraUtils.ObjectToString(pkValue);
+
+ ColumnPath path = new ColumnPath(CassandraUtils.getFamilyName(acmd));
+
+ try {
+ ((Client)managedConnection.getConnection()).remove(keyspace, key, path, System.currentTimeMillis(), ConsistencyLevel.QUORUM);
+ } catch (InvalidRequestException e) {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ } catch (UnavailableException e) {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ } catch (TimedOutException e) {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ } catch (TException e) {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ }
+ }
+ catch (IOException e)
+ {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ }
+ finally
+ {
+ managedConnection.release();
+ }
+
+ }
+
+ @Override
+ public void fetchObject(ObjectProvider objp, int[] fieldNumbers) {
+
+ CassandraManagedConnection managedConnection = (CassandraManagedConnection) storeManager.getConnection(objp.getExecutionContext());
+
+ try
+ {
+ AbstractClassMetaData acmd = objp.getClassMetaData();
+
+ Object pkValue = objp.provideField(acmd.getPKMemberPositions()[0]);
+ String key = CassandraUtils.ObjectToString(pkValue);
+
+ ColumnParent parent = new ColumnParent(CassandraUtils.getFamilyName(acmd));
+
+ Client dbClient = ((Client)managedConnection.getConnection());
+
+			SliceRange range = new SliceRange();
+			range.setStart(new byte[]{});
+			range.setFinish(new byte[]{});
+			// ask for the row's column count first so the slice below returns all columns
+			range.setCount(dbClient.get_count(keyspace, key, parent, ConsistencyLevel.QUORUM));
+ SlicePredicate predicate = new SlicePredicate();
+
+ predicate.setSlice_range(range);
+
+ List<ColumnOrSuperColumn> result = dbClient.get_slice(keyspace,key, parent, predicate, ConsistencyLevel.QUORUM);
+
+
+			if(result == null || result.isEmpty())
+			{
+				// get_slice returns an empty list (never null) when the row is absent
+				throw new NucleusObjectNotFoundException();
+			}
+ CassandraFetchFieldManager fm = new CassandraFetchFieldManager(acmd,objp, result);
+ objp.replaceFields(acmd.getAllMemberPositions(), fm);
+
+ }
+ catch (IOException e)
+ {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ } catch (InvalidRequestException e) {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ } catch (UnavailableException e) {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ } catch (TimedOutException e) {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ } catch (TException e) {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ }
+ finally
+ {
+ managedConnection.release();
+ }
+
+ }
+
+ @Override
+ public Object findObject(ExecutionContext arg0, Object arg1) {
+		// not implemented; returning null lets DataNucleus fall back to its default lookup
+		return null;
+ }
+
+ @Override
+ public void insertObject(ObjectProvider objp) {
+ // Check if read-only so update not permitted
+ storeManager.assertReadOnlyForUpdateOfObject(objp);
+
+ if (!storeManager.managesClass(objp.getClassMetaData().getFullClassName()))
+ {
+ storeManager.addClass(objp.getClassMetaData().getFullClassName(),objp.getExecutionContext().getClassLoaderResolver());
+ }
+
+ try
+ {
+ locateObject(objp);
+
+ throw new NucleusUserException(LOCALISER.msg("Cassandra.Insert.ObjectWithIdAlreadyExists"));
+
+ }
+ catch (NucleusObjectNotFoundException onfe)
+ {
+ // Do nothing since object with this id doesn't exist
+ }
+
+ CassandraManagedConnection managedconnection = (CassandraManagedConnection) storeManager.getConnection(objp.getExecutionContext());
+ try
+ {
+ AbstractClassMetaData acmd = objp.getClassMetaData();
+
+ Object pkValue = objp.provideField(acmd.getPKMemberPositions()[0]);
+ String key = CassandraUtils.ObjectToString(pkValue);
+ String column_family = CassandraUtils.getFamilyName(acmd);
+
+ CassandraInsertFieldManager fm = new CassandraInsertFieldManager(acmd,objp,key, column_family);
+ objp.provideFields(acmd.getAllMemberPositions(), fm);
+ Client client = (Client)managedconnection.getConnection();
+ client.batch_mutate(keyspace,fm.getMutation(),ConsistencyLevel.QUORUM);
+
+ }
+ catch (IOException e)
+ {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ } catch (InvalidRequestException e) {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ } catch (UnavailableException e) {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ } catch (TimedOutException e) {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ } catch (TException e) {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ }
+ finally
+ {
+ managedconnection.release();
+ }
+ }
+
+ @Override
+ public void locateObject(ObjectProvider objp) {
+ CassandraManagedConnection managedconnection = (CassandraManagedConnection) storeManager.getConnection(objp.getExecutionContext());
+ try
+ {
+ AbstractClassMetaData acmd = objp.getClassMetaData();
+ Object pkValue = objp.provideField(acmd.getPKMemberPositions()[0]);
+ String key = CassandraUtils.ObjectToString(pkValue);
+
+ SlicePredicate predicate = new SlicePredicate();
+ SliceRange range = new SliceRange("".getBytes(),"".getBytes(),false,1);
+ predicate.setSlice_range(range);
+ ColumnParent parent = new ColumnParent(CassandraUtils.getFamilyName(acmd));
+
+ List<ColumnOrSuperColumn> columns = ((Client)managedconnection.getConnection()).get_slice(keyspace,key,parent, predicate, ConsistencyLevel.QUORUM);
+
+ if(columns.isEmpty())
+ {
+ throw new NucleusObjectNotFoundException();
+ }
+ }
+ catch (IOException e)
+ {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ } catch (InvalidRequestException e) {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ } catch (UnavailableException e) {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ } catch (TimedOutException e) {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ } catch (TException e) {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ }
+ finally
+ {
+ managedconnection.release();
+ }
+ }
+
+ @Override
+ public void updateObject(ObjectProvider objp, int[] arg1) {
+ // Check if read-only so update not permitted
+ storeManager.assertReadOnlyForUpdateOfObject(objp);
+ CassandraManagedConnection managedconnection = (CassandraManagedConnection) storeManager.getConnection(objp.getExecutionContext());
+ try
+ {
+ AbstractClassMetaData acmd = objp.getClassMetaData();
+
+ Object pkValue = objp.provideField(acmd.getPKMemberPositions()[0]);
+ String key = CassandraUtils.ObjectToString(pkValue);
+ String column_family = CassandraUtils.getFamilyName(acmd);
+
+ CassandraInsertFieldManager fm = new CassandraInsertFieldManager(acmd,objp ,key, column_family);
+ objp.provideFields(acmd.getAllMemberPositions(), fm);
+ ((Client)managedconnection.getConnection()).batch_mutate(keyspace,fm.getMutation(),ConsistencyLevel.QUORUM);
+
+ }
+ catch (IOException e)
+ {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ } catch (InvalidRequestException e) {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ } catch (UnavailableException e) {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ } catch (TimedOutException e) {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ } catch (TException e) {
+ throw new NucleusDataStoreException(e.getMessage(),e);
+ }
+ finally
+ {
+ managedconnection.release();
+ }
+ }
+
+
+
+}
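
insertObject() and updateObject() both hand the mutation map built by CassandraInsertFieldManager (not shown in this excerpt) to batch_mutate. A minimal sketch of the Map<rowKey, Map<columnFamily, List<Mutation>>> shape the Thrift 0.6 API expects, assuming a single text column (family, names, and values are illustrative):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.cassandra.thrift.*;
    import org.apache.cassandra.thrift.Cassandra.Client;

    static void writeOneColumn(Client client, String keyspace, String rowKey) throws Exception {
        // one column wrapped in the Mutation envelope batch_mutate expects
        Column col = new Column("firstName".getBytes(), "Bugs".getBytes(), System.currentTimeMillis());
        ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
        cosc.setColumn(col);
        Mutation mutation = new Mutation();
        mutation.setColumn_or_supercolumn(cosc);

        // column family -> mutations, then row key -> per-family map
        Map<String, List<Mutation>> perFamily = new HashMap<String, List<Mutation>>();
        perFamily.put("Person", Collections.singletonList(mutation));
        Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
        mutationMap.put(rowKey, perFamily);

        client.batch_mutate(keyspace, mutationMap, ConsistencyLevel.QUORUM);
    }
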
126 src/org/datanucleus/store/cassandra/CassandraStoreManager.java
@@ -0,0 +1,126 @@
+/**********************************************************************
+Copyright (c) 2010 Pedro Gomes and Minho University. All rights reserved.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+**********************************************************************/
+
+package org.datanucleus.store.cassandra;
+
+import org.datanucleus.ClassLoaderResolver;
+import org.datanucleus.OMFContext;
+import org.datanucleus.PersistenceConfiguration;
+import org.datanucleus.metadata.MetaDataListener;
+import org.datanucleus.store.AbstractStoreManager;
+
+
+public class CassandraStoreManager extends AbstractStoreManager{
+
+ private boolean autoCreateSchema;
+
+ private MetaDataListener metadataListener;
+ private CassandraConnectionInfo connectionInfo;
+
+ private int poolTimeBetweenEvictionRunsMillis;
+ private int poolMinEvictableIdleTimeMillis;
+
+ private String replicaPlacementStrategy;
+ private int replicationFactor;
+ private String endPointSnitch;
+
+ public CassandraStoreManager(ClassLoaderResolver clr,
+ OMFContext omfContext) {
+ super("cassandra",clr, omfContext);
+
+ PersistenceConfiguration conf = omfContext.getPersistenceConfiguration();
+
+ if(connectionInfo==null){
+ connectionInfo = new CassandraConnectionInfo(conf);
+ }
+
+ this.persistenceHandler2 = new CassandraPersistenceHandler(this);
+
+ metadataListener = new CassandraMetaDataListener(this);
+ omfContext.getMetaDataManager().registerListener(metadataListener);
+
+ //Maybe in Cassandra 0.7 this will be dynamic
+ autoCreateSchema = conf.getBooleanProperty("datanucleus.autoCreateSchema");
+ replicaPlacementStrategy = conf.getStringProperty("datanucleus.cassandra.replicaPlacementStrategy");
+ replicationFactor = conf.getIntProperty("datanucleus.cassandra.replicationFactor");
+ endPointSnitch = conf.getStringProperty("datanucleus.cassandra.endPointSnitch");
+
+ poolTimeBetweenEvictionRunsMillis = conf.getIntProperty("datanucleus.connectionPool.timeBetweenEvictionRunsMillis");
+ if (poolTimeBetweenEvictionRunsMillis == 0)
+ {
+ poolTimeBetweenEvictionRunsMillis = 15 * 1000; // default, 15 secs
+ }
+
+ // how long may a connection sit idle in the pool before it may be evicted
+ poolMinEvictableIdleTimeMillis = conf.getIntProperty("datanucleus.connectionPool.minEvictableIdleTimeMillis");
+ if (poolMinEvictableIdleTimeMillis == 0)
+ {
+ poolMinEvictableIdleTimeMillis = 30 * 1000; // default, 30 secs
+ }
+
+ logConfiguration();
+ }
+
+ @Override
+ protected void registerConnectionFactory() {
+ super.registerConnectionFactory();
+ this.connectionMgr.disableConnectionPool();
+ }
+
+ public boolean isAutoCreateSchema() {
+ return autoCreateSchema;
+ }
+
+ @Override
+ public void close() {
+ omfContext.getMetaDataManager().deregisterListener(metadataListener);
+ super.close();
+ }
+
+ public CassandraConnectionInfo getConnectionInfo() {
+ if(connectionInfo==null){
+ PersistenceConfiguration conf = omfContext.getPersistenceConfiguration();
+ connectionInfo = new CassandraConnectionInfo(conf);
+ }
+ return connectionInfo;
+ }
+
+ public int getPoolMinEvictableIdleTimeMillis()
+ {
+ return poolMinEvictableIdleTimeMillis;
+ }
+
+ public int getPoolTimeBetweenEvictionRunsMillis()
+ {
+ return poolTimeBetweenEvictionRunsMillis;
+ }
+
+ public String getReplicaPlacementStrategy() {
+ return replicaPlacementStrategy;
+ }
+
+ public int getReplicationFactor() {
+ return replicationFactor;
+ }
+
+ public String getEndPointSnitch() {
+ return endPointSnitch;
+ }
+
+
+
+}
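
The constructor above reads its settings from the persistence configuration. A configuration sketch using the property names read above (the values shown are illustrative Cassandra 0.6 defaults; the plugin's connection properties, handled by CassandraConnectionInfo outside this excerpt, are omitted):

    import java.util.Properties;
    import javax.jdo.JDOHelper;
    import javax.jdo.PersistenceManagerFactory;

    Properties props = new Properties();
    props.setProperty("datanucleus.autoCreateSchema", "true");
    props.setProperty("datanucleus.cassandra.replicaPlacementStrategy",
            "org.apache.cassandra.locator.RackUnawareStrategy");
    props.setProperty("datanucleus.cassandra.replicationFactor", "1");
    props.setProperty("datanucleus.cassandra.endPointSnitch",
            "org.apache.cassandra.locator.EndPointSnitch");
    // pool eviction tuning; the store manager falls back to 15s and 30s when unset
    props.setProperty("datanucleus.connectionPool.timeBetweenEvictionRunsMillis", "15000");
    props.setProperty("datanucleus.connectionPool.minEvictableIdleTimeMillis", "30000");
    PersistenceManagerFactory pmf = JDOHelper.getPersistenceManagerFactory(props);
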
223 src/org/datanucleus/store/cassandra/CassandraUtils.java
@@ -0,0 +1,223 @@
+/**********************************************************************
+Copyright (c) 2010 Pedro Gomes and Minho University. All rights reserved.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+**********************************************************************/
+
+package org.datanucleus.store.cassandra;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.cassandra.thrift.Cassandra.Client;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.datanucleus.exceptions.NucleusDataStoreException;
+import org.datanucleus.exceptions.NucleusException;
+import org.datanucleus.metadata.AbstractClassMetaData;
+import org.datanucleus.metadata.AbstractMemberMetaData;
+import org.datanucleus.metadata.ColumnMetaData;
+import org.datanucleus.store.StoreManager;
+
+public class CassandraUtils {
+
+ private static boolean alterSchema = false;
+ private static boolean containsKeySpace = false;
+
+ private static StringBuilder schemaInfo = null;
+ private static String keyspaceInfo = "";
+
+ private static File cassandra_Schema_File = null;
+
+ public static String getQualifierName(AbstractClassMetaData acmd,
+ int absoluteFieldNumber) {
+ AbstractMemberMetaData ammd = acmd
+ .getMetaDataForManagedMemberAtAbsolutePosition(absoluteFieldNumber);
+ String columnName = null;
+
+ // Try the first column if specified
+ ColumnMetaData[] colmds = ammd.getColumnMetaData();
+ if (colmds != null && colmds.length > 0) {
+ columnName = colmds[0].getName();
+ }
+ if (columnName == null) {
+ // Fallback to the field/property name
+ columnName = ammd.getName();
+ }
+ if (columnName.indexOf(":") > -1) {
+ columnName = columnName.substring(columnName.indexOf(":") + 1);
+ }
+ return columnName;
+ }
+
+ public static String getFamilyName(AbstractClassMetaData acmd) {
+ if (acmd.getTable() != null) {
+ return acmd.getTable();
+ }
+ return acmd.getName();
+ }
+
+
+ public static String getSuperFamilyName(AbstractMemberMetaData fieldData,String class_CF){
+
+ if(fieldData.getTable()!=null){
+ return fieldData.getTable();
+ }
+ return class_CF+"_"+fieldData.getName();
+ }
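
To illustrate the fallback order implemented by the naming helpers above (hypothetical mapped class, standard JDO annotations):

    import javax.jdo.annotations.Column;
    import javax.jdo.annotations.PersistenceCapable;

    @PersistenceCapable(table = "People")   // getFamilyName() -> "People"
    public class Employee {
        @Column(name = "first_name")        // getQualifierName() -> "first_name"
        private String firstName;

        private String lastName;            // no column metadata -> field name "lastName"
    }
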
+
+	public static String ObjectToString(Object object) throws IOException {
+
+		ByteArrayOutputStream bos = new ByteArrayOutputStream();
+		ObjectOutputStream oos = new ObjectOutputStream(bos);
+		oos.writeObject(object);
+		oos.close();
+		// note: decoding serialized bytes with the platform default charset is fragile,
+		// but stored row keys already depend on this encoding, so it is kept as-is
+		return new String(bos.toByteArray());
+	}
+
+	/**
+	 * Generates the schema elements that are missing, at application runtime, and
+	 * writes them to a file (datanucleus.schema) so they can be added to the node
+	 * configuration. From Cassandra 0.7 onwards the schema is expected to be
+	 * editable at runtime, making this step unnecessary.
+	 * */
+ public static void createSchema(AbstractClassMetaData classMetaData,
+ CassandraStoreManager storeManager) throws IOException {
+
+ Client cassandraClient = null;
+ CassandraConnectionInfo connectionInfo = storeManager
+ .getConnectionInfo();
+ cassandraClient = getConnection(connectionInfo);
+ String keyspace = connectionInfo.getKeyspace();
+
+ if (cassandraClient == null) {
+ throw new NucleusDataStoreException(
+ "No connection to the data store");
+ }
+
+ if (schemaInfo == null) { // when starting the schema, check if the
+ // keyspace is already defined.
+ schemaInfo = new StringBuilder();
+
+ try {
+ containsKeySpace = cassandraClient.describe_keyspaces()
+ .contains(keyspace);
+ } catch (TException e) {
+ throw new NucleusDataStoreException(e.getMessage(), e);
+ }
+
+ if (!containsKeySpace) {
+ alterSchema = true;
+ schemaInfo.append("<Keyspace Name=\"" + keyspace + "\">\n");
+ keyspaceInfo =
+ "<ReplicaPlacementStrategy>"
+ + storeManager.getReplicaPlacementStrategy()
+ + "</ReplicaPlacementStrategy>\n"
+ + "<ReplicationFactor>"
+ + storeManager.getReplicationFactor()
+ + "</ReplicationFactor>\n"
+ + "<EndPointSnitch>"
+ + storeManager.getEndPointSnitch()
+ + "</EndPointSnitch>\n" + "</Keyspace>\n";
+ }
+ }
+
+ boolean containsColumn = false;
+ String columnName = getFamilyName(classMetaData);
+ if (containsKeySpace) {
+ try {
+ containsColumn = cassandraClient.describe_keyspace(keyspace)
+ .containsKey(columnName);
+ } catch (NotFoundException e) {
+ throw new NucleusDataStoreException(e.getMessage(), e);
+ } catch (TException e) {
+ throw new NucleusDataStoreException(e.getMessage(), e);
+ }
+ }
+ if (!containsColumn) {
+ alterSchema = true;
+ schemaInfo.append("<ColumnFamily Name=\"" + columnName
+ + "\" CompareWith=\"BytesType\"/>\n");
+ }
+
+ if (alterSchema) {
+
+ if (cassandra_Schema_File == null) {
+ cassandra_Schema_File = new File("datanucleus.schema");
+
+ if (cassandra_Schema_File.exists()) {// delete
+ cassandra_Schema_File.delete();
+ }
+ cassandra_Schema_File.createNewFile();// create new
+ }
+
+ if (cassandra_Schema_File != null) {
+ System.out.println("FILE:"
+ + cassandra_Schema_File.getAbsolutePath());
+ DataOutputStream out = null;
+ try {
+ FileOutputStream file = new FileOutputStream(
+ cassandra_Schema_File, false);
+ out = new DataOutputStream(file);
+
+ out.write((schemaInfo.toString() + keyspaceInfo)
+ .getBytes());
+
+ out.flush();
+ out.close();
+				} catch (IOException e) {
+					if (out != null) { // the stream may not have been opened yet
+						out.close();
+					}
+					throw new NucleusException(e.getMessage(), e);
+				}
+ } else {
+ throw new NucleusException("Schema file was not created.");
+ }
+ }
+ }
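
Given the string building above, the generated datanucleus.schema fragment for a new keyspace containing one class would look like this (keyspace name and values illustrative, in the Cassandra 0.6 storage-conf.xml format):

    <Keyspace Name="MyKeyspace">
    <ColumnFamily Name="Person" CompareWith="BytesType"/>
    <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
    <ReplicationFactor>1</ReplicationFactor>
    <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
    </Keyspace>
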
+
+ public static Client getConnection(CassandraConnectionInfo connectionInfo) {
+ Map<String, Integer> connections = connectionInfo.getConnections();
+ Iterator<String> connections_iterator = connections.keySet().iterator();
+
+ boolean connection = false;
+ Client cassandraClient = null;
+
+		while (connections_iterator.hasNext() && !connection) {
+			String host = connections_iterator.next();
+			connection = true;
+			TSocket socket = new TSocket(host, connections.get(host));
+			TProtocol protocol = new TBinaryProtocol(socket);
+			cassandraClient = new Client(protocol);
+			try {
+				socket.open();
+			} catch (TTransportException e) {
+				System.out.println("Dead client: " + host + ":"
+						+ connections.get(host));
+				// discard the unconnected client so callers see null if every node is down
+				cassandraClient = null;
+				connection = false;
+			}
+		}
+
+ return cassandraClient;
+ }
+
+}
6 src/org/datanucleus/store/cassandra/Localisation.properties
@@ -0,0 +1,6 @@
+
+
+Cassandra.DatastoreID=Datastore ID not supported for this datastore, but class "{0}" uses it.
+
+Cassandra.Insert.ObjectWithIdAlreadyExists=Object "{0}" being inserted has id "{1}" yet an object with this id already exists in the datastore!
+
133 src/tests/ApplicationIdPersistenceTest.java
@@ -0,0 +1,133 @@
+package tests;
+//
+//import javax.jdo.PersistenceManager;
+//import javax.jdo.Transaction;
+//
+//import org.jpox.samples.models.company.Person;
+//
+///**
+// * Application identity persistence tests for XML datastores.
+// */
+//public class ApplicationIdPersistenceTest extends JDOPersistenceTestCase
+//{
+// Object id;
+//
+// public ApplicationIdPersistenceTest(String name)
+// {
+// super(name);
+// }
+//
+// public void testInsert() throws Exception
+// {
+// try
+// {
+// PersistenceManager pm = pmf.getPersistenceManager();
+// Transaction tx = pm.currentTransaction();
+// try
+// {
+// tx.begin();
+// Person p = new Person();
+// p.setPersonNum(1);
+// p.setGlobalNum("1");
+// p.setFirstName("Bugs");
+// p.setLastName("Bunny");
+//
+// Person p2 = new Person();
+// p2.setPersonNum(2);
+// p2.setGlobalNum("2");
+// p2.setFirstName("My");
+// p2.setLastName("Friend");
+//
+// p.setBestFriend(p2);
+//
+// pm.makePersistent(p);
+// tx.commit();
+// }
+// catch (Exception e)
+// {
+// e.printStackTrace();
+// fail("Exception thrown when running test " + e.getMessage());
+// }
+// finally
+// {
+// if (tx.isActive())
+// {
+// tx.rollback();
+// }
+// pm.close();
+// }
+// }
+// finally
+// {
+// clean(Person.class);
+// }
+// }
+//
+// /**
+// * Test of persistence of more than 1 app id objects with the same "id".
+// */
+// public void testPersistDuplicates()
+// {
+// try