Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
313 changes: 313 additions & 0 deletions steam/account.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,313 @@
"""
This module is used to safely store account credentials and provide mobile authenticator codes.

Example usage:

.. code:: python

import steam.account

account = steam.account.SteamAccount('username', 'password')
account.set_account_property('identity_secret', 'XYZ')
account.set_account_property('shared_secret', 'XYZ')
api_key = account.get_api_key()
"""
import os
import sys
import json
import base64
import re
import atexit
from base64 import b64decode

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend

import steam.guard
import steam.webauth
import steam.webapi

if sys.platform.startswith('win'):
BASE_LOCATION = '.' #Windows
else:
BASE_LOCATION = '.'

ACCOUNT_ATTRIBUTES = ['username', 'password', 'steamid', 'shared_secret', 'identity_secret', 'revocation_code',\
'secret_1', 'serial_number', 'deviceid', 'oauth_token', 'apikey']

class SteamAccount(object):
username = None
password = None

mobile_session = None
web_session = None
web_api = None
authenticator = None

_credentials = { }
_path = None
_file = None
_fernet_key = None
_fernet_suite = None
_web_auth = None
_mobile_auth = None


def __init__(self, username, password):
atexit.register(self.__cleanup__)
self.username = username
self.password = password
if not self._setup():
raise SteamAccountException('Could not access account.')

def __getattr__(self, key):
if key not in self._credentials.keys():
raise AttributeError("No %s attribute" % repr(key))
return self._credentials.get(key)

def __del__(self):
self.__save_account_credentials__()

def __cleanup__(self):
self.__save_account_credentials__()

def __save_account_credentials__(self):
"""
Make sure it doesnt write the file if the memory has been cleared
"""
if self._count_account_credentials() <= 2:
return
try:
self._update_credential_file()
except TypeError:
"""
Ignore TypeError exception when destructor gets called after the memory has been cleared
"""
pass
except ValueError:
"""
Ignore ValueError exception when the file could not be written
"""
pass

def set_account_property(self, property, value):
self._credentials[property] = value
self._update_credential_file()

def del_account_property(self, property):
del self._credentials[property]
self._update_credential_file()

def check_account_property(self, property):
return property in self._credentials

@property
def login_code(self):
if self.authenticator and hasattr(self.authenticator, 'shared_secret'):
return self.authenticator.get_code()
elif hasattr(self, 'shared_secret'):
return steam.guard.generate_twofactor_code(b64decode(self.shared_secret))
else:
raise SharedSecretNotSet('Add shared_secret to this instance to generate login codes')

def get_api_key(self, retrieve_if_missing=False, hostname_for_retrieving='localhost.com'):
if self.check_account_property('apikey'):
return self.apikey

elif retrieve_if_missing:
if not self._has_web_session():
self._spawn_web_session()

api_key = self.retrieve_api_key(hostname_for_retrieving)
self.set_account_property('apikey', api_key)
self._spawn_web_api()
return api_key
else:
raise APIKeyException('Could not return the apikey. The apikey is not set as account property and retrieve_if_missing is not allowed.')

def retrieve_api_key(self, hostname_for_retrieving='localhost.com'):
if not self._has_web_session():
raise APIKeyException('A web session is required to retrieve the api key.')

response = self.web_session.get('https://steamcommunity.com/dev/apikey')

if 'Access Denied' in response.text:
raise APIKeyException('You need at least 1 game on this account to access the steam api key page.')

else:
if 'Register for a new Steam Web API Key' in response.text:
regex_result = re.search(r'<input type="hidden" name="sessionid" value="(.*)">', response.text)
session_id = regex_result.group(1)

data = {
'domain': hostname_for_retrieving,
'agreeToTerms': 'agreed',
'submit': 'Register',
'sessionid': session_id
}

self.web_session.post('https://steamcommunity.com/dev/registerkey', data=data)
return self.retrieve_api_key()

elif 'Your Steam Web API Key' in response.text:
regex_result = re.search(r'<p>Key: (.*)</p>', response.text)
api_key = regex_result.group(1)
return api_key

else:
raise APIKeyException('An unhandled api key page appeared, please try again.')

def _setup(self):
self._generate_fernet_key()
self._spawn_fernet_suite()
self._path = '%s/%s' % (BASE_LOCATION, self.username)
if not os.path.exists(self._path) or not os.path.isfile(self._path):
self._create_credential_file()
else:
credentials = self._parse_credential_file()
for key, value in credentials.iteritems():
self._credentials[key] = value

if self.check_account_property('shared_secret'):
self._spawn_authenticator()

else:
self.authenticator = steam.guard.SteamAuthenticator()

if self.check_account_property('apikey'):
self._spawn_web_api()
return True

def _create_credential_file(self):
open(self._path, 'w+').close()
self._spawn_file_pointer()

data = json.dumps({
'username': self.username,
'password': self.password
})

token = self._fernet_suite.encrypt(data)
self._file.write(token)
self._file.close()

def _parse_credential_file(self):
self._spawn_file_pointer()

token = self._file.read()
data = json.loads(self._fernet_suite.decrypt(token))
self._file.close()
return data

def _update_credential_file(self):
self._spawn_file_pointer()

credentials = self._gather_credentials()
data = json.dumps(credentials)
token = self._fernet_suite.encrypt(data)

self._file.truncate()
self._file.write(token)
self._file.close()

def _gather_credentials(self):
data = { }
names = self._credentials
for name in names:
if name in ACCOUNT_ATTRIBUTES:
data.__setitem__(name, self._credentials[name])
return data

def _count_account_credentials(self):
count = 0
for attr in ACCOUNT_ATTRIBUTES:
if self.check_account_property(attr):
count += 1
return count

def _generate_fernet_key(self):
digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
digest.update(bytes(self.password))
self._fernet_key = base64.urlsafe_b64encode(digest.finalize())

def _spawn_fernet_suite(self):
self._fernet_suite = Fernet(self._fernet_key)

def _spawn_file_pointer(self):
self._file = open(self._path, 'r+', 0)

def _spawn_web_api(self):
self.web_api = steam.webapi.WebAPI(self.apikey)

def _spawn_authenticator(self):
secrets = {
'identity_secret': self._credentials.get('identity_secret'),
'shared_secret': self._credentials.get('shared_secret'),
'secret_1': self._credentials.get('secret_1'),
'revocation_code': self._credentials.get('revocation_code'),
'deviceid': self._credentials.get('deviceid')
}
self.authenticator = steam.guard.SteamAuthenticator(secrets)
self._spawn_mobile_session()
self.authenticator.medium = self._mobile_auth

def _has_session(self):
return True if self._has_web_session() or self._has_mobile_session() else False

def _has_web_session(self):
return isinstance(self._web_auth, steam.webauth.WebAuth)

def _has_mobile_session(self):
return isinstance(self._mobile_auth, steam.webauth.MobileWebAuth)

def _spawn_web_session(self):
self._web_auth = steam.webauth.WebAuth(self.username, self.password)
self._login_web_session(self._web_auth)
self.web_session = self._web_auth.session

def _spawn_mobile_session(self):
self._mobile_auth = steam.webauth.MobileWebAuth(self.username, self.password)
self._login_web_session(self._mobile_auth)
self.mobile_session = self._mobile_auth.session

def _login_web_session(self, web_auth):
if not isinstance(web_auth, steam.webauth.WebAuth) and not isinstance(web_auth, steam.webauth.MobileWebAuth):
raise WebAuthNotComplete('Please supply a valid WebAuth or MobileWebAuth session')

try:
twofactor_code = self.login_code
except SharedSecretNotSet:
twofactor_code = ''

web_auth.login(twofactor_code=twofactor_code)

if web_auth.complete:
if not hasattr(self, 'steamid'):
self.set_account_property('steamid', web_auth.steam_id)

if isinstance(web_auth, steam.webauth.MobileWebAuth):
self.set_account_property('oauth_token', web_auth.oauth_token)
else:
raise WebAuthNotComplete('The web authentication could not be completed.')

class SteamAccountException(Exception):
pass

class SharedSecretNotSet(SteamAccountException):
pass

class IdentitySecretNotSet(SteamAccountException):
pass

class WebAuthNotComplete(SteamAccountException):
pass

class WebException(SteamAccountException):
pass

class APIKeyException(SteamAccountException):
pass

class ParameterNotProvidedException(SteamAccountException):
pass