Skip to content

Commit

Permalink
[ChangeFeed]Unify cursor and add live mode (#13243)
Browse files Browse the repository at this point in the history
* [ChangeFeed]Unify cursor and add live mode

* make the token into a str

* address comments

* 1. add a while True for sample
2. make the list of shards in cursor to a dict in internal code
3. test list 3-shard events in multiple times generate same results as
   list all events at once
4. Java is using sequential list, so it could give 1 shard cursor even
   there are 3 shards, the test makes sure python is working with 1 shard
   cursor.

* make end_time/start_time and continuation_token mutually exclusive

* update dependency version

* make all '' to "" in cursor

* fix pylint and update changelog

* fix blob pylint
  • Loading branch information
xiafu-msft committed Sep 9, 2020
1 parent 7d13ac8 commit f52dba8
Show file tree
Hide file tree
Showing 20 changed files with 476,916 additions and 185,077 deletions.
5 changes: 5 additions & 0 deletions sdk/storage/azure-storage-blob-changefeed/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 12.0.0b2 (2020-09-09)
**Breaking changes**
- Change the `continuation_token` from a dict to a str.
- `start_time`/`end_time` and `continuation_token` are mutually exclusive now.

## 12.0.0b1 (2020-07-07)
- Initial Release. Please see the README for information on the new design.
- Support for ChangeFeedClient: get change feed events by page, get all change feed events, get events in a time range
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
# license information.
# --------------------------------------------------------------------------

VERSION = "12.0.0b1"
VERSION = "12.0.0b2"
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import os
from datetime import datetime
from time import sleep

from azure.storage.blob.changefeed import ChangeFeedClient


Expand All @@ -35,7 +37,7 @@ def list_events_by_page(self):
# Instantiate a ChangeFeedClient
# [START list_events_by_page]
# [START create_change_feed_client]
cf_client = ChangeFeedClient("http://{}.blob.core.windows.net".format(self.ACCOUNT_NAME),
cf_client = ChangeFeedClient("https://{}.blob.core.windows.net".format(self.ACCOUNT_NAME),
credential=self.ACCOUNT_KEY)
# [END create_change_feed_client]

Expand All @@ -54,7 +56,7 @@ def list_events_by_page(self):

def list_all_events(self):
# [START list_all_events]
cf_client = ChangeFeedClient("http://{}.blob.core.windows.net".format(self.ACCOUNT_NAME),
cf_client = ChangeFeedClient("https://{}.blob.core.windows.net".format(self.ACCOUNT_NAME),
credential=self.ACCOUNT_KEY)
change_feed = cf_client.list_changes()

Expand All @@ -64,9 +66,9 @@ def list_all_events(self):
# [END list_all_events]

def list_range_of_events(self):
cf_client = ChangeFeedClient("http://{}.blob.core.windows.net".format(self.ACCOUNT_NAME),
cf_client = ChangeFeedClient("https://{}.blob.core.windows.net".format(self.ACCOUNT_NAME),
credential=self.ACCOUNT_KEY)
start_time = datetime(2019, 1, 1)
start_time = datetime(2020, 8, 18, 10)
end_time = datetime(2020, 3, 4)
change_feed = cf_client.list_changes(start_time=start_time, end_time=end_time)

Expand All @@ -77,7 +79,7 @@ def list_range_of_events(self):
def list_events_using_continuation_token(self):

# Instantiate a ChangeFeedClient
cf_client = ChangeFeedClient("http://{}.blob.core.windows.net".format(self.ACCOUNT_NAME),
cf_client = ChangeFeedClient("https://{}.blob.core.windows.net".format(self.ACCOUNT_NAME),
credential=self.ACCOUNT_KEY)
# to get continuation token
change_feed = cf_client.list_changes(results_per_page=2).by_page()
Expand All @@ -87,16 +89,33 @@ def list_events_using_continuation_token(self):
token = change_feed.continuation_token

# restart using the continuation token
change_feed2 = cf_client.list_changes(results_per_page=3).by_page(continuation_token=token)
change_feed2 = cf_client.list_changes(results_per_page=56).by_page(continuation_token=token)
change_feed_page2 = next(change_feed2)
for event in change_feed_page2:
print(event)

def list_events_in_live_mode(self):
    """Continuously poll the change feed and print events as they arrive."""
    # Instantiate a ChangeFeedClient
    cf_client = ChangeFeedClient("https://{}.blob.core.windows.net".format(self.ACCOUNT_NAME),
                                 credential=self.ACCOUNT_KEY)
    continuation = None
    while True:
        # A continuation of None starts from the beginning of the feed;
        # afterwards each sweep resumes where the previous one stopped.
        pager = cf_client.list_changes(results_per_page=500).by_page(continuation_token=continuation)

        for event_page in pager:
            for change_event in event_page:
                print(change_event)
        continuation = pager.continuation_token

        # Wait before polling again for newly appended events.
        sleep(60)
        print("continue printing events")


if __name__ == '__main__':
    # Run every change feed sample in order; the live-mode sample loops forever.
    demo = ChangeFeedSamples()
    demo.list_events_by_page()
    demo.list_all_events()
    demo.list_range_of_events()
    demo.list_events_using_continuation_token()
    demo.list_events_in_live_mode()

2 changes: 1 addition & 1 deletion sdk/storage/azure-storage-blob-changefeed/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
'azure.storage.blob.changefeed',
],
install_requires=[
"azure-storage-blob~=12.4.0b1",
"azure-storage-blob~=12.5.0",
],
extras_require={
":python_version<'3.0'": ['futures', 'azure-storage-nspkg<4.0.0,>=3.0.0'],
Expand Down
149,880 changes: 70,052 additions & 79,828 deletions ...ge-blob-changefeed/tests/recordings/test_change_feed.test_get_all_change_feed_events.yaml

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

160,467 changes: 71,567 additions & 88,900 deletions ...ests/recordings/test_change_feed.test_get_change_feed_events_with_continuation_token.yaml

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

219 changes: 216 additions & 3 deletions sdk/storage/azure-storage-blob-changefeed/tests/test_change_feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import json
from time import sleep

import pytest
from datetime import datetime
from datetime import datetime, timedelta

from math import ceil

Expand Down Expand Up @@ -97,12 +100,222 @@ def test_get_change_feed_events_with_continuation_token(self, resource_group, lo
@GlobalStorageAccountPreparer()
def test_get_change_feed_events_in_a_time_range(self, resource_group, location, storage_account, storage_account_key):
cf_client = ChangeFeedClient(self.account_url(storage_account, "blob"), storage_account_key)
start_time = datetime(2020, 5, 12)
end_time = datetime(2020, 5, 13)
start_time = datetime(2020, 8, 12)
end_time = datetime(2020, 8, 18)
change_feed = cf_client.list_changes(start_time=start_time, end_time=end_time, results_per_page=2).by_page()

# print first page of events
page1 = next(change_feed)
events = list(page1)

self.assertIsNot(len(events), 0)

@GlobalStorageAccountPreparer()
def test_change_feed_does_not_fail_on_empty_event_stream(self, resource_group, location, storage_account, storage_account_key):
    """Listing changes from a start time past all activity yields no events and no error."""
    client = ChangeFeedClient(self.account_url(storage_account, "blob"), storage_account_key)

    # A start time later than any recorded change produces an empty stream.
    future_start = datetime(2021, 8, 19)
    events = list(client.list_changes(start_time=future_start))
    self.assertEqual(len(events), 0)

@GlobalStorageAccountPreparer()
def test_read_change_feed_tail_where_3_shards_have_data(self, resource_group, location, storage_account, storage_account_key):
    """Tail the change feed when all 3 shards have events, then resume twice from tokens."""
    cf_client = ChangeFeedClient(self.account_url(storage_account, "blob"), storage_account_key)

    # to read until the end
    start_time = datetime(2020, 8, 19, 23)
    change_feed = cf_client.list_changes(start_time=start_time).by_page()

    events = []
    for page in change_feed:
        for event in page:
            events.append(event)
    token = change_feed.continuation_token

    dict_token = json.loads(token)
    self.assertTrue(len(events) > 0)
    self.assertEqual(dict_token['CursorVersion'], 1)
    self.assertIsNotNone(dict_token['UrlHost'])
    # The unified cursor tracks a cursor for each of the 3 shards.
    self.assertEqual(len(dict_token['CurrentSegmentCursor']['ShardCursors']), 3)
    self.assertIsNotNone(dict_token['CurrentSegmentCursor']['SegmentPath'])
    self.assertIsNotNone(dict_token['CurrentSegmentCursor']['CurrentShardPath'])

    if self.is_live:
        sleep(120)
    print("continue printing events")

    # restart using the continuation token after waiting for 2 minutes
    change_feed2 = cf_client.list_changes(results_per_page=6).by_page(continuation_token=token)
    change_feed_page2 = next(change_feed2)
    events2 = list(change_feed_page2)

    # BUG FIX: the original asserted `events2 != 0`, which is vacuously true for
    # any list; assert that the resumed page actually contained events.
    self.assertTrue(len(events2) > 0)

    if self.is_live:
        sleep(120)
    print("continue printing events")

    # restart using the continuation token which has Non-zero EventIndex for 3 shards
    token2 = change_feed2.continuation_token
    dict_token2 = json.loads(token2)
    self.assertEqual(len(dict_token2['CurrentSegmentCursor']['ShardCursors']), 3)
    self.assertNotEqual(dict_token2['CurrentSegmentCursor']['ShardCursors'][0]['EventIndex'], 0)
    self.assertNotEqual(dict_token2['CurrentSegmentCursor']['ShardCursors'][1]['EventIndex'], 0)
    self.assertNotEqual(dict_token2['CurrentSegmentCursor']['ShardCursors'][2]['EventIndex'], 0)

    change_feed3 = cf_client.list_changes(results_per_page=57).by_page(continuation_token=token2)
    change_feed_page3 = next(change_feed3)
    events3 = list(change_feed_page3)
    # BUG FIX: the original re-checked `events2` here; verify `events3`, the
    # page that was just read from the second resumed iterator.
    self.assertTrue(len(events3) > 0)

@GlobalStorageAccountPreparer()
def test_read_change_feed_tail_where_only_1_shard_has_data(self, resource_group, location, storage_account, storage_account_key):
    """Tail the change feed when only one shard has events, then resume from the token."""
    cf_client = ChangeFeedClient(self.account_url(storage_account, "blob"), storage_account_key)

    # to read until the end
    start_time = datetime(2020, 8, 20, 1)
    change_feed = cf_client.list_changes(start_time=start_time, results_per_page=3).by_page()

    page = next(change_feed)
    events_on_first_page = list(page)

    token = change_feed.continuation_token
    dict_token = json.loads(token)

    self.assertEqual(len(events_on_first_page), 3)
    self.assertEqual(dict_token['CursorVersion'], 1)
    self.assertIsNotNone(dict_token['UrlHost'])
    # Even when only one shard has data, the cursor still records all 3 shards.
    self.assertEqual(len(dict_token['CurrentSegmentCursor']['ShardCursors']), 3)
    self.assertIsNotNone(dict_token['CurrentSegmentCursor']['SegmentPath'])
    self.assertIsNotNone(dict_token['CurrentSegmentCursor']['CurrentShardPath'])

    print("continue printing events")

    # restart using the continuation token and drain the remaining events
    change_feed2 = cf_client.list_changes(results_per_page=5).by_page(continuation_token=token)
    events2 = []
    for page in change_feed2:
        for event in page:
            events2.append(event)

    # BUG FIX: the original used assertIsNot(len(events2), 0), an *identity*
    # check that only passes by accident of CPython's small-int caching;
    # assert inequality of the values instead.
    self.assertNotEqual(len(events2), 0)

@GlobalStorageAccountPreparer()
def test_read_change_feed_with_3_shards_in_a_time_range(self, resource_group, location, storage_account, storage_account_key):
    """Read a bounded time range across 3 shards and resume from the continuation token."""
    cf_client = ChangeFeedClient(self.account_url(storage_account, "blob"), storage_account_key)

    # to get continuation token
    range_start = datetime(2020, 8, 19, 22)
    range_end = datetime(2020, 8, 19, 23)
    paged_feed = cf_client.list_changes(start_time=range_start, end_time=range_end, results_per_page=16).by_page()

    first_page_events = list(next(paged_feed))
    self.assertEqual(len(first_page_events), 16)

    token = paged_feed.continuation_token
    dict_token = json.loads(token)

    self.assertEqual(dict_token['CursorVersion'], 1)
    self.assertIsNotNone(dict_token['EndTime'])
    self.assertIsNotNone(dict_token['UrlHost'])
    self.assertEqual(len(dict_token['CurrentSegmentCursor']['ShardCursors']), 3)
    self.assertIsNotNone(dict_token['CurrentSegmentCursor']['SegmentPath'])
    self.assertIsNotNone(dict_token['CurrentSegmentCursor']['CurrentShardPath'])

    # Resume from the token; every event must fall before the range's end.
    resumed_feed = cf_client.list_changes().by_page(continuation_token=token)
    events = list(next(resumed_feed))

    end_time_str = (range_end + timedelta(hours=1)).isoformat()
    self.assertTrue(events[-1]['eventTime'] < end_time_str)

@GlobalStorageAccountPreparer()
def test_read_3_shards_change_feed_during_a_time_range_in_multiple_times_gives_same_result_as_reading_all(
        self, resource_group, location, storage_account, storage_account_key):
    """Reading a time range in three resumed passes must yield the same events as one pass."""
    cf_client = ChangeFeedClient(self.account_url(storage_account, "blob"), storage_account_key)

    # to read until the end
    start_time = datetime(2020, 8, 5, 17)
    end_time = datetime(2020, 8, 5, 17, 15)

    # Baseline: every event in the range read in a single pass.
    all_events = list(cf_client.list_changes(start_time=start_time, end_time=end_time))
    change_feed = cf_client.list_changes(start_time=start_time, end_time=end_time, results_per_page=50).by_page()

    events = []
    # BUG FIX: the original iterated `for _ in (0, 2)` -- a 2-element tuple that
    # only runs twice by coincidence; range(2) states the intent explicitly.
    for _ in range(2):
        page = next(change_feed)
        for event in page:
            events.append(event)
    token = change_feed.continuation_token

    dict_token = json.loads(token)
    self.assertTrue(len(events) > 0)
    self.assertEqual(dict_token['CursorVersion'], 1)
    self.assertIsNotNone(dict_token['UrlHost'])
    self.assertEqual(len(dict_token['CurrentSegmentCursor']['ShardCursors']), 3)
    self.assertIsNotNone(dict_token['CurrentSegmentCursor']['SegmentPath'])
    self.assertIsNotNone(dict_token['CurrentSegmentCursor']['CurrentShardPath'])

    # make sure end_time and continuation_token are mutually exclusive
    with self.assertRaises(ValueError):
        cf_client.list_changes(results_per_page=50, end_time=datetime.now()).by_page(continuation_token=token)
    # make sure start_time and continuation_token are mutually exclusive
    with self.assertRaises(ValueError):
        cf_client.list_changes(results_per_page=50, start_time=datetime.now()).by_page(continuation_token=token)

    # restart using the continuation token
    change_feed2 = cf_client.list_changes(results_per_page=50).by_page(continuation_token=token)
    events2 = []
    for _ in range(2):
        page = next(change_feed2)
        for event in page:
            events2.append(event)

    # BUG FIX: the original asserted `events2 != 0`, which is vacuously true
    # for any list; check that events were actually returned.
    self.assertTrue(len(events2) > 0)

    # restart using the continuation token which has Non-zero EventIndex for 3 shards
    token2 = change_feed2.continuation_token
    dict_token2 = json.loads(token2)
    self.assertEqual(len(dict_token2['CurrentSegmentCursor']['ShardCursors']), 3)

    change_feed3 = cf_client.list_changes(results_per_page=50).by_page(continuation_token=token2)
    events3 = []
    for page in change_feed3:
        for event in page:
            events3.append(event)

    token3 = change_feed3.continuation_token
    dict_token3 = json.loads(token3)

    # BUG FIX: same vacuous `events3 != 0` comparison as above.
    self.assertTrue(len(events3) > 0)
    self.assertEqual(len(dict_token3['CurrentSegmentCursor']['ShardCursors']), 3)
    # The three resumed passes together must reproduce the single-pass read.
    self.assertEqual(len(events) + len(events2) + len(events3), len(all_events))

@GlobalStorageAccountPreparer()
def test_list_3_shards_events_works_with_1_shard_cursor(self, resource_group, location, storage_account, storage_account_key):
    """A continuation token holding a single shard cursor still drives a 3-shard listing."""
    cf_client = ChangeFeedClient(self.account_url(storage_account, "blob"), storage_account_key)
    range_start = datetime(2020, 8, 5, 17)
    range_end = datetime(2020, 8, 5, 17, 15)

    # Take a token after a single one-event page; it records only one shard cursor
    # (matching Java's sequential listing behavior).
    single_page_feed = cf_client.list_changes(results_per_page=1, start_time=range_start, end_time=range_end).by_page()
    next(single_page_feed)
    token_with_1_shard = single_page_feed.continuation_token

    # Resume from that token and pull two larger pages.
    resumed_feed = cf_client.list_changes(results_per_page=50).by_page(continuation_token=token_with_1_shard)
    events = []
    for _ in range(0, 2):
        for event in next(resumed_feed):
            events.append(event)

    dict_token = json.loads(resumed_feed.continuation_token)
    dict_token_with_1_shard = json.loads(token_with_1_shard)
    self.assertEqual(len(dict_token_with_1_shard['CurrentSegmentCursor']['ShardCursors']), 1)
    self.assertEqual(len(dict_token['CurrentSegmentCursor']['ShardCursors']), 3)

0 comments on commit f52dba8

Please sign in to comment.