In [68]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when, lit
import os

In [2]:
# Point PySpark at the local JDK installation.
# Raw string: the Windows path is full of backslashes, and `\P`, `\J`, `\j` are not
# valid escape sequences, so a plain literal only works by accident and triggers a
# warning on newer Python versions.
os.environ["JAVA_HOME"] = r"C:\Program Files\Java\jdk-18.0.2.1"

In [3]:
# Spark configuration: app name, local master using all cores, and the directory
# whose contents are added to both the driver and executor classpaths.
con = (
    SparkConf()
    .setAppName("Example")
    .setMaster("local[*]")
    .set("spark.driver.extraClassPath", "C:/pyspark/*")
    .set("spark.executor.extraClassPath", "C:/pyspark/*")
)

In [4]:
# Create (or reuse) the session through the public builder API instead of calling the
# SparkSession constructor directly; getOrCreate() makes re-running this cell safe.
spark = SparkSession.builder.config(conf=con).getOrCreate()
# Keep `sc` available for any code that needs the underlying SparkContext.
sc = spark.sparkContext

In [5]:
# Display the session object as this cell's output.
spark

In [6]:
# Connection settings for the SQL Server instance on localhost:1433.
database = "dimasdb"
table = "contract_loan_ledger"
# The password is read from the environment so the secret is not stored in the
# notebook. Set DIMASDB_PASSWORD before starting the kernel.
password = os.environ.get("DIMASDB_PASSWORD", "")
if not password:
    print("WARNING: DIMASDB_PASSWORD is not set; JDBC reads will most likely fail to authenticate.")
user = "spark_sql"
schema = "dbo"
# NOTE(review): encrypt=true together with trustServerCertificate=true appears to mean
# the server certificate is not validated — confirm this is acceptable outside local dev.
jdbc_url = f"jdbc:sqlserver://localhost:1433;database={database};encrypt=true;trustServerCertificate=true"

In [7]:
# Load the ledger table over JDBC; all reader options are passed in a single call.
df_loan = (
    spark.read
    .format('jdbc')
    .options(
        url=jdbc_url,
        dbtable=f"{schema}.{table}",
        user=user,
        password=password,
        driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
    )
    .load()
)

In [8]:
# Print a preview of the freshly loaded ledger rows.
df_loan.show()

+---------+-----------+--------------------+------+--------------------+-------------+---------------+---------+--------------------+-------------------+
|ledger_id|contract_id|          created_at|period|         ledger_type|ledger_status|initial_balance|  balance|            due_date|      paid_off_date|
+---------+-----------+--------------------+------+--------------------+-------------+---------------+---------+--------------------+-------------------+
|  00q7jho|      wrv68|2020-07-20 09:33:...|     2|           PRINCIPAL|         PAID|       270000.0|      0.0|2020-09-20 16:59:...|2020-09-25 12:21:53|
|  00shmdx|      7ozx8|2020-09-23 17:08:...|     2|            LATE_FEE|       UNPAID|       315000.0| 315000.0|2020-09-20 16:59:...|               null|
|  02c17sg|      jo3e8|2020-07-20 09:41:...|     2|            INTEREST|         PAID|            0.0|      0.0|2020-09-20 16:59:...|2020-09-26 10:44:47|
|  02koly8|      8kms6|2020-07-20 04:05:...|     1|           PRINCIPAL|    

In [9]:
# Cast the three date-like ledger columns to Spark timestamps in one chain.
df_loan = (
    df_loan
    .withColumn('created_at', col('created_at').cast('timestamp'))
    .withColumn('due_date', col('due_date').cast('timestamp'))
    .withColumn('paid_off_date', col('paid_off_date').cast('timestamp'))
)

In [10]:
# Print the first 10 rows to check the result of the timestamp casts.
df_loan.limit(10).show()

+---------+-----------+--------------------+------+-----------+-------------+---------------+--------+--------------------+-------------------+
|ledger_id|contract_id|          created_at|period|ledger_type|ledger_status|initial_balance| balance|            due_date|      paid_off_date|
+---------+-----------+--------------------+------+-----------+-------------+---------------+--------+--------------------+-------------------+
|  00q7jho|      wrv68|2020-07-20 09:33:...|     2|  PRINCIPAL|         PAID|       270000.0|     0.0|2020-09-20 16:59:...|2020-09-25 12:21:53|
|  00shmdx|      7ozx8|2020-09-23 17:08:...|     2|   LATE_FEE|       UNPAID|       315000.0|315000.0|2020-09-20 16:59:...|               null|
|  02c17sg|      jo3e8|2020-07-20 09:41:...|     2|   INTEREST|         PAID|            0.0|     0.0|2020-09-20 16:59:...|2020-09-26 10:44:47|
|  02koly8|      8kms6|2020-07-20 04:05:...|     1|  PRINCIPAL|         PAID|       628500.0|     0.0|2020-08-20 16:59:...|2020-08-25 05

In [11]:
# Shorten the ledger status column name to `status`, then preview the frame.
df_loan = df_loan.withColumnRenamed(existing="ledger_status", new="status")
df_loan.show()

+---------+-----------+--------------------+------+--------------------+------+---------------+---------+--------------------+-------------------+
|ledger_id|contract_id|          created_at|period|         ledger_type|status|initial_balance|  balance|            due_date|      paid_off_date|
+---------+-----------+--------------------+------+--------------------+------+---------------+---------+--------------------+-------------------+
|  00q7jho|      wrv68|2020-07-20 09:33:...|     2|           PRINCIPAL|  PAID|       270000.0|      0.0|2020-09-20 16:59:...|2020-09-25 12:21:53|
|  00shmdx|      7ozx8|2020-09-23 17:08:...|     2|            LATE_FEE|UNPAID|       315000.0| 315000.0|2020-09-20 16:59:...|               null|
|  02c17sg|      jo3e8|2020-07-20 09:41:...|     2|            INTEREST|  PAID|            0.0|      0.0|2020-09-20 16:59:...|2020-09-26 10:44:47|
|  02koly8|      8kms6|2020-07-20 04:05:...|     1|           PRINCIPAL|  PAID|       628500.0|      0.0|2020-08-20 16

In [12]:
# Number of rows in the ledger DataFrame.
df_loan.count()

1097

In [13]:
# Narrow projection of the ledger: id, status and starting balance only.
df_loan_pick = df_loan.select(["ledger_id", "status", "initial_balance"])
df_loan_pick.show()

+---------+------+---------------+
|ledger_id|status|initial_balance|
+---------+------+---------------+
|  00q7jho|  PAID|       270000.0|
|  00shmdx|UNPAID|       315000.0|
|  02c17sg|  PAID|            0.0|
|  02koly8|  PAID|       628500.0|
|  04118sr|UNPAID|            0.0|
|  05m6tmf|  PAID|    1.4014296E7|
|  09d9bex|UNPAID|       261000.0|
|  0c91ud5|UNPAID|            0.0|
|  0ctzbu9|UNPAID|            0.0|
|  0dk6f4m|UNPAID|       630000.0|
|  0gtmp7w|UNPAID|      1120704.0|
|  0hgpn4w|  PAID|       261000.0|
|  0j2dkm3|  PAID|            0.0|
|  0jk7r35|UNPAID|            0.0|
|  0jkma0b|UNPAID|       309000.0|
|  0lanxn0|UNPAID|       270000.0|
|  0mzccbd|UNPAID|            0.0|
|  0nfbm1u|UNPAID|       247000.0|
|  0no7ffz|UNPAID|      1006020.0|
|  0o3npxh|UNPAID|       646000.0|
+---------+------+---------------+
only showing top 20 rows



In [14]:
# Smallest initial balances first.
df_loan_pick.orderBy("initial_balance").show()

+---------+------+---------------+
|ledger_id|status|initial_balance|
+---------+------+---------------+
|  16ebwdc|UNPAID|            0.0|
|  13v0rf1|UNPAID|            0.0|
|  02c17sg|  PAID|            0.0|
|  04118sr|UNPAID|            0.0|
|  19d012g|UNPAID|            0.0|
|  0ctzbu9|UNPAID|            0.0|
|  0j2dkm3|  PAID|            0.0|
|  2ao81rd|UNPAID|            0.0|
|  0jk7r35|UNPAID|            0.0|
|  0odmn45|UNPAID|            0.0|
|  1irg2c5|UNPAID|            0.0|
|  0r1j585|UNPAID|            0.0|
|  0u05348|UNPAID|            0.0|
|  29nzi5e|UNPAID|            0.0|
|  1kwfwnk|UNPAID|            0.0|
|  0wp33p2|  PAID|            0.0|
|  1kz9mxs|UNPAID|            0.0|
|  0zzpzaj|UNPAID|            0.0|
|  1opgu92|  PAID|            0.0|
|  2gj8kc9|UNPAID|            0.0|
+---------+------+---------------+
only showing top 20 rows



In [15]:
# Largest initial balances first.
df_loan_pick.orderBy(F.col("initial_balance").desc()).show()

+---------+------+---------------+
|ledger_id|status|initial_balance|
+---------+------+---------------+
|  05m6tmf|  PAID|    1.4014296E7|
|  2dmkzyd|  PAID|    1.4014296E7|
|  bk8a7nu|  PAID|    1.4014296E7|
|  j0rpov7|  PAID|    1.4014296E7|
|  rxln7g5|  PAID|    1.4014296E7|
|  sbxobdm|  PAID|    1.4014296E7|
|  swttytr|UNPAID|    1.4014296E7|
|  wi56t3v|  PAID|    1.4014296E7|
|  7hv1zns|UNPAID|      5033016.0|
|  srhw0hf|  PAID|      5033016.0|
|  b9r6zt1|UNPAID|      2067500.0|
|  h6a2ebp|UNPAID|      2067500.0|
|  8mq957a|UNPAID|      1560500.0|
|  8x68cai|UNPAID|      1560500.0|
|  9haj0eu|UNPAID|      1560500.0|
|  9me776e|UNPAID|      1560500.0|
|  ju7bxoh|UNPAID|      1560500.0|
|  zt2auvm|UNPAID|      1560500.0|
|  907f6ps|  PAID|      1297620.0|
|  hcac056|  PAID|      1297620.0|
+---------+------+---------------+
only showing top 20 rows



In [16]:
# Print the column names and types of the full ledger frame.
df_loan.printSchema()

root
 |-- ledger_id: string (nullable = true)
 |-- contract_id: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- period: integer (nullable = true)
 |-- ledger_type: string (nullable = true)
 |-- status: string (nullable = true)
 |-- initial_balance: double (nullable = true)
 |-- balance: double (nullable = true)
 |-- due_date: timestamp (nullable = true)
 |-- paid_off_date: timestamp (nullable = true)



In [17]:
# Print the column names and types of the three-column projection.
df_loan_pick.printSchema()

root
 |-- ledger_id: string (nullable = true)
 |-- status: string (nullable = true)
 |-- initial_balance: double (nullable = true)



In [18]:
# Paid ledgers only: apply the SQL-string predicate first, then project the two columns.
df_loan.where("status = 'PAID' ").select("status", "initial_balance").show()

+------+---------------+
|status|initial_balance|
+------+---------------+
|  PAID|       270000.0|
|  PAID|            0.0|
|  PAID|       628500.0|
|  PAID|    1.4014296E7|
|  PAID|       261000.0|
|  PAID|            0.0|
|  PAID|       369000.0|
|  PAID|            0.0|
|  PAID|       383000.0|
|  PAID|        80480.0|
|  PAID|       403500.0|
|  PAID|       224000.0|
|  PAID|            0.0|
|  PAID|            0.0|
|  PAID|       224000.0|
|  PAID|       215880.0|
|  PAID|       351500.0|
|  PAID|       351500.0|
|  PAID|    1.4014296E7|
|  PAID|       547500.0|
+------+---------------+
only showing top 20 rows



In [19]:
# Same paid-ledger query, selecting through Column objects (item access) instead of names.
df_loan.select(df_loan["status"], df_loan["initial_balance"]).where("status = 'PAID' ").show()

+------+---------------+
|status|initial_balance|
+------+---------------+
|  PAID|       270000.0|
|  PAID|            0.0|
|  PAID|       628500.0|
|  PAID|    1.4014296E7|
|  PAID|       261000.0|
|  PAID|            0.0|
|  PAID|       369000.0|
|  PAID|            0.0|
|  PAID|       383000.0|
|  PAID|        80480.0|
|  PAID|       403500.0|
|  PAID|       224000.0|
|  PAID|            0.0|
|  PAID|            0.0|
|  PAID|       224000.0|
|  PAID|       215880.0|
|  PAID|       351500.0|
|  PAID|       351500.0|
|  PAID|    1.4014296E7|
|  PAID|       547500.0|
+------+---------------+
only showing top 20 rows



In [20]:
# Ledgers whose id contains "56" anywhere.
df_loan.where("ledger_id like '%56%' ").select("ledger_id", "status").show()

+---------+------+
|ledger_id|status|
+---------+------+
|  356sdh9|UNPAID|
|  562yelb|UNPAID|
|  56essfc|UNPAID|
|  6d0q566|UNPAID|
|  8ezk56t|UNPAID|
|  hcac056|  PAID|
|  mo566zw|UNPAID|
|  wi56t3v|  PAID|
+---------+------+



In [21]:
# Ledgers whose id contains "56" AND that belong to period 2; the two conditions are
# applied as successive filters before projecting the columns of interest.
(
    df_loan
    .where(df_loan.ledger_id.like("%56%"))
    .where(df_loan.period == 2)
    .select("ledger_id", "period", "ledger_type", "due_date")
    .show()
)

+---------+------+-----------+--------------------+
|ledger_id|period|ledger_type|            due_date|
+---------+------+-----------+--------------------+
|  hcac056|     2|  PRINCIPAL| 2020-09-20 16:59:59|
|  mo566zw|     2|   INTEREST|2020-09-20 16:59:...|
+---------+------+-----------+--------------------+



In [22]:
# Register the ledger frame as the temp view `ledger` so it can be queried with SQL,
# then check the view by counting its rows.
df_loan.createOrReplaceTempView("ledger")
spark.sql("SELECT COUNT(*) FROM ledger").show()

+--------+
|count(1)|
+--------+
|    1097|
+--------+



In [27]:
# Ledgers that still have a positive balance.
spark.sql("SELECT ledger_id, status, balance FROM ledger WHERE balance > 0 ").show()

+---------+------+---------+
|ledger_id|status|  balance|
+---------+------+---------+
|  00shmdx|UNPAID| 315000.0|
|  09d9bex|UNPAID| 261000.0|
|  0dk6f4m|UNPAID| 630000.0|
|  0gtmp7w|UNPAID|1120704.0|
|  0jkma0b|UNPAID| 309000.0|
|  0lanxn0|UNPAID| 270000.0|
|  0nfbm1u|UNPAID| 247000.0|
|  0no7ffz|UNPAID|1006020.0|
|  0o3npxh|UNPAID| 646000.0|
|  0papqv6|UNPAID| 261000.0|
|  0rebryg|UNPAID| 315000.0|
|  0sc5go8|UNPAID| 155000.0|
|  0sdw7ft|UNPAID| 547500.0|
|  0u2c8dn|UNPAID| 379000.0|
|  0u6ide9|UNPAID| 959500.0|
|  0vcudec|UNPAID| 324000.0|
|  0wmrszv|UNPAID| 475000.0|
|  0zsyz6v|UNPAID| 116000.0|
|  14dh2e5|UNPAID| 646000.0|
|  15b4qu0|UNPAID| 224000.0|
+---------+------+---------+
only showing top 20 rows



In [28]:
# Load the contract table over JDBC with the same connection settings as the ledger.
# The table name is built from the shared `schema` setting rather than a hard-coded
# "dbo." prefix inside an f-string that had no placeholders.
df_contract = (
    spark.read
    .format("jdbc")
    .option('url', jdbc_url)
    .option('user', user)
    .option('password', password)
    .option('driver', 'com.microsoft.sqlserver.jdbc.SQLServerDriver')
    .option('dbtable', f"{schema}.contract_loan")
    .load()
)

In [29]:
# Register the contract frame as the temp view `contract` for SQL queries.
df_contract.createOrReplaceTempView("contract")
# Mark the frame for caching; as the last expression, its return value is the cell output.
df_contract.cache()

DataFrame[contract_id: string, created_at: timestamp, contract_status: string, tenure: int, loan_amount: int, provision: double, interest: double, principal: double]

In [40]:
# Sum of contract principal per ledger type, largest total first.
# NOTE(review): `principal` is a contract-level column, so after the join it is added
# once for every matching ledger row. That is presumably why PRINCIPAL and INTEREST
# show identical totals in the saved output — confirm this double counting is intended.
spark.sql("""
SELECT l.ledger_type, SUM(c.principal) AS principal_amount
FROM ledger l JOIN contract c ON l.contract_id = c.contract_id
GROUP BY l.ledger_type
ORDER BY SUM(c.principal) desc
""").show()

+--------------------+----------------+
|         ledger_type|principal_amount|
+--------------------+----------------+
|           PRINCIPAL|    2.35587404E9|
|            INTEREST|    2.35587404E9|
|            LATE_FEE|     3.5793018E8|
|RESTRUCTURE_DOWN_...|       2800020.0|
+--------------------+----------------+



In [44]:
# Inner-join contracts to their ledger rows on the shared contract_id key.
# NOTE(review): both inputs carry a `created_at` column, so the joined frame has two
# columns with that name (visible in the saved output header); selecting `created_at`
# by name from this frame will likely be ambiguous — consider renaming one side upstream.
contract_join = df_contract.join(df_loan, on='contract_id', how='inner')
contract_join.limit(10).show()

+-----------+--------------------+---------------+------+-----------+---------+---------+-----------+---------+--------------------+------+-----------+------+---------------+--------+--------------------+-------------------+
|contract_id|          created_at|contract_status|tenure|loan_amount|provision| interest|  principal|ledger_id|          created_at|period|ledger_type|status|initial_balance| balance|            due_date|      paid_off_date|
+-----------+--------------------+---------------+------+-----------+---------+---------+-----------+---------+--------------------+------+-----------+------+---------------+--------+--------------------+-------------------+
|      wrv68|2020-07-20 09:33:...|         ACTIVE|     6|    1618500|   1500.0|      0.0|  1620000.0|  00q7jho|2020-07-20 09:33:...|     2|  PRINCIPAL|  PAID|       270000.0|     0.0|2020-09-20 16:59:...|2020-09-25 12:21:53|
|      7ozx8|2020-07-20 06:15:...|         ACTIVE|     6|    2021000|   1000.0|      0.0|  2022000.0

In [46]:
# Total and maximum interest per (contract, ledger type) pair.
# NOTE(review): `interest` comes from the contract side of the join, so the sum
# presumably repeats the same contract value once per ledger row — confirm.
contract_join.groupBy('contract_id', 'ledger_type').agg(
    F.sum('interest').alias('total_interest'),
    F.max('interest').alias('max_interest'),
).show()

+-----------+-----------+--------------+------------+
|contract_id|ledger_type|total_interest|max_interest|
+-----------+-----------+--------------+------------+
|      7ozx8|   INTEREST|           0.0|         0.0|
|      7g7ra|   LATE_FEE|           0.0|         0.0|
|      alq2l|   INTEREST|     1378440.0|    459480.0|
|      kpu81|   INTEREST|     1378440.0|    459480.0|
|      7isno|  PRINCIPAL|     6890352.0|   2296784.0|
|      wn12p|   INTEREST|           0.0|         0.0|
|      jb3yv|   INTEREST|           0.0|         0.0|
|      7qcon|   INTEREST|           0.0|         0.0|
|      lfex0|  PRINCIPAL|     1378440.0|    459480.0|
|      4064n|  PRINCIPAL|           0.0|         0.0|
|      wrv68|   INTEREST|           0.0|         0.0|
|      1sk6y|   LATE_FEE|           0.0|         0.0|
|      p8438|  PRINCIPAL|           0.0|         0.0|
|      9pt6u|   INTEREST|           0.0|         0.0|
|      bmmv1|  PRINCIPAL|   1.9198512E7|   6399504.0|
|      5a6jf|   INTEREST|   

In [48]:
# Add a derived column holding interest + principal for each joined row.
# NOTE(review): despite its name, `ReturnOnInvestment` is the plain sum of the two
# amounts, not a ratio — confirm the intended definition.
roi = contract_join.withColumn(
    'ReturnOnInvestment',
    contract_join['interest'] + contract_join['principal'],
)
roi.limit(10).show()

+-----------+--------------------+---------------+------+-----------+---------+---------+-----------+---------+--------------------+------+-----------+------+---------------+--------+--------------------+-------------------+------------------+
|contract_id|          created_at|contract_status|tenure|loan_amount|provision| interest|  principal|ledger_id|          created_at|period|ledger_type|status|initial_balance| balance|            due_date|      paid_off_date|ReturnOnInvestment|
+-----------+--------------------+---------------+------+-----------+---------+---------+-----------+---------+--------------------+------+-----------+------+---------------+--------+--------------------+-------------------+------------------+
|      wrv68|2020-07-20 09:33:...|         ACTIVE|     6|    1618500|   1500.0|      0.0|  1620000.0|  00q7jho|2020-07-20 09:33:...|     2|  PRINCIPAL|  PAID|       270000.0|     0.0|2020-09-20 16:59:...|2020-09-25 12:21:53|         1620000.0|
|      7ozx8|2020-07-20 

In [61]:
# Bucket the tenure into a label: 2-3 -> Short, 4-5 -> Medium, 6 -> Long.
# There is no otherwise() branch, so any other tenure value yields a null Duration.
roi = roi.withColumn(
    'Duration',
    F.when(F.col('tenure').isin(2, 3), 'Short')
     .when(F.col('tenure').isin(4, 5), 'Medium')
     .when(F.col('tenure') == 6, 'Long'),
)
roi.limit(10).show()

+-----------+--------------------+---------------+------+-----------+---------+---------+-----------+---------+--------------------+------+-----------+------+---------------+--------+--------------------+-------------------+------------------+--------+
|contract_id|          created_at|contract_status|tenure|loan_amount|provision| interest|  principal|ledger_id|          created_at|period|ledger_type|status|initial_balance| balance|            due_date|      paid_off_date|ReturnOnInvestment|Duration|
+-----------+--------------------+---------------+------+-----------+---------+---------+-----------+---------+--------------------+------+-----------+------+---------------+--------+--------------------+-------------------+------------------+--------+
|      wrv68|2020-07-20 09:33:...|         ACTIVE|     6|    1618500|   1500.0|      0.0|  1620000.0|  00q7jho|2020-07-20 09:33:...|     2|  PRINCIPAL|  PAID|       270000.0|     0.0|2020-09-20 16:59:...|2020-09-25 12:21:53|         1620000.

In [60]:
# Collect the distinct tenure values to the driver, to check which ones need a Duration label.
roi.select('tenure').distinct().collect()

[Row(tenure=6), Row(tenure=3), Row(tenure=5), Row(tenure=4), Row(tenure=2)]

In [62]:
# Collect the distinct Duration labels to the driver, to check the bucketing result.
roi.select('Duration').distinct().collect()

[Row(Duration='Medium'), Row(Duration='Long'), Row(Duration='Short')]

In [66]:
# Load the business_data table over JDBC with the shared connection settings.
# The table name is built from the shared `schema` setting rather than a hard-coded
# "dbo." prefix inside an f-string that had no placeholders.
df_bi = (
    spark.read
    .format('jdbc')
    .option('url', jdbc_url)
    .option('dbtable', f"{schema}.business_data")
    .option('user', user)
    .option('password', password)
    .option('driver', 'com.microsoft.sqlserver.jdbc.SQLServerDriver')
    .load()
)

In [67]:
# Print a preview of the raw business_data rows.
df_bi.show()

+-------+--------------------+--------------------+-------+-------+-------+-------+
|column1|             column2|             column3|column4|column5|column6|column7|
+-------+--------------------+--------------------+-------+-------+-------+-------+
|   null|Context: p2p lend...|                null|   null|   null|   null|   null|
|   null|                null|                null|   null|   null|   null|   null|
|   null|      LOAN_CONTRACTS|                null|   null|   null|   null|   null|
|   null|                null|                null|   null|   null|   null|   null|
|   null|              FIELDS|         DESCRIPTION|   null|   null|   null|   null|
|   null|         contract_id|                null|   null|   null|   null|   null|
|   null|          created_at|Timestamp of cont...|   null|   null|   null|   null|
|   null|     contract_status|- active\n- finis...|   null|   null|   null|   null|
|   null|              tenure|Duration of loan ...|   null|   null|   null| 

In [69]:
# Replace null entries in column3 with the string '0'; non-null values are kept as-is.
df_bi = df_bi.withColumn('column3', F.coalesce(df_bi.column3, lit('0')))
df_bi.limit(10).show()

+-------+--------------------+--------------------+-------+-------+-------+-------+
|column1|             column2|             column3|column4|column5|column6|column7|
+-------+--------------------+--------------------+-------+-------+-------+-------+
|   null|Context: p2p lend...|                   0|   null|   null|   null|   null|
|   null|                null|                   0|   null|   null|   null|   null|
|   null|      LOAN_CONTRACTS|                   0|   null|   null|   null|   null|
|   null|                null|                   0|   null|   null|   null|   null|
|   null|              FIELDS|         DESCRIPTION|   null|   null|   null|   null|
|   null|         contract_id|                   0|   null|   null|   null|   null|
|   null|          created_at|Timestamp of cont...|   null|   null|   null|   null|
|   null|     contract_status|- active\n- finis...|   null|   null|   null|   null|
|   null|              tenure|Duration of loan ...|   null|   null|   null| 

In [70]:
# Remove column7 from the frame (it is null in every row of the preview above), then preview again.
df_bi = df_bi.drop('column7')
df_bi.limit(10).show()

+-------+--------------------+--------------------+-------+-------+-------+
|column1|             column2|             column3|column4|column5|column6|
+-------+--------------------+--------------------+-------+-------+-------+
|   null|Context: p2p lend...|                   0|   null|   null|   null|
|   null|                null|                   0|   null|   null|   null|
|   null|      LOAN_CONTRACTS|                   0|   null|   null|   null|
|   null|                null|                   0|   null|   null|   null|
|   null|              FIELDS|         DESCRIPTION|   null|   null|   null|
|   null|         contract_id|                   0|   null|   null|   null|
|   null|          created_at|Timestamp of cont...|   null|   null|   null|
|   null|     contract_status|- active\n- finis...|   null|   null|   null|
|   null|              tenure|Duration of loan ...|   null|   null|   null|
|   null|         loan_amount|                   0|   null|   null|   null|
+-------+---