Skip to content

Commit

Permalink
[ARTEMIS-2626]: Postgresql Journal implementation requires direct acc…
Browse files Browse the repository at this point in the history
…ess to PostgeSQL driver internal classes.

Issue: The BLOB manipulation is done using PostgreSQL internal classes starting from PGConnection.
This leads to ClasCastExceptions if the connection is wrapped in a pool or if the driver is in a different classloader (WildFly).

Fix: unwrap the connection and if the PostgreSQL classes are not directly available uses reflection to manipulate the BLOBs.

Jira: https://issues.apache.org/jira/browse/ARTEMIS-2626
  • Loading branch information
ehsavoie committed Feb 20, 2020
1 parent d3d9ceb commit ac98cda
Show file tree
Hide file tree
Showing 4 changed files with 592 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* Copyright 2019 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.file;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.SQLException;
import org.postgresql.PGConnection;
import org.postgresql.largeobject.LargeObject;

/**
* Helper class for when the postresql driver is not directly availalbe.
*/
public class PostgresLargeObjectManager {

/**
* This mode indicates we want to write to an object
*/
public static final int WRITE = 0x00020000;

/**
* This mode indicates we want to read an object
*/
public static final int READ = 0x00040000;

/**
* This mode is the default. It indicates we want read and write access to
* a large object
*/
public static final int READWRITE = READ | WRITE;

private final Connection realConnection;
private boolean shouldUseReflection;

public PostgresLargeObjectManager(Connection connection) throws SQLException {
this.realConnection = unwrap(connection);
try {
this.getClass().getClassLoader().loadClass("org.postgresql.PGConnection");
shouldUseReflection = false;
} catch (ClassNotFoundException ex) {
shouldUseReflection = true;
}
}

public final Long createLO() throws SQLException {
if (shouldUseReflection) {
Object largeObjectManager = getLargeObjectManager();
try {
Method method = largeObjectManager.getClass().getMethod("createLO");
return (Long) method.invoke(largeObjectManager);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex);
}
} else {
return ((PGConnection) realConnection).getLargeObjectAPI().createLO();
}
}

public Object open(long oid, int mode) throws SQLException {
if (shouldUseReflection) {
Object largeObjectManager = getLargeObjectManager();
try {
Method method = largeObjectManager.getClass().getMethod("open", long.class, int.class);
return method.invoke(largeObjectManager, oid, mode);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex);
}
} else {
return ((PGConnection) realConnection).getLargeObjectAPI().open(oid, mode);
}
}

public int size(Object largeObject) throws SQLException {
if (shouldUseReflection) {
try {
Method method = largeObject.getClass().getMethod("size");
return (int) method.invoke(largeObject);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObject", ex);
}
} else {
return ((LargeObject) largeObject).size();
}
}

public void close(Object largeObject) throws SQLException {
if (shouldUseReflection) {
try {
Method method = largeObject.getClass().getMethod("close");
method.invoke(largeObject);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObject", ex);
}
} else {
((LargeObject) largeObject).close();
}
}

public byte[] read(Object largeObject, int length) throws SQLException {
if (shouldUseReflection) {
try {
Method method = largeObject.getClass().getMethod("read", int.class);
return (byte[]) method.invoke(largeObject, length);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObject", ex);
}
} else {
return ((LargeObject) largeObject).read(length);
}
}

public void write(Object largeObject, byte[] data) throws SQLException {
if (shouldUseReflection) {
try {
Method method = largeObject.getClass().getMethod("write", byte[].class);
method.invoke(largeObject, data);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObject", ex);
}
} else {
((LargeObject) largeObject).write(data);
}
}

public void seek(Object largeObject, int position) throws SQLException {
if (shouldUseReflection) {
try {
Method method = largeObject.getClass().getMethod("seek", Integer.class);
method.invoke(largeObject, position);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObject", ex);
}
} else {
((LargeObject) largeObject).seek(position);
}
}

public void truncate(Object largeObject, int position) throws SQLException {
if (shouldUseReflection) {
try {
Method method = largeObject.getClass().getMethod("truncate", Integer.class);
method.invoke(largeObject, position);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObject", ex);
}
} else {
((LargeObject) largeObject).truncate(position);
}
}

private Object getLargeObjectManager() throws SQLException {
if (shouldUseReflection) {
try {
Method method = realConnection.getClass().getMethod("getLargeObjectAPI");
return method.invoke(realConnection);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex);
}
} else {
return ((PGConnection) realConnection).getLargeObjectAPI();
}
}

public final Connection unwrap(Connection connection) throws SQLException {
Connection conn = connection.unwrap(Connection.class);
return unwrapIronJacamar(unwrapDbcp(unwrapSpring(conn)));
}

private Connection unwrapIronJacamar(Connection conn) {
try {
Method method = conn.getClass().getMethod("getUnderlyingConnection");
return (Connection) method.invoke(conn);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
return conn;
}
}

private Connection unwrapDbcp(Connection conn) {
try {
Method method = conn.getClass().getMethod("getDelegate");
return (Connection) method.invoke(conn);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
return conn;
}
}

private Connection unwrapSpring(Connection conn) {
try {
Method method = conn.getClass().getMethod("getTargetConnection");
return (Connection) method.invoke(conn);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
return conn;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,14 @@
import java.sql.Statement;

import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.postgresql.PGConnection;
import org.postgresql.largeobject.LargeObject;
import org.postgresql.largeobject.LargeObjectManager;

import javax.sql.DataSource;

@SuppressWarnings("SynchronizeOnNonFinalField")
public final class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFactoryDriver {

private static final String POSTGRES_OID_KEY = "POSTGRES_OID_KEY";
private PostgresLargeObjectManager largeObjectManager;

public PostgresSequentialSequentialFileDriver() throws SQLException {
super();
Expand All @@ -52,6 +50,7 @@ public PostgresSequentialSequentialFileDriver(Connection connection, SQLProvider

@Override
protected void prepareStatements() throws SQLException {
this.largeObjectManager = new PostgresLargeObjectManager(connection);
this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL());
this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS);
this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName());
Expand All @@ -67,9 +66,7 @@ public void createFile(JDBCSequentialFile file) throws SQLException {
synchronized (connection) {
try {
connection.setAutoCommit(false);

LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
long oid = lobjManager.createLO();
Long oid = largeObjectManager.createLO();

createFile.setString(1, file.getFileName());
createFile.setString(2, file.getExtension());
Expand Down Expand Up @@ -109,20 +106,19 @@ public void loadFile(JDBCSequentialFile file) throws SQLException {
@Override
public int writeToFile(JDBCSequentialFile file, byte[] data, boolean append) throws SQLException {
synchronized (connection) {
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
LargeObject largeObject = null;
Object largeObject = null;

Long oid = getOID(file);
try {
connection.setAutoCommit(false);
largeObject = lobjManager.open(oid, LargeObjectManager.WRITE);
largeObject = largeObjectManager.open(oid, PostgresLargeObjectManager.WRITE);
if (append) {
largeObject.seek(largeObject.size());
largeObjectManager.seek(largeObject, largeObjectManager.size(largeObject));
} else {
largeObject.truncate(0);
largeObjectManager.truncate(largeObject, 0);
}
largeObject.write(data);
largeObject.close();
largeObjectManager.write(largeObject, data);
largeObjectManager.close(largeObject);
connection.commit();
} catch (Exception e) {
connection.rollback();
Expand All @@ -134,23 +130,23 @@ public int writeToFile(JDBCSequentialFile file, byte[] data, boolean append) thr

@Override
public int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
LargeObject largeObject = null;
Object largeObject = null;
long oid = getOID(file);
synchronized (connection) {
try {
connection.setAutoCommit(false);
largeObject = lobjManager.open(oid, LargeObjectManager.READ);
int readLength = (int) calculateReadLength(largeObject.size(), bytes.remaining(), file.position());
largeObject = largeObjectManager.open(oid, PostgresLargeObjectManager.READ);
int readLength = (int) calculateReadLength(largeObjectManager.size(largeObject), bytes.remaining(), file.position());

if (readLength > 0) {
if (file.position() > 0)
largeObject.seek((int) file.position());
byte[] data = largeObject.read(readLength);
if (file.position() > 0) {
largeObjectManager.seek(largeObject, (int) file.position());
}
byte[] data = largeObjectManager.read(largeObject, readLength);
bytes.put(data);
}

largeObject.close();
largeObjectManager.close(largeObject);
connection.commit();

return readLength;
Expand Down Expand Up @@ -185,17 +181,15 @@ private Long getOID(JDBCSequentialFile file) throws SQLException {
}

private int getPostGresLargeObjectSize(JDBCSequentialFile file) throws SQLException {
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();

int size = 0;
Long oid = getOID(file);
if (oid != null) {
synchronized (connection) {
try {
connection.setAutoCommit(false);
LargeObject largeObject = lobjManager.open(oid, LargeObjectManager.READ);
size = largeObject.size();
largeObject.close();
Object largeObject = largeObjectManager.open(oid, PostgresLargeObjectManager.READ);
size = largeObjectManager.size(largeObject);
largeObjectManager.close(largeObject);
connection.commit();
} catch (SQLException e) {
connection.rollback();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.file;
package org.apache.activemq.artemis.jdbc.store.file;

import java.nio.ByteBuffer;
import java.sql.DriverManager;
Expand All @@ -37,8 +37,6 @@
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
Expand Down
Loading

0 comments on commit ac98cda

Please sign in to comment.