From 376ed1371c308ed680d8d4084f7a8ebbab4d17c2 Mon Sep 17 00:00:00 2001 From: Stefane Fermigier Date: Fri, 6 Apr 2018 15:46:30 +0200 Subject: [PATCH] refactor tests --- .../communities/tests/test_community_noweb.py | 34 +- .../communities/tests/test_community_web.py | 14 +- abilian/sbe/apps/documents/tests/conftest.py | 1 + .../apps/documents/tests/test_documents.py | 632 ++++-------------- .../documents/tests/test_documents_web.py | 432 ++++++++++++ .../apps/forum/tests/test_integration_new.py | 4 +- .../sbe/apps/wiki/tests/test_web_classic.py | 2 + abilian/sbe/apps/wiki/tests/test_web_new.py | 4 +- 8 files changed, 565 insertions(+), 558 deletions(-) create mode 100644 abilian/sbe/apps/documents/tests/conftest.py create mode 100644 abilian/sbe/apps/documents/tests/test_documents_web.py diff --git a/abilian/sbe/apps/communities/tests/test_community_noweb.py b/abilian/sbe/apps/communities/tests/test_community_noweb.py index 4742f53d..39c25b36 100644 --- a/abilian/sbe/apps/communities/tests/test_community_noweb.py +++ b/abilian/sbe/apps/communities/tests/test_community_noweb.py @@ -9,6 +9,7 @@ import sqlalchemy as sa from abilian.core.entities import Entity from abilian.core.models.subjects import User +from abilian.testing.util import login from flask_login import login_user, logout_user from mock import mock from pytest import fixture @@ -200,39 +201,6 @@ class CommunityContent(Entity): ########################################################################## -def login(user, remember=False, force=False): - """Perform user login for `user`, so that code needing a logged-in user can - work. - - This method can also be used as a context manager, so that logout is - performed automatically:: - - with login(user): - assert ... - - .. seealso:: :meth:`logout` - """ - # self._login_tests_sanity_check() - success = login_user(user, remember=remember, force=force) - if not success: - raise ValueError( - 'User is not active, cannot login; or use force=True', - ) - - class LoginContext(object): - def __enter__(self): - return None - - def __exit__(self, type, value, traceback): - logout() - - return LoginContext() - - -def logout(): - logout_user() - - def test_community_indexed(app, db): index_service = app.services['indexing'] index_service.start() diff --git a/abilian/sbe/apps/communities/tests/test_community_web.py b/abilian/sbe/apps/communities/tests/test_community_web.py index a6987b7e..44ee5fed 100644 --- a/abilian/sbe/apps/communities/tests/test_community_web.py +++ b/abilian/sbe/apps/communities/tests/test_community_web.py @@ -3,7 +3,7 @@ from __future__ import absolute_import, print_function, unicode_literals from abilian.services.security import Admin -from abilian.testing.util import login +from abilian.testing.util import client_login from flask import url_for from flask_login import current_user from pytest import mark @@ -16,7 +16,7 @@ def test_index(app, client, community1): security_service.start() user = community1.test_user - with login(client, user): + with client_login(client, user): response = client.get(url_for("communities.index")) print(current_user) assert response.status_code == 200 @@ -29,7 +29,7 @@ def test_community_home(app, client, community1, community2): url = app.default_view.url_for(community1) user1 = community1.test_user - with login(client, user1): + with client_login(client, user1): response = client.get(url) print(current_user) assert response.status_code == 302 @@ -41,7 +41,7 @@ def test_community_home(app, client, community1, community2): assert response.headers['Location'] == expected_url user2 = community2.test_user - with login(client, user2): + with client_login(client, user2): response = client.get(url) print(current_user) # assert response.headers['Location'] == "" @@ -54,7 +54,7 @@ def test_new(app, client, community1): user = community1.test_user - with login(client, user): + with client_login(client, user): response = client.get(url_for("communities.new")) assert response.status_code == 403 @@ -71,7 +71,7 @@ def test_community_settings(app, client, community1): url = url_for('communities.settings', community_id=community1.slug) user = community1.test_user - with login(client, user): + with client_login(client, user): response = client.get(url) assert response.status_code == 403 @@ -105,7 +105,7 @@ def test_members(app, client, db, community1, community2): user1 = community1.test_user user2 = community2.test_user - with login(client, user1): + with client_login(client, user1): url = url_for( "communities.members", community_id=community1.slug, diff --git a/abilian/sbe/apps/documents/tests/conftest.py b/abilian/sbe/apps/documents/tests/conftest.py new file mode 100644 index 00000000..025ee962 --- /dev/null +++ b/abilian/sbe/apps/documents/tests/conftest.py @@ -0,0 +1 @@ +pytest_plugins = ['abilian.sbe.apps.communities.tests.fixtures'] diff --git a/abilian/sbe/apps/documents/tests/test_documents.py b/abilian/sbe/apps/documents/tests/test_documents.py index ee31efa2..32d4d85c 100644 --- a/abilian/sbe/apps/documents/tests/test_documents.py +++ b/abilian/sbe/apps/documents/tests/test_documents.py @@ -3,535 +3,139 @@ from __future__ import absolute_import, division, print_function, \ unicode_literals -import sys -import unittest -from io import BytesIO -from itertools import count from pathlib import Path -from zipfile import ZipFile -import flask_mail -import pytest -from abilian.web.util import url_for -from flask import g, get_flashed_messages -from toolz import first -from werkzeug.datastructures import FileStorage - -from abilian.sbe.apps.communities.models import WRITER -from abilian.sbe.apps.communities.presenters import CommunityPresenter -from abilian.sbe.apps.communities.tests.base import CommunityBaseTestCase, \ - CommunityIndexingTestCase - -from ..models import Document, Folder, PathAndSecurityIndexable, db -from ..views import util as view_util -from ..views.folders import explore_archive - - -class BaseTests(CommunityBaseTestCase): - no_login = True - - def setUp(self): - super(BaseTests, self).setUp() - - self.root_folder = Folder(title="root") - db.session.add(self.root_folder) - db.session.flush() - - @staticmethod - def uid_from_url(url): - return int(url.split("/")[-1]) - - @staticmethod - def open_file(filename): - path = Path(__file__).parent / "data" / "dummy_files" / filename - return path.open('rb') - - -class TestBlobs(BaseTests): - def test_document(self): - root = Folder(title="root") - doc = Document(parent=root, title="test") - data = self.open_file("onepage.pdf").read() - doc.set_content(data, "application/pdf") - self.session.add(doc) - self.session.commit() - - # coverage - self.app.config['ANTIVIRUS_CHECK_REQUIRED'] = True - doc.ensure_antivirus_scheduled() - - def test_antivirus_properties(self): - root = Folder(title="root") - doc = Document(parent=root, title="test") - doc.set_content(b'content', 'text/plain') - appcfg = self.app.config - meta = doc.content_blob.meta - - # 1: not check required ################ - # no data in meta - appcfg['ANTIVIRUS_CHECK_REQUIRED'] = False - assert doc.antivirus_scanned is False - assert doc.antivirus_status is None - assert doc.antivirus_required is False - assert doc.antivirus_ok is True - - # antivirus was run, but no result - meta['antivirus'] = None - assert doc.antivirus_scanned is True - assert doc.antivirus_status is None - assert doc.antivirus_required is False - assert doc.antivirus_ok is True - - # virus detected - meta['antivirus'] = False - assert doc.antivirus_scanned is True - assert doc.antivirus_status is False - assert doc.antivirus_required is False - assert doc.antivirus_ok is False - - # virus free - meta['antivirus'] = True - assert doc.antivirus_scanned is True - assert doc.antivirus_status is True - assert doc.antivirus_required is False - assert doc.antivirus_ok is True - - # 2: check required ################## - # no data in meta - del meta['antivirus'] - appcfg['ANTIVIRUS_CHECK_REQUIRED'] = True - assert doc.antivirus_scanned is False - assert doc.antivirus_status is None - assert doc.antivirus_required is True - assert doc.antivirus_ok is False - - # antivirus was run, but no result - meta['antivirus'] = None - assert doc.antivirus_scanned is True - assert doc.antivirus_status is None - assert doc.antivirus_required is True - assert doc.antivirus_ok is False - - # virus detected - meta['antivirus'] = False - assert doc.antivirus_scanned is True - assert doc.antivirus_status is False - assert doc.antivirus_required is False - assert doc.antivirus_ok is False +from abilian.core.models.subjects import User +from abilian.testing.util import login + +from ..models import Document, Folder + + +def open_file(filename): + path = Path(__file__).parent / "data" / "dummy_files" / filename + return path.open('rb') + + +def test_document(app, session, req_ctx): + root = Folder(title="root") + doc = Document(parent=root, title="test") + data = open_file("onepage.pdf").read() + doc.set_content(data, "application/pdf") + session.add(doc) + session.commit() + + # coverage + app.config['ANTIVIRUS_CHECK_REQUIRED'] = True + doc.ensure_antivirus_scheduled() + + +def test_antivirus_properties(app, session, req_ctx): + root = Folder(title="root") + doc = Document(parent=root, title="test") + doc.set_content(b'content', 'text/plain') + appcfg = app.config + meta = doc.content_blob.meta + + # 1: not check required ################ + # no data in meta + appcfg['ANTIVIRUS_CHECK_REQUIRED'] = False + assert doc.antivirus_scanned is False + assert doc.antivirus_status is None + assert doc.antivirus_required is False + assert doc.antivirus_ok is True + + # antivirus was run, but no result + meta['antivirus'] = None + assert doc.antivirus_scanned is True + assert doc.antivirus_status is None + assert doc.antivirus_required is False + assert doc.antivirus_ok is True + + # virus detected + meta['antivirus'] = False + assert doc.antivirus_scanned is True + assert doc.antivirus_status is False + assert doc.antivirus_required is False + assert doc.antivirus_ok is False + + # virus free + meta['antivirus'] = True + assert doc.antivirus_scanned is True + assert doc.antivirus_status is True + assert doc.antivirus_required is False + assert doc.antivirus_ok is True + + # 2: check required ################## + # no data in meta + del meta['antivirus'] + appcfg['ANTIVIRUS_CHECK_REQUIRED'] = True + assert doc.antivirus_scanned is False + assert doc.antivirus_status is None + assert doc.antivirus_required is True + assert doc.antivirus_ok is False + + # antivirus was run, but no result + meta['antivirus'] = None + assert doc.antivirus_scanned is True + assert doc.antivirus_status is None + assert doc.antivirus_required is True + assert doc.antivirus_ok is False + + # virus detected + meta['antivirus'] = False + assert doc.antivirus_scanned is True + assert doc.antivirus_status is False + assert doc.antivirus_required is False + assert doc.antivirus_ok is False + + # virus free + meta['antivirus'] = True + assert doc.antivirus_scanned is True + assert doc.antivirus_status is True + assert doc.antivirus_required is False + assert doc.antivirus_ok is True + + +def test_folder_indexed(app, session, community1, community2): + index_service = app.services['indexing'] + index_service.start() + + security_service = app.services['security'] + security_service.start() + + folder = Folder(title='Folder 1', parent=community1.folder) + session.add(folder) + + folder_other = Folder(title='Folder 2: other', parent=community2.folder) + session.add(folder_other) + + user_no_community = User( + email='no_community@example.com', + can_login=True, + ) + session.add(user_no_community) - # virus free - meta['antivirus'] = True - assert doc.antivirus_scanned is True - assert doc.antivirus_status is True - assert doc.antivirus_required is False - assert doc.antivirus_ok is True + session.commit() + svc = index_service + obj_types = (Folder.entity_type, ) -class IndexingTestCase(CommunityIndexingTestCase): - def test_folder_indexed(self): - folder = Folder(title='Folder 1', parent=self.community.folder) - self.session.add(folder) - folder_other = Folder(title='Folder 2: other', parent=self.c2.folder) - self.session.add(folder_other) - self.session.commit() + with app.test_request_context(): - svc = self.svc - obj_types = (Folder.entity_type, ) - with self.login(self.user_no_community): + with login(user_no_community): res = svc.search('folder', object_types=obj_types) assert len(res) == 0 - with self.login(self.user): + with login(community1.test_user): res = svc.search('folder', object_types=obj_types) assert len(res) == 1 hit = res[0] assert hit['object_key'] == folder.object_key - assert hit['community_slug'] == self.community.slug + assert hit['community_slug'] == community1.slug - with self.login(self.user_c2): + with login(community2.test_user): res = svc.search('folder', object_types=obj_types) assert len(res) == 1 hit = res[0] assert hit['object_key'] == folder_other.object_key - assert hit['community_slug'] == self.c2.slug - - -class TestViews(CommunityIndexingTestCase, BaseTests): - def setUp(self): - super(TestViews, self).setUp() - self.community.type = 'participative' - self.community.set_membership(self.user, WRITER) - self.session.commit() - self.folder = self.community.folder - - def test_util_create(self): - with self.login(self.user): - g.user = self.user - g.community = CommunityPresenter(self.community) - name = 'document' - fs = FileStorage( - BytesIO(b'content'), - filename=name, - content_type='text/plain', - ) - doc = view_util.create_document(self.folder, fs) - self.session.flush() - assert doc.parent == self.folder - assert doc.name == name - - # test upload with same name: should be renamed - fs = FileStorage( - BytesIO(b'content'), - filename=name, - content_type='text/plain', - ) - doc2 = view_util.create_document(self.folder, fs) - self.session.flush() - assert doc2.parent == self.folder - assert len(self.folder.children) == 2 - assert doc2.name == name + '-1' - - messages = get_flashed_messages() - assert len(messages) == 1 - - def test_home(self): - with self.client_login(self.user.email, password='azerty'): - response = self.get( - url_for('documents.index', community_id=self.community.slug), - ) - assert response.status_code == 302 - expected = 'http://localhost/communities/{}/docs/folder/{}' \ - ''.format(self.community.slug, self.folder.id) - assert response.headers['Location'] == expected - - def _test_upload( - self, - title, - content_type, - test_preview=True, - assert_preview_available=True, - ): - data = { - 'file': (self.open_file(title), title, content_type), - 'action': 'upload', - } - - folder = self.community.folder - url = url_for( - "documents.folder_post", - community_id=self.community.slug, - folder_id=folder.id, - ) - response = self.client.post(url, data=data) - assert response.status_code == 302 - - doc = folder.children[0] - assert doc.title == title - - url = url_for( - "documents.document_view", - community_id=self.community.slug, - doc_id=doc.id, - ) - response = self.get(url) - assert response.status_code == 200 - - url = url_for( - "documents.document_download", - community_id=self.community.slug, - doc_id=doc.id, - ) - response = self.get(url) - assert response.status_code == 200 - assert response.headers['Content-Type'] == content_type - - content = self.open_file(title).read() - assert response.data == content - - if test_preview: - url = url_for( - "documents.document_preview_image", - community_id=self.community.slug, - doc_id=doc.id, - size=500, - ) - response = self.get(url) - if assert_preview_available: - assert response.status_code == 200 - assert response.headers['Content-Type'] == 'image/jpeg' - else: - # redirect to 'missing image' - assert response.status_code == 302 - assert response.headers['Cache-Control'] == 'no-cache' - - url = url_for( - "documents.document_delete", - community_id=self.community.slug, - doc_id=doc.id, - ) - response = self.client.post(url) - assert response.status_code == 302 - - url = url_for( - "documents.document_view", - community_id=self.community.slug, - doc_id=doc.id, - ) - response = self.get(url) - assert response.status_code == 404 - - def test_text_upload(self): - name = 'wikipedia-fr.txt' - with self.client_login(self.user.email, password='azerty'): - self._test_upload(name, "text/plain", test_preview=False) - - def test_pdf_upload(self): - name = "onepage.pdf" - with self.client_login(self.user.email, password='azerty'): - self._test_upload(name, "application/pdf") - - def test_image_upload(self): - NAME = "picture.jpg" - with self.client_login(self.user.email, password='azerty'): - self._test_upload(NAME, "image/jpeg") - - @pytest.mark.skip() # FIXME: magic detection mismatch? - def test_binary_upload(self): - NAME = "random.bin" - with self.client_login(self.user.email, password='azerty'): - self._test_upload( - NAME, - "application/octet-stream", - assert_preview_available=False, - ) - - @pytest.mark.skipif( - sys.version_info >= (3, 0), - reason="Doesn't work yet on Py3k", - ) - def test_explore_archive(self): - fd = self.open_file('content.zip') - result = [('/'.join(path), f) for path, f in explore_archive(fd)] - assert result == [('', fd)] - - fd = self.open_file('content.zip') - archive_content = explore_archive(fd, uncompress=True) - result = { - '/'.join(path) + '/' + f.filename - for path, f in archive_content - } - assert result == { - 'existing-doc/file.txt', - 'existing-doc/subfolder_in_renamed/doc.txt', - 'folder 1/doc.txt', - 'folder 1/dos cp437: é.txt', - 'folder 1/osx: utf-8: é.txt', - } - - def test_zip_upload_uncompress(self): - folder = Folder(title='folder 1', parent=self.community.folder) - self.session.add(folder) - self.session.flush() - folder = self.community.folder - files = [] - files.append((BytesIO(b'A document'), 'existing-doc', 'text/plain')) - files.append(( - self.open_file('content.zip'), - 'content.zip', - 'application/zip', - )) - data = {'file': files, 'action': 'upload', 'uncompress_files': True} - url = url_for( - "documents.folder_post", - community_id=self.community.slug, - folder_id=folder.id, - ) - with self.client_login(self.user.email, password='azerty'): - response = self.client.post(url, data=data) - - assert response.status_code == 302 - expected = {'existing-doc', 'folder 1', 'existing-doc-1'} - assert expected == {f.title for f in folder.children} - expected = {'folder 1', 'existing-doc-1'} - assert expected == {f.title for f in folder.subfolders} - - def test_zip(self): - with self.client_login(self.user.email, password='azerty'): - title = "onepage.pdf" - content_type = "application/pdf" - data = { - 'file': (self.open_file(title), title, content_type), - 'action': 'upload', - } - folder = self.community.folder - url = url_for( - "documents.folder_post", - community_id=self.community.slug, - folder_id=folder.id, - ) - response = self.client.post(url, data=data) - assert response.status_code == 302 - - doc = folder.children[0] - data = { - 'action': 'download', - 'object-selected': ["cmis:document:%d" % doc.id], - } - url = url_for( - "documents.folder_post", - community_id=self.community.slug, - folder_id=folder.id, - ) - response = self.client.post(url, data=data) - assert response.status_code == 200 - assert response.content_type == 'application/zip' - - zipfile = ZipFile(BytesIO(response.data)) - assert [zipfile.namelist()[0]] == [title] - - def test_recursive_zip(self): - with self.client_login(self.user.email, password='azerty'): - data = { - 'action': 'new', - 'title': 'my folder', - } - folder = self.community.folder - url = url_for( - "documents.folder_post", - community_id=self.community.slug, - folder_id=folder.id, - ) - response = self.client.post(url, data=data) - assert response.status_code == 302 - - my_folder = folder.children[0] - - title = "onepage.pdf" - content_type = "application/pdf" - data = { - 'file': (self.open_file(title), title, content_type), - 'action': 'upload', - } - url = url_for( - "documents.folder_post", - community_id=self.community.slug, - folder_id=my_folder.id, - ) - response = self.client.post(url, data=data) - assert response.status_code == 302 - - data = { - 'action': 'download', - 'object-selected': ["cmis:folder:%d" % my_folder.id], - } - url = url_for( - "documents.folder_post", - community_id=self.community.slug, - folder_id=folder.id, - ) - response = self.client.post(url, data=data) - assert response.status_code == 200 - assert response.content_type == 'application/zip' - - zipfile = ZipFile(BytesIO(response.data)) - assert zipfile.namelist() == ['my folder/' + title] - - def test_document_send_by_mail(self): - mail = self.app.extensions['mail'] - folder = self.community.folder - - with self.client_login(self.user.email, password='azerty'): - # upload files - for filename in ('ascii title.txt', 'utf-8 est arrivé!.txt'): - content_type = "text/plain" - data = { - 'file': (BytesIO(b'file content'), filename, content_type), - 'action': 'upload', - } - url = url_for( - "documents.folder_post", - community_id=self.community.slug, - folder_id=folder.id, - ) - self.client.post(url, data=data) - - ascii_doc = folder.children[0] - unicode_doc = folder.children[1] - - def get_send_url(doc_id): - return url_for( - 'documents.document_send', - community_id=self.community.slug, - doc_id=doc_id, - ) - - # mail ascii filename - with mail.record_messages() as outbox: - url = get_send_url(ascii_doc.id) - response = self.client.post( - url, - data={ - 'recipient': 'dest@example.com', - 'message': 'Voilà un fichier', - }, - ) - assert response.status_code == 302 - assert len(outbox) == 1 - - msg = outbox[0] - assert msg.subject == '[Abilian Test] Unknown sent you a file' - assert msg.recipients == ['dest@example.com'] - - assert len(msg.attachments) == 1 - - attachment = first(msg.attachments) - assert isinstance(attachment, flask_mail.Attachment) - assert attachment.filename == "ascii title.txt" - - # mail unicode filename - with mail.record_messages() as outbox: - url = get_send_url(unicode_doc.id) - response = self.client.post( - url, - data={ - 'recipient': 'dest@example.com', - 'message': 'Voilà un fichier', - }, - ) - assert response.status_code == 302 - assert len(outbox) == 1 - - msg = outbox[0] - assert isinstance(msg, flask_mail.Message) - assert msg.subject == '[Abilian Test] Unknown sent you a file' - assert msg.recipients == ['dest@example.com'] - assert len(msg.attachments) == 1 - - attachment = first(msg.attachments) - assert isinstance(attachment, flask_mail.Attachment) - assert attachment.filename == "utf-8 est arrivé!.txt" - - -class TestPathIndexable(unittest.TestCase): - class MockPath(PathAndSecurityIndexable): - def __init__(self, id, parent=None): - self.id = id - self.parent = parent - - def setUp(self): - id_gen = count() - obj = self.MockPath(next(id_gen)) - obj = self.MockPath(next(id_gen), parent=obj) - obj = self.MockPath(next(id_gen), parent=obj) - self.obj = self.MockPath(next(id_gen), parent=obj) - - def test_iter_to_root(self): - assert [o.id for o in self.obj._iter_to_root()] == [3, 2, 1, 0] - assert [o.id for o in self.obj._iter_to_root(skip_self=True)] == [ - 2, - 1, - 0, - ] - - def test_indexable_parent_ids(self): - assert self.obj._indexable_parent_ids == '/0/1/2' + assert hit['community_slug'] == community2.slug diff --git a/abilian/sbe/apps/documents/tests/test_documents_web.py b/abilian/sbe/apps/documents/tests/test_documents_web.py new file mode 100644 index 00000000..beb08a75 --- /dev/null +++ b/abilian/sbe/apps/documents/tests/test_documents_web.py @@ -0,0 +1,432 @@ +# coding=utf-8 +"""""" +from __future__ import absolute_import, division, print_function, \ + unicode_literals + +import sys +import unittest +from io import BytesIO +from itertools import count +from pathlib import Path +from zipfile import ZipFile + +import flask_mail +import pytest +from abilian.web.util import url_for +from flask import g, get_flashed_messages +from toolz import first +from werkzeug.datastructures import FileStorage + +from abilian.sbe.apps.communities.models import WRITER +from abilian.sbe.apps.communities.presenters import CommunityPresenter +from abilian.sbe.apps.communities.tests.base import CommunityBaseTestCase, \ + CommunityIndexingTestCase + +from ..models import Folder, PathAndSecurityIndexable, db +from ..views import util as view_util +from ..views.folders import explore_archive + + +def open_file(filename): + path = Path(__file__).parent / "data" / "dummy_files" / filename + return path.open('rb') + + +class BaseTests(CommunityBaseTestCase): + no_login = True + + def setUp(self): + super(BaseTests, self).setUp() + + self.root_folder = Folder(title="root") + db.session.add(self.root_folder) + db.session.flush() + + @staticmethod + def uid_from_url(url): + return int(url.split("/")[-1]) + + @staticmethod + def open_file(filename): + return open_file(filename) + + +class TestViews(CommunityIndexingTestCase, BaseTests): + def setUp(self): + super(TestViews, self).setUp() + self.community.type = 'participative' + self.community.set_membership(self.user, WRITER) + self.session.commit() + self.folder = self.community.folder + + def test_util_create(self): + with self.login(self.user): + g.user = self.user + g.community = CommunityPresenter(self.community) + name = 'document' + fs = FileStorage( + BytesIO(b'content'), + filename=name, + content_type='text/plain', + ) + doc = view_util.create_document(self.folder, fs) + self.session.flush() + assert doc.parent == self.folder + assert doc.name == name + + # test upload with same name: should be renamed + fs = FileStorage( + BytesIO(b'content'), + filename=name, + content_type='text/plain', + ) + doc2 = view_util.create_document(self.folder, fs) + self.session.flush() + assert doc2.parent == self.folder + assert len(self.folder.children) == 2 + assert doc2.name == name + '-1' + + messages = get_flashed_messages() + assert len(messages) == 1 + + def test_home(self): + with self.client_login(self.user.email, password='azerty'): + response = self.get( + url_for('documents.index', community_id=self.community.slug), + ) + assert response.status_code == 302 + expected = 'http://localhost/communities/{}/docs/folder/{}' \ + ''.format(self.community.slug, self.folder.id) + assert response.headers['Location'] == expected + + def _test_upload( + self, + title, + content_type, + test_preview=True, + assert_preview_available=True, + ): + data = { + 'file': (self.open_file(title), title, content_type), + 'action': 'upload', + } + + folder = self.community.folder + url = url_for( + "documents.folder_post", + community_id=self.community.slug, + folder_id=folder.id, + ) + response = self.client.post(url, data=data) + assert response.status_code == 302 + + doc = folder.children[0] + assert doc.title == title + + url = url_for( + "documents.document_view", + community_id=self.community.slug, + doc_id=doc.id, + ) + response = self.get(url) + assert response.status_code == 200 + + url = url_for( + "documents.document_download", + community_id=self.community.slug, + doc_id=doc.id, + ) + response = self.get(url) + assert response.status_code == 200 + assert response.headers['Content-Type'] == content_type + + content = self.open_file(title).read() + assert response.data == content + + if test_preview: + url = url_for( + "documents.document_preview_image", + community_id=self.community.slug, + doc_id=doc.id, + size=500, + ) + response = self.get(url) + if assert_preview_available: + assert response.status_code == 200 + assert response.headers['Content-Type'] == 'image/jpeg' + else: + # redirect to 'missing image' + assert response.status_code == 302 + assert response.headers['Cache-Control'] == 'no-cache' + + url = url_for( + "documents.document_delete", + community_id=self.community.slug, + doc_id=doc.id, + ) + response = self.client.post(url) + assert response.status_code == 302 + + url = url_for( + "documents.document_view", + community_id=self.community.slug, + doc_id=doc.id, + ) + response = self.get(url) + assert response.status_code == 404 + + def test_text_upload(self): + name = 'wikipedia-fr.txt' + with self.client_login(self.user.email, password='azerty'): + self._test_upload(name, "text/plain", test_preview=False) + + def test_pdf_upload(self): + name = "onepage.pdf" + with self.client_login(self.user.email, password='azerty'): + self._test_upload(name, "application/pdf") + + def test_image_upload(self): + NAME = "picture.jpg" + with self.client_login(self.user.email, password='azerty'): + self._test_upload(NAME, "image/jpeg") + + @pytest.mark.skip() # FIXME: magic detection mismatch? + def test_binary_upload(self): + NAME = "random.bin" + with self.client_login(self.user.email, password='azerty'): + self._test_upload( + NAME, + "application/octet-stream", + assert_preview_available=False, + ) + + @pytest.mark.skipif( + sys.version_info >= (3, 0), + reason="Doesn't work yet on Py3k", + ) + def test_explore_archive(self): + fd = self.open_file('content.zip') + result = [('/'.join(path), f) for path, f in explore_archive(fd)] + assert result == [('', fd)] + + fd = self.open_file('content.zip') + archive_content = explore_archive(fd, uncompress=True) + result = { + '/'.join(path) + '/' + f.filename + for path, f in archive_content + } + assert result == { + 'existing-doc/file.txt', + 'existing-doc/subfolder_in_renamed/doc.txt', + 'folder 1/doc.txt', + 'folder 1/dos cp437: é.txt', + 'folder 1/osx: utf-8: é.txt', + } + + def test_zip_upload_uncompress(self): + folder = Folder(title='folder 1', parent=self.community.folder) + self.session.add(folder) + self.session.flush() + folder = self.community.folder + files = [] + files.append((BytesIO(b'A document'), 'existing-doc', 'text/plain')) + files.append(( + self.open_file('content.zip'), + 'content.zip', + 'application/zip', + )) + data = {'file': files, 'action': 'upload', 'uncompress_files': True} + url = url_for( + "documents.folder_post", + community_id=self.community.slug, + folder_id=folder.id, + ) + with self.client_login(self.user.email, password='azerty'): + response = self.client.post(url, data=data) + + assert response.status_code == 302 + expected = {'existing-doc', 'folder 1', 'existing-doc-1'} + assert expected == {f.title for f in folder.children} + expected = {'folder 1', 'existing-doc-1'} + assert expected == {f.title for f in folder.subfolders} + + def test_zip(self): + with self.client_login(self.user.email, password='azerty'): + title = "onepage.pdf" + content_type = "application/pdf" + data = { + 'file': (self.open_file(title), title, content_type), + 'action': 'upload', + } + folder = self.community.folder + url = url_for( + "documents.folder_post", + community_id=self.community.slug, + folder_id=folder.id, + ) + response = self.client.post(url, data=data) + assert response.status_code == 302 + + doc = folder.children[0] + data = { + 'action': 'download', + 'object-selected': ["cmis:document:%d" % doc.id], + } + url = url_for( + "documents.folder_post", + community_id=self.community.slug, + folder_id=folder.id, + ) + response = self.client.post(url, data=data) + assert response.status_code == 200 + assert response.content_type == 'application/zip' + + zipfile = ZipFile(BytesIO(response.data)) + assert [zipfile.namelist()[0]] == [title] + + def test_recursive_zip(self): + with self.client_login(self.user.email, password='azerty'): + data = { + 'action': 'new', + 'title': 'my folder', + } + folder = self.community.folder + url = url_for( + "documents.folder_post", + community_id=self.community.slug, + folder_id=folder.id, + ) + response = self.client.post(url, data=data) + assert response.status_code == 302 + + my_folder = folder.children[0] + + title = "onepage.pdf" + content_type = "application/pdf" + data = { + 'file': (self.open_file(title), title, content_type), + 'action': 'upload', + } + url = url_for( + "documents.folder_post", + community_id=self.community.slug, + folder_id=my_folder.id, + ) + response = self.client.post(url, data=data) + assert response.status_code == 302 + + data = { + 'action': 'download', + 'object-selected': ["cmis:folder:%d" % my_folder.id], + } + url = url_for( + "documents.folder_post", + community_id=self.community.slug, + folder_id=folder.id, + ) + response = self.client.post(url, data=data) + assert response.status_code == 200 + assert response.content_type == 'application/zip' + + zipfile = ZipFile(BytesIO(response.data)) + assert zipfile.namelist() == ['my folder/' + title] + + def test_document_send_by_mail(self): + mail = self.app.extensions['mail'] + folder = self.community.folder + + with self.client_login(self.user.email, password='azerty'): + # upload files + for filename in ('ascii title.txt', 'utf-8 est arrivé!.txt'): + content_type = "text/plain" + data = { + 'file': (BytesIO(b'file content'), filename, content_type), + 'action': 'upload', + } + url = url_for( + "documents.folder_post", + community_id=self.community.slug, + folder_id=folder.id, + ) + self.client.post(url, data=data) + + ascii_doc = folder.children[0] + unicode_doc = folder.children[1] + + def get_send_url(doc_id): + return url_for( + 'documents.document_send', + community_id=self.community.slug, + doc_id=doc_id, + ) + + # mail ascii filename + with mail.record_messages() as outbox: + url = get_send_url(ascii_doc.id) + response = self.client.post( + url, + data={ + 'recipient': 'dest@example.com', + 'message': 'Voilà un fichier', + }, + ) + assert response.status_code == 302 + assert len(outbox) == 1 + + msg = outbox[0] + assert msg.subject == '[Abilian Test] Unknown sent you a file' + assert msg.recipients == ['dest@example.com'] + + assert len(msg.attachments) == 1 + + attachment = first(msg.attachments) + assert isinstance(attachment, flask_mail.Attachment) + assert attachment.filename == "ascii title.txt" + + # mail unicode filename + with mail.record_messages() as outbox: + url = get_send_url(unicode_doc.id) + response = self.client.post( + url, + data={ + 'recipient': 'dest@example.com', + 'message': 'Voilà un fichier', + }, + ) + assert response.status_code == 302 + assert len(outbox) == 1 + + msg = outbox[0] + assert isinstance(msg, flask_mail.Message) + assert msg.subject == '[Abilian Test] Unknown sent you a file' + assert msg.recipients == ['dest@example.com'] + assert len(msg.attachments) == 1 + + attachment = first(msg.attachments) + assert isinstance(attachment, flask_mail.Attachment) + assert attachment.filename == "utf-8 est arrivé!.txt" + + +class TestPathIndexable(unittest.TestCase): + class MockPath(PathAndSecurityIndexable): + def __init__(self, id, parent=None): + self.id = id + self.parent = parent + + def setUp(self): + id_gen = count() + obj = self.MockPath(next(id_gen)) + obj = self.MockPath(next(id_gen), parent=obj) + obj = self.MockPath(next(id_gen), parent=obj) + self.obj = self.MockPath(next(id_gen), parent=obj) + + def test_iter_to_root(self): + assert [o.id for o in self.obj._iter_to_root()] == [3, 2, 1, 0] + assert [o.id for o in self.obj._iter_to_root(skip_self=True)] == [ + 2, + 1, + 0, + ] + + def test_indexable_parent_ids(self): + assert self.obj._indexable_parent_ids == '/0/1/2' diff --git a/abilian/sbe/apps/forum/tests/test_integration_new.py b/abilian/sbe/apps/forum/tests/test_integration_new.py index a2fe9aa2..6eb5ab4e 100644 --- a/abilian/sbe/apps/forum/tests/test_integration_new.py +++ b/abilian/sbe/apps/forum/tests/test_integration_new.py @@ -7,7 +7,7 @@ from unittest import TestCase from abilian.core.models.subjects import User -from abilian.testing.util import login +from abilian.testing.util import client_login from flask import url_for from flask_login import login_user from mock import Mock, patch @@ -98,7 +98,7 @@ def test_create_thread_informative(app, db, client, community1, req_ctx): data['__action'] = "create" mail = app.extensions['mail'] - with login(client, user): + with client_login(client, user): with mail.record_messages() as outbox: data['send_by_email'] = "y" # actually should not be in html form response = client.post(url, data=data) diff --git a/abilian/sbe/apps/wiki/tests/test_web_classic.py b/abilian/sbe/apps/wiki/tests/test_web_classic.py index b469f2f6..ee7299bd 100644 --- a/abilian/sbe/apps/wiki/tests/test_web_classic.py +++ b/abilian/sbe/apps/wiki/tests/test_web_classic.py @@ -20,8 +20,10 @@ def setUp(self): def test_wiki_indexed(self): page = WikiPage(title='Community 1', community=self.community) self.session.add(page) + page_other = WikiPage(title='Community 2: other', community=self.c2) self.session.add(page_other) + self.session.commit() svc = self.svc diff --git a/abilian/sbe/apps/wiki/tests/test_web_new.py b/abilian/sbe/apps/wiki/tests/test_web_new.py index c1bc908c..7a609d08 100644 --- a/abilian/sbe/apps/wiki/tests/test_web_new.py +++ b/abilian/sbe/apps/wiki/tests/test_web_new.py @@ -3,7 +3,7 @@ import re -from abilian.testing.util import login +from abilian.testing.util import client_login from flask import g, url_for from pytest import mark from toolz import first @@ -43,7 +43,7 @@ def test_create_page(client, community1, req_ctx): community = community1 user = community.test_user - with login(client, user): + with client_login(client, user): title = 'Some page name' url = url_for("wiki.page_new", community_id=community.slug) url += '?title=Some+page+name'