Skip to content

Commit

Permalink
Working json display prototype.
Browse files Browse the repository at this point in the history
  • Loading branch information
KirinDave committed Jul 12, 2011
1 parent fd6c3da commit 7113d8c
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 22 deletions.
3 changes: 3 additions & 0 deletions Conduit.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ executable Conduit
hs-source-dirs: ., config
build-depends: base >= 4 && < 5
, yesod-core >= 0.8 && < 0.9
, yesod
, yesod-static
, wai-extra
, directory
Expand All @@ -63,5 +64,7 @@ executable Conduit
, containers
, stm
, monadIO
, yesod-json

ghc-options: -Wall -threaded
WS_ACCESS_KEY_ID
10 changes: 9 additions & 1 deletion Controller.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ module Controller
) where

import Tap
import Tap.Redis
import qualified Database.Redis.Redis as Redis
import Settings
import Yesod.Helpers.Static
import Data.ByteString (ByteString)
Expand All @@ -16,6 +18,7 @@ import Data.Dynamic (Dynamic, toDyn)

-- Import all relevant handler modules here.
import Handler.Root
import Handler.Display

-- This line actually creates our YesodSite instance. It is the second half
-- of the call to mkYesodData which occurs in Tap.hs. Please see
Expand All @@ -36,10 +39,15 @@ getRobotsR = return $ RepPlain $ toContent ("User-agent: *" :: ByteString)
-- migrations handled by Yesod.
withTap :: (Application -> IO a) -> IO a
withTap f = do
let h = Tap s
redisC <- localRedis
(ams, _) <- storeChannelsInto redisC ["metrics:statman"]
let h = Tap s ams
toWaiApp h >>= f
where
s = static Settings.staticdir

localRedis :: IO Redis.Redis
localRedis = Redis.connect "127.0.0.1" Redis.defaultPort

withDevelApp :: Dynamic
withDevelApp = toDyn (withTap :: (Application -> IO ()) -> IO ())
36 changes: 36 additions & 0 deletions Handler/Display.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{-# LANGUAGE TemplateHaskell, QuasiQuotes, OverloadedStrings #-}
module Handler.Display where

import Yesod.Json (jsonList, Json(..))
import Yesod.Content

import Tap
import Tap.Redis
import Control.Concurrent.MonadIO (liftIO, MonadIO)
import Database.Redis.Redis (Message(..))
import qualified Data.ByteString as B
import qualified Data.Foldable as F
import qualified Data.Sequence as S
import qualified Data.List as L

-- do Tap _ messages <- getYesod
-- queue <- liftIO $ clearChannel messages

--getDisplayTap :: Handler (ContentType, Content)
getDisplayTap :: Handler RepJson
getDisplayTap = do
Tap _ mstore <- getYesod
queue <- liftIO $ clearChannel mstore
--return (typeJson, toContent $ jsonArrayify queue)
return $ RepJson $ (toContent . jsonArrayify) queue


jsonArrayify :: MessageStore -> B.ByteString
jsonArrayify queue =
let msgList = (buildList $ getMessage `fmap` queue)
csml = L.intersperse "," msgList in
B.concat $ ["["] ++ csml ++ ["]"]
where getMessage (MMessage _ s) = s
getMessage _ = ""
buildList = F.foldr (:) []

3 changes: 2 additions & 1 deletion Handler/Root.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ getRootR = do
defaultLayout $ do
h2id <- lift newIdent
setTitle "Conduit homepage"
addWidget $(widgetFile "homepage")
addWidget $(widgetFile "homepage")

6 changes: 4 additions & 2 deletions Tap.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import qualified Data.ByteString as B
-- starts running, such as database connections. Every handler will have
-- access to the data present here.
data Tap = Tap
{ getStatic :: Static -- ^ Settings for static file serving.
{ getStatic :: Static -- ^ Settings for static file serving.
, getMessages :: AtomicMessageStore
}

-- | A useful synonym; most of the handler functions in your application
Expand Down Expand Up @@ -94,4 +95,5 @@ instance Yesod Tap where
let fn' = statictmp ++ fn
exists <- liftIO $ doesFileExist fn'
unless exists $ liftIO $ L.writeFile fn' content
return $ Just $ Right (StaticR $ StaticRoute ["tmp", T.pack fn] [], [])
return $ Just $ Right (StaticR $ StaticRoute ["tmp", T.pack fn] [], [])

49 changes: 31 additions & 18 deletions Tap/Redis.hs
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
module Tap.Redis (AtomicMessageStore
, MessageStore
, RedisMessage
, storeChannelsInto) where
, storeChannelsInto
, clearChannel
, checkChannel
, ) where

import Prelude hiding (take)
import Database.Redis.Redis (Message(..), Redis(..), subscribe)
import Data.Enumerator (Iteratee(..), Stream(..), Step(..),
returnI, liftI, continue, yield, ($$),
import Database.Redis.Redis (Message(..), Redis, subscribe)
import Data.Enumerator (Iteratee(..), Stream(..),
continue, yield, ($$),
run_)
import Data.Enumerator.Redis (enumSubscriptions)

import Data.Sequence ((><), Seq(..), fromList, take, empty)
import Data.Sequence ((><), Seq, fromList, take, empty)
import Data.ByteString (ByteString)

import Control.Monad (forM_)
import Control.Concurrent (forkIO, ThreadId(..))
import Control.Concurrent (forkIO, ThreadId)
import Control.Concurrent.MonadIO (liftIO, MonadIO)
import Control.Monad.STM (atomically)
import Control.Concurrent.STM.TVar
Expand All @@ -24,27 +26,38 @@ type RedisMessage = Message ByteString
type MessageStore = Seq RedisMessage
type AtomicMessageStore = TVar MessageStore

max_buffer_size = 64

enqMessages :: AtomicMessageStore -> [RedisMessage] -> IO ()
enqMessages tv xs =
atomically $ do
history <- readTVar tv
let concated = fromList xs >< history in
writeTVar tv (take max_buffer_size concated)
max_buffer_size :: Int
max_buffer_size = 1024

intoHistory :: AtomicMessageStore -> Iteratee RedisMessage IO ()
intoHistory tvar = continue step where
step (Chunks []) = continue step
step (Chunks xs) = (liftIO (enqMessages tvar xs)) >> continue step
step EOF = yield () EOF

enqMessages :: AtomicMessageStore -> [RedisMessage] -> IO ()
enqMessages tv xs =
atomically $ do
history <- readTVar tv
let concated = fromList xs >< history in
writeTVar tv (take max_buffer_size concated)

storeChannelsInto :: Redis -> [String] -> IO (AtomicMessageStore, ThreadId)
storeChannelsInto redis channels = do
subscribe redis channels :: IO [Message ()] -- We ignore the result, we don't care, so don't
-- allocate any space.
_ <- subscribe redis channels :: IO [Message ()] -- We ignore the result, we don't care, so don't.
newStore <- newTVarIO empty
spawned <- forkIO $ run_ ( enumSubscriptions redis 1000 $$ intoHistory newStore )
spawned <- forkIO $ do
run_ ( enumSubscriptions 1000 redis $$ intoHistory newStore )
return (newStore, spawned)

clearChannel :: AtomicMessageStore -> IO MessageStore
clearChannel ams =
atomically $ do
v <- readTVar ams
writeTVar ams empty
return v

checkChannel :: AtomicMessageStore -> IO MessageStore
checkChannel ams =
atomically $ readTVar ams >>= return

2 changes: 2 additions & 0 deletions config/routes
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@
/robots.txt RobotsR GET

/ RootR GET

/tap DisplayTap GET

0 comments on commit 7113d8c

Please sign in to comment.