Skip to content

Commit

Permalink
Add in clauses, with streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
doolse committed Jun 15, 2019
1 parent b7920ef commit d873ba5
Show file tree
Hide file tree
Showing 15 changed files with 371 additions and 93 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -1,2 +1,3 @@
.idea/
target/
target/
application.conf
5 changes: 4 additions & 1 deletion core/src/main/scala/io/doolse/simpledba/Columns.scala
Expand Up @@ -5,7 +5,7 @@ import cats.instances.function._
import cats.syntax.compose._
import shapeless.labelled.FieldType
import shapeless.ops.hlist.{Length, LiftAll, Prepend, Split, ToList, ZipWithKeys}
import shapeless.ops.record.{Keys, SelectAll}
import shapeless.ops.record.{Keys, SelectAll, Selector}
import shapeless.{::, HList, HNil, Nat, Witness}

import scala.annotation.tailrec
Expand Down Expand Up @@ -160,6 +160,9 @@ case class Columns[C[_], T, R <: HList](columns: Seq[(String, C[_])], iso: Iso[T
}

def toSubset: ColumnSubset[C, T, R] = ColumnSubset(columns, iso.to)

def singleColumn[A](col: Witness)(implicit selector: Selector.Aux[R, col.T, A], ev: col.T <:< Symbol): (String, C[A]) =
columns.find(_._1 == col.value.name).get.asInstanceOf[(String, C[A])]
}

object Columns {
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/io/doolse/simpledba/Effects.scala
Expand Up @@ -30,6 +30,7 @@ trait Streamable[S[_], F[_]] {
}
SM.flatMap(s)(loop)
}
def maxMapped[A, B](n: Int, s: S[A])(f: Seq[A] => B): S[B]

def toVector[A](s: S[A]): F[Vector[A]]
def last[A](s: S[A]): S[Option[A]]
Expand Down
2 changes: 2 additions & 0 deletions fs2/src/main/scala/io/doolse/simpledba/fs2/package.scala
Expand Up @@ -48,5 +48,7 @@ package object fs2 {

override def bracket[A](acquire: F[A])(release: A => F[Unit]): Stream[F, A] =
Stream.bracket(acquire)(release)

override def maxMapped[A, B](n: Int, s: Stream[F, A])(f: Seq[A] => B): Stream[F, B] = s.chunkN(n).map(c => f(c.toVector))
}
}
204 changes: 135 additions & 69 deletions jdbc/src/main/scala/io/doolse/simpledba/jdbc/JDBCQueries.scala
Expand Up @@ -2,19 +2,19 @@ package io.doolse.simpledba.jdbc

import java.sql.{Connection, PreparedStatement}

import cats.data.{Kleisli, State}
import cats.data.{Kleisli, NonEmptyList, State}
import io.doolse.simpledba._
import io.doolse.simpledba.jdbc.BinOp.BinOp
import io.doolse.simpledba.jdbc.JDBCQueries._
import io.doolse.simpledba.jdbc.JDBCTable.TableRecord
import shapeless.labelled._
import shapeless.ops.hlist.{Length, Prepend, Split, Take}
import shapeless.ops.record.{Keys, ToMap}
import shapeless.ops.record.{Keys, Selector, ToMap}
import shapeless.{::, DepFn2, HList, HNil, LabelledGeneric, Nat, Witness, _0}

import scala.annotation.tailrec

case class JDBCMapper[C[_] <: JDBCColumn[_]](dialect: SQLDialect) {
case class JDBCMapper[C[A] <: JDBCColumn[A]](dialect: SQLDialect) {

def record[R <: HList](implicit cr: ColumnRecord[C, Unit, R]): ColumnRecord[C, Unit, R] = cr

Expand All @@ -23,7 +23,7 @@ case class JDBCMapper[C[_] <: JDBCColumn[_]](dialect: SQLDialect) {

def mapped[T] = new RelationBuilder[T, C]

class RelationBuilder[T, C[_] <: JDBCColumn[_]] {
class RelationBuilder[T, C[A] <: JDBCColumn[A]] {
def embedded[GR <: HList, R <: HList](
implicit
gen: LabelledGeneric.Aux[T, GR],
Expand Down Expand Up @@ -65,7 +65,7 @@ object BindUpdate extends ColumnCompare[JDBCColumn, String, BoundColumn] {
}
}

case class JDBCQueries[C[_] <: JDBCColumn[_], S[_], F[_]](E: JDBCEffect[S, F],
case class JDBCQueries[C[A] <: JDBCColumn[A], S[_], F[_]](E: JDBCEffect[S, F],
dialect: SQLDialect) {
val S = E.S
val SM = S.SM
Expand Down Expand Up @@ -146,15 +146,15 @@ case class JDBCQueries[C[_] <: JDBCColumn[_], S[_], F[_]](E: JDBCEffect[S, F],
E,
ColumnRecord.empty,
identity,
_ => (Seq.empty, Seq.empty),
_ => S.emit((Seq.empty, Seq.empty)),
Seq.empty
)

def deleteFrom(table: JDBCTable[C]) =
new DeleteBuilder[S, C, F, table.DataRec, HNil](E.S,
new DeleteBuilder[S, F, C, table.DataRec, HNil](E.S,
table,
dialect,
_ => (Seq.empty, Seq.empty))
_ => S.emit((Seq.empty, Seq.empty)))

def query(table: JDBCTable[C]) =
new QueryBuilder[S, F, C, table.DataRec, HNil, table.DataRec, table.Data](
Expand All @@ -163,7 +163,7 @@ case class JDBCQueries[C[_] <: JDBCColumn[_], S[_], F[_]](E: JDBCEffect[S, F],
E,
toProjection(table.allColumns),
table.allColumns.iso.from,
_ => (Seq.empty, Seq.empty),
_ => S.emit((Seq.empty, Seq.empty)),
Seq.empty
)

Expand Down Expand Up @@ -213,49 +213,135 @@ object JDBCQueries {
}
}

case class DeleteBuilder[S[_], C[_] <: JDBCColumn[_], F[_], DataRec <: HList, InRec <: HList](
S: Streamable[S, F],
table: TableRecord[C, DataRec],
dialect: SQLDialect,
toWhere: InRec => (Seq[JDBCWhereClause], Seq[BoundValue])
) {
def where[W2 <: HList, ColNames <: HList](cols: Cols[ColNames], op: BinOp)(
implicit css: ColumnSubsetBuilder.Aux[DataRec, ColNames, W2],
): DeleteBuilder[S, C, F, DataRec, W2 :: InRec] = ???
case class DeleteBuilder[S[_], F[_], C[A] <: JDBCColumn[A], DataRec <: HList, InRec <: HList](
private[jdbc] val S: Streamable[S, F],
private[jdbc] val table: TableRecord[C, DataRec],
private[jdbc] val dialect: SQLDialect,
private[jdbc] val toWhere: InRec => S[(Seq[JDBCWhereClause], Seq[BoundValue])]
) extends WhereBuilder[S, F, C, DataRec, InRec] {
type WhereOut[NewIn <: HList] = DeleteBuilder[S, F, C, DataRec, NewIn]
def build[W2](
implicit
c: AutoConvert[W2, InRec]): W2 => S[WriteOp] = w => {
S.SM.map(toWhere(c(w))) {
case (where, values) =>
val deleteSQL = dialect.querySQL(JDBCDelete(table.name, where))
JDBCWriteOp(deleteSQL, bindParameters(values))
}
}

def where[W2 <: HList](col: Witness, op: BinOp)(
implicit cols: ColumnSubsetBuilder.Aux[DataRec, col.T :: HNil, W2]
): DeleteBuilder[S, C, F, DataRec, W2 :: InRec] = ???
override def withToWhere[NewIn <: HList](f: NewIn => S[(Seq[JDBCWhereClause], Seq[BoundValue])])
: DeleteBuilder[S, F, C, DataRec, NewIn] = copy(toWhere = f)
}

def build[W2](
c: AutoConvert[W2, InRec]
): W2 => S[WriteOp] = w => {
val (where, values) = toWhere(c(w))
S.emit {
val deleteSQL = dialect.querySQL(JDBCDelete(table.name, where))
JDBCWriteOp(deleteSQL, bindParameters(values))
trait WhereBuilder[S[_], F[_], C[A] <: JDBCColumn[A], DataRec <: HList, InRec <: HList] {
private[jdbc] def S: Streamable[S, F]
type WhereOut[NewIn <: HList]

private[jdbc] def dialect: SQLDialect
private[jdbc] def table: TableRecord[C, DataRec]
private[jdbc] def toWhere: InRec => S[(Seq[JDBCWhereClause], Seq[BoundValue])]

protected def withToWhere[NewIn <: HList](
f: NewIn => S[(Seq[JDBCWhereClause], Seq[BoundValue])]): WhereOut[NewIn]

protected def addWhere[NewIn <: HList](
f: NewIn => (InRec, Seq[JDBCWhereClause], Seq[BoundValue])): WhereOut[NewIn] = withToWhere {
newIn =>
val (oldIn, newClause, newBind) = f(newIn)
S.SM.map(this.toWhere(oldIn)) {
case (oldClause, oldBind) => (oldClause ++ newClause, oldBind ++ newBind)
}
}

def whereInNotEmpty[A, NewIn <: HList, WLen <: Nat, K <: Symbol](whereCol: Witness.Aux[K])(
implicit
select: Selector.Aux[DataRec, K, A],
prepend: Prepend.Aux[InRec, NonEmptyList[A] :: HNil, NewIn],
length: Length.Aux[InRec, WLen],
split: Split.Aux[NewIn, WLen, InRec, NonEmptyList[A] :: HNil]
): WhereOut[NewIn] = {
val (colName, inCol) = table.allColumns.singleColumn(whereCol)
addWhere { newIn =>
val (oldWhere, newWhere) = split(newIn)
val (newClause, bindVals) = mkInClause(colName, inCol, newWhere.head.toList)
(oldWhere, newClause, bindVals)
}
}

private def mkInClause[A](colName: String, inCol: C[A], vals: Seq[A]) = {
val bindVals = vals.map(inCol.bindValue)
val clause = BinClause(ColumnReference(NamedColumn(colName, inCol.columnType)),
BinOp.IN,
Expressions(bindVals.map(_ => Parameter(inCol.columnType))))
(Seq(clause), bindVals)
}

def whereIn[A, NewIn <: HList, WLen <: Nat](whereCol: Witness)(
implicit
select: Selector.Aux[DataRec, whereCol.T, A],
ev: whereCol.T <:< Symbol,
prepend: Prepend.Aux[InRec, S[A] :: HNil, NewIn],
length: Length.Aux[InRec, WLen],
split: Split.Aux[NewIn, WLen, InRec, S[A] :: HNil]
): WhereOut[NewIn] = {
val (colName, inCol) = table.allColumns.singleColumn(whereCol)
withToWhere { newIn =>
val (oldWhere, newWhere) = split(newIn)
S.SM.flatMap(toWhere(oldWhere)) {
case (oldClause, oldBind) =>
S.maxMapped(dialect.maxInParamaters, newWhere.head) { vals =>
val (newClause, newBind) = mkInClause(colName, inCol, vals)
(oldClause ++ newClause, oldBind ++ newBind)
}
}
}
}

def where[WhereVals <: HList, NewIn <: HList, WLen <: Nat](whereCol: Witness, binOp: BinOp)(
implicit
csb: ColumnSubsetBuilder.Aux[DataRec, whereCol.T :: HNil, WhereVals],
prepend: Prepend.Aux[InRec, WhereVals, NewIn],
length: Length.Aux[InRec, WLen],
split: Split.Aux[NewIn, WLen, InRec, WhereVals]
): WhereOut[NewIn] = where(Cols(whereCol), binOp)

def where[ColNames <: HList, WhereVals <: HList, NewIn <: HList, WLen <: Nat](
whereCols: Cols[ColNames],
binOp: BinOp)(
implicit
css: ColumnSubsetBuilder.Aux[DataRec, ColNames, WhereVals],
len: Length.Aux[InRec, WLen],
prepend: Prepend.Aux[InRec, WhereVals, NewIn],
split: Split.Aux[NewIn, WLen, InRec, WhereVals]
): WhereOut[NewIn] = {
val whereCols = table.allColumns.subset(css)
addWhere { newIn =>
val (oldWhere, newWhere) = split(newIn)
val opWheres = colsOp(binOp, whereCols).apply(newWhere)
(oldWhere, opWheres.map(_._1), opWheres.map(_._2))
}
}
}

case class QueryBuilder[S[_],
F[_],
C[_] <: JDBCColumn[_],
C[A] <: JDBCColumn[A],
DataRec <: HList,
InRec <: HList,
OutRec <: HList,
Out](
table: TableRecord[C, DataRec],
dialect: SQLDialect,
E: JDBCEffect[S, F],
projections: ColumnRecord[C, SQLProjection, OutRec],
mapOut: OutRec => Out,
toWhere: InRec => (Seq[JDBCWhereClause], Seq[BoundValue]),
orderCols: Seq[(NamedColumn, Boolean)]
) {

val S = E.S
val SM = S.SM
private[jdbc] val table: TableRecord[C, DataRec],
private[jdbc] val dialect: SQLDialect,
private[jdbc] val E: JDBCEffect[S, F],
private[jdbc] val projections: ColumnRecord[C, SQLProjection, OutRec],
private[jdbc] val mapOut: OutRec => Out,
private[jdbc] val toWhere: InRec => S[(Seq[JDBCWhereClause], Seq[BoundValue])],
private[jdbc] val orderCols: Seq[(NamedColumn, Boolean)]
) extends WhereBuilder[S, F, C, DataRec, InRec] {
type WhereOut[NewIn <: HList] = QueryBuilder[S, F, C, DataRec, NewIn, OutRec, Out]
private[jdbc] val S = E.S
private[jdbc] val SM = S.SM

def count(
implicit intCol: C[Int]
Expand Down Expand Up @@ -300,32 +386,6 @@ object JDBCQueries {
copy(orderCols = cols)
}

def where[WhereVals <: HList, NewIn <: HList, WLen <: Nat](whereCol: Witness, binOp: BinOp)(
implicit
csb: ColumnSubsetBuilder.Aux[DataRec, whereCol.T :: HNil, WhereVals],
prepend: Prepend.Aux[WhereVals, InRec, NewIn],
length: Length.Aux[WhereVals, WLen],
split: Split.Aux[NewIn, WLen, WhereVals, InRec]
): QueryBuilder[S, F, C, DataRec, NewIn, OutRec, Out] = where(Cols(whereCol), binOp)

def where[ColNames <: HList, WhereVals <: HList, NewIn <: HList, WLen <: Nat](
whereCols: Cols[ColNames],
binOp: BinOp)(
implicit
css: ColumnSubsetBuilder.Aux[DataRec, ColNames, WhereVals],
len: Length.Aux[WhereVals, WLen],
prepend: Prepend.Aux[WhereVals, InRec, NewIn],
split: Split.Aux[NewIn, WLen, WhereVals, InRec]
): QueryBuilder[S, F, C, DataRec, NewIn, OutRec, Out] = {

copy(toWhere = newin => {
val (newWhere, oldWhere) = split(newin)
val res = colsOp(binOp, table.allColumns.subset(css)).apply(newWhere)
(res.map(_._1), res.map(_._2))
})

}

def buildAs[In, Out2](
implicit c: AutoConvert[In, InRec],
cout: AutoConvert[Out, Out2]
Expand All @@ -337,10 +397,16 @@ object JDBCQueries {
val baseSel =
JDBCSelect(table.name, projections.columns.map(_._1), Seq.empty, orderCols, false)
w2: In =>
val (whereClauses, binds) = toWhere(c(w2))
val selSQL = dialect.querySQL(baseSel.copy(where = whereClauses))
SM.map(E.streamForQuery(selSQL, bindParameters(binds), projections))(mapOut)
SM.flatMap(toWhere(c(w2))) {
case (whereClauses, binds) =>
val selSQL = dialect.querySQL(baseSel.copy(where = whereClauses))
SM.map(E.streamForQuery(selSQL, bindParameters(binds), projections))(mapOut)
}
}

override def withToWhere[NewIn <: HList](f: NewIn => S[(Seq[JDBCWhereClause], Seq[BoundValue])])
: QueryBuilder[S, F, C, DataRec, NewIn, OutRec, Out] =
copy(toWhere = f)
}

def bindParameters(params: Seq[BoundValue]): (Connection, PreparedStatement) => Seq[Any] =
Expand Down
3 changes: 2 additions & 1 deletion jdbc/src/main/scala/io/doolse/simpledba/jdbc/JDBCSQL.scala
Expand Up @@ -27,7 +27,7 @@ object AggregateOp extends Enumeration {

object BinOp extends Enumeration {
type BinOp = Value
val EQ, GT, GTE, LT, LTE, LIKE = Value
val EQ, GT, GTE, LT, LTE, LIKE, IN = Value
}

sealed trait SQLExpression
Expand All @@ -37,6 +37,7 @@ case class Aggregate(name: AggregateOp, column: Option[NamedColumn]) extends SQL
case class FunctionCall(name: String, params: Seq[SQLExpression]) extends SQLExpression
case class SQLString(s: String) extends SQLExpression
case class Parameter(columnType: ColumnType) extends SQLExpression
case class Expressions(expressions: Seq[SQLExpression]) extends SQLExpression

case class ColumnExpression(column: NamedColumn, expression: SQLExpression)
case class SQLProjection(columnType: ColumnType, sql: SQLExpression)
Expand Down
10 changes: 5 additions & 5 deletions jdbc/src/main/scala/io/doolse/simpledba/jdbc/JDBCTable.scala
Expand Up @@ -3,7 +3,7 @@ package io.doolse.simpledba.jdbc
import io.doolse.simpledba._
import shapeless.{::, HList, HNil, Witness}

case class JDBCRelation[C[_] <: JDBCColumn[_], T, R <: HList](
case class JDBCRelation[C[A] <: JDBCColumn[A], T, R <: HList](
name: String,
all: Columns[C, T, R]
) {
Expand Down Expand Up @@ -33,7 +33,7 @@ case class JDBCRelation[C[_] <: JDBCColumn[_], T, R <: HList](
}
}

trait JDBCTable[C[_] <: JDBCColumn[_]] {
trait JDBCTable[C[A] <: JDBCColumn[A]] {
type Data
type DataRec <: HList
type KeyList <: HList
Expand Down Expand Up @@ -66,18 +66,18 @@ trait JDBCTable[C[_] <: JDBCColumn[_]] {
}

object JDBCTable {
type Aux[C[_] <: JDBCColumn[_], T, R, K, KeyN] = JDBCTable[C] {
type Aux[C[A] <: JDBCColumn[A], T, R, K, KeyN] = JDBCTable[C] {
type Data = T
type DataRec = R
type KeyNames = KeyN
type KeyList = K
}

type TableRecord[C[_] <: JDBCColumn[_], R] = JDBCTable[C] {
type TableRecord[C[A] <: JDBCColumn[A], R] = JDBCTable[C] {
type DataRec = R
}

def apply[C[_] <: JDBCColumn[_], T, R <: HList, K <: HList, KeyN <: HList](
def apply[C[A] <: JDBCColumn[A], T, R <: HList, K <: HList, KeyN <: HList](
tableName: String,
all: Columns[C, T, R],
keys: ColumnSubsetBuilder.Aux[R, KeyN, K],
Expand Down
Expand Up @@ -10,4 +10,5 @@ trait SQLDialect {
def addColumns(t: TableColumns): Seq[String]
def truncateTable(t: TableDefinition): String
def createIndex(t: TableColumns, named: String): String
def maxInParamaters: Int
}

0 comments on commit d873ba5

Please sign in to comment.