From f682f1881be456aff6b33cc8179c1ba8e87bc507 Mon Sep 17 00:00:00 2001 From: Bochun Zhang Date: Tue, 2 Aug 2016 17:34:00 -0700 Subject: [PATCH] Add revoke command. The revoke command revokes and deletes the tokens currently cached locally. --- oauth2l/__init__.py | 45 ++++++++++++++++- oauth2l/oauth2l_test.py | 104 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 148 insertions(+), 1 deletion(-) diff --git a/oauth2l/__init__.py b/oauth2l/__init__.py index 808cbde1..457b86d9 100644 --- a/oauth2l/__init__.py +++ b/oauth2l/__init__.py @@ -56,6 +56,7 @@ from __future__ import print_function import argparse +import httplib2 import json import os import sys @@ -66,7 +67,6 @@ else: import http.client as http_client # pragma: NO COVER -import httplib2 from oauth2client import client from oauth2client import service_account from oauth2client import tools @@ -343,6 +343,40 @@ def _Info(args): print(_CompactJson(tokeninfo)) +def _Revoke(args, client_info=None, credentials_filename=None): + """Revokes the access token created from the json secret.""" + client_secrets, service_account_json_keyfile = _ProcessJsonArg(args) + scopes = _ExpandScopes(args.scope) + if not scopes: + raise ValueError('No scopes provided') + joined_scopes = ' '.join(sorted(scopes)) + credential_store = None + + if service_account_json_keyfile: + with open(service_account_json_keyfile, 'r') as json_keyfile_obj: + client_credentials = json.load(json_keyfile_obj) + credential_store = _GetCredentialStore(credentials_filename, + client_credentials['private_key_id'], joined_scopes) + + elif client_secrets: + client_info = GetClientInfoFromFile(client_secrets) + credential_store = _GetCredentialStore(credentials_filename, + client_info['client_id'], joined_scopes) + else: + client_info = client_info or GetDefaultClientInfo() + credential_store = _GetCredentialStore(credentials_filename, + client_info['client_id'], joined_scopes) + + credentials = credential_store.get() + if credentials: + credentials.revoke(httplib2.Http()) + credential_store.delete() + print('Successfully revoked token.') + + return 0 + + + def _Test(args): """Test an access token. Exits with 0 if valid, 1 otherwise.""" return 1 - (_TestToken(args.access_token)) @@ -401,6 +435,15 @@ def _GetParser(): 'access_token', help=('Info for this token will be printed.')) + # revoke + revoke = subparsers.add_parser('revoke', help=_Revoke.__doc__, + parents=[shared_flags]) + revoke.set_defaults(func=_Revoke) + revoke.add_argument( + 'scope', + nargs='*', + help='Scope of the token. May be provided multiple times.') + # test test = subparsers.add_parser('test', help=_Test.__doc__, parents=[shared_flags]) diff --git a/oauth2l/oauth2l_test.py b/oauth2l/oauth2l_test.py index 2d4efed0..33809ef8 100644 --- a/oauth2l/oauth2l_test.py +++ b/oauth2l/oauth2l_test.py @@ -524,3 +524,107 @@ def testCachedInvalid(self, mock_storage, mock_flow): credentials.invalid = True output = _GetCommandOutput('fetch', ['fake.scope']) self.assertIn('Failed to fetch credentials', output) + + +class TestRevoke(unittest.TestCase): + def setUp(self): + # Set up an access token to use + self.access_token = 'ya29.abdefghijklmnopqrstuvwxyz' + self.user_agent = 'oauth2l/1.0' + + credentials = mock.MagicMock() + self.mock_credentials = credentials.start() + self.addCleanup(credentials.stop) + self.mock_credentials.revoke = self.mock_revoke = mock.MagicMock() + + mock_get_store = mock.patch.object(oauth2l, '_GetCredentialStore', + autospec=True) + self.mock_get_store = mock_get_store.start() + self.addCleanup(mock_get_store.stop) + + mock_store = mock.patch('oauth2client.contrib.multiprocess_file_storage' + '.MultiprocessFileStorage', autospec=True) + self.mock_store = mock_store.start() + self.addCleanup(mock_store.stop) + + self.mock_get_store.return_value = self.mock_store + + def testServiceAccountCacheHit(self): + self.mock_store.get.return_value = self.mock_credentials + + service_account_path = os.path.join( + os.path.dirname(__file__), 'testdata/fake_service_account.json') + revoke_args = ['--json=' + service_account_path, 'userinfo.email'] + output = _GetCommandOutput('revoke', revoke_args) + self.assertIn('Successfully revoked token.', output) + self.assertEqual(1, self.mock_revoke.call_count) + self.mock_get_store.assert_called_once_with(None, 'abc', + 'https://www.googleapis.com/auth/userinfo.email') + + def testServiceAccountCacheMiss(self): + self.mock_store.get.return_value = None + + service_account_path = os.path.join( + os.path.dirname(__file__), 'testdata/fake_service_account.json') + revoke_args = ['--json=' + service_account_path, 'userinfo.email'] + output = _GetCommandOutput('revoke', revoke_args) + self.assertNotIn('Successfully revoked token.', output) + self.assertEqual(0, self.mock_revoke.call_count) + self.mock_get_store.assert_called_once_with(None, 'abc', + 'https://www.googleapis.com/auth/userinfo.email') + + def testClientSecretsCacheHit(self): + self.mock_store.get.return_value = self.mock_credentials + + service_account_path = os.path.join( + os.path.dirname(__file__), 'testdata/fake_client_secrets.json') + revoke_args = ['--json=' + service_account_path, 'userinfo.email'] + output = _GetCommandOutput('revoke', revoke_args) + self.assertIn('Successfully revoked token.', output) + self.assertEqual(1, self.mock_revoke.call_count) + self.mock_get_store.assert_called_once_with(None, + '144169.apps.googleusercontent.com', + 'https://www.googleapis.com/auth/userinfo.email') + + def testClientSecretsCacheMiss(self): + self.mock_store.get.return_value = None + + service_account_path = os.path.join( + os.path.dirname(__file__), 'testdata/fake_client_secrets.json') + revoke_args = ['--json=' + service_account_path, 'userinfo.email'] + output = _GetCommandOutput('revoke', revoke_args) + self.assertNotIn('Successfully revoked token.', output) + self.assertEqual(0, self.mock_revoke.call_count) + self.mock_get_store.assert_called_once_with(None, + '144169.apps.googleusercontent.com', + 'https://www.googleapis.com/auth/userinfo.email') + + def testDefaultClientInfoCacheHit(self): + self.mock_store.get.return_value = self.mock_credentials + + revoke_args = ['userinfo.email'] + output = _GetCommandOutput('revoke', revoke_args) + self.assertIn('Successfully revoked token.', output) + self.assertEqual(1, self.mock_revoke.call_count) + self.mock_get_store.assert_called_once_with(None, + '1055925038659-sb6bdak55edef9a0joshf24g7i2kiatf.apps.' + 'googleusercontent.com', + 'https://www.googleapis.com/auth/userinfo.email') + + def testDefaultClientInfoCacheMiss(self): + self.mock_store.get.return_value = None + + revoke_args = ['userinfo.email'] + output = _GetCommandOutput('revoke', revoke_args) + self.assertNotIn('Successfully revoked token.', output) + self.assertEqual(0, self.mock_revoke.call_count) + self.mock_get_store.assert_called_once_with(None, + '1055925038659-sb6bdak55edef9a0joshf24g7i2kiatf.apps.' + 'googleusercontent.com', + 'https://www.googleapis.com/auth/userinfo.email') + + def testNoScopeProvided(self): + output = _GetCommandOutput('revoke', []) + self.assertIn('No scopes provided', output) + self.assertEqual(0, self.mock_revoke.call_count) + self.assertEqual(0, self.mock_get_store.call_count) \ No newline at end of file