/
conn_streaming_ingest.go
75 lines (63 loc) · 2.2 KB
/
conn_streaming_ingest.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package azkustodata
import (
"context"
"fmt"
"github.com/Azure/azure-kusto-go/azkustodata/errors"
"github.com/google/uuid"
"io"
"net/url"
"time"
)
type DataFormatForStreaming interface {
CamelCase() string
KnownOrDefault() DataFormatForStreaming
}
var (
streamingIngestDefaultTimeout = 10 * time.Minute
)
func (c *Conn) StreamIngest(ctx context.Context, db, table string, payload io.Reader, format DataFormatForStreaming, mappingName string, clientRequestId string, isBlobUri bool) error {
streamUrl, err := url.Parse(c.endStreamIngest.String())
if err != nil {
return errors.ES(errors.OpIngestStream, errors.KClientArgs, "could not parse the stream endpoint(%s): %s", c.endStreamIngest.String(), err).SetNoRetry()
}
path, err := url.JoinPath(streamUrl.Path, db, table)
if err != nil {
return errors.ES(errors.OpIngestStream, errors.KClientArgs, "could not join the stream endpoint(%s) with the db(%s) and table(%s): %s", c.endStreamIngest.String(), db, table, err).SetNoRetry()
}
streamUrl.Path = path
qv := url.Values{}
if mappingName != "" {
qv.Add("mappingName", mappingName)
}
qv.Add("streamFormat", format.KnownOrDefault().CamelCase())
if isBlobUri {
qv.Add("sourceKind", "uri")
}
streamUrl.RawQuery = qv.Encode()
var closeablePayload io.ReadCloser
var ok bool
if closeablePayload, ok = payload.(io.ReadCloser); !ok {
closeablePayload = io.NopCloser(payload)
}
if clientRequestId == "" {
clientRequestId = "KGC.executeStreaming;" + uuid.New().String()
}
properties := requestProperties{}
properties.ClientRequestID = clientRequestId
headers := c.getHeaders(properties)
headers.Del("Content-Type")
if !isBlobUri {
headers.Add("Content-Encoding", "gzip")
}
if _, ok := ctx.Deadline(); !ok {
ctx, _ = context.WithTimeout(ctx, streamingIngestDefaultTimeout)
}
_, body, err := c.doRequestImpl(ctx, errors.OpIngestStream, streamUrl, closeablePayload, headers, fmt.Sprintf("With db: %s, table: %s, mappingName: %s, clientRequestId: %s", db, table, mappingName, clientRequestId))
if body != nil {
body.Close()
}
if err != nil {
return errors.ES(errors.OpIngestStream, errors.KHTTPError, "streaming ingestion failed: endpoint(%s): %s", streamUrl.String(), err)
}
return nil
}