From 3f73f7434a415385d696adcb944d9216054bc43f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Kmie=C4=87?= Date: Thu, 22 May 2025 11:19:35 +0000 Subject: [PATCH 1/3] Added ImportDocuments --- v2/arangodb/collection_documents.go | 2 + v2/arangodb/collection_documents_impl.go | 3 + v2/arangodb/collection_documents_import.go | 204 ++++++++++++++++++ .../collection_documents_import_impl.go | 88 ++++++++ v2/arangodb/shared.go | 2 + v2/connection/connection_http_internal.go | 2 +- .../database_collection_doc_import_test.go | 113 ++++++++++ 7 files changed, 413 insertions(+), 1 deletion(-) create mode 100644 v2/arangodb/collection_documents_import.go create mode 100644 v2/arangodb/collection_documents_import_impl.go create mode 100644 v2/tests/database_collection_doc_import_test.go diff --git a/v2/arangodb/collection_documents.go b/v2/arangodb/collection_documents.go index 7fa06eed..c627374b 100644 --- a/v2/arangodb/collection_documents.go +++ b/v2/arangodb/collection_documents.go @@ -33,4 +33,6 @@ type CollectionDocuments interface { CollectionDocumentUpdate CollectionDocumentReplace CollectionDocumentDelete + + CollectionDocumentImport } diff --git a/v2/arangodb/collection_documents_impl.go b/v2/arangodb/collection_documents_impl.go index 32830e33..4fd3c24d 100644 --- a/v2/arangodb/collection_documents_impl.go +++ b/v2/arangodb/collection_documents_impl.go @@ -37,6 +37,7 @@ func newCollectionDocuments(collection *collection) *collectionDocuments { d.collectionDocumentCreate = newCollectionDocumentCreate(d.collection) d.collectionDocumentDelete = newCollectionDocumentDelete(d.collection) + d.collectionDocumentImport = newCollectionDocumentImport(d.collection) return d } @@ -52,6 +53,8 @@ type collectionDocuments struct { *collectionDocumentRead *collectionDocumentCreate *collectionDocumentDelete + + *collectionDocumentImport } func (c collectionDocuments) DocumentExists(ctx context.Context, key string) (bool, error) { diff --git 
a/v2/arangodb/collection_documents_import.go b/v2/arangodb/collection_documents_import.go new file mode 100644 index 00000000..aa6a97c7 --- /dev/null +++ b/v2/arangodb/collection_documents_import.go @@ -0,0 +1,204 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package arangodb + +import ( + "context" + + "github.com/arangodb/go-driver/v2/connection" +) + +const ( + QueryFromPrefix = "fromPrefix" + QueryToPrefix = "toPrefix" + QueryComplete = "complete" + QueryOnDuplicate = "onDuplicate" +) + +// CollectionDocumentImport imports documents into the collection in bulk +// https://docs.arangodb.com/stable/develop/http-api/documents/#remove-a-document +type CollectionDocumentImport interface { + + // ImportDocuments imports one or more documents into the collection. // TODO FIX + // The document data is loaded from the given documents argument, statistics are returned. + // The documents argument can be one of the following: + // - An array of structs: All structs will be imported as individual documents. + // - An array of maps: All maps will be imported as individual documents. + // To wait until all documents have been synced to disk, set `WithWaitForSync` in CollectionDocumentImportOptions (see ImportDocumentsWithOptions). + // To return details about documents that could not be imported, prepare a context with `WithImportDetails`.
+ ImportDocuments(ctx context.Context, documents string, documentsType CollectionDocumentImportDocumentType) (CollectionDocumentImportResponse, error) + ImportDocumentsWithOptions(ctx context.Context, documents string, documentsType CollectionDocumentImportDocumentType, options *CollectionDocumentImportOptions) (CollectionDocumentImportResponse, error) +} + +type CollectionDocumentImportResponse struct { + CollectionDocumentImportStatistics `json:",inline"` +} + +// CollectionDocumentImportRequest holds query parameters for /import. +type CollectionDocumentImportRequest struct { + CollectionDocumentImportOptions `json:",inline"` + Collection *string `json:"collection,inline"` + Type *CollectionDocumentImportDocumentType `json:"type,inline"` +} + +// CollectionDocumentImportOptions holds optional settings that control the document import process. +type CollectionDocumentImportOptions struct { + // FromPrefix is an optional prefix for the values in _from attributes. If specified, the value is automatically + // prepended to each _from input value. This allows specifying just the keys for _from. + FromPrefix *string `json:"fromPrefix,omitempty"` + // ToPrefix is an optional prefix for the values in _to attributes. If specified, the value is automatically + // prepended to each _to input value. This allows specifying just the keys for _to. + ToPrefix *string `json:"toPrefix,omitempty"` + // Overwrite is a flag that if set, then all data in the collection will be removed prior to the import. + // Note that any existing index definitions will be preserved. + Overwrite *bool `json:"overwrite,omitempty"` + // OnDuplicate controls what action is carried out in case of a unique key constraint violation.
+ // Possible values are: + // - ImportOnDuplicateError + // - ImportOnDuplicateUpdate + // - ImportOnDuplicateReplace + // - ImportOnDuplicateIgnore + OnDuplicate *CollectionDocumentImportOnDuplicate `json:"onDuplicate,omitempty"` + // Complete is a flag that if set, will make the whole import fail if any error occurs. + // Otherwise the import will continue even if some documents cannot be imported. + Complete *bool `json:"complete,omitempty"` + + // WithWaitForSync, if set, waits until the import operation has been synced to disk. + WithWaitForSync *bool +} + +type CollectionDocumentImportDocumentType string + +const ( + // ImportDocumentTypeDocuments + // Each line is expected to be one JSON object. + // example : + // {"_key":"john","name":"John Smith","age":35} + // {"_key":"katie","name":"Katie Foster","age":28} + ImportDocumentTypeDocuments CollectionDocumentImportDocumentType = CollectionDocumentImportDocumentType("documents") + + // ImportDocumentTypeArray + // The request body is expected to be a JSON array of objects. + // example : + // [ + // {"_key":"john","name":"John Smith","age":35}, + // {"_key":"katie","name":"Katie Foster","age":28} + // ] + ImportDocumentTypeArray CollectionDocumentImportDocumentType = CollectionDocumentImportDocumentType("array") + + // ImportDocumentTypeAuto + // Automatically determines the type: either documents (ImportDocumentTypeDocuments) or array (ImportDocumentTypeArray). + ImportDocumentTypeAuto CollectionDocumentImportDocumentType = CollectionDocumentImportDocumentType("auto") + + // ImportDocumentTypeTabular + // The first line is an array of strings that defines the attribute keys. The subsequent lines are arrays with the attribute values. + // The keys and values are matched by the order of the array elements.
+ // example: + // ["_key","name","age"] + // ["john","John Smith",35] + // ["katie","Katie Foster",28] + ImportDocumentTypeTabular CollectionDocumentImportDocumentType = CollectionDocumentImportDocumentType("") +) + +type CollectionDocumentImportOnDuplicate string + +const ( + // ImportOnDuplicateError will not import the current document because of the unique key constraint violation. + // This is the default setting. + ImportOnDuplicateError CollectionDocumentImportOnDuplicate = CollectionDocumentImportOnDuplicate("error") + // ImportOnDuplicateUpdate will update an existing document in the database with the data specified in the request. + // Attributes of the existing document that are not present in the request will be preserved. + ImportOnDuplicateUpdate CollectionDocumentImportOnDuplicate = CollectionDocumentImportOnDuplicate("update") + // ImportOnDuplicateReplace will replace an existing document in the database with the data specified in the request. + ImportOnDuplicateReplace CollectionDocumentImportOnDuplicate = CollectionDocumentImportOnDuplicate("replace") + // ImportOnDuplicateIgnore will not update an existing document and simply ignore the error caused by a unique key constraint violation. + ImportOnDuplicateIgnore CollectionDocumentImportOnDuplicate = CollectionDocumentImportOnDuplicate("ignore") +) + +// CollectionDocumentImportStatistics holds statistics of an import action. +type CollectionDocumentImportStatistics struct { + // Created holds the number of documents imported. + Created int64 `json:"created,omitempty"` + // Errors holds the number of documents that were not imported due to an error. + Errors int64 `json:"errors,omitempty"` + // Empty holds the number of empty lines found in the input (will only contain a value greater than zero for types documents or auto). + Empty int64 `json:"empty,omitempty"` + // Updated holds the number of updated/replaced documents (in case onDuplicate was set to either update or replace).
+ Updated int64 `json:"updated,omitempty"` + // Ignored holds the number of failed but ignored insert operations (in case onDuplicate was set to ignore). + Ignored int64 `json:"ignored,omitempty"` + // if query parameter details is set to true, the result will contain a details attribute which is an array + // with more detailed information about which documents could not be inserted. + Details []string +} + +func (c *CollectionDocumentImportOptions) modifyRequest(r connection.Request) error { + if c == nil { + return nil + } + + if c.FromPrefix != nil { + r.AddQuery(QueryFromPrefix, *c.FromPrefix) + } + + if c.ToPrefix != nil { + r.AddQuery(QueryToPrefix, *c.ToPrefix) + } + + if c.Overwrite != nil { + r.AddQuery(QueryOverwrite, boolToString(*c.Overwrite)) + } + + if c.OnDuplicate != nil { + r.AddQuery(QueryOnDuplicate, string(*c.OnDuplicate)) + } + + if c.Complete != nil { + r.AddQuery(QueryComplete, boolToString(*c.Complete)) + } + + if c.WithWaitForSync != nil { + r.AddQuery(QueryWaitForSync, boolToString(*c.WithWaitForSync)) + } + + return nil +} + +func (c *CollectionDocumentImportRequest) modifyRequest(r connection.Request) error { + if c == nil { + return nil + } + + if c.Collection != nil { + r.AddQuery(QueryCollection, *c.Collection) + } + + if c.Type != nil && string(*c.Type) != "" { + r.AddQuery(QueryType, string(*c.Type)) + } + + r.AddHeader(connection.ContentType, "text/plain") + r.AddHeader("Accept", "text/plain") + + c.CollectionDocumentImportOptions.modifyRequest(r) + + return nil +} diff --git a/v2/arangodb/collection_documents_import_impl.go b/v2/arangodb/collection_documents_import_impl.go new file mode 100644 index 00000000..428637fb --- /dev/null +++ b/v2/arangodb/collection_documents_import_impl.go @@ -0,0 +1,88 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package arangodb + +import ( + "context" + "fmt" + "net/http" + "reflect" + + "github.com/arangodb/go-driver/v2/arangodb/shared" + "github.com/arangodb/go-driver/v2/connection" + "github.com/pkg/errors" +) + +func newCollectionDocumentImport(collection *collection) *collectionDocumentImport { + return &collectionDocumentImport{ + collection: collection, + } +} + +var _ CollectionDocumentImport = &collectionDocumentImport{} + +type collectionDocumentImport struct { + collection *collection +} + +func (c collectionDocumentImport) ImportDocuments(ctx context.Context, documents string, documentsType CollectionDocumentImportDocumentType) (CollectionDocumentImportResponse, error) { + return c.ImportDocumentsWithOptions(ctx, documents, documentsType, nil) +} + +func (c collectionDocumentImport) ImportDocumentsWithOptions(ctx context.Context, documents string, documentsType CollectionDocumentImportDocumentType, opts *CollectionDocumentImportOptions) (CollectionDocumentImportResponse, error) { + documentsVal := reflect.ValueOf(documents) + switch documentsVal.Kind() { + case reflect.String: + // OK + default: + return CollectionDocumentImportResponse{}, errors.WithStack(shared.InvalidArgumentError{Message: fmt.Sprintf("documents data The body must either be a JSON-encoded array of objects or a string with multiple JSON objects separated by newlines got %s", documentsVal.Kind())}) + } + + url := c.collection.db.url("_api/import") + // print(url) + + var response struct { + 
shared.ResponseStruct `json:",inline"` + CollectionDocumentImportResponse `json:",inline"` + } + + request := &CollectionDocumentImportRequest{ + Collection: &c.collection.name, + Type: &documentsType, + } + if opts != nil { + request.CollectionDocumentImportOptions = *opts + } + resp, err := connection.CallPost( + ctx, c.collection.connection(), url, &response, + []byte(documents), c.collection.withModifiers(request.modifyRequest)...) + + if err != nil { + return CollectionDocumentImportResponse{}, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusCreated: + return response.CollectionDocumentImportResponse, nil + default: + return CollectionDocumentImportResponse{}, response.AsArangoErrorWithCode(code) + } +} diff --git a/v2/arangodb/shared.go b/v2/arangodb/shared.go index 7ccedf85..ada024c6 100644 --- a/v2/arangodb/shared.go +++ b/v2/arangodb/shared.go @@ -40,6 +40,8 @@ const ( QueryOverwriteMode = "overwriteMode" QueryVersionAttribute = "versionAttribute" QueryIsRestore = "isRestore" + QueryCollection = "collection" + QueryType = "type" ) // PrimarySortCompression Defines how to compress the primary sort data (introduced in v3.7.1) diff --git a/v2/connection/connection_http_internal.go b/v2/connection/connection_http_internal.go index 1c650ca7..4abd058a 100644 --- a/v2/connection/connection_http_internal.go +++ b/v2/connection/connection_http_internal.go @@ -250,7 +250,7 @@ func (j *httpConnection) stream(ctx context.Context, req *httpRequest) (*httpRes ctx = context.Background() } - reader := j.bodyReadFunc(j.Decoder(j.contentType), req, j.streamSender) + reader := j.bodyReadFunc(j.Decoder(req.headers["content-type"]), req, j.streamSender) r, err := req.asRequest(ctx, reader) if err != nil { return nil, nil, errors.WithStack(err) diff --git a/v2/tests/database_collection_doc_import_test.go b/v2/tests/database_collection_doc_import_test.go new file mode 100644 index 00000000..e5815302 --- /dev/null +++ 
b/v2/tests/database_collection_doc_import_test.go @@ -0,0 +1,113 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package tests + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/arangodb/go-driver/v2/arangodb" +) + +type DocWithNameCode struct { + Key string `json:"_key,omitempty"` + Name string `json:"name"` + Age int `json:"age"` +} + +func Test_DatabaseCollectionDocImport(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + WithDatabase(t, client, nil, func(db arangodb.Database) { + WithCollectionV2(t, db, nil, func(col arangodb.Collection) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + doc := `{"_key":"john","name":"John Smith","age":35} +{"_key":"katie","name":"Katie Foster","age":28} +` + + resp, err := col.ImportDocuments(ctx, doc, arangodb.ImportDocumentTypeDocuments) + require.NoError(t, err) + _ = resp + + var obj DocWithNameCode + meta, err := col.ReadDocument(ctx, "john", &obj) + require.NoError(t, err) + require.Equal(t, "john", meta.Key) + require.Equal(t, 35, obj.Age) + + meta, err = col.ReadDocument(ctx, "katie", &obj) + require.NoError(t, err) + require.Equal(t, "katie", meta.Key) + require.Equal(t, 28, obj.Age) + }) + }) + + WithCollectionV2(t, db, nil, func(col arangodb.Collection) { + 
withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + // type = array / auto + doc := `[ + {"_key":"john","name":"John Smith","age":35}, + {"_key":"katie","name":"Katie Foster","age":28} +] +` + _, err := col.ImportDocuments(ctx, doc, arangodb.ImportDocumentTypeArray) + require.NoError(t, err) + + var obj DocWithNameCode + meta, err := col.ReadDocument(ctx, "john", &obj) + require.NoError(t, err) + require.Equal(t, "john", meta.Key) + require.Equal(t, 35, obj.Age) + + meta, err = col.ReadDocument(ctx, "katie", &obj) + require.NoError(t, err) + require.Equal(t, "katie", meta.Key) + require.Equal(t, 28, obj.Age) + }) + }) + + WithCollectionV2(t, db, nil, func(col arangodb.Collection) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + // type = + doc := `["_key","name","age"] +["john","John Smith",35] +["katie","Katie Foster",28] +` + _, err := col.ImportDocuments(ctx, doc, arangodb.ImportDocumentTypeTabular) + require.NoError(t, err) + + var obj DocWithNameCode + meta, err := col.ReadDocument(ctx, "john", &obj) + require.NoError(t, err) + require.Equal(t, "john", meta.Key) + require.Equal(t, 35, obj.Age) + + meta, err = col.ReadDocument(ctx, "katie", &obj) + require.NoError(t, err) + require.Equal(t, "katie", meta.Key) + require.Equal(t, 28, obj.Age) + }) + }) + }) + + }) +} From dbbd6995b18bc1ebe4f45447d46fac4be90b4499 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Kmie=C4=87?= Date: Thu, 22 May 2025 11:40:18 +0000 Subject: [PATCH 2/3] Remove TODO --- v2/arangodb/collection_documents_import.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v2/arangodb/collection_documents_import.go b/v2/arangodb/collection_documents_import.go index aa6a97c7..38cbd73e 100644 --- a/v2/arangodb/collection_documents_import.go +++ b/v2/arangodb/collection_documents_import.go @@ -37,7 +37,7 @@ const ( // https://docs.arangodb.com/stable/develop/http-api/documents/#remove-a-document type 
CollectionDocumentImport interface { - // ImportDocuments imports one or more documents into the collection. // TODO FIX + // ImportDocuments imports one or more documents into the collection. // The document data is loaded from the given documents argument, statistics are returned. // The documents argument can be one of the following: // - An array of structs: All structs will be imported as individual documents. From 7b42893a1502bb17dd560b17a1c10ff4ebaabb26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Kmie=C4=87?= Date: Thu, 22 May 2025 11:50:01 +0000 Subject: [PATCH 3/3] Cleaned up ContentTypeSelection --- v2/connection/connection_http_internal.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/v2/connection/connection_http_internal.go b/v2/connection/connection_http_internal.go index 4abd058a..c08340fc 100644 --- a/v2/connection/connection_http_internal.go +++ b/v2/connection/connection_http_internal.go @@ -250,7 +250,12 @@ func (j *httpConnection) stream(ctx context.Context, req *httpRequest) (*httpRes ctx = context.Background() } - reader := j.bodyReadFunc(j.Decoder(req.headers["content-type"]), req, j.streamSender) + contentType, ok := req.GetHeader(ContentType) + if !ok { + return nil, nil, errors.WithStack(NewError(1, "ContentType is not set.")) + } + + reader := j.bodyReadFunc(j.Decoder(contentType), req, j.streamSender) r, err := req.asRequest(ctx, reader) if err != nil { return nil, nil, errors.WithStack(err)