Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sync continuous #739

Merged
merged 10 commits into from
Feb 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 2 additions & 57 deletions securedrop_client/api_jobs/downloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
from securedrop_client.crypto import GpgHelper, CryptoError
from securedrop_client.db import File, Message, Reply
from securedrop_client.storage import mark_as_decrypted, mark_as_downloaded, \
set_message_or_reply_content, get_remote_data, update_local_storage
set_message_or_reply_content


logger = logging.getLogger(__name__)

Expand All @@ -31,62 +32,6 @@ def __init__(self, message: str,
self.uuid = uuid


class MetadataSyncJob(ApiJob):
'''
Update source metadata such that new download jobs can be added to the queue.
'''

NUMBER_OF_TIMES_TO_RETRY_AN_API_CALL = 15

def __init__(self, data_dir: str, gpg: GpgHelper) -> None:
super().__init__(remaining_attempts=self.NUMBER_OF_TIMES_TO_RETRY_AN_API_CALL)
self.data_dir = data_dir
self.gpg = gpg

def call_api(self, api_client: API, session: Session) -> Any:
'''
Override ApiJob.

Download new metadata, update the local database, import new keys, and
then the success signal will let the controller know to add any new download
jobs.
'''

# TODO: Once https://github.com/freedomofpress/securedrop-client/issues/648, we will want to
# pass the default request timeout to api calls instead of setting it on the api object
# directly.
api_client.default_request_timeout = 40
remote_sources, remote_submissions, remote_replies = \
get_remote_data(api_client)

update_local_storage(session,
remote_sources,
remote_submissions,
remote_replies,
self.data_dir)

fingerprints = self.gpg.fingerprints()
for source in remote_sources:
if source.key and source.key.get('type', None) == 'PGP':
pub_key = source.key.get('public', None)
fingerprint = source.key.get('fingerprint', None)
if not pub_key or not fingerprint:
# The below line needs to be excluded from the coverage computation
# as it will show as uncovered due to a cpython compiler optimziation.
# See: https://bugs.python.org/issue2506
continue # pragma: no cover

if fingerprint in fingerprints:
logger.debug("Skipping import of key with fingerprint {}".format(fingerprint))
continue

try:
logger.debug("Importing key with fingerprint {}".format(fingerprint))
self.gpg.import_key(source.uuid, pub_key, fingerprint)
except CryptoError:
logger.warning('Failed to import key for source {}'.format(source.uuid))


class DownloadJob(ApiJob):
'''
Download and decrypt a file that contains either a message, reply, or file submission.
Expand Down
67 changes: 67 additions & 0 deletions securedrop_client/api_jobs/sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from typing import Any
import logging

from sdclientapi import API
from sqlalchemy.orm.session import Session

from securedrop_client.api_jobs.base import ApiJob
from securedrop_client.crypto import GpgHelper, CryptoError
from securedrop_client.storage import get_remote_data, update_local_storage


logger = logging.getLogger(__name__)


class MetadataSyncJob(ApiJob):
'''
Update source metadata such that new download jobs can be added to the queue.
'''

NUMBER_OF_TIMES_TO_RETRY_AN_API_CALL = 2

def __init__(self, data_dir: str, gpg: GpgHelper) -> None:
super().__init__(remaining_attempts=self.NUMBER_OF_TIMES_TO_RETRY_AN_API_CALL)
self.data_dir = data_dir
self.gpg = gpg

def call_api(self, api_client: API, session: Session) -> Any:
'''
Override ApiJob.

Download new metadata, update the local database, import new keys, and
then the success signal will let the controller know to add any new download
jobs.
'''

# TODO: Once https://github.com/freedomofpress/securedrop-client/issues/648, we will want to
# pass the default request timeout to api calls instead of setting it on the api object
# directly.
api_client.default_request_timeout = 40
remote_sources, remote_submissions, remote_replies = get_remote_data(api_client)

update_local_storage(session,
remote_sources,
remote_submissions,
remote_replies,
self.data_dir)

fingerprints = self.gpg.fingerprints()
for source in remote_sources:
if source.key and source.key.get('type', None) == 'PGP':
pub_key = source.key.get('public', None)
fingerprint = source.key.get('fingerprint', None)
if not pub_key or not fingerprint:
# The below line needs to be excluded from the coverage computation
# as it will show as uncovered due to a cpython compiler optimziation.
# See: https://bugs.python.org/issue2506
continue # pragma: no cover

if fingerprint in fingerprints:
logger.debug("Skipping import of key with fingerprint {}".format(fingerprint))
continue

try:
logger.debug("Importing key with fingerprint {}".format(fingerprint))
self.gpg.import_key(source.uuid, pub_key, fingerprint)
except CryptoError:
logger.warning('Failed to import key for source {}'.format(source.uuid))
53 changes: 14 additions & 39 deletions securedrop_client/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@
from securedrop_client import db
from securedrop_client.api_jobs.base import ApiInaccessibleError
from securedrop_client.api_jobs.downloads import FileDownloadJob, MessageDownloadJob, \
ReplyDownloadJob, DownloadChecksumMismatchException, MetadataSyncJob
ReplyDownloadJob, DownloadChecksumMismatchException
from securedrop_client.api_jobs.sources import DeleteSourceJob
from securedrop_client.api_jobs.uploads import SendReplyJob, SendReplyJobError, \
SendReplyJobTimeoutError
from securedrop_client.api_jobs.updatestar import UpdateStarJob, UpdateStarJobException
from securedrop_client.crypto import GpgHelper
from securedrop_client.export import Export
from securedrop_client.queue import ApiJobQueue
from securedrop_client.sync import ApiSync
from securedrop_client.utils import check_dir_permissions

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -194,6 +195,12 @@ def __init__(self, hostname: str, gui, session_maker: sessionmaker,
# File data.
self.data_dir = os.path.join(self.home, 'data')

# Background sync to keep client up-to-date with server changes
self.api_sync = ApiSync(self.api, self.session_maker, self.gpg, self.data_dir)
self.api_sync.sync_started.connect(self.on_sync_started, type=Qt.QueuedConnection)
self.api_sync.sync_success.connect(self.on_sync_success, type=Qt.QueuedConnection)
self.api_sync.sync_failure.connect(self.on_sync_failure, type=Qt.QueuedConnection)

@property
def is_authenticated(self) -> bool:
return self.__is_authenticated
Expand Down Expand Up @@ -226,11 +233,6 @@ def setup(self):
self.sync_timer.timeout.connect(self.update_sync)
self.sync_timer.start(30000)

# Automagically sync with the API every minute.
self.sync_update = QTimer()
self.sync_update.timeout.connect(self.sync_api)
self.sync_update.start(1000 * 60) # every minute.

# Run export object in a separate thread context (a reference to the
# thread is kept on self such that it does not get garbage collected
# after this method returns) - we want to keep our export thread around for
Expand Down Expand Up @@ -280,20 +282,6 @@ def call_api(self,
new_api_thread.start()

def on_queue_paused(self) -> None:
# TODO: remove if block once https://github.com/freedomofpress/securedrop-client/pull/739
# is merged and rely on continuous metadata sync to encounter same auth error from the
# server which will log the user out in the on_sync_failure handler
if (
not self.api or
not self.api_job_queue.main_queue.api_client or
not self.api_job_queue.download_file_queue.api_client or
not self.api_job_queue.metadata_queue.api_client
):
self.invalidate_token()
self.logout()
self.gui.show_login(error=_('Your session expired. Please log in again.'))
return

self.gui.update_error_status(
_('The SecureDrop server cannot be reached.'),
duration=0,
Expand Down Expand Up @@ -350,7 +338,7 @@ def on_authenticate_success(self, result):
self.gui.show_main_window(user)
self.update_sources()
self.api_job_queue.login(self.api)
self.sync_api()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is where we used to call sync_api on login which i replaced with starting the background sync thread

self.api_sync.start(self.api)
self.is_authenticated = True
self.resume_queues()

Expand All @@ -359,6 +347,7 @@ def on_authenticate_failure(self, result: Exception) -> None:
self.invalidate_token()
error = _('There was a problem signing in. Please verify your credentials and try again.')
self.gui.show_login_error(error=error)
self.api_sync.stop()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when we log out we need to make sure the background sync thread does not continue making api requests to get new data from the server


def login_offline_mode(self):
"""
Expand All @@ -384,24 +373,6 @@ def authenticated(self):
"""
return bool(self.api and self.api.token is not None)

def sync_api(self):
"""
Grab data from the remote SecureDrop API in a non-blocking manner.
"""
logger.debug("In sync_api on thread {}".format(self.thread().currentThreadId()))
if self.authenticated():
self.sync_events.emit('syncing')
logger.debug("You are authenticated, going to make your call")

job = MetadataSyncJob(self.data_dir, self.gpg)
job.success_signal.connect(self.on_sync_success, type=Qt.QueuedConnection)
job.failure_signal.connect(self.on_sync_failure, type=Qt.QueuedConnection)

self.api_job_queue.enqueue(job)

logger.debug("In sync_api, after call to submit job to queue, on "
"thread {}".format(self.thread().currentThreadId()))

def last_sync(self):
"""
Returns the time of last synchronisation with the remote SD server.
Expand All @@ -412,6 +383,9 @@ def last_sync(self):
except Exception:
return None

def on_sync_started(self) -> None:
self.sync_events.emit('syncing')

def on_sync_success(self) -> None:
"""
Called when syncronisation of data via the API queue succeeds.
Expand Down Expand Up @@ -506,6 +480,7 @@ def logout(self):
for failed_reply in failed_replies:
self.reply_failed.emit(failed_reply.uuid)

self.api_sync.stop()
self.api_job_queue.logout()
self.gui.logout()

Expand Down
Loading