Skip to content

Commit

Permalink
use twitter streaming api heartbeats to monitor connection health
Browse files Browse the repository at this point in the history
  • Loading branch information
EugeneN committed Sep 3, 2017
1 parent 2d2fb6e commit 5ae38d3
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 32 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ buildroot := $(shell pwd)
default:
cat README.md

repl:
cd backend && stack ghci --no-load

setup:
cd backend && stack setup --install-ghc --no-system-ghc
cd frontend && stack setup --install-ghc --no-system-ghc
Expand Down
94 changes: 67 additions & 27 deletions backend/src/BL/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,40 @@ import Web.Twitter.Types.Lens
import Control.Applicative
import Control.Lens

import Control.Lens.Action (act, (^!))
import Control.Lens.Action (act, (^!))
import Control.Monad.Trans.Resource (MonadResource)

import Data.Aeson (FromJSON)
import Data.String (IsString)
import qualified Data.Conduit.Combinators as CC

import Control.Monad
import Control.Monad.IO.Class
import qualified Data.ByteString.Char8 as BS
import qualified Data.ByteString.Char8 as BS
import Data.Conduit
import qualified Data.Conduit as C
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL
import qualified Data.Text as T
import qualified Data.Text.IO as T
import Data.Time.Clock (UTCTime (..), diffUTCTime,
getCurrentTime, secondsToDiffTime)
import Network.HTTP.Conduit as HTTP
import qualified Data.Conduit as C
import qualified Data.Conduit.Internal as CI
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL
import Data.Monoid ((<>))
import qualified Data.Text as T
import qualified Data.Text.IO as T
import Data.Time.Clock (UTCTime (..), diffUTCTime,
getCurrentTime,
secondsToDiffTime)
import Network.HTTP.Conduit as HTTP
import Web.Authenticate.OAuth

import qualified BL.Core as BLC
import BL.DataLayer (MyDb, getPrevState, writeTime)
import qualified BL.Core as BLC
import BL.DataLayer (MyDb, getPrevState, writeTime)
import BL.Types
import qualified Config as CFG
import Prelude hiding (error, lookup)
import qualified Config as CFG
import Prelude hiding (error, lookup)
import System.Log.Handler.Simple
import System.Log.Logger
import qualified Web.Twitter.Types as TT
import qualified Web.Twitter.Conduit.Base as WTCB
import qualified Web.Twitter.Conduit.Types as WTCT
import qualified Web.Twitter.Types as TT

logRealm = "Worker"

Expand All @@ -53,7 +63,7 @@ timeoutWorker :: MyDb -> MVar IPCMessage -> IO ThreadId
timeoutWorker db ch = forkIO $ forever $ do
curTime <- getCurrentTime
prevTime <- getPrevTimeFromDb db
saveCurTimeToDb db curTime
-- saveCurTimeToDb db curTime

debug $ "... " ++ show prevTime ++ " : " ++ show curTime
++ " / " ++ show (diffUTCTime curTime prevTime)
Expand All @@ -70,10 +80,10 @@ timeoutWorker db ch = forkIO $ forever $ do
(_, oldPrevTime) <- getPrevState db
return oldPrevTime

saveCurTimeToDb = writeTime

sendUpdateMessage ch = putMVar ch MReloadFeed

saveCurTimeToDb = writeTime

accountFetchWorker :: MVar UpdateMessage -> MVar FeedState -> Cfg -> IO ThreadId
accountFetchWorker accv fv cfg = forkIO $ forever $ do
fetchreq <- takeMVar accv
Expand All @@ -83,29 +93,59 @@ accountFetchWorker accv fv cfg = forkIO $ forever $ do
updateWorker :: MyDb -> MVar FeedState -> MVar UpdateMessage -> Cfg -> IO ThreadId
updateWorker db fv uv cfg = forkIO $ forever $ do
ur <- takeMVar uv
debug $ "***Got an update request at " ++ show ur
debug $ "*** Got an update request at " ++ show ur
-- TODO throttle update requests here
BLC.updateFeedSync db fv cfg


-- copied from http://hackage.haskell.org/package/twitter-conduit-0.2.2.2/docs/src/Web-Twitter-Conduit-Stream.html#stream
-- and added code to record keep-alive messages from the api (\r\n), which are expected to be sent every 30s
-- twitter api doc reccomend to wait 90s (3 keep-alive cycles), and then reconnect immediately should no messages arrive
stream_ :: (MonadResource m)
=> TWInfo
-> HTTP.Manager
-> APIRequest apiName responseType
-> m (ResumableSource m BS.ByteString)
stream_ info mgr req = do
rsrc <- WTCB.getResponse info mgr =<< liftIO (WTCB.makeRequest req)
return $ Web.Twitter.Conduit.responseBody rsrc

saveLatestMessageFromApi :: (Eq t, IsString t) => MyDb -> t -> IO ()
saveLatestMessageFromApi db x = do
curTime <- getCurrentTime
saveCurTimeToDb db curTime
if x == "\r\n" -- aka twitter streaming api's in-band hearbeat protocol message
then debug $ "❤❤❤❤ got heartbeet from streaming api @ " <> show curTime
else debug $ "𝄞♪♫♬ got smth from streaming api @ " <> show curTime

streamWorker :: MyDb -> MVar FeedState -> Cfg -> IO ThreadId
streamWorker db m cfg = forkIO $
withManager$ \mgr -> do
src <- stream (BLC.twInfo cfg) mgr userstream
src C.$$+- CL.mapM_ (^! act (liftIO . handleTL))
rsrc <- stream_ (BLC.twInfo cfg) mgr userstream

let pass = passthroughSink (CL.mapM_ (liftIO . saveLatestMessageFromApi db)) (\_ -> pure ())
let rsrc' = rsrc $=+ pass

let rsrc'' = rsrc' $=+ CL.sequence WTCB.sinkFromJSON
rsrc'' $$+- CL.mapM_ (^! act (liftIO . handleTL))

where
handleTL :: StreamingAPI -> IO ()
handleTL (SStatus s) = handleTweet $ BLC.statusToTweet s
handleTL (SRetweetedStatus s) = handleTweet $ BLC.retweetStatusToTweet s
handleTL (SEvent ev) = handleEvent ev
handleTL s = debug $ "???? got not a tweet: " ++ show s
handleTL (SDelete s) = debug $ "???? got SDelete: " ++ show s
handleTL (SFriends s) = debug $ "???? got SFriends: " ++ show s
handleTL (SDirectMessage s) = debug $ "???? got SDirectMessage: " ++ show s
handleTL (SUnknown s) = debug $ "???? got SUnknown: " ++ show s
handleTL s = debug $ "???? got not a tweet: " ++ show s

handleTweet t = BLC.handleIncomingFeedMessages db m [TweetMessage t]

handleEvent ( TT.Event { evCreatedAt = _
, evTargetObject = _
, evEvent = "follow"
, evTarget = (ETUser user)
, evSource = _}) = BLC.handleIncomingFeedMessages db m [UserMessage user]
handleEvent TT.Event { evCreatedAt = _
, evTargetObject = _
, evEvent = "follow"
, evTarget = (ETUser user)
, evSource = _} = BLC.handleIncomingFeedMessages db m [UserMessage user]

handleEvent x = debug $ "???? got unknown event: " ++ show x
6 changes: 3 additions & 3 deletions backend/src/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ runManager = monitorAppBus
cmd <- takeMVar av
case cmd of
MReloadFeed -> do
info "Reloading feed"
info "⚡⚡⚡⚡ Reloading feed"
restartStreamWorker rs
BLC.updateFeed uv

Expand All @@ -110,11 +110,11 @@ runManager = monitorAppBus
(RunState st db twi maybeSwi hwi uvi afwi fv av uv accv cfg) <- readMVar rs
case maybeSwi of
Just swi -> do
info "Killing old stream worker... "
info "☒☒☒☒ Killing old stream worker... "
killThread swi
info "done"

info "Starting new stream worker... "
info "☑☑☑☑ Starting new stream worker... "
newWorkerId <- streamWorker db fv cfg
info "done"

Expand Down
2 changes: 2 additions & 0 deletions backend/twic2.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ Executable twic
, twitter-types-lens
, conduit-extra
, conduit
, conduit-combinators
, twitter-types >= 0.7.0
, unordered-containers
, uuid
Expand All @@ -74,3 +75,4 @@ Executable twic
, configurator
, lens-action
, lens
, resourcet
1 change: 1 addition & 0 deletions frontend/src/Components/Feed.hs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ feedComponent parentControllerE (wsi, wsReady) requestUserInfoU ntU busyU = do
AddNew t -> (old, cur, (unique $ new <> [t]))
ShowNew -> ((unique $ old <> cur), new, [])
ShowOld n -> (allButLast n old, (unique $ last_ n old <> cur), new)
_ -> (old, cur, new)

isFeedOp x = case x of
AddNew _ -> True
Expand Down
1 change: 0 additions & 1 deletion frontend/src/Lib/FW.hs
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,4 @@ appContainer container anApp = do
where
draw :: (l ~ DOM.Node) => (Maybe (VD.VNode l), Maybe (VD.VNode l)) -> IO ()
draw (newVdom, oldVdom) = void . forkIO $ do
print $ "draw " <> show newVdom
VD.patch VD.domAPI container oldVdom newVdom
2 changes: 1 addition & 1 deletion frontend/stack.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ packages:

- location:
git: https://github.com/EugeneN/haskell-virtualdom.git
commit: f3e192cc77592dedd0cddc55d78a301c29ddf377
commit: c352364fa95939b0a3e7e7b67e2d2dd5df2924b3
extra-dep: true
# - location: '../../haskell-virtualdom'

Expand Down

0 comments on commit 5ae38d3

Please sign in to comment.