analyze : evaluate streaming for RFrame
#35
Comments
Streamly seems to perform well compared to the other streaming libraries.
I like Streamly! And it's easier to work with (IMO) than Streaming. I've used both and compared (a little) for my map-reduce-folds library. It often seems faster than Vector as well.
I think some operations that would constitute good benchmarks would be:
I wrote the following benchmark. So far two things are tested:
{-# LANGUAGE GADTs #-}
module Main where
import Streamly
import qualified Streamly.Prelude as S
import Control.Monad.Catch (MonadThrow (..))
import Data.Function ((&))
import Data.Traversable (sequence)
import Control.Monad
import Analyze.Common (Data (..), DuplicateKeyError (..), RowSizeMismatch (..))
import qualified Analyze as A
import qualified Analyze.Common as AC
import qualified Data.Vector as V
import Data.Vector (Vector)
import qualified Data.Text as T
import Data.Text (Text)
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HM
import qualified Data.HashSet as HS
import System.Random
import qualified System.Random.MWC as M
import qualified Criterion.Main as C
import Control.DeepSeq (deepseq)
-- implementation details required
data RFrameUpdateS k v m where
  RFrameUpdateS :: MonadThrow m =>
    { _rframeUpdateKeys :: !(Vector k)
    , _rframeUpdateData :: !(SerialT m (Vector v))
    } -> RFrameUpdateS k v m

data RFrameS k v m where
  RFrameS :: MonadThrow m =>
    { _rframeKeys :: !(Vector k)
    , _rframeLookup :: !(HashMap k Int)
    , _rframeData :: !(SerialT m (Vector v))
    } -> RFrameS k v m

fromUpdate :: (Data k, MonadThrow m) => RFrameUpdateS k v m -> m (RFrameS k v m)
fromUpdate (RFrameUpdateS ks vs) = AC.checkForDupes ks >> pure (RFrameS ks (AC.makeLookup ks) vs)

update :: (Data k, MonadThrow m) => RFrameUpdateS k v m -> RFrameS k v m -> m (RFrameS k v m)
update (RFrameUpdateS uks uvs) (RFrameS fks _ fvs) = do
  fSize <- S.length fvs
  uSize <- S.length uvs
  if fSize /= uSize
    then throwM (RowSizeMismatch fSize uSize)
    else do
      AC.checkForDupes uks
      let kis = AC.mergeKeys fks uks
          ks' = (\(k, _, _) -> k) <$> kis
          look' = AC.makeLookup ks'
          vs' = S.zipWith (AC.runIndexedLookup kis) fvs uvs
      return (RFrameS ks' look' vs')

-- only concerned with generating data
n :: Int
n = 1000

testKeys :: IO (Vector Text)
testKeys = V.replicateM n $ liftM (T.pack . take 10 . randomRs ('a','z')) newStdGen

testData :: IO (Vector (Vector Double))
testData = do
  gen <- M.create
  V.replicateM n $ M.uniformVector gen n

testDataS :: IO (SerialT IO (Vector Double))
testDataS = do
  vec <- testData
  return $ S.fromFoldable vec

-- the actual benchmarks
cmprVec :: IO Bool
cmprVec = do
  keys <- testKeys
  dat <- testData
  let update = A.RFrameUpdate keys dat
  frame1 <- A.fromUpdate update
  frame2 <- A.fromUpdate update
  return $ frame1 == frame2

cmprStream :: IO Bool
cmprStream = do
  keys <- testKeys
  dat <- testDataS
  let update = RFrameUpdateS keys dat
  frame1 <- fromUpdate update
  frame2 <- fromUpdate update
  let sameKeys = _rframeKeys frame1 == _rframeKeys frame2
      sameLookup = _rframeLookup frame1 == _rframeLookup frame2
      dat1 = _rframeData frame1
      dat2 = _rframeData frame2
  sameDat <- (S.zipWith (==) dat1 dat2 & S.notElem False)
  return $ sameKeys && sameLookup && sameDat

-- compares two streaming frames: keys, lookup table, then rows pairwise
cmpr f1 f2 = do
  let sameKeys = _rframeKeys f1 == _rframeKeys f2
      sameLookup = _rframeLookup f1 == _rframeLookup f2
      dat1 = _rframeData f1
      dat2 = _rframeData f2
  sameDat <- (S.zipWith (==) dat1 dat2 & S.notElem False)
  return $ sameKeys && sameLookup && sameDat

takeRowsVec :: IO [Bool]
takeRowsVec = do
  keys <- testKeys
  dat <- testData
  let update = A.RFrameUpdate keys dat
  frame <- A.fromUpdate update
  let postTake = zipWith A.takeRows ((div n) <$> [5,4,3,2,1]) (repeat frame)
  return $ zipWith (==) (repeat frame) postTake

-- | Takes first 'n' rows of an 'RFrame'.
takeRows n (RFrameS ks look srm) = RFrameS ks look (S.take n srm)

takeRowsS :: IO [Bool]
takeRowsS = do
  keys <- testKeys
  dat <- testDataS
  let update = RFrameUpdateS keys dat
  frame <- fromUpdate update
  let postTake = zipWith takeRows ((div n) <$> [5,4,3,2,1]) (repeat frame)
  sequence $ zipWith cmpr (repeat frame) postTake

main :: IO ()
main = C.defaultMain
  [ C.bgroup "Tests"
      [ C.bench "Vec" $ C.whnfIO cmprVec
      , C.bench "Stream" $ C.whnfIO cmprStream
      , C.bench "takeRowsVec" $ C.whnfIO takeRowsVec
      , C.bench "takeRowsStream" $ C.whnfIO takeRowsS
      ]
  ]

I get this (at most a 5% improvement and, in the worst case, a 20% decrease in performance)
It is very likely that I'm doing something wrong, though I'm not sure where.
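One thing that may be worth ruling out: each benchmarked action above calls testKeys and testData (or testDataS) itself, so the time spent generating random data is part of every measurement and could mask the difference between the two representations. The following is only a sketch of how Criterion's env can build the input once, outside the timed code. The names setupRows and sumRows are hypothetical stand-ins, not part of analyze, and the summing workload is just a placeholder for the real frame operations.

```haskell
module Main where

import qualified Criterion.Main as C
import Data.Vector (Vector)
import qualified Data.Vector as V
import qualified System.Random.MWC as M

-- Hypothetical setup step: build a k-by-k table of random doubles once.
setupRows :: Int -> IO (Vector (Vector Double))
setupRows k = do
  gen <- M.create
  V.replicateM k (M.uniformVector gen k)

-- Placeholder workload (an assumption): sum every cell of the table.
sumRows :: Vector (Vector Double) -> Double
sumRows = V.sum . V.map V.sum

main :: IO ()
main = C.defaultMain
  [ -- env evaluates the setup action to normal form before timing starts,
    -- so only sumRows is measured.
    C.env (setupRows 1000) $ \rows ->
      C.bgroup "prebuilt input"
        [ C.bench "sumRows" $ C.nf sumRows rows ]
  ]
```

With the input prepared this way, the Vector and stream variants would at least be compared on the frame operations alone rather than on data generation plus the operation.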
Thank you @Magalame! Well, a 10% average speed improvement is already pretty cool, but I think the real selling point of streaming would be if the resulting program used "constant" (with respect to the stream length) memory. Could you try adding a
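To make the constant-memory point concrete: testDataS in the benchmark first materialises the whole Vector of rows and then wraps it with S.fromFoldable, so all rows are resident regardless of how the stream is consumed. Below is a minimal sketch, assuming the same Streamly version as the benchmark (the one that exports SerialT from the Streamly module), in which rows are produced on demand and folded strictly. rowsOnDemand and sumAll are hypothetical names introduced only for this illustration.

```haskell
module Main where

import Streamly (SerialT)
import qualified Streamly.Prelude as S
import Data.Vector (Vector)
import qualified Data.Vector as V
import qualified System.Random.MWC as M

-- Hypothetical helper: each row is generated only when the consumer asks for it.
rowsOnDemand :: M.GenIO -> Int -> Int -> SerialT IO (Vector Double)
rowsOnDemand gen rows cols = S.replicateM rows (M.uniformVector gen cols)

-- Strict left fold over the stream; a row can be collected once it is summed.
sumAll :: SerialT IO (Vector Double) -> IO Double
sumAll = S.foldl' (\acc row -> acc + V.sum row) 0

main :: IO ()
main = do
  gen <- M.create
  total <- sumAll (rowsOnDemand gen 1000 1000)
  print total
```

Running the compiled program with +RTS -s for a few different row counts and comparing the reported maximum residency would be one way to check whether memory really stays flat as the stream grows.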
You're very welcome @ocramz!
So
1000:
1500:
I took a look at the original repo and stumbled upon ejconlon/analyze#3, in case it's of any relevance here.
This is where it gets tricky, and more eyes on the problem would possibly be beneficial. @Magalame Could you send in your benchmark code as a PR? Thanks!
The RFrame type currently stores the frame entries as a Vector of Vectors (each inner vector being a data row). It would be nice to compare the performance of this storage layout with that of a streaming library (e.g. Stream (Of (Vector v)) m ()).
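As a rough illustration of the two layouts being compared, here is a sketch using the streaming package, which provides the Stream (Of a) m r type mentioned above. The aliases RowsV and RowsS and the three helpers are hypothetical names for this example, not part of analyze.

```haskell
module Main where

import Data.Vector (Vector)
import qualified Data.Vector as V
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Current layout: every row is held in memory at once.
type RowsV v = Vector (Vector v)

-- Streaming layout: rows are yielded one at a time in the monad m.
type RowsS m v = Stream (Of (Vector v)) m ()

-- Turn an in-memory table into a stream of rows.
toRowStream :: Monad m => RowsV v -> RowsS m v
toRowStream = S.each

-- Collect a stream of rows back into an in-memory table.
fromRowStream :: Monad m => RowsS m v -> m (RowsV v)
fromRowStream rows = V.fromList <$> S.toList_ rows

-- Streaming counterpart of taking the first k rows.
takeRowsS :: Monad m => Int -> RowsS m v -> RowsS m v
takeRowsS = S.take

main :: IO ()
main = do
  let frame = V.fromList [V.fromList [1, 2], V.fromList [3, 4], V.fromList [5, 6]] :: RowsV Int
  firstTwo <- fromRowStream (takeRowsS 2 (toRowStream frame))
  print firstTwo
```

Row-wise operations such as taking, filtering, or zipping rows translate directly to the stream form, while anything that needs the row count or random access has to consume the stream first.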