Skip to content

Commit

Permalink
Upgrade fs2, fix subset column ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
doolse committed Feb 8, 2019
1 parent 13c04b5 commit f31ba44
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 15 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ lazy val config = ConfigFactory.parseFile(prjDir / "application.conf")

val commonSettings = Seq(
organization := "io.github.doolse",
version := "0.1.7-SNAPSHOT",
version := "0.1.8-SNAPSHOT",
scalaVersion := "2.12.6",
resolvers += Resolver.sonatypeRepo("snapshots"),

Expand Down
2 changes: 1 addition & 1 deletion core/build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
libraryDependencies ++= Seq(
"com.chuusai" %% "shapeless" % "2.3.3",
"co.fs2" %% "fs2-core" % "0.10.3",
"co.fs2" %% "fs2-core" % "1.0.3",
"com.typesafe" % "config" % "1.3.0")

libraryDependencies ++= Seq(
Expand Down
5 changes: 2 additions & 3 deletions core/src/main/scala/io/doolse/simpledba/Columns.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@ case class Columns[C[_], T, R <: HList](columns: Seq[(String, C[_])], iso: Iso[T
def compose[T2](ciso: Iso[T2, T]): Columns[C, T2, R] = copy(iso = ciso >>> iso)

def subset[Keys](implicit ss: ColumnSubsetBuilder[R, Keys]): (ColumnSubset[C, R, ss.Out, ss.Out], R => ss.Out) = {
val (_subCols, convert) = ss.apply()
val subCols = _subCols.toSet
(ColumnSubset(columns.filter(c => subCols(c._1)), Iso.id), convert)
val (subCols, convert) = ss.apply()
(ColumnSubset(subCols.map(colName => columns.find(_._1 == colName).get), Iso.id), convert)
}
}

Expand Down
16 changes: 8 additions & 8 deletions jdbc/src/main/scala/io/doolse/simpledba/jdbc/JDBCQueries.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package io.doolse.simpledba.jdbc
import java.sql.{Connection, PreparedStatement, ResultSet, Statement}

import cats.data.{Kleisli, StateT}
import cats.effect.IO
import cats.effect.{IO, LiftIO}
import cats.effect.implicits._
import fs2.{Pure, Sink, Stream}
import fs2.{Pipe, Pure, Stream}
import io.doolse.simpledba.{jdbc, _}
import io.doolse.simpledba.jdbc.BinOp.BinOp
import io.doolse.simpledba.jdbc.JDBCTable.TableRecord
Expand All @@ -19,10 +19,10 @@ import scala.collection.mutable

object JDBCQueries {

def flush: Sink[JDBCIO, WriteOp] = writes => {
def flush: Pipe[JDBCIO, WriteOp, Unit] = writes => {
Stream.eval(
writes.evalMap {
case JDBCWriteOp(q, config, binder) => StateT.inspectF { con =>
case JDBCWriteOp(q, config, binder) => StateT.inspectF { con : Connection =>
def toSQL() = config.queryToSQL(q)

for {
Expand Down Expand Up @@ -224,11 +224,11 @@ object JDBCQueries {

def rowsStream[A](open: JDBCIO[ResultSet]): Stream[JDBCIO, ResultSet] = {
def nextLoop(rs: ResultSet): Stream[JDBCIO, ResultSet] =
Stream.eval(IO(rs.next()).liftIO[JDBCIO]).flatMap {
Stream.eval(liftJDBC.liftIO(IO(rs.next()))).flatMap {
n => if (n) Stream(rs) ++ nextLoop(rs) else Stream.empty
}

Stream.bracket(open)(nextLoop, rs => IO(rs.close()).liftIO[JDBCIO])
Stream.bracket(open)(rs => liftJDBC.liftIO(IO(rs.close()))).flatMap(nextLoop)
}

def bindValues[C[_] <: JDBCColumn, R <: HList, A](cols: ColumnRecord[C, A, R], record: R): ColumnRecord[C, (ParameterBinder, A), R] = {
Expand Down Expand Up @@ -325,7 +325,7 @@ object JDBCQueries {
query: JDBCPreparedQuery, bind: BindFunc[Seq[BindLog]], resultCols: ColumnRecord[C, _, Out])
: Stream[JDBCIO, Out] = {
prepareAndQuery(config, query, bind).evalMap {
rs => getColRecord(resultCols, 1, rs).liftIO[JDBCIO]
rs => liftJDBC.liftIO(getColRecord(resultCols, 1, rs))
}
}

Expand All @@ -342,7 +342,7 @@ object JDBCQueries {
outRec: ColumnRecord[C, _, OutRec])(implicit c: JDBCConfig): Params => Stream[JDBCIO, OutRec] = params => {
val bindFunc = bindParameters(bindValues(cr, params).columns.map(_._1._1)).map(l => Seq(ValueLog(l): BindLog))
prepareAndQuery(c, JDBCRawSQL(sql), bindFunc).evalMap { rs =>
getColRecord(outRec, 1, rs).liftIO[JDBCIO]
liftJDBC.liftIO(getColRecord(outRec, 1, rs))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ package object oracle {
(con,sql) => con.prepareStatement(sql, keyCols.map(_._1).toArray),
{ ps => ps.executeUpdate(); ps.getGeneratedKeys() }
).evalMap { rs =>
JDBCQueries.getColRecord(Columns(keyCols, Iso.id[A :: HNil]), 1, rs).liftIO[JDBCIO]
liftJDBC.liftIO(JDBCQueries.getColRecord(Columns(keyCols, Iso.id[A :: HNil]), 1, rs))
}.map { a => f(a.head) }
}
}
Expand Down
4 changes: 3 additions & 1 deletion jdbc/src/main/scala/io/doolse/simpledba/jdbc/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package io.doolse.simpledba
import java.sql.{Connection, DriverManager, PreparedStatement}

import cats.data.{Kleisli, StateT}
import cats.effect.IO
import cats.effect.{IO, LiftIO}
import com.typesafe.config.{Config, ConfigFactory}
import fs2.{Sink, Stream}

package object jdbc {

val liftJDBC = implicitly[LiftIO[JDBCIO]]

type JDBCIO[A] = StateT[IO, Connection, A]

type BindFunc[A] = Kleisli[StateT[IO, Int, ?], (Connection, PreparedStatement), A]
Expand Down

0 comments on commit f31ba44

Please sign in to comment.