@@ -175,44 +175,68 @@ static void cb_prom_flush(struct flb_event_chunk *event_chunk,
175175 int add_ts ;
176176 size_t off = 0 ;
177177 flb_sds_t metrics ;
178- cfl_sds_t text ;
178+ cfl_sds_t text = NULL ;
179+ cfl_sds_t tmp = NULL ;
179180 struct cmt * cmt ;
180181 struct prom_exporter * ctx = out_context ;
182+ int ok = CMT_DECODE_MSGPACK_SUCCESS ;
183+
184+ text = flb_sds_create_size (128 );
185+ if (text == NULL ) {
186+ flb_plg_debug (ctx -> ins , "failed to allocate buffer for text representation of metrics" );
187+ FLB_OUTPUT_RETURN (FLB_ERROR );
188+ }
181189
182190 /*
183191 * A new set of metrics has arrived, perform decoding, apply labels,
184192 * convert to Prometheus text format and store the output in the
185193 * hash table for metrics.
194+ * Note that metrics might be concatenated. So, we need to consume
195+ * until the end of event_chunk.
186196 */
187- ret = cmt_decode_msgpack_create (& cmt ,
188- (char * ) event_chunk -> data ,
189- event_chunk -> size , & off );
190- if (ret != 0 ) {
191- FLB_OUTPUT_RETURN (FLB_ERROR );
192- }
197+ while ((ret = cmt_decode_msgpack_create (& cmt ,
198+ (char * ) event_chunk -> data ,
199+ event_chunk -> size , & off )) == ok ) {
193200
194- /* append labels set by config */
195- append_labels (ctx , cmt );
201+ if (ret != 0 ) {
202+ flb_sds_destroy (text );
203+ FLB_OUTPUT_RETURN (FLB_ERROR );
204+ }
196205
197- /* add timestamp in the output format ? */
198- if (ctx -> add_timestamp ) {
199- add_ts = CMT_TRUE ;
200- }
201- else {
202- add_ts = CMT_FALSE ;
203- }
206+ /* append labels set by config */
207+ append_labels (ctx , cmt );
208+
209+ /* add timestamp in the output format ? */
210+ if (ctx -> add_timestamp ) {
211+ add_ts = CMT_TRUE ;
212+ }
213+ else {
214+ add_ts = CMT_FALSE ;
215+ }
204216
205- /* convert to text representation */
206- text = cmt_encode_prometheus_create (cmt , add_ts );
207- if (!text ) {
217+ /* convert to text representation */
218+ tmp = cmt_encode_prometheus_create (cmt , add_ts );
219+ if (!tmp ) {
220+ cmt_destroy (cmt );
221+ flb_sds_destroy (text );
222+ FLB_OUTPUT_RETURN (FLB_ERROR );
223+ }
224+ ret = flb_sds_cat_safe (& text , tmp , flb_sds_len (tmp ));
225+ if (ret != 0 ) {
226+ flb_plg_error (ctx -> ins , "could not concatenate text representant coming from: %s" ,
227+ flb_input_name (ins ));
228+ cmt_encode_prometheus_destroy (tmp );
229+ flb_sds_destroy (text );
230+ cmt_destroy (cmt );
231+ FLB_OUTPUT_RETURN (FLB_ERROR );
232+ }
233+ cmt_encode_prometheus_destroy (tmp );
208234 cmt_destroy (cmt );
209- FLB_OUTPUT_RETURN (FLB_ERROR );
210235 }
211- cmt_destroy (cmt );
212236
213237 if (cfl_sds_len (text ) == 0 ) {
214238 flb_plg_debug (ctx -> ins , "context without metrics (empty)" );
215- cmt_encode_text_destroy (text );
239+ flb_sds_destroy (text );
216240 FLB_OUTPUT_RETURN (FLB_OK );
217241 }
218242
@@ -221,11 +245,11 @@ static void cb_prom_flush(struct flb_event_chunk *event_chunk,
221245 if (ret == -1 ) {
222246 flb_plg_error (ctx -> ins , "could not store metrics coming from: %s" ,
223247 flb_input_name (ins ));
224- cmt_encode_prometheus_destroy (text );
248+ flb_sds_destroy (text );
225249 cmt_destroy (cmt );
226250 FLB_OUTPUT_RETURN (FLB_ERROR );
227251 }
228- cmt_encode_prometheus_destroy (text );
252+ flb_sds_destroy (text );
229253
230254 /* retrieve a full copy of all metrics */
231255 metrics = hash_format_metrics (ctx );
0 commit comments