# 1. Imports

In [11]:
# Import SparkSession from pyspark.sql
import os

from pyspark.sql import SparkSession

# Spark standalone master URL. It was hard-coded; it can now be overridden through the
# SPARK_MASTER environment variable (e.g. another cluster or `local[*]`) without editing
# the notebook. The default is the original address, so behaviour is unchanged.
SPARK_MASTER = os.environ.get('SPARK_MASTER', 'spark://172.31.19.209:7077')

# Create a Session (getOrCreate reuses a session already running in this kernel)
spark = SparkSession.builder.appName('LoadDataSFApp').master(SPARK_MASTER).getOrCreate()

In [12]:
from pyspark.sql.types import *
from pyspark.sql import Row, DataFrame, functions as F
from datetime import datetime, date, timedelta

***The next cell disables line wrapping in the notebook's text output. As a result, every time we call `show()` the table keeps its columns aligned, no matter how many columns the DataFrame has. This gives a readable, pandas-like table without converting to pandas, and so without the risk of running out of memory that collecting a large DataFrame into pandas would carry.***

In [13]:
# Inject CSS that turns off wrapping inside <pre> output blocks, so wide text tables
# (such as those printed by DataFrame.show()) are not wrapped.
# `display` is imported explicitly instead of relying on the builtin that Jupyter
# injects, and both names come from the public `IPython.display` module.
from IPython.display import HTML, display

display(HTML("<style>pre { white-space: pre !important; }</style>"))

# 2. Variables

***Dataset Path***

In [14]:
# HDFS directories holding the raw pipe-delimited input files, one per record type.
acquisitions_path, performance_path = (
    'hdfs://hdfs:9000/tfm/Dataset/SingleFamily/SingleFamilyFixedRateMortgage/' + folder + '/'
    for folder in ('Acquisition', 'Performance')
)

***Dataframe Path***

In [15]:
# HDFS directories where the renamed DataFrames are written, and later re-read, as Parquet.
df_acquisitions, df_performance = (
    'hdfs://hdfs:9000/tfm/Dataframes/SingleFamily/SingleFamilyFixedRateMortgage/' + folder + '/'
    for folder in ('Acquisition', 'Performance')
)

# 3. Manage Data

## 3.1. Acquisitions File

### 3.1.1. Read Data

In [18]:
# Py4J handles to the Java/Hadoop classes used below to list HDFS directories from Python.
# `sc` is only predefined in the interactive pyspark shell. This notebook builds its own
# SparkSession, so on a fresh kernel `sc` does not exist and this cell raised NameError.
# Take it from the session explicitly.
sc = spark.sparkContext

URI           = sc._gateway.jvm.java.net.URI
Path          = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem    = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration

In [19]:
# FileSystem handle for the HDFS namenode, used below to list the input directories.
# It is built from the running SparkContext's Hadoop configuration instead of a blank
# `Configuration()`. That way, Hadoop settings Spark was started with (e.g. any
# `spark.hadoop.*` properties) also apply to these listings, as they do for spark.read.
fs = FileSystem.get(URI("hdfs://hdfs:9000"), spark.sparkContext._jsc.hadoopConfiguration())

In [20]:
# Print the HDFS path of every entry in the acquisitions input directory.
status = fs.listStatus(Path(acquisitions_path))

for entry in status:
    print(entry.getPath())

In [21]:
# Print the HDFS path of every entry in the performance input directory.
status = fs.listStatus(Path(performance_path))

for entry in status:
    print(entry.getPath())

In [23]:
# NOTE(review): this repeats the performance_path listing of the In [21] cell above and
# lists the same directory again; one of the two cells can be removed.
status = fs.listStatus(Path(performance_path))

for entry in status:
    print(entry.getPath())

In [25]:
# Read every pipe-delimited file under `acquisitions_path` without treating the first row
# as a header, so Spark names the columns _c0, _c1, ... Column types are inferred from the
# data (Spark's docs note that inferSchema costs an extra pass over the input).
acquisitions = (
    spark.read
    .format('csv')
    .options(delimiter='|', header='false', inferSchema='true')
    .load(acquisitions_path)
)

In [26]:
# Show the column types Spark inferred for the raw, positionally named (_c0, _c1, ...) columns
acquisitions.printSchema()

root
 |-- _c0: long (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: double (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: integer (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: integer (nullable = true)
 |-- _c9: integer (nullable = true)
 |-- _c10: integer (nullable = true)
 |-- _c11: integer (nullable = true)
 |-- _c12: integer (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: integer (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: integer (nullable = true)
 |-- _c20: integer (nullable = true)
 |-- _c21: string (nullable = true)
 |-- _c22: integer (nullable = true)
 |-- _c23: integer (nullable = true)
 |-- _c24: string (nullable = true)



In [27]:
# Names for the 25 acquisition fields, in file order; they are applied positionally
# to the _c0 ... _c24 columns produced by the header-less CSV read.
new_columns = """
    LOAN_ID   ORIG_CHN  Seller_Name   ORIG_RT   ORIG_AMT
    ORIG_TRM  ORIG_DTE  FRST_DTE      OLTV      OCLTV
    NUM_BO    DTI       CSCORE_B      FTHB_FLG  PURPOSE
    PROP_TYP  NUM_UNIT  OCC_STAT      STATE     ZIP_3
    MI_PCT    Product_Type  CSCORE_C  MI_TYPE   RELOCATION_FLG
""".split()

In [28]:
# Keep the positional names (_c0, _c1, ...) assigned by the header-less CSV read
former_columns = list(acquisitions.columns)

In [29]:
# Display the original positional column names that the rename step maps from
former_columns

['_c0',
 '_c1',
 '_c2',
 '_c3',
 '_c4',
 '_c5',
 '_c6',
 '_c7',
 '_c8',
 '_c9',
 '_c10',
 '_c11',
 '_c12',
 '_c13',
 '_c14',
 '_c15',
 '_c16',
 '_c17',
 '_c18',
 '_c19',
 '_c20',
 '_c21',
 '_c22',
 '_c23',
 '_c24']

### 3.1.2. Rename Columns and Check the Inferred Schema

In [30]:
# Apply the names in `new_columns` positionally to the columns of the header-less read.
# `toDF(*names)` renames every column in one step instead of chaining one
# `withColumnRenamed` per column. The explicit check makes a mismatch between the file
# layout and the name list fail loudly. The previous index loop raised a bare IndexError
# when there were more names than columns, and silently left trailing `_cN` columns
# unnamed when there were fewer.
if len(new_columns) != len(acquisitions.columns):
    raise ValueError(
        "expected {} acquisition columns, found {}: {}".format(
            len(new_columns), len(acquisitions.columns), acquisitions.columns
        )
    )
acquisitions = acquisitions.toDF(*new_columns)

In [31]:
# Check the schema after renaming: names now come from `new_columns`, types are still the inferred ones
acquisitions.printSchema()

root
 |-- LOAN_ID: long (nullable = true)
 |-- ORIG_CHN: string (nullable = true)
 |-- Seller_Name: string (nullable = true)
 |-- ORIG_RT: double (nullable = true)
 |-- ORIG_AMT: integer (nullable = true)
 |-- ORIG_TRM: integer (nullable = true)
 |-- ORIG_DTE: string (nullable = true)
 |-- FRST_DTE: string (nullable = true)
 |-- OLTV: integer (nullable = true)
 |-- OCLTV: integer (nullable = true)
 |-- NUM_BO: integer (nullable = true)
 |-- DTI: integer (nullable = true)
 |-- CSCORE_B: integer (nullable = true)
 |-- FTHB_FLG: string (nullable = true)
 |-- PURPOSE: string (nullable = true)
 |-- PROP_TYP: string (nullable = true)
 |-- NUM_UNIT: integer (nullable = true)
 |-- OCC_STAT: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- ZIP_3: integer (nullable = true)
 |-- MI_PCT: integer (nullable = true)
 |-- Product_Type: string (nullable = true)
 |-- CSCORE_C: integer (nullable = true)
 |-- MI_TYPE: integer (nullable = true)
 |-- RELOCATION_FLG: string (nullable = true

In [32]:
# Preview the first 5 renamed rows (long string values are truncated by default)
acquisitions.show(n=5)

+------------+--------+--------------------+-------+--------+--------+--------+--------+----+-----+------+---+--------+--------+-------+--------+--------+--------+-----+-----+------+------------+--------+-------+--------------+
|     LOAN_ID|ORIG_CHN|         Seller_Name|ORIG_RT|ORIG_AMT|ORIG_TRM|ORIG_DTE|FRST_DTE|OLTV|OCLTV|NUM_BO|DTI|CSCORE_B|FTHB_FLG|PURPOSE|PROP_TYP|NUM_UNIT|OCC_STAT|STATE|ZIP_3|MI_PCT|Product_Type|CSCORE_C|MI_TYPE|RELOCATION_FLG|
+------------+--------+--------------------+-------+--------+--------+--------+--------+----+-----+------+---+--------+--------+-------+--------+--------+--------+-----+-----+------+------------+--------+-------+--------------+
|100001420754|       C|PROVIDENT FUNDING...|  3.375|  217000|     360| 11/2016| 01/2017|  74|   74|     1| 30|     785|       N|      R|      SF|       1|       P|   PA|  194|  null|         FRM|    null|   null|             N|
|100003665787|       R|JPMORGAN CHASE BA...|  3.125|  374000|     360| 08/2016| 10/2016|

***To gain performance, I save the transformed DataFrame to HDFS as Parquet and read it back from there. Later actions then start from the stored result instead of re-running all the previous transformations (CSV parsing, schema inference and column renaming). Therefore, the previous steps are only necessary the first time.***

In [33]:
# Save the renamed DataFrame as Parquet, replacing any previous output at that location
acquisitions.write.mode('overwrite').parquet(df_acquisitions)

### 3.1.3. Read Total Acquisition File

In [34]:
# From here on, work from the stored Parquet copy instead of the CSV-derived plan
acquisitions = spark.read.format('parquet').load(df_acquisitions)

In [35]:
# Row count of the acquisition data read back from Parquet
acquisitions.count()

9366884

In [36]:
# Preview 5 rows of the Parquet copy with full, untruncated cell values
acquisitions.show(n=5, truncate=False)

+------------+--------+--------------------------------------------------------------------+-------+--------+--------+--------+--------+----+-----+------+---+--------+--------+-------+--------+--------+--------+-----+-----+------+------------+--------+-------+--------------+
|LOAN_ID     |ORIG_CHN|Seller_Name                                                         |ORIG_RT|ORIG_AMT|ORIG_TRM|ORIG_DTE|FRST_DTE|OLTV|OCLTV|NUM_BO|DTI|CSCORE_B|FTHB_FLG|PURPOSE|PROP_TYP|NUM_UNIT|OCC_STAT|STATE|ZIP_3|MI_PCT|Product_Type|CSCORE_C|MI_TYPE|RELOCATION_FLG|
+------------+--------+--------------------------------------------------------------------+-------+--------+--------+--------+--------+----+-----+------+---+--------+--------+-------+--------+--------+--------+-----+-----+------+------------+--------+-------+--------------+
|100000040778|B       |UNITED SHORE FINANCIAL SERVICES, LLC D/B/A UNITED WHOLESALE MORTGAGE|3.875  |238000  |360     |06/2016 |08/2016 |52  |52   |2     |29 |782     |N    

## 3.2. Performance File

In [37]:
# Read every pipe-delimited file under `performance_path` without treating the first row
# as a header, so Spark names the columns _c0, _c1, ... Column types are inferred from the
# data (Spark's docs note that inferSchema costs an extra pass over the input).
performance = (
    spark.read
    .format('csv')
    .options(delimiter='|', header='false', inferSchema='true')
    .load(performance_path)
)

In [38]:
# Show the column types Spark inferred for the raw, positionally named (_c0, _c1, ...) performance columns
performance.printSchema()

root
 |-- _c0: long (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: double (nullable = true)
 |-- _c4: double (nullable = true)
 |-- _c5: integer (nullable = true)
 |-- _c6: integer (nullable = true)
 |-- _c7: integer (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: integer (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: integer (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: double (nullable = true)
 |-- _c18: double (nullable = true)
 |-- _c19: double (nullable = true)
 |-- _c20: double (nullable = true)
 |-- _c21: double (nullable = true)
 |-- _c22: double (nullable = true)
 |-- _c23: double (nullable = true)
 |-- _c24: double (nullable = true)
 |-- _c25: double (nullable = true)
 |-- _c26: double (nullable = true)
 |-- _c27: double (nullable = 

In [39]:
# Names for the 31 performance fields, in file order; they are applied positionally
# to the _c0, _c1, ... columns produced by the header-less CSV read.
new_columns = """
    LOAN_ID      Monthly_Rpt_Prd      Servicer_Name       LAST_RT        LAST_UPB
    Loan_Age     Months_To_Legal_Mat  Adj_Month_To_Mat    Maturity_Date  MSA
    Delq_Status  MOD_FLAG             Zero_Bal_Code       ZB_DTE         LPI_DTE
    FCC_DTE      DISP_DT              FCC_COST            PP_COST        AR_COST
    IE_COST      TAX_COST             NS_PROCS            CE_PROCS       RMW_PROCS
    O_PROCS      NON_INT_UPB          PRIN_FORG_UPB_FHFA  REPCH_FLAG     PRIN_FORG_UPB_OTH
    TRANSFER_FLG
""".split()

In [40]:
# Keep the positional names (_c0, _c1, ...) assigned by the header-less CSV read
former_columns = list(performance.columns)

### 3.2.1. Rename Columns

In [41]:
# Apply the names in `new_columns` positionally to the columns of the header-less read.
# `toDF(*names)` renames every column in one step instead of chaining one
# `withColumnRenamed` per column. The explicit check makes a mismatch between the file
# layout and the name list fail loudly. The previous index loop raised a bare IndexError
# when there were more names than columns, and silently left trailing `_cN` columns
# unnamed when there were fewer.
if len(new_columns) != len(performance.columns):
    raise ValueError(
        "expected {} performance columns, found {}: {}".format(
            len(new_columns), len(performance.columns), performance.columns
        )
    )
performance = performance.toDF(*new_columns)

In [42]:
# Check the schema after renaming: names now come from `new_columns`, types are still the inferred ones
performance.printSchema()

root
 |-- LOAN_ID: long (nullable = true)
 |-- Monthly_Rpt_Prd: string (nullable = true)
 |-- Servicer_Name: string (nullable = true)
 |-- LAST_RT: double (nullable = true)
 |-- LAST_UPB: double (nullable = true)
 |-- Loan_Age: integer (nullable = true)
 |-- Months_To_Legal_Mat: integer (nullable = true)
 |-- Adj_Month_To_Mat: integer (nullable = true)
 |-- Maturity_Date: string (nullable = true)
 |-- MSA: integer (nullable = true)
 |-- Delq_Status: string (nullable = true)
 |-- MOD_FLAG: string (nullable = true)
 |-- Zero_Bal_Code: integer (nullable = true)
 |-- ZB_DTE: string (nullable = true)
 |-- LPI_DTE: string (nullable = true)
 |-- FCC_DTE: string (nullable = true)
 |-- DISP_DT: string (nullable = true)
 |-- FCC_COST: double (nullable = true)
 |-- PP_COST: double (nullable = true)
 |-- AR_COST: double (nullable = true)
 |-- IE_COST: double (nullable = true)
 |-- TAX_COST: double (nullable = true)
 |-- NS_PROCS: double (nullable = true)
 |-- CE_PROCS: double (nullable = true)
 |-

In [43]:
# Preview the first 5 renamed rows (long string values are truncated by default)
performance.show(n=5)

+------------+---------------+-------------+-------+--------+--------+-------------------+----------------+-------------+-----+-----------+--------+-------------+------+-------+-------+-------+--------+-------+-------+-------+--------+--------+--------+---------+-------+-----------+------------------+----------+-----------------+------------+
|     LOAN_ID|Monthly_Rpt_Prd|Servicer_Name|LAST_RT|LAST_UPB|Loan_Age|Months_To_Legal_Mat|Adj_Month_To_Mat|Maturity_Date|  MSA|Delq_Status|MOD_FLAG|Zero_Bal_Code|ZB_DTE|LPI_DTE|FCC_DTE|DISP_DT|FCC_COST|PP_COST|AR_COST|IE_COST|TAX_COST|NS_PROCS|CE_PROCS|RMW_PROCS|O_PROCS|NON_INT_UPB|PRIN_FORG_UPB_FHFA|REPCH_FLAG|PRIN_FORG_UPB_OTH|TRANSFER_FLG|
+------------+---------------+-------------+-------+--------+--------+-------------------+----------------+-------------+-----+-----------+--------+-------------+------+-------+-------+-------+--------+-------+-------+-------+--------+--------+--------+---------+-------+-----------+------------------+--------

***To gain performance, I save the transformed DataFrame to HDFS as Parquet and read it back from there. Later actions then start from the stored result instead of re-running all the previous transformations (CSV parsing, schema inference and column renaming). Therefore, the previous steps are only necessary the first time.***

In [44]:
# Save the renamed DataFrame as Parquet, replacing any previous output at that location
performance.write.mode('overwrite').parquet(df_performance)

### 3.2.2. Read Total Performance File

In [45]:
# From here on, work from the stored Parquet copy instead of the CSV-derived plan
performance = spark.read.format('parquet').load(df_performance)

In [46]:
# Row count of the performance data read back from Parquet
performance.count()

298507297

In [47]:
# Preview 5 rows of the Parquet copy with full, untruncated cell values
performance.show(n=5, truncate=False)

+------------+---------------+-------------+-------+--------+--------+-------------------+----------------+-------------+-----+-----------+--------+-------------+------+-------+-------+-------+--------+-------+-------+-------+--------+--------+--------+---------+-------+-----------+------------------+----------+-----------------+------------+
|LOAN_ID     |Monthly_Rpt_Prd|Servicer_Name|LAST_RT|LAST_UPB|Loan_Age|Months_To_Legal_Mat|Adj_Month_To_Mat|Maturity_Date|MSA  |Delq_Status|MOD_FLAG|Zero_Bal_Code|ZB_DTE|LPI_DTE|FCC_DTE|DISP_DT|FCC_COST|PP_COST|AR_COST|IE_COST|TAX_COST|NS_PROCS|CE_PROCS|RMW_PROCS|O_PROCS|NON_INT_UPB|PRIN_FORG_UPB_FHFA|REPCH_FLAG|PRIN_FORG_UPB_OTH|TRANSFER_FLG|
+------------+---------------+-------------+-------+--------+--------+-------------------+----------------+-------------+-----+-----------+--------+-------------+------+-------+-------+-------+--------+-------+-------+-------+--------+--------+--------+---------+-------+-----------+------------------+--------