In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when

from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType


In [2]:
# Initialize Spark session
spark = SparkSession.builder.appName("Amortization Schedule").getOrCreate()



In [3]:
def npf_pmt(rate, nper, pv, fv=0, when='end'):
    
    temp = (1 + rate) ** nper
 
    if rate == 0:
        fact = nper
    else:
        fact = (temp - 1) / rate
    
     
    pmt_val = -(fv + pv * temp) / fact
    
    
    if when == 'begin':
        return pmt_val / (1 + rate)
    else:
        return pmt_val


In [4]:

def amortize(loanamount, interest_rate, term_months):
    zpayment = -npf_pmt(interest_rate/12, term_months, loanamount) 
    
    schedule = []

    for ctr in range(term_months + 1):
        if ctr == 0:
            opening_balance = 0
            payment = 0
            interest = opening_balance * interest_rate/12
            principal = payment - interest
            closing_balance = loanamount
        else:
            opening_balance = closing_balance
            payment = zpayment
            interest = opening_balance * interest_rate/12
            principal = payment - interest
            closing_balance = opening_balance - principal
        
        schedule.append(
            (ctr, opening_balance, payment, interest, principal, closing_balance)
        )
    
    return schedule


In [10]:
 
data = amortize(592.04, 0.02, 4)
 
corrected_data = []
for row in data:
    corrected_row = (
        int(row[0]),       # period as IntegerType
        float(row[1]),     # opening_balance as DoubleType
        float(row[2]),     # payment as DoubleType
        float(row[3]),     # interest as DoubleType
        float(row[4]),     # principal as DoubleType
        float(row[5])      # closing_balance as DoubleType
    )
    corrected_data.append(corrected_row)

 
df = spark.createDataFrame(corrected_data, schema=schema)


In [11]:

# Show the DataFrame
df.show()



+------+------------------+------------------+-------------------+------------------+--------------------+
|period|   opening_balance|           payment|           interest|         principal|     closing_balance|
+------+------------------+------------------+-------------------+------------------+--------------------+
|     0|               0.0|               0.0|                0.0|               0.0|              592.04|
|     1|            592.04|148.62722182874123| 0.9867333333333334| 147.6404884954079|  444.39951150459206|
|     2|444.39951150459206|148.62722182874123| 0.7406658525076534|147.88655597623358|   296.5129555283585|
|     3| 296.5129555283585|148.62722182874123|0.49418825921393084| 148.1330335695273|  148.37992195883118|
|     4|148.37992195883118|148.62722182874123| 0.2472998699313853|148.37992195880983|2.134470378223341...|
+------+------------------+------------------+-------------------+------------------+--------------------+



In [None]:
 
# df.write.csv('basic_amortization_schedule.csv', header=True)
 
spark.stop()