Use metadata to set column comments and encoding #178

Closed · wants to merge 9 commits
16 changes: 16 additions & 0 deletions README.md
@@ -335,6 +335,14 @@ data should the <tt>COPY</tt> fail.</p>
Redshift cluster and/or don't have requirements to keep the table availability high.</p>
</td>
</tr>
<tr>
<td><tt>description</tt></td>
<td>No</td>
<td>No default</td>
<td>
<p>A description for the table. It will be set using the SQL COMMENT command and should show up in most query tools.
See also the <tt>description</tt> column metadata field, which sets descriptions on individual columns.</p></td>
</tr>
<tr>
<td><tt>preactions</tt></td>
<td>No</td>
@@ -414,6 +422,14 @@ df.write
.save()
```

### Configuring column encoding

When creating a table, this library can be configured to use a specific compression encoding on individual columns. You can use the `encoding` column metadata field to specify a compression encoding for each column (see [Amazon docs](http://docs.aws.amazon.com/redshift/latest/dg/c_Compression_encodings.html) for available encodings).
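For example, a minimal sketch (with `rows`, `jdbcUrl`, and `tempDir` as placeholders for your own data and settings) that requests LZO compression for a single string column:

```scala
import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField, StructType}

// Attach an "encoding" entry to the column's metadata; the writer turns it
// into an ENCODE clause in the generated CREATE TABLE statement.
val metadata = new MetadataBuilder().putString("encoding", "LZO").build()
val schema = StructType(StructField("x", StringType, metadata = metadata) :: Nil)

sqlContext.createDataFrame(rows, schema).write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcUrl)        // placeholder JDBC URL
  .option("dbtable", "my_table") // placeholder table name
  .option("tempdir", tempDir)    // placeholder S3 temp directory
  .save()
```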

### Setting descriptions on columns

Redshift allows descriptions to be attached to columns (via the SQL `COMMENT` command), and these show up in most query tools. You can set the `description` column metadata field to specify a description for an individual column.
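A minimal sketch along the same lines (same placeholders as above), setting a column comment through the `description` metadata field and a table-level comment through the `description` option:

```scala
import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField, StructType}

// A column's "description" metadata becomes COMMENT ON COLUMN;
// the writer's "description" option becomes COMMENT ON TABLE.
val metadata = new MetadataBuilder().putString("description", "Hello Column").build()
val schema = StructType(StructField("x", StringType, metadata = metadata) :: Nil)

sqlContext.createDataFrame(rows, schema).write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcUrl)               // placeholder JDBC URL
  .option("dbtable", "my_table")        // placeholder table name
  .option("description", "Hello Table") // table-level comment
  .option("tempdir", tempDir)           // placeholder S3 temp directory
  .save()
```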

## Transactional Guarantees

This section describes `spark-redshift`'s transactional guarantees.
@@ -386,6 +386,90 @@ class RedshiftIntegrationSuite extends IntegrationSuiteBase {
    }
  }

test("configuring compression on columns") {
val tableName = s"configuring_compression_on_columns_$randomSuffix"
try {
val metadata = new MetadataBuilder().putString("encoding", "LZO").build()
val schema = StructType(
StructField("x", StringType, metadata = metadata) :: Nil)
sqlContext.createDataFrame(sc.parallelize(Seq(Row("a" * 128))), schema).write
.format("com.databricks.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable", tableName)
.option("tempdir", tempDir)
.mode(SaveMode.ErrorIfExists)
.save()
assert(DefaultJDBCWrapper.tableExists(conn, tableName))
val loadedDf = sqlContext.read
.format("com.databricks.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable", tableName)
.option("tempdir", tempDir)
.load()
checkAnswer(loadedDf, Seq(Row("a" * 128)))
val encodingDF = sqlContext.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable",
s"""(SELECT "column", lower(encoding) FROM pg_table_def WHERE tablename='$tableName')""")
.load()
checkAnswer(encodingDF, Seq(Row("x", "lzo")))
} finally {
conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
conn.commit()
}
}

test("configuring comments on columns") {
val tableName = s"configuring_comments_on_columns_$randomSuffix"
try {
val metadata = new MetadataBuilder().putString("description", "Hello Column").build()
val schema = StructType(
StructField("x", StringType, metadata = metadata) :: Nil)
sqlContext.createDataFrame(sc.parallelize(Seq(Row("a" * 128))), schema).write
.format("com.databricks.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable", tableName)
.option("description", "Hello Table")
.option("tempdir", tempDir)
.mode(SaveMode.ErrorIfExists)
.save()
assert(DefaultJDBCWrapper.tableExists(conn, tableName))
val loadedDf = sqlContext.read
.format("com.databricks.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable", tableName)
.option("tempdir", tempDir)
.load()
checkAnswer(loadedDf, Seq(Row("a" * 128)))
val tableDF = sqlContext.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", s"(SELECT pg_catalog.obj_description('$tableName'::regclass))")
.load()
checkAnswer(tableDF, Seq(Row("Hello Table")))
val commentQuery =
s"""
|(SELECT c.column_name, pgd.description
|FROM pg_catalog.pg_statio_all_tables st
|INNER JOIN pg_catalog.pg_description pgd
| ON (pgd.objoid=st.relid)
|INNER JOIN information_schema.columns c
| ON (pgd.objsubid=c.ordinal_position AND c.table_name=st.relname)
|WHERE c.table_name='$tableName')
""".stripMargin
val columnDF = sqlContext.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", commentQuery)
.load()
checkAnswer(columnDF, Seq(Row("x", "Hello Column")))
} finally {
conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
conn.commit()
}
}

test("informative error message when saving a table with string that is longer than max length") {
val tableName = s"error_message_when_string_too_long_$randomSuffix"
try {
5 changes: 5 additions & 0 deletions src/main/scala/com/databricks/spark/redshift/Parameters.scala
@@ -205,6 +205,11 @@ private[redshift] object Parameters {
   */
  def extraCopyOptions: String = parameters.get("extracopyoptions").getOrElse("")

  /**
   * Description of the table, set using the SQL COMMENT command.
   */
  def description: Option[String] = parameters.get("description")

  /**
   * List of semi-colon separated SQL statements to run before write operations.
   * This can be useful for running DELETE operations to clean up data
@@ -262,7 +262,12 @@ private[redshift] class JDBCWrapper {
        case _ => throw new IllegalArgumentException(s"Don't know how to save $field to JDBC")
      }
      val nullable = if (field.nullable) "" else "NOT NULL"
-     sb.append(s""", "${name.replace("\"", "\\\"")}" $typ $nullable""".trim)
+     val encoding = if (field.metadata.contains("encoding")) {
+       s"ENCODE ${field.metadata.getString("encoding")}"
+     } else {
+       ""
+     }
+     sb.append(s""", "${name.replace("\"", "\\\"")}" $typ $nullable $encoding""".trim)
    }}
    if (sb.length < 2) "" else sb.substring(2)
  }
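  // Illustrative effect of the change above (matches the "configuring encoding
  // on columns" unit test later in this diff): a StringType column whose
  // metadata carries encoding=LZO now renders in the generated column list as
  //   "lzo_str" TEXT ENCODE LZO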
@@ -138,6 +138,18 @@ private[redshift] class RedshiftWriter(
    }
  }

  /**
   * Generate COMMENT SQL statements for the table and columns.
   */
  private[redshift] def commentActions(tableComment: Option[String], schema: StructType):
      List[String] = {
    tableComment.toList.map(desc => s"COMMENT ON TABLE %s IS '${desc.replace("'", "''")}'") ++
      schema.fields
        .withFilter(f => f.metadata.contains("description"))
        .map(f => s"""COMMENT ON COLUMN %s."${f.name.replace("\"", "\\\"")}""""
          + s" IS '${f.metadata.getString("description").replace("'", "''")}'")
  }
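  // Illustrative output (matches the "configuring descriptions on columns"
  // unit test later in this diff):
  //   commentActions(Some("Test"), schema) ==
  //     List("COMMENT ON TABLE %s IS 'Test'",
  //          "COMMENT ON COLUMN %s.\"first_str\" IS 'Test1'", ...)
  // The %s is later substituted with the qualified table name, and single
  // quotes are escaped by doubling (Test'2 becomes 'Test''2').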

  /**
   * Perform the Redshift load, including deletion of existing data in the case of an overwrite,
   * and creating the table if it doesn't already exist.
@@ -161,8 +173,10 @@
    log.info(createStatement)
    jdbcWrapper.executeInterruptibly(conn.prepareStatement(createStatement))

    val preActions = commentActions(params.description, data.schema) ++ params.preActions

    // Execute preActions
-   params.preActions.foreach { action =>
+   preActions.foreach { action =>
      val actionSql = if (action.contains("%s")) action.format(params.table.get) else action
      log.info("Executing preAction: " + actionSql)
      jdbcWrapper.executeInterruptibly(conn.prepareStatement(actionSql))
@@ -445,6 +445,41 @@ class RedshiftSourceSuite
    assert(createTableCommand === expectedCreateTableCommand)
  }

test("configuring encoding on columns") {
val lzoMetadata = new MetadataBuilder().putString("encoding", "LZO").build()
val runlengthMetadata = new MetadataBuilder().putString("encoding", "RUNLENGTH").build()
val schema = StructType(
StructField("lzo_str", StringType, metadata = lzoMetadata) ::
StructField("runlength_str", StringType, metadata = runlengthMetadata) ::
StructField("default_str", StringType) ::
Nil)
val df = testSqlContext.createDataFrame(sc.emptyRDD[Row], schema)
val createTableCommand =
DefaultRedshiftWriter.createTableSql(df, MergedParameters.apply(defaultParams)).trim
val expectedCreateTableCommand =
"""CREATE TABLE IF NOT EXISTS "PUBLIC"."test_table" ("lzo_str" TEXT ENCODE LZO,""" +
""" "runlength_str" TEXT ENCODE RUNLENGTH, "default_str" TEXT)"""
assert(createTableCommand === expectedCreateTableCommand)
}

test("configuring descriptions on columns") {
val descriptionMetadata1 = new MetadataBuilder().putString("description", "Test1").build()
val descriptionMetadata2 = new MetadataBuilder().putString("description", "Test'2").build()
val schema = StructType(
StructField("first_str", StringType, metadata = descriptionMetadata1) ::
StructField("second_str", StringType, metadata = descriptionMetadata2) ::
StructField("default_str", StringType) ::
Nil)
val df = testSqlContext.createDataFrame(sc.emptyRDD[Row], schema)
val commentCommands =
DefaultRedshiftWriter.commentActions(Some("Test"), schema)
val expectedCommentCommands = List(
"COMMENT ON TABLE %s IS 'Test'",
"COMMENT ON COLUMN %s.\"first_str\" IS 'Test1'",
"COMMENT ON COLUMN %s.\"second_str\" IS 'Test''2'")
assert(commentCommands === expectedCommentCommands)
}

test("Respect SaveMode.ErrorIfExists when table exists") {
val mockRedshift = new MockRedshift(
defaultParams("url"),