Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ChangeFeed]Unify cursor and add live mode #13243

Merged
merged 9 commits into from
Sep 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions sdk/storage/azure-storage-blob-changefeed/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 12.0.0b2 (2020-09-09)
**Breaking changes**
- Change the `continuation_token` from a dict to a str.
- `start_time`/`end_time` and `continuation_token` are mutually exclusive now.

## 12.0.0b1 (2020-07-07)
- Initial Release. Please see the README for information on the new design.
- Support for ChangeFeedClient: get change feed events by page, get all change feed events, get events in a time range
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
# license information.
# --------------------------------------------------------------------------

VERSION = "12.0.0b1"
VERSION = "12.0.0b2"
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import os
from datetime import datetime
from time import sleep

from azure.storage.blob.changefeed import ChangeFeedClient


Expand All @@ -35,7 +37,7 @@ def list_events_by_page(self):
# Instantiate a ChangeFeedClient
# [START list_events_by_page]
# [START create_change_feed_client]
cf_client = ChangeFeedClient("http://{}.blob.core.windows.net".format(self.ACCOUNT_NAME),
cf_client = ChangeFeedClient("https://{}.blob.core.windows.net".format(self.ACCOUNT_NAME),
credential=self.ACCOUNT_KEY)
# [END create_change_feed_client]

Expand All @@ -54,7 +56,7 @@ def list_events_by_page(self):

def list_all_events(self):
# [START list_all_events]
cf_client = ChangeFeedClient("http://{}.blob.core.windows.net".format(self.ACCOUNT_NAME),
cf_client = ChangeFeedClient("https://{}.blob.core.windows.net".format(self.ACCOUNT_NAME),
credential=self.ACCOUNT_KEY)
change_feed = cf_client.list_changes()

Expand All @@ -64,9 +66,9 @@ def list_all_events(self):
# [END list_all_events]

def list_range_of_events(self):
cf_client = ChangeFeedClient("http://{}.blob.core.windows.net".format(self.ACCOUNT_NAME),
cf_client = ChangeFeedClient("https://{}.blob.core.windows.net".format(self.ACCOUNT_NAME),
credential=self.ACCOUNT_KEY)
start_time = datetime(2019, 1, 1)
start_time = datetime(2020, 8, 18, 10)
end_time = datetime(2020, 3, 4)
change_feed = cf_client.list_changes(start_time=start_time, end_time=end_time)

Expand All @@ -77,7 +79,7 @@ def list_range_of_events(self):
def list_events_using_continuation_token(self):

# Instantiate a ChangeFeedClient
cf_client = ChangeFeedClient("http://{}.blob.core.windows.net".format(self.ACCOUNT_NAME),
cf_client = ChangeFeedClient("https://{}.blob.core.windows.net".format(self.ACCOUNT_NAME),
credential=self.ACCOUNT_KEY)
# to get continuation token
change_feed = cf_client.list_changes(results_per_page=2).by_page()
Expand All @@ -87,16 +89,33 @@ def list_events_using_continuation_token(self):
token = change_feed.continuation_token

# restart using the continuation token
change_feed2 = cf_client.list_changes(results_per_page=3).by_page(continuation_token=token)
change_feed2 = cf_client.list_changes(results_per_page=56).by_page(continuation_token=token)
change_feed_page2 = next(change_feed2)
for event in change_feed_page2:
print(event)

def list_events_in_live_mode(self):
    """Tail the change feed forever, printing new events as they arrive.

    Each pass resumes from the continuation token captured on the previous
    pass, sleeping 60 seconds between polls. This loop never returns.
    """
    # Instantiate a ChangeFeedClient
    cf_client = ChangeFeedClient("https://{}.blob.core.windows.net".format(self.ACCOUNT_NAME),
                                 credential=self.ACCOUNT_KEY)
    continuation = None
    while True:
        pages = cf_client.list_changes(results_per_page=500).by_page(continuation_token=continuation)

        for page in pages:
            for event in page:
                print(event)
            # remember where this page left off so the next poll resumes here
            continuation = pages.continuation_token

        sleep(60)
        print("continue printing events")


# Run every sample in sequence when executed as a script.
# NOTE: list_events_in_live_mode polls in an infinite loop, so it is
# placed last — the script does not return once it starts.
if __name__ == '__main__':
    sample = ChangeFeedSamples()
    sample.list_events_by_page()
    sample.list_all_events()
    sample.list_range_of_events()
    sample.list_events_using_continuation_token()
    sample.list_events_in_live_mode()

2 changes: 1 addition & 1 deletion sdk/storage/azure-storage-blob-changefeed/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
'azure.storage.blob.changefeed',
],
install_requires=[
"azure-storage-blob~=12.4.0b1",
"azure-storage-blob~=12.5.0",
],
extras_require={
":python_version<'3.0'": ['futures', 'azure-storage-nspkg<4.0.0,>=3.0.0'],
Expand Down
149,880 changes: 70,052 additions & 79,828 deletions ...ge-blob-changefeed/tests/recordings/test_change_feed.test_get_all_change_feed_events.yaml

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

160,467 changes: 71,567 additions & 88,900 deletions ...ests/recordings/test_change_feed.test_get_change_feed_events_with_continuation_token.yaml

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

219 changes: 216 additions & 3 deletions sdk/storage/azure-storage-blob-changefeed/tests/test_change_feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import json
from time import sleep

import pytest
from datetime import datetime
from datetime import datetime, timedelta

from math import ceil

Expand Down Expand Up @@ -97,12 +100,222 @@ def test_get_change_feed_events_with_continuation_token(self, resource_group, lo
@GlobalStorageAccountPreparer()
def test_get_change_feed_events_in_a_time_range(self, resource_group, location, storage_account, storage_account_key):
    """List change feed events within a start/end window and verify the
    first page is non-empty.

    Fixes: the diff artifact left both the old (2020-05) and new (2020-08)
    window assignments in the body — only the new window (matching the
    recorded fixtures) is kept. Also replaces assertIsNot (an identity
    check, which 'passes' for any non-zero length by accident of small-int
    interning) with assertNotEqual, which states the intended value check.
    """
    cf_client = ChangeFeedClient(self.account_url(storage_account, "blob"), storage_account_key)
    # window chosen to match the recorded test data
    start_time = datetime(2020, 8, 12)
    end_time = datetime(2020, 8, 18)
    change_feed = cf_client.list_changes(start_time=start_time, end_time=end_time, results_per_page=2).by_page()

    # read first page of events
    page1 = next(change_feed)
    events = list(page1)

    self.assertNotEqual(len(events), 0)

@GlobalStorageAccountPreparer()
def test_change_feed_does_not_fail_on_empty_event_stream(self, resource_group, location, storage_account, storage_account_key):
    """A start time with no recorded events yields an empty — but valid — stream."""
    cf_client = ChangeFeedClient(self.account_url(storage_account, "blob"), storage_account_key)
    future_start = datetime(2021, 8, 19)

    collected = list(cf_client.list_changes(start_time=future_start))

    self.assertEqual(len(collected), 0)

@GlobalStorageAccountPreparer()
def test_read_change_feed_tail_where_3_shards_have_data(self, resource_group, location, storage_account, storage_account_key):
    """Tail the change feed across a segment with 3 shards, resuming twice
    from continuation tokens and validating the cursor structure each time.

    Fixes: removes review-UI noise lines embedded in the scraped body;
    `assertNotEqual(events2, 0)` compared a list to 0 (always true) — the
    length is what must be non-zero; the final assertion re-checked
    `events2` instead of `events3` (copy-paste bug).
    """
    cf_client = ChangeFeedClient(self.account_url(storage_account, "blob"), storage_account_key)

    # to read until the end
    start_time = datetime(2020, 8, 19, 23)
    change_feed = cf_client.list_changes(start_time=start_time).by_page()

    events = list()
    for page in change_feed:
        for event in page:
            events.append(event)
    token = change_feed.continuation_token

    # the token is a JSON-serialized cursor; validate its structure
    dict_token = json.loads(token)
    self.assertTrue(len(events) > 0)
    self.assertEqual(dict_token['CursorVersion'], 1)
    self.assertIsNotNone(dict_token['UrlHost'])
    self.assertEqual(len(dict_token['CurrentSegmentCursor']['ShardCursors']), 3)
    self.assertIsNotNone(dict_token['CurrentSegmentCursor']['SegmentPath'])
    self.assertIsNotNone(dict_token['CurrentSegmentCursor']['CurrentShardPath'])

    if self.is_live:
        sleep(120)
    print("continue printing events")

    # restart using the continuation token after waiting for 2 minutes
    change_feed2 = cf_client.list_changes(results_per_page=6).by_page(continuation_token=token)
    change_feed_page2 = next(change_feed2)
    events2 = list()
    for event in change_feed_page2:
        events2.append(event)

    self.assertNotEqual(len(events2), 0)

    if self.is_live:
        sleep(120)
    print("continue printing events")

    # restart using the continuation token which has Non-zero EventIndex for 3 shards
    token2 = change_feed2.continuation_token
    dict_token2 = json.loads(token2)
    self.assertEqual(len(dict_token2['CurrentSegmentCursor']['ShardCursors']), 3)
    self.assertNotEqual(dict_token2['CurrentSegmentCursor']['ShardCursors'][0]['EventIndex'], 0)
    self.assertNotEqual(dict_token2['CurrentSegmentCursor']['ShardCursors'][1]['EventIndex'], 0)
    self.assertNotEqual(dict_token2['CurrentSegmentCursor']['ShardCursors'][2]['EventIndex'], 0)

    change_feed3 = cf_client.list_changes(results_per_page=57).by_page(continuation_token=token2)
    change_feed_page3 = next(change_feed3)
    events3 = list()
    for event in change_feed_page3:
        events3.append(event)
    self.assertNotEqual(len(events3), 0)

@GlobalStorageAccountPreparer()
def test_read_change_feed_tail_where_only_1_shard_has_data(self, resource_group, location, storage_account, storage_account_key):
    """Tail the change feed from a point where only one shard has data,
    then resume from the continuation token and verify more events arrive.

    Fixes: removes review-UI noise lines embedded in the scraped body;
    restores the live-mode wait that the comment ("after waiting for 2
    minutes") and the sibling 3-shard test both imply (it had been left
    commented out); replaces assertIsNot (identity check) with
    assertNotEqual for the length assertion.
    """
    cf_client = ChangeFeedClient(self.account_url(storage_account, "blob"), storage_account_key)

    # to read until the end
    start_time = datetime(2020, 8, 20, 1)
    change_feed = cf_client.list_changes(start_time=start_time, results_per_page=3).by_page()

    page = next(change_feed)
    events_on_first_page = list()
    for event in page:
        events_on_first_page.append(event)

    token = change_feed.continuation_token
    dict_token = json.loads(token)

    self.assertEqual(len(events_on_first_page), 3)
    self.assertEqual(dict_token['CursorVersion'], 1)
    self.assertIsNotNone(dict_token['UrlHost'])
    self.assertEqual(len(dict_token['CurrentSegmentCursor']['ShardCursors']), 3)
    self.assertIsNotNone(dict_token['CurrentSegmentCursor']['SegmentPath'])
    self.assertIsNotNone(dict_token['CurrentSegmentCursor']['CurrentShardPath'])

    if self.is_live:
        sleep(120)
    print("continue printing events")

    # restart using the continuation token after waiting for 2 minutes
    change_feed2 = cf_client.list_changes(results_per_page=5).by_page(continuation_token=token)
    events2 = []
    for page in change_feed2:
        for event in page:
            events2.append(event)

    self.assertNotEqual(len(events2), 0)

@GlobalStorageAccountPreparer()
def test_read_change_feed_with_3_shards_in_a_time_range(self, resource_group, location, storage_account, storage_account_key):
    """Read a bounded window over a 3-shard segment, validate the cursor,
    then resume from the token and confirm events stay within the window.
    """
    cf_client = ChangeFeedClient(self.account_url(storage_account, "blob"), storage_account_key)

    # window used to obtain a continuation token mid-stream
    start_time = datetime(2020, 8, 19, 22)
    end_time = datetime(2020, 8, 19, 23)
    paged_feed = cf_client.list_changes(start_time=start_time, end_time=end_time, results_per_page=16).by_page()

    first_page_events = list(next(paged_feed))
    self.assertEqual(len(first_page_events), 16)

    token = paged_feed.continuation_token
    cursor = json.loads(token)

    self.assertEqual(cursor['CursorVersion'], 1)
    self.assertIsNotNone(cursor['EndTime'])
    self.assertIsNotNone(cursor['UrlHost'])
    self.assertEqual(len(cursor['CurrentSegmentCursor']['ShardCursors']), 3)
    self.assertIsNotNone(cursor['CurrentSegmentCursor']['SegmentPath'])
    self.assertIsNotNone(cursor['CurrentSegmentCursor']['CurrentShardPath'])

    # resume from the token and check the last event is still inside the
    # window (segments are hour-aligned, hence the +1 hour slack)
    resumed_feed = cf_client.list_changes().by_page(continuation_token=token)
    events = list(next(resumed_feed))

    end_time_str = (end_time + timedelta(hours=1)).isoformat()
    self.assertTrue(events[-1]['eventTime'] < end_time_str)

@GlobalStorageAccountPreparer()
def test_read_3_shards_change_feed_during_a_time_range_in_multiple_times_gives_same_result_as_reading_all(
        self, resource_group, location, storage_account, storage_account_key):
    """Reading a window in three resumed chunks must yield exactly the same
    events as reading the whole window at once; continuation tokens must be
    mutually exclusive with start_time/end_time.

    Fixes: `for _ in (0, 2):` iterated over the tuple (0, 2) — a typo for
    `range(...)` (coincidentally also two iterations; the sibling test
    below uses `range(0, 2)`); `assertNotEqual(events2, 0)` /
    `assertNotEqual(events3, 0)` compared lists to 0 (always true) — the
    lengths are what must be non-zero.
    """
    cf_client = ChangeFeedClient(self.account_url(storage_account, "blob"), storage_account_key)

    # to read until the end
    start_time = datetime(2020, 8, 5, 17)
    end_time = datetime(2020, 8, 5, 17, 15)

    # baseline: everything in the window in one pass
    all_events = list(cf_client.list_changes(start_time=start_time, end_time=end_time))
    change_feed = cf_client.list_changes(start_time=start_time, end_time=end_time, results_per_page=50).by_page()

    # chunk 1: first two pages
    events = list()
    for _ in range(2):
        page = next(change_feed)
        for event in page:
            events.append(event)
    token = change_feed.continuation_token

    dict_token = json.loads(token)
    self.assertTrue(len(events) > 0)
    self.assertEqual(dict_token['CursorVersion'], 1)
    self.assertIsNotNone(dict_token['UrlHost'])
    self.assertEqual(len(dict_token['CurrentSegmentCursor']['ShardCursors']), 3)
    self.assertIsNotNone(dict_token['CurrentSegmentCursor']['SegmentPath'])
    self.assertIsNotNone(dict_token['CurrentSegmentCursor']['CurrentShardPath'])

    # make sure end_time and continuation_token are mutually exclusive
    with self.assertRaises(ValueError):
        cf_client.list_changes(results_per_page=50, end_time=datetime.now()).by_page(continuation_token=token)
    # make sure start_time and continuation_token are mutually exclusive
    with self.assertRaises(ValueError):
        cf_client.list_changes(results_per_page=50, start_time=datetime.now()).by_page(continuation_token=token)

    # chunk 2: resume from the token for two more pages
    change_feed2 = cf_client.list_changes(results_per_page=50).by_page(continuation_token=token)
    events2 = list()
    for _ in range(2):
        page = next(change_feed2)
        for event in page:
            events2.append(event)

    self.assertNotEqual(len(events2), 0)

    # chunk 3: resume from a token whose 3 shard cursors are mid-stream
    token2 = change_feed2.continuation_token
    dict_token2 = json.loads(token2)
    self.assertEqual(len(dict_token2['CurrentSegmentCursor']['ShardCursors']), 3)

    change_feed3 = cf_client.list_changes(results_per_page=50).by_page(continuation_token=token2)
    events3 = list()
    for page in change_feed3:
        for event in page:
            events3.append(event)

    token3 = change_feed3.continuation_token
    dict_token3 = json.loads(token3)

    self.assertNotEqual(len(events3), 0)
    self.assertEqual(len(dict_token3['CurrentSegmentCursor']['ShardCursors']), 3)
    # the three chunks together must equal the single-pass read
    self.assertEqual(len(events) + len(events2) + len(events3), len(all_events))

@GlobalStorageAccountPreparer()
def test_list_3_shards_events_works_with_1_shard_cursor(self, resource_group, location, storage_account, storage_account_key):
    """A continuation token captured while only 1 shard cursor exists must
    still work when the resumed listing spans all 3 shards.
    """
    cf_client = ChangeFeedClient(self.account_url(storage_account, "blob"), storage_account_key)
    window_start = datetime(2020, 8, 5, 17)
    window_end = datetime(2020, 8, 5, 17, 15)

    # grab a token after a single one-event page: its cursor has 1 shard
    initial_feed = cf_client.list_changes(results_per_page=1, start_time=window_start, end_time=window_end).by_page()
    next(initial_feed)
    token_with_1_shard = initial_feed.continuation_token

    # resume with a larger page size so the listing crosses into all shards
    resumed_feed = cf_client.list_changes(results_per_page=50).by_page(continuation_token=token_with_1_shard)
    events = []
    for _ in range(0, 2):
        events.extend(next(resumed_feed))

    resumed_cursor = json.loads(resumed_feed.continuation_token)
    initial_cursor = json.loads(token_with_1_shard)
    self.assertEqual(len(initial_cursor['CurrentSegmentCursor']['ShardCursors']), 1)
    self.assertEqual(len(resumed_cursor['CurrentSegmentCursor']['ShardCursors']), 3)