Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ARTEMIS-2626]: Postgresql Journal implementation requires the jdbc driver to be in the same classloader #2987

Merged
merged 1 commit into from
Feb 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* Copyright 2019 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.file;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.SQLException;
import org.postgresql.PGConnection;
import org.postgresql.largeobject.LargeObject;

/**
* Helper class for when the postresql driver is not directly availalbe.
*/
public class PostgresLargeObjectManager {

/**
* This mode indicates we want to write to an object
*/
public static final int WRITE = 0x00020000;

/**
* This mode indicates we want to read an object
*/
public static final int READ = 0x00040000;

/**
* This mode is the default. It indicates we want read and write access to
* a large object
*/
public static final int READWRITE = READ | WRITE;

private final Connection realConnection;
private boolean shouldUseReflection;

public PostgresLargeObjectManager(Connection connection) throws SQLException {
this.realConnection = unwrap(connection);
try {
this.getClass().getClassLoader().loadClass("org.postgresql.PGConnection");
shouldUseReflection = false;
} catch (ClassNotFoundException ex) {
shouldUseReflection = true;
}
}

public final Long createLO() throws SQLException {
if (shouldUseReflection) {
Object largeObjectManager = getLargeObjectManager();
try {
Method method = largeObjectManager.getClass().getMethod("createLO");
return (Long) method.invoke(largeObjectManager);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex);
}
} else {
return ((PGConnection) realConnection).getLargeObjectAPI().createLO();
}
}

public Object open(long oid, int mode) throws SQLException {
if (shouldUseReflection) {
Object largeObjectManager = getLargeObjectManager();
try {
Method method = largeObjectManager.getClass().getMethod("open", long.class, int.class);
return method.invoke(largeObjectManager, oid, mode);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex);
}
} else {
return ((PGConnection) realConnection).getLargeObjectAPI().open(oid, mode);
}
}

public int size(Object largeObject) throws SQLException {
if (shouldUseReflection) {
try {
Method method = largeObject.getClass().getMethod("size");
return (int) method.invoke(largeObject);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObject", ex);
}
} else {
return ((LargeObject) largeObject).size();
}
}

public void close(Object largeObject) throws SQLException {
if (shouldUseReflection) {
try {
Method method = largeObject.getClass().getMethod("close");
method.invoke(largeObject);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObject", ex);
}
} else {
((LargeObject) largeObject).close();
}
}

public byte[] read(Object largeObject, int length) throws SQLException {
if (shouldUseReflection) {
try {
Method method = largeObject.getClass().getMethod("read", int.class);
return (byte[]) method.invoke(largeObject, length);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObject", ex);
}
} else {
return ((LargeObject) largeObject).read(length);
}
}

public void write(Object largeObject, byte[] data) throws SQLException {
if (shouldUseReflection) {
try {
Method method = largeObject.getClass().getMethod("write", byte[].class);
method.invoke(largeObject, data);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObject", ex);
}
} else {
((LargeObject) largeObject).write(data);
}
}

public void seek(Object largeObject, int position) throws SQLException {
if (shouldUseReflection) {
try {
Method method = largeObject.getClass().getMethod("seek", Integer.class);
method.invoke(largeObject, position);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObject", ex);
}
} else {
((LargeObject) largeObject).seek(position);
}
}

public void truncate(Object largeObject, int position) throws SQLException {
if (shouldUseReflection) {
try {
Method method = largeObject.getClass().getMethod("truncate", Integer.class);
method.invoke(largeObject, position);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObject", ex);
}
} else {
((LargeObject) largeObject).truncate(position);
}
}

private Object getLargeObjectManager() throws SQLException {
if (shouldUseReflection) {
try {
Method method = realConnection.getClass().getMethod("getLargeObjectAPI");
return method.invoke(realConnection);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex);
}
} else {
return ((PGConnection) realConnection).getLargeObjectAPI();
}
}

public final Connection unwrap(Connection connection) throws SQLException {
Connection conn = connection.unwrap(Connection.class);
return unwrapIronJacamar(unwrapDbcp(unwrapSpring(conn)));
}

private Connection unwrapIronJacamar(Connection conn) {
try {
Method method = conn.getClass().getMethod("getUnderlyingConnection");
return (Connection) method.invoke(conn);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
return conn;
}
}

private Connection unwrapDbcp(Connection conn) {
try {
Method method = conn.getClass().getMethod("getDelegate");
return (Connection) method.invoke(conn);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
return conn;
}
}

private Connection unwrapSpring(Connection conn) {
try {
Method method = conn.getClass().getMethod("getTargetConnection");
return (Connection) method.invoke(conn);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
return conn;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,14 @@
import java.sql.Statement;

import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.postgresql.PGConnection;
import org.postgresql.largeobject.LargeObject;
import org.postgresql.largeobject.LargeObjectManager;

import javax.sql.DataSource;

@SuppressWarnings("SynchronizeOnNonFinalField")
public final class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFactoryDriver {

private static final String POSTGRES_OID_KEY = "POSTGRES_OID_KEY";
private PostgresLargeObjectManager largeObjectManager;

public PostgresSequentialSequentialFileDriver() throws SQLException {
super();
Expand All @@ -52,6 +50,7 @@ public PostgresSequentialSequentialFileDriver(Connection connection, SQLProvider

@Override
protected void prepareStatements() throws SQLException {
this.largeObjectManager = new PostgresLargeObjectManager(connection);
this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL());
this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS);
this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName());
Expand All @@ -67,9 +66,7 @@ public void createFile(JDBCSequentialFile file) throws SQLException {
synchronized (connection) {
try {
connection.setAutoCommit(false);

LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
long oid = lobjManager.createLO();
Long oid = largeObjectManager.createLO();

createFile.setString(1, file.getFileName());
createFile.setString(2, file.getExtension());
Expand Down Expand Up @@ -109,20 +106,19 @@ public void loadFile(JDBCSequentialFile file) throws SQLException {
@Override
public int writeToFile(JDBCSequentialFile file, byte[] data, boolean append) throws SQLException {
synchronized (connection) {
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
LargeObject largeObject = null;
Object largeObject = null;

Long oid = getOID(file);
try {
connection.setAutoCommit(false);
largeObject = lobjManager.open(oid, LargeObjectManager.WRITE);
largeObject = largeObjectManager.open(oid, PostgresLargeObjectManager.WRITE);
if (append) {
largeObject.seek(largeObject.size());
largeObjectManager.seek(largeObject, largeObjectManager.size(largeObject));
} else {
largeObject.truncate(0);
largeObjectManager.truncate(largeObject, 0);
}
largeObject.write(data);
largeObject.close();
largeObjectManager.write(largeObject, data);
largeObjectManager.close(largeObject);
connection.commit();
} catch (Exception e) {
connection.rollback();
Expand All @@ -134,23 +130,23 @@ public int writeToFile(JDBCSequentialFile file, byte[] data, boolean append) thr

@Override
public int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
LargeObject largeObject = null;
Object largeObject = null;
long oid = getOID(file);
synchronized (connection) {
try {
connection.setAutoCommit(false);
largeObject = lobjManager.open(oid, LargeObjectManager.READ);
int readLength = (int) calculateReadLength(largeObject.size(), bytes.remaining(), file.position());
largeObject = largeObjectManager.open(oid, PostgresLargeObjectManager.READ);
int readLength = (int) calculateReadLength(largeObjectManager.size(largeObject), bytes.remaining(), file.position());

if (readLength > 0) {
if (file.position() > 0)
largeObject.seek((int) file.position());
byte[] data = largeObject.read(readLength);
if (file.position() > 0) {
largeObjectManager.seek(largeObject, (int) file.position());
}
byte[] data = largeObjectManager.read(largeObject, readLength);
bytes.put(data);
}

largeObject.close();
largeObjectManager.close(largeObject);
connection.commit();

return readLength;
Expand Down Expand Up @@ -185,17 +181,15 @@ private Long getOID(JDBCSequentialFile file) throws SQLException {
}

private int getPostGresLargeObjectSize(JDBCSequentialFile file) throws SQLException {
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();

int size = 0;
Long oid = getOID(file);
if (oid != null) {
synchronized (connection) {
try {
connection.setAutoCommit(false);
LargeObject largeObject = lobjManager.open(oid, LargeObjectManager.READ);
size = largeObject.size();
largeObject.close();
Object largeObject = largeObjectManager.open(oid, PostgresLargeObjectManager.READ);
size = largeObjectManager.size(largeObject);
largeObjectManager.close(largeObject);
connection.commit();
} catch (SQLException e) {
connection.rollback();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.file;
package org.apache.activemq.artemis.jdbc.store.file;

import java.nio.ByteBuffer;
import java.sql.DriverManager;
Expand All @@ -37,8 +37,6 @@
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
Expand Down