Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Databricks database support #3518

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.flywaydb.commandline.command.version;
package org.flywaydb.commandline.logging.version;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hope I'm within the bounds of reasonableness. Since you've changed the path here to be logging instead of version, you also need to update the line in src/main/resources/META-INF/services/org.flywaydb.core.extensibility.Plugin to org.flywaydb.commandline.logging.version.VersionCommandExtension as well. (or at least I had to in order to build this)

I think I'm missing some context here, but what's the motivation behind moving these files into logging?


import lombok.CustomLog;
import org.flywaydb.core.api.FlywayException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.flywaydb.commandline.command.version;
package org.flywaydb.commandline.logging.version;

import org.flywaydb.core.api.output.OperationResult;

Expand Down
6 changes: 6 additions & 0 deletions flyway-community-db-support/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,14 @@
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<version>2.12.0</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>databricks-jdbc</artifactId>
<version>2.6.29</version>

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may want to include the latest version if possible, 2.6.33

</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.flywaydb.community.database.databricks;

import org.flywaydb.core.api.FlywayException;
import org.flywaydb.core.internal.database.base.Connection;
import org.flywaydb.core.internal.database.base.Schema;
import org.flywaydb.core.internal.util.StringUtils;

import java.sql.SQLException;

public class DatabricksConnection extends Connection<DatabricksDatabase> {
protected DatabricksConnection(DatabricksDatabase database, java.sql.Connection connection) {
super(database, connection);
}

@Override
protected String getCurrentSchemaNameOrSearchPath() throws SQLException {
String defaultCatalog = "hive_metastore";
String currentCatalog = jdbcTemplate.queryForString("SELECT current_catalog();");
String defaultSchema = "default";
String currentSchema = jdbcTemplate.queryForString("SELECT current_database();");
return (currentSchema != null) ? currentSchema : defaultSchema;
}

@Override
public void doChangeCurrentSchemaOrSearchPathTo(String schema) throws SQLException {
String sql = "USE DATABASE " + database.doQuote(schema) + ";";
jdbcTemplate.execute(sql);
}

@Override
public Schema doGetCurrentSchema() throws SQLException {
String currentSchema = getCurrentSchemaNameOrSearchPath();

if (!StringUtils.hasText(currentSchema)) {
throw new FlywayException("Unable to determine current schema as currentSchema is empty.");
}

return getSchema(currentSchema);
}

@Override
public Schema getSchema(String name) {
return new DatabricksSchema(jdbcTemplate, database, name);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package org.flywaydb.community.database.databricks;

import org.flywaydb.core.api.configuration.Configuration;
import org.flywaydb.core.internal.database.base.Database;
import org.flywaydb.core.internal.database.base.Table;
import org.flywaydb.core.internal.jdbc.JdbcConnectionFactory;
import org.flywaydb.core.internal.jdbc.StatementInterceptor;
import org.flywaydb.core.internal.util.StringUtils;

import java.sql.Connection;
import java.sql.SQLException;

public class DatabricksDatabase extends Database<DatabricksConnection> {
public DatabricksDatabase(Configuration configuration, JdbcConnectionFactory jdbcConnectionFactory, StatementInterceptor statementInterceptor) {
super(configuration, jdbcConnectionFactory, statementInterceptor);
}

@Override
protected DatabricksConnection doGetConnection(Connection connection) {
return new DatabricksConnection(this, connection);
}

@Override
protected String doGetCurrentUser() throws SQLException {
return getMainConnection().getJdbcTemplate().queryForString("SELECT current_user() as user;");
}

@Override
public void ensureSupported() {
// Always latest Databricks version.
}

@Override
public boolean supportsDdlTransactions() {
// Databricks i non-transactional
return false;
}

@Override
public boolean supportsChangingCurrentSchema() {
return false;
}

@Override
public String getBooleanTrue() {
return "TRUE";
}

@Override
public String getBooleanFalse() {
return "FALSE";
}

@Override
public boolean catalogIsSchema() {
return true;
}

@Override
public boolean supportsMultiStatementTransactions() {
return false;
}

@Override
public boolean useSingleConnection() {
return true;
}

@Override
public String doQuote(String identifier) {
return getOpenQuote() + StringUtils.replaceAll(identifier, getCloseQuote(), getEscapedQuote()) + getCloseQuote();
}

@Override
protected String getOpenQuote() {
return "`";
}

@Override
protected String getCloseQuote() {
return "`";
}

@Override
public String getEscapedQuote() {
return "\\`";
}

@Override
public String getRawCreateScript(Table table, boolean baseline) {
String sql = "CREATE TABLE " + table + " (\n" +
" `installed_rank` INT NOT NULL,\n" +
" `version` STRING,\n" +
" `description` STRING NOT NULL,\n" +
" `type` STRING NOT NULL,\n" +
" `script` STRING NOT NULL,\n" +
" `checksum` INT,\n" +
" `installed_by` STRING NOT NULL,\n" +
" `installed_on` TIMESTAMP NOT NULL,\n" +
" `execution_time` INT NOT NULL,\n" +
" `success` BOOLEAN NOT NULL\n" +
");\n" +
(baseline ? getBaselineStatement(table) + ";\n" : "");
return sql;
}

@Override
public String getInsertStatement(Table table) {
// Explicitly set installed_on to CURRENT_TIMESTAMP().
return "INSERT INTO " + table
+ " (" + quote("installed_rank")
+ ", " + quote("version")
+ ", " + quote("description")
+ ", " + quote("type")
+ ", " + quote("script")
+ ", " + quote("checksum")
+ ", " + quote("installed_by")
+ ", " + quote("installed_on")
+ ", " + quote("execution_time")
+ ", " + quote("success")
+ ")"
+ " VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP(), ?, ?)";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.flywaydb.community.database.databricks;

import org.flywaydb.core.api.ResourceProvider;
import org.flywaydb.core.api.configuration.Configuration;
import org.flywaydb.core.internal.database.base.BaseDatabaseType;
import org.flywaydb.core.internal.database.base.Database;
import org.flywaydb.core.internal.jdbc.JdbcConnectionFactory;
import org.flywaydb.core.internal.jdbc.StatementInterceptor;
import org.flywaydb.core.internal.parser.Parser;
import org.flywaydb.core.internal.parser.ParsingContext;
import org.flywaydb.core.internal.util.ClassUtils;

import java.sql.Connection;
import java.sql.Types;
import java.util.Properties;

public class DatabricksDatabaseType extends BaseDatabaseType {
private static final String DATABRICKS_JDBC_DRIVER = "com.databricks.client.jdbc.Driver";
private static final String DATABRICKS_JDBC41_DRIVER = "com.databricks.client.jdbc41.Driver";

@Override
public String getName() {
return "Databricks";
}

@Override
public int getNullType() {
return Types.VARCHAR;
}

@Override
public boolean handlesJDBCUrl(String url) {
return url.startsWith("jdbc:databricks:");
}

@Override
public String getDriverClass(String url, ClassLoader classLoader) {
return "com.databricks.client.jdbc42.Driver";
}

@Override
public String getBackupDriverClass(String url, ClassLoader classLoader) {
if (ClassUtils.isPresent(DATABRICKS_JDBC41_DRIVER, classLoader)) {
return DATABRICKS_JDBC41_DRIVER;
}
return DATABRICKS_JDBC_DRIVER;
}

@Override
public boolean handlesDatabaseProductNameAndVersion(String databaseProductName, String databaseProductVersion, Connection connection) {
if (databaseProductName.startsWith("SparkSQL")) {
return true;
}
return false;
}

@Override
public Database createDatabase(Configuration configuration, JdbcConnectionFactory jdbcConnectionFactory, StatementInterceptor statementInterceptor) {
return new DatabricksDatabase(configuration, jdbcConnectionFactory, statementInterceptor);
}

@Override
public Parser createParser(Configuration configuration, ResourceProvider resourceProvider, ParsingContext parsingContext) {
return new DatabricksParser(configuration, parsingContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.flywaydb.community.database.databricks;

import org.flywaydb.core.api.configuration.Configuration;
import org.flywaydb.core.internal.parser.Parser;
import org.flywaydb.core.internal.parser.ParsingContext;

public class DatabricksParser extends Parser {
protected DatabricksParser(Configuration configuration, ParsingContext parsingContext) {
super(configuration, parsingContext, 3);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package org.flywaydb.community.database.databricks;

import org.flywaydb.core.internal.database.base.Schema;
import org.flywaydb.core.internal.database.base.Table;
import org.flywaydb.core.internal.jdbc.JdbcTemplate;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class DatabricksSchema extends Schema<DatabricksDatabase, DatabricksTable> {
/**
* @param jdbcTemplate The Jdbc Template for communicating with the DB.
* @param database The database-specific support.
* @param name The name of the schema.
*/
public DatabricksSchema(JdbcTemplate jdbcTemplate, DatabricksDatabase database, String name) {
super(jdbcTemplate, database, name);
}

@Override
protected boolean doExists() throws SQLException {
return jdbcTemplate
.queryForInt("select count(table_name) from information_schema.tables where table_schema = ?;", name) > 0;
}

@Override
protected boolean doEmpty() throws SQLException {
return jdbcTemplate
.queryForInt("select count(table_name) from information_schema.tables where table_schema = ?;", name) == 0;
}

@Override
protected void doCreate() throws SQLException {
jdbcTemplate.execute("create database if not exists " + database.quote(name) + ";");
}

@Override
protected void doDrop() throws SQLException {
jdbcTemplate.execute("drop database if exists " + database.quote(name) + " cascade;");
}

@Override
protected void doClean() throws SQLException {
for (String statement : generateDropStatements("MANAGED", "TABLE")) {
jdbcTemplate.execute(statement);
}
for (String statement : generateDropStatements("VIEW", "VIEW")) {
jdbcTemplate.execute(statement);
}
for (String statement : generateDropStatementsForRoutines("FUNCTION")) {
jdbcTemplate.execute(statement);
}
}

private List<String> generateDropStatements(String type, String objType) throws SQLException {
List<String> names =
jdbcTemplate.queryForStringList(
// Search for all views
"select table_name from information_schema.tables where table_schema = ? WHERE table_type = ?;",
name,
type
);
List<String> statements = new ArrayList<>();
for (String domainName : names) {
statements.add("drop " + objType + " if exists " + database.quote(name, domainName) + ";");
}
return statements;
}

private List<String> generateDropStatementsForRoutines(String objType) throws SQLException {
List<String> objNames =
jdbcTemplate.queryForStringList(
// Search for all functions
"select routine_name from information_schema.routines where routine_schema = ? WHERE routine_type = ?;",
objType
);
List<String> statements = new ArrayList<>();
for (String objName : objNames) {
statements.add("drop " + objType + " if exists " + database.quote(name, objName) + ";");
}
return statements;
}

@Override
protected DatabricksTable[] doAllTables() throws SQLException {
List<String> tableNames = jdbcTemplate.queryForStringList(
"select table_name from information_schema.tables where table_schema = ? and table_type = 'MANAGED';",
name
);
DatabricksTable[] tables = new DatabricksTable[tableNames.size()];
for (int i = 0; i < tableNames.size(); i++) {
tables[i] = new DatabricksTable(jdbcTemplate, database, this, tableNames.get(i));
}
return tables;
}

@Override
public Table getTable(String tableName) {
return new DatabricksTable(jdbcTemplate, database, this, tableName);
}
}
Loading