diff --git a/tests/fields/fields.py b/tests/fields/fields.py index 318c0c595..ed7baa2bd 100644 --- a/tests/fields/fields.py +++ b/tests/fields/fields.py @@ -16,7 +16,7 @@ from decimal import Decimal -from bson import Binary, DBRef, ObjectId +from bson import Binary, DBRef, ObjectId, SON try: from bson.int64 import Int64 except ImportError: @@ -35,7 +35,8 @@ class FieldTest(MongoDBTestCase): def test_default_values_nothing_set(self): - """Ensure that default field values are used when creating a document. + """Ensure that default field values are used when creating + a document. """ class Person(Document): name = StringField() @@ -47,8 +48,9 @@ class Person(Document): # Confirm saving now would store values data_to_be_saved = sorted(person.to_mongo().keys()) - self.assertEqual( - data_to_be_saved, ['age', 'created', 'name', 'userid']) + self.assertEqual(data_to_be_saved, + ['age', 'created', 'name', 'userid'] + ) self.assertTrue(person.validate() is None) @@ -68,7 +70,8 @@ class Person(Document): data_to_be_saved, ['age', 'created', 'name', 'userid']) def test_default_values_set_to_None(self): - """Ensure that default field values are used when creating a document. + """Ensure that default field values are used even when + we explcitly initialize the doc with None values. """ class Person(Document): name = StringField() @@ -100,7 +103,8 @@ class Person(Document): self.assertEqual(data_to_be_saved, ['age', 'created', 'userid']) def test_default_values_when_setting_to_None(self): - """Ensure that default field values are used when creating a document. + """Ensure that default field values are used when creating + a document. """ class Person(Document): name = StringField() @@ -120,10 +124,10 @@ class Person(Document): self.assertTrue(person.validate() is None) - self.assertEqual(person.name, person.name) - self.assertEqual(person.age, person.age) - self.assertEqual(person.userid, person.userid) - self.assertEqual(person.created, person.created) + self.assertEqual(person.name, None) + self.assertEqual(person.age, 30) + self.assertEqual(person.userid, 'test') + self.assertTrue(isinstance(person.created, datetime.datetime)) self.assertEqual(person._data['name'], person.name) self.assertEqual(person._data['age'], person.age) @@ -135,7 +139,8 @@ class Person(Document): self.assertEqual(data_to_be_saved, ['age', 'created', 'userid']) def test_default_values_when_deleting_value(self): - """Ensure that default field values are used when creating a document. + """Ensure that default field values are used after non-default + values are explicitly deleted. """ class Person(Document): name = StringField() @@ -143,7 +148,8 @@ class Person(Document): userid = StringField(default=lambda: 'test', required=True) created = DateTimeField(default=datetime.datetime.utcnow) - person = Person(name="Ross") + person = Person(name="Ross", age=50, userid='different', + created=datetime.datetime(2014, 6, 12)) del person.name del person.age del person.userid @@ -154,10 +160,11 @@ class Person(Document): self.assertTrue(person.validate() is None) - self.assertEqual(person.name, person.name) - self.assertEqual(person.age, person.age) - self.assertEqual(person.userid, person.userid) - self.assertEqual(person.created, person.created) + self.assertEqual(person.name, None) + self.assertEqual(person.age, 30) + self.assertEqual(person.userid, 'test') + self.assertTrue(isinstance(person.created, datetime.datetime)) + self.assertNotEqual(person.created, datetime.datetime(2014, 6, 12)) self.assertEqual(person._data['name'], person.name) self.assertEqual(person._data['age'], person.age) @@ -169,8 +176,7 @@ class Person(Document): self.assertEqual(data_to_be_saved, ['age', 'created', 'userid']) def test_required_values(self): - """Ensure that required field constraints are enforced. - """ + """Ensure that required field constraints are enforced.""" class Person(Document): name = StringField(required=True) age = IntField(required=True) @@ -182,9 +188,9 @@ class Person(Document): self.assertRaises(ValidationError, person.validate) def test_not_required_handles_none_in_update(self): - """Ensure that every fields should accept None if required is False. + """Ensure that every fields should accept None if required is + False. """ - class HandleNoneFields(Document): str_fld = StringField() int_fld = IntField() @@ -236,23 +242,27 @@ class HandleNoneFields(Document): doc.com_dt_fld = datetime.datetime.utcnow() doc.save() - collection = self.db[HandleNoneFields._get_collection_name()] - obj = collection.update({"_id": doc.id}, {"$unset": { - "str_fld": 1, - "int_fld": 1, - "flt_fld": 1, - "comp_dt_fld": 1} + # Unset all the fields + obj = HandleNoneFields._get_collection().update({"_id": doc.id}, { + "$unset": { + "str_fld": 1, + "int_fld": 1, + "flt_fld": 1, + "comp_dt_fld": 1 + } }) # Retrive data from db and verify it. - ret = HandleNoneFields.objects.all()[0] - + ret = HandleNoneFields.objects.first() self.assertEqual(ret.str_fld, None) self.assertEqual(ret.int_fld, None) self.assertEqual(ret.flt_fld, None) - # Return current time if retrived value is None. + + # ComplexDateTimeField returns current time if retrived value is None. self.assertTrue(isinstance(ret.comp_dt_fld, datetime.datetime)) + # Retrieved object shouldn't pass validation when a re-save is + # attempted. self.assertRaises(ValidationError, ret.validate) def test_int_and_float_ne_operator(self): @@ -280,7 +290,8 @@ class TestDocument(Document): self.assertEqual(1, TestDocument.objects(long_fld__ne=None).count()) def test_object_id_validation(self): - """Ensure that invalid values cannot be assigned to string fields. + """Ensure that invalid values cannot be assigned to an + ObjectIdField. """ class Person(Document): name = StringField() @@ -297,27 +308,8 @@ class Person(Document): person.id = '497ce96f395f2f052a494fd4' person.validate() - def test_db_field_validation(self): - """Ensure that db_field doesn't accept invalid values.""" - - # dot in the name - with self.assertRaises(ValueError): - class User(Document): - name = StringField(db_field='user.name') - - # name starting with $ - with self.assertRaises(ValueError): - class User(Document): - name = StringField(db_field='$name') - - # name containing a null character - with self.assertRaises(ValueError): - class User(Document): - name = StringField(db_field='name\0') - def test_string_validation(self): - """Ensure that invalid values cannot be assigned to string fields. - """ + """Ensure that invalid values cannot be assigned to string fields.""" class Person(Document): name = StringField(max_length=20) userid = StringField(r'[0-9a-z_]+$') @@ -490,10 +482,25 @@ class Person(Document): person_2 = Person(height='something invalid') self.assertRaises(ValidationError, person_2.validate) - Person.drop_collection() + def test_db_field_validation(self): + """Ensure that db_field doesn't accept invalid values.""" - def test_decimal_comparison(self): + # dot in the name + with self.assertRaises(ValueError): + class User(Document): + name = StringField(db_field='user.name') + + # name starting with $ + with self.assertRaises(ValueError): + class User(Document): + name = StringField(db_field='$name') + + # name containing a null character + with self.assertRaises(ValueError): + class User(Document): + name = StringField(db_field='name\0') + def test_decimal_comparison(self): class Person(Document): money = DecimalField() @@ -546,7 +553,8 @@ class Person(Document): self.assertEqual(expected, actual) def test_boolean_validation(self): - """Ensure that invalid values cannot be assigned to boolean fields. + """Ensure that invalid values cannot be assigned to boolean + fields. """ class Person(Document): admin = BooleanField() @@ -586,8 +594,7 @@ class Person(Document): self.assertRaises(ValidationError, person.validate) def test_uuid_field_binary(self): - """Test UUID fields storing as Binary object - """ + """Test UUID fields storing as Binary object.""" class Person(Document): api_key = UUIDField(binary=True) @@ -611,7 +618,8 @@ class Person(Document): self.assertRaises(ValidationError, person.validate) def test_datetime_validation(self): - """Ensure that invalid values cannot be assigned to datetime fields. + """Ensure that invalid values cannot be assigned to datetime + fields. """ class LogEntry(Document): time = DateTimeField() @@ -675,8 +683,6 @@ class LogEntry(Document): log.reload() self.assertEqual(log.date.date(), datetime.date.today()) - LogEntry.drop_collection() - # Post UTC - microseconds are rounded (down) nearest millisecond and # dropped d1 = datetime.datetime(1970, 1, 1, 0, 0, 1, 999) @@ -708,8 +714,6 @@ class LogEntry(Document): self.assertNotEqual(log.date, d1) self.assertEqual(log.date, d2) - LogEntry.drop_collection() - def test_datetime_usage(self): """Tests for regular datetime fields""" class LogEntry(Document): @@ -731,48 +735,42 @@ class LogEntry(Document): log1 = LogEntry.objects.get(date=d1.isoformat('T')) self.assertEqual(log, log1) - LogEntry.drop_collection() - - # create 60 log entries - for i in range(1950, 2010): + # create additional 19 log entries for a total of 20 + for i in range(1971, 1990): d = datetime.datetime(i, 1, 1, 0, 0, 1) LogEntry(date=d).save() - self.assertEqual(LogEntry.objects.count(), 60) + self.assertEqual(LogEntry.objects.count(), 20) # Test ordering logs = LogEntry.objects.order_by("date") - count = logs.count() i = 0 - while i == count - 1: + while i < 19: self.assertTrue(logs[i].date <= logs[i + 1].date) i += 1 logs = LogEntry.objects.order_by("-date") - count = logs.count() i = 0 - while i == count - 1: + while i < 19: self.assertTrue(logs[i].date >= logs[i + 1].date) i += 1 # Test searching logs = LogEntry.objects.filter(date__gte=datetime.datetime(1980, 1, 1)) - self.assertEqual(logs.count(), 30) + self.assertEqual(logs.count(), 10) logs = LogEntry.objects.filter(date__lte=datetime.datetime(1980, 1, 1)) - self.assertEqual(logs.count(), 30) + self.assertEqual(logs.count(), 10) logs = LogEntry.objects.filter( - date__lte=datetime.datetime(2011, 1, 1), - date__gte=datetime.datetime(2000, 1, 1), + date__lte=datetime.datetime(1980, 1, 1), + date__gte=datetime.datetime(1975, 1, 1), ) - self.assertEqual(logs.count(), 10) - - LogEntry.drop_collection() + self.assertEqual(logs.count(), 5) def test_complexdatetime_storage(self): - """Tests for complex datetime fields - which can handle microseconds - without rounding. + """Tests for complex datetime fields - which can handle + microseconds without rounding. """ class LogEntry(Document): date = ComplexDateTimeField() @@ -829,18 +827,16 @@ class LogEntry(Document): stored = LogEntry(date_with_dots=datetime.datetime(2014, 1, 1)).to_mongo()['date_with_dots'] self.assertTrue(re.match('^\d{4}.\d{2}.\d{2}.\d{2}.\d{2}.\d{2}.\d{6}$', stored) is not None) - LogEntry.drop_collection() - def test_complexdatetime_usage(self): - """Tests for complex datetime fields - which can handle microseconds - without rounding. + """Tests for complex datetime fields - which can handle + microseconds without rounding. """ class LogEntry(Document): date = ComplexDateTimeField() LogEntry.drop_collection() - d1 = datetime.datetime(1970, 1, 1, 0, 0, 1, 999) + d1 = datetime.datetime(1950, 1, 1, 0, 0, 1, 999) log = LogEntry() log.date = d1 log.save() @@ -848,10 +844,8 @@ class LogEntry(Document): log1 = LogEntry.objects.get(date=d1) self.assertEqual(log, log1) - LogEntry.drop_collection() - - # create 60 log entries - for i in range(1950, 2010): + # create extra 59 log entries for a total of 60 + for i in range(1951, 2010): d = datetime.datetime(i, 1, 1, 0, 0, 1, 999) LogEntry(date=d).save() @@ -859,16 +853,14 @@ class LogEntry(Document): # Test ordering logs = LogEntry.objects.order_by("date") - count = logs.count() i = 0 - while i == count - 1: + while i < 59: self.assertTrue(logs[i].date <= logs[i + 1].date) i += 1 logs = LogEntry.objects.order_by("-date") - count = logs.count() i = 0 - while i == count - 1: + while i < 59: self.assertTrue(logs[i].date >= logs[i + 1].date) i += 1 @@ -889,7 +881,9 @@ class LogEntry(Document): # Test microsecond-level ordering/filtering for microsecond in (99, 999, 9999, 10000): - LogEntry(date=datetime.datetime(2015, 1, 1, 0, 0, 0, microsecond)).save() + LogEntry( + date=datetime.datetime(2015, 1, 1, 0, 0, 0, microsecond) + ).save() logs = list(LogEntry.objects.order_by('date')) for next_idx, log in enumerate(logs[:-1], start=1): @@ -901,14 +895,12 @@ class LogEntry(Document): next_log = logs[next_idx] self.assertTrue(log.date > next_log.date) - logs = LogEntry.objects.filter(date__lte=datetime.datetime(2015, 1, 1, 0, 0, 0, 10000)) + logs = LogEntry.objects.filter( + date__lte=datetime.datetime(2015, 1, 1, 0, 0, 0, 10000)) self.assertEqual(logs.count(), 4) - LogEntry.drop_collection() - def test_list_validation(self): - """Ensure that a list field only accepts lists with valid elements. - """ + """Ensure that a list field only accepts lists with valid elements.""" class User(Document): pass @@ -922,6 +914,9 @@ class BlogPost(Document): authors = ListField(ReferenceField(User)) generic = ListField(GenericReferenceField()) + User.drop_collection() + BlogPost.drop_collection() + post = BlogPost(content='Went for a walk today...') post.validate() @@ -967,9 +962,6 @@ class BlogPost(Document): post.generic = [user] post.validate() - User.drop_collection() - BlogPost.drop_collection() - def test_sorted_list_sorting(self): """Ensure that a sorted list field properly sorts values. """ @@ -983,6 +975,8 @@ class BlogPost(Document): ordering='order') tags = SortedListField(StringField()) + BlogPost.drop_collection() + post = BlogPost(content='Went for a walk today...') post.save() @@ -1007,8 +1001,6 @@ class BlogPost(Document): self.assertEqual(post.comments[0].content, comment1.content) self.assertEqual(post.comments[1].content, comment2.content) - BlogPost.drop_collection() - def test_reverse_list_sorting(self): """Ensure that a reverse sorted list field properly sorts values""" @@ -1021,6 +1013,8 @@ class CategoryList(Document): ordering='count', reverse=True) name = StringField() + CategoryList.drop_collection() + catlist = CategoryList(name="Top categories") cat1 = Category(name='posts', count=10) cat2 = Category(name='food', count=100) @@ -1033,11 +1027,8 @@ class CategoryList(Document): self.assertEqual(catlist.categories[1].name, cat3.name) self.assertEqual(catlist.categories[2].name, cat1.name) - CategoryList.drop_collection() - def test_list_field(self): - """Ensure that list types work as expected. - """ + """Ensure that list types work as expected.""" class BlogPost(Document): info = ListField() @@ -1086,8 +1077,6 @@ class BlogPost(Document): post.save() self.assertEqual(BlogPost.objects(info=['1', '2', '3', '4', '1', '2', '3', '4']).count(), 1) - BlogPost.drop_collection() - def test_list_field_manipulative_operators(self): """Ensure that ListField works with standard list operators that manipulate the list. """ @@ -1238,30 +1227,32 @@ def reset_post(): post.reload() self.assertEqual(post.info, ['5', '4', '3', '2', '1', '0']) - # 'sort': though this operator method does manipulate the list, it is tested in - # the 'test_list_field_lexicograpic_operators' function - BlogPost.drop_collection() + # 'sort': though this operator method does manipulate the list, it is + # tested in the 'test_list_field_lexicograpic_operators' function def test_list_field_invalid_operators(self): class BlogPost(Document): ref = StringField() info = ListField(StringField()) + post = BlogPost() post.ref = "1234" post.info = ['0', '1', '2', '3', '4', '5'] + # '__hash__' # aka 'hash(list)' - # # assert TypeError self.assertRaises(TypeError, lambda: hash(post.info)) def test_list_field_lexicographic_operators(self): - """Ensure that ListField works with standard list operators that do lexigraphic ordering. + """Ensure that ListField works with standard list operators that + do lexigraphic ordering. """ class BlogPost(Document): ref = StringField() text_info = ListField(StringField()) oid_info = ListField(ObjectIdField()) bool_info = ListField(BooleanField()) + BlogPost.drop_collection() blogSmall = BlogPost(ref="small") @@ -1286,28 +1277,35 @@ class BlogPost(Document): blogLargeB.bool_info = [False, True] blogLargeB.save() blogLargeB.reload() + # '__eq__' aka '==' self.assertEqual(blogLargeA.text_info, blogLargeB.text_info) self.assertEqual(blogLargeA.bool_info, blogLargeB.bool_info) + # '__ge__' aka '>=' self.assertGreaterEqual(blogLargeA.text_info, blogSmall.text_info) self.assertGreaterEqual(blogLargeA.text_info, blogLargeB.text_info) self.assertGreaterEqual(blogLargeA.bool_info, blogSmall.bool_info) self.assertGreaterEqual(blogLargeA.bool_info, blogLargeB.bool_info) + # '__gt__' aka '>' self.assertGreaterEqual(blogLargeA.text_info, blogSmall.text_info) self.assertGreaterEqual(blogLargeA.bool_info, blogSmall.bool_info) + # '__le__' aka '<=' self.assertLessEqual(blogSmall.text_info, blogLargeB.text_info) self.assertLessEqual(blogLargeA.text_info, blogLargeB.text_info) self.assertLessEqual(blogSmall.bool_info, blogLargeB.bool_info) self.assertLessEqual(blogLargeA.bool_info, blogLargeB.bool_info) + # '__lt__' aka '<' self.assertLess(blogSmall.text_info, blogLargeB.text_info) self.assertLess(blogSmall.bool_info, blogLargeB.bool_info) + # '__ne__' aka '!=' self.assertNotEqual(blogSmall.text_info, blogLargeB.text_info) self.assertNotEqual(blogSmall.bool_info, blogLargeB.bool_info) + # 'sort' blogLargeB.bool_info = [True, False, True, False] blogLargeB.text_info.sort() @@ -1327,11 +1325,8 @@ class BlogPost(Document): self.assertEqual(blogLargeB.oid_info, sorted_target_list) self.assertEqual(blogLargeB.bool_info, [False, False, True, True]) - BlogPost.drop_collection() - def test_list_assignment(self): - """Ensure that list field element assignment and slicing work - """ + """Ensure that list field element assignment and slicing work.""" class BlogPost(Document): info = ListField() @@ -1391,8 +1386,9 @@ class Bar(Document): self.assertEqual(repr(foo.bars), '[]') def test_list_field_strict(self): - """Ensure that list field handles validation if provided a strict field type.""" - + """Ensure that list field handles validation if provided + a strict field type. + """ class Simple(Document): mapping = ListField(field=IntField()) @@ -1407,30 +1403,26 @@ class Simple(Document): e.mapping = ["abc"] e.save() - Simple.drop_collection() - def test_list_field_rejects_strings(self): - """Strings aren't valid list field data types""" - + """Strings aren't valid list field data types.""" class Simple(Document): mapping = ListField() Simple.drop_collection() + e = Simple() e.mapping = 'hello world' - self.assertRaises(ValidationError, e.save) def test_complex_field_required(self): - """Ensure required cant be None / Empty""" - + """Ensure required cant be None / Empty.""" class Simple(Document): mapping = ListField(required=True) Simple.drop_collection() + e = Simple() e.mapping = [] - self.assertRaises(ValidationError, e.save) class Simple(Document): @@ -1439,18 +1431,17 @@ class Simple(Document): Simple.drop_collection() e = Simple() e.mapping = {} - self.assertRaises(ValidationError, e.save) def test_complex_field_same_value_not_changed(self): - """ - If a complex field is set to the same value, it should not be marked as - changed. + """If a complex field is set to the same value, it should not + be marked as changed. """ class Simple(Document): mapping = ListField() Simple.drop_collection() + e = Simple().save() e.mapping = [] self.assertEqual([], e._changed_fields) @@ -1459,12 +1450,12 @@ class Simple(Document): mapping = DictField() Simple.drop_collection() + e = Simple().save() e.mapping = {} self.assertEqual([], e._changed_fields) def test_slice_marks_field_as_changed(self): - class Simple(Document): widgets = ListField() @@ -1477,7 +1468,6 @@ class Simple(Document): self.assertEqual(simple.widgets, [4]) def test_del_slice_marks_field_as_changed(self): - class Simple(Document): widgets = ListField() @@ -1490,7 +1480,6 @@ class Simple(Document): self.assertEqual(simple.widgets, [4]) def test_list_field_with_negative_indices(self): - class Simple(Document): widgets = ListField() @@ -1504,7 +1493,6 @@ class Simple(Document): def test_list_field_complex(self): """Ensure that the list fields can handle the complex types.""" - class SettingBase(EmbeddedDocument): meta = {'allow_inheritance': True} @@ -1518,6 +1506,7 @@ class Simple(Document): mapping = ListField() Simple.drop_collection() + e = Simple() e.mapping.append(StringSetting(value='foo')) e.mapping.append(IntegerSetting(value=42)) @@ -1555,11 +1544,8 @@ class Simple(Document): self.assertEqual( Simple.objects.filter(mapping__2__list__1__value='Boo').count(), 1) - Simple.drop_collection() - def test_dict_field(self): - """Ensure that dict types work as expected. - """ + """Ensure that dict types work as expected.""" class BlogPost(Document): info = DictField() @@ -1621,11 +1607,8 @@ class BlogPost(Document): post.reload() self.assertEqual([], post.info['authors']) - BlogPost.drop_collection() - def test_dictfield_dump_document(self): - """Ensure a DictField can handle another document's dump - """ + """Ensure a DictField can handle another document's dump.""" class Doc(Document): field = DictField() @@ -1663,7 +1646,6 @@ class ToEmbedChild(ToEmbedParent): def test_dictfield_strict(self): """Ensure that dict field handles validation if provided a strict field type.""" - class Simple(Document): mapping = DictField(field=IntField()) @@ -1678,11 +1660,8 @@ class Simple(Document): e.mapping['somestring'] = "abc" e.save() - Simple.drop_collection() - def test_dictfield_complex(self): """Ensure that the dict field can handle the complex types.""" - class SettingBase(EmbeddedDocument): meta = {'allow_inheritance': True} @@ -1696,6 +1675,7 @@ class Simple(Document): mapping = DictField() Simple.drop_collection() + e = Simple() e.mapping['somestring'] = StringSetting(value='foo') e.mapping['someint'] = IntegerSetting(value=42) @@ -1732,11 +1712,8 @@ class Simple(Document): self.assertEqual( Simple.objects.filter(mapping__nested_dict__list__1__value='Boo').count(), 1) - Simple.drop_collection() - def test_atomic_update_dict_field(self): """Ensure that the entire DictField can be atomically updated.""" - class Simple(Document): mapping = DictField(field=ListField(IntField(required=True))) @@ -1754,11 +1731,8 @@ class Simple(Document): with self.assertRaises(ValueError): e.update(set__mapping={"somestrings": ["foo", "bar", ]}) - Simple.drop_collection() - def test_mapfield(self): """Ensure that the MapField handles the declared type.""" - class Simple(Document): mapping = MapField(IntField()) @@ -1776,11 +1750,8 @@ class Simple(Document): class NoDeclaredType(Document): mapping = MapField() - Simple.drop_collection() - def test_complex_mapfield(self): """Ensure that the MapField can handle complex declared types.""" - class SettingBase(EmbeddedDocument): meta = {"allow_inheritance": True} @@ -1809,7 +1780,6 @@ class Extensible(Document): e.save() def test_embedded_mapfield_db_field(self): - class Embedded(EmbeddedDocument): number = IntField(default=0, db_field='i') @@ -1846,11 +1816,10 @@ class Test(Document): test.my_map['1'].name = 'test updated' test.save() - Test.drop_collection() - def test_map_field_lookup(self): - """Ensure MapField lookups succeed on Fields without a lookup method""" - + """Ensure MapField lookups succeed on Fields without a lookup + method. + """ class Action(EmbeddedDocument): operation = StringField() object = StringField() @@ -1872,7 +1841,6 @@ class Log(Document): actions__friends__object='beer').count()) def test_map_field_unicode(self): - class Info(EmbeddedDocument): description = StringField() value_list = ListField(field=StringField()) @@ -1890,12 +1858,12 @@ class BlogPost(Document): tree.save() - self.assertEqual(BlogPost.objects.get(id=tree.id).info_dict[u"éééé"].description, u"VALUE: éééé") - - BlogPost.drop_collection() + self.assertEqual( + BlogPost.objects.get(id=tree.id).info_dict[u"éééé"].description, + u"VALUE: éééé" + ) def test_embedded_db_field(self): - class Embedded(EmbeddedDocument): number = IntField(default=0, db_field='i') @@ -1949,8 +1917,8 @@ class Person(Document): person.validate() def test_embedded_document_inheritance(self): - """Ensure that subclasses of embedded documents may be provided to - EmbeddedDocumentFields of the superclass' type. + """Ensure that subclasses of embedded documents may be provided + to EmbeddedDocumentFields of the superclass' type. """ class User(EmbeddedDocument): name = StringField() @@ -1976,7 +1944,6 @@ def test_embedded_document_inheritance_with_list(self): """Ensure that nested list of subclassed embedded documents is handled correctly. """ - class Group(EmbeddedDocument): name = StringField() content = ListField(StringField()) @@ -1998,9 +1965,9 @@ class User(Basedoc): self.assertEqual(content, User.objects.first().groups[0].content) def test_reference_miss(self): - """Ensure an exception is raised when dereferencing unknown document + """Ensure an exception is raised when dereferencing an unknown + document. """ - class Foo(Document): pass @@ -2029,8 +1996,8 @@ class Bar(Document): self.assertEqual(bar.generic_ref, {'_ref': expected, '_cls': 'Foo'}) def test_reference_validation(self): - """Ensure that invalid docment objects cannot be assigned to reference - fields. + """Ensure that invalid document objects cannot be assigned to + reference fields. """ class User(Document): name = StringField() @@ -2042,6 +2009,8 @@ class BlogPost(Document): User.drop_collection() BlogPost.drop_collection() + # Make sure ReferenceField only accepts a document class or a string + # with a document class name. self.assertRaises(ValidationError, ReferenceField, EmbeddedDocument) user = User(name='Test User') @@ -2056,19 +2025,18 @@ class BlogPost(Document): post1.author = post2 self.assertRaises(ValidationError, post1.validate) + # Make sure referencing a saved document of the right type works user.save() post1.author = user post1.save() + # Make sure referencing a saved document of the *wrong* type fails post2.save() post1.author = post2 self.assertRaises(ValidationError, post1.validate) - User.drop_collection() - BlogPost.drop_collection() - def test_dbref_reference_fields(self): - + """Make sure storing references as bson.dbref.DBRef works.""" class Person(Document): name = StringField() parent = ReferenceField('self', dbref=True) @@ -2078,2426 +2046,2378 @@ class Person(Document): p1 = Person(name="John").save() Person(name="Ross", parent=p1).save() - col = Person._get_collection() - data = col.find_one({'name': 'Ross'}) - self.assertEqual(data['parent'], DBRef('person', p1.pk)) + self.assertEqual( + Person._get_collection().find_one({'name': 'Ross'})['parent'], + DBRef('person', p1.pk) + ) p = Person.objects.get(name="Ross") self.assertEqual(p.parent, p1) def test_dbref_to_mongo(self): + """Make sure that calling to_mongo on a ReferenceField which + has dbref=False, but actually actually contains a DBRef returns + an ID of that DBRef. + """ class Person(Document): name = StringField() parent = ReferenceField('self', dbref=False) - p1 = Person._from_son({'name': "Yakxxx", - 'parent': "50a234ea469ac1eda42d347d"}) - mongoed = p1.to_mongo() - self.assertTrue(isinstance(mongoed['parent'], ObjectId)) + p = Person( + name='Steve', + parent=DBRef('person', 'abcdefghijklmnop') + ) + self.assertEqual(p.to_mongo(), SON([ + ('name', u'Steve'), + ('parent', 'abcdefghijklmnop') + ])) - def test_cached_reference_field_get_and_save(self): - """ - Tests #1047: CachedReferenceField creates DBRefs on to_python, but can't save them on to_mongo - """ - class Animal(Document): + def test_objectid_reference_fields(self): + + class Person(Document): name = StringField() - tag = StringField() + parent = ReferenceField('self', dbref=False) - class Ocorrence(Document): - person = StringField() - animal = CachedReferenceField(Animal) + Person.drop_collection() - Animal.drop_collection() - Ocorrence.drop_collection() + p1 = Person(name="John").save() + Person(name="Ross", parent=p1).save() - Ocorrence(person="testte", - animal=Animal(name="Leopard", tag="heavy").save()).save() - p = Ocorrence.objects.get() - p.person = 'new_testte' - p.save() + col = Person._get_collection() + data = col.find_one({'name': 'Ross'}) + self.assertEqual(data['parent'], p1.pk) - def test_cached_reference_fields(self): - class Animal(Document): - name = StringField() - tag = StringField() + p = Person.objects.get(name="Ross") + self.assertEqual(p.parent, p1) - class Ocorrence(Document): - person = StringField() - animal = CachedReferenceField( - Animal, fields=['tag']) + def test_list_item_dereference(self): + """Ensure that DBRef items in ListFields are dereferenced. + """ + class User(Document): + name = StringField() - Animal.drop_collection() - Ocorrence.drop_collection() + class Group(Document): + members = ListField(ReferenceField(User)) - a = Animal(name="Leopard", tag="heavy") - a.save() + User.drop_collection() + Group.drop_collection() - self.assertEqual(Animal._cached_reference_fields, [Ocorrence.animal]) - o = Ocorrence(person="teste", animal=a) - o.save() + user1 = User(name='user1') + user1.save() + user2 = User(name='user2') + user2.save() - p = Ocorrence(person="Wilson") - p.save() + group = Group(members=[user1, user2]) + group.save() - self.assertEqual(Ocorrence.objects(animal=None).count(), 1) + group_obj = Group.objects.first() - self.assertEqual( - a.to_mongo(fields=['tag']), {'tag': 'heavy', "_id": a.pk}) + self.assertEqual(group_obj.members[0].name, user1.name) + self.assertEqual(group_obj.members[1].name, user2.name) - self.assertEqual(o.to_mongo()['animal']['tag'], 'heavy') + def test_recursive_reference(self): + """Ensure that ReferenceFields can reference their own documents. + """ + class Employee(Document): + name = StringField() + boss = ReferenceField('self') + friends = ListField(ReferenceField('self')) - # counts - Ocorrence(person="teste 2").save() - Ocorrence(person="teste 3").save() + Employee.drop_collection() - count = Ocorrence.objects(animal__tag='heavy').count() - self.assertEqual(count, 1) + bill = Employee(name='Bill Lumbergh') + bill.save() - ocorrence = Ocorrence.objects(animal__tag='heavy').first() - self.assertEqual(ocorrence.person, "teste") - self.assertTrue(isinstance(ocorrence.animal, Animal)) + michael = Employee(name='Michael Bolton') + michael.save() - def test_cached_reference_field_decimal(self): - class PersonAuto(Document): - name = StringField() - salary = DecimalField() + samir = Employee(name='Samir Nagheenanajar') + samir.save() - class SocialTest(Document): - group = StringField() - person = CachedReferenceField( - PersonAuto, - fields=('salary',)) + friends = [michael, samir] + peter = Employee(name='Peter Gibbons', boss=bill, friends=friends) + peter.save() - PersonAuto.drop_collection() - SocialTest.drop_collection() + peter = Employee.objects.with_id(peter.id) + self.assertEqual(peter.boss, bill) + self.assertEqual(peter.friends, friends) - p = PersonAuto(name="Alberto", salary=Decimal('7000.00')) - p.save() + def test_recursive_embedding(self): + """Ensure that EmbeddedDocumentFields can contain their own documents. + """ + class TreeNode(EmbeddedDocument): + name = StringField() + children = ListField(EmbeddedDocumentField('self')) - s = SocialTest(group="dev", person=p) - s.save() + class Tree(Document): + name = StringField() + children = ListField(EmbeddedDocumentField('TreeNode')) - self.assertEqual( - SocialTest.objects._collection.find_one({'person.salary': 7000.00}), { - '_id': s.pk, - 'group': s.group, - 'person': { - '_id': p.pk, - 'salary': 7000.00 - } - }) + Tree.drop_collection() - def test_cached_reference_field_reference(self): - class Group(Document): - name = StringField() + tree = Tree(name="Tree") + first_child = TreeNode(name="Child 1") + tree.children.append(first_child) - class Person(Document): - name = StringField() - group = ReferenceField(Group) + second_child = TreeNode(name="Child 2") + first_child.children.append(second_child) + tree.save() - class SocialData(Document): - obs = StringField() - tags = ListField( - StringField()) - person = CachedReferenceField( - Person, - fields=('group',)) + tree = Tree.objects.first() + self.assertEqual(len(tree.children), 1) - Group.drop_collection() - Person.drop_collection() - SocialData.drop_collection() + self.assertEqual(len(tree.children[0].children), 1) - g1 = Group(name='dev') - g1.save() + third_child = TreeNode(name="Child 3") + tree.children[0].children.append(third_child) + tree.save() - g2 = Group(name="designers") - g2.save() + self.assertEqual(len(tree.children), 1) + self.assertEqual(tree.children[0].name, first_child.name) + self.assertEqual(tree.children[0].children[0].name, second_child.name) + self.assertEqual(tree.children[0].children[1].name, third_child.name) - p1 = Person(name="Alberto", group=g1) - p1.save() + # Test updating + tree.children[0].name = 'I am Child 1' + tree.children[0].children[0].name = 'I am Child 2' + tree.children[0].children[1].name = 'I am Child 3' + tree.save() - p2 = Person(name="Andre", group=g1) - p2.save() + self.assertEqual(tree.children[0].name, 'I am Child 1') + self.assertEqual(tree.children[0].children[0].name, 'I am Child 2') + self.assertEqual(tree.children[0].children[1].name, 'I am Child 3') - p3 = Person(name="Afro design", group=g2) - p3.save() + # Test removal + self.assertEqual(len(tree.children[0].children), 2) + del(tree.children[0].children[1]) - s1 = SocialData(obs="testing 123", person=p1, tags=['tag1', 'tag2']) - s1.save() + tree.save() + self.assertEqual(len(tree.children[0].children), 1) - s2 = SocialData(obs="testing 321", person=p3, tags=['tag3', 'tag4']) - s2.save() + tree.children[0].children.pop(0) + tree.save() + self.assertEqual(len(tree.children[0].children), 0) + self.assertEqual(tree.children[0].children, []) - self.assertEqual(SocialData.objects._collection.find_one( - {'tags': 'tag2'}), { - '_id': s1.pk, - 'obs': 'testing 123', - 'tags': ['tag1', 'tag2'], - 'person': { - '_id': p1.pk, - 'group': g1.pk - } - }) + tree.children[0].children.insert(0, third_child) + tree.children[0].children.insert(0, second_child) + tree.save() + self.assertEqual(len(tree.children[0].children), 2) + self.assertEqual(tree.children[0].children[0].name, second_child.name) + self.assertEqual(tree.children[0].children[1].name, third_child.name) - self.assertEqual(SocialData.objects(person__group=g2).count(), 1) - self.assertEqual(SocialData.objects(person__group=g2).first(), s2) + def test_undefined_reference(self): + """Ensure that ReferenceFields may reference undefined Documents. + """ + class Product(Document): + name = StringField() + company = ReferenceField('Company') - def test_cached_reference_field_update_all(self): - class Person(Document): - TYPES = ( - ('pf', "PF"), - ('pj', "PJ") - ) + class Company(Document): name = StringField() - tp = StringField( - choices=TYPES - ) - father = CachedReferenceField('self', fields=('tp',)) + Product.drop_collection() + Company.drop_collection() - Person.drop_collection() + ten_gen = Company(name='10gen') + ten_gen.save() + mongodb = Product(name='MongoDB', company=ten_gen) + mongodb.save() - a1 = Person(name="Wilson Father", tp="pj") - a1.save() + me = Product(name='MongoEngine') + me.save() - a2 = Person(name='Wilson Junior', tp='pf', father=a1) - a2.save() + obj = Product.objects(company=ten_gen).first() + self.assertEqual(obj, mongodb) + self.assertEqual(obj.company, ten_gen) - self.assertEqual(dict(a2.to_mongo()), { - "_id": a2.pk, - "name": u"Wilson Junior", - "tp": u"pf", - "father": { - "_id": a1.pk, - "tp": u"pj" - } - }) + obj = Product.objects(company=None).first() + self.assertEqual(obj, me) - self.assertEqual(Person.objects(father=a1)._query, { - 'father._id': a1.pk - }) - self.assertEqual(Person.objects(father=a1).count(), 1) + obj = Product.objects.get(company=None) + self.assertEqual(obj, me) - Person.objects.update(set__tp="pf") - Person.father.sync_all() + def test_reference_query_conversion(self): + """Ensure that ReferenceFields can be queried using objects and values + of the type of the primary key of the referenced object. + """ + class Member(Document): + user_num = IntField(primary_key=True) - a2.reload() - self.assertEqual(dict(a2.to_mongo()), { - "_id": a2.pk, - "name": u"Wilson Junior", - "tp": u"pf", - "father": { - "_id": a1.pk, - "tp": u"pf" - } - }) + class BlogPost(Document): + title = StringField() + author = ReferenceField(Member, dbref=False) - def test_cached_reference_fields_on_embedded_documents(self): - with self.assertRaises(InvalidDocumentError): - class Test(Document): - name = StringField() + Member.drop_collection() + BlogPost.drop_collection() - type('WrongEmbeddedDocument', ( - EmbeddedDocument,), { - 'test': CachedReferenceField(Test) - }) + m1 = Member(user_num=1) + m1.save() + m2 = Member(user_num=2) + m2.save() - def test_cached_reference_auto_sync(self): - class Person(Document): - TYPES = ( - ('pf', "PF"), - ('pj', "PJ") - ) - name = StringField() - tp = StringField( - choices=TYPES - ) + post1 = BlogPost(title='post 1', author=m1) + post1.save() - father = CachedReferenceField('self', fields=('tp',)) + post2 = BlogPost(title='post 2', author=m2) + post2.save() - Person.drop_collection() + post = BlogPost.objects(author=m1).first() + self.assertEqual(post.id, post1.id) - a1 = Person(name="Wilson Father", tp="pj") - a1.save() + post = BlogPost.objects(author=m2).first() + self.assertEqual(post.id, post2.id) - a2 = Person(name='Wilson Junior', tp='pf', father=a1) - a2.save() + def test_reference_query_conversion_dbref(self): + """Ensure that ReferenceFields can be queried using objects and values + of the type of the primary key of the referenced object. + """ + class Member(Document): + user_num = IntField(primary_key=True) - a1.tp = 'pf' - a1.save() + class BlogPost(Document): + title = StringField() + author = ReferenceField(Member, dbref=True) - a2.reload() - self.assertEqual(dict(a2.to_mongo()), { - '_id': a2.pk, - 'name': 'Wilson Junior', - 'tp': 'pf', - 'father': { - '_id': a1.pk, - 'tp': 'pf' - } - }) + Member.drop_collection() + BlogPost.drop_collection() - def test_cached_reference_auto_sync_disabled(self): - class Persone(Document): - TYPES = ( - ('pf', "PF"), - ('pj', "PJ") - ) - name = StringField() - tp = StringField( - choices=TYPES - ) + m1 = Member(user_num=1) + m1.save() + m2 = Member(user_num=2) + m2.save() - father = CachedReferenceField( - 'self', fields=('tp',), auto_sync=False) + post1 = BlogPost(title='post 1', author=m1) + post1.save() - Persone.drop_collection() + post2 = BlogPost(title='post 2', author=m2) + post2.save() - a1 = Persone(name="Wilson Father", tp="pj") - a1.save() + post = BlogPost.objects(author=m1).first() + self.assertEqual(post.id, post1.id) - a2 = Persone(name='Wilson Junior', tp='pf', father=a1) - a2.save() + post = BlogPost.objects(author=m2).first() + self.assertEqual(post.id, post2.id) - a1.tp = 'pf' - a1.save() + def test_drop_abstract_document(self): + """Ensure that an abstract document cannot be dropped given it + has no underlying collection. + """ + class AbstractDoc(Document): + name = StringField() + meta = {"abstract": True} - self.assertEqual(Persone.objects._collection.find_one({'_id': a2.pk}), { - '_id': a2.pk, - 'name': 'Wilson Junior', - 'tp': 'pf', - 'father': { - '_id': a1.pk, - 'tp': 'pj' - } - }) + self.assertRaises(OperationError, AbstractDoc.drop_collection) - def test_cached_reference_embedded_fields(self): - class Owner(EmbeddedDocument): - TPS = ( - ('n', "Normal"), - ('u', "Urgent") - ) + def test_reference_class_with_abstract_parent(self): + """Ensure that a class with an abstract parent can be referenced. + """ + class Sibling(Document): name = StringField() - tp = StringField( - verbose_name="Type", - db_field="t", - choices=TPS) + meta = {"abstract": True} - class Animal(Document): - name = StringField() - tag = StringField() + class Sister(Sibling): + pass - owner = EmbeddedDocumentField(Owner) + class Brother(Sibling): + sibling = ReferenceField(Sibling) - class Ocorrence(Document): - person = StringField() - animal = CachedReferenceField( - Animal, fields=['tag', 'owner.tp']) + Sister.drop_collection() + Brother.drop_collection() - Animal.drop_collection() - Ocorrence.drop_collection() + sister = Sister(name="Alice") + sister.save() + brother = Brother(name="Bob", sibling=sister) + brother.save() - a = Animal(name="Leopard", tag="heavy", - owner=Owner(tp='u', name="Wilson Júnior") - ) - a.save() + self.assertEquals(Brother.objects[0].sibling.name, sister.name) - o = Ocorrence(person="teste", animal=a) - o.save() - self.assertEqual(dict(a.to_mongo(fields=['tag', 'owner.tp'])), { - '_id': a.pk, - 'tag': 'heavy', - 'owner': { - 't': 'u' - } - }) - self.assertEqual(o.to_mongo()['animal']['tag'], 'heavy') - self.assertEqual(o.to_mongo()['animal']['owner']['t'], 'u') + def test_reference_abstract_class(self): + """Ensure that an abstract class instance cannot be used in the + reference of that abstract class. + """ + class Sibling(Document): + name = StringField() + meta = {"abstract": True} - # counts - Ocorrence(person="teste 2").save() - Ocorrence(person="teste 3").save() + class Sister(Sibling): + pass - count = Ocorrence.objects( - animal__tag='heavy', animal__owner__tp='u').count() - self.assertEqual(count, 1) + class Brother(Sibling): + sibling = ReferenceField(Sibling) - ocorrence = Ocorrence.objects( - animal__tag='heavy', - animal__owner__tp='u').first() - self.assertEqual(ocorrence.person, "teste") - self.assertTrue(isinstance(ocorrence.animal, Animal)) + Sister.drop_collection() + Brother.drop_collection() - def test_cached_reference_embedded_list_fields(self): - class Owner(EmbeddedDocument): - name = StringField() - tags = ListField(StringField()) + sister = Sibling(name="Alice") + brother = Brother(name="Bob", sibling=sister) + self.assertRaises(ValidationError, brother.save) - class Animal(Document): + def test_abstract_reference_base_type(self): + """Ensure that an an abstract reference fails validation when given a + Document that does not inherit from the abstract type. + """ + class Sibling(Document): name = StringField() - tag = StringField() + meta = {"abstract": True} - owner = EmbeddedDocumentField(Owner) + class Brother(Sibling): + sibling = ReferenceField(Sibling) - class Ocorrence(Document): - person = StringField() - animal = CachedReferenceField( - Animal, fields=['tag', 'owner.tags']) + class Mother(Document): + name = StringField() - Animal.drop_collection() - Ocorrence.drop_collection() + Brother.drop_collection() + Mother.drop_collection() - a = Animal(name="Leopard", tag="heavy", - owner=Owner(tags=['cool', 'funny'], - name="Wilson Júnior") - ) - a.save() + mother = Mother(name="Carol") + mother.save() + brother = Brother(name="Bob", sibling=mother) + self.assertRaises(ValidationError, brother.save) - o = Ocorrence(person="teste 2", animal=a) - o.save() - self.assertEqual(dict(a.to_mongo(fields=['tag', 'owner.tags'])), { - '_id': a.pk, - 'tag': 'heavy', - 'owner': { - 'tags': ['cool', 'funny'] - } - }) + def test_generic_reference(self): + """Ensure that a GenericReferenceField properly dereferences items. + """ + class Link(Document): + title = StringField() + meta = {'allow_inheritance': False} - self.assertEqual(o.to_mongo()['animal']['tag'], 'heavy') - self.assertEqual(o.to_mongo()['animal']['owner']['tags'], - ['cool', 'funny']) + class Post(Document): + title = StringField() - # counts - Ocorrence(person="teste 2").save() - Ocorrence(person="teste 3").save() + class Bookmark(Document): + bookmark_object = GenericReferenceField() - query = Ocorrence.objects( - animal__tag='heavy', animal__owner__tags='cool')._query - self.assertEqual( - query, {'animal.owner.tags': 'cool', 'animal.tag': 'heavy'}) + Link.drop_collection() + Post.drop_collection() + Bookmark.drop_collection() - ocorrence = Ocorrence.objects( - animal__tag='heavy', - animal__owner__tags='cool').first() - self.assertEqual(ocorrence.person, "teste 2") - self.assertTrue(isinstance(ocorrence.animal, Animal)) + link_1 = Link(title="Pitchfork") + link_1.save() - def test_objectid_reference_fields(self): + post_1 = Post(title="Behind the Scenes of the Pavement Reunion") + post_1.save() - class Person(Document): - name = StringField() - parent = ReferenceField('self', dbref=False) + bm = Bookmark(bookmark_object=post_1) + bm.save() - Person.drop_collection() + bm = Bookmark.objects(bookmark_object=post_1).first() - p1 = Person(name="John").save() - Person(name="Ross", parent=p1).save() + self.assertEqual(bm.bookmark_object, post_1) + self.assertTrue(isinstance(bm.bookmark_object, Post)) - col = Person._get_collection() - data = col.find_one({'name': 'Ross'}) - self.assertEqual(data['parent'], p1.pk) + bm.bookmark_object = link_1 + bm.save() - p = Person.objects.get(name="Ross") - self.assertEqual(p.parent, p1) + bm = Bookmark.objects(bookmark_object=link_1).first() - def test_list_item_dereference(self): - """Ensure that DBRef items in ListFields are dereferenced. + self.assertEqual(bm.bookmark_object, link_1) + self.assertTrue(isinstance(bm.bookmark_object, Link)) + + def test_generic_reference_list(self): + """Ensure that a ListField properly dereferences generic references. """ - class User(Document): - name = StringField() + class Link(Document): + title = StringField() - class Group(Document): - members = ListField(ReferenceField(User)) + class Post(Document): + title = StringField() + class User(Document): + bookmarks = ListField(GenericReferenceField()) + + Link.drop_collection() + Post.drop_collection() User.drop_collection() - Group.drop_collection() - user1 = User(name='user1') - user1.save() - user2 = User(name='user2') - user2.save() + link_1 = Link(title="Pitchfork") + link_1.save() - group = Group(members=[user1, user2]) - group.save() + post_1 = Post(title="Behind the Scenes of the Pavement Reunion") + post_1.save() - group_obj = Group.objects.first() + user = User(bookmarks=[post_1, link_1]) + user.save() - self.assertEqual(group_obj.members[0].name, user1.name) - self.assertEqual(group_obj.members[1].name, user2.name) + user = User.objects(bookmarks__all=[post_1, link_1]).first() - User.drop_collection() - Group.drop_collection() + self.assertEqual(user.bookmarks[0], post_1) + self.assertEqual(user.bookmarks[1], link_1) - def test_recursive_reference(self): - """Ensure that ReferenceFields can reference their own documents. + def test_generic_reference_document_not_registered(self): + """Ensure dereferencing out of the document registry throws a + `NotRegistered` error. """ - class Employee(Document): - name = StringField() - boss = ReferenceField('self') - friends = ListField(ReferenceField('self')) - - Employee.drop_collection() - bill = Employee(name='Bill Lumbergh') - bill.save() + class Link(Document): + title = StringField() - michael = Employee(name='Michael Bolton') - michael.save() + class User(Document): + bookmarks = ListField(GenericReferenceField()) - samir = Employee(name='Samir Nagheenanajar') - samir.save() + Link.drop_collection() + User.drop_collection() - friends = [michael, samir] - peter = Employee(name='Peter Gibbons', boss=bill, friends=friends) - peter.save() + link_1 = Link(title="Pitchfork") + link_1.save() - peter = Employee.objects.with_id(peter.id) - self.assertEqual(peter.boss, bill) - self.assertEqual(peter.friends, friends) + user = User(bookmarks=[link_1]) + user.save() - def test_recursive_embedding(self): - """Ensure that EmbeddedDocumentFields can contain their own documents. - """ - class TreeNode(EmbeddedDocument): - name = StringField() - children = ListField(EmbeddedDocumentField('self')) + # Mimic User and Link definitions being in a different file + # and the Link model not being imported in the User file. + del(_document_registry["Link"]) - class Tree(Document): - name = StringField() - children = ListField(EmbeddedDocumentField('TreeNode')) + user = User.objects.first() + try: + user.bookmarks + raise AssertionError("Link was removed from the registry") + except NotRegistered: + pass - Tree.drop_collection() - tree = Tree(name="Tree") + def test_generic_reference_is_none(self): - first_child = TreeNode(name="Child 1") - tree.children.append(first_child) + class Person(Document): + name = StringField() + city = GenericReferenceField() - second_child = TreeNode(name="Child 2") - first_child.children.append(second_child) - tree.save() + Person.drop_collection() - tree = Tree.objects.first() - self.assertEqual(len(tree.children), 1) + Person(name="Wilson Jr").save() + self.assertEqual(repr(Person.objects(city=None)), + "[]") - self.assertEqual(len(tree.children[0].children), 1) + def test_generic_reference_choices(self): + """Ensure that a GenericReferenceField can handle choices.""" + class Link(Document): + title = StringField() - third_child = TreeNode(name="Child 3") - tree.children[0].children.append(third_child) - tree.save() + class Post(Document): + title = StringField() - self.assertEqual(len(tree.children), 1) - self.assertEqual(tree.children[0].name, first_child.name) - self.assertEqual(tree.children[0].children[0].name, second_child.name) - self.assertEqual(tree.children[0].children[1].name, third_child.name) + class Bookmark(Document): + bookmark_object = GenericReferenceField(choices=(Post,)) - # Test updating - tree.children[0].name = 'I am Child 1' - tree.children[0].children[0].name = 'I am Child 2' - tree.children[0].children[1].name = 'I am Child 3' - tree.save() + Link.drop_collection() + Post.drop_collection() + Bookmark.drop_collection() - self.assertEqual(tree.children[0].name, 'I am Child 1') - self.assertEqual(tree.children[0].children[0].name, 'I am Child 2') - self.assertEqual(tree.children[0].children[1].name, 'I am Child 3') + link_1 = Link(title="Pitchfork") + link_1.save() - # Test removal - self.assertEqual(len(tree.children[0].children), 2) - del(tree.children[0].children[1]) + post_1 = Post(title="Behind the Scenes of the Pavement Reunion") + post_1.save() - tree.save() - self.assertEqual(len(tree.children[0].children), 1) + bm = Bookmark(bookmark_object=link_1) + self.assertRaises(ValidationError, bm.validate) - tree.children[0].children.pop(0) - tree.save() - self.assertEqual(len(tree.children[0].children), 0) - self.assertEqual(tree.children[0].children, []) + bm = Bookmark(bookmark_object=post_1) + bm.save() - tree.children[0].children.insert(0, third_child) - tree.children[0].children.insert(0, second_child) - tree.save() - self.assertEqual(len(tree.children[0].children), 2) - self.assertEqual(tree.children[0].children[0].name, second_child.name) - self.assertEqual(tree.children[0].children[1].name, third_child.name) + bm = Bookmark.objects.first() + self.assertEqual(bm.bookmark_object, post_1) - def test_undefined_reference(self): - """Ensure that ReferenceFields may reference undefined Documents. + def test_generic_reference_string_choices(self): + """Ensure that a GenericReferenceField can handle choices as strings """ - class Product(Document): - name = StringField() - company = ReferenceField('Company') + class Link(Document): + title = StringField() - class Company(Document): - name = StringField() + class Post(Document): + title = StringField() - Product.drop_collection() - Company.drop_collection() + class Bookmark(Document): + bookmark_object = GenericReferenceField(choices=('Post', Link)) - ten_gen = Company(name='10gen') - ten_gen.save() - mongodb = Product(name='MongoDB', company=ten_gen) - mongodb.save() + Link.drop_collection() + Post.drop_collection() + Bookmark.drop_collection() - me = Product(name='MongoEngine') - me.save() + link_1 = Link(title="Pitchfork") + link_1.save() - obj = Product.objects(company=ten_gen).first() - self.assertEqual(obj, mongodb) - self.assertEqual(obj.company, ten_gen) + post_1 = Post(title="Behind the Scenes of the Pavement Reunion") + post_1.save() - obj = Product.objects(company=None).first() - self.assertEqual(obj, me) + bm = Bookmark(bookmark_object=link_1) + bm.save() - obj = Product.objects.get(company=None) - self.assertEqual(obj, me) + bm = Bookmark(bookmark_object=post_1) + bm.save() - def test_reference_query_conversion(self): - """Ensure that ReferenceFields can be queried using objects and values - of the type of the primary key of the referenced object. - """ - class Member(Document): - user_num = IntField(primary_key=True) + bm = Bookmark(bookmark_object=bm) + self.assertRaises(ValidationError, bm.validate) - class BlogPost(Document): + def test_generic_reference_choices_no_dereference(self): + """Ensure that a GenericReferenceField can handle choices on + non-derefenreced (i.e. DBRef) elements + """ + class Post(Document): title = StringField() - author = ReferenceField(Member, dbref=False) - - Member.drop_collection() - BlogPost.drop_collection() - m1 = Member(user_num=1) - m1.save() - m2 = Member(user_num=2) - m2.save() - - post1 = BlogPost(title='post 1', author=m1) - post1.save() + class Bookmark(Document): + bookmark_object = GenericReferenceField(choices=(Post, )) + other_field = StringField() - post2 = BlogPost(title='post 2', author=m2) - post2.save() + Post.drop_collection() + Bookmark.drop_collection() - post = BlogPost.objects(author=m1).first() - self.assertEqual(post.id, post1.id) + post_1 = Post(title="Behind the Scenes of the Pavement Reunion") + post_1.save() - post = BlogPost.objects(author=m2).first() - self.assertEqual(post.id, post2.id) + bm = Bookmark(bookmark_object=post_1) + bm.save() - Member.drop_collection() - BlogPost.drop_collection() + bm = Bookmark.objects.get(id=bm.id) + # bookmark_object is now a DBRef + bm.other_field = 'dummy_change' + bm.save() - def test_reference_query_conversion_dbref(self): - """Ensure that ReferenceFields can be queried using objects and values - of the type of the primary key of the referenced object. + def test_generic_reference_list_choices(self): + """Ensure that a ListField properly dereferences generic references and + respects choices. """ - class Member(Document): - user_num = IntField(primary_key=True) + class Link(Document): + title = StringField() - class BlogPost(Document): + class Post(Document): title = StringField() - author = ReferenceField(Member, dbref=True) - Member.drop_collection() - BlogPost.drop_collection() + class User(Document): + bookmarks = ListField(GenericReferenceField(choices=(Post,))) - m1 = Member(user_num=1) - m1.save() - m2 = Member(user_num=2) - m2.save() + Link.drop_collection() + Post.drop_collection() + User.drop_collection() - post1 = BlogPost(title='post 1', author=m1) - post1.save() + link_1 = Link(title="Pitchfork") + link_1.save() - post2 = BlogPost(title='post 2', author=m2) - post2.save() + post_1 = Post(title="Behind the Scenes of the Pavement Reunion") + post_1.save() - post = BlogPost.objects(author=m1).first() - self.assertEqual(post.id, post1.id) + user = User(bookmarks=[link_1]) + self.assertRaises(ValidationError, user.validate) - post = BlogPost.objects(author=m2).first() - self.assertEqual(post.id, post2.id) + user = User(bookmarks=[post_1]) + user.save() - Member.drop_collection() - BlogPost.drop_collection() + user = User.objects.first() + self.assertEqual(user.bookmarks, [post_1]) - def test_drop_abstract_document(self): - """Ensure that an abstract document cannot be dropped given it - has no underlying collection. + def test_generic_reference_list_item_modification(self): + """Ensure that modifications of related documents (through generic reference) don't influence on querying """ - class AbstractDoc(Document): - name = StringField() - meta = {"abstract": True} - - self.assertRaises(OperationError, AbstractDoc.drop_collection) + class Post(Document): + title = StringField() - def test_reference_class_with_abstract_parent(self): - """Ensure that a class with an abstract parent can be referenced. - """ - class Sibling(Document): - name = StringField() - meta = {"abstract": True} + class User(Document): + username = StringField() + bookmarks = ListField(GenericReferenceField()) - class Sister(Sibling): - pass + Post.drop_collection() + User.drop_collection() - class Brother(Sibling): - sibling = ReferenceField(Sibling) + post_1 = Post(title="Behind the Scenes of the Pavement Reunion") + post_1.save() - Sister.drop_collection() - Brother.drop_collection() + user = User(bookmarks=[post_1]) + user.save() - sister = Sister(name="Alice") - sister.save() - brother = Brother(name="Bob", sibling=sister) - brother.save() + post_1.title = "Title was modified" + user.username = "New username" + user.save() - self.assertEquals(Brother.objects[0].sibling.name, sister.name) + user = User.objects(bookmarks__all=[post_1]).first() - Sister.drop_collection() - Brother.drop_collection() + self.assertNotEqual(user, None) + self.assertEqual(user.bookmarks[0], post_1) - def test_reference_abstract_class(self): - """Ensure that an abstract class instance cannot be used in the - reference of that abstract class. + def test_generic_reference_filter_by_dbref(self): + """Ensure we can search for a specific generic reference by + providing its ObjectId. """ - class Sibling(Document): - name = StringField() - meta = {"abstract": True} - - class Sister(Sibling): - pass - - class Brother(Sibling): - sibling = ReferenceField(Sibling) + class Doc(Document): + ref = GenericReferenceField() - Sister.drop_collection() - Brother.drop_collection() + Doc.drop_collection() - sister = Sibling(name="Alice") - brother = Brother(name="Bob", sibling=sister) - self.assertRaises(ValidationError, brother.save) + doc1 = Doc.objects.create() + doc2 = Doc.objects.create(ref=doc1) - Sister.drop_collection() - Brother.drop_collection() + doc = Doc.objects.get(ref=DBRef('doc', doc1.pk)) + self.assertEqual(doc, doc2) - def test_abstract_reference_base_type(self): - """Ensure that an an abstract reference fails validation when given a - Document that does not inherit from the abstract type. + def test_generic_reference_filter_by_objectid(self): + """Ensure we can search for a specific generic reference by + providing its DBRef. """ - class Sibling(Document): - name = StringField() - meta = {"abstract": True} - - class Brother(Sibling): - sibling = ReferenceField(Sibling) + class Doc(Document): + ref = GenericReferenceField() - class Mother(Document): - name = StringField() + Doc.drop_collection() - Brother.drop_collection() - Mother.drop_collection() + doc1 = Doc.objects.create() + doc2 = Doc.objects.create(ref=doc1) - mother = Mother(name="Carol") - mother.save() - brother = Brother(name="Bob", sibling=mother) - self.assertRaises(ValidationError, brother.save) + self.assertTrue(isinstance(doc1.pk, ObjectId)) - Brother.drop_collection() - Mother.drop_collection() + doc = Doc.objects.get(ref=doc1.pk) + self.assertEqual(doc, doc2) - def test_generic_reference(self): - """Ensure that a GenericReferenceField properly dereferences items. + def test_binary_fields(self): + """Ensure that binary fields can be stored and retrieved. """ - class Link(Document): - title = StringField() - meta = {'allow_inheritance': False} + class Attachment(Document): + content_type = StringField() + blob = BinaryField() - class Post(Document): - title = StringField() + BLOB = six.b('\xe6\x00\xc4\xff\x07') + MIME_TYPE = 'application/octet-stream' - class Bookmark(Document): - bookmark_object = GenericReferenceField() + Attachment.drop_collection() - Link.drop_collection() - Post.drop_collection() - Bookmark.drop_collection() + attachment = Attachment(content_type=MIME_TYPE, blob=BLOB) + attachment.save() - link_1 = Link(title="Pitchfork") - link_1.save() + attachment_1 = Attachment.objects().first() + self.assertEqual(MIME_TYPE, attachment_1.content_type) + self.assertEqual(BLOB, six.binary_type(attachment_1.blob)) - post_1 = Post(title="Behind the Scenes of the Pavement Reunion") - post_1.save() + def test_binary_validation(self): + """Ensure that invalid values cannot be assigned to binary fields. + """ + class Attachment(Document): + blob = BinaryField() - bm = Bookmark(bookmark_object=post_1) - bm.save() + class AttachmentRequired(Document): + blob = BinaryField(required=True) - bm = Bookmark.objects(bookmark_object=post_1).first() + class AttachmentSizeLimit(Document): + blob = BinaryField(max_bytes=4) - self.assertEqual(bm.bookmark_object, post_1) - self.assertTrue(isinstance(bm.bookmark_object, Post)) + Attachment.drop_collection() + AttachmentRequired.drop_collection() + AttachmentSizeLimit.drop_collection() - bm.bookmark_object = link_1 - bm.save() + attachment = Attachment() + attachment.validate() + attachment.blob = 2 + self.assertRaises(ValidationError, attachment.validate) - bm = Bookmark.objects(bookmark_object=link_1).first() + attachment_required = AttachmentRequired() + self.assertRaises(ValidationError, attachment_required.validate) + attachment_required.blob = Binary(six.b('\xe6\x00\xc4\xff\x07')) + attachment_required.validate() - self.assertEqual(bm.bookmark_object, link_1) - self.assertTrue(isinstance(bm.bookmark_object, Link)) + attachment_size_limit = AttachmentSizeLimit( + blob=six.b('\xe6\x00\xc4\xff\x07')) + self.assertRaises(ValidationError, attachment_size_limit.validate) + attachment_size_limit.blob = six.b('\xe6\x00\xc4\xff') + attachment_size_limit.validate() - Link.drop_collection() - Post.drop_collection() - Bookmark.drop_collection() + def test_binary_field_primary(self): + class Attachment(Document): + id = BinaryField(primary_key=True) - def test_generic_reference_list(self): - """Ensure that a ListField properly dereferences generic references. - """ - class Link(Document): - title = StringField() + Attachment.drop_collection() + binary_id = uuid.uuid4().bytes + att = Attachment(id=binary_id).save() + self.assertEqual(1, Attachment.objects.count()) + self.assertEqual(1, Attachment.objects.filter(id=att.id).count()) + att.delete() + self.assertEqual(0, Attachment.objects.count()) - class Post(Document): - title = StringField() + def test_binary_field_primary_filter_by_binary_pk_as_str(self): + raise SkipTest("Querying by id as string is not currently supported") - class User(Document): - bookmarks = ListField(GenericReferenceField()) + class Attachment(Document): + id = BinaryField(primary_key=True) - Link.drop_collection() - Post.drop_collection() - User.drop_collection() + Attachment.drop_collection() + binary_id = uuid.uuid4().bytes + att = Attachment(id=binary_id).save() + self.assertEqual(1, Attachment.objects.filter(id=binary_id).count()) + att.delete() + self.assertEqual(0, Attachment.objects.count()) - link_1 = Link(title="Pitchfork") - link_1.save() + def test_choices_allow_using_sets_as_choices(self): + """Ensure that sets can be used when setting choices + """ + class Shirt(Document): + size = StringField(choices={'M', 'L'}) - post_1 = Post(title="Behind the Scenes of the Pavement Reunion") - post_1.save() + Shirt(size='M').validate() - user = User(bookmarks=[post_1, link_1]) - user.save() + def test_choices_validation_allow_no_value(self): + """Ensure that .validate passes and no value was provided + for a field setup with choices + """ + class Shirt(Document): + size = StringField(choices=('S', 'M')) - user = User.objects(bookmarks__all=[post_1, link_1]).first() + shirt = Shirt() + shirt.validate() - self.assertEqual(user.bookmarks[0], post_1) - self.assertEqual(user.bookmarks[1], link_1) + def test_choices_validation_accept_possible_value(self): + """Ensure that value is in a container of allowed values. + """ + class Shirt(Document): + size = StringField(choices=('S', 'M')) - Link.drop_collection() - Post.drop_collection() - User.drop_collection() + shirt = Shirt(size='S') + shirt.validate() - def test_generic_reference_document_not_registered(self): - """Ensure dereferencing out of the document registry throws a - `NotRegistered` error. + def test_choices_validation_reject_unknown_value(self): + """Ensure that unallowed value are rejected upon validation """ - class Link(Document): - title = StringField() + class Shirt(Document): + size = StringField(choices=('S', 'M')) - class User(Document): - bookmarks = ListField(GenericReferenceField()) - - Link.drop_collection() - User.drop_collection() - - link_1 = Link(title="Pitchfork") - link_1.save() + shirt = Shirt(size="XS") + with self.assertRaises(ValidationError): + shirt.validate() - user = User(bookmarks=[link_1]) - user.save() + def test_choices_validation_documents(self): + """ + Ensure fields with document choices validate given a valid choice. + """ + class UserComments(EmbeddedDocument): + author = StringField() + message = StringField() - # Mimic User and Link definitions being in a different file - # and the Link model not being imported in the User file. - del(_document_registry["Link"]) + class BlogPost(Document): + comments = ListField( + GenericEmbeddedDocumentField(choices=(UserComments,)) + ) - user = User.objects.first() - try: - user.bookmarks - raise AssertionError("Link was removed from the registry") - except NotRegistered: - pass + # Ensure Validation Passes + BlogPost(comments=[ + UserComments(author='user2', message='message2'), + ]).save() - Link.drop_collection() - User.drop_collection() + def test_choices_validation_documents_invalid(self): + """ + Ensure fields with document choices validate given an invalid choice. + This should throw a ValidationError exception. + """ + class UserComments(EmbeddedDocument): + author = StringField() + message = StringField() - def test_generic_reference_is_none(self): + class ModeratorComments(EmbeddedDocument): + author = StringField() + message = StringField() - class Person(Document): - name = StringField() - city = GenericReferenceField() + class BlogPost(Document): + comments = ListField( + GenericEmbeddedDocumentField(choices=(UserComments,)) + ) - Person.drop_collection() - Person(name="Wilson Jr").save() + # Single Entry Failure + post = BlogPost(comments=[ + ModeratorComments(author='mod1', message='message1'), + ]) + self.assertRaises(ValidationError, post.save) - self.assertEqual(repr(Person.objects(city=None)), - "[]") + # Mixed Entry Failure + post = BlogPost(comments=[ + ModeratorComments(author='mod1', message='message1'), + UserComments(author='user2', message='message2'), + ]) + self.assertRaises(ValidationError, post.save) - def test_generic_reference_choices(self): - """Ensure that a GenericReferenceField can handle choices + def test_choices_validation_documents_inheritance(self): """ - class Link(Document): - title = StringField() - - class Post(Document): - title = StringField() - - class Bookmark(Document): - bookmark_object = GenericReferenceField(choices=(Post,)) - - Link.drop_collection() - Post.drop_collection() - Bookmark.drop_collection() + Ensure fields with document choices validate given subclass of choice. + """ + class Comments(EmbeddedDocument): + meta = { + 'abstract': True + } + author = StringField() + message = StringField() - link_1 = Link(title="Pitchfork") - link_1.save() + class UserComments(Comments): + pass - post_1 = Post(title="Behind the Scenes of the Pavement Reunion") - post_1.save() + class BlogPost(Document): + comments = ListField( + GenericEmbeddedDocumentField(choices=(Comments,)) + ) - bm = Bookmark(bookmark_object=link_1) - self.assertRaises(ValidationError, bm.validate) + # Save Valid EmbeddedDocument Type + BlogPost(comments=[ + UserComments(author='user2', message='message2'), + ]).save() - bm = Bookmark(bookmark_object=post_1) - bm.save() + def test_choices_get_field_display(self): + """Test dynamic helper for returning the display value of a choices + field. + """ + class Shirt(Document): + size = StringField(max_length=3, choices=( + ('S', 'Small'), ('M', 'Medium'), ('L', 'Large'), + ('XL', 'Extra Large'), ('XXL', 'Extra Extra Large'))) + style = StringField(max_length=3, choices=( + ('S', 'Small'), ('B', 'Baggy'), ('W', 'Wide')), default='W') - bm = Bookmark.objects.first() - self.assertEqual(bm.bookmark_object, post_1) + Shirt.drop_collection() - def test_generic_reference_string_choices(self): - """Ensure that a GenericReferenceField can handle choices as strings - """ - class Link(Document): - title = StringField() + shirt1 = Shirt() + shirt2 = Shirt() - class Post(Document): - title = StringField() + # Make sure get__display returns the default value (or None) + self.assertEqual(shirt1.get_size_display(), None) + self.assertEqual(shirt1.get_style_display(), 'Wide') - class Bookmark(Document): - bookmark_object = GenericReferenceField(choices=('Post', Link)) + shirt1.size = 'XXL' + shirt1.style = 'B' + shirt2.size = 'M' + shirt2.style = 'S' + self.assertEqual(shirt1.get_size_display(), 'Extra Extra Large') + self.assertEqual(shirt1.get_style_display(), 'Baggy') + self.assertEqual(shirt2.get_size_display(), 'Medium') + self.assertEqual(shirt2.get_style_display(), 'Small') - Link.drop_collection() - Post.drop_collection() - Bookmark.drop_collection() + # Set as Z - an invalid choice + shirt1.size = 'Z' + shirt1.style = 'Z' + self.assertEqual(shirt1.get_size_display(), 'Z') + self.assertEqual(shirt1.get_style_display(), 'Z') + self.assertRaises(ValidationError, shirt1.validate) - link_1 = Link(title="Pitchfork") - link_1.save() + def test_simple_choices_validation(self): + """Ensure that value is in a container of allowed values. + """ + class Shirt(Document): + size = StringField(max_length=3, + choices=('S', 'M', 'L', 'XL', 'XXL')) - post_1 = Post(title="Behind the Scenes of the Pavement Reunion") - post_1.save() + Shirt.drop_collection() - bm = Bookmark(bookmark_object=link_1) - bm.save() + shirt = Shirt() + shirt.validate() - bm = Bookmark(bookmark_object=post_1) - bm.save() + shirt.size = "S" + shirt.validate() - bm = Bookmark(bookmark_object=bm) - self.assertRaises(ValidationError, bm.validate) + shirt.size = "XS" + self.assertRaises(ValidationError, shirt.validate) - def test_generic_reference_choices_no_dereference(self): - """Ensure that a GenericReferenceField can handle choices on - non-derefenreced (i.e. DBRef) elements + def test_simple_choices_get_field_display(self): + """Test dynamic helper for returning the display value of a choices + field. """ - class Post(Document): - title = StringField() + class Shirt(Document): + size = StringField(max_length=3, + choices=('S', 'M', 'L', 'XL', 'XXL')) + style = StringField(max_length=3, + choices=('Small', 'Baggy', 'wide'), + default='Small') - class Bookmark(Document): - bookmark_object = GenericReferenceField(choices=(Post, )) - other_field = StringField() + Shirt.drop_collection() - Post.drop_collection() - Bookmark.drop_collection() + shirt = Shirt() - post_1 = Post(title="Behind the Scenes of the Pavement Reunion") - post_1.save() + self.assertEqual(shirt.get_size_display(), None) + self.assertEqual(shirt.get_style_display(), 'Small') - bm = Bookmark(bookmark_object=post_1) - bm.save() + shirt.size = "XXL" + shirt.style = "Baggy" + self.assertEqual(shirt.get_size_display(), 'XXL') + self.assertEqual(shirt.get_style_display(), 'Baggy') - bm = Bookmark.objects.get(id=bm.id) - # bookmark_object is now a DBRef - bm.other_field = 'dummy_change' - bm.save() + # Set as Z - an invalid choice + shirt.size = "Z" + shirt.style = "Z" + self.assertEqual(shirt.get_size_display(), 'Z') + self.assertEqual(shirt.get_style_display(), 'Z') + self.assertRaises(ValidationError, shirt.validate) - def test_generic_reference_list_choices(self): - """Ensure that a ListField properly dereferences generic references and - respects choices. + def test_simple_choices_validation_invalid_value(self): + """Ensure that error messages are correct. """ - class Link(Document): - title = StringField() - - class Post(Document): - title = StringField() + SIZES = ('S', 'M', 'L', 'XL', 'XXL') + COLORS = (('R', 'Red'), ('B', 'Blue')) + SIZE_MESSAGE = u"Value must be one of ('S', 'M', 'L', 'XL', 'XXL')" + COLOR_MESSAGE = u"Value must be one of ['R', 'B']" - class User(Document): - bookmarks = ListField(GenericReferenceField(choices=(Post,))) + class Shirt(Document): + size = StringField(max_length=3, choices=SIZES) + color = StringField(max_length=1, choices=COLORS) - Link.drop_collection() - Post.drop_collection() - User.drop_collection() + Shirt.drop_collection() - link_1 = Link(title="Pitchfork") - link_1.save() + shirt = Shirt() + shirt.validate() - post_1 = Post(title="Behind the Scenes of the Pavement Reunion") - post_1.save() + shirt.size = "S" + shirt.color = "R" + shirt.validate() - user = User(bookmarks=[link_1]) - self.assertRaises(ValidationError, user.validate) + shirt.size = "XS" + shirt.color = "G" - user = User(bookmarks=[post_1]) - user.save() + try: + shirt.validate() + except ValidationError as error: + # get the validation rules + error_dict = error.to_dict() + self.assertEqual(error_dict['size'], SIZE_MESSAGE) + self.assertEqual(error_dict['color'], COLOR_MESSAGE) - user = User.objects.first() - self.assertEqual(user.bookmarks, [post_1]) + def test_ensure_unique_default_instances(self): + """Ensure that every field has it's own unique default instance.""" + class D(Document): + data = DictField() + data2 = DictField(default=lambda: {}) - Link.drop_collection() - Post.drop_collection() - User.drop_collection() + d1 = D() + d1.data['foo'] = 'bar' + d1.data2['foo'] = 'bar' + d2 = D() + self.assertEqual(d2.data, {}) + self.assertEqual(d2.data2, {}) - def test_generic_reference_list_item_modification(self): - """Ensure that modifications of related documents (through generic reference) don't influence on querying - """ - class Post(Document): - title = StringField() + def test_sequence_field(self): + class Person(Document): + id = SequenceField(primary_key=True) + name = StringField() - class User(Document): - username = StringField() - bookmarks = ListField(GenericReferenceField()) + self.db['mongoengine.counters'].drop() + Person.drop_collection() - Post.drop_collection() - User.drop_collection() + for x in range(10): + Person(name="Person %s" % x).save() - post_1 = Post(title="Behind the Scenes of the Pavement Reunion") - post_1.save() + c = self.db['mongoengine.counters'].find_one({'_id': 'person.id'}) + self.assertEqual(c['next'], 10) - user = User(bookmarks=[post_1]) - user.save() + ids = [i.id for i in Person.objects] + self.assertEqual(ids, range(1, 11)) - post_1.title = "Title was modified" - user.username = "New username" - user.save() + c = self.db['mongoengine.counters'].find_one({'_id': 'person.id'}) + self.assertEqual(c['next'], 10) - user = User.objects(bookmarks__all=[post_1]).first() + Person.id.set_next_value(1000) + c = self.db['mongoengine.counters'].find_one({'_id': 'person.id'}) + self.assertEqual(c['next'], 1000) - self.assertNotEqual(user, None) - self.assertEqual(user.bookmarks[0], post_1) + def test_sequence_field_get_next_value(self): + class Person(Document): + id = SequenceField(primary_key=True) + name = StringField() - Post.drop_collection() - User.drop_collection() + self.db['mongoengine.counters'].drop() + Person.drop_collection() - def test_generic_reference_filter_by_dbref(self): - """Ensure we can search for a specific generic reference by - providing its ObjectId. - """ - class Doc(Document): - ref = GenericReferenceField() + for x in range(10): + Person(name="Person %s" % x).save() - Doc.drop_collection() + self.assertEqual(Person.id.get_next_value(), 11) + self.db['mongoengine.counters'].drop() - doc1 = Doc.objects.create() - doc2 = Doc.objects.create(ref=doc1) + self.assertEqual(Person.id.get_next_value(), 1) - doc = Doc.objects.get(ref=DBRef('doc', doc1.pk)) - self.assertEqual(doc, doc2) + class Person(Document): + id = SequenceField(primary_key=True, value_decorator=str) + name = StringField() - def test_generic_reference_filter_by_objectid(self): - """Ensure we can search for a specific generic reference by - providing its DBRef. - """ - class Doc(Document): - ref = GenericReferenceField() + self.db['mongoengine.counters'].drop() + Person.drop_collection() - Doc.drop_collection() + for x in range(10): + Person(name="Person %s" % x).save() - doc1 = Doc.objects.create() - doc2 = Doc.objects.create(ref=doc1) + self.assertEqual(Person.id.get_next_value(), '11') + self.db['mongoengine.counters'].drop() - self.assertTrue(isinstance(doc1.pk, ObjectId)) + self.assertEqual(Person.id.get_next_value(), '1') - doc = Doc.objects.get(ref=doc1.pk) - self.assertEqual(doc, doc2) + def test_sequence_field_sequence_name(self): + class Person(Document): + id = SequenceField(primary_key=True, sequence_name='jelly') + name = StringField() - def test_binary_fields(self): - """Ensure that binary fields can be stored and retrieved. - """ - class Attachment(Document): - content_type = StringField() - blob = BinaryField() + self.db['mongoengine.counters'].drop() + Person.drop_collection() - BLOB = six.b('\xe6\x00\xc4\xff\x07') - MIME_TYPE = 'application/octet-stream' + for x in range(10): + Person(name="Person %s" % x).save() - Attachment.drop_collection() + c = self.db['mongoengine.counters'].find_one({'_id': 'jelly.id'}) + self.assertEqual(c['next'], 10) - attachment = Attachment(content_type=MIME_TYPE, blob=BLOB) - attachment.save() + ids = [i.id for i in Person.objects] + self.assertEqual(ids, range(1, 11)) - attachment_1 = Attachment.objects().first() - self.assertEqual(MIME_TYPE, attachment_1.content_type) - self.assertEqual(BLOB, six.binary_type(attachment_1.blob)) + c = self.db['mongoengine.counters'].find_one({'_id': 'jelly.id'}) + self.assertEqual(c['next'], 10) - Attachment.drop_collection() + Person.id.set_next_value(1000) + c = self.db['mongoengine.counters'].find_one({'_id': 'jelly.id'}) + self.assertEqual(c['next'], 1000) - def test_binary_validation(self): - """Ensure that invalid values cannot be assigned to binary fields. - """ - class Attachment(Document): - blob = BinaryField() + def test_multiple_sequence_fields(self): + class Person(Document): + id = SequenceField(primary_key=True) + counter = SequenceField() + name = StringField() - class AttachmentRequired(Document): - blob = BinaryField(required=True) + self.db['mongoengine.counters'].drop() + Person.drop_collection() - class AttachmentSizeLimit(Document): - blob = BinaryField(max_bytes=4) + for x in range(10): + Person(name="Person %s" % x).save() - Attachment.drop_collection() - AttachmentRequired.drop_collection() - AttachmentSizeLimit.drop_collection() + c = self.db['mongoengine.counters'].find_one({'_id': 'person.id'}) + self.assertEqual(c['next'], 10) - attachment = Attachment() - attachment.validate() - attachment.blob = 2 - self.assertRaises(ValidationError, attachment.validate) + ids = [i.id for i in Person.objects] + self.assertEqual(ids, range(1, 11)) - attachment_required = AttachmentRequired() - self.assertRaises(ValidationError, attachment_required.validate) - attachment_required.blob = Binary(six.b('\xe6\x00\xc4\xff\x07')) - attachment_required.validate() + counters = [i.counter for i in Person.objects] + self.assertEqual(counters, range(1, 11)) - attachment_size_limit = AttachmentSizeLimit( - blob=six.b('\xe6\x00\xc4\xff\x07')) - self.assertRaises(ValidationError, attachment_size_limit.validate) - attachment_size_limit.blob = six.b('\xe6\x00\xc4\xff') - attachment_size_limit.validate() + c = self.db['mongoengine.counters'].find_one({'_id': 'person.id'}) + self.assertEqual(c['next'], 10) - Attachment.drop_collection() - AttachmentRequired.drop_collection() - AttachmentSizeLimit.drop_collection() + Person.id.set_next_value(1000) + c = self.db['mongoengine.counters'].find_one({'_id': 'person.id'}) + self.assertEqual(c['next'], 1000) - def test_binary_field_primary(self): + Person.counter.set_next_value(999) + c = self.db['mongoengine.counters'].find_one({'_id': 'person.counter'}) + self.assertEqual(c['next'], 999) - class Attachment(Document): - id = BinaryField(primary_key=True) + def test_sequence_fields_reload(self): + class Animal(Document): + counter = SequenceField() + name = StringField() - Attachment.drop_collection() - binary_id = uuid.uuid4().bytes - att = Attachment(id=binary_id).save() - self.assertEqual(1, Attachment.objects.count()) - self.assertEqual(1, Attachment.objects.filter(id=att.id).count()) - # TODO use assertIsNotNone once Python 2.6 support is dropped - self.assertTrue(Attachment.objects.filter(id=att.id).first() is not None) - att.delete() - self.assertEqual(0, Attachment.objects.count()) + self.db['mongoengine.counters'].drop() + Animal.drop_collection() - def test_binary_field_primary_filter_by_binary_pk_as_str(self): + a = Animal(name="Boi").save() - raise SkipTest("Querying by id as string is not currently supported") + self.assertEqual(a.counter, 1) + a.reload() + self.assertEqual(a.counter, 1) - class Attachment(Document): - id = BinaryField(primary_key=True) + a.counter = None + self.assertEqual(a.counter, 2) + a.save() - Attachment.drop_collection() - binary_id = uuid.uuid4().bytes - att = Attachment(id=binary_id).save() - self.assertEqual(1, Attachment.objects.filter(id=binary_id).count()) - # TODO use assertIsNotNone once Python 2.6 support is dropped - self.assertTrue(Attachment.objects.filter(id=binary_id).first() is not None) - att.delete() - self.assertEqual(0, Attachment.objects.count()) + self.assertEqual(a.counter, 2) - def test_choices_allow_using_sets_as_choices(self): - """Ensure that sets can be used when setting choices - """ - class Shirt(Document): - size = StringField(choices={'M', 'L'}) + a = Animal.objects.first() + self.assertEqual(a.counter, 2) + a.reload() + self.assertEqual(a.counter, 2) - Shirt(size='M').validate() + def test_multiple_sequence_fields_on_docs(self): + class Animal(Document): + id = SequenceField(primary_key=True) + name = StringField() - def test_choices_validation_allow_no_value(self): - """Ensure that .validate passes and no value was provided - for a field setup with choices - """ - class Shirt(Document): - size = StringField(choices=('S', 'M')) + class Person(Document): + id = SequenceField(primary_key=True) + name = StringField() - shirt = Shirt() - shirt.validate() + self.db['mongoengine.counters'].drop() + Animal.drop_collection() + Person.drop_collection() - def test_choices_validation_accept_possible_value(self): - """Ensure that value is in a container of allowed values. - """ - class Shirt(Document): - size = StringField(choices=('S', 'M')) + for x in range(10): + Animal(name="Animal %s" % x).save() + Person(name="Person %s" % x).save() - shirt = Shirt(size='S') - shirt.validate() + c = self.db['mongoengine.counters'].find_one({'_id': 'person.id'}) + self.assertEqual(c['next'], 10) - def test_choices_validation_reject_unknown_value(self): - """Ensure that unallowed value are rejected upon validation - """ - class Shirt(Document): - size = StringField(choices=('S', 'M')) + c = self.db['mongoengine.counters'].find_one({'_id': 'animal.id'}) + self.assertEqual(c['next'], 10) - shirt = Shirt(size="XS") - with self.assertRaises(ValidationError): - shirt.validate() + ids = [i.id for i in Person.objects] + self.assertEqual(ids, range(1, 11)) - def test_choices_validation_documents(self): - """ - Ensure fields with document choices validate given a valid choice. - """ - class UserComments(EmbeddedDocument): - author = StringField() - message = StringField() + id = [i.id for i in Animal.objects] + self.assertEqual(id, range(1, 11)) - class BlogPost(Document): - comments = ListField( - GenericEmbeddedDocumentField(choices=(UserComments,)) - ) + c = self.db['mongoengine.counters'].find_one({'_id': 'person.id'}) + self.assertEqual(c['next'], 10) - # Ensure Validation Passes - BlogPost(comments=[ - UserComments(author='user2', message='message2'), - ]).save() + c = self.db['mongoengine.counters'].find_one({'_id': 'animal.id'}) + self.assertEqual(c['next'], 10) - def test_choices_validation_documents_invalid(self): - """ - Ensure fields with document choices validate given an invalid choice. - This should throw a ValidationError exception. - """ - class UserComments(EmbeddedDocument): - author = StringField() - message = StringField() + def test_sequence_field_value_decorator(self): + class Person(Document): + id = SequenceField(primary_key=True, value_decorator=str) + name = StringField() - class ModeratorComments(EmbeddedDocument): - author = StringField() - message = StringField() + self.db['mongoengine.counters'].drop() + Person.drop_collection() - class BlogPost(Document): - comments = ListField( - GenericEmbeddedDocumentField(choices=(UserComments,)) - ) + for x in range(10): + p = Person(name="Person %s" % x) + p.save() - # Single Entry Failure - post = BlogPost(comments=[ - ModeratorComments(author='mod1', message='message1'), - ]) - self.assertRaises(ValidationError, post.save) + c = self.db['mongoengine.counters'].find_one({'_id': 'person.id'}) + self.assertEqual(c['next'], 10) - # Mixed Entry Failure - post = BlogPost(comments=[ - ModeratorComments(author='mod1', message='message1'), - UserComments(author='user2', message='message2'), - ]) - self.assertRaises(ValidationError, post.save) + ids = [i.id for i in Person.objects] + self.assertEqual(ids, map(str, range(1, 11))) - def test_choices_validation_documents_inheritance(self): - """ - Ensure fields with document choices validate given subclass of choice. - """ - class Comments(EmbeddedDocument): - meta = { - 'abstract': True - } - author = StringField() - message = StringField() + c = self.db['mongoengine.counters'].find_one({'_id': 'person.id'}) + self.assertEqual(c['next'], 10) - class UserComments(Comments): - pass + def test_embedded_sequence_field(self): + class Comment(EmbeddedDocument): + id = SequenceField() + content = StringField(required=True) - class BlogPost(Document): - comments = ListField( - GenericEmbeddedDocumentField(choices=(Comments,)) - ) + class Post(Document): + title = StringField(required=True) + comments = ListField(EmbeddedDocumentField(Comment)) - # Save Valid EmbeddedDocument Type - BlogPost(comments=[ - UserComments(author='user2', message='message2'), - ]).save() + self.db['mongoengine.counters'].drop() + Post.drop_collection() - def test_choices_get_field_display(self): - """Test dynamic helper for returning the display value of a choices - field. - """ - class Shirt(Document): - size = StringField(max_length=3, choices=( - ('S', 'Small'), ('M', 'Medium'), ('L', 'Large'), - ('XL', 'Extra Large'), ('XXL', 'Extra Extra Large'))) - style = StringField(max_length=3, choices=( - ('S', 'Small'), ('B', 'Baggy'), ('W', 'Wide')), default='W') + Post(title="MongoEngine", + comments=[Comment(content="NoSQL Rocks"), + Comment(content="MongoEngine Rocks")]).save() + c = self.db['mongoengine.counters'].find_one({'_id': 'comment.id'}) + self.assertEqual(c['next'], 2) + post = Post.objects.first() + self.assertEqual(1, post.comments[0].id) + self.assertEqual(2, post.comments[1].id) - Shirt.drop_collection() + def test_inherited_sequencefield(self): + class Base(Document): + name = StringField() + counter = SequenceField() + meta = {'abstract': True} - shirt1 = Shirt() - shirt2 = Shirt() + class Foo(Base): + pass - # Make sure get__display returns the default value (or None) - self.assertEqual(shirt1.get_size_display(), None) - self.assertEqual(shirt1.get_style_display(), 'Wide') + class Bar(Base): + pass - shirt1.size = 'XXL' - shirt1.style = 'B' - shirt2.size = 'M' - shirt2.style = 'S' - self.assertEqual(shirt1.get_size_display(), 'Extra Extra Large') - self.assertEqual(shirt1.get_style_display(), 'Baggy') - self.assertEqual(shirt2.get_size_display(), 'Medium') - self.assertEqual(shirt2.get_style_display(), 'Small') + bar = Bar(name='Bar') + bar.save() - # Set as Z - an invalid choice - shirt1.size = 'Z' - shirt1.style = 'Z' - self.assertEqual(shirt1.get_size_display(), 'Z') - self.assertEqual(shirt1.get_style_display(), 'Z') - self.assertRaises(ValidationError, shirt1.validate) + foo = Foo(name='Foo') + foo.save() - def test_simple_choices_validation(self): - """Ensure that value is in a container of allowed values. - """ - class Shirt(Document): - size = StringField(max_length=3, - choices=('S', 'M', 'L', 'XL', 'XXL')) + self.assertTrue('base.counter' in + self.db['mongoengine.counters'].find().distinct('_id')) + self.assertFalse(('foo.counter' or 'bar.counter') in + self.db['mongoengine.counters'].find().distinct('_id')) + self.assertNotEqual(foo.counter, bar.counter) + self.assertEqual(foo._fields['counter'].owner_document, Base) + self.assertEqual(bar._fields['counter'].owner_document, Base) - Shirt.drop_collection() + def test_no_inherited_sequencefield(self): + class Base(Document): + name = StringField() + meta = {'abstract': True} - shirt = Shirt() - shirt.validate() + class Foo(Base): + counter = SequenceField() - shirt.size = "S" - shirt.validate() + class Bar(Base): + counter = SequenceField() - shirt.size = "XS" - self.assertRaises(ValidationError, shirt.validate) + bar = Bar(name='Bar') + bar.save() - Shirt.drop_collection() + foo = Foo(name='Foo') + foo.save() - def test_simple_choices_get_field_display(self): - """Test dynamic helper for returning the display value of a choices - field. - """ - class Shirt(Document): - size = StringField(max_length=3, - choices=('S', 'M', 'L', 'XL', 'XXL')) - style = StringField(max_length=3, - choices=('Small', 'Baggy', 'wide'), - default='Small') + self.assertFalse('base.counter' in + self.db['mongoengine.counters'].find().distinct('_id')) + self.assertTrue(('foo.counter' and 'bar.counter') in + self.db['mongoengine.counters'].find().distinct('_id')) + self.assertEqual(foo.counter, bar.counter) + self.assertEqual(foo._fields['counter'].owner_document, Foo) + self.assertEqual(bar._fields['counter'].owner_document, Bar) - Shirt.drop_collection() + def test_generic_embedded_document(self): + class Car(EmbeddedDocument): + name = StringField() - shirt = Shirt() + class Dish(EmbeddedDocument): + food = StringField(required=True) + number = IntField() - self.assertEqual(shirt.get_size_display(), None) - self.assertEqual(shirt.get_style_display(), 'Small') + class Person(Document): + name = StringField() + like = GenericEmbeddedDocumentField() - shirt.size = "XXL" - shirt.style = "Baggy" - self.assertEqual(shirt.get_size_display(), 'XXL') - self.assertEqual(shirt.get_style_display(), 'Baggy') + Person.drop_collection() - # Set as Z - an invalid choice - shirt.size = "Z" - shirt.style = "Z" - self.assertEqual(shirt.get_size_display(), 'Z') - self.assertEqual(shirt.get_style_display(), 'Z') - self.assertRaises(ValidationError, shirt.validate) + person = Person(name='Test User') + person.like = Car(name='Fiat') + person.save() - Shirt.drop_collection() + person = Person.objects.first() + self.assertTrue(isinstance(person.like, Car)) - def test_simple_choices_validation_invalid_value(self): - """Ensure that error messages are correct. - """ - SIZES = ('S', 'M', 'L', 'XL', 'XXL') - COLORS = (('R', 'Red'), ('B', 'Blue')) - SIZE_MESSAGE = u"Value must be one of ('S', 'M', 'L', 'XL', 'XXL')" - COLOR_MESSAGE = u"Value must be one of ['R', 'B']" + person.like = Dish(food="arroz", number=15) + person.save() - class Shirt(Document): - size = StringField(max_length=3, choices=SIZES) - color = StringField(max_length=1, choices=COLORS) + person = Person.objects.first() + self.assertTrue(isinstance(person.like, Dish)) - Shirt.drop_collection() - - shirt = Shirt() - shirt.validate() - - shirt.size = "S" - shirt.color = "R" - shirt.validate() - - shirt.size = "XS" - shirt.color = "G" - - try: - shirt.validate() - except ValidationError as error: - # get the validation rules - error_dict = error.to_dict() - self.assertEqual(error_dict['size'], SIZE_MESSAGE) - self.assertEqual(error_dict['color'], COLOR_MESSAGE) - - Shirt.drop_collection() - - def test_ensure_unique_default_instances(self): - """Ensure that every field has it's own unique default instance.""" - class D(Document): - data = DictField() - data2 = DictField(default=lambda: {}) + def test_generic_embedded_document_choices(self): + """Ensure you can limit GenericEmbeddedDocument choices.""" + class Car(EmbeddedDocument): + name = StringField() - d1 = D() - d1.data['foo'] = 'bar' - d1.data2['foo'] = 'bar' - d2 = D() - self.assertEqual(d2.data, {}) - self.assertEqual(d2.data2, {}) + class Dish(EmbeddedDocument): + food = StringField(required=True) + number = IntField() - def test_sequence_field(self): class Person(Document): - id = SequenceField(primary_key=True) name = StringField() + like = GenericEmbeddedDocumentField(choices=(Dish,)) - self.db['mongoengine.counters'].drop() Person.drop_collection() - for x in range(10): - Person(name="Person %s" % x).save() + person = Person(name='Test User') + person.like = Car(name='Fiat') + self.assertRaises(ValidationError, person.validate) - c = self.db['mongoengine.counters'].find_one({'_id': 'person.id'}) - self.assertEqual(c['next'], 10) + person.like = Dish(food="arroz", number=15) + person.save() - ids = [i.id for i in Person.objects] - self.assertEqual(ids, range(1, 11)) + person = Person.objects.first() + self.assertTrue(isinstance(person.like, Dish)) - c = self.db['mongoengine.counters'].find_one({'_id': 'person.id'}) - self.assertEqual(c['next'], 10) + def test_generic_list_embedded_document_choices(self): + """Ensure you can limit GenericEmbeddedDocument choices inside + a list field. + """ + class Car(EmbeddedDocument): + name = StringField() - Person.id.set_next_value(1000) - c = self.db['mongoengine.counters'].find_one({'_id': 'person.id'}) - self.assertEqual(c['next'], 1000) + class Dish(EmbeddedDocument): + food = StringField(required=True) + number = IntField() - def test_sequence_field_get_next_value(self): class Person(Document): - id = SequenceField(primary_key=True) name = StringField() + likes = ListField(GenericEmbeddedDocumentField(choices=(Dish,))) - self.db['mongoengine.counters'].drop() Person.drop_collection() - for x in range(10): - Person(name="Person %s" % x).save() - - self.assertEqual(Person.id.get_next_value(), 11) - self.db['mongoengine.counters'].drop() + person = Person(name='Test User') + person.likes = [Car(name='Fiat')] + self.assertRaises(ValidationError, person.validate) - self.assertEqual(Person.id.get_next_value(), 1) + person.likes = [Dish(food="arroz", number=15)] + person.save() - class Person(Document): - id = SequenceField(primary_key=True, value_decorator=str) - name = StringField() + person = Person.objects.first() + self.assertTrue(isinstance(person.likes[0], Dish)) - self.db['mongoengine.counters'].drop() - Person.drop_collection() + def test_recursive_validation(self): + """Ensure that a validation result to_dict is available.""" + class Author(EmbeddedDocument): + name = StringField(required=True) - for x in range(10): - Person(name="Person %s" % x).save() + class Comment(EmbeddedDocument): + author = EmbeddedDocumentField(Author, required=True) + content = StringField(required=True) - self.assertEqual(Person.id.get_next_value(), '11') - self.db['mongoengine.counters'].drop() + class Post(Document): + title = StringField(required=True) + comments = ListField(EmbeddedDocumentField(Comment)) - self.assertEqual(Person.id.get_next_value(), '1') + bob = Author(name='Bob') + post = Post(title='hello world') + post.comments.append(Comment(content='hello', author=bob)) + post.comments.append(Comment(author=bob)) - def test_sequence_field_sequence_name(self): - class Person(Document): - id = SequenceField(primary_key=True, sequence_name='jelly') - name = StringField() + self.assertRaises(ValidationError, post.validate) + try: + post.validate() + except ValidationError as error: + # ValidationError.errors property + self.assertTrue(hasattr(error, 'errors')) + self.assertTrue(isinstance(error.errors, dict)) + self.assertTrue('comments' in error.errors) + self.assertTrue(1 in error.errors['comments']) + self.assertTrue(isinstance(error.errors['comments'][1]['content'], + ValidationError)) - self.db['mongoengine.counters'].drop() - Person.drop_collection() + # ValidationError.schema property + error_dict = error.to_dict() + self.assertTrue(isinstance(error_dict, dict)) + self.assertTrue('comments' in error_dict) + self.assertTrue(1 in error_dict['comments']) + self.assertTrue('content' in error_dict['comments'][1]) + self.assertEqual(error_dict['comments'][1]['content'], + u'Field is required') - for x in range(10): - Person(name="Person %s" % x).save() + post.comments[1].content = 'here we go' + post.validate() - c = self.db['mongoengine.counters'].find_one({'_id': 'jelly.id'}) - self.assertEqual(c['next'], 10) + def test_email_field(self): + class User(Document): + email = EmailField() - ids = [i.id for i in Person.objects] - self.assertEqual(ids, range(1, 11)) + user = User(email="ross@example.com") + self.assertTrue(user.validate() is None) - c = self.db['mongoengine.counters'].find_one({'_id': 'jelly.id'}) - self.assertEqual(c['next'], 10) + user = User(email="ross@example.co.uk") + self.assertTrue(user.validate() is None) - Person.id.set_next_value(1000) - c = self.db['mongoengine.counters'].find_one({'_id': 'jelly.id'}) - self.assertEqual(c['next'], 1000) + user = User(email=("Kofq@rhom0e4klgauOhpbpNdogawnyIKvQS0wk2mjqrgGQ5S" + "aJIazqqWkm7.net")) + self.assertTrue(user.validate() is None) - def test_multiple_sequence_fields(self): - class Person(Document): - id = SequenceField(primary_key=True) - counter = SequenceField() - name = StringField() + user = User(email="new-tld@example.technology") + self.assertTrue(user.validate() is None) - self.db['mongoengine.counters'].drop() - Person.drop_collection() + user = User(email='me@localhost') + self.assertRaises(ValidationError, user.validate) - for x in range(10): - Person(name="Person %s" % x).save() + user = User(email="ross@example.com.") + self.assertRaises(ValidationError, user.validate) - c = self.db['mongoengine.counters'].find_one({'_id': 'person.id'}) - self.assertEqual(c['next'], 10) + def test_email_field_honors_regex(self): + class User(Document): + email = EmailField(regex=r'\w+@example.com') - ids = [i.id for i in Person.objects] - self.assertEqual(ids, range(1, 11)) + # Fails regex validation + user = User(email='me@foo.com') + self.assertRaises(ValidationError, user.validate) - counters = [i.counter for i in Person.objects] - self.assertEqual(counters, range(1, 11)) + # Passes regex validation + user = User(email='me@example.com') + self.assertTrue(user.validate() is None) - c = self.db['mongoengine.counters'].find_one({'_id': 'person.id'}) - self.assertEqual(c['next'], 10) + def test_tuples_as_tuples(self): + """Ensure that tuples remain tuples when they are inside + a ComplexBaseField. + """ + class EnumField(BaseField): - Person.id.set_next_value(1000) - c = self.db['mongoengine.counters'].find_one({'_id': 'person.id'}) - self.assertEqual(c['next'], 1000) + def __init__(self, **kwargs): + super(EnumField, self).__init__(**kwargs) - Person.counter.set_next_value(999) - c = self.db['mongoengine.counters'].find_one({'_id': 'person.counter'}) - self.assertEqual(c['next'], 999) + def to_mongo(self, value): + return value - def test_sequence_fields_reload(self): - class Animal(Document): - counter = SequenceField() - name = StringField() + def to_python(self, value): + return tuple(value) - self.db['mongoengine.counters'].drop() - Animal.drop_collection() + class TestDoc(Document): + items = ListField(EnumField()) - a = Animal(name="Boi").save() + TestDoc.drop_collection() - self.assertEqual(a.counter, 1) - a.reload() - self.assertEqual(a.counter, 1) + tuples = [(100, 'Testing')] + doc = TestDoc() + doc.items = tuples + doc.save() + x = TestDoc.objects().get() + self.assertTrue(x is not None) + self.assertTrue(len(x.items) == 1) + self.assertTrue(tuple(x.items[0]) in tuples) + self.assertTrue(x.items[0] in tuples) - a.counter = None - self.assertEqual(a.counter, 2) - a.save() + def test_dynamic_fields_class(self): + class Doc2(Document): + field_1 = StringField(db_field='f') - self.assertEqual(a.counter, 2) + class Doc(Document): + my_id = IntField(required=True, unique=True, primary_key=True) + embed_me = DynamicField(db_field='e') + field_x = StringField(db_field='x') - a = Animal.objects.first() - self.assertEqual(a.counter, 2) - a.reload() - self.assertEqual(a.counter, 2) + Doc.drop_collection() + Doc2.drop_collection() - def test_multiple_sequence_fields_on_docs(self): + doc2 = Doc2(field_1="hello") + doc = Doc(my_id=1, embed_me=doc2, field_x="x") + self.assertRaises(OperationError, doc.save) - class Animal(Document): - id = SequenceField(primary_key=True) - name = StringField() + doc2.save() + doc.save() - class Person(Document): - id = SequenceField(primary_key=True) - name = StringField() + doc = Doc.objects.get() + self.assertEqual(doc.embed_me.field_1, "hello") - self.db['mongoengine.counters'].drop() - Animal.drop_collection() - Person.drop_collection() - - for x in range(10): - Animal(name="Animal %s" % x).save() - Person(name="Person %s" % x).save() + def test_dynamic_fields_embedded_class(self): + class Embed(EmbeddedDocument): + field_1 = StringField(db_field='f') - c = self.db['mongoengine.counters'].find_one({'_id': 'person.id'}) - self.assertEqual(c['next'], 10) + class Doc(Document): + my_id = IntField(required=True, unique=True, primary_key=True) + embed_me = DynamicField(db_field='e') + field_x = StringField(db_field='x') - c = self.db['mongoengine.counters'].find_one({'_id': 'animal.id'}) - self.assertEqual(c['next'], 10) + Doc.drop_collection() - ids = [i.id for i in Person.objects] - self.assertEqual(ids, range(1, 11)) + Doc(my_id=1, embed_me=Embed(field_1="hello"), field_x="x").save() - id = [i.id for i in Animal.objects] - self.assertEqual(id, range(1, 11)) + doc = Doc.objects.get() + self.assertEqual(doc.embed_me.field_1, "hello") - c = self.db['mongoengine.counters'].find_one({'_id': 'person.id'}) - self.assertEqual(c['next'], 10) + def test_dynamicfield_dump_document(self): + """Ensure a DynamicField can handle another document's dump.""" + class Doc(Document): + field = DynamicField() - c = self.db['mongoengine.counters'].find_one({'_id': 'animal.id'}) - self.assertEqual(c['next'], 10) + class ToEmbed(Document): + id = IntField(primary_key=True, default=1) + recursive = DynamicField() - def test_sequence_field_value_decorator(self): - class Person(Document): - id = SequenceField(primary_key=True, value_decorator=str) - name = StringField() + class ToEmbedParent(Document): + id = IntField(primary_key=True, default=1) + recursive = DynamicField() - self.db['mongoengine.counters'].drop() - Person.drop_collection() + meta = {'allow_inheritance': True} - for x in range(10): - p = Person(name="Person %s" % x) - p.save() + class ToEmbedChild(ToEmbedParent): + pass - c = self.db['mongoengine.counters'].find_one({'_id': 'person.id'}) - self.assertEqual(c['next'], 10) + to_embed_recursive = ToEmbed(id=1).save() + to_embed = ToEmbed(id=2, recursive=to_embed_recursive).save() + doc = Doc(field=to_embed) + doc.save() + assert isinstance(doc.field, ToEmbed) + assert doc.field == to_embed + # Same thing with a Document with a _cls field + to_embed_recursive = ToEmbedChild(id=1).save() + to_embed_child = ToEmbedChild(id=2, recursive=to_embed_recursive).save() + doc = Doc(field=to_embed_child) + doc.save() + assert isinstance(doc.field, ToEmbedChild) + assert doc.field == to_embed_child - ids = [i.id for i in Person.objects] - self.assertEqual(ids, map(str, range(1, 11))) + def test_invalid_dict_value(self): + class DictFieldTest(Document): + dictionary = DictField(required=True) - c = self.db['mongoengine.counters'].find_one({'_id': 'person.id'}) - self.assertEqual(c['next'], 10) + DictFieldTest.drop_collection() - def test_embedded_sequence_field(self): - class Comment(EmbeddedDocument): - id = SequenceField() - content = StringField(required=True) + test = DictFieldTest(dictionary=None) + test.dictionary # Just access to test getter + self.assertRaises(ValidationError, test.validate) - class Post(Document): - title = StringField(required=True) - comments = ListField(EmbeddedDocumentField(Comment)) + test = DictFieldTest(dictionary=False) + test.dictionary # Just access to test getter + self.assertRaises(ValidationError, test.validate) - self.db['mongoengine.counters'].drop() - Post.drop_collection() + def test_cls_field(self): + class Animal(Document): + meta = {'allow_inheritance': True} - Post(title="MongoEngine", - comments=[Comment(content="NoSQL Rocks"), - Comment(content="MongoEngine Rocks")]).save() - c = self.db['mongoengine.counters'].find_one({'_id': 'comment.id'}) - self.assertEqual(c['next'], 2) - post = Post.objects.first() - self.assertEqual(1, post.comments[0].id) - self.assertEqual(2, post.comments[1].id) + class Fish(Animal): + pass - def test_inherited_sequencefield(self): - class Base(Document): - name = StringField() - counter = SequenceField() - meta = {'abstract': True} + class Mammal(Animal): + pass - class Foo(Base): + class Dog(Mammal): pass - class Bar(Base): + class Human(Mammal): pass - bar = Bar(name='Bar') - bar.save() + Animal.objects.delete() + Dog().save() + Fish().save() + Human().save() + self.assertEquals(Animal.objects(_cls__in=["Animal.Mammal.Dog", "Animal.Fish"]).count(), 2) + self.assertEquals(Animal.objects(_cls__in=["Animal.Fish.Guppy"]).count(), 0) - foo = Foo(name='Foo') - foo.save() + def test_sparse_field(self): + class Doc(Document): + name = StringField(required=False, unique=True, sparse=True) - self.assertTrue('base.counter' in - self.db['mongoengine.counters'].find().distinct('_id')) - self.assertFalse(('foo.counter' or 'bar.counter') in - self.db['mongoengine.counters'].find().distinct('_id')) - self.assertNotEqual(foo.counter, bar.counter) - self.assertEqual(foo._fields['counter'].owner_document, Base) - self.assertEqual(bar._fields['counter'].owner_document, Base) + # This would raise an exception in a non-sparse unique index + Doc().save() + Doc().save() - def test_no_inherited_sequencefield(self): - class Base(Document): - name = StringField() - meta = {'abstract': True} + def test_undefined_field_exception(self): + """Tests if a `FieldDoesNotExist` exception is raised when + trying to instantiate a document with a field that's not + defined. + """ + class Doc(Document): + foo = StringField() - class Foo(Base): - counter = SequenceField() + with self.assertRaises(FieldDoesNotExist): + Doc(bar='test') - class Bar(Base): - counter = SequenceField() + def test_undefined_field_exception_with_strict(self): + """Tests if a `FieldDoesNotExist` exception is raised when + trying to instantiate a document with a field that's not + defined, even when strict is set to False. + """ + class Doc(Document): + foo = StringField() + meta = {'strict': False} - bar = Bar(name='Bar') - bar.save() + with self.assertRaises(FieldDoesNotExist): + Doc(bar='test') - foo = Foo(name='Foo') - foo.save() + def test_long_field_is_considered_as_int64(self): + """ + Tests that long fields are stored as long in mongo, even if long + value is small enough to be an int. + """ + class TestLongFieldConsideredAsInt64(Document): + some_long = LongField() - self.assertFalse('base.counter' in - self.db['mongoengine.counters'].find().distinct('_id')) - self.assertTrue(('foo.counter' and 'bar.counter') in - self.db['mongoengine.counters'].find().distinct('_id')) - self.assertEqual(foo.counter, bar.counter) - self.assertEqual(foo._fields['counter'].owner_document, Foo) - self.assertEqual(bar._fields['counter'].owner_document, Bar) + doc = TestLongFieldConsideredAsInt64(some_long=42).save() + db = get_db() + self.assertTrue(isinstance(db.test_long_field_considered_as_int64.find()[0]['some_long'], Int64)) + self.assertTrue(isinstance(doc.some_long, six.integer_types)) - def test_generic_embedded_document(self): - class Car(EmbeddedDocument): - name = StringField() - class Dish(EmbeddedDocument): - food = StringField(required=True) - number = IntField() +class EmbeddedDocumentListFieldTestCase(MongoDBTestCase): - class Person(Document): - name = StringField() - like = GenericEmbeddedDocumentField() + def setUp(self): + """ + Create two BlogPost entries in the database, each with + several EmbeddedDocuments. + """ + class Comments(EmbeddedDocument): + author = StringField() + message = StringField() - Person.drop_collection() + class BlogPost(Document): + comments = EmbeddedDocumentListField(Comments) - person = Person(name='Test User') - person.like = Car(name='Fiat') - person.save() + BlogPost.drop_collection() - person = Person.objects.first() - self.assertTrue(isinstance(person.like, Car)) + self.Comments = Comments + self.BlogPost = BlogPost - person.like = Dish(food="arroz", number=15) - person.save() + self.post1 = self.BlogPost(comments=[ + self.Comments(author='user1', message='message1'), + self.Comments(author='user2', message='message1') + ]).save() - person = Person.objects.first() - self.assertTrue(isinstance(person.like, Dish)) + self.post2 = self.BlogPost(comments=[ + self.Comments(author='user2', message='message2'), + self.Comments(author='user2', message='message3'), + self.Comments(author='user3', message='message1') + ]).save() - def test_generic_embedded_document_choices(self): - """Ensure you can limit GenericEmbeddedDocument choices + def test_no_keyword_filter(self): """ - class Car(EmbeddedDocument): - name = StringField() - - class Dish(EmbeddedDocument): - food = StringField(required=True) - number = IntField() - - class Person(Document): - name = StringField() - like = GenericEmbeddedDocumentField(choices=(Dish,)) + Tests the filter method of a List of Embedded Documents + with a no keyword. + """ + filtered = self.post1.comments.filter() - Person.drop_collection() + # Ensure nothing was changed + self.assertListEqual(filtered, self.post1.comments) - person = Person(name='Test User') - person.like = Car(name='Fiat') - self.assertRaises(ValidationError, person.validate) + def test_single_keyword_filter(self): + """ + Tests the filter method of a List of Embedded Documents + with a single keyword. + """ + filtered = self.post1.comments.filter(author='user1') - person.like = Dish(food="arroz", number=15) - person.save() + # Ensure only 1 entry was returned. + self.assertEqual(len(filtered), 1) - person = Person.objects.first() - self.assertTrue(isinstance(person.like, Dish)) + # Ensure the entry returned is the correct entry. + self.assertEqual(filtered[0].author, 'user1') - def test_generic_list_embedded_document_choices(self): - """Ensure you can limit GenericEmbeddedDocument choices inside a list - field + def test_multi_keyword_filter(self): """ - class Car(EmbeddedDocument): - name = StringField() + Tests the filter method of a List of Embedded Documents + with multiple keywords. + """ + filtered = self.post2.comments.filter( + author='user2', message='message2' + ) - class Dish(EmbeddedDocument): - food = StringField(required=True) - number = IntField() + # Ensure only 1 entry was returned. + self.assertEqual(len(filtered), 1) - class Person(Document): - name = StringField() - likes = ListField(GenericEmbeddedDocumentField(choices=(Dish,))) + # Ensure the entry returned is the correct entry. + self.assertEqual(filtered[0].author, 'user2') + self.assertEqual(filtered[0].message, 'message2') - Person.drop_collection() + def test_chained_filter(self): + """ + Tests chained filter methods of a List of Embedded Documents + """ + filtered = self.post2.comments.filter(author='user2').filter( + message='message2' + ) - person = Person(name='Test User') - person.likes = [Car(name='Fiat')] - self.assertRaises(ValidationError, person.validate) + # Ensure only 1 entry was returned. + self.assertEqual(len(filtered), 1) - person.likes = [Dish(food="arroz", number=15)] - person.save() + # Ensure the entry returned is the correct entry. + self.assertEqual(filtered[0].author, 'user2') + self.assertEqual(filtered[0].message, 'message2') - person = Person.objects.first() - self.assertTrue(isinstance(person.likes[0], Dish)) + def test_unknown_keyword_filter(self): + """ + Tests the filter method of a List of Embedded Documents + when the keyword is not a known keyword. + """ + with self.assertRaises(AttributeError): + self.post2.comments.filter(year=2) - def test_recursive_validation(self): - """Ensure that a validation result to_dict is available. + def test_no_keyword_exclude(self): """ - class Author(EmbeddedDocument): - name = StringField(required=True) + Tests the exclude method of a List of Embedded Documents + with a no keyword. + """ + filtered = self.post1.comments.exclude() - class Comment(EmbeddedDocument): - author = EmbeddedDocumentField(Author, required=True) - content = StringField(required=True) + # Ensure everything was removed + self.assertListEqual(filtered, []) - class Post(Document): - title = StringField(required=True) - comments = ListField(EmbeddedDocumentField(Comment)) + def test_single_keyword_exclude(self): + """ + Tests the exclude method of a List of Embedded Documents + with a single keyword. + """ + excluded = self.post1.comments.exclude(author='user1') - bob = Author(name='Bob') - post = Post(title='hello world') - post.comments.append(Comment(content='hello', author=bob)) - post.comments.append(Comment(author=bob)) + # Ensure only 1 entry was returned. + self.assertEqual(len(excluded), 1) - self.assertRaises(ValidationError, post.validate) - try: - post.validate() - except ValidationError as error: - # ValidationError.errors property - self.assertTrue(hasattr(error, 'errors')) - self.assertTrue(isinstance(error.errors, dict)) - self.assertTrue('comments' in error.errors) - self.assertTrue(1 in error.errors['comments']) - self.assertTrue(isinstance(error.errors['comments'][1]['content'], - ValidationError)) + # Ensure the entry returned is the correct entry. + self.assertEqual(excluded[0].author, 'user2') - # ValidationError.schema property - error_dict = error.to_dict() - self.assertTrue(isinstance(error_dict, dict)) - self.assertTrue('comments' in error_dict) - self.assertTrue(1 in error_dict['comments']) - self.assertTrue('content' in error_dict['comments'][1]) - self.assertEqual(error_dict['comments'][1]['content'], - u'Field is required') + def test_multi_keyword_exclude(self): + """ + Tests the exclude method of a List of Embedded Documents + with multiple keywords. + """ + excluded = self.post2.comments.exclude( + author='user3', message='message1' + ) - post.comments[1].content = 'here we go' - post.validate() + # Ensure only 2 entries were returned. + self.assertEqual(len(excluded), 2) - def test_email_field(self): - class User(Document): - email = EmailField() + # Ensure the entries returned are the correct entries. + self.assertEqual(excluded[0].author, 'user2') + self.assertEqual(excluded[1].author, 'user2') - user = User(email="ross@example.com") - self.assertTrue(user.validate() is None) + def test_non_matching_exclude(self): + """ + Tests the exclude method of a List of Embedded Documents + when the keyword does not match any entries. + """ + excluded = self.post2.comments.exclude(author='user4') - user = User(email="ross@example.co.uk") - self.assertTrue(user.validate() is None) + # Ensure the 3 entries still exist. + self.assertEqual(len(excluded), 3) - user = User(email=("Kofq@rhom0e4klgauOhpbpNdogawnyIKvQS0wk2mjqrgGQ5S" - "aJIazqqWkm7.net")) - self.assertTrue(user.validate() is None) + def test_unknown_keyword_exclude(self): + """ + Tests the exclude method of a List of Embedded Documents + when the keyword is not a known keyword. + """ + with self.assertRaises(AttributeError): + self.post2.comments.exclude(year=2) - user = User(email="new-tld@example.technology") - self.assertTrue(user.validate() is None) + def test_chained_filter_exclude(self): + """ + Tests the exclude method after a filter method of a List of + Embedded Documents. + """ + excluded = self.post2.comments.filter(author='user2').exclude( + message='message2' + ) - user = User(email='me@localhost') - self.assertRaises(ValidationError, user.validate) + # Ensure only 1 entry was returned. + self.assertEqual(len(excluded), 1) - user = User(email="ross@example.com.") - self.assertRaises(ValidationError, user.validate) + # Ensure the entry returned is the correct entry. + self.assertEqual(excluded[0].author, 'user2') + self.assertEqual(excluded[0].message, 'message3') - def test_email_field_honors_regex(self): - class User(Document): - email = EmailField(regex=r'\w+@example.com') + def test_count(self): + """ + Tests the count method of a List of Embedded Documents. + """ + self.assertEqual(self.post1.comments.count(), 2) + self.assertEqual(self.post1.comments.count(), len(self.post1.comments)) - # Fails regex validation - user = User(email='me@foo.com') - self.assertRaises(ValidationError, user.validate) + def test_filtered_count(self): + """ + Tests the filter + count method of a List of Embedded Documents. + """ + count = self.post1.comments.filter(author='user1').count() + self.assertEqual(count, 1) - # Passes regex validation - user = User(email='me@example.com') - self.assertTrue(user.validate() is None) + def test_single_keyword_get(self): + """ + Tests the get method of a List of Embedded Documents using a + single keyword. + """ + comment = self.post1.comments.get(author='user1') + self.assertIsInstance(comment, self.Comments) + self.assertEqual(comment.author, 'user1') - def test_tuples_as_tuples(self): + def test_multi_keyword_get(self): """ - Ensure that tuples remain tuples when they are - inside a ComplexBaseField + Tests the get method of a List of Embedded Documents using + multiple keywords. """ - class EnumField(BaseField): + comment = self.post2.comments.get(author='user2', message='message2') + self.assertIsInstance(comment, self.Comments) + self.assertEqual(comment.author, 'user2') + self.assertEqual(comment.message, 'message2') - def __init__(self, **kwargs): - super(EnumField, self).__init__(**kwargs) + def test_no_keyword_multiple_return_get(self): + """ + Tests the get method of a List of Embedded Documents without + a keyword to return multiple documents. + """ + with self.assertRaises(MultipleObjectsReturned): + self.post1.comments.get() - def to_mongo(self, value): - return value + def test_keyword_multiple_return_get(self): + """ + Tests the get method of a List of Embedded Documents with a keyword + to return multiple documents. + """ + with self.assertRaises(MultipleObjectsReturned): + self.post2.comments.get(author='user2') - def to_python(self, value): - return tuple(value) + def test_unknown_keyword_get(self): + """ + Tests the get method of a List of Embedded Documents with an + unknown keyword. + """ + with self.assertRaises(AttributeError): + self.post2.comments.get(year=2020) - class TestDoc(Document): - items = ListField(EnumField()) + def test_no_result_get(self): + """ + Tests the get method of a List of Embedded Documents where get + returns no results. + """ + with self.assertRaises(DoesNotExist): + self.post1.comments.get(author='user3') - TestDoc.drop_collection() - tuples = [(100, 'Testing')] - doc = TestDoc() - doc.items = tuples - doc.save() - x = TestDoc.objects().get() - self.assertTrue(x is not None) - self.assertTrue(len(x.items) == 1) - self.assertTrue(tuple(x.items[0]) in tuples) - self.assertTrue(x.items[0] in tuples) + def test_first(self): + """ + Tests the first method of a List of Embedded Documents to + ensure it returns the first comment. + """ + comment = self.post1.comments.first() - def test_dynamic_fields_class(self): + # Ensure a Comment object was returned. + self.assertIsInstance(comment, self.Comments) + self.assertEqual(comment, self.post1.comments[0]) - class Doc2(Document): - field_1 = StringField(db_field='f') + def test_create(self): + """ + Test the create method of a List of Embedded Documents. + """ + comment = self.post1.comments.create( + author='user4', message='message1' + ) + self.post1.save() - class Doc(Document): - my_id = IntField(required=True, unique=True, primary_key=True) - embed_me = DynamicField(db_field='e') - field_x = StringField(db_field='x') + # Ensure the returned value is the comment object. + self.assertIsInstance(comment, self.Comments) + self.assertEqual(comment.author, 'user4') + self.assertEqual(comment.message, 'message1') - Doc.drop_collection() - Doc2.drop_collection() + # Ensure the new comment was actually saved to the database. + self.assertIn( + comment, + self.BlogPost.objects(comments__author='user4')[0].comments + ) - doc2 = Doc2(field_1="hello") - doc = Doc(my_id=1, embed_me=doc2, field_x="x") - self.assertRaises(OperationError, doc.save) + def test_filtered_create(self): + """ + Test the create method of a List of Embedded Documents chained + to a call to the filter method. Filtering should have no effect + on creation. + """ + comment = self.post1.comments.filter(author='user1').create( + author='user4', message='message1' + ) + self.post1.save() - doc2.save() - doc.save() + # Ensure the returned value is the comment object. + self.assertIsInstance(comment, self.Comments) + self.assertEqual(comment.author, 'user4') + self.assertEqual(comment.message, 'message1') - doc = Doc.objects.get() - self.assertEqual(doc.embed_me.field_1, "hello") + # Ensure the new comment was actually saved to the database. + self.assertIn( + comment, + self.BlogPost.objects(comments__author='user4')[0].comments + ) - def test_dynamic_fields_embedded_class(self): + def test_no_keyword_update(self): + """ + Tests the update method of a List of Embedded Documents with + no keywords. + """ + original = list(self.post1.comments) + number = self.post1.comments.update() + self.post1.save() - class Embed(EmbeddedDocument): - field_1 = StringField(db_field='f') + # Ensure that nothing was altered. + self.assertIn( + original[0], + self.BlogPost.objects(id=self.post1.id)[0].comments + ) - class Doc(Document): - my_id = IntField(required=True, unique=True, primary_key=True) - embed_me = DynamicField(db_field='e') - field_x = StringField(db_field='x') + self.assertIn( + original[1], + self.BlogPost.objects(id=self.post1.id)[0].comments + ) - Doc.drop_collection() + # Ensure the method returned 0 as the number of entries + # modified + self.assertEqual(number, 0) - Doc(my_id=1, embed_me=Embed(field_1="hello"), field_x="x").save() + def test_single_keyword_update(self): + """ + Tests the update method of a List of Embedded Documents with + a single keyword. + """ + number = self.post1.comments.update(author='user4') + self.post1.save() - doc = Doc.objects.get() - self.assertEqual(doc.embed_me.field_1, "hello") + comments = self.BlogPost.objects(id=self.post1.id)[0].comments - def test_dynamicfield_dump_document(self): - """Ensure a DynamicField can handle another document's dump - """ - class Doc(Document): - field = DynamicField() + # Ensure that the database was updated properly. + self.assertEqual(comments[0].author, 'user4') + self.assertEqual(comments[1].author, 'user4') - class ToEmbed(Document): - id = IntField(primary_key=True, default=1) - recursive = DynamicField() + # Ensure the method returned 2 as the number of entries + # modified + self.assertEqual(number, 2) - class ToEmbedParent(Document): - id = IntField(primary_key=True, default=1) - recursive = DynamicField() + def test_unicode(self): + """ + Tests that unicode strings handled correctly + """ + post = self.BlogPost(comments=[ + self.Comments(author='user1', message=u'сообщение'), + self.Comments(author='user2', message=u'хабарлама') + ]).save() + self.assertEqual(post.comments.get(message=u'сообщение').author, + 'user1') - meta = {'allow_inheritance': True} + def test_save(self): + """ + Tests the save method of a List of Embedded Documents. + """ + comments = self.post1.comments + new_comment = self.Comments(author='user4') + comments.append(new_comment) + comments.save() - class ToEmbedChild(ToEmbedParent): - pass + # Ensure that the new comment has been added to the database. + self.assertIn( + new_comment, + self.BlogPost.objects(id=self.post1.id)[0].comments + ) - to_embed_recursive = ToEmbed(id=1).save() - to_embed = ToEmbed(id=2, recursive=to_embed_recursive).save() - doc = Doc(field=to_embed) - doc.save() - assert isinstance(doc.field, ToEmbed) - assert doc.field == to_embed - # Same thing with a Document with a _cls field - to_embed_recursive = ToEmbedChild(id=1).save() - to_embed_child = ToEmbedChild(id=2, recursive=to_embed_recursive).save() - doc = Doc(field=to_embed_child) - doc.save() - assert isinstance(doc.field, ToEmbedChild) - assert doc.field == to_embed_child + def test_delete(self): + """ + Tests the delete method of a List of Embedded Documents. + """ + number = self.post1.comments.delete() + self.post1.save() - def test_invalid_dict_value(self): - class DictFieldTest(Document): - dictionary = DictField(required=True) + # Ensure that all the comments under post1 were deleted in the + # database. + self.assertListEqual( + self.BlogPost.objects(id=self.post1.id)[0].comments, [] + ) - DictFieldTest.drop_collection() + # Ensure that post1 comments were deleted from the list. + self.assertListEqual(self.post1.comments, []) - test = DictFieldTest(dictionary=None) - test.dictionary # Just access to test getter - self.assertRaises(ValidationError, test.validate) + # Ensure that comments still returned a EmbeddedDocumentList object. + self.assertIsInstance(self.post1.comments, EmbeddedDocumentList) - test = DictFieldTest(dictionary=False) - test.dictionary # Just access to test getter - self.assertRaises(ValidationError, test.validate) + # Ensure that the delete method returned 2 as the number of entries + # deleted from the database + self.assertEqual(number, 2) - def test_cls_field(self): - class Animal(Document): - meta = {'allow_inheritance': True} + def test_empty_list_embedded_documents_with_unique_field(self): + """ + Tests that only one document with an empty list of embedded documents + that have a unique field can be saved, but if the unique field is + also sparse than multiple documents with an empty list can be saved. + """ + class EmbeddedWithUnique(EmbeddedDocument): + number = IntField(unique=True) - class Fish(Animal): - pass + class A(Document): + my_list = ListField(EmbeddedDocumentField(EmbeddedWithUnique)) - class Mammal(Animal): - pass + A(my_list=[]).save() + with self.assertRaises(NotUniqueError): + A(my_list=[]).save() - class Dog(Mammal): - pass + class EmbeddedWithSparseUnique(EmbeddedDocument): + number = IntField(unique=True, sparse=True) - class Human(Mammal): - pass + class B(Document): + my_list = ListField(EmbeddedDocumentField(EmbeddedWithSparseUnique)) - Animal.objects.delete() - Dog().save() - Fish().save() - Human().save() - self.assertEquals(Animal.objects(_cls__in=["Animal.Mammal.Dog", "Animal.Fish"]).count(), 2) - self.assertEquals(Animal.objects(_cls__in=["Animal.Fish.Guppy"]).count(), 0) + A.drop_collection() + B.drop_collection() - def test_sparse_field(self): - class Doc(Document): - name = StringField(required=False, unique=True, sparse=True) - try: - Doc().save() - Doc().save() - except Exception: - self.fail() + B(my_list=[]).save() + B(my_list=[]).save() - def test_undefined_field_exception(self): - """Tests if a `FieldDoesNotExist` exception is raised when trying to - instanciate a document with a field that's not defined. + def test_filtered_delete(self): """ - class Doc(Document): - foo = StringField() - - with self.assertRaises(FieldDoesNotExist): - Doc(bar='test') + Tests the delete method of a List of Embedded Documents + after the filter method has been called. + """ + comment = self.post1.comments[1] + number = self.post1.comments.filter(author='user2').delete() + self.post1.save() + # Ensure that only the user2 comment was deleted. + self.assertNotIn( + comment, self.BlogPost.objects(id=self.post1.id)[0].comments + ) + self.assertEqual( + len(self.BlogPost.objects(id=self.post1.id)[0].comments), 1 + ) - def test_undefined_field_exception_with_strict(self): - """Tests if a `FieldDoesNotExist` exception is raised when trying to - instanciate a document with a field that's not defined, - even when strict is set to False. - """ - class Doc(Document): - foo = StringField() - meta = {'strict': False} + # Ensure that the user2 comment no longer exists in the list. + self.assertNotIn(comment, self.post1.comments) + self.assertEqual(len(self.post1.comments), 1) - with self.assertRaises(FieldDoesNotExist): - Doc(bar='test') + # Ensure that the delete method returned 1 as the number of entries + # deleted from the database + self.assertEqual(number, 1) - def test_long_field_is_considered_as_int64(self): + def test_custom_data(self): """ - Tests that long fields are stored as long in mongo, even if long value - is small enough to be an int. + Tests that custom data is saved in the field object + and doesn't interfere with the rest of field functionalities. """ - class TestLongFieldConsideredAsInt64(Document): - some_long = LongField() - - doc = TestLongFieldConsideredAsInt64(some_long=42).save() - db = get_db() - self.assertTrue(isinstance(db.test_long_field_considered_as_int64.find()[0]['some_long'], Int64)) - self.assertTrue(isinstance(doc.some_long, six.integer_types)) + custom_data = {'a': 'a_value', 'b': [1, 2]} + class CustomData(Document): + a_field = IntField() + c_field = IntField(custom_data=custom_data) -class EmbeddedDocumentListFieldTestCase(MongoDBTestCase): + CustomData.drop_collection() - def setUp(self): + a1 = CustomData(a_field=1, c_field=2).save() + self.assertEqual(2, a1.c_field) + self.assertFalse(hasattr(a1.c_field, 'custom_data')) + self.assertTrue(hasattr(CustomData.c_field, 'custom_data')) + self.assertEqual(custom_data['a'], CustomData.c_field.custom_data['a']) + + +class CachedReferenceFieldTest(MongoDBTestCase): + + def test_cached_reference_field_get_and_save(self): """ - Create two BlogPost entries in the database, each with - several EmbeddedDocuments. + Tests #1047: CachedReferenceField creates DBRefs on to_python, + but can't save them on to_mongo. """ - class Comments(EmbeddedDocument): - author = StringField() - message = StringField() + class Animal(Document): + name = StringField() + tag = StringField() - class BlogPost(Document): - comments = EmbeddedDocumentListField(Comments) + class Ocorrence(Document): + person = StringField() + animal = CachedReferenceField(Animal) - BlogPost.drop_collection() + Animal.drop_collection() + Ocorrence.drop_collection() - self.Comments = Comments - self.BlogPost = BlogPost + Ocorrence(person="testte", + animal=Animal(name="Leopard", tag="heavy").save()).save() + p = Ocorrence.objects.get() + p.person = 'new_testte' + p.save() - self.post1 = self.BlogPost(comments=[ - self.Comments(author='user1', message='message1'), - self.Comments(author='user2', message='message1') - ]).save() + def test_cached_reference_fields(self): + class Animal(Document): + name = StringField() + tag = StringField() - self.post2 = self.BlogPost(comments=[ - self.Comments(author='user2', message='message2'), - self.Comments(author='user2', message='message3'), - self.Comments(author='user3', message='message1') - ]).save() + class Ocorrence(Document): + person = StringField() + animal = CachedReferenceField( + Animal, fields=['tag']) - def test_no_keyword_filter(self): - """ - Tests the filter method of a List of Embedded Documents - with a no keyword. - """ - filtered = self.post1.comments.filter() + Animal.drop_collection() + Ocorrence.drop_collection() - # Ensure nothing was changed - self.assertListEqual(filtered, self.post1.comments) + a = Animal(name="Leopard", tag="heavy") + a.save() - def test_single_keyword_filter(self): - """ - Tests the filter method of a List of Embedded Documents - with a single keyword. - """ - filtered = self.post1.comments.filter(author='user1') + self.assertEqual(Animal._cached_reference_fields, [Ocorrence.animal]) + o = Ocorrence(person="teste", animal=a) + o.save() - # Ensure only 1 entry was returned. - self.assertEqual(len(filtered), 1) + p = Ocorrence(person="Wilson") + p.save() - # Ensure the entry returned is the correct entry. - self.assertEqual(filtered[0].author, 'user1') + self.assertEqual(Ocorrence.objects(animal=None).count(), 1) - def test_multi_keyword_filter(self): - """ - Tests the filter method of a List of Embedded Documents - with multiple keywords. - """ - filtered = self.post2.comments.filter( - author='user2', message='message2' - ) + self.assertEqual( + a.to_mongo(fields=['tag']), {'tag': 'heavy', "_id": a.pk}) - # Ensure only 1 entry was returned. - self.assertEqual(len(filtered), 1) + self.assertEqual(o.to_mongo()['animal']['tag'], 'heavy') - # Ensure the entry returned is the correct entry. - self.assertEqual(filtered[0].author, 'user2') - self.assertEqual(filtered[0].message, 'message2') + # counts + Ocorrence(person="teste 2").save() + Ocorrence(person="teste 3").save() - def test_chained_filter(self): - """ - Tests chained filter methods of a List of Embedded Documents - """ - filtered = self.post2.comments.filter(author='user2').filter( - message='message2' - ) + count = Ocorrence.objects(animal__tag='heavy').count() + self.assertEqual(count, 1) - # Ensure only 1 entry was returned. - self.assertEqual(len(filtered), 1) + ocorrence = Ocorrence.objects(animal__tag='heavy').first() + self.assertEqual(ocorrence.person, "teste") + self.assertTrue(isinstance(ocorrence.animal, Animal)) - # Ensure the entry returned is the correct entry. - self.assertEqual(filtered[0].author, 'user2') - self.assertEqual(filtered[0].message, 'message2') + def test_cached_reference_field_decimal(self): + class PersonAuto(Document): + name = StringField() + salary = DecimalField() - def test_unknown_keyword_filter(self): - """ - Tests the filter method of a List of Embedded Documents - when the keyword is not a known keyword. - """ - with self.assertRaises(AttributeError): - self.post2.comments.filter(year=2) + class SocialTest(Document): + group = StringField() + person = CachedReferenceField( + PersonAuto, + fields=('salary',)) - def test_no_keyword_exclude(self): - """ - Tests the exclude method of a List of Embedded Documents - with a no keyword. - """ - filtered = self.post1.comments.exclude() + PersonAuto.drop_collection() + SocialTest.drop_collection() - # Ensure everything was removed - self.assertListEqual(filtered, []) + p = PersonAuto(name="Alberto", salary=Decimal('7000.00')) + p.save() - def test_single_keyword_exclude(self): - """ - Tests the exclude method of a List of Embedded Documents - with a single keyword. - """ - excluded = self.post1.comments.exclude(author='user1') + s = SocialTest(group="dev", person=p) + s.save() - # Ensure only 1 entry was returned. - self.assertEqual(len(excluded), 1) + self.assertEqual( + SocialTest.objects._collection.find_one({'person.salary': 7000.00}), { + '_id': s.pk, + 'group': s.group, + 'person': { + '_id': p.pk, + 'salary': 7000.00 + } + }) - # Ensure the entry returned is the correct entry. - self.assertEqual(excluded[0].author, 'user2') + def test_cached_reference_field_reference(self): + class Group(Document): + name = StringField() - def test_multi_keyword_exclude(self): - """ - Tests the exclude method of a List of Embedded Documents - with multiple keywords. - """ - excluded = self.post2.comments.exclude( - author='user3', message='message1' - ) + class Person(Document): + name = StringField() + group = ReferenceField(Group) - # Ensure only 2 entries were returned. - self.assertEqual(len(excluded), 2) + class SocialData(Document): + obs = StringField() + tags = ListField( + StringField()) + person = CachedReferenceField( + Person, + fields=('group',)) - # Ensure the entries returned are the correct entries. - self.assertEqual(excluded[0].author, 'user2') - self.assertEqual(excluded[1].author, 'user2') + Group.drop_collection() + Person.drop_collection() + SocialData.drop_collection() - def test_non_matching_exclude(self): - """ - Tests the exclude method of a List of Embedded Documents - when the keyword does not match any entries. - """ - excluded = self.post2.comments.exclude(author='user4') + g1 = Group(name='dev') + g1.save() - # Ensure the 3 entries still exist. - self.assertEqual(len(excluded), 3) + g2 = Group(name="designers") + g2.save() - def test_unknown_keyword_exclude(self): - """ - Tests the exclude method of a List of Embedded Documents - when the keyword is not a known keyword. - """ - with self.assertRaises(AttributeError): - self.post2.comments.exclude(year=2) + p1 = Person(name="Alberto", group=g1) + p1.save() - def test_chained_filter_exclude(self): - """ - Tests the exclude method after a filter method of a List of - Embedded Documents. - """ - excluded = self.post2.comments.filter(author='user2').exclude( - message='message2' - ) + p2 = Person(name="Andre", group=g1) + p2.save() - # Ensure only 1 entry was returned. - self.assertEqual(len(excluded), 1) + p3 = Person(name="Afro design", group=g2) + p3.save() - # Ensure the entry returned is the correct entry. - self.assertEqual(excluded[0].author, 'user2') - self.assertEqual(excluded[0].message, 'message3') + s1 = SocialData(obs="testing 123", person=p1, tags=['tag1', 'tag2']) + s1.save() - def test_count(self): - """ - Tests the count method of a List of Embedded Documents. - """ - self.assertEqual(self.post1.comments.count(), 2) - self.assertEqual(self.post1.comments.count(), len(self.post1.comments)) + s2 = SocialData(obs="testing 321", person=p3, tags=['tag3', 'tag4']) + s2.save() - def test_filtered_count(self): - """ - Tests the filter + count method of a List of Embedded Documents. - """ - count = self.post1.comments.filter(author='user1').count() - self.assertEqual(count, 1) + self.assertEqual(SocialData.objects._collection.find_one( + {'tags': 'tag2'}), { + '_id': s1.pk, + 'obs': 'testing 123', + 'tags': ['tag1', 'tag2'], + 'person': { + '_id': p1.pk, + 'group': g1.pk + } + }) - def test_single_keyword_get(self): - """ - Tests the get method of a List of Embedded Documents using a - single keyword. - """ - comment = self.post1.comments.get(author='user1') - self.assertIsInstance(comment, self.Comments) - self.assertEqual(comment.author, 'user1') + self.assertEqual(SocialData.objects(person__group=g2).count(), 1) + self.assertEqual(SocialData.objects(person__group=g2).first(), s2) - def test_multi_keyword_get(self): - """ - Tests the get method of a List of Embedded Documents using - multiple keywords. - """ - comment = self.post2.comments.get(author='user2', message='message2') - self.assertIsInstance(comment, self.Comments) - self.assertEqual(comment.author, 'user2') - self.assertEqual(comment.message, 'message2') + def test_cached_reference_field_update_all(self): + class Person(Document): + TYPES = ( + ('pf', "PF"), + ('pj', "PJ") + ) + name = StringField() + tp = StringField( + choices=TYPES + ) - def test_no_keyword_multiple_return_get(self): - """ - Tests the get method of a List of Embedded Documents without - a keyword to return multiple documents. - """ - with self.assertRaises(MultipleObjectsReturned): - self.post1.comments.get() + father = CachedReferenceField('self', fields=('tp',)) - def test_keyword_multiple_return_get(self): - """ - Tests the get method of a List of Embedded Documents with a keyword - to return multiple documents. - """ - with self.assertRaises(MultipleObjectsReturned): - self.post2.comments.get(author='user2') + Person.drop_collection() - def test_unknown_keyword_get(self): - """ - Tests the get method of a List of Embedded Documents with an - unknown keyword. - """ - with self.assertRaises(AttributeError): - self.post2.comments.get(year=2020) + a1 = Person(name="Wilson Father", tp="pj") + a1.save() - def test_no_result_get(self): - """ - Tests the get method of a List of Embedded Documents where get - returns no results. - """ - with self.assertRaises(DoesNotExist): - self.post1.comments.get(author='user3') + a2 = Person(name='Wilson Junior', tp='pf', father=a1) + a2.save() - def test_first(self): - """ - Tests the first method of a List of Embedded Documents to - ensure it returns the first comment. - """ - comment = self.post1.comments.first() + self.assertEqual(dict(a2.to_mongo()), { + "_id": a2.pk, + "name": u"Wilson Junior", + "tp": u"pf", + "father": { + "_id": a1.pk, + "tp": u"pj" + } + }) - # Ensure a Comment object was returned. - self.assertIsInstance(comment, self.Comments) - self.assertEqual(comment, self.post1.comments[0]) + self.assertEqual(Person.objects(father=a1)._query, { + 'father._id': a1.pk + }) + self.assertEqual(Person.objects(father=a1).count(), 1) - def test_create(self): - """ - Test the create method of a List of Embedded Documents. - """ - comment = self.post1.comments.create( - author='user4', message='message1' - ) - self.post1.save() + Person.objects.update(set__tp="pf") + Person.father.sync_all() - # Ensure the returned value is the comment object. - self.assertIsInstance(comment, self.Comments) - self.assertEqual(comment.author, 'user4') - self.assertEqual(comment.message, 'message1') + a2.reload() + self.assertEqual(dict(a2.to_mongo()), { + "_id": a2.pk, + "name": u"Wilson Junior", + "tp": u"pf", + "father": { + "_id": a1.pk, + "tp": u"pf" + } + }) - # Ensure the new comment was actually saved to the database. - self.assertIn( - comment, - self.BlogPost.objects(comments__author='user4')[0].comments - ) + def test_cached_reference_fields_on_embedded_documents(self): + with self.assertRaises(InvalidDocumentError): + class Test(Document): + name = StringField() - def test_filtered_create(self): - """ - Test the create method of a List of Embedded Documents chained - to a call to the filter method. Filtering should have no effect - on creation. - """ - comment = self.post1.comments.filter(author='user1').create( - author='user4', message='message1' - ) - self.post1.save() + type('WrongEmbeddedDocument', ( + EmbeddedDocument,), { + 'test': CachedReferenceField(Test) + }) - # Ensure the returned value is the comment object. - self.assertIsInstance(comment, self.Comments) - self.assertEqual(comment.author, 'user4') - self.assertEqual(comment.message, 'message1') + def test_cached_reference_auto_sync(self): + class Person(Document): + TYPES = ( + ('pf', "PF"), + ('pj', "PJ") + ) + name = StringField() + tp = StringField( + choices=TYPES + ) - # Ensure the new comment was actually saved to the database. - self.assertIn( - comment, - self.BlogPost.objects(comments__author='user4')[0].comments - ) + father = CachedReferenceField('self', fields=('tp',)) - def test_no_keyword_update(self): - """ - Tests the update method of a List of Embedded Documents with - no keywords. - """ - original = list(self.post1.comments) - number = self.post1.comments.update() - self.post1.save() + Person.drop_collection() - # Ensure that nothing was altered. - self.assertIn( - original[0], - self.BlogPost.objects(id=self.post1.id)[0].comments - ) + a1 = Person(name="Wilson Father", tp="pj") + a1.save() - self.assertIn( - original[1], - self.BlogPost.objects(id=self.post1.id)[0].comments - ) + a2 = Person(name='Wilson Junior', tp='pf', father=a1) + a2.save() - # Ensure the method returned 0 as the number of entries - # modified - self.assertEqual(number, 0) + a1.tp = 'pf' + a1.save() - def test_single_keyword_update(self): - """ - Tests the update method of a List of Embedded Documents with - a single keyword. - """ - number = self.post1.comments.update(author='user4') - self.post1.save() + a2.reload() + self.assertEqual(dict(a2.to_mongo()), { + '_id': a2.pk, + 'name': 'Wilson Junior', + 'tp': 'pf', + 'father': { + '_id': a1.pk, + 'tp': 'pf' + } + }) - comments = self.BlogPost.objects(id=self.post1.id)[0].comments + def test_cached_reference_auto_sync_disabled(self): + class Persone(Document): + TYPES = ( + ('pf', "PF"), + ('pj', "PJ") + ) + name = StringField() + tp = StringField( + choices=TYPES + ) - # Ensure that the database was updated properly. - self.assertEqual(comments[0].author, 'user4') - self.assertEqual(comments[1].author, 'user4') + father = CachedReferenceField( + 'self', fields=('tp',), auto_sync=False) - # Ensure the method returned 2 as the number of entries - # modified - self.assertEqual(number, 2) + Persone.drop_collection() - def test_unicode(self): - """ - Tests that unicode strings handled correctly - """ - post = self.BlogPost(comments=[ - self.Comments(author='user1', message=u'сообщение'), - self.Comments(author='user2', message=u'хабарлама') - ]).save() - self.assertEqual(post.comments.get(message=u'сообщение').author, - 'user1') + a1 = Persone(name="Wilson Father", tp="pj") + a1.save() - def test_save(self): - """ - Tests the save method of a List of Embedded Documents. - """ - comments = self.post1.comments - new_comment = self.Comments(author='user4') - comments.append(new_comment) - comments.save() + a2 = Persone(name='Wilson Junior', tp='pf', father=a1) + a2.save() - # Ensure that the new comment has been added to the database. - self.assertIn( - new_comment, - self.BlogPost.objects(id=self.post1.id)[0].comments - ) + a1.tp = 'pf' + a1.save() - def test_delete(self): - """ - Tests the delete method of a List of Embedded Documents. - """ - number = self.post1.comments.delete() - self.post1.save() + self.assertEqual(Persone.objects._collection.find_one({'_id': a2.pk}), { + '_id': a2.pk, + 'name': 'Wilson Junior', + 'tp': 'pf', + 'father': { + '_id': a1.pk, + 'tp': 'pj' + } + }) - # Ensure that all the comments under post1 were deleted in the - # database. - self.assertListEqual( - self.BlogPost.objects(id=self.post1.id)[0].comments, [] - ) + def test_cached_reference_embedded_fields(self): + class Owner(EmbeddedDocument): + TPS = ( + ('n', "Normal"), + ('u', "Urgent") + ) + name = StringField() + tp = StringField( + verbose_name="Type", + db_field="t", + choices=TPS) - # Ensure that post1 comments were deleted from the list. - self.assertListEqual(self.post1.comments, []) + class Animal(Document): + name = StringField() + tag = StringField() - # Ensure that comments still returned a EmbeddedDocumentList object. - self.assertIsInstance(self.post1.comments, EmbeddedDocumentList) + owner = EmbeddedDocumentField(Owner) - # Ensure that the delete method returned 2 as the number of entries - # deleted from the database - self.assertEqual(number, 2) + class Ocorrence(Document): + person = StringField() + animal = CachedReferenceField( + Animal, fields=['tag', 'owner.tp']) - def test_empty_list_embedded_documents_with_unique_field(self): - """ - Tests that only one document with an empty list of embedded documents - that have a unique field can be saved, but if the unique field is - also sparse than multiple documents with an empty list can be saved. - """ - class EmbeddedWithUnique(EmbeddedDocument): - number = IntField(unique=True) + Animal.drop_collection() + Ocorrence.drop_collection() - class A(Document): - my_list = ListField(EmbeddedDocumentField(EmbeddedWithUnique)) + a = Animal(name="Leopard", tag="heavy", + owner=Owner(tp='u', name="Wilson Júnior") + ) + a.save() - A(my_list=[]).save() - with self.assertRaises(NotUniqueError): - A(my_list=[]).save() + o = Ocorrence(person="teste", animal=a) + o.save() + self.assertEqual(dict(a.to_mongo(fields=['tag', 'owner.tp'])), { + '_id': a.pk, + 'tag': 'heavy', + 'owner': { + 't': 'u' + } + }) + self.assertEqual(o.to_mongo()['animal']['tag'], 'heavy') + self.assertEqual(o.to_mongo()['animal']['owner']['t'], 'u') - class EmbeddedWithSparseUnique(EmbeddedDocument): - number = IntField(unique=True, sparse=True) + # counts + Ocorrence(person="teste 2").save() + Ocorrence(person="teste 3").save() - class B(Document): - my_list = ListField(EmbeddedDocumentField(EmbeddedWithSparseUnique)) + count = Ocorrence.objects( + animal__tag='heavy', animal__owner__tp='u').count() + self.assertEqual(count, 1) - A.drop_collection() - B.drop_collection() + ocorrence = Ocorrence.objects( + animal__tag='heavy', + animal__owner__tp='u').first() + self.assertEqual(ocorrence.person, "teste") + self.assertTrue(isinstance(ocorrence.animal, Animal)) - B(my_list=[]).save() - B(my_list=[]).save() + def test_cached_reference_embedded_list_fields(self): + class Owner(EmbeddedDocument): + name = StringField() + tags = ListField(StringField()) - def test_filtered_delete(self): - """ - Tests the delete method of a List of Embedded Documents - after the filter method has been called. - """ - comment = self.post1.comments[1] - number = self.post1.comments.filter(author='user2').delete() - self.post1.save() + class Animal(Document): + name = StringField() + tag = StringField() - # Ensure that only the user2 comment was deleted. - self.assertNotIn( - comment, self.BlogPost.objects(id=self.post1.id)[0].comments - ) - self.assertEqual( - len(self.BlogPost.objects(id=self.post1.id)[0].comments), 1 - ) + owner = EmbeddedDocumentField(Owner) - # Ensure that the user2 comment no longer exists in the list. - self.assertNotIn(comment, self.post1.comments) - self.assertEqual(len(self.post1.comments), 1) + class Ocorrence(Document): + person = StringField() + animal = CachedReferenceField( + Animal, fields=['tag', 'owner.tags']) - # Ensure that the delete method returned 1 as the number of entries - # deleted from the database - self.assertEqual(number, 1) + Animal.drop_collection() + Ocorrence.drop_collection() - def test_custom_data(self): - """ - Tests that custom data is saved in the field object - and doesn't interfere with the rest of field functionalities. - """ - custom_data = {'a': 'a_value', 'b': [1, 2]} + a = Animal(name="Leopard", tag="heavy", + owner=Owner(tags=['cool', 'funny'], + name="Wilson Júnior") + ) + a.save() - class CustomData(Document): - a_field = IntField() - c_field = IntField(custom_data=custom_data) + o = Ocorrence(person="teste 2", animal=a) + o.save() + self.assertEqual(dict(a.to_mongo(fields=['tag', 'owner.tags'])), { + '_id': a.pk, + 'tag': 'heavy', + 'owner': { + 'tags': ['cool', 'funny'] + } + }) - CustomData.drop_collection() + self.assertEqual(o.to_mongo()['animal']['tag'], 'heavy') + self.assertEqual(o.to_mongo()['animal']['owner']['tags'], + ['cool', 'funny']) - a1 = CustomData(a_field=1, c_field=2).save() - self.assertEqual(2, a1.c_field) - self.assertFalse(hasattr(a1.c_field, 'custom_data')) - self.assertTrue(hasattr(CustomData.c_field, 'custom_data')) - self.assertEqual(custom_data['a'], CustomData.c_field.custom_data['a']) + # counts + Ocorrence(person="teste 2").save() + Ocorrence(person="teste 3").save() + + query = Ocorrence.objects( + animal__tag='heavy', animal__owner__tags='cool')._query + self.assertEqual( + query, {'animal.owner.tags': 'cool', 'animal.tag': 'heavy'}) + + ocorrence = Ocorrence.objects( + animal__tag='heavy', + animal__owner__tags='cool').first() + self.assertEqual(ocorrence.person, "teste 2") + self.assertTrue(isinstance(ocorrence.animal, Animal)) if __name__ == '__main__':