Skip to content

Commit

Permalink
Fixed #863 PostgreSQL COPY ... FROM STDIN statement support
Browse files Browse the repository at this point in the history
  • Loading branch information
Axel Fontaine committed Oct 14, 2014
1 parent 44217c6 commit 7b9b2d1
Show file tree
Hide file tree
Showing 11 changed files with 232 additions and 13 deletions.
10 changes: 5 additions & 5 deletions flyway-core/pom.xml
Expand Up @@ -65,6 +65,11 @@
<artifactId>android</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
Expand Down Expand Up @@ -130,11 +135,6 @@
<artifactId>mariadb-java-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.sourceforge.jtds</groupId>
<artifactId>jtds</artifactId>
Expand Down
Expand Up @@ -17,6 +17,7 @@

import org.flywaydb.core.api.FlywayException;

import java.sql.Connection;
import java.sql.SQLException;

/**
Expand Down Expand Up @@ -166,4 +167,14 @@ public String quote(String... identifiers) {
* @return {@code true} if this database use a catalog to represent a schema. {@code false} if a schema is simply a schema.
*/
public abstract boolean catalogIsSchema();

/**
* Executes this COPY statement (PostgreSQL only).
*
* @param connection The connection to use.
* @param sql The statement to execute.
*/
public void executePgCopy(Connection connection, String sql) throws SQLException {
// Do nothing by default
}
}
Expand Up @@ -86,7 +86,11 @@ public void execute(final JdbcTemplate jdbcTemplate) {
LOG.debug("Executing SQL: " + sql);

try {
jdbcTemplate.executeStatement(sql);
if (sqlStatement.isPgCopy()) {
dbSupport.executePgCopy(jdbcTemplate.getConnection(), sql);
} else {
jdbcTemplate.executeStatement(sql);
}
} catch (SQLException e) {
throw new FlywaySqlScriptException(sqlStatement.getLineNumber(), sql, e);
}
Expand Down
Expand Up @@ -29,15 +29,24 @@ public class SqlStatement {
*/
private String sql;

/**
* Whether this is a PostgreSQL COPY FROM STDIN statement.
* <p/>
* Note: This may have to be generalized if additional special cases appear.
*/
private boolean pgCopy;

/**
* Creates a new sql statement.
*
* @param lineNumber The original line number where the statement was located in the script it came from.
* @param sql The sql to send to the database.
* @param pgCopy Whether this is a PostgreSQL COPY FROM STDIN statement.
*/
public SqlStatement(int lineNumber, String sql) {
public SqlStatement(int lineNumber, String sql, boolean pgCopy) {
this.lineNumber = lineNumber;
this.sql = sql;
this.pgCopy = pgCopy;
}

/**
Expand All @@ -53,4 +62,11 @@ public int getLineNumber() {
public String getSql() {
return sql;
}

/**
* @return Whether this is a PostgreSQL COPY FROM STDIN statement.
*/
public boolean isPgCopy() {
return pgCopy;
}
}
Expand Up @@ -112,7 +112,8 @@ public boolean isTerminated() {
* @return The assembled statement, with the delimiter stripped off.
*/
public SqlStatement getSqlStatement() {
return new SqlStatement(lineNumber, statement.toString());
String sql = statement.toString();
return new SqlStatement(lineNumber, sql, isPgCopy());
}

/**
Expand All @@ -126,6 +127,15 @@ public Delimiter extractNewDelimiterFromLine(String line) {
return null;
}

/**
* Checks whether this statement is a COPY statement for PostgreSQL.
*
* @return {@code true} if it is, {@code false} if not.
*/
public boolean isPgCopy() {
return false;
}

/**
* Checks whether this line is in fact a directive disguised as a comment.
*
Expand Down Expand Up @@ -190,7 +200,7 @@ protected String simplifyLine(String line) {
* Checks whether this line in the sql script indicates that the statement delimiter will be different from the
* current one. Useful for database-specific stored procedures and block constructs.
*
* @param line The line to analyse.
* @param line The simplified line to analyse.
* @param delimiter The current delimiter.
* @return The new delimiter to use (can be the same as the current one) or {@code null} for no delimiter.
*/
Expand Down Expand Up @@ -260,7 +270,7 @@ protected String computeAlternateCloseQuote(String openQuote) {
*
* @param line The line that was just added to the statement.
* @return {@code true} if the statement is unfinished and the end is currently in the middle of a multi-line string
* literal. {@code false} if not.
* literal. {@code false} if not.
*/
protected boolean endsWithOpenMultilineStringLiteral(String line) {
//Ignore all special characters that naturally occur in SQL, but are not opening or closing string literals
Expand Down Expand Up @@ -298,7 +308,7 @@ protected boolean endsWithOpenMultilineStringLiteral(String line) {
*
* @param tokens The tokens to analyse.
* @return The list of potentially delimiting string literals token types per token. Tokens that do not have any
* impact on string delimiting are discarded.
* impact on string delimiting are discarded.
*/
private List<TokenType> extractStringLiteralDelimitingTokens(String[] tokens) {
List<TokenType> delimitingTokens = new ArrayList<TokenType>();
Expand Down Expand Up @@ -363,6 +373,7 @@ private List<TokenType> extractStringLiteralDelimitingTokens(String[] tokens) {

/**
* Removes escaped quotes from this token.
*
* @param token The token to parse.
* @return The cleaned token.
*/
Expand All @@ -373,6 +384,7 @@ protected String removeEscapedQuotes(String token) {
/**
* Removes charset casting that prefixes string literals.
* Must be implemented in dialect specific sub classes.
*
* @param token The token to parse.
* @return The cleaned token.
*/
Expand Down
Expand Up @@ -20,7 +20,11 @@
import org.flywaydb.core.internal.dbsupport.Schema;
import org.flywaydb.core.internal.dbsupport.SqlStatementBuilder;
import org.flywaydb.core.internal.util.StringUtils;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;

import java.io.IOException;
import java.io.StringReader;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Types;
Expand Down Expand Up @@ -96,4 +100,18 @@ public Schema getSchema(String name) {
public boolean catalogIsSchema() {
return false;
}

@Override
public void executePgCopy(Connection connection, String sql) throws SQLException {
int split = sql.indexOf(";");
String statement = sql.substring(0, split);
String data = sql.substring(split + 1).trim();

CopyManager copyManager = new CopyManager((BaseConnection) connection.unwrap(Connection.class));
try {
copyManager.copyIn(statement, new StringReader(data));
} catch (IOException e) {
throw new SQLException("Unable to execute COPY operation", e);
}
}
}
Expand Up @@ -15,6 +15,7 @@
*/
package org.flywaydb.core.internal.dbsupport.postgresql;

import org.flywaydb.core.internal.dbsupport.Delimiter;
import org.flywaydb.core.internal.dbsupport.SqlStatementBuilder;

import java.util.regex.Matcher;
Expand All @@ -24,12 +25,27 @@
* SqlStatementBuilder supporting PostgreSQL specific syntax.
*/
public class PostgreSQLSqlStatementBuilder extends SqlStatementBuilder {
/**
* Delimiter of COPY statements.
*/
private static final Delimiter COPY_DELIMITER = new Delimiter("\\.", true);

/**
* Matches $$, $BODY$, $xyz123$, ...
*/
/*private -> for testing*/
static final String DOLLAR_QUOTE_REGEX = "(\\$[A-Za-z0-9_]*\\$).*";

/**
* Are we at the beginning of the statement.
*/
private boolean firstLine = true;

/**
* Whether this statement is a COPY statement.
*/
private boolean pgCopy;

@Override
protected String extractAlternateOpenQuote(String token) {
Matcher matcher = Pattern.compile(DOLLAR_QUOTE_REGEX).matcher(token);
Expand All @@ -38,4 +54,22 @@ protected String extractAlternateOpenQuote(String token) {
}
return null;
}

@Override
protected Delimiter changeDelimiterIfNecessary(String line, Delimiter delimiter) {
if (firstLine) {
firstLine = false;
if (line.matches("COPY|COPY\\s.*")) {
pgCopy = true;
return COPY_DELIMITER;
}
}

return pgCopy ? COPY_DELIMITER : delimiter;
}

@Override
public boolean isPgCopy() {
return pgCopy;
}
}
Expand Up @@ -307,6 +307,16 @@ public void multiLine() throws Exception {
assertEquals(1, jdbcTemplate.queryForInt("select count(*) from address"));
}

/**
* Tests support for COPY FROM STDIN statements generated by pg_dump..
*/
@Test
public void copy() throws Exception {
flyway.setLocations("migration/dbsupport/postgresql/sql/copy");
flyway.migrate();
assertEquals(3, jdbcTemplate.queryForInt("select count(*) from copy_test"));
}

/**
* Tests that the lock on SCHEMA_VERSION is not blocking SQL commands in migrations. This test won't fail if there's
* a too restrictive lock - it would just hang endlessly.
Expand Down
@@ -0,0 +1,64 @@
--
-- Copyright 2010-2014 Axel Fontaine
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

--
-- PostgreSQL database dump
--

SET statement_timeout = 0;
SET client_encoding = 'UTF8';
SET standard_conforming_strings = on;
SET check_function_bodies = false;
SET client_min_messages = warning;

SET search_path = public, pg_catalog;

SET default_tablespace = '';

SET default_with_oids = false;

--
-- Name: copy_test; Type: TABLE; Schema: public; Owner: arnd; Tablespace:
--

CREATE TABLE copy_test (
c1 integer NOT NULL,
c2 character varying,
c3 double precision
);

--
-- Name: copy_test_c1_seq; Type: SEQUENCE; Schema: public; Owner: arnd
--

CREATE SEQUENCE copy_test_c1_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;

--
-- Name: c1; Type: DEFAULT; Schema: public; Owner: arnd
--

ALTER TABLE ONLY copy_test ALTER COLUMN c1 SET DEFAULT nextval('copy_test_c1_seq'::regclass);


--
-- PostgreSQL database dump complete
--

@@ -0,0 +1,50 @@
--
-- Copyright 2010-2014 Axel Fontaine
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

--
-- PostgreSQL database dump
--

SET statement_timeout = 0;
SET client_encoding = 'UTF8';
SET standard_conforming_strings = on;
SET check_function_bodies = false;
SET client_min_messages = warning;

SET search_path = public, pg_catalog;

--
-- Data for Name: copy_test; Type: TABLE DATA; Schema: public; Owner: arnd
--

COPY copy_test (c1, c2, c3) FROM stdin;
1 utf8: ümlaute: äüß NaN
2 \N 123
3 text 123.234444444444449
\.


--
-- Name: copy_test_c1_seq; Type: SEQUENCE SET; Schema: public; Owner: arnd
--

SELECT pg_catalog.setval('copy_test_c1_seq', 3, true);


--
-- PostgreSQL database dump complete
--

0 comments on commit 7b9b2d1

Please sign in to comment.