Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -284,13 +284,14 @@ If no profile is specified then an H2 DataSource will be used by default and no

Assuming Docker is running on the system where the build is running, then the following commands can be run:

| Target Database | Build Command |
|-----------------|--------------------------------------------------------------------|
| All supported | `mvn verify -Ptest-all-dbs` |
| H2 (default) | `mvn verify` |
| MariaDB 10.3 | `mvn verify -Pcontrib-check -Dspring.profiles.active=mariadb-10-3` |
| MySQL 8 | `mvn verify -Pcontrib-check -Dspring.profiles.active=mysql-8` |
| PostgreSQL 10 | `mvn verify -Dspring.profiles.active=postgres-10` |
| Target Database | Build Command |
|-------------------|--------------------------------------------------------------------|
| All supported | `mvn verify -Ptest-all-dbs` |
| H2 (default) | `mvn verify` |
| MariaDB 10.3 | `mvn verify -Pcontrib-check -Dspring.profiles.active=mariadb-10-3` |
| MySQL 8 | `mvn verify -Pcontrib-check -Dspring.profiles.active=mysql-8` |
| PostGreSQL 10 | `mvn verify -Dspring.profiles.active=postgres-10` |
| MSSQL Server 2019 | `mvn verify -Dspring.profiles.active=mssql-15` |

For a full list of the available DataSource factories, consult the `nifi-registry-test` module.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,11 @@
<artifactId>flyway-mysql</artifactId>
<version>${flyway.version}</version>
</dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-sqlserver</artifactId>
<version>${flyway.version}</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.flywaydb.core.internal.jdbc.JdbcUtils;
import org.flywaydb.database.mysql.MySQLDatabaseType;
import org.flywaydb.database.mysql.mariadb.MariaDBDatabaseType;
import org.flywaydb.database.sqlserver.SQLServerDatabaseType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.flyway.FlywayConfigurationCustomizer;
Expand Down Expand Up @@ -52,6 +53,9 @@ public class CustomFlywayConfiguration implements FlywayConfigurationCustomizer
private static final String LOCATION_POSTGRES = "classpath:db/migration/postgres";
private static final String[] LOCATIONS_POSTGRES = {LOCATION_COMMON, LOCATION_POSTGRES};

private static final String LOCATION_MSSQL = "classpath:db/migration/mssql";
private static final String[] LOCATIONS_MSSQL = {LOCATION_COMMON, LOCATION_MSSQL};

private static final String LEGACY_FLYWAY_SCHEMA_TABLE = "schema_version";

@Override
Expand All @@ -65,6 +69,9 @@ public void customize(final FluentConfiguration configuration) {
} else if (databaseType instanceof PostgreSQLDatabaseType) {
LOGGER.info("Setting migration locations to {}", Arrays.asList(LOCATIONS_POSTGRES));
configuration.locations(LOCATIONS_POSTGRES);
} else if (databaseType instanceof SQLServerDatabaseType) {
LOGGER.info("Setting migration locations to {}", Arrays.asList(LOCATIONS_MSSQL));
configuration.locations(LOCATIONS_MSSQL);
} else {
LOGGER.info("Setting migration locations to {}", Arrays.asList(LOCATIONS_DEFAULT));
configuration.locations(LOCATIONS_DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,23 @@ public void migrate(Flyway flyway) {

/**
* Determines if the database represented by this data source is being initialized for the first time based on
* whether or not the table named 'BUCKET' or 'bucket' already exists.
* whether the table named 'BUCKET' or 'bucket' already exists.
*
* @param dataSource the data source
* @return true if the database has never been initialized before, false otherwise
*/
private boolean isNewDatabase(final DataSource dataSource) {
try (final Connection connection = dataSource.getConnection();
final ResultSet rsUpper = connection.getMetaData().getTables(null, null, "BUCKET", null);
final ResultSet rsLower = connection.getMetaData().getTables(null, null, "bucket", null)) {
return !rsUpper.next() && !rsLower.next();
final ResultSet rs = connection.getMetaData().getTables(null, null, "%", null)) {
boolean isNew = true;
while (rs.next()) {
String tableName = rs.getString("TABLE_NAME");
if ("BUCKET".equalsIgnoreCase(tableName)) {
isNew = false;
break;
}
}
return isNew;
} catch (SQLException e) {
LOGGER.error(e.getMessage(), e);
throw new FlywayException("Unable to obtain connection from Flyway DataSource", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,18 @@
import org.apache.nifi.extension.manifest.ExtensionType;
import org.apache.nifi.extension.manifest.ProvidedServiceAPI;
import org.apache.nifi.registry.service.MetadataService;
import org.flywaydb.core.api.FlywayException;
import org.flywaydb.core.internal.database.DatabaseType;
import org.flywaydb.core.internal.database.DatabaseTypeRegister;
import org.flywaydb.database.sqlserver.SQLServerDatabaseType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -66,10 +73,20 @@
public class DatabaseMetadataService implements MetadataService {

private final JdbcTemplate jdbcTemplate;
private final DatabaseType databaseType;

@Autowired
public DatabaseMetadataService(final JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
this.databaseType = getDatabaseType(jdbcTemplate.getDataSource());
}

private DatabaseType getDatabaseType(final DataSource dataSource) {
try (final Connection connection = dataSource.getConnection()) {
return DatabaseTypeRegister.getDatabaseTypeForConnection(connection);
} catch (SQLException e) {
throw new FlywayException("Unable to obtain connection from Flyway DataSource", e);
}
}

//----------------- Buckets ---------------------------------
Expand Down Expand Up @@ -391,7 +408,9 @@ public FlowSnapshotEntity getFlowSnapshot(final String flowIdentifier, final Int

@Override
public FlowSnapshotEntity getLatestSnapshot(final String flowIdentifier) {
final String sql = "SELECT * FROM FLOW_SNAPSHOT WHERE flow_id = ? ORDER BY version DESC LIMIT 1";
final String sql = (databaseType instanceof SQLServerDatabaseType)
? "SELECT TOP 1 * FROM FLOW_SNAPSHOT WHERE flow_id = ? ORDER BY version DESC"
: "SELECT * FROM FLOW_SNAPSHOT WHERE flow_id = ? ORDER BY version DESC LIMIT 1";

try {
return jdbcTemplate.queryForObject(sql, new FlowSnapshotEntityRowMapper(), flowIdentifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,19 @@
import org.apache.nifi.registry.security.exception.SecurityProviderCreationException;
import org.apache.nifi.registry.security.exception.SecurityProviderDestructionException;
import org.apache.nifi.registry.security.identity.IdentityMapper;
import org.flywaydb.core.api.FlywayException;
import org.flywaydb.core.internal.database.DatabaseType;
import org.flywaydb.core.internal.database.DatabaseTypeRegister;
import org.flywaydb.database.sqlserver.SQLServerDatabaseType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -59,6 +65,7 @@ public class DatabaseUserGroupProvider implements ConfigurableUserGroupProvider
private IdentityMapper identityMapper;

private JdbcTemplate jdbcTemplate;
private DatabaseType databaseType;

@AuthorizerContext
public void setDataSource(final DataSource dataSource) {
Expand All @@ -73,6 +80,7 @@ public void setIdentityMapper(final IdentityMapper identityMapper) {
@Override
public void initialize(final UserGroupProviderInitializationContext initializationContext) throws SecurityProviderCreationException {
this.jdbcTemplate = new JdbcTemplate(dataSource);
this.databaseType = getDatabaseType(dataSource);
}

@Override
Expand Down Expand Up @@ -121,7 +129,9 @@ public void checkInheritability(final String proposedFingerprint) throws Authori
@Override
public User addUser(final User user) throws AuthorizationAccessException {
Validate.notNull(user);
final String sql = "INSERT INTO UGP_USER(IDENTIFIER, IDENTITY) VALUES (?, ?)";
final String sql = (databaseType instanceof SQLServerDatabaseType)
? "INSERT INTO UGP_USER(IDENTIFIER, [IDENTITY]) VALUES (?, ?)"
: "INSERT INTO UGP_USER(IDENTIFIER, IDENTITY) VALUES (?, ?)";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general I think the application shouldn't really be checking the type of database, with the exception of the Flyway stuff that sets up the DB. It is not clear to me what the different SQL here is doing, but can it be done in a general way that conforms to standard SQL?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because IDENTITY is a keyword in MSSQL but not on H2, mysql, mariaDB and postgresql so for MSSQL it must be enclosed by [].

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
: "INSERT INTO UGP_USER(IDENTIFIER, IDENTITY) VALUES (?, ?)";
final String sql = "INSERT INTO UGP_USER(IDENTIFIER, \"IDENTITY\") VALUES (?, ?)";

Consider escaping with double quotes instead, which is the ANSI/ISO standard, so hopefully works across all supported database types

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @ChrisSamo632,
I tried your suggestion on branch nathluu/registry-mssql-b but it does not work.
Caused by: org.springframework.jdbc.BadSqlGrammarException: PreparedStatementCallback; bad SQL grammar [INSERT INTO UGP_USER(IDENTIFIER, "IDENTITY") VALUES (?, ?)]; nested exception is java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '"IDENTITY") VALUES ('76137000-a35a-3bb2-94d7-2f5bac309242', 'CN=user2, OU=nifi')' at line 1
at org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator.doTranslate(SQLErrorCodeSQLExceptionTranslator.java:239)
at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:70)
at org.springframework.jdbc.core.JdbcTemplate.translateException(JdbcTemplate.java:1541)
at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:667)
at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:960)
at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:1015)
at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:1025)
at org.apache.nifi.registry.security.authorization.database.DatabaseUserGroupProvider.addUser(DatabaseUserGroupProvider.java:133)
at org.apache.nifi.registry.security.authorization.database.DatabaseUserGroupProvider.onConfigured(DatabaseUserGroupProvider.java:97)
at org.apache.nifi.registry.security.authorization.AuthorizerFactory.getAuthorizer(AuthorizerFactory.java:191)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ANSI Mode would need to be enabled for MySQL - see https://stackoverflow.com/questions/2889871/how-do-i-escape-reserved-words-used-as-column-names-mysql-create-table/2889884#2889884, otherwise backticks would be necessary to escape column names of any reserved/keywords we're used in MySQL (I guess IDENTITY isn't such a word for existing supported DBs but is for MSSQL)

If we were to start requiring that mode to be set (I don't know whether other database types have similar settings) then document updates would be a good idea, but arguably this becomes a potentially breaking change for some users

Renaming the column to avoid keywords is another option, but again incompatible with existing deployments

I don't know which would be the preferred approach in that case. Maybe @bbende will have a preference

jdbcTemplate.update(sql, new Object[] {user.getIdentifier(), user.getIdentity()});
return user;
}
Expand All @@ -131,7 +141,9 @@ public User updateUser(final User user) throws AuthorizationAccessException {
Validate.notNull(user);

// update the user identity
final String sql = "UPDATE UGP_USER SET IDENTITY = ? WHERE IDENTIFIER = ?";
final String sql = (databaseType instanceof SQLServerDatabaseType)
? "UPDATE UGP_USER SET [IDENTITY] = ? WHERE IDENTIFIER = ?"
: "UPDATE UGP_USER SET IDENTITY = ? WHERE IDENTIFIER = ?";
final int updated = jdbcTemplate.update(sql, user.getIdentity(), user.getIdentifier());

// if no rows were updated then there is no user with the given identifier, so return null
Expand Down Expand Up @@ -170,7 +182,9 @@ public User getUser(final String identifier) throws AuthorizationAccessException
public User getUserByIdentity(final String identity) throws AuthorizationAccessException {
Validate.notBlank(identity);

final String sql = "SELECT * FROM UGP_USER WHERE IDENTITY = ?";
final String sql = (databaseType instanceof SQLServerDatabaseType)
? "SELECT * FROM UGP_USER WHERE [IDENTITY] = ?"
: "SELECT * FROM UGP_USER WHERE IDENTITY = ?";
final DatabaseUser databaseUser = queryForObject(sql, new Object[] {identity}, new DatabaseUserRowMapper());
if (databaseUser == null) {
return null;
Expand All @@ -191,8 +205,17 @@ public UserAndGroups getUserAndGroups(final String userIdentity) throws Authoriz
if (user == null) {
groups = null;
} else {
final String userGroupSql =
"SELECT " +
final String userGroupSql = (databaseType instanceof SQLServerDatabaseType)
? "SELECT " +
"G.IDENTIFIER AS IDENTIFIER, " +
"G.[IDENTITY] AS [IDENTITY] " +
"FROM " +
"UGP_GROUP AS G, " +
"UGP_USER_GROUP AS UG " +
"WHERE " +
"G.IDENTIFIER = UG.GROUP_IDENTIFIER AND " +
"UG.USER_IDENTIFIER = ?"
: "SELECT " +
"G.IDENTIFIER AS IDENTIFIER, " +
"G.IDENTITY AS IDENTITY " +
"FROM " +
Expand All @@ -202,6 +225,7 @@ public UserAndGroups getUserAndGroups(final String userIdentity) throws Authoriz
"G.IDENTIFIER = UG.GROUP_IDENTIFIER AND " +
"UG.USER_IDENTIFIER = ?";


final Object[] args = {user.getIdentifier()};
final List<DatabaseGroup> databaseGroups = jdbcTemplate.query(userGroupSql, args, new DatabaseGroupRowMapper());

Expand Down Expand Up @@ -260,7 +284,9 @@ public Group addGroup(final Group group) throws AuthorizationAccessException {
Validate.notNull(group);

// insert to the group table...
final String groupSql = "INSERT INTO UGP_GROUP(IDENTIFIER, IDENTITY) VALUES (?, ?)";
final String groupSql = (databaseType instanceof SQLServerDatabaseType)
? "INSERT INTO UGP_GROUP(IDENTIFIER, [IDENTITY]) VALUES (?, ?)"
: "INSERT INTO UGP_GROUP(IDENTIFIER, IDENTITY) VALUES (?, ?)";
jdbcTemplate.update(groupSql, group.getIdentifier(), group.getName());

// insert to the user-group table...
Expand All @@ -274,7 +300,9 @@ public Group updateGroup(final Group group) throws AuthorizationAccessException
Validate.notNull(group);

// update the group identity
final String updateGroupSql = "UPDATE UGP_GROUP SET IDENTITY = ? WHERE IDENTIFIER = ?";
final String updateGroupSql = (databaseType instanceof SQLServerDatabaseType)
? "UPDATE UGP_GROUP SET [IDENTITY] = ? WHERE IDENTIFIER = ?"
: "UPDATE UGP_GROUP SET IDENTITY = ? WHERE IDENTIFIER = ?";
final int updated = jdbcTemplate.update(updateGroupSql, group.getName(), group.getIdentifier());

// if no rows were updated then a group does not exist for the given identifier, so return null
Expand Down Expand Up @@ -384,4 +412,13 @@ private <T> T queryForObject(final String sql, final Object[] args, final RowMap
return null;
}
}

private DatabaseType getDatabaseType(final DataSource dataSource) {
try (final Connection connection = dataSource.getConnection()) {
return DatabaseTypeRegister.getDatabaseTypeForConnection(connection);
} catch (SQLException e) {
LOGGER.error(e.getMessage(), e);
throw new FlywayException("Unable to obtain connection from Flyway DataSource", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.

-- The NAME column has a max size of 768 because this is the largest size that MySQL allows when using a unique constraint.
CREATE TABLE BUCKET (
ID VARCHAR(50) NOT NULL,
NAME VARCHAR(767) NOT NULL,
DESCRIPTION TEXT,
CREATED DATETIME2 NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT PK__BUCKET_ID PRIMARY KEY (ID),
CONSTRAINT UNIQUE__BUCKET_NAME UNIQUE (NAME)
);

CREATE TABLE BUCKET_ITEM (
ID VARCHAR(50) NOT NULL,
NAME VARCHAR(1000) NOT NULL,
DESCRIPTION TEXT,
CREATED DATETIME2 NOT NULL DEFAULT CURRENT_TIMESTAMP,
MODIFIED DATETIME2 NOT NULL DEFAULT CURRENT_TIMESTAMP,
ITEM_TYPE VARCHAR(50) NOT NULL,
BUCKET_ID VARCHAR(50) NOT NULL,
CONSTRAINT PK__BUCKET_ITEM_ID PRIMARY KEY (ID),
CONSTRAINT FK__BUCKET_ITEM_BUCKET_ID FOREIGN KEY (BUCKET_ID) REFERENCES BUCKET(ID) ON DELETE CASCADE
);

CREATE TABLE FLOW (
ID VARCHAR(50) NOT NULL,
CONSTRAINT PK__FLOW_ID PRIMARY KEY (ID),
CONSTRAINT FK__FLOW_BUCKET_ITEM_ID FOREIGN KEY (ID) REFERENCES BUCKET_ITEM(ID) ON DELETE CASCADE
);

CREATE TABLE FLOW_SNAPSHOT (
FLOW_ID VARCHAR(50) NOT NULL,
VERSION INT NOT NULL,
CREATED DATETIME2 NOT NULL DEFAULT CURRENT_TIMESTAMP,
CREATED_BY VARCHAR(1000) NOT NULL,
COMMENTS TEXT,
CONSTRAINT PK__FLOW_SNAPSHOT_FLOW_ID_AND_VERSION PRIMARY KEY (FLOW_ID, VERSION),
CONSTRAINT FK__FLOW_SNAPSHOT_FLOW_ID FOREIGN KEY (FLOW_ID) REFERENCES FLOW(ID) ON DELETE CASCADE
);

CREATE TABLE SIGNING_KEY (
ID VARCHAR(50) NOT NULL,
TENANT_IDENTITY VARCHAR(767) NOT NULL,
KEY_VALUE VARCHAR(50) NOT NULL,
CONSTRAINT PK__SIGNING_KEY_ID PRIMARY KEY (ID),
CONSTRAINT UNIQUE__SIGNING_KEY_TENANT_IDENTITY UNIQUE (TENANT_IDENTITY)
);
Loading