-
Notifications
You must be signed in to change notification settings - Fork 2.7k
/
use_collections_async.py
180 lines (152 loc) · 7.46 KB
/
use_collections_async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
"""
FILE: use_collections_async.py
DESCRIPTION:
This sample demonstrates how to use collections in Confidential Ledger. In this sample, we write
ledger entries to different collections. Collections may be used to group semantically or
logically related ledger entries.
USAGE:
python use_collections_async.py
Set the environment variables with your own values before running the sample:
1) CONFIDENTIALLEDGER_ENDPOINT - the endpoint of the Confidential Ledger.
"""
import asyncio
import logging
import os
import sys
import tempfile
from typing import Any, Dict
from azure.confidentialledger.aio import ConfidentialLedgerClient
from azure.confidentialledger.certificate.aio import (
ConfidentialLedgerCertificateClient,
)
from azure.identity.aio import DefaultAzureCredential
logging.basicConfig(level=logging.ERROR)
LOG = logging.getLogger()
async def main():
# Set the values of the client ID, tenant ID, and client secret of the AAD application as
# environment variables:
# AZURE_CLIENT_ID, AZURE_TENANT_ID, AZURE_CLIENT_SECRET, CONFIDENTIALLEDGER_ENDPOINT
try:
ledger_endpoint = os.environ["CONFIDENTIALLEDGER_ENDPOINT"]
except KeyError:
LOG.error(
"Missing environment variable 'CONFIDENTIALLEDGER_ENDPOINT' - "
"please set it before running the example"
)
sys.exit(1)
# Under the current URI format, the ledger id is the first part of the ledger endpoint.
# i.e. https://<ledger id>.confidential-ledger.azure.com
ledger_id = ledger_endpoint.replace("https://", "").split(".")[0]
identity_service_client = ConfidentialLedgerCertificateClient() # type: ignore[call-arg]
async with identity_service_client:
ledger_certificate = await identity_service_client.get_ledger_identity(
ledger_id
)
# The Confidential Ledger's TLS certificate must be written to a file to be used by the
# ConfidentialLedgerClient. Here, we write it to a temporary file so that is is cleaned up
# automatically when the program exits.
with tempfile.TemporaryDirectory() as tempdir:
ledger_cert_file = os.path.join(tempdir, f"{ledger_id}.pem")
with open(ledger_cert_file, "w") as outfile:
outfile.write(ledger_certificate["ledgerTlsCertificate"])
print(
f"Ledger certificate has been written to {ledger_cert_file}. "
"It will be deleted when the script completes."
)
# Build a client through AAD
credential = DefaultAzureCredential()
ledger_client = ConfidentialLedgerClient(
ledger_endpoint,
credential=credential,
ledger_certificate_path=ledger_cert_file,
)
# Using the async objects as a context manager ensures they are properly closed after use.
async with credential:
async with ledger_client:
print("This Confidential Ledger will contain messages from different senders.")
print(
"We will group ledger entries by the sender of the message. For all client "
"methods that take an optional 'collection_id' parameter, if none is provided, "
"a service-assigned, default collection id will be assigned."
)
tids: Dict[Any, Dict[str, Any]] = {}
senders = [None, "Alice", "Bob"]
for msg_idx in range(3):
for sender in senders:
if sender is None:
msg = f"My message {msg_idx}"
else:
msg = f"{sender}'s message {msg_idx}"
post_poller = await ledger_client.begin_create_ledger_entry( # type: ignore[attr-defined]
entry={"contents": msg}, collection_id=sender,
)
post_result = await post_poller.result()
if sender is None:
print(
f"Wrote '{msg}' to the default collection at "
f"{post_result['transactionId']}"
)
else:
print(
f"Wrote '{msg}' to collection {sender} at "
f"{post_result['transactionId']}"
)
if sender not in tids:
tids[sender] = {}
tids[sender]["first"] = post_result["transactionId"]
tids[sender]["last"] = post_result["transactionId"]
print("Let's retrieve the latest entry in each collection")
for sender in senders:
current_entry = await ledger_client.get_current_ledger_entry()
output = "Current entry in {0} is {1}"
print(
output.format(
"default collection" if sender is None else f"{sender}'s collection",
current_entry["contents"],
)
)
print("Let's retrieve the first entry in each collection")
for sender in senders:
get_poller = await ledger_client.begin_get_ledger_entry( # type: ignore[attr-defined]
tids[sender]["first"],
collection_id=sender
)
first_entry = await get_poller.result()
output = "First entry in {0} is {1}"
print(
output.format(
"default collection" if sender is None else f"{sender}'s collection",
first_entry["entry"]["contents"],
)
)
print("Let's retrieve get all the entries in each collection")
for sender in senders:
entries_list = ledger_client.list_ledger_entries(
collection_id=sender,
from_transaction_id=tids[sender]["first"],
to_transaction_id=tids[sender]["last"],
)
async for entry in entries_list:
output = "Entry in {0}: {1}"
print(
output.format(
"default collection" if sender is None else f"{sender}'s "
"collection",
entry,
)
)
collection_ids = []
collections = ledger_client.list_collections()
async for collection in collections:
collection_ids.append(collection["collectionId"])
print(
"In conclusion, these are all the collections in the Confidential Ledger:\n" +
"\n\t".join(collection_ids)
)
if __name__ == "__main__":
asyncio.run(main())