Skip to content

Commit

Permalink
tidy up
Browse files Browse the repository at this point in the history
  • Loading branch information
Julian Porter committed Nov 10, 2011
1 parent 838cbd8 commit d3b044e
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 101 deletions.
131 changes: 61 additions & 70 deletions MapReduce.cabal
@@ -1,72 +1,63 @@
Name: MapReduce
Version: 0.2
Copyright: (c) 2011, Julian Porter
License: BSD3
License-File: LICENSE
Author: Julian Porter
Maintainer: Julian Porter
Cabal-Version: >= 1.6
Stability: provisional
Synopsis: A framework for MapReduce on multicore systems
Description:
{- A library that defines the 'MapReduce' monad for multicore systems, and exports
the necessary functions.

Mapper / reducers are generalised to functions of type
@a -> ([(s,a)] -> [(s',b)])@ which are combined using the monad's bind
operation. The resulting monad is executed on initial data by invoking
'runMapReduce'.

For programmers only wishing to write conventional map / reduce algorithms,
which use functions of type @([s] -> [(s',b)])@ a wrapper function
'wrapMR' is provided, which converts such a function into the
appropriate monadic function.-}

Category: Concurrency
Build-Type: Simple
Tested-With: GHC==6.10, GHC==7.0.3, GHC==7.2.1
Extra-Source-Files: README

Source-repository head
type: git
location: git://github.com/Julianporter/Haskell-MapReduce.git

Library
Build-Depends: base >=4 && <5,
deepseq, parallel, bytestring, transformers, pureMD5, binary
GHC-Options:
Hs-Source-Dirs: src
Exposed-Modules: Process.MapReduce.Multicore,
Control.Monad2,
Control.Monad2.Trans.MapReduce,
Process.MapReduce.WordCount,
name: MapReduce
version: 0.2
copyright: (c) 2011, Julian Porter
license: BSD3
license-file: LICENSE
author: Julian Porter
maintainer: Julian Porter
cabal-version: >= 1.6
stability: provisional
synopsis: A framework for MapReduce on multicore systems
description: null
category: Concurrency
build-type: Simple
tested-with: GHC==6.10, GHC==7.0.3, GHC==7.2.1
extra-source-files: README

source-repository head
type: git
location: git://github.com/Julianporter/Haskell-MapReduce.git

library
build-depends: base >=4 && <5
ghc-options: null
hs-source-dirs: src
exposed-modules: Process.MapReduce.Multicore,
Process.MapReduce.WordCount,
Process.MapReduce.WordCount.Tests,
Process.MapReduce.WordCount.Documents
Control.Monad2,
Control.Monad2.Trans.MapReduce
exposed: True
extensions: MultiParamTypeClasses,
FlexibleInstances

executable wordcount
main-is: WordCount.hs
build-depends: base >=4 && <5
ghc-options: -threaded
hs-source-dirs: src
other-modules: Process.MapReduce.WordCount,
Process.MapReduce.Multicore

executable testwordcount
main-is: Main.hs
build-depends: base >=4 && <5, QuickCheck
ghc-options: -threaded
hs-source-dirs: src
other-modules: Process.MapReduce.WordCount,
Process.MapReduce.WordCount.Tests,
Process.MapReduce.WordCount.Documents
Exposed: True
Other-Modules: Process.Common.ParallelMap
Extensions: MultiParamTypeClasses,
FlexibleInstances


Executable wordcount
Main-Is: WordCount.hs
Build-Depends: base >=4 && <5
GHC-Options: -threaded
Hs-Source-Dirs: src
Other-modules: Process.MapReduce.WordCount,
Process.MapReduce.Multicore,
Process.Common.ParallelMap

Executable testwordcount
Main-Is: Main.hs
Build-Depends: base >=4 && <5, QuickCheck
GHC-Options: -threaded
Hs-Source-Dirs: src
Other-modules: Process.MapReduce.WordCount,
Process.MapReduce.WordCount.Tests,
Process.MapReduce.WordCount.Documents,
Process.MapReduce.Multicore,
Process.Common.ParallelMap


Process.MapReduce.WordCount.Documents,
Process.MapReduce.Multicore

test-suite testwordcount
type: detailed-0.9
build-depends: base >= 4
hs-source-dirs: src
ghc-options: -threaded
other-modules: Main,
Process.MapReduce.Multicore,
Process.MapReduce.WordCount,
Process.MapReduce.WordCount.Documents,
Process.MapReduce.WordCount.Tests

18 changes: 0 additions & 18 deletions src/Process/Common/ParallelMap.hs

This file was deleted.

33 changes: 20 additions & 13 deletions src/Process/MapReduce/Multicore.hs
Expand Up @@ -27,10 +27,17 @@ import Control.Monad (liftM)
import Control.DeepSeq (NFData)
import System.IO
import Prelude hiding (return,(>>=))
import qualified Process.Common.ParallelMap as P
import Data.Digest.Pure.MD5
import Data.Binary
import qualified Data.ByteString.Lazy as B
import Control.Parallel.Strategies (parMap, rdeepseq)

-- | The parallel map function; it must be functionally identical to 'map',
-- distributing the computation across all available nodes in some way.
pMap :: (NFData b) => (a -> b) -- ^ The function to apply
-> [a] -- ^ Input
-> [b] -- ^ output
pMap = parMap rdeepseq

-- | Generalised version of 'Monad' which depends on a pair of 'Tuple's, both
-- of which change when '>>=' is applied.
Expand Down Expand Up @@ -61,6 +68,7 @@ class MonadG m where
-- Its structure is intentionally opaque to application programmers.
newtype MapReduce s a s' b = MR { runMR :: [(s,a)] -> [(s',b)] }

-- | Make MapReduce into a 'MonadG' instance
instance MonadG MapReduce where
return = retMR
(>>=) = bindMR
Expand All @@ -85,7 +93,7 @@ bindMR f g = MR (\s ->
fs = runMR f s
gs = map g $ nub $ snd <$> fs
in
concat $ P.map (`runMR` fs) gs)
concat $ pMap (`runMR` fs) gs)

-- | Execute a MapReduce MonadG given specified initial data. Therefore, given
-- a 'MapReduce' @m@ and initial data @xs@ we apply the processing represented
Expand All @@ -95,21 +103,21 @@ bindMR f g = MR (\s ->
runMapReduce :: MapReduce s () s' b -- ^ 'MapReduce' representing the required processing
-> [s] -- ^ Initial data
-> [(s',b)] -- ^ Result of applying the processing to the data

runMapReduce m ss = runMR m [(s,()) | s <- ss]

-- | The hash function. Computes the MD5 hash of any 'Hashable' type
hash :: (Binary s) => s -> Int -- ^ computes the hash
hash :: (Binary s) => s -- ^ The value to hash
-> Int -- ^ its hash
hash s = sum $ map fromIntegral (B.unpack h)
where
h = encode $ show (md5 $ encode s)
h = encode (md5 $ encode s)

-- | Function used at the start of processing to determine how many threads of processing
-- to use. Should be used as the starting point for building a 'MapReduce'.
-- Therefore a generic 'MapReduce' should look like
--
-- @distributeMR >>= f1 >>= . . . >>= fn@
distributeMR :: (Binary s) => Int -- ^ Number of threads across which to distribute initial data
-- @'distributeMR' '>>=' f1 '>>=' . . . '>>=' fn@
distributeMR :: (Binary s) => Int -- ^ Number of threads across which to distribute initial data
-> MapReduce s () s Int -- ^ The 'MapReduce' required to do this
distributeMR n = MR (\ss -> [(s,hash s `mod` n) | s <- fst <$> ss])

Expand All @@ -121,14 +129,13 @@ distributeMR n = MR (\ss -> [(s,hash s `mod` n) | s <- fst <$> ss])
-- Therefore the generic 'MapReduce' using only traditional mappers and
-- reducers should look like
--
-- @distributeMR >>= liftMR f1 >>= . . . >>= liftMR fn@
-- @'distributeMR' '>>=' 'liftMR' f1 '>>=' . . . '>>=' 'liftMR' fn@
liftMR :: (Eq a) => ([s] -> [(s',b)]) -- traditional mapper / reducer of signature
-- @([s] -> [(s',b])@
-> a -> MapReduce s a s' b -- the mapper / reducer wrapped as an instance
-- @([s] -> [(s',b)]@
-> a -- the input key
-> MapReduce s a s' b -- the mapper / reducer wrapped as an instance
-- of 'MapReduce'
liftMR f = MR . g
where
g k ss = f $ fst <$> filter (\s -> k == snd s) ss
liftMR f k = MR (\ss -> f $ fst <$> filter (\s -> k == snd s) ss)



Expand Down

0 comments on commit d3b044e

Please sign in to comment.