Skip to content

Commit

Permalink
Hook version of hedis instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
kakkun61 committed Mar 13, 2024
1 parent 3b786d3 commit b6f051b
Show file tree
Hide file tree
Showing 11 changed files with 183 additions and 94 deletions.
7 changes: 5 additions & 2 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ packages:
, utils/exceptions
, vendors/datadog

, /home/kazuki/Projects/hedis

-- https://github.com/vincenthz/hs-memory/pull/93
source-repository-package
type: git
Expand Down Expand Up @@ -77,6 +75,11 @@ source-repository-package
location: https://github.com/herp-inc/herp-logger
tag: v0.3

source-repository-package
type: git
location: https://github.com/kakkun61/hedis
tag: c523b1c2de5cd9ee4f6652f37daca01a1527c2ba

allow-newer:
http-api-data:base
, postgresql-simple:base
Expand Down
4 changes: 2 additions & 2 deletions examples/hedis/main.hs → examples/hedis/client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ main =
let tracer = makeTracer tracerProvider "main" tracerOptions
inSpan tracer "main" defaultSpanArguments $ do
connInfo <- appendHooksToConnectionInfo tracerProvider defaultConnectInfo
conn <- connect connInfo
runRedis conn $ do
connection <- connect connInfo
runRedis connection $ do
void $ set "hello" "hello"
void $ set "world" "world"
hello <- get "hello"
Expand Down
11 changes: 10 additions & 1 deletion examples/hedis/hedis-example.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,16 @@ common common

executable hedis-client
import: common
main-is: main.hs
main-is: client.hs
hs-source-dirs: .
build-depends: base,
hedis,
hs-opentelemetry-sdk,
hs-opentelemetry-instrumentation-hedis

executable hedis-pubsub
import: common
main-is: pubsub.hs
hs-source-dirs: .
build-depends: base,
hedis,
Expand Down
47 changes: 47 additions & 0 deletions examples/hedis/pubsub.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
{-# LANGUAGE OverloadedStrings #-}

import Control.Concurrent (forkIO)
import Control.Exception (bracket)
import Control.Monad (void)
import Control.Monad.IO.Class (liftIO)
import Database.Redis
import OpenTelemetry.Instrumentation.Hedis (appendHooksToConnectionInfo)
import qualified OpenTelemetry.Trace as Otel
import System.IO (hFlush, stdout)


main :: IO ()
main =
bracket
Otel.initializeTracerProvider
Otel.shutdownTracerProvider
$ \tracerProvider -> do
let tracer = Otel.makeTracer tracerProvider "hedis-pubsub" Otel.tracerOptions
connInfo <- appendHooksToConnectionInfo tracerProvider defaultConnectInfo
connection <- connect connInfo
void $ forkIO $ Otel.inSpan tracer "single-thread" Otel.defaultSpanArguments $ do
runRedis connection $
pubSub (subscribe ["hello"]) $ \message -> do
Otel.inSpan tracer "single-thread callback" Otel.defaultSpanArguments $ do
putStrLn $ "single-thread: received: " ++ show message
pure mempty
void $ forkIO $ Otel.inSpan tracer "multithread" Otel.defaultSpanArguments $ do
controller <- newPubSubController [("hello", helloMessageCallback tracer)] []
pubSubForever connection controller $ do
putStrLn "subscribed acknowledged"
liftIO $ do
putStrLn "Press enter to publish a message..."
hFlush stdout
void $ getLine -- wait for subscribing
Otel.inSpan tracer "publish" Otel.defaultSpanArguments $ do
runRedis connection $ do
void $ publish "hello" "world"
putStrLn "Press enter to exit after while..."
hFlush stdout
void $ getLine -- wait for transporting spans


helloMessageCallback :: Otel.Tracer -> MessageCallback
helloMessageCallback tracer message = do
Otel.inSpan tracer "multithread callback" Otel.defaultSpanArguments $ do
putStrLn $ "multithread: receive: " ++ show message
2 changes: 1 addition & 1 deletion hie.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ cradle:
- path: "examples/hdbc-mysql/main.hs"
component: "hdbc-mysql-example:exe:hdbc-mysql-example"

- path: "examples/hedis/main.hs"
- path: "examples/hedis/client.hs"
component: "hedis-example:exe:hedis-client"

- path: "examples/http-server/main.hs"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ library
hs-opentelemetry-semantic-conventions,
hedis >= 0.14,
bytestring,
text,
thread-local-storage
text
ghc-options: -Wcompat
-Wno-name-shadowing
if impl(ghc >= 6.4)
Expand Down
175 changes: 91 additions & 84 deletions instrumentation/hedis/src/OpenTelemetry/Instrumentation/Hedis.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,17 @@ module OpenTelemetry.Instrumentation.Hedis (
appendHooksToConnectionInfo,
) where

import Control.Exception (assert)
import Control.Monad ((>=>))
import Control.Monad.IO.Class (MonadIO (liftIO))
import Data.ByteString (ByteString)
import Data.Function ((&))
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Int (Int64)
import qualified Data.TLS.GHC as TLS
import Data.Text (Text)
import qualified Data.Text as Text
import qualified Data.Text.Encoding as Text
import Data.Version (showVersion)
import qualified Database.Redis as Orig
import GHC.Stack (HasCallStack, withFrozenCallStack)
import qualified OpenTelemetry.Attributes.Map as Otel
import qualified OpenTelemetry.Context.ThreadLocal as Otel
import qualified OpenTelemetry.SemanticConventions as Otel
import qualified OpenTelemetry.Trace.Core as Otel
import Paths_hs_opentelemetry_instrumentation_hedis (version)
Expand All @@ -35,85 +30,46 @@ appendHooksToConnectionInfo tracerProvider connectInfo@Orig.ConnInfo {Orig.conne
tracerProvider
(Otel.InstrumentationLibrary "hs-opentelemetry-instrumentation-hedis" $ Text.pack $ showVersion version)
Otel.tracerOptions
tls <- makeThreadLocalStorage
pure
connectInfo
{ Orig.connectHooks =
Orig.Hooks
{ Orig.presendHook = presendHook tracer tls connectInfo >=> Orig.presendHook connectHooks
, Orig.postsendHook = Orig.postsendHook connectHooks >> postsendHook tls
, Orig.prerecieveHook = prereceiveHook tracer tls connectInfo >> Orig.prerecieveHook connectHooks
, Orig.postrecieveHook = Orig.postrecieveHook connectHooks >=> postrecieveHook tls
, Orig.encallbackHook = encallbackHook tracer connectInfo . Orig.encallbackHook connectHooks
connectHooks
{ Orig.sendRequestHook = sendDatabaseOrPubSubHook tracer connectInfo . Orig.sendRequestHook connectHooks
, Orig.sendPubSubHook = sendDatabaseOrPubSubHook tracer connectInfo . Orig.sendPubSubHook connectHooks
, Orig.callbackHook = callbackHook tracer connectInfo . Orig.callbackHook connectHooks
}
}


type ThreadLocalStorage = TLS.TLS (IORef (Maybe Otel.Span, Maybe Otel.Span))
sendDatabaseOrPubSubHook :: HasCallStack => Otel.Tracer -> Orig.ConnectInfo -> ([ByteString] -> IO a) -> [ByteString] -> IO a
sendDatabaseOrPubSubHook tracer connectInfo send message@[command, channel, _] | elem command ["PUBLISH", "SPUBLISH"] = do
let spanName = decodeBs channel <> " publish"
kind = Otel.Producer
attributes = makePubSubAttributes connectInfo $ Right message
Otel.inSpan tracer spanName Otel.defaultSpanArguments {Otel.kind, Otel.attributes} $ send message
sendDatabaseOrPubSubHook _ _ send message@(command : _) | elem command pubSubCommands = send message
sendDatabaseOrPubSubHook tracer connectInfo send message = do
let spanName = makeDatabaseSpanName message
attributes = makeDatabaseAttributes connectInfo message
kind = Otel.Client
Otel.inSpan tracer spanName Otel.defaultSpanArguments {Otel.kind, Otel.attributes} $ send message


makeThreadLocalStorage :: IO ThreadLocalStorage
makeThreadLocalStorage = TLS.mkTLS $ newIORef (Nothing, Nothing)
callbackHook :: HasCallStack => Otel.Tracer -> Orig.ConnectInfo -> (Orig.Message -> IO Orig.PubSub) -> Orig.Message -> IO Orig.PubSub
callbackHook tracer connectInfo callback message = do
let spanName = decodeBs (Orig.msgChannel message) <> " deliver"
kind = Otel.Consumer
attributes = makePubSubAttributes connectInfo $ Left message
Otel.inSpan tracer spanName Otel.defaultSpanArguments {Otel.kind, Otel.attributes} $ callback message


presendHook :: HasCallStack => Otel.Tracer -> ThreadLocalStorage -> Orig.ConnectInfo -> Orig.PresendHook
presendHook tracer tls connectInfo request = do
spansRef <- TLS.getTLS tls
(maybeSpan, s) <- readIORef spansRef
case maybeSpan of
Nothing -> do
context <- Otel.getContext
let attributes = makeAttributes connectInfo $ Just request
span_ <- Otel.createSpan tracer context "send" Otel.defaultSpanArguments {Otel.kind = Otel.Client, Otel.attributes}
writeIORef spansRef (Just span_, s)
pure request
Just _ -> pure request
makeDatabaseSpanName :: HasCallStack => [ByteString] -> Text
makeDatabaseSpanName (command : _) = decodeBs command
makeDatabaseSpanName _ = error "unexpected"


postsendHook :: ThreadLocalStorage -> Orig.PostsendHook
postsendHook tls = do
spansRef <- TLS.getTLS tls
(maybeSpan, s) <- readIORef spansRef
case maybeSpan of
Just span_ -> do
Otel.endSpan span_ Nothing
writeIORef spansRef (Nothing, s)
Nothing -> assert False $ pure () -- something went wrong


prereceiveHook :: HasCallStack => Otel.Tracer -> ThreadLocalStorage -> Orig.ConnectInfo -> Orig.PrerecieveHook
prereceiveHook tracer tls connectInfo = do
spansRef <- TLS.getTLS tls
(s, maybeSpan) <- readIORef spansRef
case maybeSpan of
Nothing -> do
context <- Otel.getContext
let attributes = makeAttributes connectInfo Nothing
span_ <- Otel.createSpan tracer context "recieve" Otel.defaultSpanArguments {Otel.kind = Otel.Client, Otel.attributes}
writeIORef spansRef (s, Just span_)
Just _ -> pure ()


postrecieveHook :: ThreadLocalStorage -> Orig.PostrecieveHook
postrecieveHook tls reply = do
spansRef <- TLS.getTLS tls
(s, maybeSpan) <- readIORef spansRef
case maybeSpan of
Just span_ -> do
Otel.endSpan span_ Nothing
writeIORef spansRef (s, Nothing)
Nothing -> assert False $ pure () -- something went wrong
pure reply


encallbackHook :: HasCallStack => Otel.Tracer -> Orig.ConnectInfo -> Orig.EncallbackHook
encallbackHook tracer connectInfo callback message = do
let attributes = makeAttributes connectInfo $ Just $ Orig.msgMessage message
Otel.inSpan tracer "sub" Otel.defaultSpanArguments {Otel.kind = Otel.Client, Otel.attributes} $ callback message


makeAttributes :: Orig.ConnectInfo -> Maybe ByteString -> Otel.AttributeMap
makeAttributes Orig.ConnInfo {Orig.connectHost, Orig.connectPort, Orig.connectUsername, Orig.connectDatabase} maybeRequest =
makeDatabaseAttributes :: Orig.ConnectInfo -> [ByteString] -> Otel.AttributeMap
makeDatabaseAttributes Orig.ConnInfo {Orig.connectHost, Orig.connectPort, Orig.connectUsername, Orig.connectDatabase} request =
let
host :: Text
maybePort :: Maybe Int64
Expand All @@ -122,16 +78,7 @@ makeAttributes Orig.ConnInfo {Orig.connectHost, Orig.connectPort, Orig.connectUs
case connectPort of
Orig.PortNumber n -> (Text.pack connectHost, Just $ fromInteger $ toInteger n, "tcp")
Orig.UnixSocket s -> (Text.pack s, Nothing, "unix")
(maybeOperation, maybeStatement) =
case maybeRequest of
Nothing -> (Nothing, Nothing)
Just request ->
let statement = decodeBs request
in ( case Text.splitOn "\r\n" statement of
(_ : _ : op : _) -> Just op
_ -> Nothing
, Just statement
)
statement = Text.unwords $ (\s -> "'" <> s <> "'") . decodeBs <$> request
in
mempty
-- Database Client attributes
Expand All @@ -145,13 +92,73 @@ makeAttributes Orig.ConnInfo {Orig.connectHost, Orig.connectPort, Orig.connectUs
& Otel.insertByKey Otel.server_address host
& maybe id (Otel.insertByKey Otel.server_port) maybePort
-- Database Client Call-level attributes
-- attributes to dissmiss: db.name
& maybe id (Otel.insertByKey Otel.db_operation) maybeOperation
& maybe id (Otel.insertByKey Otel.db_statement) maybeStatement
-- attributes to dissmiss: db.name, db.operation
& Otel.insertByKey Otel.db_statement statement
-- Redis Call-level attributes
& Otel.insertByKey Otel.db_redis_databaseIndex (fromInteger connectDatabase)


makePubSubAttributes :: Orig.ConnectInfo -> Either Orig.Message [ByteString] -> Otel.AttributeMap
makePubSubAttributes Orig.ConnInfo {Orig.connectHost, Orig.connectPort} message =
let
destination :: Text
operation :: Text
message' :: Text
maybePattern :: Maybe Text
(destination, operation, message', maybePattern) =
case message of
Right ["PUBLISH", channel, message''] -> (decodeBs channel, "publish", "'" <> decodeBs message'' <> "'", Nothing)
Right (command : _) -> error $ "unexpected command: " <> show command
Right [] -> error "unexpected"
Left Orig.Message {Orig.msgChannel, Orig.msgMessage} -> (decodeBs msgChannel, "deliver", "'" <> decodeBs msgMessage <> "'", Nothing)
Left Orig.PMessage {Orig.msgPattern, Orig.msgChannel, Orig.msgMessage} -> (decodeBs msgChannel, "deliver", "'" <> decodeBs msgMessage <> "'", Just $ decodeBs msgPattern)
host :: Text
maybePort :: Maybe Int64
transport :: Text
(host, maybePort, transport) =
case connectPort of
Orig.PortNumber n -> (Text.pack connectHost, Just $ fromInteger $ toInteger n, "tcp")
Orig.UnixSocket s -> (Text.pack s, Nothing, "unix")
in
mempty
-- Messaging attributes
-- attributes to dissmiss:
-- - messaging.batch.message_count
-- - messaging.client_id
-- - messaging.destination.template
-- - messaging.message.body.size
-- - messaging.message.conversation_id
-- - messaging.message.envelope.size
-- - messaging.message.id
-- - network.protocol.version
-- - network.type
& Otel.insertByKey Otel.messaging_destination_anonymous False
& Otel.insertByKey Otel.messaging_destination_name destination
& Otel.insertByKey Otel.messaging_destination_temporary False
& Otel.insertByKey Otel.messaging_operation operation
& Otel.insertByKey Otel.messaging_system "redis"
& Otel.insertByKey Otel.network_peer_address host
& maybe id (Otel.insertByKey Otel.network_peer_port) maybePort
& Otel.insertByKey Otel.network_protocol_name "redis"
& Otel.insertByKey Otel.network_transport transport
& Otel.insertByKey Otel.server_address host
& maybe id (Otel.insertByKey Otel.server_port) maybePort
-- Per-message attributes
& Otel.insert "messaging.message" (Otel.AttributeValue $ Otel.TextAttribute message')
& maybe id (Otel.insertByKey "messaging.redis.pattern" . Otel.AttributeValue . Otel.TextAttribute) maybePattern


pubSubCommands :: [ByteString]
pubSubCommands =
[ "SUBSCRIBE"
, "SSUBSCRIBE"
, "PSUBSCRIBE"
, "UNSUBSCRIBE"
, "SUNSUBSCRIBE"
, "PUNSUBSCRIBE"
]


decodeBs :: ByteString -> Text
#if MIN_VERSION_text(2,0,0)
decodeBs = Text.decodeUtf8Lenient
Expand Down
2 changes: 2 additions & 0 deletions stack-ghc-9.2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ extra-deps:
- amazonka-sso-2.0@sha256:902be13b604e4a3b51a9b8e1adc6a32f42322ae11f738a72a8c737b2d0a91a5e,2995
- amazonka-sts-2.0@sha256:5c721083e8d80883a893176de6105c27bbbd8176f467c27ac5f8d548a5e726d8,3209
- crypton-0.34@sha256:9e4b50d79d1fba681befa08151db7223d2b4bb72564853e8530e614105d53a1a,14577
- github: kakkun61/hedis
commit: c523b1c2de5cd9ee4f6652f37daca01a1527c2ba

nix:
enable: true
Expand Down
11 changes: 11 additions & 0 deletions stack-ghc-9.2.yaml.lock
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,17 @@ packages:
size: 23275
original:
hackage: crypton-0.34@sha256:9e4b50d79d1fba681befa08151db7223d2b4bb72564853e8530e614105d53a1a,14577
- completed:
name: hedis
pantry-tree:
sha256: 31961196203986bdf5b4b271923ad8f4f36771de9796778f5196cc518b689366
size: 2678
sha256: 867afb4d2564e0fd5d04a28166113a00ab033f6a1aaa7de5d9b3bb67c1be3baf
size: 79738
url: https://github.com/kakkun61/hedis/archive/c523b1c2de5cd9ee4f6652f37daca01a1527c2ba.tar.gz
version: 0.15.2
original:
url: https://github.com/kakkun61/hedis/archive/c523b1c2de5cd9ee4f6652f37daca01a1527c2ba.tar.gz
snapshots:
- completed:
sha256: a684cdbdf9304b325a503e0fe1d9648e9c18155ce4c7cfebbe8a7f93674e6295
Expand Down
4 changes: 2 additions & 2 deletions stack.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ packages:
- utils/exceptions
- vendors/datadog

- /home/kazuki/Projects/hedis

extra-deps:
- hspec-2.11.7@sha256:2869580a2a29e7beb6268ea3dc561583f4ae229ed1f47fb1c92e8c09ce35acec,1763
- hspec-core-2.11.7@sha256:90d8873356d7e15f843bc523360e206e8e356ff6b82a1fa4b3889dc31d073ea1,6814
Expand All @@ -56,6 +54,8 @@ extra-deps:
- amazonka-sso-2.0@sha256:902be13b604e4a3b51a9b8e1adc6a32f42322ae11f738a72a8c737b2d0a91a5e,2995
- amazonka-sts-2.0@sha256:5c721083e8d80883a893176de6105c27bbbd8176f467c27ac5f8d548a5e726d8,3209
- crypton-0.34@sha256:9e4b50d79d1fba681befa08151db7223d2b4bb72564853e8530e614105d53a1a,14577
- github: kakkun61/hedis
commit: c523b1c2de5cd9ee4f6652f37daca01a1527c2ba

nix:
enable: true
Expand Down
Loading

0 comments on commit b6f051b

Please sign in to comment.