Skip to content
This repository has been archived by the owner on Nov 5, 2019. It is now read-only.

Commit

Permalink
Add multiprocess file storage.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jon Wayne Parrott committed May 20, 2016
1 parent f322ef9 commit 3067a17
Show file tree
Hide file tree
Showing 3 changed files with 555 additions and 0 deletions.
261 changes: 261 additions & 0 deletions oauth2client/contrib/multiprocess_file_storage.py
@@ -0,0 +1,261 @@
# Copyright 2016 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""File-based storage that supports multiple credentials and cross-process
access.
This module supersedes the functionality previously found in `multistore_file`.
This module provides the MultiprocessFileStorage class that:
* Is tied to a single credential via a user-specified key. This key can be used
to distinguish between multiple users, client ids, and/or scopes.
* Can be safely accessed and refreshed across threads and processes.
Process & thread safety guarantees the following behavior:
* If one process refreshes a credential, subsequent refreshes from other
processes will re-fetch the credentials from the file instead of performing
an http request.
* If two processes attempt to refresh concurrently, only one process will be
able to acquire the lock and refresh, with the deadlock caveat below.
* The interprocess lock will not deadlock, instead, the if a process can not
acquire the interprocess lock within INTERPROCESS_LOCK_DEADLINE it will
allow refreshing the credential but will not write the updated credential to
disk, This logic happens during every lock cycle - if the credentials are
refreshed again it will retry locking and writing as normal.
"""

import base64
import json
import logging
import os
import threading

import fasteners
from six import iteritems

from oauth2client.client import Credentials
from oauth2client.client import Storage as BaseStorage

INTERPROCESS_LOCK_DEADLINE = 1
logger = logging.getLogger(__name__)
_backends = {}
_backends_lock = threading.Lock()


def _get_backend(filename):
"""A helper method to get or create a backend with thread locking.
There should only be one backend per-file per-process."""

# This method prevents race conditions.
try:
return _backends[filename]
except KeyError:
pass

with _backends_lock:
backend = _MultiprocessStorageBackend(filename)
_backends[filename] = backend

return backend


class _MultiprocessStorageBackend(object):

def __init__(self, filename):
self._file = None
self._filename = filename
self._process_lock = fasteners.InterProcessLock(
'{}.lock'.format(filename))
self._thread_lock = threading.Lock()
self._read_only = False
self._credentials = {}

def _create_file_if_needed(self):
"""Creates the an empty credential storage file if it does not
exist."""
if self._read_only:
return False

if not os.path.exists(self._filename):
open(self._filename, 'a+b').close()
logging.info('Credential file {} created'.format(self._filename))
return False

return True

def _load_credentials(self):
"""(Re-)loads the credentials from the file."""
if not self._create_file_if_needed():
return

self._credentials.update(self._load_credentials_file(self._file))

logger.debug('Read credential file')

def _load_credentials_file(self, fh):
credentials = {}

try:
fh.seek(0)
data = json.load(fh)
except Exception:
logger.warning(
'Credentials file could not be loaded, will ignore and '
'overwrite.')
return credentials

if data.get('file_version') != 2:
data = {}
logger.warning(
'Credentials file is not version 2, will ignore and '
'overwrite.')
return credentials

for key, encoded_credential in iteritems(data.get('credentials', {})):
try:
credential_json = base64.b64decode(encoded_credential)
credential = Credentials.new_from_json(credential_json)
credentials[key] = credential
except:
logger.warning(
'Invalid credential {} in file, ignoring.'.format(key))

return credentials

def _write_credentials(self):
if self._read_only:
logger.debug('In read-only mode, not writing credentials.')
return

self._create_file_if_needed()
self._write_credentials_file(self._file, self._credentials)
logger.debug('Wrote credential file {}.'.format(self._filename))

def _write_credentials_file(self, fh, credentials):
data = {'file_version': 2, 'credentials': {}}

for key, credential in iteritems(credentials):
credential_json = credential.to_json()
encoded_credential = base64.b64encode(credential_json)
data['credentials'][key] = encoded_credential

fh.seek(0)
json.dump(data, fh)
fh.truncate()

def acquire_lock(self):
self._thread_lock.acquire()
locked = self._process_lock.acquire(timeout=INTERPROCESS_LOCK_DEADLINE)

self._create_file_if_needed()

if locked:
self._file = open(self._filename, 'r+')
self._read_only = False

else:
self._read_only = True
logger.warn(
'Failed to obtain interprocess lock for credentials. '
'If a credential is being refreshed, other processes may '
'not see the updated access token and refresh as well.')
self._file = open(self._filename, 'r')

self._load_credentials()

def release_lock(self):
if self._file is not None:
self._file.close()
self._file = None

if not self._read_only:
self._process_lock.release()

self._thread_lock.release()

def _refresh_predicate(self, credentials):
if credentials is None:
return True
if credentials.invalid:
return True
if credentials.access_token_expired:
return True
return False

def locked_get(self, key):
# Check if the credential is already in memory.
credentials = self._credentials.get(key, None)

# Use the refresh predicate to determine if the entire store should be
# reloaded. This basically checks if the credentials are invalid
# or expired. This covers the situation where another process has
# refreshed the credentials and this process doesn't know about it yet.
# In that case, this process won't needlessly refresh the credentials.
if self._refresh_predicate(credentials):
self._load_credentials()
credentials = self._credentials.get(key, None)

return credentials

def locked_put(self, key, credentials):
self._load_credentials()
self._credentials[key] = credentials
self._write_credentials()

def locked_delete(self, key):
self._load_credentials()
try:
del self._credentials[key]
except KeyError:
pass
self._write_credentials()


class MultiprocessFileStorage(BaseStorage):
def __init__(self, filename, key):
self._key = key
self._backend = _get_backend(filename)

def acquire_lock(self):
self._backend.acquire_lock()

def release_lock(self):
self._backend.release_lock()

def locked_get(self):
"""Retrieves the current credentials from the store.
Returns:
oauth2client.client.Credentials or None
"""
credential = self._backend.locked_get(self._key)

if credential:
credential.set_store(self)

return credential

def locked_put(self, credentials):
"""Writes the given credentials to the store.
Args:
credentials: an oauth2client.client.Credentials object.
"""
return self._backend.locked_put(self._key, credentials)

def locked_delete(self):
"""Deletes the current credentials from the store."""
return self._backend.locked_delete(self._key)

0 comments on commit 3067a17

Please sign in to comment.