Skip to content

Commit

Permalink
add RateLimitedStream
Browse files Browse the repository at this point in the history
  • Loading branch information
erantapaa committed Jul 17, 2014
1 parent ed3aff6 commit d2c657c
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 0 deletions.
101 changes: 101 additions & 0 deletions RateLimitedStream.hs
@@ -0,0 +1,101 @@
{-# LANGUAGE BangPatterns, CPP #-}
{-# OPTIONS_GHC -fno-warn-name-shadowing -fwarn-unused-imports #-}
-- -Wall

-- A module for stream processing built on top of Control.Monad.Par

-- (In the future may want to look into the stream interface used by
-- the stream fusion framework.)

module RateLimitedStream
(
Stream, streamFromList, streamMap, streamFold, streamFilter
) where

import Control.Monad.Par.Scheds.Trace as P
import Control.DeepSeq

--------------------------------------------------------------------------------
-- Types

-- <<IList
data IList a
= Nil
| Cons a (IVar (IList a))
| Fork (Par ()) (IVar (IList a))

type Stream a = IVar (IList a)
-- >>

instance NFData a => NFData (IList a) where
-- rnf Nil = r0
rnf Nil = ()
rnf (Cons a b) = rnf a `seq` rnf b
rnf (Fork _ a) = rnf a

-- -----------------------------------------------------------------------------
-- Stream operators

-- <<streamFromList
streamFromList :: NFData a => Int -> [a] -> Par (Stream a)
streamFromList n xs = do
var <- new -- <1>
fork $ loop n xs var -- <2>
return var -- <3>
where
loop k [] var = put var Nil -- <4>
loop k xs var | k <= 0 = do
tail <- new
put var (Fork (loop n xs tail) tail)
loop k (x:xs) var = do -- <5>
tail <- new -- <6>
put var (Cons x tail) -- <7>
loop (k-1) xs tail -- <8>
-- >>

-- <<streamMap
streamMap :: NFData b => (a -> b) -> Stream a -> Par (Stream b)
streamMap fn inp = do out <- new
fork $ loop out inp
return out
where
loop out inp = do
x <- get inp
case x of
Nil -> put out Nil
Cons a inp' -> do out' <- new
put out (Cons (fn a) out')
loop out' inp'
Fork p inp' -> do fork p; loop out inp'
-- >>


-- | Reduce a stream to a single value. This function will not return
-- until it reaches the end-of-stream.
-- <<streamFold
streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn !acc inp = do
x <- get inp
case x of
Nil -> return acc
Cons b inp' -> streamFold fn (fn acc b) inp'
Fork p inp' -> do fork p; streamFold fn acc inp'
-- >>

streamFilter :: NFData a => (a -> Bool) -> Stream a -> Par (Stream a)
streamFilter p inp = do
out <- new
fork $ loop out inp
return out
where
loop out inp = do
x <- get inp
case x of
Nil -> put out Nil
Cons a inp'
| p a -> do out' <- new
put_ out (Cons a inp')
loop out' inp'
| otherwise -> loop out inp'
Fork p inp' -> do fork p; loop out inp'

10 changes: 10 additions & 0 deletions parconc-examples.cabal
Expand Up @@ -221,6 +221,16 @@ executable rsa-pipeline
ghc-options: -threaded
default-language: Haskell2010

executable rsa-ratelimited
main-is: rsa-ratelimited.hs
other-modules: ByteStringCompat RateLimitedStream
build-depends: base >= 4.5 && < 4.8
, bytestring >= 0.9 && < 0.11
, monad-par >= 0.3.4 && < 0.4
, deepseq ==1.3.*
ghc-options: -threaded
default-language: Haskell2010

executable fwsparse
main-is: fwsparse.hs
other-modules: SparseGraph MapCompat
Expand Down
103 changes: 103 additions & 0 deletions rsa-ratelimited.hs
@@ -0,0 +1,103 @@
--
-- Derived from a program believed to be originally written by John
-- Launchbury, and incorporating the RSA algorithm which is in the
-- public domain.
--

import System.Environment
import Data.List
import qualified Data.ByteString.Lazy.Char8 as B
import Data.ByteString.Lazy.Char8 (ByteString)
import ByteStringCompat
import Control.Monad.Par.Scheds.Trace

import RateLimitedStream

main = do
[f] <- getArgs
text <- case f of
"-" -> B.getContents
_ -> B.readFile f
B.putStr (pipeline n e d text)

-- example keys, created by makeKey below
n, d, e :: Integer
(n,d,e) = (3539517541822645630044332546732747854710141643130106075585179940882036712515975698104695392573887034788933523673604280427152984392565826058380509963039612419361429882234327760449752708861159361414595229,121492527803044541056704751360974487724009957507650761043424679483464778334890045929773805597614290949,216244483337223224019000724904989828660716358310562600433314577442746058361727768326718965949745599136958260211917551718034992348233259083876505235987999070191048638795502931877693189179113255689722281)


-- <<encrypt
encrypt :: Integer -> Integer -> Stream ByteString -> Par (Stream ByteString)
encrypt n e s = streamMap (B.pack . show . power e n . code) s

decrypt :: Integer -> Integer -> Stream ByteString -> Par (Stream ByteString)
decrypt n d s = streamMap (B.pack . decode . power d n . integer) s
-- >>

integer :: ByteString -> Integer
integer b | Just (i,_) <- B.readInteger b = i

-- <<pipeline
pipeline :: Integer -> Integer -> Integer -> ByteString -> ByteString
pipeline n e d b = runPar $ do
s0 <- streamFromList 2 (chunk (size n) b)
s1 <- encrypt n e s0
s2 <- decrypt n d s1
xs <- streamFold (\x y -> (y : x)) [] s2
return (B.unlines (reverse xs))
-- >>

integers :: [ByteString] -> [Integer]
integers bs = [ i | Just (i,_) <- map B.readInteger bs ]

-------- Converting between Strings and Integers -----------

code :: ByteString -> Integer
code = B.foldl' accum 0
where accum x y = (128 * x) + fromIntegral (fromEnum y)

decode :: Integer -> String
decode n = reverse (expand n)
where expand 0 = []
expand x = toEnum (fromIntegral (x `mod` 128)) : expand (x `div` 128)

chunk :: Int -> ByteString -> [ByteString]
chunk n xs | B.null xs = []
chunk n xs = as : chunk n bs
where (as,bs) = B.splitAt (fromIntegral n) xs

size :: Integer -> Int
size n = (length (show n) * 47) `div` 100 -- log_128 10 = 0.4745

------- Constructing keys -------------------------

makeKeys :: Integer -> Integer -> (Integer, Integer, Integer)
makeKeys r s = (p*q, d, invert ((p-1)*(q-1)) d)
where p = nextPrime r
q = nextPrime s
d = nextPrime (p+q+1)

nextPrime :: Integer -> Integer
nextPrime a = head (filter prime [odd,odd+2..])
where odd | even a = a+1
| True = a
prime p = and [power (p-1) p x == 1 | x <- [3,5,7]]

invert :: Integer -> Integer -> Integer
invert n a = if e<0 then e+n else e
where e=iter n 0 a 1

iter :: Integer -> Integer -> Integer -> Integer -> Integer
iter g v 0 w = v
iter g v h w = iter h w (g `mod` h) (v - (g `div` h)*w)

------- Fast exponentiation, mod m -----------------

power :: Integer -> Integer -> Integer -> Integer
power 0 m x = 1
power n m x | even n = sqr (power (n `div` 2) m x) `mod` m
| True = (x * power (n-1) m x) `mod` m

sqr :: Integer -> Integer
sqr x = x * x


0 comments on commit d2c657c

Please sign in to comment.