In [54]:
import flask
from flask import jsonify, make_response
from plaid.api import plaid_api
from plaid.model.country_code import CountryCode
from plaid.model.link_token_create_request_user import \
    LinkTokenCreateRequestUser
import json
import plaid
from google.cloud import firestore
from plaid.model.sandbox_item_reset_login_request import SandboxItemResetLoginRequest
from plaid.model.sandbox_public_token_create_request import SandboxPublicTokenCreateRequest
from plaid.model.sandbox_public_token_create_response import SandboxPublicTokenCreateResponse
from plaid.model.item_public_token_exchange_request import ItemPublicTokenExchangeRequest
from plaid.model.item_public_token_exchange_response import ItemPublicTokenExchangeResponse
from plaid.model.accounts_balance_get_request import AccountsBalanceGetRequest
from plaid.model.accounts_balance_get_request_options import AccountsBalanceGetRequestOptions
from plaid.model.accounts_get_response import AccountsGetResponse
from plaid.model.accounts_get_request import AccountsGetRequest
from plaid.model.accounts_get_request_options import AccountsGetRequestOptions
from plaid.model.products import Products
from typing import List, Dict, Any, Union
from plaid.model.account_base import AccountBase
from google.cloud.firestore_v1.document import DocumentReference
from google.cloud.firestore_v1.collection import CollectionReference
from google.cloud.firestore_v1.types import WriteResult
from plaid import ApiException
from plaid.model.account_balance import AccountBalance
from datetime import datetime, timedelta

In [7]:
# Load the Plaid credentials from a local JSON file.
with open('../../private_keys.json') as f:
    # The `with` block closes the file on exit, so no explicit close() is needed.
    data = json.load(f)

# NOTE(review): the client targets the Development host, while
# `exchange_public_token(True, ...)` further down calls a sandbox helper —
# presumably the host and secret must be switched to the Sandbox values
# (commented out below) for those cells to work; confirm.
configuration = plaid.Configuration(
    host=plaid.Environment.Development,
    # host=plaid.Environment.Sandbox,
    api_key={
        'clientId': data['client_id'],
        'secret': data['secret_development'],
        # 'secret': data['secret_sandbox'],
    }
)

# Shared clients used by every function in this notebook.
api_client = plaid.ApiClient(configuration)
plaid_client = plaid_api.PlaidApi(api_client)
firestore_client = firestore.Client.from_service_account_json('../../google-services.json')


1. Create a Link token

In [None]:
def create_link_token(request: Union[flask.Request, Dict[str, Any]]) -> Dict[str, Any]:
    """Create a Plaid Link token for the user named in ``request``.

    Args:
        request: Either a ``flask.Request`` whose body is a JSON object, or an
            already-parsed dict. Its ``uid`` entry is sent to Plaid as the
            ``client_user_id``.

    Returns:
        The Link token response converted to a dict on success. On failure, a
        dict carrying ``error_code`` / ``error_message``: the parsed Plaid
        error body, or a locally built one when ``uid`` is missing.
    """
    # isinstance (instead of an exact type comparison) also accepts subclasses
    # of flask.Request.
    if isinstance(request, flask.Request):
        data_dict: dict = json.loads(request.data)
    else:
        data_dict = request

    uid: Union[str, None] = data_dict.get('uid')
    if not uid:
        # Fail fast with the same error shape the Plaid failures use, rather
        # than handing `None` to the request model below.
        # NOTE(review): presumably the model rejects a non-string id with an
        # error that is not an ApiException — confirm.
        return {
            'error_type': 'INVALID_REQUEST',
            'error_code': 'MISSING_FIELDS',
            'error_message': '`uid` is required to create a link token',
        }

    try:
        # Build the request under its own name so the `request` parameter is
        # not shadowed.
        link_request = plaid_api.LinkTokenCreateRequest(
            products=[Products('transactions')],
            client_name="CCCC",
            language='en',
            country_codes=[CountryCode('US')],
            user=LinkTokenCreateRequestUser(
                client_user_id=uid
            )
        )
        print(f'link_token_create_request \n{link_request}')

        response: plaid_api.LinkTokenCreateResponse = plaid_client.link_token_create(
            link_request
        )
        print(f'link_token_create response = \n{response}')

        return response.to_dict()
    except ApiException as e:
        # Plaid reports failures as a JSON body; hand it back as a dict.
        exceptions: dict = json.loads(e.body)

        return exceptions

In [None]:
# Empty payload: the call below is made without a `uid` entry.
data = {}

create_link_token(request=data)

2. Exchange `public_token` for `access_token`

In [None]:
def exchange_public_token(is_sandbox: bool, request_dict: Dict[str, Any]) -> Dict[str, Any]:
    """Exchange a Plaid ``public_token`` for an ``access_token``.

    Args:
        is_sandbox: When true, a fresh public token is first created through
            the sandbox helper endpoint (institution ``ins_3``, product
            ``transactions``) and ``request_dict`` is not read.
        request_dict: Parsed request payload; its ``public_token`` entry is
            used when ``is_sandbox`` is false.

    Returns:
        The exchange response as a dict on success. On failure, a dict
        carrying ``error_code`` / ``error_message``: the parsed Plaid error
        body, or a locally built one when ``public_token`` is missing.
    """
    if not is_sandbox and not request_dict.get('public_token'):
        # Fail fast with the same error shape the Plaid failures use, rather
        # than handing `None` to the request model below.
        return {
            'error_type': 'INVALID_REQUEST',
            'error_code': 'MISSING_FIELDS',
            'error_message': '`public_token` is required',
        }

    try:
        # Get `public_token`
        if is_sandbox:
            pt_request = SandboxPublicTokenCreateRequest(
                institution_id='ins_3',
                initial_products=[Products('transactions')],
            )

            pt_response: SandboxPublicTokenCreateResponse = plaid_client.sandbox_public_token_create(
                pt_request
            )

            public_token = pt_response.public_token
        else:
            public_token: str = request_dict.get('public_token')

        # Make Request to exchange
        exchange_request = ItemPublicTokenExchangeRequest(
            public_token=public_token
        )
        print(f'exchange request: \n{exchange_request}')

        exchange_response: ItemPublicTokenExchangeResponse = plaid_client.item_public_token_exchange(
            exchange_request
        )
        exchange_response_dict = exchange_response.to_dict()

        # The access token is a secret: log the response with it masked. The
        # unmasked dict is still what gets returned.
        loggable = {
            key: ('<redacted>' if key == 'access_token' else value)
            for key, value in exchange_response_dict.items()
        }
        print(f'exchange response dict: \n{loggable}')

        return exchange_response_dict
    except ApiException as e:
        # Plaid reports failures as a JSON body; hand it back as a dict.
        exceptions: dict = json.loads(e.body)
        return exceptions

In [None]:
# Sandbox mode: the payload is unused, so an empty dict is passed.
exchange_public_token(is_sandbox=True, request_dict={})

3. ACCOUNTS GET

In [3]:
def accounts_get(access_token: str) -> Union[List[dict], dict]:
    """Fetch the accounts linked to ``access_token`` from Plaid.

    Args:
        access_token: The item's access token.

    Returns:
        The ``accounts`` entry of the Plaid response on success.
        NOTE(review): the elements look like Plaid account model objects
        rather than plain dicts (``update_accounts`` reads
        ``balances.available`` as an attribute) — confirm.
        On failure, a dict carrying ``error_code`` / ``error_message``: the
        parsed Plaid error body, or a locally built one when ``access_token``
        is missing.
    """
    if not access_token:
        # A failed token exchange leaves callers with `None`; report that in
        # the same error shape instead of handing it to the request model.
        return {
            'error_type': 'INVALID_REQUEST',
            'error_code': 'MISSING_FIELDS',
            'error_message': '`access_token` is required',
        }

    try:
        # Keyword argument, matching how the other request models in this
        # notebook are constructed.
        request = AccountsGetRequest(access_token=access_token)
        response: AccountsGetResponse = plaid_client.accounts_get(request)
        accounts: List[dict] = response['accounts']

        return accounts
    except ApiException as e:
        # Plaid reports failures as a JSON body; hand it back as a dict.
        exceptions: dict = json.loads(e.body)
        return exceptions

In [None]:
# Obtain a sandbox access token, then list the accounts behind it.
exchange_token_dict = exchange_public_token(is_sandbox=True, request_dict={})
access_token = exchange_token_dict.get('access_token')
xyz = accounts_get(access_token=access_token)

4. Update Accounts data in Firestore

In [50]:
def update_accounts(uid: str, accounts: List[dict]) -> Dict[str, Any]:
    try:
        user_doc = firestore_client.collection('users').document(uid)        
        accounts_ref: CollectionReference = user_doc.collection('accounts')

        # Update Accounts in `Accounts` collections
        for account in accounts:
            print(f'account {account}')

            # Update Accounts Collections
            account_id: str = account.get('account_id')
            print(f'accountId {account_id}')

            account_doc_ref = accounts_ref.document(account_id)            
            account_snapshot = account_doc_ref.get()
            print(f'account_snapshot {account_snapshot}')

            balances: AccountBalance = account.get('balances')
            data = {
                'account_id': account.get('account_id'),
                'balances': {
                    'available': balances.available,
                    'current': balances.current,
                    'iso_currency_code': balances.iso_currency_code,
                    'limit': balances.limit,
                    'unofficial_currency_code': balances.unofficial_currency_code,
                },
                'mask': account.get('mask'),
                'name': account.get('name'),
                'official_name': account.get('official_name'),
                'subtype': str(account.get('subtype')),
                'type': str(account.get('type')),
                'verification_status': account.get('verification_status'),
            }

            if account_snapshot.exists:
                write_result = account_doc_ref.update(data)
                update_time = write_result.update_time
            else:  
                write_result = account_doc_ref.set(data)
                update_time = write_result.update_time

            result = {
                'status': 200, 
                'last_update_time': update_time,
            }

        return result
    except Exception as e:
        result = {
            'status': 404,
            'error_message': str(e),
        }
        
        return result

In [51]:
# Fetch the accounts for the token obtained earlier and store them under a test user.
uid = 's'
accounts = accounts_get(access_token=access_token)
print(accounts)

update_accounts(uid=uid, accounts=accounts)

[{'account_id': 'Aj4J7p5wqBi3XagjAeDpIQ3gbD4701U6bdoyO',
 'balances': {'available': 6641.0,
              'current': 1268.7,
              'iso_currency_code': 'USD',
              'limit': 8000.0,
              'unofficial_currency_code': None},
 'mask': '1002',
 'name': 'HEEYUN LEE -41002',
 'official_name': 'Delta SkyMiles® Gold Card',
 'subtype': 'credit card',
 'type': 'credit'}]
account {'account_id': 'Aj4J7p5wqBi3XagjAeDpIQ3gbD4701U6bdoyO',
 'balances': {'available': 6641.0,
              'current': 1268.7,
              'iso_currency_code': 'USD',
              'limit': 8000.0,
              'unofficial_currency_code': None},
 'mask': '1002',
 'name': 'HEEYUN LEE -41002',
 'official_name': 'Delta SkyMiles® Gold Card',
 'subtype': 'credit card',
 'type': 'credit'}
accountId Aj4J7p5wqBi3XagjAeDpIQ3gbD4701U6bdoyO
account_snapshot <google.cloud.firestore_v1.base_document.DocumentSnapshot object at 0x7fbac015cd00>


{'status': 200,
 'last_update_time': DatetimeWithNanoseconds(2022, 1, 25, 18, 36, 33, 281760, tzinfo=datetime.timezone.utc)}

5. Update user data `accountIds` field

In [None]:
def update_user_account_ids(uid: str, accounts: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Merge the given accounts' IDs into the user's ``accountIds`` field.

    Args:
        uid: ID of the document in the ``users`` collection.
        accounts: Accounts whose ``account_id`` values should be recorded.

    Returns:
        ``{'status': 200, 'update_time': ..., 'last_update_time': ...}`` on
        success (both keys carry the same write time). ``{'status': 404,
        'error_message': ...}`` when the user document does not exist or
        anything raises.
    """
    try:
        user_doc = firestore_client.collection('users').document(uid)

        new_account_ids: List[str] = [x['account_id'] for x in accounts]

        user_dict = user_doc.get().to_dict()
        if user_dict is None:
            # to_dict() yields None for a missing document; say so explicitly
            # instead of surfacing a subscript error.
            return {
                'status': 404,
                'error_message': f'user document `{uid}` does not exist',
            }

        # A user without an `accountIds` field yet is treated as having none.
        old_account_ids: List[str] = list(user_dict.get('accountIds') or [])

        # De-duplicate while keeping a stable order: existing IDs first,
        # then newly linked ones.
        combined_ids = list(dict.fromkeys(old_account_ids + new_account_ids))
        print(f'Combined IDs {combined_ids}')

        update_result = user_doc.update({
            'accountIds': combined_ids,
        })

        return {
            'status': 200,
            'update_time': update_result.update_time,
            # Same value under the key the sibling update functions use.
            'last_update_time': update_result.update_time,
        }
    except Exception as e:
        # Every failure is reported with status 404; callers only compare
        # the status against 200.
        return {
            'status': 404,
            'error_message': str(e),
        }

6. Update the `plaid_keys` collection

In [None]:
def update_plaid_keys(uid: str, access_token: str) -> Dict[str, Any]:
    """Store ``access_token`` in ``users/{uid}/plaid_keys/private_keys``.

    Args:
        uid: ID of the user document that owns the ``plaid_keys`` subcollection.
        access_token: Plaid access token to persist.
            NOTE(review): the token is written as given, with no encryption
            applied here — confirm that is acceptable.

    Returns:
        ``{'status': 200, 'last_update_time': ...}`` on success.
        ``{'status': 404, 'error': ..., 'error_message': ...}`` if anything
        raises (both keys carry the same text).
    """
    try:
        plaid_keys_doc = (
            firestore_client
            .collection('users')
            .document(uid)
            .collection('plaid_keys')
            .document('private_keys')
        )

        # A merging set() creates the document when it is absent and otherwise
        # overwrites only `access_token`, in a single write. This replaces the
        # earlier read-then-create/update sequence, which needed an extra
        # round trip and could fail if the document appeared in between.
        write_result = plaid_keys_doc.set(
            {'access_token': access_token},
            merge=True,
        )

        return {
            'status': 200,
            'last_update_time': write_result.update_time,
        }
    except Exception as e:
        message = str(e)
        return {
            'status': 404,
            'error': message,
            # Same text under the key the sibling update functions use.
            'error_message': message,
        }

Finally combining everything

In [None]:
def _plaid_error_response(error_payload: Dict[str, Any]):
    """Log ``error_payload`` and wrap its error code/message in a 404 JSON response."""
    print(f'Got error. {error_payload}')

    result = jsonify(
        error_code=error_payload.get('error_code'),
        error_message=error_payload.get('error_message'),
    )

    return make_response(result, 404)


def exchange_public_token_and_update_accounts(request: Union[flask.Request, dict]):
    """Exchange a public token and persist the linked accounts for a user.

    Steps: exchange ``public_token`` for an access token, fetch the accounts
    behind it, then write the accounts, the user's ``accountIds`` and the
    access token to Firestore.

    Args:
        request: Either a ``flask.Request`` whose body is a JSON object, or an
            already-parsed dict. Reads ``uid`` and ``public_token``.

    Returns:
        A Flask response: ``{"status": 200}`` with HTTP 200 when all three
        Firestore updates report status 200; otherwise HTTP 404 carrying
        either ``{"status": 404}`` or ``error_code`` / ``error_message``.
    """
    # Parsing data from request
    if isinstance(request, dict):
        data_dict = request
    else:
        data_dict: dict = json.loads(request.data)

    uid: str = data_dict.get('uid')
    if not uid:
        # Stop before the token is exchanged or anything is written.
        # NOTE(review): Firestore's `document()` appears to generate a random
        # ID when given None, so a missing uid would store the accounts and
        # the access token under an unrelated document — confirm.
        # 404 is used because every failure path of this handler returns it.
        return _plaid_error_response({
            'error_code': 'MISSING_FIELDS',
            'error_message': '`uid` is required',
        })

    # Exchange public_token for access_token; the response has no
    # `access_token` entry when the exchange failed.
    exchange_public_token_response = exchange_public_token(False, data_dict)
    access_token = exchange_public_token_response.get('access_token')
    # The token is a secret: log only whether one was obtained.
    print(f'Got access_token: {access_token is not None}')

    if access_token is None:
        return _plaid_error_response(exchange_public_token_response)

    # accounts_get returns a list on success and an error dict on failure.
    accounts = accounts_get(access_token)
    if isinstance(accounts, dict):
        return _plaid_error_response(accounts)

    account_ids = [x.get('account_id') for x in accounts]
    print(f'Linked accounts: {account_ids}')

    update_account_result = update_accounts(uid, accounts)
    print(f'update_account_result: {update_account_result}')

    update_user_result = update_user_account_ids(uid, accounts)
    print(f'update_user_result: {update_user_result}')

    update_keys_result = update_plaid_keys(uid, access_token)
    print(f'update_keys_result: {update_keys_result}')

    statuses = (
        update_account_result.get('status'),
        update_user_result.get('status'),
        update_keys_result.get('status'),
    )

    # Succeed only when every Firestore update succeeded.
    if all(status == 200 for status in statuses):
        return make_response(jsonify(status=200), 200)

    return make_response(jsonify(status=404), 404)