Skip to content

Commit

Permalink
Merge branch 'add_deleteContainer_operation'
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronpowell committed Jul 7, 2022
2 parents 7b9d5ae + 77675d3 commit 822f757
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 65 deletions.
17 changes: 15 additions & 2 deletions README.md
Expand Up @@ -100,9 +100,9 @@ let main argv =
} |> Async.RunSynchronously
```

### Delete
### DeleteItem

```fsharp
```f#
open FSharp.CosmosDb
let connStr = "..."
Expand All @@ -116,6 +116,19 @@ let updateUser id partitionKey =
|> Cosmos.execAsync
```

### DeleteContainer
```f#
open FSharp.CosmosDb
let connStr = "..."
connStr
|> Cosmos.container "ContainerName"
|> Cosmos.deleteContainer
|> Cosmos.execAsync
|> Async.Ignore
```

# FSharp.CosmosDb.Analyzer 💡

[![NuGet Badge - FSharp.CosmosDb](https://buildstats.info/nuget/FSharp.CosmosDb)](https://www.nuget.org/packages/FSharp.CosmosDb)
Expand Down
16 changes: 15 additions & 1 deletion samples/FSharp.CosmosDb.Samples/Program.fs
Expand Up @@ -50,7 +50,7 @@ let updateFamily conn id pk =

let deleteFamily conn id pk =
conn
|> Cosmos.delete<Family> id pk
|> Cosmos.deleteItem<Family> id pk
|> Cosmos.execAsync

[<EntryPoint>]
Expand Down Expand Up @@ -131,6 +131,20 @@ let main argv =
|> AsyncSeq.map (fun f -> { f with LastName = "Powellz" })
|> AsyncSeq.map (fun f -> conn |> Cosmos.replace f |> Cosmos.execAsync)
|> AsyncSeq.iter (fun f -> printfn "Replaced: %A" f)

do!
conn
|> Cosmos.container "Family"
|> Cosmos.deleteContainer
|> Cosmos.execAsync
|> Async.Ignore

do!
conn
|> Cosmos.container "Family"
|> Cosmos.deleteContainerIfExists
|> Cosmos.execAsync
|> Async.Ignore

return 0 // return an integer exit code
}
Expand Down
115 changes: 65 additions & 50 deletions src/FSharp.CosmosDb/Cosmos.fs
Expand Up @@ -47,89 +47,89 @@ module Cosmos =
Query = None
Parameters = [] }

let query<'T> query op : ContainerOperation<'T> =
Query
{ defaultQueryOp () with
Query = Some query
Connection = op }
let query<'T> query op : QueryOp<'T> =
{ defaultQueryOp () with
Query = Some query
Connection = op }

let parameters arr op =
match op with
| Query q ->
Query
{ q with
Parameters = q.Parameters @ arr }
| _ -> failwith "Only the Query discriminated union supports parameters"
{ op with QueryOp.Parameters = op.Parameters @ arr }

// --- DATABASE EXISTS --- //
let databaseExists<'T> op =
{ CheckIfDatabaseExistsOp.Connection = op }

// --- INSERT --- //

let insertMany<'T> (values: 'T list) op =
Insert { Connection = op; Values = values }
{ InsertOp.Connection = op; Values = values }

let insert<'T> (value: 'T) op =
Insert { Connection = op; Values = [ value ] }
{ InsertOp.Connection = op; Values = [ value ] }

// --- INSERT --- //

let upsertMany<'T> (values: 'T list) op =
Upsert { Connection = op; Values = values }
{ UpsertOp.Connection = op; Values = values }

let upsert<'T> (value: 'T) op =
Upsert { Connection = op; Values = [ value ] }
{ UpsertOp.Connection = op; Values = [ value ] }

// --- UPDATE --- //

let update<'T> id partitionKey (updater: 'T -> 'T) op =
Update
{ Connection = op
Id = id
PartitionKey = partitionKey
Updater = updater }

// --- DELETE --- //

let delete<'T> id partitionKey op =
Delete
{ Connection = op
Id = id
PartitionKey = partitionKey }
{ UpdateOp.Connection = op
Id = id
PartitionKey = partitionKey
Updater = updater }

// --- DELETE ITEM --- //

let deleteItem<'T> id partitionKey op =
{ DeleteItemOp.Connection = op
Id = id
PartitionKey = partitionKey }

// --- GET CONTAINER PROPERTIES --- //
let getContainerProperties op =
{ GetContainerPropertiesOp.Connection = op }

// --- CONTAINER EXISTS --- //
let containerExists op =
{ CheckIfContainerExistsOp.Connection = op }

// --- DELETE CONTAINER --- //

let deleteContainer<'T> op : DeleteContainerOp<'T> =
{ DeleteContainerOp.Connection = op }

// --- DELETE CONTAINER IF EXISTS --- //

let deleteContainerIfExists op : DeleteContainerIfExistsOp =
{ DeleteContainerIfExistsOp.Connection = op }

// --- READ --- //

let read id partitionKey op =
Read
{ Connection = op
Id = id
PartitionKey = partitionKey }
{ ReadOp.Connection = op
Id = id
PartitionKey = partitionKey }

// --- REPLACE --- //

let replace<'T> (item: 'T) op =
Replace { Connection = op; Item = item }
{ ReplaceOp.Connection = op; Item = item }

// --- Execute --- //

let private getClient (connInfo: ConnectionOperation) = connInfo.GetClient()

let dispose (connInfo: ConnectionOperation) = (connInfo :> IDisposable).Dispose()

let execAsync<'T> (op: ContainerOperation<'T>) =
match op with
| Query op -> OperationHandling.execQuery getClient op
| Insert op -> OperationHandling.execInsert getClient op
| Update op -> OperationHandling.execUpdate getClient op
| Delete op -> OperationHandling.execDelete getClient op
| Upsert op -> OperationHandling.execUpsert getClient op
| Read op -> OperationHandling.execRead getClient op
| Replace op -> OperationHandling.execReplace getClient op

let execBatchAsync<'T> batchSize (op: ContainerOperation<'T>) =
match op with
| Query op ->
let queryOps = QueryRequestOptions()
queryOps.MaxItemCount <- batchSize
OperationHandling.execQueryBatch getClient op queryOps
| _ -> failwith "Batch return operation only supported with query operations, use `execAsync` instead."
let execBatchAsync<'T> batchSize op =
let queryOps = QueryRequestOptions()
queryOps.MaxItemCount <- batchSize
OperationHandling.execQueryBatch getClient op queryOps

// --- Access Cosmos APIs directly --- //

Expand Down Expand Up @@ -233,3 +233,18 @@ module Cosmos =
processor.Build()
| None ->
failwith "Unable to connect the change feed. Ensure the container and lease container info is all set"

type Cosmos =
static member private getClient (connInfo: ConnectionOperation) = connInfo.GetClient()
static member execAsync (op: QueryOp<'T>) = OperationHandling.execQuery Cosmos.getClient op
static member execAsync op = OperationHandling.execCheckIfDatabaseExists Cosmos.getClient op
static member execAsync op = OperationHandling.execInsert Cosmos.getClient op
static member execAsync op = OperationHandling.execUpdate Cosmos.getClient op
static member execAsync op = OperationHandling.execDeleteItem Cosmos.getClient op
static member execAsync op = OperationHandling.execGetContainerProperties Cosmos.getClient op
static member execAsync op = OperationHandling.execCheckIfContainerExists Cosmos.getClient op
static member execAsync op = OperationHandling.execDeleteContainer Cosmos.getClient op
static member execAsync op = OperationHandling.execDeleteContainerIfExists Cosmos.getClient op
static member execAsync op = OperationHandling.execUpsert Cosmos.getClient op
static member execAsync op = OperationHandling.execRead Cosmos.getClient op
static member execAsync op = OperationHandling.execReplace Cosmos.getClient op
95 changes: 94 additions & 1 deletion src/FSharp.CosmosDb/OperationHandling.fs
Expand Up @@ -66,6 +66,29 @@ let execQueryBatch (getClient: ConnectionOperation -> CosmosClient) (op: QueryOp
| Some result -> result |> AsyncSeq.ofAsyncFeedIterator
| None ->
failwith "Unable to construct a query as some values are missing across the database, container name and query"

let execCheckIfDatabaseExists (getClient: ConnectionOperation -> CosmosClient) (op: CheckIfDatabaseExistsOp) =
let connInfo = op.Connection
let client = getClient connInfo

use iterator = client.GetDatabaseQueryIterator<DatabaseProperties>()

match connInfo.DatabaseId with
| Some databaseId ->
iterator
|> AsyncSeq.unfold (fun t ->
if iterator.HasMoreResults then
Some (iterator.ReadNextAsync(), iterator)
else
None)
|> AsyncSeq.collect (fun i ->
asyncSeq {
let! c = i |> Async.AwaitTask
for x in c do
yield x
})
|> AsyncSeq.exists (fun i -> i.Id = databaseId)
| None -> failwith "failed to check if database exists"

let execInsert (getClient: ConnectionOperation -> CosmosClient) (op: InsertOp<'T>) =
let connInfo = op.Connection
Expand Down Expand Up @@ -189,7 +212,7 @@ let execUpdate (getClient: ConnectionOperation -> CosmosClient) (op: UpdateOp<'T

| None -> failwith "Unable to read from the container to get the item for updating"

let execDelete (getClient: ConnectionOperation -> CosmosClient) (op: DeleteOp<'T>) =
let execDeleteItem (getClient: ConnectionOperation -> CosmosClient) (op: DeleteItemOp<'T>) =
let connInfo = op.Connection
let client = getClient connInfo

Expand All @@ -216,7 +239,77 @@ let execDelete (getClient: ConnectionOperation -> CosmosClient) (op: DeleteOp<'T
|> AsyncSeq.ofSeqAsync

| None -> failwith "Unable to read from the container to get the item for updating"

let execGetContainerProperties (getClient: ConnectionOperation -> CosmosClient) (op: GetContainerPropertiesOp) =
let connInfo = op.Connection
let client = getClient connInfo

let containerName =
match connInfo.ContainerName with
| None -> failwith "ContainerName is not provided"
| Some containerName -> containerName

use iterator =
match connInfo.DatabaseId with
| None -> failwith "DatabaseId is not provided"
| Some databaseId ->
client
.GetDatabase(databaseId)
.GetContainerQueryIterator<ContainerProperties>()

iterator
|> AsyncSeq.unfold (fun t ->
if iterator.HasMoreResults then
Some (iterator.ReadNextAsync(), iterator)
else
None)
|> AsyncSeq.collect (fun i ->
asyncSeq {
let! c = i |> Async.AwaitTask
for x in c do
yield x
})
|> AsyncSeq.tryFind (fun i -> i.Id = containerName)

let execCheckIfContainerExists (getClient: ConnectionOperation -> CosmosClient) (op: CheckIfContainerExistsOp) =
async {
let! containerProperties = execGetContainerProperties getClient { Connection= op.Connection }

return containerProperties
|> Option.isSome
}

let execDeleteContainer (getClient: ConnectionOperation -> CosmosClient) (op: DeleteContainerOp<'T>) =
let connInfo = op.Connection
let client = getClient connInfo

let result =
maybe {
let! databaseId = connInfo.DatabaseId
let! containerName = connInfo.ContainerName

let db = client.GetDatabase databaseId

let container = db.GetContainer containerName

return
container.DeleteContainerAsync ()
|> Async.AwaitTask
}

match result with
| Some result -> result
| None -> failwith "Unable to delete container"

let execDeleteContainerIfExists (getClient: ConnectionOperation -> CosmosClient) (op: DeleteContainerIfExistsOp) =
async {
let! databaseExists = execCheckIfDatabaseExists getClient { Connection= op.Connection }
let! containerExists = execCheckIfContainerExists getClient { Connection= op.Connection }
if databaseExists && containerExists then
do! execDeleteContainer getClient { Connection= op.Connection }
|> Async.Ignore
}

let execRead (getClient: ConnectionOperation -> CosmosClient) (op: ReadOp<'T>) =
let connInfo = op.Connection
let client = getClient connInfo
Expand Down
29 changes: 18 additions & 11 deletions src/FSharp.CosmosDb/Types.fs
@@ -1,5 +1,6 @@
namespace FSharp.CosmosDb

open FSharp.CosmosDb
open Microsoft.Azure.Cosmos
open System.Threading
open System.Threading.Tasks
Expand Down Expand Up @@ -86,6 +87,9 @@ type QueryOp<'T> =
Query: string option
Parameters: (string * obj) list }

type CheckIfDatabaseExistsOp =
{ Connection: ConnectionOperation }

type InsertOp<'T> =
{ Connection: ConnectionOperation
Values: 'T list }
Expand All @@ -100,11 +104,23 @@ type UpdateOp<'T> =
PartitionKey: string
Updater: 'T -> 'T }

type DeleteOp<'T> =
type DeleteItemOp<'T> =
{ Connection: ConnectionOperation
Id: string
PartitionKey: string }


type GetContainerPropertiesOp =
{ Connection: ConnectionOperation }

type CheckIfContainerExistsOp =
{ Connection: ConnectionOperation }

type DeleteContainerOp<'T> =
{ Connection: ConnectionOperation }

type DeleteContainerIfExistsOp =
{ Connection: ConnectionOperation }

type ReadOp<'T> =
{ Connection: ConnectionOperation
Id: string
Expand All @@ -114,15 +130,6 @@ type ReplaceOp<'T> =
{ Connection: ConnectionOperation
Item: 'T }

type ContainerOperation<'T> =
| Query of QueryOp<'T>
| Insert of InsertOp<'T>
| Update of UpdateOp<'T>
| Delete of DeleteOp<'T>
| Upsert of UpsertOp<'T>
| Read of ReadOp<'T>
| Replace of ReplaceOp<'T>

type ChangeFeedOptions<'T> =
{ Connection: ConnectionOperation
Processor: string
Expand Down

0 comments on commit 822f757

Please sign in to comment.