Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 7 additions & 30 deletions internal/destregistry/providers/destwebhook/httphelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package destwebhook

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -51,15 +50,8 @@ func ExecuteHTTPRequest(ctx context.Context, client *http.Client, req *http.Requ
// Extract body for error details
var bodyStr string
if delivery.Response != nil {
if body, ok := delivery.Response["body"]; ok {
switch v := body.(type) {
case string:
bodyStr = v
case map[string]interface{}:
if jsonBytes, err := json.Marshal(v); err == nil {
bodyStr = string(jsonBytes)
}
}
if body, ok := delivery.Response["body"].(string); ok {
bodyStr = body
}
}

Expand Down Expand Up @@ -131,27 +123,12 @@ func ClassifyNetworkError(err error) string {
}
}

// ParseHTTPResponse reads and parses the HTTP response body into the delivery.
// ParseHTTPResponse reads the HTTP response body into the delivery as a raw string.
// The body is stored verbatim regardless of content type to preserve data integrity.
func ParseHTTPResponse(delivery *destregistry.Delivery, resp *http.Response) {
bodyBytes, _ := io.ReadAll(resp.Body)

if strings.Contains(resp.Header.Get("Content-Type"), "application/json") {
var response map[string]interface{}
if err := json.Unmarshal(bodyBytes, &response); err != nil {
delivery.Response = map[string]interface{}{
"status": resp.StatusCode,
"body": string(bodyBytes),
}
return
}
delivery.Response = map[string]interface{}{
"status": resp.StatusCode,
"body": response,
}
} else {
delivery.Response = map[string]interface{}{
"status": resp.StatusCode,
"body": string(bodyBytes),
}
delivery.Response = map[string]interface{}{
"status": resp.StatusCode,
"body": string(bodyBytes),
}
}
49 changes: 32 additions & 17 deletions internal/logstore/pglogstore/pglogstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,12 +455,12 @@ func scanAttemptRecords(rows pgx.Rows) ([]attemptRecordWithPosition, error) {
attemptTime time.Time
attemptNumber int
manual bool
code string
responseData map[string]any
eventTime time.Time
eligibleForRetry bool
eventData string
eventMetadata map[string]string
code string
responseDataStr string
eventTime time.Time
eligibleForRetry bool
eventData string
eventMetadata map[string]string
)

if err := rows.Scan(
Expand All @@ -475,7 +475,7 @@ func scanAttemptRecords(rows pgx.Rows) ([]attemptRecordWithPosition, error) {
&attemptNumber,
&manual,
&code,
&responseData,
&responseDataStr,
&eventTime,
&eligibleForRetry,
&eventData,
Expand All @@ -484,6 +484,13 @@ func scanAttemptRecords(rows pgx.Rows) ([]attemptRecordWithPosition, error) {
return nil, fmt.Errorf("scan failed: %w", err)
}

var responseData map[string]any
if responseDataStr != "" {
if err := json.Unmarshal([]byte(responseDataStr), &responseData); err != nil {
return nil, fmt.Errorf("failed to unmarshal response_data: %w", err)
}
}

// Normalize to UTC for consistent behavior across backends.
attemptTime = attemptTime.UTC()
eventTime = eventTime.UTC()
Expand Down Expand Up @@ -635,12 +642,12 @@ func (s *logStore) RetrieveAttempt(ctx context.Context, req driver.RetrieveAttem
attemptTime time.Time
attemptNumber int
manual bool
code string
responseData map[string]any
eventTime time.Time
eligibleForRetry bool
eventData string
eventMetadata map[string]string
code string
responseDataStr string
eventTime time.Time
eligibleForRetry bool
eventData string
eventMetadata map[string]string
)

err := row.Scan(
Expand All @@ -655,7 +662,7 @@ func (s *logStore) RetrieveAttempt(ctx context.Context, req driver.RetrieveAttem
&attemptNumber,
&manual,
&code,
&responseData,
&responseDataStr,
&eventTime,
&eligibleForRetry,
&eventData,
Expand All @@ -668,6 +675,13 @@ func (s *logStore) RetrieveAttempt(ctx context.Context, req driver.RetrieveAttem
return nil, fmt.Errorf("scan failed: %w", err)
}

var responseData map[string]any
if responseDataStr != "" {
if err := json.Unmarshal([]byte(responseDataStr), &responseData); err != nil {
return nil, fmt.Errorf("failed to unmarshal response_data: %w", err)
}
}

// Normalize to UTC for consistent behavior across backends.
attemptTime = attemptTime.UTC()
eventTime = eventTime.UTC()
Expand Down Expand Up @@ -751,7 +765,7 @@ func (s *logStore) InsertMany(ctx context.Context, entries []*models.LogEntry) e
)
SELECT * FROM unnest(
$1::text[], $2::text[], $3::text[], $4::text[], $5::text[], $6::text[], $7::text[],
$8::timestamptz[], $9::integer[], $10::boolean[], $11::text[], $12::jsonb[],
$8::timestamptz[], $9::integer[], $10::boolean[], $11::text[], $12::text[],
$13::timestamptz[], $14::boolean[], $15::text[], $16::jsonb[]
)
ON CONFLICT (time, id) DO UPDATE SET
Expand Down Expand Up @@ -820,7 +834,7 @@ func attemptArrays(entries []*models.LogEntry) []any {
attemptNumbers := make([]int, n)
manuals := make([]bool, n)
codes := make([]string, n)
responseDatas := make([]map[string]any, n)
responseDatas := make([]string, n)
eventTimes := make([]time.Time, n)
eligibleForRetries := make([]bool, n)
eventDatas := make([]string, n)
Expand All @@ -841,7 +855,8 @@ func attemptArrays(entries []*models.LogEntry) []any {
attemptNumbers[i] = a.AttemptNumber
manuals[i] = a.Manual
codes[i] = a.Code
responseDatas[i] = a.ResponseData
responseDataJSON, _ := json.Marshal(a.ResponseData)
responseDatas[i] = string(responseDataJSON)
eventTimes[i] = e.Time
eligibleForRetries[i] = e.EligibleForRetry
eventDatas[i] = string(e.Data)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
BEGIN;

-- Down migration: revert attempts.response_data from TEXT back to JSONB,
-- undoing the TEXT migration that moved JSON serialization into application code.
-- NOTE(review): the ::jsonb cast will abort the transaction if any row holds
-- text that is not valid JSON (e.g. an empty string) — confirm all stored
-- values are marshaled JSON before running this rollback.
ALTER TABLE attempts ALTER COLUMN response_data TYPE jsonb USING response_data::jsonb;

COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
BEGIN;

-- =============================================================================
-- Migration: Change response_data column from JSONB to TEXT
--
-- Matches the approach taken in 000006 for events.data and attempts.event_data.
-- Storing response_data as TEXT avoids JSONB key reordering and keeps the
-- serialization boundary in application code rather than the database.
--
-- Columns changed:
--   attempts.response_data (JSONB -> TEXT)
-- =============================================================================

-- The ::text cast renders each existing JSONB value as its canonical JSON text,
-- so no data is lost; only the storage representation changes.
ALTER TABLE attempts ALTER COLUMN response_data TYPE text USING response_data::text;

COMMIT;
Loading