Skip to content
This repository has been archived by the owner on Nov 20, 2019. It is now read-only.

Commit

Permalink
[Postgresql Connector] Support union, intersect, except and fix SQLBu…
Browse files Browse the repository at this point in the history
…ilder unsupported operation case (#809)

* support union, intersect, except and fix SQLBuilder unsupported operation case

* simple test for union, intersect, except

* change DriverStreamsAPIIT timeout to 4 s
  • Loading branch information
pmadrigal committed Jan 25, 2017
1 parent b92dffc commit b45d568
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 12 deletions.
Expand Up @@ -63,22 +63,19 @@ class PostgresqlQueryProcessor(postgresRelation: PostgresqlXDRelation,
val limit: Option[Int] = logicalPlan.collectFirst { case Limit(Literal(num: Int, _), _) => num }

try {
new SQLBuilder(logicalPlan).toSQL.map { sqlQuery =>
if (limit.exists(_ == 0)) Array.empty[InternalRow]
else {

Try(executeQuery(sqlQuery)).getOrElse{
val sqlWithLimit = s"$sqlText LIMIT ${limit.getOrElse(DefaultLimit)}"
executeQuery(sqlWithLimit)
}

if (limit.exists(_ == 0)) Some(Array.empty[InternalRow])
else {
lazy val sqlWithLimit = s"$sqlText LIMIT ${limit.getOrElse(DefaultLimit)}"
lazy val executeDirectQuery = Some(executeQuery(sqlWithLimit))
new SQLBuilder(logicalPlan).toSQL.fold(executeDirectQuery){ sqlQuery =>
Try(Some(executeQuery(sqlQuery))).getOrElse{executeDirectQuery}
}
}
} catch {
case exc: Exception => log.warn(s"Exception executing the native query $logicalPlan", exc); None
}
}
//spark code
//spark code
private def getValue(idx: Int, rs: ResultSet, schema: StructType) : Any = {
val metadata = schema.fields(idx).metadata
val rsIdx= idx+1
Expand Down
Expand Up @@ -97,7 +97,7 @@ class PostgresqlXDRelation( url: String,
case _ => false
}
case bn: BinaryNode => bn match {
case _: Join => true
case Join(_, _,_, _) | Union(_, _) | Intersect(_, _) | Except(_, _) => true
case _ => false
}
case unsupportedLogicalPlan =>logDebug(s"LogicalPlan $unsupportedLogicalPlan cannot be executed natively"); false
Expand Down
Expand Up @@ -77,4 +77,31 @@ class PostgresqlJoinIT extends PostgresqlWithSharedContext {
result should have length 20
}

it should s"support a UNION natively" in {
assumeEnvironmentIsUpAndRunning

val df = sql(s"SELECT id FROM $postgresqlSchema.$Table UNION ALL SELECT id FROM $postgresqlSchema.$aggregationTable")
val result = df.collect(ExecutionType.Native)

result should have length 30
}

it should s"support a INTERSECT natively" in {
assumeEnvironmentIsUpAndRunning

val df = sql(s"SELECT id FROM $postgresqlSchema.$Table INTERSECT SELECT id FROM $postgresqlSchema.$aggregationTable")
val result = df.collect(ExecutionType.Native)

result should have length 10
}

it should s"support a EXCEPT natively" in {
assumeEnvironmentIsUpAndRunning

val df = sql(s"SELECT id FROM $postgresqlSchema.$Table EXCEPT SELECT id FROM $postgresqlSchema.$aggregationTable")
val result = df.collect(ExecutionType.Native)

result should have length 0
}

}
Expand Up @@ -55,7 +55,7 @@ class DriverStreamsAPIIT extends EndToEndTest with ScalaFutures {
.requestNext(Row(2, "Fuse"))
.request(1).expectComplete()

}(PatienceConfig(timeout = 2 seconds))
}(PatienceConfig(timeout = 4 seconds))

}
}
Expand Down

0 comments on commit b45d568

Please sign in to comment.