Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/Infrastructure/BotSharp.Abstraction/Graph/IGraphDb.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using BotSharp.Abstraction.Graph.Models;
using BotSharp.Abstraction.Graph.Options;
using BotSharp.Abstraction.Graph.Requests;

namespace BotSharp.Abstraction.Graph;

Expand All @@ -9,4 +10,32 @@ public interface IGraphDb

Task<GraphQueryResult> ExecuteQueryAsync(string query, GraphQueryExecuteOptions? options = null)
=> throw new NotImplementedException();

#region Node
Task<GraphNodeModel> GetNodeAsync(string graphId, string nodeId)
=> throw new NotImplementedException();

Task<GraphNodeModel> CreateNodeAsync(string graphId, GraphNodeCreationRequest request)
=> throw new NotImplementedException();

Task<GraphNodeModel> MergeNodeAsync(string graphId, string nodeId, GraphNodeUpdateRequest request)
=> throw new NotImplementedException();

Task<bool> DeleteNodeAsync(string graphId, string nodeId)
=> throw new NotImplementedException();
#endregion

#region Edge
Task<GraphEdgeModel> GetEdgeAsync(string graphId, string edgeId)
=> throw new NotImplementedException();

Task<GraphEdgeModel> CreateEdgeAsync(string graphId, GraphEdgeCreationRequest request)
=> throw new NotImplementedException();

Task<GraphEdgeModel> UpdateEdgeAsync(string graphId, string edgeId, GraphEdgeUpdateRequest request)
=> throw new NotImplementedException();

Task<bool> DeleteEdgeAsync(string graphId, string edgeId)
=> throw new NotImplementedException();
#endregion
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace BotSharp.Abstraction.Graph.Models;

public class GraphEdgeModel
{
public string Id { get; set; } = string.Empty;
public string SourceNodeId { get; set; } = string.Empty;
public string TargetNodeId { get; set; } = string.Empty;
public string Type { get; set; } = string.Empty;
public object? Properties { get; set; }
public string? Direction { get; set; }
public bool? Directed { get; set; }
public float? Weight { get; set; }
public DateTime? CreatedAt { get; set; }
public DateTime? UpdatedAt { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace BotSharp.Abstraction.Graph.Models;

public class GraphNodeModel
{
public string Id { get; set; } = string.Empty;
public List<string> Labels { get; set; } = new();
public object Properties { get; set; } = new();
public DateTime? Time { get; set; } = DateTime.UtcNow;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace BotSharp.Abstraction.Graph.Requests;

public class GraphEdgeCreationRequest
{
public string? Id { get; set; }
public string SourceNodeId { get; set; } = null!;
public string TargetNodeId { get; set; } = null!;
public string Type { get; set; } = null!;
public bool Directed { get; set; } = true;
public float? Weight { get; set; } = 1.0f;
public Dictionary<string, object>? Properties { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace BotSharp.Abstraction.Graph.Requests;

public class GraphEdgeUpdateRequest
{
public string Id { get; set; } = null!;
public Dictionary<string, object>? Properties { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace BotSharp.Abstraction.Graph.Requests;

public class GraphNodeCreationRequest
{
public string? Id { get; set; }
public string[]? Labels { get; set; }
public Dictionary<string, object>? Properties { get; set; }
public DateTime? Time { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace BotSharp.Abstraction.Graph.Requests;

public class GraphNodeUpdateRequest
{
public string Id { get; set; } = null!;
public string[]? Labels { get; set; }
public Dictionary<string, object>? Properties { get; set; }
public DateTime? Time { get; set; }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
using BotSharp.Abstraction.Graph;
using BotSharp.Plugin.Membase.Interfaces;
using Microsoft.AspNetCore.Http;

namespace BotSharp.Plugin.Membase.Controllers;
Expand All @@ -8,6 +6,7 @@ namespace BotSharp.Plugin.Membase.Controllers;
[ApiController]
public class MembaseController : ControllerBase
{
private const string GraphDbProvider = "membase";
private readonly IServiceProvider _services;
private readonly IMembaseApi _membaseApi;

Expand Down Expand Up @@ -78,7 +77,7 @@ public async Task<IActionResult> ExecuteGraphQuery(string graphId, [FromBody] Cy

try
{
var graph = _services.GetServices<IGraphDb>().First(x => x.Provider == "membase");
var graph = _services.GetServices<IGraphDb>().First(x => x.Provider == GraphDbProvider);
var result = await graph.ExecuteQueryAsync(query: request.Query, options: new()
{
GraphId = graphId,
Expand Down Expand Up @@ -125,7 +124,8 @@ public async Task<IActionResult> GetNode(string graphId, string nodeId)

try
{
var node = await _membaseApi.GetNodeAsync(graphId, nodeId);
var graph = _services.GetServices<IGraphDb>().First(x => x.Provider == GraphDbProvider);
var node = await graph.GetNodeAsync(graphId, nodeId);
return Ok(node);
}
catch (Exception ex)
Expand Down Expand Up @@ -163,7 +163,14 @@ public async Task<IActionResult> CreateNode(string graphId, [FromBody] NodeCreat

try
{
var node = await _membaseApi.CreateNodeAsync(graphId, request);
var graph = _services.GetServices<IGraphDb>().First(x => x.Provider == GraphDbProvider);
var node = await graph.CreateNodeAsync(graphId, new GraphNodeCreationRequest
{
Id = request.Id,
Labels = request.Labels,
Properties = request.Properties,
Time = request.Time
});
return Ok(node);
}
catch (Exception ex)
Expand Down Expand Up @@ -201,7 +208,14 @@ public async Task<IActionResult> MergeNode(string graphId, [FromBody] NodeUpdate

try
{
var node = await _membaseApi.MergeNodeAsync(graphId, request.Id, request);
var graph = _services.GetServices<IGraphDb>().First(x => x.Provider == GraphDbProvider);
var node = await graph.MergeNodeAsync(graphId, request.Id, new GraphNodeUpdateRequest
{
Id = request.Id,
Labels = request.Labels,
Properties = request.Properties,
Time = request.Time
});
return Ok(node);
}
catch (Exception ex)
Expand Down Expand Up @@ -239,7 +253,8 @@ public async Task<IActionResult> DeleteNode(string graphId, string nodeId)

try
{
await _membaseApi.DeleteNodeAsync(graphId, nodeId);
var graph = _services.GetServices<IGraphDb>().First(x => x.Provider == GraphDbProvider);
await graph.DeleteNodeAsync(graphId, nodeId);
return Ok("done");
}
catch (Exception ex)
Expand Down Expand Up @@ -277,7 +292,8 @@ public async Task<IActionResult> GetEdge(string graphId, string edgeId)

try
{
var edge = await _membaseApi.GetEdgeAsync(graphId, edgeId);
var graph = _services.GetServices<IGraphDb>().First(x => x.Provider == GraphDbProvider);
var edge = await graph.GetEdgeAsync(graphId, edgeId);
return Ok(edge);
}
catch (Exception ex)
Expand Down Expand Up @@ -330,7 +346,17 @@ public async Task<IActionResult> CreateEdge(string graphId, [FromBody] EdgeCreat

try
{
var edge = await _membaseApi.CreateEdgeAsync(graphId, request);
var graph = _services.GetServices<IGraphDb>().First(x => x.Provider == GraphDbProvider);
var edge = await graph.CreateEdgeAsync(graphId, new GraphEdgeCreationRequest
{
Id = request.Id,
SourceNodeId = request.SourceNodeId,
TargetNodeId = request.TargetNodeId,
Type = request.Type,
Directed = request.Directed,
Weight = request.Weight,
Properties = request.Properties
});
return Ok(edge);
}
catch (Exception ex)
Expand Down Expand Up @@ -368,7 +394,12 @@ public async Task<IActionResult> UpdateEdge(string graphId, [FromBody] EdgeUpdat

try
{
var edge = await _membaseApi.UpdateEdgeAsync(graphId, request.Id, request);
var graph = _services.GetServices<IGraphDb>().First(x => x.Provider == GraphDbProvider);
var edge = await graph.UpdateEdgeAsync(graphId, request.Id, new GraphEdgeUpdateRequest
{
Id = request.Id,
Properties = request.Properties
});
return Ok(edge);
}
catch (Exception ex)
Expand Down Expand Up @@ -406,7 +437,8 @@ public async Task<IActionResult> DeleteEdge(string graphId, string edgeId)

try
{
await _membaseApi.DeleteEdgeAsync(graphId, edgeId);
var graph = _services.GetServices<IGraphDb>().First(x => x.Provider == GraphDbProvider);
await graph.DeleteEdgeAsync(graphId, edgeId);
return Ok("done");
}
catch (Exception ex)
Expand Down
92 changes: 86 additions & 6 deletions src/Plugins/BotSharp.Plugin.Membase/GraphDb/MembaseGraphDb.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Net;
using BotSharp.Plugin.Membase.Models.Graph;
using Polly;
using Polly.Timeout;
using Refit;
Expand All @@ -11,6 +12,8 @@ public partial class MembaseGraphDb : IGraphDb
private readonly ILogger<MembaseGraphDb> _logger;
private readonly IMembaseApi _membaseApi;

private const int RETRY_COUNT = 3;

public MembaseGraphDb(
IServiceProvider services,
ILogger<MembaseGraphDb> logger,
Expand All @@ -23,8 +26,6 @@ public MembaseGraphDb(

public string Provider => "membase";

private const int RetryCount = 3;

public async Task<GraphQueryResult> ExecuteQueryAsync(string query, GraphQueryExecuteOptions? options = null)
{
if (string.IsNullOrEmpty(options?.GraphId))
Expand Down Expand Up @@ -54,7 +55,7 @@ public async Task<GraphQueryResult> ExecuteQueryAsync(string query, GraphQueryEx
}
catch (ApiException ex)
{
_logger.LogError($"Error when executing query in {Provider} graph db:\r\n{ex.Content}\r\n{query}\r\n{argLogs}");
_logger.LogError(ex, $"Error when executing query in {Provider} graph db:\r\n{ex.Content}\r\n{query}\r\n{argLogs}");
throw;
}
catch (Exception ex)
Expand All @@ -65,10 +66,88 @@ public async Task<GraphQueryResult> ExecuteQueryAsync(string query, GraphQueryEx
}


#region Node
public async Task<GraphNodeModel> GetNodeAsync(string graphId, string nodeId)
{
var node = await _membaseApi.GetNodeAsync(graphId, nodeId);
return Node.ToGraphNodeModel(node);
}

public async Task<GraphNodeModel> CreateNodeAsync(string graphId, GraphNodeCreationRequest request)
{
var node = await _membaseApi.CreateNodeAsync(graphId, new NodeCreationModel
{
Id = request.Id,
Labels = request.Labels,
Properties = request.Properties,
Time = request.Time
});
return Node.ToGraphNodeModel(node);
}

public async Task<GraphNodeModel> MergeNodeAsync(string graphId, string nodeId, GraphNodeUpdateRequest request)
{
var node = await _membaseApi.MergeNodeAsync(graphId, nodeId, new NodeUpdateModel
{
Id = request.Id,
Labels = request.Labels,
Properties = request.Properties,
Time = request.Time
});
return Node.ToGraphNodeModel(node);
}

public async Task<bool> DeleteNodeAsync(string graphId, string nodeId)
{
await _membaseApi.DeleteNodeAsync(graphId, nodeId);
return true;
}
#endregion

#region Edge
public async Task<GraphEdgeModel> GetEdgeAsync(string graphId, string edgeId)
{
var edge = await _membaseApi.GetEdgeAsync(graphId, edgeId);
return Edge.ToGraphEdgeModel(edge);
}

public async Task<GraphEdgeModel> CreateEdgeAsync(string graphId, GraphEdgeCreationRequest request)
{
var edge = await _membaseApi.CreateEdgeAsync(graphId, new EdgeCreationModel
{
Id = request.Id,
SourceNodeId = request.SourceNodeId,
TargetNodeId = request.TargetNodeId,
Type = request.Type,
Directed = request.Directed,
Weight = request.Weight,
Properties = request.Properties
});
return Edge.ToGraphEdgeModel(edge);
}

public async Task<GraphEdgeModel> UpdateEdgeAsync(string graphId, string edgeId, GraphEdgeUpdateRequest request)
{
var edge = await _membaseApi.UpdateEdgeAsync(graphId, edgeId, new EdgeUpdateModel
{
Id = request.Id,
Properties = request.Properties
});
return Edge.ToGraphEdgeModel(edge);
}

public async Task<bool> DeleteEdgeAsync(string graphId, string edgeId)
{
await _membaseApi.DeleteEdgeAsync(graphId, edgeId);
return true;
}
#endregion

#region Private methods
private AsyncPolicy BuildRetryPolicy()
{
var settings = _services.GetRequiredService<MembaseSettings>();
var timeoutSeconds = (double)settings.TimeoutSecond / RetryCount;
var timeoutSeconds = (double)settings.TimeoutSecond / RETRY_COUNT;

var timeoutPolicy = Policy.TimeoutAsync(TimeSpan.FromSeconds(timeoutSeconds));

Expand All @@ -78,15 +157,16 @@ private AsyncPolicy BuildRetryPolicy()
.Or<TimeoutRejectedException>()
.Or<ApiException>(ex => ex.StatusCode == HttpStatusCode.ServiceUnavailable || ex.StatusCode == HttpStatusCode.InternalServerError)
.WaitAndRetryAsync(
retryCount: RetryCount,
retryCount: RETRY_COUNT,
sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
onRetry: (ex, timespan, retryAttempt, _) =>
{
_logger.LogWarning(ex,
"CypherQueryAsync retry {RetryAttempt}/{MaxRetries} after {Delay}s. Exception: {Message}",
retryAttempt, RetryCount, timespan.TotalSeconds, ex.Message);
retryAttempt, RETRY_COUNT, timespan.TotalSeconds, ex.Message);
});

return Policy.WrapAsync(retryPolicy, timeoutPolicy);
}
#endregion
}
Loading
Loading