Changes for PostgreSQL Copy Export: #68

Open
wants to merge 3 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
@@ -79,16 +79,46 @@ protected void configureMapper(Job job, String tableName,
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(NullWritable.class);
}
/* List of characters that cannot be passed via the job Configuration,
 * because they are not legal in XML (the format the Configuration is
 * serialized in). */
final static String badXmlString = "\u0000\u0001\u0002\u0003\u0004\u0005" +
    "\u0006\u0007\u0008\u000B\u000C\u000E\u000F\u0010\u0011\u0012" +
    "\u0013\u0014\u0015\u0016\u0017\u0018\u0019\u001A\u001B\u001C" +
    "\u001D\u001E\u001F\uFFFE\uFFFF";

/* True if the character is safe to pass via the Configuration. */
public static boolean validXml(char c) {
  return badXmlString.indexOf(c) < 0;
}
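
/* Illustration only (not part of the original change): a tiny self-check
 * of validXml(). Tab is a legal XML character and can be stored in the
 * Configuration directly; \u0001 is not, and needs the Base64 path used
 * below in propagateOptionsToJob(). Run with -ea to enable asserts. */
static void demoValidXml() {
  assert validXml('\t');      // legal XML: stored as-is
  assert !validXml('\u0001'); // not legal XML: must be Base64-encoded
}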


protected void propagateOptionsToJob(Job job) {
super.propagateOptionsToJob(job);
SqoopOptions opts = context.getOptions();
Configuration conf = job.getConfiguration();
if (opts.getNullStringValue() != null) {
  conf.set("postgresql.null.string", opts.getNullStringValue());

  /* An empty string cannot be round-tripped through the Configuration,
   * so it is passed as a separate flag. */
  if ("".equals(opts.getNullStringValue())) {
    conf.set("postgresql.null.emptystring", "true");
  }
}

/* A valid field delimiter may not be a valid XML character, in which
 * case setting it on the Hadoop Configuration would fail. To keep
 * supporting such delimiters, Base64-encode them instead. */
char delim = opts.getInputFieldDelim();
if (validXml(delim)) {
  setDelimiter("postgresql.input.field.delim", delim, conf);
} else {
  conf.set("postgresql.input.field.delim.base64",
      java.util.Base64.getEncoder().encodeToString(
          Character.toString(delim).getBytes(
              java.nio.charset.StandardCharsets.UTF_8)));
}
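
/* Illustration only (not part of the original change): a \u0001
 * delimiter round-trips through the Configuration as Base64 text,
 * which the mapper decodes before building the COPY statement:
 *
 *   Base64.getEncoder().encodeToString(new byte[] {0x01})    // "AQ=="
 *   new String(Base64.getDecoder().decode("AQ=="), "UTF-8")  // "\u0001"
 */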

/* The --batch switch enables line buffering. */
if (opts.isBatchMode()) {
  conf.set("postgresql.export.batchmode", "true");
}

/* TODO: a user may still want an invalid XML character as the record
 * delimiter; that case would need the same Base64 treatment. */
setDelimiter("postgresql.input.record.delim",
    opts.getInputRecordDelim(), conf);
setDelimiter("postgresql.input.enclosedby",
@@ -36,6 +36,7 @@
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;
import org.postgresql.copy.CopyIn;


/**
@@ -52,6 +53,10 @@ public class PostgreSQLCopyExportMapper
public static final Log LOG =
LogFactory.getLog(PostgreSQLCopyExportMapper.class.getName());

private boolean bufferMode = false; /* whether to use the line buffer */
private LineBuffer lineBuffer;      /* batches lines before sending them to COPY */
private boolean isRaw = false;      /* if set, escape sequences are not interpreted */

private Configuration conf;
private DBConfiguration dbConf;
private Connection conn = null;
@@ -61,8 +66,49 @@ public class PostgreSQLCopyExportMapper
new DelimiterSet(',', '\n',
DelimiterSet.NULL_CHAR, DelimiterSet.NULL_CHAR, false);

/* Buffers up lines of text until the buffer is full.
 * This helper class exists to support testing whether batching up
 * lines, rather than issuing one unbuffered write per record, helps
 * the postgres copyIn mapper. */
static class LineBuffer {
  public static final Log LOG = LogFactory.getLog(LineBuffer.class.getName());

  private StringBuilder sb = new StringBuilder();
  private static final int MAXLEN = 100000000; /* ~100 MB */

  public void clear() { sb.setLength(0); }

  public int length() { return sb.length(); }

  /* Appends the line plus a trailing newline; returns false (without
   * appending) if that would exceed MAXLEN. */
  public boolean append(String s) {
    if (sb.length() + s.length() + 1 > MAXLEN) { return false; }
    sb.append(s);
    sb.append("\n");
    return true;
  }

  @Override
  public String toString() { return sb.toString(); }

  public byte[] getBytes() {
    /* UTF-8 is always supported, so no checked exception to handle. */
    return sb.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
  }
}
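
/* Illustration only (not part of the original change): the intended
 * flush-when-full pattern, mirroring what map() below does against the
 * COPY stream (with copyin.writeToCopy() replaced by a byte count). */
static long demoBufferPattern(java.util.List<String> rows) {
  LineBuffer buf = new LineBuffer();
  long bytesFlushed = 0;
  for (String row : rows) {
    if (!buf.append(row)) {                   // buffer full:
      bytesFlushed += buf.getBytes().length;  //   flush the chunk,
      buf.clear();
      buf.append(row);                        //   then retry the row
    }
  }
  bytesFlushed += buf.getBytes().length;      // final fragment
  return bytesFlushed;
}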

public PostgreSQLCopyExportMapper() {
}

/* PostgreSQL's text-format COPY normally interprets backslash escape
 * sequences. Raw mode optionally turns that off by escaping the
 * escapes. */
public String fixEscapes(String s) {
  if (isRaw) {
    return s.replace("\\", "\\\\");
  } else {
    return s;
  }
}
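
/* Illustration only (not part of the original change): with isRaw set,
 * fixEscapes("C:\\temp") returns "C:\\\\temp" -- the backslash is
 * doubled, so text-format COPY reads the value back as C:\temp instead
 * of interpreting \t as a tab character. */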


@Override
protected void setup(Context context)
@@ -71,6 +117,7 @@ protected void setup(Context context)
super.setup(context);
conf = context.getConfiguration();
dbConf = new DBConfiguration(conf);
lineBuffer = new LineBuffer();
CopyManager cm = null;
try {
conn = dbConf.getConnection();
@@ -83,69 +130,146 @@ protected void setup(Context context)
throw new IOException(ex);
}
try {
/* Set if buffering mode was requested via --batch. */
this.bufferMode = "true".equals(conf.get("postgresql.export.batchmode"));

/* isRaw means escape sequences are NOT to be interpreted. */
this.isRaw = "true".equals(conf.get("postgresql.input.israw"));

/* Support delimiters that are not valid XML: the job setup
 * Base64-encoded them, so decode here. */
String delimBase64 = conf.get("postgresql.input.field.delim.base64");
String delim;
if (delimBase64 != null) {
  delim = new String(java.util.Base64.getDecoder().decode(delimBase64),
      java.nio.charset.StandardCharsets.UTF_8);
} else {
  delim = conf.get("postgresql.input.field.delim", ",");
}

/* Some PostgreSQL instances out there are still running version 8.x,
 * which predates the option-list COPY syntax introduced in 9.0. */
StringBuilder sql = new StringBuilder();
String ver = conf.get("postgresql.targetdb.ver", "9");
if (ver.startsWith("8")) {
  sql.append("COPY ");
  sql.append(dbConf.getOutputTableName());
  sql.append(" FROM STDIN WITH ");
  sql.append(" DELIMITER ");
  sql.append("'");
  sql.append(delim);
  sql.append("'");
  if (!"true".equals(conf.get("postgresql.format.text"))) {
    sql.append(" CSV ");
    sql.append(" QUOTE ");
    sql.append("'");
    sql.append(conf.get("postgresql.input.enclosedby", "\""));
    sql.append("'");
    sql.append(" ESCAPE ");
    sql.append("'");
    sql.append(conf.get("postgresql.input.escapedby", "\""));
    sql.append("'");
  }
  /* The Hadoop Configuration cannot hold an empty string, so a
   * separate flag designates it. */
  if (conf.get("postgresql.null.emptystring") != null) {
    sql.append(" NULL ''");
  } else if (conf.get("postgresql.null.string") != null) {
    sql.append(" NULL ");
    sql.append("'");
    sql.append(conf.get("postgresql.null.string"));
    sql.append("'");
  }
} else { /* Intended for version 9.x. This has not been fixed for buffering. */
  sql.append("COPY ");
  sql.append(dbConf.getOutputTableName());
  sql.append(" FROM STDIN WITH (");
  sql.append(" ENCODING 'UTF-8' ");
  sql.append(", FORMAT csv ");
  sql.append(", DELIMITER ");
  sql.append("'");
  sql.append(delim);
  sql.append("'");
  sql.append(", QUOTE ");
  sql.append("'");
  sql.append(conf.get("postgresql.input.enclosedby", "\""));
  sql.append("'");
  sql.append(", ESCAPE ");
  sql.append("'");
  sql.append(conf.get("postgresql.input.escapedby", "\""));
  sql.append("'");
  /* The Hadoop Configuration cannot hold an empty string, so a
   * separate flag designates it. */
  if (conf.get("postgresql.null.emptystring") != null) {
    sql.append(", NULL ''");
  } else if (conf.get("postgresql.null.string") != null) {
    sql.append(", NULL ");
    sql.append("'");
    sql.append(conf.get("postgresql.null.string"));
    sql.append("'");
  }
  sql.append(")");
}
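
/* Illustration only (not part of the original change): with default
 * settings and a hypothetical table "employees", the branches above
 * produce roughly:
 *
 *   8.x: COPY employees FROM STDIN WITH  DELIMITER ','  CSV
 *          QUOTE '"'  ESCAPE '"'
 *   9.x: COPY employees FROM STDIN WITH ( ENCODING 'UTF-8' ,
 *          FORMAT csv , DELIMITER ',' , QUOTE '"' , ESCAPE '"' )
 */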
LOG.debug("Starting export with copy: " + sql);
copyin = cm.copyIn(sql.toString());
} catch (SQLException ex) {
  LoggingUtils.logAll(LOG, "Unable to get CopyIn", ex);
  close();
  throw new IOException(ex);
}
}

@Override
public void map(LongWritable key, Writable value, Context context)
throws IOException, InterruptedException {
if (bufferMode) {
  String row = fixEscapes(value.toString());
  if (lineBuffer.append(row)) {
    return;
  }
  /* The buffer is full: write out its contents. */
  try {
    byte[] data = lineBuffer.getBytes();
    copyin.writeToCopy(data, 0, data.length);
    lineBuffer.clear();
  } catch (SQLException ex) {
    LoggingUtils.logAll(LOG, "Unable to execute copy", ex);
    close();
    throw new IOException(ex);
  }
  /* Now buffer the line that could not be appended above; a single
   * line larger than the whole buffer cannot be handled. */
  if (!lineBuffer.append(row)) {
    throw new IOException("Record exceeds line buffer capacity");
  }
} else { /* The original, unbuffered method. */
  line.setLength(0);
  line.append(value.toString());
  if (value instanceof Text) {
    line.append(System.getProperty("line.separator"));
  }
  try {
    byte[] data = line.toString().getBytes("UTF-8");
    copyin.writeToCopy(data, 0, data.length);
  } catch (SQLException ex) {
    LoggingUtils.logAll(LOG, "Unable to execute copy", ex);
    close();
    throw new IOException(ex);
  }
}
}

@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
try {
  /* Write out any final fragment left in the buffer. */
  if (bufferMode) {
    byte[] data = lineBuffer.getBytes();
    copyin.writeToCopy(data, 0, data.length);
    lineBuffer.clear();
  }
  copyin.endCopy();
} catch (SQLException ex) {
  LoggingUtils.logAll(LOG, "Unable to finalize copy", ex);
  throw new IOException(ex);
}
close();
}

void close() throws IOException {
if (conn != null) {
@@ -352,13 +352,27 @@ public void testExportDirect() throws IOException, SQLException {
"3,Fred,2009-01-23,15,marketing",
});

String[] extra = new String[] {"--direct"};

runExport(getArgv(true, extra));

assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection);
}
@Test
public void testExportDirectBatch() throws IOException, SQLException {
createTestFile("inputFile", new String[] {
"2,Bob,2009-04-20,400,sales",
"3,Fred,2009-01-23,15,marketing",
});

String[] extra = new String[] {"--direct", "--batch"};

runExport(getArgv(true, extra));

assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection);
}


@Test
public void testExportCustomSchema() throws IOException, SQLException {
createTestFile("inputFile", new String[] {