Skip to content

Commit

Permalink
Reduce logging during file upload (#792)
Browse files Browse the repository at this point in the history
* Removes warning

* Updates logging for file upload

* Restores trace for placing block and proof in repo store
  • Loading branch information
benbierens committed Apr 30, 2024
1 parent 9365aa4 commit c7bc28d
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 48 deletions.
20 changes: 7 additions & 13 deletions codex/blockexchange/engine/discovery.nim
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,12 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
try:
await b.discoveryQueue.put(cid)
except CatchableError as exc:
trace "Exception in discovery loop", exc = exc.msg
warn "Exception in discovery loop", exc = exc.msg

logScope:
sleep = b.discoveryLoopSleep
wanted = b.pendingBlocks.len

trace "About to sleep discovery loop"
await sleepAsync(b.discoveryLoopSleep)

proc advertiseQueueLoop(b: DiscoveryEngine) {.async.} =
Expand All @@ -87,10 +86,9 @@ proc advertiseQueueLoop(b: DiscoveryEngine) {.async.} =
await sleepAsync(50.millis)
trace "Iterating blocks finished."

trace "About to sleep advertise loop", sleep = b.advertiseLoopSleep
await sleepAsync(b.advertiseLoopSleep)

trace "Exiting advertise task loop"
info "Exiting advertise task loop"

proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} =
## Run advertise tasks
Expand All @@ -102,7 +100,6 @@ proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} =
cid = await b.advertiseQueue.get()

if cid in b.inFlightAdvReqs:
trace "Advertise request already in progress", cid
continue

try:
Expand All @@ -111,17 +108,15 @@ proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} =

b.inFlightAdvReqs[cid] = request
codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64)
trace "Advertising block", cid, inflight = b.inFlightAdvReqs.len
await request

finally:
b.inFlightAdvReqs.del(cid)
codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64)
trace "Advertised block", cid, inflight = b.inFlightAdvReqs.len
except CatchableError as exc:
trace "Exception in advertise task runner", exc = exc.msg
warn "Exception in advertise task runner", exc = exc.msg

trace "Exiting advertise task runner"
info "Exiting advertise task runner"

proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
## Run discovery tasks
Expand Down Expand Up @@ -166,9 +161,9 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
b.inFlightDiscReqs.del(cid)
codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64)
except CatchableError as exc:
trace "Exception in discovery task runner", exc = exc.msg
warn "Exception in discovery task runner", exc = exc.msg

trace "Exiting discovery task runner"
info "Exiting discovery task runner"

proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
for cid in cids:
Expand All @@ -183,10 +178,9 @@ proc queueProvideBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
for cid in cids:
if cid notin b.advertiseQueue:
try:
trace "Queueing provide block", cid, queue = b.discoveryQueue.len
b.advertiseQueue.putNoWait(cid)
except CatchableError as exc:
trace "Exception queueing discovery request", exc = exc.msg
warn "Exception queueing discovery request", exc = exc.msg

proc start*(b: DiscoveryEngine) {.async.} =
## Start the discengine task
Expand Down
10 changes: 3 additions & 7 deletions codex/blockexchange/engine/engine.nim
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,6 @@ proc blockPresenceHandler*(
not b.peers.anyIt( cid in it.peerHaveCids ))

proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} =
trace "Schedule a task for new blocks", items = blocksDelivery.len

let
cids = blocksDelivery.mapIt( it.blk.cid )

Expand All @@ -277,7 +275,7 @@ proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.asyn
if b.scheduleTask(p):
trace "Task scheduled for peer", peer = p.id
else:
trace "Unable to schedule task for peer", peer = p.id
warn "Unable to schedule task for peer", peer = p.id

break # do next peer

Expand All @@ -293,7 +291,7 @@ proc cancelBlocks(b: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} =
.filterIt(it.failed)

if failed.len > 0:
trace "Failed to send block request cancellations to peers", peers = failed.len
warn "Failed to send block request cancellations to peers", peers = failed.len

proc getAnnouceCids(blocksDelivery: seq[BlockDelivery]): seq[Cid] =
var cids = initHashSet[Cid]()
Expand All @@ -309,8 +307,6 @@ proc getAnnouceCids(blocksDelivery: seq[BlockDelivery]): seq[Cid] =
return cids.toSeq

proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} =
trace "Resolving blocks", blocks = blocksDelivery.len

b.pendingBlocks.resolve(blocksDelivery)
await b.scheduleTasks(blocksDelivery)
let announceCids = getAnnouceCids(blocksDelivery)
Expand Down Expand Up @@ -618,7 +614,7 @@ proc blockexcTaskRunner(b: BlockExcEngine) {.async.} =
trace "Got new task from queue", peerId = peerCtx.id
await b.taskHandler(peerCtx)

trace "Exiting blockexc task runner"
info "Exiting blockexc task runner"

proc new*(
T: type BlockExcEngine,
Expand Down
2 changes: 0 additions & 2 deletions codex/blockexchange/engine/pendingblocks.nim
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ proc resolve*(
trace "Block retrieval time", retrievalDurationUs, address = bd.address
else:
trace "Block handle already finished", address = bd.address
do:
warn "Attempting to resolve block that's not currently a pending block", address = bd.address

proc setInFlight*(
p: PendingBlocksManager,
Expand Down
3 changes: 0 additions & 3 deletions codex/blockexchange/network/network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,6 @@ proc sendWantCancellations*(
addresses: seq[BlockAddress]): Future[void] {.async.} =
## Informs a remote peer that we're no longer interested in a set of blocks
##

trace "Sending block request cancellation to peer", addrs = addresses.len, peer = id

await b.sendWantList(id = id, addresses = addresses, cancel = true)

proc handleBlocksDelivery(
Expand Down
5 changes: 1 addition & 4 deletions codex/discovery.nim
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,13 @@ method find*(
method provide*(d: Discovery, cid: Cid) {.async, base.} =
## Provide a bock Cid
##

trace "Providing block", cid
let
nodes = await d.protocol.addProvider(
cid.toNodeId(), d.providerRecord.get)

if nodes.len <= 0:
trace "Couldn't provide to any nodes!"
warn "Couldn't provide to any nodes!"

trace "Provided to nodes", nodes = nodes.len

method find*(
d: Discovery,
Expand Down
9 changes: 3 additions & 6 deletions codex/node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ proc store*(
## Save stream contents as dataset with given blockSize
## to nodes's BlockStore, and return Cid of its manifest
##
trace "Storing data"
info "Storing data"

let
hcodec = Sha256HashCodec
Expand All @@ -308,8 +308,6 @@ proc store*(
let chunk = await chunker.getBytes();
chunk.len > 0):

trace "Got data from stream", len = chunk.len

without mhash =? MultiHash.digest($hcodec, chunk).mapFailure, err:
return failure(err)

Expand All @@ -322,7 +320,7 @@ proc store*(
cids.add(cid)

if err =? (await self.networkStore.putBlock(blk)).errorOption:
trace "Unable to store block", cid = blk.cid, err = err.msg
error "Unable to store block", cid = blk.cid, err = err.msg
return failure(&"Unable to store block {blk.cid}")
except CancelledError as exc:
raise exc
Expand Down Expand Up @@ -353,15 +351,14 @@ proc store*(
codec = dataCodec)

without manifestBlk =? await self.storeManifest(manifest), err:
trace "Unable to store manifest"
error "Unable to store manifest"
return failure(err)

info "Stored data", manifestCid = manifestBlk.cid,
treeCid = treeCid,
blocks = manifest.blocksCount,
datasetSize = manifest.datasetSize

# Announce manifest
await self.discovery.provide(manifestBlk.cid)
await self.discovery.provide(treeCid)

Expand Down
3 changes: 0 additions & 3 deletions codex/stores/networkstore.nim
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@ method putBlock*(
ttl = Duration.none): Future[?!void] {.async.} =
## Store block locally and notify the network
##

trace "Putting block into network store", cid = blk.cid

let res = await self.localStore.putBlock(blk, ttl)
if res.isErr:
return res
Expand Down
17 changes: 7 additions & 10 deletions codex/stores/repostore.nim
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ method putCidAndProof*(
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)

trace "Storing block cid and proof with key", key
trace "Storing block cid and proof", blockCid, key

let value = (blockCid, proof).encode()

Expand Down Expand Up @@ -313,7 +313,7 @@ method putBlock*(
return success()

without key =? makePrefixKey(self.postFixLen, blk.cid), err:
trace "Error getting key from provider", err = err.msg
warn "Error getting key from provider", err = err.msg
return failure(err)

if await key in self.repoDs:
Expand All @@ -325,39 +325,36 @@ method putBlock*(
return failure(
newException(QuotaUsedError, "Cannot store block, quota used!"))

trace "Storing block with key", key

var
batch: seq[BatchEntry]

let
used = self.quotaUsedBytes + blk.data.len.uint

if err =? (await self.repoDs.put(key, blk.data)).errorOption:
trace "Error storing block", err = err.msg
error "Error storing block", err = err.msg
return failure(err)

trace "Updating quota", used
batch.add((QuotaUsedKey, @(used.uint64.toBytesBE)))

without blockExpEntry =? self.getBlockExpirationEntry(blk.cid, ttl), err:
trace "Unable to create block expiration metadata key", err = err.msg
warn "Unable to create block expiration metadata key", err = err.msg
return failure(err)
batch.add(blockExpEntry)

if err =? (await self.metaDs.put(batch)).errorOption:
trace "Error updating quota bytes", err = err.msg
error "Error updating quota bytes", err = err.msg

if err =? (await self.repoDs.delete(key)).errorOption:
trace "Error deleting block after failed quota update", err = err.msg
error "Error deleting block after failed quota update", err = err.msg
return failure(err)

return failure(err)

self.quotaUsedBytes = used
inc self.totalBlocks
if isErr (await self.persistTotalBlocksCount()):
trace "Unable to update block total metadata"
warn "Unable to update block total metadata"
return failure("Unable to update block total metadata")

self.updateMetrics()
Expand Down

0 comments on commit c7bc28d

Please sign in to comment.