Skip to content

Commit

Permalink
Resolve PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Anmol Vijaywargiya committed Sep 3, 2021
1 parent ecc97bd commit 856d06a
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 23 deletions.
2 changes: 0 additions & 2 deletions src/ziggurat/kafka_consumer/consumer_handler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
(try
(when (not-empty batch)
(clog/info {:batch-size batch-size} (format "[Consumer Group: %s] Processing the batch with %d messages" topic-entity batch-size))
;(log/infof "[Consumer Group: %s] Processing the batch with %d messages" topic-entity batch-size)
(let [start-time (Instant/now)
result (batch-handler batch)
time-taken-in-millis (.toMillis (Duration/between start-time (Instant/now)))]
Expand All @@ -62,7 +61,6 @@
success-count (- batch-size (+ to-be-retried-count skip-count))]

(clog/info {:messages-successfully-processed success-count :messages-skipped skip-count :messages-to-be-retried to-be-retried-count} (format "[Consumer Group: %s] Processed the batch with success: [%d], skip: [%d] and retries: [%d] \n" topic-entity success-count skip-count to-be-retried-count))
;(log/infof "[Consumer Group: %s] Processed the batch with success: [%d], skip: [%d] and retries: [%d] \n" topic-entity success-count skip-count to-be-retried-count)
(publish-batch-process-metrics topic-entity batch-size success-count skip-count to-be-retried-count time-taken-in-millis)
(retry messages-to-be-retried current-retry-count topic-entity))))
(catch InvalidReturnTypeException e
Expand Down
43 changes: 22 additions & 21 deletions src/ziggurat/mapper.clj
Original file line number Diff line number Diff line change
Expand Up @@ -78,26 +78,27 @@
failure-metric "failure"
multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]]
user-payload (create-user-payload message-payload)]
(clog/with-logging-context {:consumer-group topic-entity-name :channel channel-name} (nr/with-tracing "job" metric-namespace
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (user-handler-fn user-payload)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "execution-time"
multi-execution-time-namespace [(conj base-metric-namespaces execution-time-namespace)
[execution-time-namespace]]]
(metrics/multi-ns-report-histogram multi-execution-time-namespace time-val additional-tags)
(case return-code
:success (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags)
:retry (do (metrics/multi-ns-increment-count multi-message-processing-namespaces retry-metric additional-tags)
(producer/retry-for-channel message-payload channel))
:skip (metrics/multi-ns-increment-count multi-message-processing-namespaces skip-metric additional-tags)
:block 'TODO
(throw (ex-info "Invalid mapper return code" {:code return-code}))))
(catch Throwable e
(producer/retry-for-channel message-payload channel)
(report-error e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name))
(metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags))))))))
(clog/with-logging-context {:consumer-group topic-entity-name :channel channel-name}
(nr/with-tracing "job" metric-namespace
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (user-handler-fn user-payload)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "execution-time"
multi-execution-time-namespace [(conj base-metric-namespaces execution-time-namespace)
[execution-time-namespace]]]
(metrics/multi-ns-report-histogram multi-execution-time-namespace time-val additional-tags)
(case return-code
:success (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags)
:retry (do (metrics/multi-ns-increment-count multi-message-processing-namespaces retry-metric additional-tags)
(producer/retry-for-channel message-payload channel))
:skip (metrics/multi-ns-increment-count multi-message-processing-namespaces skip-metric additional-tags)
:block 'TODO
(throw (ex-info "Invalid mapper return code" {:code return-code}))))
(catch Throwable e
(producer/retry-for-channel message-payload channel)
(report-error e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name))
(metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags))))))))

(defrecord MessagePayload [message topic-entity])

0 comments on commit 856d06a

Please sign in to comment.