mergeProd like capability #3

Open
sordina opened this issue Oct 27, 2016 · 13 comments

Comments

@sordina

sordina commented Oct 27, 2016

Hello,

I've been trying to accomplish a small program using Conduit, Pipes, Machines, and now Concurrent-Machines. I thought I'd finally found the solution with splitProd, but I now find I really need something more like mergeProd:

ProcessT m a r -> ProcessT m b r -> ProcessT m (a, b) r

I want to be able to split a source in two, perform different processes on each side (possibly in lock-step), and then merge the results, either automatically with a tuple, or manually with another Process argument.

It doesn't even really need concurrency support as far as I can tell, but the closest thing to what I've been looking for seems to be splitProd so I thought it couldn't hurt to ask if it falls under the scope of concurrent-machines here.

Feel free to close the issue if it's misplaced!

@acowley
Owner

acowley commented Oct 27, 2016

So the missing piece for you is that splitProd doesn't merge the result values? It's tricky because one side could produce multiple outputs from a single input, or possibly no outputs. Since that discrepancy needs to be resolved, making the output an Either lets you resolve that however appropriate with a downstream step.

For instance, if we need an output from each side to produce a merged output, we can latch the most recent output from each side to combine with the next output from the other side. If we must only use each output component once, we should clear the latch after using it, or we could keep it latched and re-use it until it can be replaced.
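
As a rough illustration of the two latching policies described above, here is a minimal sketch over a plain list of Either values rather than a real machine. The names pairReuse and pairOnce are made up for this example and are not part of machines or concurrent-machines.

```haskell
import Data.List (mapAccumL)
import Data.Maybe (catMaybes)

-- | Keep both latches after emitting, so a latched value is re-used
-- until a newer value from the same side replaces it.
pairReuse :: [Either b c] -> [(b, c)]
pairReuse = catMaybes . snd . mapAccumL step (Nothing, Nothing)
  where
    step (_, mc) (Left b)  = emit (Just b, mc)
    step (mb, _) (Right c) = emit (mb, Just c)
    emit s@(Just b, Just c) = (s, Just (b, c))
    emit s                  = (s, Nothing)

-- | Clear both latches after emitting, so each output is used at most once.
-- A newer value from one side overwrites an unpaired older one.
pairOnce :: [Either b c] -> [(b, c)]
pairOnce = catMaybes . snd . mapAccumL step (Nothing, Nothing)
  where
    step (_, mc) (Left b)  = emit (Just b, mc)
    step (mb, _) (Right c) = emit (mb, Just c)
    emit (Just b, Just c) = ((Nothing, Nothing), Just (b, c))
    emit s                = (s, Nothing)

main :: IO ()
main = do
  let xs = [Left 1, Right 'a', Right 'b', Left 2, Left 3] :: [Either Int Char]
  print (pairReuse xs)  -- [(1,'a'),(1,'b'),(2,'b'),(3,'b')]
  print (pairOnce xs)   -- [(1,'a'),(2,'b')]
```

The same state transitions could be written as a downstream process that consumes the Either outputs of splitProd; the list version only shows the bookkeeping.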

I may not be understanding your needs, but if so I will try to! This is the right place for this Issue.

@sordina
Author

sordina commented Oct 27, 2016

You're right that I've left things ambiguous, sorry about that.

I guess the abstract problem I'm trying to resolve is being able to determine downstream if both values were produced at the same time, which existing Either solutions don't seem very good at.

For example,

If I had a source of FileNames and wanted to use a Checksum process and an accumulating Length process to produce a checksum paired with a running length for each file, I'd like to do something like the following:

checksum_and_length = mergeProd checksumP accLengthP

Now, as you were saying, checksumP could theoretically produce multiple values or none, so what should we do in those cases? I think I'd be okay with either latching the most recent output while the other side awaits, or discarding outputs until both processes yield a value at the same time. Maybe the choice could be made explicit by having different versions of the function, such as:

  • mergeProdBothYield
  • mergeProdLatch

Either way you would probably want to stop as soon as one of the processes stopped.

The main problem with trying to accomplish this with Either is that you can't be sure that the Left and Right values correspond to the same incoming upstream source item, not without some kind of "ID" mechanics.

I have a feeling that it should be fairly easy to implement this with the Step language, but it's still a bit beyond me at present...

Let me know what you think!

P.S. I meant mergeProd to have a signature more like the following:

mergeProd :: Monad m => ProcessT m a b -> ProcessT m a c -> ProcessT m a (b,c)

@sordina
Author

sordina commented Oct 27, 2016

Here's my best go so far for non-concurrent machines:

run $ source [1..5] ~> splitProdPair (mapping succ) (mapping (negate . pred)) ~> mapping (uncurry (*))
[0,-3,-8,-15,-24]

mergeProd :: Monad m => MachineT m k a -> ProcessT m a b -> ProcessT m a c -> MachineT m k (b,c)
mergeProd ma pb pc = MachineT $ do
  bv <- runMachineT pb
  cv <- runMachineT pc
  case (bv, cv) of
       (Stop, _) -> return Stop
       (_, Stop) -> return Stop
       (Yield bo bk, Yield co ck) -> return $ Yield (bo,co) (mergeProd ma bk ck)
       (Await bf Refl bff, Await cf Refl cff) -> runMachineT ma >>= \u -> case u of
          Stop          -> runMachineT $ mergeProd stopped bff cff
          Yield o k     -> runMachineT $ mergeProd k (bf o) (cf o)
          Await g kg fg -> return $ Await (\a -> mergeProd (g a) (MachineT (return bv)) (MachineT (return cv)))
                                          kg
                                          (mergeProd fg (MachineT (return bv)) (MachineT (return cv)))
       (Await bf Refl bff, Yield co ck) -> trace "hybrid case 1" (return Stop)
       (Yield bo bk, Await cf Refl cff) -> trace "hybrid case 2" (return Stop)
       _ -> return Stop

@acowley
Owner

acowley commented Oct 28, 2016

Here's an idea: we can restrict the type to Monad m => MachineT m k a -> Mealy a b -> Mealy a c -> MachineT m k (b,c). The Mealy type has the advantage that it produces precisely one output for each input. A disadvantage is that, as currently formulated in machines, it can't have side effects. But if you like that direction, we can get that added to machines. A concurrent-machines version of what you want would then be based on your code here with minimal async decorations added to the run calls on the two downstream parts.
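
To show why the one-output-per-input property makes the merge straightforward, here is a minimal sketch that uses a local stand-in with the same shape as the Mealy newtype, so it needs only base. The names mergeMealy, pureMealy, sumMealy and runList are assumptions made for this example.

```haskell
-- Local stand-in shaped like the Mealy type in machines:
-- one input in, exactly one output and a successor machine out.
newtype Mealy a b = Mealy { runMealy :: a -> (b, Mealy a b) }

-- | Feed every input to both machines and pair their outputs.
-- No latching is needed because each side yields exactly once per input.
mergeMealy :: Mealy a b -> Mealy a c -> Mealy a (b, c)
mergeMealy (Mealy f) (Mealy g) = Mealy $ \a ->
  let (b, f') = f a
      (c, g') = g a
  in ((b, c), mergeMealy f' g')

-- | A stateless machine built from a plain function.
pureMealy :: (a -> b) -> Mealy a b
pureMealy f = Mealy $ \a -> (f a, pureMealy f)

-- | A stateful machine that outputs the running sum of its inputs.
sumMealy :: Num a => a -> Mealy a a
sumMealy acc = Mealy $ \a -> let acc' = acc + a in (acc', sumMealy acc')

-- | Drive a machine over a list of inputs.
runList :: Mealy a b -> [a] -> [b]
runList _ []               = []
runList (Mealy f) (a : as) = let (b, m') = f a in b : runList m' as

main :: IO ()
main = print (runList (mergeMealy (pureMealy succ) (sumMealy 0)) [1 .. 5 :: Int])
-- [(2,1),(3,3),(4,6),(5,10),(6,15)]
```

Each pair in the result is guaranteed to come from the same upstream item, which is the correspondence that is hard to recover from an Either stream.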

This would be in keeping with the general theme that concurrent-machines provides fan outs, whereas machines is more focused on merging multiple upstream sources; i.e. a variation on push vs pull.

@sordina
Author

sordina commented Oct 28, 2016

I really like this idea, but my use-case requires that there are side-effects, so Mealy won't be powerful enough. Is there any reason why there isn't a version of Mealy with side-effects?

@acowley
Owner

acowley commented Oct 29, 2016

I don't think there's a reason. I found an Issue on the machines repo. I think a PR would be accepted on machines.

@sordina
Author

sordina commented Oct 29, 2016

Thanks Anthony,

Would

newtype MealyM m a b = MealyM { runMealyM :: a -> m (b, MealyM m a b) }

be the right type for a MealyM?
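
To make the proposed type concrete, here is a minimal sketch, using only base, of how two such effectful machines could be merged in lock-step. Only the MealyM newtype comes from the thread; mergeMealyM, effectM, accLengthM, runListM and shout are hypothetical names chosen for illustration.

```haskell
newtype MealyM m a b = MealyM { runMealyM :: a -> m (b, MealyM m a b) }

-- | Run both machines on the same input, left effects first,
-- and pair the two outputs.
mergeMealyM :: Monad m => MealyM m a b -> MealyM m a c -> MealyM m a (b, c)
mergeMealyM (MealyM f) (MealyM g) = MealyM $ \a -> do
  (b, f') <- f a
  (c, g') <- g a
  return ((b, c), mergeMealyM f' g')

-- | A stateless machine built from an effectful function.
effectM :: Monad m => (a -> m b) -> MealyM m a b
effectM k = MealyM $ \a -> do
  b <- k a
  return (b, effectM k)

-- | A stateful machine that outputs the accumulated length of its inputs.
accLengthM :: Monad m => Int -> MealyM m String Int
accLengthM n = MealyM $ \s ->
  let n' = n + length s in return (n', accLengthM n')

-- | Drive a machine over a list of inputs, collecting the outputs.
runListM :: Monad m => MealyM m a b -> [a] -> m [b]
runListM _ []                = return []
runListM (MealyM f) (a : as) = do
  (b, m') <- f a
  bs      <- runListM m' as
  return (b : bs)

-- | Example side effect: log the input, then return it reversed.
shout :: String -> IO String
shout s = do
  putStrLn ("seen: " ++ s)
  return (reverse s)

main :: IO ()
main = do
  out <- runListM (mergeMealyM (effectM shout) (accLengthM 0)) ["ab", "cde"]
  print out  -- [("ba",2),("edc",5)]
```

Sequencing the left machine before the right one is a design choice of this sketch; a concurrent version would run the two steps asynchronously instead.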

@acowley
Owner

acowley commented Oct 29, 2016

Yes, and a function to convert one of those to a ProcessT would probably be good if adding it to machines. It can be a class like AutomatonM, but I don't think there's much advantage to that over just a regular function (e.g. autoMealyM).
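
A plain conversion function along those lines might look like the following minimal sketch. It assumes the machines package (construct, await, yield, runT, source and ~> from Data.Machine) and lift from transformers; autoMealyM is the name suggested above, and runningSum is a hypothetical helper used only to exercise it.

```haskell
import Control.Monad.Trans.Class (lift)
import Data.Machine

newtype MealyM m a b = MealyM { runMealyM :: a -> m (b, MealyM m a b) }

-- | Turn an effectful Mealy machine into a process: await one input,
-- run the step in the underlying monad, yield its single output, repeat.
autoMealyM :: Monad m => MealyM m a b -> ProcessT m a b
autoMealyM = construct . go
  where
    go (MealyM f) = do
      a         <- await
      (b, next) <- lift (f a)
      yield b
      go next

-- | Hypothetical example machine: outputs the running sum of its inputs.
runningSum :: Monad m => Int -> MealyM m Int Int
runningSum acc = MealyM $ \x ->
  let acc' = acc + x in return (acc', runningSum acc')

main :: IO ()
main = runT (source [1, 2, 3] ~> autoMealyM (runningSum 0)) >>= print
```

The process stops when upstream stops, because await ends the plan once no more input is available.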

@sordina
Author

sordina commented Oct 29, 2016

Sounds great! I'll try to whip that up today. Thanks a lot for the discussion so far :)

@sordina
Author

sordina commented Nov 2, 2016

Hi Anthony,

I couldn't figure out how to do the Automaton implementation for MealyM...

Here's what I've tried, but I just can't get the types to line up correctly:

newtype MealyM m a b = MealyM { runMealyM :: a -> m (b, MealyM m a b) }

instance Functor m => Functor (MealyM m a) where
  {-# INLINE fmap #-}
  fmap f (MealyM m) = MealyM $ \a ->
    fmap (\(x,y) -> (f x, fmap f y)) (m a)

instance Pointed m => Pointed (MealyM m a) where
  {-# INLINE point #-}
  point b = r where r = MealyM (const (point (b, r)))

instance Monad m => Automaton (MealyM m) where
  auto = construct . go
    where
    go (MealyM f) = do
      a      <- await
      (b, m) <- lift $ f a
      yield b
      go m

What do you think could be the issue?

@acowley
Owner

acowley commented Nov 2, 2016

It should be AutomatonM as in the Issue linked above.

@sordina
Author

sordina commented Nov 2, 2016

Oh I completely missed that somehow...

@sordina
Author

sordina commented Nov 2, 2016

Thanks for that. I've now got a fleshed-out implementation, although my high-level test case is still failing, so I assume there's a bug here or there. I'll get some property tests written to track it down next:

https://github.com/sordina/uniqhash/blob/master/Data/Machine/MealyM.hs
