Skip to content

Commit

Permalink
First version of JdbcImporter
Browse files Browse the repository at this point in the history
  • Loading branch information
dmontag committed Jun 29, 2011
1 parent 5f4dd61 commit 347ae23
Show file tree
Hide file tree
Showing 5 changed files with 439 additions and 15 deletions.
8 changes: 8 additions & 0 deletions pom.xml
Expand Up @@ -30,5 +30,13 @@
<artifactId>commons-io</artifactId>
<version>2.0.1</version>
</dependency>

<dependency>
<groupId>hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<version>1.8.0.10</version>
<scope>test</scope>
</dependency>

</dependencies>
</project>
4 changes: 2 additions & 2 deletions src/main/java/org/neo4j/dataimport/CsvImporter.java
Expand Up @@ -20,8 +20,6 @@ public class CsvImporter implements BatchInserterImporter
{
private File nodes;
private File rels;
private List<Pair<String, String>> nodePropertyKeys;
private List<Pair<String, String>> relPropertyKeys;

public CsvImporter( File nodes, File rels )
{
Expand Down Expand Up @@ -74,6 +72,7 @@ private static Map<String, String> getConfig( String storeDir )

private void importNodes( BatchInserter target ) throws FileNotFoundException
{
List<Pair<String, String>> nodePropertyKeys = null;
long counter = 0;
Scanner nodeScanner = new Scanner( nodes );
while ( nodeScanner.hasNextLine() )
Expand Down Expand Up @@ -104,6 +103,7 @@ private void importNodes( BatchInserter target ) throws FileNotFoundException

private void importRels( BatchInserter target ) throws FileNotFoundException
{
List<Pair<String, String>> relPropertyKeys = null;
long counter = 0;
Scanner nodeScanner = new Scanner( rels );
while ( nodeScanner.hasNextLine() )
Expand Down
155 changes: 155 additions & 0 deletions src/main/java/org/neo4j/dataimport/JdbcImporter.java
@@ -0,0 +1,155 @@
package org.neo4j.dataimport;

import org.neo4j.graphdb.DynamicRelationshipType;
import org.neo4j.kernel.impl.batchinsert.BatchInserter;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class JdbcImporter implements BatchInserterImporter
{
private Connection connection;
private String nodesTable;
private String relsTable;
private Set<String> reservedRelColumns = new HashSet<String>( Arrays.asList( "src", "dest", "type" ) );
private Set<String> reservedNodeColumns = new HashSet<String>( Arrays.asList( "id" ) );


public JdbcImporter( Connection connection, String nodes, String rels )
{
this.connection = connection;
nodesTable = nodes;
relsTable = rels;
}

@Override
public void importTo( BatchInserter target )
{
try
{
doImport( target );
}
catch ( SQLException e )
{
throw new DataImportException( e );
}
}

private void doImport( BatchInserter target ) throws SQLException
{
importNodes( target );
importRels( target );
}

private void importNodes( BatchInserter target ) throws SQLException
{
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery( "SELECT * FROM " + nodesTable );
Map<String, String> columnTypes = getPropertyColumns( resultSet, reservedNodeColumns );
while ( resultSet.next() )
{
target.createNode( resultSet.getLong( "id" ), getProperties( columnTypes, resultSet ) );
}
statement.close();
}

private void importRels( BatchInserter target ) throws SQLException
{
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery( "SELECT * FROM " + relsTable );
Map<String, String> columnTypes = getPropertyColumns( resultSet, reservedRelColumns );
while ( resultSet.next() )
{
target.createRelationship( resultSet.getLong( "src" ), resultSet.getLong( "dest" ), DynamicRelationshipType.withName( resultSet.getString( "type" ) ), getProperties( columnTypes, resultSet ) );
}
statement.close();
}

private Map<String, String> getPropertyColumns( ResultSet resultSet, Set<String> reserved ) throws SQLException
{
Map<String, String> columnTypes = new HashMap<String, String>();
ResultSetMetaData metaData = resultSet.getMetaData();
long columnCount = metaData.getColumnCount();
System.out.println( String.format( "Found %d columns", columnCount ) );
for ( int i = 1; i <= columnCount; i++ )
{
String columnName = metaData.getColumnName( i );
String columnType = metaData.getColumnTypeName( i );
System.out.println( String.format( "Found column %s (%s)", columnName, columnType ) );
if ( !reserved.contains( columnName ) )
{
columnTypes.put( columnName, columnType );
}
}
return columnTypes;
}

private Map<String, Object> getProperties( Map<String, String> columnTypes, ResultSet resultSet ) throws SQLException
{
Map<String, Object> properties = new HashMap<String, Object>();
for ( Map.Entry<String, String> columnEntry : columnTypes.entrySet() )
{
String key = columnEntry.getKey();
Object value = getProperty( key, columnEntry.getValue(), resultSet );
if ( value != null )
{
properties.put( key.toLowerCase(), value );
}
}
System.out.println( "props: " + properties );
return properties;
}

private Object getProperty( String column, String type, ResultSet resultSet ) throws SQLException
{
if ( type.equals( "VARCHAR" ) )
{
return resultSet.getString( column );
}
else if ( type.equals( "BIGINT" ) )
{
return resultSet.getLong( column );
}
else if ( type.equals( "INTEGER" ) )
{
return resultSet.getInt( column );
}
else if ( type.equals( "TINYINT" ) )
{
return resultSet.getByte( column );
}
else if ( type.equals( "SMALLINT" ) )
{
return resultSet.getShort( column );
}
// else if ( type.equals( "char" ) )
// {
// return s.charAt( 0 );
// }
else if ( type.equals( "BOOLEAN" ) )
{
return resultSet.getBoolean( column );
}
else if ( type.equals( "FLOAT" ) )
{
return resultSet.getFloat( column );
}
else if ( type.equals( "DOUBLE" ) )
{
return resultSet.getDouble( column );
}
else
{
throw new IllegalStateException( "Unknown type: " + type );
}
}

}
26 changes: 13 additions & 13 deletions src/test/java/org/neo4j/dataimport/CsvImporterTest.java
Expand Up @@ -28,7 +28,7 @@ public class CsvImporterTest
{
private File nodes;
private File rels;
private BatchInserter batchImporter;
private BatchInserter batchInserter;
private String storePath;
private GraphDatabaseService graphDb;

Expand All @@ -38,7 +38,7 @@ public void setUp() throws IOException
nodes = File.createTempFile( "nodes-import-", ".csv" );
rels = File.createTempFile( "rels-import-", ".csv" );
storePath = createTempDir().getAbsolutePath();
batchImporter = new BatchInserterImpl( storePath );
batchInserter = new BatchInserterImpl( storePath );
}

private File createTempDir() throws IOException
Expand All @@ -52,9 +52,9 @@ private File createTempDir() throws IOException
public void tearDown()
{
assertTrue( "Unable to delete tempfile.", nodes.delete() );
if (batchImporter != null)
if ( batchInserter != null)
{
batchImporter.shutdown();
batchInserter.shutdown();
}
if (graphDb != null)
{
Expand All @@ -66,7 +66,7 @@ public void tearDown()
public void testEmptyImport() throws IOException
{
CsvImporter csvImporter = new CsvImporter( nodes, rels );
csvImporter.importTo( batchImporter );
csvImporter.importTo( batchInserter );

importComplete();

Expand All @@ -81,7 +81,7 @@ public void testSingleLineImport() throws IOException
writeFiles();

CsvImporter csvImporter = new CsvImporter( nodes, rels );
csvImporter.importTo( batchImporter );
csvImporter.importTo( batchInserter );

importComplete();

Expand All @@ -106,7 +106,7 @@ public void testRelationshipImport() throws IOException
writeFiles();

CsvImporter csvImporter = new CsvImporter( nodes, rels );
csvImporter.importTo( batchImporter );
csvImporter.importTo( batchInserter );

importComplete();

Expand All @@ -126,7 +126,7 @@ public void testNodePropertyImport() throws IOException
writeFiles();

CsvImporter csvImporter = new CsvImporter( nodes, rels );
csvImporter.importTo( batchImporter );
csvImporter.importTo( batchInserter );

importComplete();

Expand All @@ -145,7 +145,7 @@ public void testRelationshipPropertyImport() throws IOException
writeFiles();

CsvImporter csvImporter = new CsvImporter( nodes, rels );
csvImporter.importTo( batchImporter );
csvImporter.importTo( batchInserter );

importComplete();

Expand All @@ -167,7 +167,7 @@ public void testPropertyTypes() throws IOException
writeFiles();

CsvImporter csvImporter = new CsvImporter( nodes, rels );
csvImporter.importTo( batchImporter );
csvImporter.importTo( batchInserter );

importComplete();

Expand Down Expand Up @@ -207,7 +207,7 @@ public void testSparseProperties() throws IOException
writeFiles();

CsvImporter csvImporter = new CsvImporter( nodes, rels );
csvImporter.importTo( batchImporter );
csvImporter.importTo( batchInserter );

importComplete();

Expand All @@ -224,8 +224,8 @@ public void testSparseProperties() throws IOException

private void importComplete()
{
batchImporter.shutdown();
batchImporter = null;
batchInserter.shutdown();
batchInserter = null;
graphDb = new EmbeddedGraphDatabase( storePath );
}

Expand Down

0 comments on commit 347ae23

Please sign in to comment.