In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,concat,lit,floor,rand
spark = SparkSession.builder.appName("ETLPractice").getOrCreate()
source_path = "loan.csv"
target_path = "loan_result.csv"
load_data = spark.read.csv("loan.csv",header = True, inferSchema = True)

In [4]:
load_data.columns
load_data.show(5)

+-----------+---+------+------------+--------------+-----------+------+-----------+-------------+-------------+-----------+-------+------------+----------------+------------------+
|Customer_ID|Age|Gender|  Occupation|Marital Status|Family Size|Income|Expenditure|Use Frequency|Loan Category|Loan Amount|Overdue| Debt Record| Returned Cheque| Dishonour of Bill|
+-----------+---+------+------------+--------------+-----------+------+-----------+-------------+-------------+-----------+-------+------------+----------------+------------------+
|    IB14001| 30|  MALE|BANK MANAGER|        SINGLE|          4| 50000|      22199|            6|      HOUSING| 10,00,000 |      5|      42,898|               6|                 9|
|    IB14008| 44|  MALE|   PROFESSOR|       MARRIED|          6| 51000|      19999|            4|     SHOPPING|     50,000|      3|      33,999|               1|                 5|
|    IB14012| 30|FEMALE|     DENTIST|        SINGLE|          3| 58450|      27675|            

#### Transformation 1: Concatenate Customer_ID and Occupation

In [6]:
load_data = load_data.withColumn("full_name", concat(col('Customer_ID'), lit(' '), col('Occupation')))
load_data.show()

+-----------+---+------+-------------------+--------------+-----------+------+-----------+-------------+------------------+-----------+-------+------------+----------------+------------------+--------------------+
|Customer_ID|Age|Gender|         Occupation|Marital Status|Family Size|Income|Expenditure|Use Frequency|     Loan Category|Loan Amount|Overdue| Debt Record| Returned Cheque| Dishonour of Bill|           full_name|
+-----------+---+------+-------------------+--------------+-----------+------+-----------+-------------+------------------+-----------+-------+------------+----------------+------------------+--------------------+
|    IB14001| 30|  MALE|       BANK MANAGER|        SINGLE|          4| 50000|      22199|            6|           HOUSING| 10,00,000 |      5|      42,898|               6|                 9|IB14001 BANK MANAGER|
|    IB14008| 44|  MALE|          PROFESSOR|       MARRIED|          6| 51000|      19999|            4|          SHOPPING|     50,000|      3| 

#### Transformation 2: Calculate Income (subtract 10% taxes)

In [7]:
load_data = load_data.withColumn('Income', floor(lit(10000) + rand() * lit(50)))
load_data.show(10)

+-----------+---+------+-----------------+--------------+-----------+------+-----------+-------------+-------------+-----------+-------+------------+----------------+------------------+--------------------+
|Customer_ID|Age|Gender|       Occupation|Marital Status|Family Size|Income|Expenditure|Use Frequency|Loan Category|Loan Amount|Overdue| Debt Record| Returned Cheque| Dishonour of Bill|           full_name|
+-----------+---+------+-----------------+--------------+-----------+------+-----------+-------------+-------------+-----------+-------+------------+----------------+------------------+--------------------+
|    IB14001| 30|  MALE|     BANK MANAGER|        SINGLE|          4| 10031|      22199|            6|      HOUSING| 10,00,000 |      5|      42,898|               6|                 9|IB14001 BANK MANAGER|
|    IB14008| 44|  MALE|        PROFESSOR|       MARRIED|          6| 10006|      19999|            4|     SHOPPING|     50,000|      3|      33,999|               1|      

#### Adding Eligibilty Score column

In [8]:
load_data = load_data.withColumn('Loan_Eligibility_Score', floor(lit(20) + rand() * lit(31)))
load_data.show(10)

+-----------+---+------+-----------------+--------------+-----------+------+-----------+-------------+-------------+-----------+-------+------------+----------------+------------------+--------------------+----------------------+
|Customer_ID|Age|Gender|       Occupation|Marital Status|Family Size|Income|Expenditure|Use Frequency|Loan Category|Loan Amount|Overdue| Debt Record| Returned Cheque| Dishonour of Bill|           full_name|Loan_Eligibility_Score|
+-----------+---+------+-----------------+--------------+-----------+------+-----------+-------------+-------------+-----------+-------+------------+----------------+------------------+--------------------+----------------------+
|    IB14001| 30|  MALE|     BANK MANAGER|        SINGLE|          4| 10031|      22199|            6|      HOUSING| 10,00,000 |      5|      42,898|               6|                 9|IB14001 BANK MANAGER|                    47|
|    IB14008| 44|  MALE|        PROFESSOR|       MARRIED|          6| 10006|    

#### Transformation 3: Filter by Age (age >=30)

In [9]:
load_data = load_data.filter(col('age')>=30)
load_data.show()

+-----------+---+------+-------------------+--------------+-----------+------+-----------+-------------+------------------+-----------+-------+------------+----------------+------------------+--------------------+----------------------+
|Customer_ID|Age|Gender|         Occupation|Marital Status|Family Size|Income|Expenditure|Use Frequency|     Loan Category|Loan Amount|Overdue| Debt Record| Returned Cheque| Dishonour of Bill|           full_name|Loan_Eligibility_Score|
+-----------+---+------+-------------------+--------------+-----------+------+-----------+-------------+------------------+-----------+-------+------------+----------------+------------------+--------------------+----------------------+
|    IB14001| 30|  MALE|       BANK MANAGER|        SINGLE|          4| 10031|      22199|            6|           HOUSING| 10,00,000 |      5|      42,898|               6|                 9|IB14001 BANK MANAGER|                    47|
|    IB14008| 44|  MALE|          PROFESSOR|       M

#### Transformation 4: Group by Age and Calculate Average Salary

In [10]:
avg_salary_by_age = load_data.groupBy('age').agg({'Income':'avg'}).withColumnRenamed('avg(salary)', 'avg_salary')
avg_salary_by_age.show()

+---+------------------+
|age|       avg(Income)|
+---+------------------+
| 31|10025.266666666666|
| 53|           10026.0|
| 34| 10024.07142857143|
| 44|           10023.0|
| 47| 10026.23076923077|
| 52|10022.666666666666|
| 40|           10019.8|
| 57|           10035.2|
| 54|10023.387096774193|
| 48| 10024.57142857143|
| 41|          10018.25|
| 43|         10026.375|
| 37|10019.636363636364|
| 35|           10026.5|
| 55|10022.666666666666|
| 59| 10036.09090909091|
| 39|           10023.0|
| 49| 10022.92857142857|
| 51|10020.055555555555|
| 50|           10026.2|
+---+------------------+
only showing top 20 rows



In [11]:
load_data = load_data.orderBy('age')
load_data.show()

+-----------+---+------+--------------------+--------------+-----------+------+-----------+-------------+------------------+-----------+-------+------------+----------------+------------------+--------------------+----------------------+
|Customer_ID|Age|Gender|          Occupation|Marital Status|Family Size|Income|Expenditure|Use Frequency|     Loan Category|Loan Amount|Overdue| Debt Record| Returned Cheque| Dishonour of Bill|           full_name|Loan_Eligibility_Score|
+-----------+---+------+--------------------+--------------+-----------+------+-----------+-------------+------------------+-----------+-------+------------+----------------+------------------+--------------------+----------------------+
|    IB14085| 30|  MALE|         ELECTRICIAN|       MARRIED|          4| 10014|      15000|            5|           HOUSING|  3,54,789 |      5|      32,154|               5|                 5| IB14085 ELECTRICIAN|                    24|
|    IB14012| 30|FEMALE|             DENTIST|   

#### Save the transformed data to an external CSV file

In [12]:
load_data.write.csv(target_path, mode='overwrite', header=True)