Skip to content

Commit

Permalink
Initial code commit
Browse files Browse the repository at this point in the history
  • Loading branch information
fredrik.skogberg committed May 31, 2019
1 parent 4a6bcfd commit 5cb083a
Show file tree
Hide file tree
Showing 9 changed files with 336 additions and 0 deletions.
12 changes: 12 additions & 0 deletions build.sbt
@@ -0,0 +1,12 @@
name := "stm-partitioning"

version := "0.0.1"

scalaVersion := "2.12.8"

libraryDependencies ++= Seq(
"org.scalaz" %% "scalaz-zio" % "1.0-RC4",
"org.scalaz" %% "scalaz-zio-streams" % "1.0-RC4",
"org.scalaz" %% "scalaz-zio-testkit" % "1.0-RC4" % "test",
"org.scalatest" %% "scalatest" % "3.0.5" % "test"
)
1 change: 1 addition & 0 deletions project/build.properties
@@ -0,0 +1 @@
sbt.version = 1.2.8
61 changes: 61 additions & 0 deletions src/main/scala/freskog/concurrency/app/PartitioningDemo.scala
@@ -0,0 +1,61 @@
package freskog.concurrency.app


import java.util.concurrent.TimeUnit

import scalaz.zio.blocking.Blocking
import scalaz.zio.duration.Duration
import scalaz.zio.random.Random
import scalaz.zio.scheduler.Scheduler
import scalaz.zio.system.System
import scalaz.zio._
import scalaz.zio.clock._
import scalaz.zio.console._

import scala.concurrent.duration._
import freskog.concurrency.partition._

object PartitioningDemo extends App {

val config:Config =
Config(
userTTL = Duration(3, SECONDS),
idleTTL = Duration(2, SECONDS),
maxPending = 3
)

def brokenUserFunction(startTs:Long, counts:Ref[Map[Int,Int]])(n:Int): ZIO[Console with Clock, Nothing, Unit] =
ZIO.descriptorWith( desc =>
for {
now <- sleep(Duration(100 * n, MILLISECONDS)) *> currentTime (MILLISECONDS)
m <- counts.update(m => m.updated(n, m.getOrElse(n, 0) + 1))
msg = s"Offset: ${now - startTs}ms Fiber: ${desc.id}, n = $n (call #${m(n)})"
_ <- if (n == 0) throw new IllegalArgumentException(msg) else putStrLn(msg)
} yield ()
)

val workItems: List[Int] = List.range(0,11) ::: List.range(0,11) ::: List.range(0, 12)

val program: ZIO[Environment with Partition, Nothing, Int] =
for {
now <- clock.currentTime(TimeUnit.MILLISECONDS)
counter <- Ref.make(Map.empty[Int,Int])
env <- ZIO.environment[Console with Clock]
process <- partition[Int](config, _.toString, brokenUserFunction(now,counter)(_).provide(env))
results <- ZIO.foreach(workItems)(process)
_ <- console.putStrLn(s"Published ${results.count(identity)} out of ${results.length}")
_ <- ZIO.sleep(Duration.fromScala(10.seconds))
} yield 0

override def run(args: List[String]): ZIO[Environment, Nothing, Int] =
program.provideSome[Environment]( env =>
new Clock with Console with System with Random with Blocking with Partition.Live {
override val blocking: Blocking.Service[Any] = env.blocking
override val clock: Clock.Service[Any] = env.clock
override val console: Console.Service[Any] = env.console
override val random: Random.Service[Any] = env.random
override val system: System.Service[Any] = env.system
override val scheduler: Scheduler.Service[Any] = env.scheduler
})

}
110 changes: 110 additions & 0 deletions src/main/scala/freskog/concurrency/partition/Partition.scala
@@ -0,0 +1,110 @@
package freskog.concurrency.partition

import scalaz.zio._
import scalaz.zio.clock._
import scalaz.zio.console._
import scalaz.zio.duration.Duration
import scalaz.zio.scheduler.Scheduler
import scalaz.zio.stm._

trait Partition extends Serializable {
val partition:Partition.Service[Any]
}

case class Config(userTTL:Duration, idleTTL:Duration, maxPending:Int)

object Partition extends Serializable {

trait Service[R] extends Serializable {
def partition[A](config:Config, partIdOf:A => PartId, action: A => UIO[Unit]):ZIO[R, Nothing, A => UIO[Boolean]]
}

trait Live extends Clock.Live with Console.Live with Partition {
env =>

override val partition: Service[Any] =
new Service[Any] {
override def partition[A](config: Config, partIdOf: A => PartId, action: A => UIO[Unit]): ZIO[Any, Nothing, A => UIO[Boolean]] =
for {
queues <- TRef.make(Map.empty[PartId, TQueue[A]]).commit
} yield producer(queues, partIdOf, action)(_).provide(buildEnv(config, env))
}
}

trait Conf {
def userTTL: Duration
def idleTTL: Duration
def maxPending: Int
}

type PartEnv = Clock with Console with Conf
type Queues[A] = TRef[Map[PartId,TQueue[A]]]

def buildEnv(conf:Config, env:Clock with Console):PartEnv =
new Conf with Clock with Console {
override def userTTL: Duration = conf.userTTL
override def idleTTL: Duration = conf.idleTTL
override def maxPending: Int = conf.maxPending

override val clock:Clock.Service[Any] = env.clock
override val scheduler:Scheduler.Service[Any] = env.scheduler
override val console:Console.Service[Any] = env.console
}

val userTTL:ZIO[Conf, Nothing, Duration] =
ZIO.access[Conf](_.userTTL)

val idleTTL:ZIO[Conf, Nothing, Duration] =
ZIO.access[Conf](_.idleTTL)

val maxPending:ZIO[Conf, Nothing, Int] =
ZIO.access[Conf](_.maxPending)

def publish[A](queue:TQueue[A], a:A):STM[Nothing, Boolean] =
queue.size.flatMap(size => if(size == queue.capacity) STM.succeed(false) else queue.offer(a) *> STM.succeed(true))

def debug(cause: Exit.Cause[String]): ZIO[Console, Nothing, Unit] =
putStrLn(cause.failures.mkString("\n\t") + cause.defects.mkString("\n\t"))

def takeNextMessageOrTimeout[A](id: PartId, queue: TQueue[A]): ZIO[Clock with Conf, String, A] =
idleTTL >>= queue.take.commit.timeoutFail(s"$id consumer expired")

def safelyPerformAction[A](id: PartId, action: A => UIO[Unit])(a:A): ZIO[PartEnv, Nothing, Unit] =
(userTTL >>= (action(a).timeoutFail(s"$id action timed out")(_))).sandbox.catchAll(debug)

def startConsumer[A](id:PartId, queue: TQueue[A], cleanup:UIO[Unit], action: A => UIO[Unit]): ZIO[PartEnv, Nothing, Unit] =
(takeNextMessageOrTimeout(id, queue) >>= safelyPerformAction(id, action)).forever.ensuring(cleanup).fork.unit

def hasConsumer[A](queues:Queues[A], id:PartId): STM[Nothing, Boolean] =
queues.get.map(_.contains(id))

def removeConsumerFor[A](queues:Queues[A], id: PartId): UIO[Unit] =
queues.update(_ - id).unit.commit

def getWorkQueueFor[A](queues:Queues[A], id: PartId): STM[Nothing, TQueue[A]] =
queues.get.map(_(id))

def setWorkQueueFor[A](queues:Queues[A], id:PartId, queue:TQueue[A]): STM[Nothing, Unit] =
queues.update(_.updated(id, queue)).unit

def createConsumer[A](queues:Queues[A], id:PartId, maxPending:Int, action: A => UIO[Unit]): STM[Nothing, ZIO[PartEnv, Nothing, Unit]] =
for {
queue <- TQueue.make[A](maxPending)
_ <- setWorkQueueFor(queues, id, queue)
} yield startConsumer(id, queue, removeConsumerFor(queues, id), action)

def producer[A](queues:Queues[A], partIdOf:A => PartId, action: A => UIO[Unit])(a:A): ZIO[PartEnv, Nothing, Boolean] =
maxPending >>= { maxPending:Int =>
STM.atomically {
for {
exists <- hasConsumer(queues, partIdOf(a))
id = partIdOf(a)
consumer <- if (exists) STM.succeed(ZIO.unit) else createConsumer(queues, id, maxPending, action)
queue <- getWorkQueueFor(queues, partIdOf(a))
published <- publish(queue, a)
} yield ZIO.succeed(published) <* consumer
}.flatten
}

object Live extends Live
}
15 changes: 15 additions & 0 deletions src/main/scala/freskog/concurrency/partition/package.scala
@@ -0,0 +1,15 @@
package freskog.concurrency

import scalaz.zio.{UIO, ZIO}

package object partition extends Partition.Service[Partition] {

type PartId = String

final val partitionService:ZIO[Partition, Nothing, Partition.Service[Any]] =
ZIO.access(_.partition)

final def partition[A](config:Config, partIdOf:A => PartId, action: A => UIO[Unit]):ZIO[Partition, Nothing, A => UIO[Boolean]] =
ZIO.accessM[Partition](_.partition.partition(config,partIdOf, action))

}
48 changes: 48 additions & 0 deletions src/test/scala/freskog/concurrency/partition/BaseTests.scala
@@ -0,0 +1,48 @@
package freskog.concurrency.partition

import freskog.concurrency.partition.Partition.PartEnv
import org.scalatest.{Assertion, DiagrammedAssertions, FlatSpec}
import scalaz.zio.clock.Clock
import scalaz.zio.console.Console
import scalaz.zio.stm.STM
import scalaz.zio.testkit.{TestClock, TestConsole}
import scalaz.zio._

abstract class BaseTests extends FlatSpec with DiagrammedAssertions {

val realRts: DefaultRuntime =
new DefaultRuntime {}

val clockData:UIO[Ref[TestClock.Data]] =
Ref.make(TestClock.Zero)

val consoleData:UIO[Ref[TestConsole.Data]] =
Ref.make(TestConsole.Data())

val schedulerData:UIO[Ref[TestClock.Data]] =
Ref.make(TestClock.Zero)

val testRts: UIO[TestRuntime] =
(clockData <*> consoleData <*> schedulerData).map {
case clockR <*> consoleR <*> schedR => TestRuntime(clockR, consoleR, schedR)
}

def partEnv(config:Config):ZIO[Clock with Console, Nothing, PartEnv] =
ZIO.environment[Clock with Console].map(Partition.buildEnv(config, _))

def unwrap[R,E,A](zio:ZIO[Clock with Console,E,A]): A =
realRts.unsafeRun(zio)

def run(z:ZIO[Clock with Console, Throwable, Assertion]): Assertion = {
val rts = unwrap(testRts)
rts.unsafeRunSync(z).getOrElse(c => throw c.squash)
}

def runSTM(z:STM[Throwable, Assertion]) =
run(z.commit)

def runReal(z:ZIO[Clock with Console, Throwable, Assertion]):Unit = {
realRts.unsafeRunSync(z).getOrElse(c => throw c.squash)
}

}
46 changes: 46 additions & 0 deletions src/test/scala/freskog/concurrency/partition/ConsumerTests.scala
@@ -0,0 +1,46 @@
package freskog.concurrency.partition

import java.util.concurrent.TimeUnit.MILLISECONDS

import freskog.concurrency.partition.Partition.startConsumer
import scalaz.zio.duration.Duration
import scalaz.zio.{Promise, UIO}
import scalaz.zio.stm.TQueue

class ConsumerTests extends BaseTests {

val config =
Config(
userTTL = Duration(100, MILLISECONDS),
idleTTL = Duration(100, MILLISECONDS),
maxPending = 1
)

behavior of "a consumer"

it should "always successfully process a value on the queue" in {
runReal(
for {
env <- partEnv(config)
queue <- TQueue.make[String](1).commit
promise <- Promise.make[Nothing,String]
_ <- startConsumer("p1", queue, UIO.unit, promise.succeed(_:String).unit).provide(env)
_ <- queue.offer("published").commit
result <- promise.await.timeoutFail("not published")(Duration(150,MILLISECONDS)).fold(identity,identity)
} yield assert(result == "published")
)
}

it should "a defect in client code doesn't break the consumer" in {
runReal(
for {
env <- partEnv(config)
queue <- TQueue.make[Boolean](1).commit
promise <- Promise.make[Nothing,String]
_ <- startConsumer("p1", queue, UIO.unit, (b:Boolean) => if(b) throw new IllegalArgumentException("BOOM!") else promise.succeed("done!").unit).provide(env)
_ <- queue.offerAll(List(true,false)).commit
result <- promise.await.timeoutFail("not published")(Duration(150,MILLISECONDS)).fold(identity,identity)
} yield assert(result == "done!")
)
}
}
21 changes: 21 additions & 0 deletions src/test/scala/freskog/concurrency/partition/PublishTests.scala
@@ -0,0 +1,21 @@
package freskog.concurrency.partition

import freskog.concurrency.partition.Partition.publish
import scalaz.zio.stm.TQueue

class PublishTests extends BaseTests {

behavior of "a publisher"

it should "return true when publishing to an empty TQueue" in {
runSTM {
(TQueue.make[Int](1) >>= (publish(_, 1))) map (published => assert(published))
}
}

it should "return false when publishing to a full TQueue" in {
runSTM(
(TQueue.make[Int](0) >>= (publish(_, 1))) map (published => assert(!published))
)
}
}
22 changes: 22 additions & 0 deletions src/test/scala/freskog/concurrency/partition/TestRuntime.scala
@@ -0,0 +1,22 @@
package freskog.concurrency.partition


import scalaz.zio.{Ref, Runtime}
import scalaz.zio.clock.Clock
import scalaz.zio.console.Console
import scalaz.zio.internal.{Platform, PlatformLive}
import scalaz.zio.scheduler.Scheduler
import scalaz.zio.testkit.{TestClock, TestConsole, TestScheduler}


case class TestRuntime(clockR:Ref[TestClock.Data], consoleR:Ref[TestConsole.Data], schedR:Ref[TestClock.Data]) extends Runtime[Clock with Console] { self =>
type Environment = Clock with Console

val Platform: Platform = PlatformLive.Default
val Environment: Environment =
new Clock with Console {
override val clock: Clock.Service[Any] = TestClock(clockR)
override val console: Console.Service[Any] = TestConsole(consoleR)
override val scheduler: Scheduler.Service[Any] = TestScheduler(schedR, self)
}
}

0 comments on commit 5cb083a

Please sign in to comment.