Skip to content

Commit

Permalink
back to async
Browse files Browse the repository at this point in the history
  • Loading branch information
Eugene Tolmachev committed Mar 1, 2018
1 parent 98da33b commit 68e9420
Show file tree
Hide file tree
Showing 25 changed files with 259 additions and 274 deletions.
1 change: 1 addition & 0 deletions .travis.yml
@@ -1,6 +1,7 @@
language: csharp

sudo: false # use the new container-based Travis infrastructure
dotnet: 2.0

script:
- ./build.sh
Expand Down
3 changes: 3 additions & 0 deletions RELEASE_NOTES.md
@@ -1,3 +1,6 @@
#### 1.0.0-beta-1 - Nov 2017
* netstandard 2.0 release

#### 0.2.9 - Nov 2017
* Fixing spout timeout when running as a shelled component

Expand Down
9 changes: 0 additions & 9 deletions build.cmd
@@ -1,18 +1,9 @@
@echo off
cls

.paket\paket.bootstrapper.exe
if errorlevel 1 (
exit /b %errorlevel%
)

.paket\paket.exe restore
if errorlevel 1 (
exit /b %errorlevel%
)

IF NOT EXIST build.fsx (
.paket\paket.exe update
packages\build\FAKE\tools\FAKE.exe init.fsx
)
packages\build\FAKE\tools\FAKE.exe build.fsx %*
4 changes: 2 additions & 2 deletions build.fsx
Expand Up @@ -59,7 +59,7 @@ let release = LoadReleaseNotes "RELEASE_NOTES.md"

let assemblyInfo =
[ "Description",summary
"Version", release.AssemblyVersion
"Version", release.NugetVersion
"Authors", gitOwner
"PackageProjectUrl", gitHome
"RepositoryUrl", gitHome
Expand Down Expand Up @@ -133,7 +133,7 @@ Target "Package" (fun _ ->
)

Target "PublishNuget" (fun _ ->
runDotnet "src/FsShelter" "nuget publish"
runDotnet "src/FsShelter" (sprintf "nuget push bin/Release/FsShelter.%s.nupkg -s nuget.org -k %s" release.NugetVersion (environVar "nugetkey"))
)


Expand Down
3 changes: 3 additions & 0 deletions docs/content/RELEASE_NOTES.md
@@ -1,3 +1,6 @@
#### 1.0.0-beta-1 - Nov 2017
* netstandard 2.0 release

#### 0.2.9 - Nov 2017
* Fixing spout timeout when running as a shelled component

Expand Down
1 change: 0 additions & 1 deletion paket.dependencies
Expand Up @@ -23,7 +23,6 @@ group Main
nuget FsPickler
nuget Google.Protobuf
nuget apache-thrift-netcore
nuget Hopac
nuget newtonsoft.json

github prolucid/protoshell src/main/proto/multilang.proto
Expand Down
3 changes: 0 additions & 3 deletions paket.lock
Expand Up @@ -36,9 +36,6 @@ NUGET
FSharp.Core (>= 3.1)
Google.Protobuf (3.4.1)
NETStandard.Library (>= 1.6.1)
Hopac (0.3.23)
FSharp.Core (>= 4.0.1.7-alpha)
NETStandard.Library (>= 1.6)
Microsoft.AspNetCore.Hosting.Abstractions (2.0.1)
Microsoft.AspNetCore.Hosting.Server.Abstractions (>= 2.0.1)
Microsoft.AspNetCore.Http.Abstractions (>= 2.0.1)
Expand Down
28 changes: 14 additions & 14 deletions samples/Guaranteed/Program.fs
Expand Up @@ -26,22 +26,22 @@ let main argv =
| ["graph"] ->
topology
|> DotGraph.writeToConsole
// | ["self-host"] ->
// let stop =
// topology
// // |> Host.runWith (sprintf "self-%d-%d" (System.Diagnostics.Process.GetCurrentProcess().Id) >> Logging.callbackLog)
// |> Host.run
// printf "Running the topology, press ENTER to stop..."
// let sw = System.Diagnostics.Stopwatch.StartNew()
// System.Threading.Thread.Sleep (-1) |> ignore
// stop()
// sw.Stop()
// let (count,_) = Topology.source.PostAndReply Get
// printf "Count: %s, %d/s\n" count (1000L*(int64 count)/sw.ElapsedMilliseconds)
| ["self-host"] ->
let stop =
topology
// |> Host.runWith (sprintf "self-%d-%d" (System.Diagnostics.Process.GetCurrentProcess().Id) >> Logging.callbackLog)
|> Host.run
printf "Running the topology, press ENTER to stop..."
let sw = System.Diagnostics.Stopwatch.StartNew()
System.Console.ReadLine() |> ignore
stop()
sw.Stop()
let (count,_) = Topology.source.PostAndReply Get
printf "Count: %s, %d/s\n" count (1000L*(int64 count)/sw.ElapsedMilliseconds)
| _ ->
topology
|> Task.ofTopology
// |> Task.run ProtoIO.start
|> Task.runWith (string >> Logging.callbackLog) ProtoIO.start // start using a traffic logger
|> Task.run ProtoIO.start
// |> Task.runWith (string >> Logging.callbackLog) ProtoIO.start // start using a traffic logger
0

25 changes: 12 additions & 13 deletions samples/Guaranteed/Topology.fs
@@ -1,5 +1,4 @@
module Guaranteed.Topology
open Hopac

// data schema for the topology, every case is a unqiue stream
type Schema =
Expand All @@ -9,26 +8,26 @@ type Schema =

// numbers spout - produces messages
let numbers source =
job {
let! (tupleId,number) = source() |> Job.fromAsync
async {
let! (tupleId,number) = source()
return Some(tupleId, Original (number))
}

// add 1 bolt - consumes and emits messages to either Even or Odd stream
let addOne (input,(emit:_->Job<_>)) =
job {
do! match input with
| Original x ->
match x % 2 with
| 1 -> Even (x+1)
| _ -> Odd (x+1)
| _ -> failwithf "unexpected input: %A" input
|> emit
let addOne (input,emit) =
async {
match input with
| Original x ->
match x % 2 with
| 1 -> Even (x+1)
| _ -> Odd (x+1)
| _ -> failwithf "unexpected input: %A" input
|> emit
}

// terminating bolt - consumes messages
let logResult (info,input) =
job {
async {
match input with
| Even x
| Odd x -> info (sprintf "Got: %A" input)
Expand Down
22 changes: 10 additions & 12 deletions samples/WordCount/Topology.fs
@@ -1,7 +1,6 @@
module Sample.Topology

open System
open Hopac

// data schema for the topology, every case is a unqiue stream
type Schema =
Expand All @@ -10,31 +9,30 @@ type Schema =
| WordCount of string*int64

// sentences spout - feeds messages into the topology
let sentences source = job { return source() |> Sentence |> Some }
let sentences source = async { return source() |> Sentence |> Some }

// split bolt - consumes sentences and emits words
let splitIntoWords (input, (emit:_->Job<_>)) =
job {
let splitIntoWords (input, emit) =
async {
match input with
| Sentence s ->
do! s.Split([|' '|],StringSplitOptions.RemoveEmptyEntries)
|> Seq.map (Word >> emit)
|> Job.seqCollect
|> Job.Ignore
s.Split([|' '|],StringSplitOptions.RemoveEmptyEntries)
|> Seq.map Word
|> Seq.iter emit
| _ -> failwithf "unexpected input: %A" input
}

// count words bolt
let countWords (input, increment, (emit:_->Job<_>)) =
job {
let countWords (input, increment, emit) =
async {
match input with
| Word word -> do! WordCount (word,increment word) |> emit
| Word word -> WordCount (word,increment word) |> emit
| _ -> failwithf "unexpected input: %A" input
}

// log word count - terminating bolt
let logResult (log, input) =
job {
async {
match input with
| WordCount (word,count) -> log (sprintf "%s: %d" word count)
| _ -> failwithf "unexpected input: %A" input
Expand Down
2 changes: 1 addition & 1 deletion src/FsShelter.Tests/FsShelter.Tests.fsproj
Expand Up @@ -10,7 +10,7 @@
<Compile Include="SchemaTests.fs" />
<Compile Include="TranslationTests.fs" />
<Compile Include="TaskTests.fs" />
<!-- <Compile Include="HostTests.fs" /> -->
<Compile Include="HostTests.fs" />
<Compile Include="IO\CommonTests.fs" />
<Compile Include="IO\JsonIOTests.fs" />
<Compile Include="IO\ProtoIOTests.fs" />
Expand Down
13 changes: 7 additions & 6 deletions src/FsShelter.Tests/HostTests.fs
Expand Up @@ -28,12 +28,13 @@ module Topologies =
}

let printBolt (log,t) =
match t with
| Original _ -> log (sprintf "Init: %A" DateTime.Now)
| MaybeString _ -> log (sprintf "Shutdown: %A" DateTime.Now)
| Tick
| _ -> () //log (sprintf "tuple: %A" t)
|> async.Return
async {
match t with
| Original _ -> log (sprintf "Init: %A" DateTime.Now)
| MaybeString _ -> log (sprintf "Shutdown: %A" DateTime.Now)
| Tick
| _ -> () //log (sprintf "tuple: %A" t)
}

let world = {rnd = Random(); count = ref 0L; acked = ref 0L}
let t1 = topology "test" {
Expand Down
3 changes: 1 addition & 2 deletions src/FsShelter.Tests/IO/CommonTests.fs
Expand Up @@ -7,11 +7,10 @@ open FsShelter.Multilang
open System
open System.IO
open TupleSchema
open Hopac

let toDict (s:seq<_*_>) = System.Linq.Enumerable.ToDictionary(s, fst, snd)

let syncOut (w:unit->unit) = w() |> Job.result
let syncOut (w:unit->unit) = w()

type GuidRec = { g : Nullable<Guid>}

Expand Down
37 changes: 18 additions & 19 deletions src/FsShelter.Tests/IO/JsonIOTests.fs
Expand Up @@ -7,7 +7,6 @@ open FsShelter.Multilang
open System
open JsonIO
open CommonTests
open Hopac

[<Test>]
let ``reads handshake``() =
Expand All @@ -23,7 +22,7 @@ let ``reads handshake``() =
Conf ["FsShelter.id",box "Simple-2-1456522507"; "dev.zookeeper.path", box "/tmp/dev-storm-zookeeper"; "topology.tick.tuple.freq.secs", box 30L; "topology.classpath", null],
"C:\\Users\\eugene\\storm-local\\workers\\9ee413b6-c7d2-4896-ae4d-d150da988822\\pids",
{TaskId=5;ComponentId="SimpleSpout";Components=Map [1,"AddOneBolt"; 2,"AddOneBolt"; 3,"ResultBolt"; 4, "ResultBolt"; 5,"SimpleSpout"; 6,"__acker"]})
in'() |> run =! expected
in'() |> Async.RunSynchronously =! expected


[<Test>]
Expand All @@ -32,7 +31,7 @@ let ``reads next``() =
let sw = new System.IO.StringWriter()
let (in',_) = JsonIO.startWith (sr,sw) syncOut ignore

in'() |> run =! InCommand<Schema>.Next
in'() |> Async.RunSynchronously =! InCommand<Schema>.Next


[<Test>]
Expand All @@ -41,7 +40,7 @@ let ``reads ack``() =
let sw = new System.IO.StringWriter()
let (in',_) = JsonIO.startWith (sr,sw) syncOut ignore

in'() |> run =! InCommand<Schema>.Ack "zzz"
in'() |> Async.RunSynchronously =! InCommand<Schema>.Ack "zzz"


[<Test>]
Expand All @@ -50,31 +49,31 @@ let ``reads nack``() =
let sw = new System.IO.StringWriter()
let (in',_) = JsonIO.startWith (sr,sw) syncOut ignore

in'() |> run =! InCommand<Schema>.Nack "zzz"
in'() |> Async.RunSynchronously =! InCommand<Schema>.Nack "zzz"

[<Test>]
let ``reads activate``() =
let sr = new System.IO.StringReader("""{"id":"","command":"activate"}"""+END)
let sw = new System.IO.StringWriter()
let (in',_) = JsonIO.startWith (sr,sw) syncOut ignore

in'() |> run =! InCommand<Schema>.Activate
in'() |> Async.RunSynchronously =! InCommand<Schema>.Activate

[<Test>]
let ``reads deactivate``() =
let sr = new System.IO.StringReader("""{"id":"","command":"deactivate"}"""+END)
let sw = new System.IO.StringWriter()
let (in',_) = JsonIO.startWith (sr,sw) syncOut ignore

in'() |> run =! InCommand<Schema>.Deactivate
in'() |> Async.RunSynchronously =! InCommand<Schema>.Deactivate

[<Test>]
let ``reads tuple``() =
let sr = new System.IO.StringReader("""{"comp":"AddOneBolt","tuple":[62],"task":1,"stream":"Original","id":"2651792242051038370"}"""+END)
let sw = new System.IO.StringWriter()
let (in',_) = JsonIO.startWith (sr,sw) syncOut ignore

in'() |> run =! InCommand<Schema>.Tuple(Original {x=62},"2651792242051038370","AddOneBolt","Original",1)
in'() |> Async.RunSynchronously =! InCommand<Schema>.Tuple(Original {x=62},"2651792242051038370","AddOneBolt","Original",1)


[<Test>]
Expand All @@ -83,7 +82,7 @@ let ``writes tuple``() =
let sw = new System.IO.StringWriter()
let (_,out) = JsonIO.startWith (sr,sw) syncOut ignore

out(Emit(Original {x=62},Some "2651792242051038370",["123"],"Original",None,None)) |> run
out(Emit(Original {x=62},Some "2651792242051038370",["123"],"Original",None,None))
Threading.Thread.Sleep(10)
sw.ToString() =! """{"command":"emit","id":"2651792242051038370","tuple":[62],"anchors":["123"],"stream":"Original","need_task_ids":false}"""+END

Expand All @@ -94,10 +93,10 @@ let ``rw complex tuple``() =
let (in',out) = JsonIO.startWith (sr,sw) syncOut ignore
let even = Even({x=62},{str="a"})

job {
do! out(Emit(even,Some "2651792242051038370",[],"Even",None,None))
async {
out(Emit(even,Some "2651792242051038370",[],"Even",None,None))
return! in'()
} |> run =! InCommand<Schema>.Tuple(even,"2651792242051038370","AddOneBolt","Even",1)
} |> Async.RunSynchronously =! InCommand<Schema>.Tuple(even,"2651792242051038370","AddOneBolt","Even",1)
Threading.Thread.Sleep 100
sw.ToString() =! """{"command":"emit","id":"2651792242051038370","tuple":[62,"a"],"stream":"Even","need_task_ids":false}"""+END

Expand All @@ -109,10 +108,10 @@ let ``rw option tuple``() =
let (in',out) = JsonIO.startWith (sr,sw) syncOut ignore
let t = MaybeString(Some "zzz")

job {
do! out(Emit(t,Some "2651792242051038370",[],"MaybeString",None,None))
async {
out(Emit(t,Some "2651792242051038370",[],"MaybeString",None,None))
return! in'()
} |> run =! InCommand<Schema>.Tuple(t,"2651792242051038370","AddOneBolt","MaybeString",1)
} |> Async.RunSynchronously =! InCommand<Schema>.Tuple(t,"2651792242051038370","AddOneBolt","MaybeString",1)
sw.ToString() =! """{"command":"emit","id":"2651792242051038370","tuple":[{"Case":"Some","Fields":["zzz"]}],"stream":"MaybeString","need_task_ids":false}"""+END


Expand All @@ -124,13 +123,13 @@ let ``roundtrip throughput``() =
let (in',out') = JsonIO.startWith (new IO.StreamReader(mem), new IO.StreamWriter(mem)) syncOut ignore

let sw = System.Diagnostics.Stopwatch.StartNew()
job {
async {
for i in {1..count} do
do! Emit(justFields,Some "2651792242051038370",[],"JustFields",Some 1,None) |> out'
Emit(justFields,Some "2651792242051038370",[],"JustFields",Some 1,None) |> out'

mem.Seek(0L, IO.SeekOrigin.Begin) |> ignore
for i in {1..count} do
do! in'() |> Job.Ignore
} |> run
do! in'() |> Async.Ignore
} |> Async.RunSynchronously
sw.Stop()
printf "[Json] Ellapsed: %dms, %f/s\n" sw.ElapsedMilliseconds ((float count)/sw.Elapsed.TotalSeconds)

0 comments on commit 68e9420

Please sign in to comment.