In [1]:
import sys
import pandas as pd
from pyspark.sql import SparkSession
# Start (or attach to) the Spark session shared by every cell below.
spark = SparkSession.builder.appName("insight").getOrCreate()

# Read the 2010Q1 acquisition file: pipe-delimited text without a header row,
# letting Spark infer the column types.
acquis = (
    spark.read
    .option("inferSchema", "true")
    .option("header", False)
    .option("dateFormat", "yyyyMMdd")
    .option("delimiter", "|")
    .csv("../realdata/2010Q1/Acquisition_2010Q1.txt")
)


In [2]:
# Read the 2010Q1 performance file: pipe-delimited text without a header row,
# letting Spark infer the column types.
perf = (
    spark.read
    .option("inferSchema", "true")
    .option("header", False)
    .option("dateFormat", "yyyyMMdd")
    .option("delimiter", "|")
    .csv("../realdata/2010Q1/Performance_2010Q1.txt")
)

In [3]:
# Column names for the acquisition file, listed in file order: the name at
# index k is applied to the k-th positional column.
# The spellings are used verbatim by later cells and SQL, so keep them as-is.
col_names_aquisition = [
    "loanID",
    "originationChannel",
    "sellerName",
    "origIntRate",
    "origUPB",
    "origLoanTerm",
    "originationDate",
    "firstPaymentDate",
    "LTV",
    "CLTV",
    "numOfBorrowers",
    "origDebtToIncomeRatio",
    "borrowerCredScoreAtOrigination",
    "firstTimeBuyerIndicator",
    "loanPurpose",
    "propertyType",
    "numOfUnits",
    "occupancyType",
    "propertyState",
    "zipCodeShort",
    "primaryMortgInsurPercent",
    "productType",
    "coborrowerCreditScoreAtOrig",
    "mortgageInsurType",
    "relocationMortgIndicator",
]

# Column names for the performance file, again in file order.
col_names_performance = [
    "loanID",
    "monthlyReportingPeriod",
    "servicerName",
    "currentIntRate",
    "currentActualUPB",
    "loanAge",
    "remMonthsToLegalMaturity",
    "adjMonthToMaturity",
    "maturityDate",
    "MSA",
    "currentLoanDelinqStatus",
    "modifFlag",
    "zeroBalanceCode",
    "zeroBalanceEffectiveDate",
    "lastPaidInstallDate",
    "foreclosureDate",
    "dispositionDate",
    "foreclosureCost",
    "propPreservAndReparCosts",
    "assetRecoveryCosts",
    "miscelHoldingExpensesAndCredits",
    "associatedTaxesForHoldingProperty",
    "netSaleProceeds",
    "creditEnhacementProceeds",
    "repurchaseMakeWholeProceeds",
    "otherForeclosureProceeds",
    "nonInterestBearingUPB",
    "principalForegivenessAmount",
    "repurchaseMakeWholeProceedsFlag",
    "foreclosurePrincipWriteOffAmont",
    "servicingActivityIndicator",
]

In [4]:
def _rename_positional_columns(df, names):
    """Rename positional columns ``_c0``, ``_c1``, ... to the given names.

    Parameters
    ----------
    df : DataFrame
        Frame whose columns still carry the ``_c<index>`` names.
    names : sequence of str
        ``names[k]`` becomes the new name of column ``_c<k>``.

    Returns
    -------
    DataFrame
        A renamed frame; ``df`` itself is not modified.

    Notes
    -----
    Renaming is done one column at a time with ``withColumnRenamed``, which
    does nothing when the source column is absent, so a ``names`` list longer
    than the frame does not raise.
    """
    renamed = df
    for position, new_name in enumerate(names):
        renamed = renamed.withColumnRenamed("_c" + str(position), new_name)
    return renamed


# Give both raw frames readable column names.
acquis_rn = _rename_positional_columns(acquis, col_names_aquisition)
perf_rn = _rename_positional_columns(perf, col_names_performance)

In [5]:
from pyspark.sql.functions import to_date
from pyspark.sql.types import IntegerType
# Convert text columns to proper Spark types.
# NOTE(review): both frames are reassigned to their own names, so this cell
# is intended to run once per fresh load of the data.

# Performance: reporting period parsed as MM/dd/yyyy, delinquency status cast
# to an integer.
perf_rn = (
    perf_rn
    .withColumn("monthlyReportingPeriod", to_date(perf_rn["monthlyReportingPeriod"], 'MM/dd/yyyy'))
    .withColumn("currentLoanDelinqStatus", perf_rn["currentLoanDelinqStatus"].cast(IntegerType()))
)

# Acquisition: both month/year columns parsed as MM/yyyy.
acquis_rn = (
    acquis_rn
    .withColumn("originationDate", to_date(acquis_rn["originationDate"], 'MM/yyyy'))
    .withColumn("firstPaymentDate", to_date(acquis_rn["firstPaymentDate"], 'MM/yyyy'))
)

In [None]:
# Preview ten rows of the first eight acquisition columns.
acquis_rn.select(*acquis_rn.columns[:8]).show(n=10)

In [None]:
# Print the acquisition frame's (row count, column count) as a tuple.
acquis_shape = (acquis_rn.count(), len(acquis_rn.columns))
print(acquis_shape)

In [None]:
# Print the performance frame's row count followed by its column count.
perf_row_count = perf_rn.count()
perf_col_count = len(perf_rn.columns)
print(perf_row_count, perf_col_count)

In [None]:
# Preview ten rows of the first seven performance columns.
perf_rn.select(*perf_rn.columns[:7]).show(n=10)

In [6]:
# Register the performance frame with Spark SQL under the name `perf`.
perf_rn.createOrReplaceTempView('perf')

# Keep only the loan key, reporting period, loan age and delinquency status.
perf_query = '''
SELECT 
perf.loanID, perf.monthlyReportingPeriod, perf.loanAge, perf.currentLoanDelinqStatus FROM perf 

'''
_2010Q1_perf = spark.sql(perf_query)

In [None]:
# Preview forty rows of the first three columns of the trimmed performance frame.
_2010Q1_perf.select(*_2010Q1_perf.columns[:3]).show(n=40)

In [7]:
# Reshape to one row per loan with one column per reporting period; each cell
# holds the maximum delinquency status reported for that loan in that period.
pivot_perf = (
    _2010Q1_perf
    .groupBy("loanID")
    .pivot("monthlyReportingPeriod")
    .max("currentLoanDelinqStatus")
)

In [8]:
# Keep only the first 28 columns of the pivoted frame.
# NOTE(review): 28 is a hard-coded cut-off — presumably loanID plus the
# earliest 27 reporting periods; confirm against the pivot's column order.
final_perf = pivot_perf.select(*pivot_perf.columns[:28])

In [9]:
# Register the acquisition frame with Spark SQL under the name `acquis`.
acquis_rn.createOrReplaceTempView('acquis')

# Select the loan-level columns used downstream, restricted to loans whose
# origination date lies between 2010-01-01 and 2010-12-01 (inclusive).
acquis_query = '''
SELECT acquis.loanID, acquis.origIntRate, acquis.origUPB, acquis.originationDate, 
acquis.firstPaymentDate, acquis.LTV, acquis.CLTV, acquis.numOfBorrowers, acquis.origDebtToIncomeRatio, 
acquis.borrowerCredScoreAtOrigination, acquis.zipCodeShort, acquis.primaryMortgInsurPercent
FROM acquis WHERE (originationDate BETWEEN '2010-01-01'AND '2010-12-01')

'''
sql_acquis = spark.sql(acquis_query)

In [None]:
# Display how many acquisition rows survived the origination-date filter.
sql_acquis_rows = sql_acquis.count()
sql_acquis_rows

In [10]:

# Attach the per-period delinquency columns to each loan's acquisition data.
# Inner join on loanID: loans present in only one of the two frames are dropped.
_2010Q1 = sql_acquis.join(final_perf, ['loanID'], 'inner')


In [None]:
# Preview five rows of the first nine columns of the joined frame.
_2010Q1.select(*_2010Q1.columns[:9]).show(n=5)

In [None]:
# Display the full list of column names in the joined frame.
joined_columns = _2010Q1.columns
joined_columns


In [None]:
# Display the joined frame's column names.
# NOTE(review): the notebook contains this same inspection twice in a row;
# one of the two cells can likely be removed.
joined_columns = _2010Q1.columns
joined_columns

In [11]:
from pyspark.sql.functions import greatest
# default_status = the largest value found across the last 14 columns of the
# joined frame.
# NOTE(review): 14 is a hard-coded window — presumably the most recent 14
# reporting-period columns; confirm this is the intended look-back.
clist = _2010Q1.columns[-14:]
worst_status = greatest(*clist)
_2010Q1 = _2010Q1.withColumn("default_status", worst_status)

In [12]:
# Count loans per default_status value and show the first seven groups in
# ascending order of status.
(
    _2010Q1
    .groupBy("default_status")
    .count()
    .sort("default_status")
    .show(n=7)
)

+--------------+------+
|default_status| count|
+--------------+------+
|          null| 15412|
|             0|147998|
|             1|  3494|
|             2|   359|
|             3|   123|
|             4|    98|
|             5|    67|
+--------------+------+
only showing top 7 rows



In [None]:
# Sort the joined loan-level frame by loan ID, then by origination date.
# "monthlyReportingPeriod" is deliberately not a sort key: it is not a column
# of this frame (the pivot turned its values into column headers, and the
# acquisition query never selects it), so sorting on it raises an
# AnalysisException.
_2010Q1 = _2010Q1.orderBy(["loanID", "originationDate"])

In [None]:
# Print the joined frame's row count followed by its column count.
joined_row_count = _2010Q1.count()
joined_col_count = len(_2010Q1.columns)
print(joined_row_count, joined_col_count)

In [None]:
# Find the latest reporting period in the performance data and print the
# single aggregated Row.
latest_period = perf_rn.agg({"monthlyReportingPeriod": "max"})
t = latest_period.collect()[0]
print(t)

In [None]:
# Print the total number of performance records.
perf_total_rows = perf_rn.count()
print(perf_total_rows)

In [None]:
# Find the earliest reporting period in the performance data and print the
# single aggregated Row.
earliest_period = perf_rn.agg({"monthlyReportingPeriod": "min"})
t = earliest_period.collect()[0]
print(t)