-
Notifications
You must be signed in to change notification settings - Fork 2
/
FxBookingChangeFeed.cs
132 lines (120 loc) · 5.81 KB
/
FxBookingChangeFeed.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using TravelService.MultiAgent.Orchestrator.Models;
using Microsoft.Extensions.Configuration;
using Microsoft.SemanticKernel.Connectors.OpenAI;
using Newtonsoft.Json;
using Microsoft.SemanticKernel.Embeddings;
#pragma warning disable SKEXP0010
#pragma warning disable SKEXP0001
namespace TravelService.MultiAgent.Orchestrator
{
public class FxBookingChangeFeed
{
private readonly ILogger _logger;
private readonly string databaseId;
private readonly CosmosClient client;
private readonly AzureOpenAITextEmbeddingGenerationService _azureOpenAITextEmbeddingGenerationService;
public FxBookingChangeFeed(ILoggerFactory loggerFactory, IConfiguration configuration, CosmosClient cosmosClient, AzureOpenAITextEmbeddingGenerationService azureOpenAITextEmbeddingGenerationService)
{
_logger = loggerFactory.CreateLogger<FxBookingChangeFeed>();
databaseId = configuration.GetValue<string>("DatabaseId");
client = cosmosClient;
_azureOpenAITextEmbeddingGenerationService = azureOpenAITextEmbeddingGenerationService;
}
private static readonly string EndpointUri = Environment.GetEnvironmentVariable("CosmosDBEndpointUri");
[Function("ProcessBookingChanges")]
public async Task Run(
[CosmosDBTrigger(
databaseName: "ContosoTravelAgency",
containerName: "Bookings",
Connection = "cosmosDB",
LeaseContainerName = "leases")] IReadOnlyList<Booking> bookings,
ILogger log)
{
if (bookings != null && bookings.Count > 0)
{
foreach (var booking in bookings)
{
var passengerContainer = client.GetContainer(databaseId, "Passengers");
var passenger = await passengerContainer.ReadItemAsync<Passenger>(
booking.passengerId, new PartitionKey(booking.passengerId));
var flightsContainer = client.GetContainer(databaseId, "FlightListings");
var flightQuery = new QueryDefinition("SELECT * FROM c WHERE c.flightNumber = @flightNumber")
.WithParameter("@flightNumber", booking.flightId);
var flightIterator = flightsContainer.GetItemQueryIterator<FlightListing>(flightQuery);
FlightListing flight = null;
if (flightIterator.HasMoreResults)
{
var flightResult = await flightIterator.ReadNextAsync();
flight = flightResult.FirstOrDefault();
}
if (flight == null)
{
continue;
}
var airlinesContainer = client.GetContainer(databaseId, "Airlines");
var airline = await airlinesContainer.ReadItemAsync<Airline>(
flight.airlineId, new PartitionKey(flight.airlineId));
var denormalizedBooking = new
{
booking.id,
booking.bookingDate,
booking.status,
booking.seatNumber,
booking.pricePaid,
passenger = new
{
passenger.Resource.id,
passenger.Resource.firstName,
passenger.Resource.lastName,
passenger.Resource.email,
passenger.Resource.phone
},
flight = new
{
flight.id,
flight.flightNumber,
flight.departure,
flight.destination,
flight.departureTime,
flight.price,
flight.aircraftType,
flight.availableSeats,
flight.duration,
airline = new
{
airline.Resource.id,
airline.Resource.name,
airline.Resource.code,
airline.Resource.city,
airline.Resource.country,
airline.Resource.logoUrl
}
}
};
var semanticBookingContainer = client.GetContainer(databaseId, "SemanticBookingLayer");
await semanticBookingContainer.UpsertItemAsync(denormalizedBooking, new PartitionKey(denormalizedBooking.id));
var semanticBookingVectorContainer = client.GetContainer(databaseId, "SemanticBookingVectorLayer");
var bookingVector = new
{
booking.id,
metadata = JsonConvert.SerializeObject(denormalizedBooking),
vector = (await _azureOpenAITextEmbeddingGenerationService.GenerateEmbeddingAsync(JsonConvert.SerializeObject(denormalizedBooking))).ToArray()
};
await semanticBookingVectorContainer.UpsertItemAsync(bookingVector, new PartitionKey(bookingVector.id));
}
}
}
}
public class Airline
{
public string id { get; set; }
public string name { get; set; }
public string code { get; set; }
public string city { get; set; }
public string country { get; set; }
public string logoUrl { get; set; }
}
}