# Lending Loop Web App
This is a prototype for a Lending Loop web app / dashboard. This notebook focuses on the logic behind the Lending Loop summary calculations.

# System Initializations

### Package Initializations

In [1]:
# DataFrames
import pandas as pd
import findspark
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Databases
import pymongo
import json

# Numerical Packages
import numpy as np

# Datetime
import datetime

# Misc Packages
import os

# Visualizations
import plotly
import plotly.offline as pyo
import plotly.graph_objs as go

### System Configuration


In [2]:
# PySpark launch arguments: load the MongoDB Spark connector package and set the driver memory.
# These must be in the environment before the SparkSession is created.
packages = 'org.mongodb.spark:mongo-spark-connector_2.11:2.2.0'
dedicated_memory = '1g'

os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages {} --driver-memory {} pyspark-shell'.format(packages, dedicated_memory))

# Plotly: render figures inline in the notebook
pyo.init_notebook_mode(connected=True)

# Pandas: do not truncate long cell contents when displaying frames
pd.set_option('display.max_colwidth', -1)

In [3]:
# Locate the local Spark installation (SPARK_HOME)
findspark.init()

# Start the notebook's SparkSession, or reuse one that already exists
spark = pyspark.sql.SparkSession.builder.appName('LendingLoop').getOrCreate()

# Name of the MongoDB database and a handle to it (default client settings)
DB = 'lendingLoop'
dbHandle = pymongo.MongoClient()[DB]

### Constant Definitions

In [283]:
# Fee rate charged by Lending Loop.
# NOTE(review): presumably a percentage (1.5%) - confirm the unit; the constant is not
# referenced by any of the cells visible in this notebook section.
LENDING_LOOP_FEE_RATE = 1.5

### Helper Function Definitions

In [333]:
def read_from_DB_to_spark(coll):
    '''
    Load the MongoDB collection `coll` of database `DB` as a Spark DataFrame.

    The schema is inferred by the connector and the `loanID` column is cast to an integer.
    If anything fails, a message is printed and the original exception is re-raised.
    '''
    try:
        reader = spark.read.format('com.mongodb.spark.sql.DefaultSource')
        reader = reader.option('uri', 'mongodb://127.0.0.1/{}.{}'.format(DB, coll))
        reader = reader.option('inferschema', 'true')

        loadedDF = reader.load()

        # Keep the join key as an integer
        return loadedDF.withColumn('loanID', F.col('loanID').cast(T.IntegerType()))
    except Exception as e:
        print('Fail to read from "{}.{}".\n{}'.format(DB, coll, e))
        raise

def write_spark_to_DB(DF, coll, mode):
    '''
    Save the Spark DataFrame `DF` into the MongoDB collection `coll` of database `DB`.

    `mode` is passed straight to the DataFrame writer (e.g. 'append').
    If anything fails, a message is printed and the original exception is re-raised.
    '''
    try:
        writer = DF.write.format('com.mongodb.spark.sql.DefaultSource')
        writer = writer.option('uri', 'mongodb://127.0.0.1/{}.{}'.format(DB, coll))
        writer.mode(mode).save()
    except Exception as e:
        print('Error saving DataFrame to "{}.{}".\n{}'.format(DB, coll, e))
        raise

def display_DF(sparkDF, n=5):
    '''
    Return the first n rows of sparkDF as a pandas DataFrame for interactive display.

    The MongoDB `_id` column is dropped before the conversion.
    '''
    preview = sparkDF.limit(n)
    previewWithoutId = preview.drop('_id')
    return previewWithoutId.toPandas()

def import_new_notes(DF):
    '''
    Search DF for new notes and import them into MongoDB.

    DF holds one row per payment and must provide the columns loanID, company,
    loanName, interestRate, grade, dueDate, interestScheduled, totalScheduled and fees.
    The payments are aggregated into one row per note; every note whose loanID is not
    yet present in the `notes` collection is appended to it with `cyclesRemaining`
    set to `cyclesTotal` and `feesAccrued` / `amountRepayed` set to 0.

    Prints the number of imported notes. Returns nothing.
    '''
    @F.udf(returnType=T.DoubleType())
    def calculate_principal(unitPay, interestRate, totalPayCycles):
        '''
        Calculate the principal invested based on the unitPay, interestRate and totalPayCycles.

        interestRate is divided by 100 * 12 to obtain the rate per pay cycle, i.e. it is
        treated as an annual percentage with monthly pay cycles.
        '''
        if unitPay is None or interestRate is None or totalPayCycles is None:
            return None

        # Force float division: an integer-typed rate would truncate to 0 under Python 2
        cycleRate = float(interestRate) / (100 * 12)

        if cycleRate == 0:
            # Interest-free note: the annuity formula degenerates to a plain sum
            return float(unitPay) * totalPayCycles

        return unitPay / cycleRate * (1 - 1. / (1 + cycleRate)**totalPayCycles)

    COLL = 'notes'

    # Obtain handle to Mongo database and collection
    collection = dbHandle[COLL]

    # Get existing notes. `find_one() is None` is an emptiness check that works across
    # PyMongo versions (`Collection.count()` no longer exists in PyMongo 4).
    if collection.find_one() is None:
        existingNotesDF = spark.createDataFrame(spark.sparkContext.emptyRDD(),
                                                schema=T.StructType([T.StructField('loanID', T.IntegerType())]))
    else:
        # Retrieve existing notes from MongoDB
        existingNotesDF = read_from_DB_to_spark(COLL)

    # Obtain list of unique notes from input DF.
    # The start date is one month before the earliest due date; `F.min` is used because
    # `F.first` after an `orderBy` is not guaranteed to see the rows in sorted order
    # once `groupBy` has shuffled them.
    uniqueNotesDF = (DF
                     .groupBy('loanID', 'company', 'loanName', 'interestRate', 'grade')
                     .agg(F.count('loanID').alias('cyclesTotal'),
                          F.round(F.sum('fees'), 2).alias('feesTotal'),
                          F.round(F.sum('interestScheduled') - F.sum('fees'), 2).alias('profits'),
                          F.round(F.mean('totalScheduled'), 2).alias('unitPayment'),
                          F.add_months(F.min('dueDate'), -1).alias('startDate'))
                     .withColumn('principal',
                                 F.round(calculate_principal('unitPayment', 'interestRate', 'cyclesTotal'), 0)))

    # Filter out new notes using left-anti join with `existingNotesDF`
    newNotesDF = (uniqueNotesDF
                  .join(existingNotesDF, 'loanID', 'leftanti')
                  .withColumn('cyclesRemaining', F.col('cyclesTotal'))
                  .withColumn('feesAccrued', F.lit(0.00))
                  .withColumn('amountRepayed', F.lit(0.00))
                  .cache())

    # Count BEFORE writing: the DataFrame is lazy, so counting after the append would
    # re-run the anti-join against a collection that already contains the new notes.
    newNotesCount = newNotesDF.count()

    # Save new notes into MongoDB
    write_spark_to_DB(newNotesDF, COLL, 'append')
    newNotesDF.unpersist()

    print('Successfully imported {} new note(s) to "{}.{}"'.format(newNotesCount, DB, COLL))

# Test Area

### Import dataset

In [81]:
# Read raw CSV file (first row is the header, column types are inferred)
rawDF = (spark
         .read
         .format('com.databricks.spark.csv')
         .option('header', 'True')
         .option('inferschema', 'True')
         .load('all_payments.csv'))

# Camel case titles: remove spaces, then lower-case the first remaining character.
# Stripping first keeps this correct for titles with a leading space, and the slice
# (`[:1]`) keeps it from failing on an empty title.
camelCaseDict = {title: title.replace(' ', '')[:1].lower() + title.replace(' ', '')[1:]
                 for title in rawDF.columns}

# Simplify certain column titles
camelCaseDict['Fees Paid to Loop'] = 'fees'
camelCaseDict['Risk Band'] = 'grade'
camelCaseDict['Loan Id'] = 'loanID'

# Camelcase column titles.
# Iterate over the CSV's own columns rather than the dict keys: this keeps the original
# column order (dict order is arbitrary in Python 2) and never selects a title that is
# missing from the file.
rawDF = rawDF.select([F.col(title).alias(camelCaseDict[title]) for title in rawDF.columns])

In [82]:
# Preview the first rows of the renamed payments DataFrame (display_DF defaults to 5 rows)
display_DF(rawDF)

Unnamed: 0,status,interestPaid,paymentType,principalScheduled,totalOwed,loanID,grade,interestRate,company,principalPaid,dueDate,interestScheduled,totalScheduled,fees,loanName,principalOwed,totalPaid,interestOwed,datePaid
0,Paid,0.86,Scheduled Payment,2.39,3.25,59705,D,20.67,Nice and Smooth Ultramedia Inc.,2.39,2017-09-22,0.86,3.25,0.06,Concert Producer & Promoter,2.39,3.25,0.86,2017-09-23
1,Paid,0.64,Scheduled Payment,2.83,3.47,83910,B+,10.33,Bronze Baxx Tanning Studio Inc.,2.83,2017-09-22,0.64,3.47,0.09,Salon - Equipment Purchase (1 of 2),2.83,3.47,0.64,2017-09-22
2,Scheduled,0.0,Scheduled Payment,1.27,1.91,7802,B+,10.33,Brightpath Capital Corporation,0.0,2017-09-25,0.64,1.91,0.09,Mortgage Lender - Working Capital,1.27,0.0,0.64,NaT
3,Scheduled,0.0,Scheduled Payment,0.82,1.83,96625,C,16.18,Rossco's Tree Service and contracting Ltd.,0.0,2017-09-30,1.01,1.83,0.09,Arborist - Lease Buyouts (1 of 3),0.82,0.0,1.01,NaT
4,Scheduled,0.0,Scheduled Payment,1.63,2.64,58609,C,16.18,5686645 Manitoba LTD.,0.0,2017-10-07,1.01,2.64,0.09,Contractor - Working Capital,1.63,0.0,1.01,NaT


### Add new notes to collection

In [347]:
# Import every note found in rawDF that is not yet stored in the `notes` collection
import_new_notes(rawDF)

Successfully imported 17 new note(s) to "lendingLoop.notes"


### Pull new collection

In [335]:
# Load the `notes` collection back from MongoDB and preview its first rows
notesDF = read_from_DB_to_spark('notes')
display_DF(notesDF)

Unnamed: 0,amountRepayed,company,cyclesRemaining,cyclesTotal,feesAccrued,feesTotal,grade,interestRate,loanID,loanName,principal,profits,startDate,unitPayment
0,0.0,Rossco's Tree Service and contracting Ltd.,60,60,0.0,3.23,C,16.18,96625,Arborist - Lease Buyouts (1 of 3),75.0,31.35,2017-08-31,1.83
1,0.0,NuEnergy Systems Inc.,36,36,0.0,1.78,A+,6.8,29560,Integrated Solar Energy Services Company,75.0,6.16,2017-09-15,2.3
2,0.0,Jenco Canada Inc.,12,12,0.0,0.42,A,8.52,67770,LED Lighting Distributor and Franchisor,50.0,1.88,2017-09-07,4.36
3,0.0,Bronze Baxx Tanning Studio Inc.,24,24,0.0,1.21,B+,10.33,83910,Salon - Equipment Purchase (1 of 2),75.0,7.0,2017-08-22,3.47
4,0.0,Nitin Chauhan Medicine Professional Corporation,24,24,0.0,0.81,A,8.52,90873,"Head, Neck & Facial Plastic Surgeon - Working Capital",50.0,3.62,2017-09-07,2.27


### Initialise Base `Summary` State

In [348]:
# TODO Update to realistic values
# Initial summary document; `lastUpdated` starts three days in the past
baseState = dict(
    lifeTimeEarnings=0.,
    netROI=0.,
    availableFunds=1500,
    fundsInvested=0.,
    fundCommited=0.,
    lastUpdated=datetime.datetime.now() - datetime.timedelta(days=3),
)

# Start from a clean slate: discard recorded transactions and any previous summary
dbHandle['transactions'].drop()
collection = dbHandle['summary']
collection.drop()

# Store the base state as the only summary document
collection.insert_one(baseState)

<pymongo.results.InsertOneResult at 0x10871c128>

### Import New Transactions

In [349]:
def update_state(DF):
    '''
    Update state with new transactions from DF.

    DF holds one row per payment and must provide the columns loanID, company,
    totalPaid, fees and datePaid. Rows whose datePaid is later than the `lastUpdated`
    timestamp of the summary document are treated as new transactions. For every note
    with new transactions, `amountRepayed` and `feesAccrued` are increased by the summed
    payments / fees and `cyclesRemaining` is reduced by the number of new payments.
    The new transactions are appended to the `transactions` collection and
    `lastUpdated` is set to the current time.

    Prints the number of new transactions. Returns nothing.
    Raises ValueError if the `summary` collection holds no document.
    '''
    # Retrieve the latest overallState
    overallState = dbHandle['summary'].find_one()
    if overallState is None:
        raise ValueError('No summary document found in "{}.{}".'.format(DB, 'summary'))

    # Get transactions that have not been added to MongoDB.
    # Cached because the result feeds the notes update, the write and the count.
    newTransactionsDF = (DF
                         .filter(F.col('datePaid').isNotNull())
                         .filter(F.col('datePaid') > overallState['lastUpdated'])
                         .select('loanID', 'company', 'totalPaid', 'fees', 'datePaid')
                         .cache())
    newTransactionsCount = newTransactionsDF.count()

    # Net effect of the new transactions per note; each row is one paid cycle
    netTransactionsDF = (newTransactionsDF
                         .groupBy('loanID')
                         .agg(F.sum('totalPaid').alias('totalPaid'),
                              F.sum('fees').alias('fees'),
                              F.count('loanID').alias('cyclesPaid')))

    # Update noteStates (inner join: only notes with new transactions are touched)
    updatedNotesStateDF = (read_from_DB_to_spark('notes')
                           .join(netTransactionsDF, 'loanID')
                           .withColumn('amountRepayed', F.col('amountRepayed') + F.col('totalPaid'))
                           .withColumn('feesAccrued', F.col('feesAccrued') + F.col('fees'))
                           .withColumn('cyclesRemaining', F.col('cyclesRemaining') - F.col('cyclesPaid'))
                           .drop('totalPaid', 'fees', 'cyclesPaid'))

    # Write updated noteStates to DB.
    # NOTE(review): relies on the connector replacing existing documents when the
    # DataFrame carries their `_id` (kept by read_from_DB_to_spark) - confirm for the
    # connector version in use.
    write_spark_to_DB(updatedNotesStateDF, 'notes', 'append')

    # Write new transactions to DB
    write_spark_to_DB(newTransactionsDF, 'transactions', 'append')
    newTransactionsDF.unpersist()

    # Update the lastUpdated timestamp of the summary collection.
    # NOTE: payments dated today that show up after this point are skipped by the next run.
    dbHandle['summary'].update_one({'_id': overallState['_id']}, {'$set': {'lastUpdated': datetime.datetime.now()}})

    print('{} new transaction(s) written to "{}.{}"'.format(newTransactionsCount, DB, 'transactions'))

In [350]:
# Record the payments made since the summary's `lastUpdated` timestamp
update_state(rawDF)

2 new transaction(s) written to "lendingLoop.transactions"


# TODO Ensure that no more transactions occur on the same day *after* script is run

### Update `summary` with new Transactions

In [351]:
# Re-read the `notes` collection and show its first 10 rows to inspect the update
display_DF(read_from_DB_to_spark('notes'), 10)

Unnamed: 0,amountRepayed,company,cyclesRemaining,cyclesTotal,feesAccrued,feesTotal,grade,interestRate,loanID,loanName,principal,profits,startDate,unitPayment
0,0.0,Nitin Chauhan Medicine Professional Corporation,24,24,0.0,0.81,A,8.52,90873,"Head, Neck & Facial Plastic Surgeon - Working Capital",50.0,3.62,2017-09-07,2.27
1,0.0,Brightpath Capital Corporation,48,48,0.0,2.45,B+,10.33,7802,Mortgage Lender - Working Capital,75.0,14.2,2017-08-25,1.91
2,0.0,Jenco Canada Inc.,12,12,0.0,0.42,A,8.52,67770,LED Lighting Distributor and Franchisor,50.0,1.88,2017-09-07,4.36
3,0.0,Rossco's Tree Service and contracting Ltd.,60,60,0.0,3.23,C,16.18,96625,Arborist - Lease Buyouts (1 of 3),75.0,31.35,2017-08-31,1.83
4,3.47,Bronze Baxx Tanning Studio Inc.,23,24,0.09,1.21,B+,10.33,83910,Salon - Equipment Purchase (1 of 2),75.0,7.0,2017-08-22,3.47
5,0.0,Snakes & Lattes Inc.,36,36,0.0,1.82,B+,10.33,79370,Snakes & Lattes - New Location (1 of 2),75.0,10.56,2017-09-18,2.43
6,0.0,Anthony C. C. Chan Inc,24,24,0.0,1.21,B+,10.33,14941,Accounting Practice - Expansion/Renovation,75.0,7.0,2017-09-15,3.47
7,0.0,NuEnergy Systems Inc.,36,36,0.0,1.78,A+,6.8,29560,Integrated Solar Energy Services Company,75.0,6.16,2017-09-15,2.3
8,0.0,The Extensionist Consultancy Inc,36,36,0.0,1.84,B,12.2,45851,Salon Chain - Inventory Purchase,75.0,12.9,2017-09-14,2.49
9,0.0,J.K. ENGINEERING LTD,36,36,0.0,2.42,A,8.52,83061,Infrastructure Engineering and Construction Firm - Second Loan,100.0,11.06,2017-09-15,3.15


### Create custom progress bar

In [428]:
def progress_bar(DF):
    '''
    Creates a custom progress bar based on notesDF.

    Draws one horizontal stacked bar per row of DF, made up of the fees accrued, the
    principal (received / next payment / outstanding) and the profits
    (received / next payment / outstanding), and renders it inline with plotly.

    DF must provide the non-null columns company, amountRepayed, feesAccrued,
    principal, profits, unitPayment, interestRate and cyclesTotal.

    Repayments net of fees are attributed to the principal first and to the profits
    only once the principal is covered. The "next payment" segments show how one more
    unitPayment would be split under that rule, so for every note

        received + next payment + outstanding == principal   (and likewise == profits)

    Returns nothing.
    '''
    # (column, legend name, legend group, colour) of every trace, in stacking order
    traceSpecs = [
        ('feesAccrued', 'Fees Accrued', 'Fees', '#ff8000'),
        ('principalReceived', 'Principal Received', 'Principal', '#512361'),
        ('nextPrincipalPayment', 'Next Principal Payment', 'Principal', '#9165AE'),
        ('principalOutstanding', 'Principal Outstanding', 'Principal', '#C2B2C8'),
        ('profitsReceived', 'Profits Received', 'Profits', '#72C02C'),
        ('nextProfitsPayment', 'Next Profit Payment', 'Profits', '#a2de6e'),
        ('profitsOutstanding', 'Profits Outstanding', 'Profits', '#d7f1c1'),
    ]

    def _clamp(column, upper):
        '''
        Limit the values of column to the range [0, upper].
        '''
        return F.greatest(F.lit(0.), F.least(column, upper))

    def _create_trace(rows, companiesList, colName, name, group, colour):
        '''
        Create the bar trace of colName from the collected rows, rounded to 2 decimal places.
        '''
        values = [round(row[colName], 2) for row in rows]

        return go.Bar(
            x = values,
            y = companiesList,
            name = name,
            orientation = 'h',
            width = 0.75,
            hoverinfo = 'text',
            # Empty segments get no hover label
            text = ['' if val == 0 else '{}'.format(val) for val in values],
            legendgroup = group,
            marker = {
                'color': colour,
                'line': {'width': 0.}
            }
        )

    principal = F.col('principal')
    profits = F.col('profits')

    # Amount repaid so far (net of fees) and after one more payment
    repaid = F.col('amountRepayed') - F.col('feesAccrued')
    repaidAfterNext = repaid + F.col('unitPayment')

    # Cumulative principal / profits covered now and after the next payment
    principalReceived = _clamp(repaid, principal)
    principalAfterNext = _clamp(repaidAfterNext, principal)
    profitsReceived = _clamp(repaid - principal, profits)
    profitsAfterNext = _clamp(repaidAfterNext - principal, profits)

    # Add relevant columns to notesDF
    progressDF = (DF
                  .withColumn('amountRepayedNetFees', repaid)
                  .withColumn('principalReceived', principalReceived)
                  .withColumn('nextPrincipalPayment', principalAfterNext - principalReceived)
                  .withColumn('principalOutstanding', F.greatest(F.lit(0.), principal - principalAfterNext))
                  .withColumn('profitsReceived', profitsReceived)
                  .withColumn('nextProfitsPayment', profitsAfterNext - profitsReceived)
                  .withColumn('profitsOutstanding', F.greatest(F.lit(0.), profits - profitsAfterNext))
                  .orderBy(['principal', 'principalOutstanding', 'interestRate', 'cyclesTotal'], ascending=[1, 0, 1, 1]))

    # Collect everything in ONE job so that all traces and the company labels share the
    # same row order (rows with equal sort keys may come back in a different order
    # between separate Spark jobs).
    rows = progressDF.select(['company'] + [spec[0] for spec in traceSpecs]).collect()

    # Obtain list of companies
    companiesList = [row['company'] for row in rows]

    # Create traces
    data = [_create_trace(rows, companiesList, colName, name, group, colour)
            for colName, name, group, colour in traceSpecs]

    layout = go.Layout(
        # 20px per note plus room for axis and legend
        height = 20 * len(rows) + 280,
        xaxis = {
            'domain': [0.2, 1]
        },
        barmode = 'stack',
        legend = {
            'orientation': 'h',
            'traceorder': 'grouped'
        }
    )

    figure = go.Figure(data=data, layout=layout)

    pyo.iplot(figure)

In [429]:
# Render the progress chart for the notes loaded from MongoDB
progress_bar(notesDF)

In [358]:
# Five synthetic notes that are identical except for the company name and the amount
# repaid so far.
# NOTE(review): the cycle count column is called `totalCycles` here, whereas the notes
# collection (and progress_bar's ordering) use `cyclesTotal` - confirm which is intended.
testDF = spark.createDataFrame(
    list((amountRepayed, company, 3.0, 16.18, 75.0, 10., 36, 18.14)
         for amountRepayed, company in ((6.0, 'My Test Company'),
                                        (20.0, 'My Test Company1'),
                                        (70.0, 'My Test Company2'),
                                        (80.0, 'My Test Company3'),
                                        (115.0, 'My Test Company4'))),
    ['amountRepayed', 'company', 'feesAccrued', 'interestRate',
     'principal', 'unitPayment', 'totalCycles', 'profits'])

display_DF(testDF)

Unnamed: 0,amountRepayed,company,feesAccrued,interestRate,principal,unitPayment,totalCycles,profits
0,6.0,My Test Company,3.0,16.18,75.0,10.0,36,18.14
1,20.0,My Test Company1,3.0,16.18,75.0,10.0,36,18.14
2,70.0,My Test Company2,3.0,16.18,75.0,10.0,36,18.14
3,80.0,My Test Company3,3.0,16.18,75.0,10.0,36,18.14
4,115.0,My Test Company4,3.0,16.18,75.0,10.0,36,18.14
