In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [3]:
spark = SparkSession.builder.appName("Capston project").getOrCreate()

In [4]:
sc = spark.sparkContext

### Loading file

In [5]:
data = sc.textFile("C:/Users/Miles/Desktop/Big Data/Capstone/credit-card-default-1000.csv")

In [6]:
data

C:/Users/Miles/Desktop/Big Data/Capstone/credit-card-default-1000.csv MapPartitionsRDD[1] at textFile at DirectMethodHandleAccessor.java:104

In [7]:
type(data)

pyspark.rdd.RDD

In [8]:
data.take(4)

['CUSTID,LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,PAY_1,PAY_2,PAY_3,PAY_4,PAY_5,PAY_6,BILL_AMT1,BILL_AMT2,BILL_AMT3,BILL_AMT4,BILL_AMT5,BILL_AMT6,PAY_AMT1,PAY_AMT2,PAY_AMT3,PAY_AMT4,PAY_AMT5,PAY_AMT6,DEFAULTED',
 '530,20000,2,2,2,21,-1,-1,2,2,-2,-2,0,0,0,0,0,0,0,0,0,0,162000,0,0',
 '38,60000,2,2,2,22,0,0,0,0,-2,-2,0,0,0,0,0,0,0,0,0,0,0,1576,0',
 '43,10000,1,2,2,22,0,0,0,0,-2,-2,0,0,0,0,0,0,0,0,0,0,0,1500,0']

### Removing headers

In [9]:
header = data.first()

In [10]:
data = data.filter(lambda x:x!=header)

In [11]:
data.take(5)

['530,20000,2,2,2,21,-1,-1,2,2,-2,-2,0,0,0,0,0,0,0,0,0,0,162000,0,0',
 '38,60000,2,2,2,22,0,0,0,0,-2,-2,0,0,0,0,0,0,0,0,0,0,0,1576,0',
 '43,10000,1,2,2,22,0,0,0,0,-2,-2,0,0,0,0,0,0,0,0,0,0,0,1500,0',
 '47,20000,2,1,2,22,0,0,2,-1,0,-1,1131,291,582,291,0,291,291,582,0,0,130291,651,0',
 '70,20000,1,4,2,22,2,0,0,0,-1,-1,1692,13250,433,1831,0,2891,13250,433,1831,0,2891,153504,0']

In [12]:
data.count()

1002

In [13]:
data = data.map(lambda x:x.split(','))

In [14]:
data.take(1)

[['530',
  '20000',
  '2',
  '2',
  '2',
  '21',
  '-1',
  '-1',
  '2',
  '2',
  '-2',
  '-2',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '162000',
  '0',
  '0']]

### 3.Cleanup data. Remove lines that are not "CSV"

In [15]:
data = data.filter(lambda x:all(x))

In [16]:
data.count()

1000

In [17]:
data.take(1)

[['530',
  '20000',
  '2',
  '2',
  '2',
  '21',
  '-1',
  '-1',
  '2',
  '2',
  '-2',
  '-2',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '162000',
  '0',
  '0']]

### 4. Remove double quotes that are present in few records.

### Converting object column into integer

In [18]:
data1 = data.map(lambda x: [int(x[i]) if i!=2 else x[i] for i in range(len(x))])

In [19]:
data1.take(1)

[[530,
  20000,
  '2',
  2,
  2,
  21,
  -1,
  -1,
  2,
  2,
  -2,
  -2,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  162000,
  0,
  0]]

### Converting it into dataframe

In [20]:
header

'CUSTID,LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,PAY_1,PAY_2,PAY_3,PAY_4,PAY_5,PAY_6,BILL_AMT1,BILL_AMT2,BILL_AMT3,BILL_AMT4,BILL_AMT5,BILL_AMT6,PAY_AMT1,PAY_AMT2,PAY_AMT3,PAY_AMT4,PAY_AMT5,PAY_AMT6,DEFAULTED'

In [21]:
data1.take(2)

[[530,
  20000,
  '2',
  2,
  2,
  21,
  -1,
  -1,
  2,
  2,
  -2,
  -2,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  162000,
  0,
  0],
 [38,
  60000,
  '2',
  2,
  2,
  22,
  0,
  0,
  0,
  0,
  -2,
  -2,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  1576,
  0]]

In [22]:
header 

'CUSTID,LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,PAY_1,PAY_2,PAY_3,PAY_4,PAY_5,PAY_6,BILL_AMT1,BILL_AMT2,BILL_AMT3,BILL_AMT4,BILL_AMT5,BILL_AMT6,PAY_AMT1,PAY_AMT2,PAY_AMT3,PAY_AMT4,PAY_AMT5,PAY_AMT6,DEFAULTED'

In [23]:
# df = spark.createDataFrame(data, schema= schema)
df = data1.toDF(schema = header.split(','))

In [24]:
df.show(5)

+------+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+---------+
|CUSTID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_1|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULTED|
+------+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+---------+
|   530|    20000|  2|        2|       2| 21|   -1|   -1|    2|    2|   -2|   -2|        0|        0|        0|        0|        0|        0|       0|       0|       0|       0|  162000|       0|        0|
|    38|    60000|  2|        2|       2| 22|    0|    0|    0|    0|   -2|   -2|        0|        0|        0|        0|        0|        0|       0|       0|       0|       0

In [25]:
df.printSchema()

root
 |-- CUSTID: long (nullable = true)
 |-- LIMIT_BAL: long (nullable = true)
 |-- SEX: string (nullable = true)
 |-- EDUCATION: long (nullable = true)
 |-- MARRIAGE: long (nullable = true)
 |-- AGE: long (nullable = true)
 |-- PAY_1: long (nullable = true)
 |-- PAY_2: long (nullable = true)
 |-- PAY_3: long (nullable = true)
 |-- PAY_4: long (nullable = true)
 |-- PAY_5: long (nullable = true)
 |-- PAY_6: long (nullable = true)
 |-- BILL_AMT1: long (nullable = true)
 |-- BILL_AMT2: long (nullable = true)
 |-- BILL_AMT3: long (nullable = true)
 |-- BILL_AMT4: long (nullable = true)
 |-- BILL_AMT5: long (nullable = true)
 |-- BILL_AMT6: long (nullable = true)
 |-- PAY_AMT1: long (nullable = true)
 |-- PAY_AMT2: long (nullable = true)
 |-- PAY_AMT3: long (nullable = true)
 |-- PAY_AMT4: long (nullable = true)
 |-- PAY_AMT5: long (nullable = true)
 |-- PAY_AMT6: long (nullable = true)
 |-- DEFAULTED: long (nullable = true)



### 5. rounding of age to range of 10s.  

In [26]:
df.show(5)

+------+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+---------+
|CUSTID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_1|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULTED|
+------+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+---------+
|   530|    20000|  2|        2|       2| 21|   -1|   -1|    2|    2|   -2|   -2|        0|        0|        0|        0|        0|        0|       0|       0|       0|       0|  162000|       0|        0|
|    38|    60000|  2|        2|       2| 22|    0|    0|    0|    0|   -2|   -2|        0|        0|        0|        0|        0|        0|       0|       0|       0|       0

In [27]:
from pyspark.sql.functions import *

In [28]:
def func(x):
    x = floor(x/10) 
    x = x*10
#     return (x//10)*10
    return x
    
    
df = df.withColumn('range_of_10s', func(col('AGE')) )

In [29]:
df.show()

+------+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+---------+------------+
|CUSTID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_1|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULTED|range_of_10s|
+------+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+---------+------------+
|   530|    20000|  2|        2|       2| 21|   -1|   -1|    2|    2|   -2|   -2|        0|        0|        0|        0|        0|        0|       0|       0|       0|       0|  162000|       0|        0|          20|
|    38|    60000|  2|        2|       2| 22|    0|    0|    0|    0|   -2|   -2|        0|        0|        0|        0|   

### 6.Normalize sex to only 1 and 2.

In [30]:
df.withColumn("SEX",regexp_replace("SEX",'F','2')).show(10)

+------+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+---------+------------+
|CUSTID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_1|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULTED|range_of_10s|
+------+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+---------+------------+
|   530|    20000|  2|        2|       2| 21|   -1|   -1|    2|    2|   -2|   -2|        0|        0|        0|        0|        0|        0|       0|       0|       0|       0|  162000|       0|        0|          20|
|    38|    60000|  2|        2|       2| 22|    0|    0|    0|    0|   -2|   -2|        0|        0|        0|        0|   

In [31]:
df = df.withColumn("SEX",when(df['SEX'] == 'M','1').when(df['SEX'] == 'F','2').otherwise(df['SEX']))

In [32]:
df.show()

+------+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+---------+------------+
|CUSTID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_1|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULTED|range_of_10s|
+------+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+---------+------------+
|   530|    20000|  2|        2|       2| 21|   -1|   -1|    2|    2|   -2|   -2|        0|        0|        0|        0|        0|        0|       0|       0|       0|       0|  162000|       0|        0|          20|
|    38|    60000|  2|        2|       2| 22|    0|    0|    0|    0|   -2|   -2|        0|        0|        0|        0|   

In [33]:
df1= df.replace(['M','F'],['1','2'],'AGE')

In [35]:
df1.show()

+------+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+---------+------------+
|CUSTID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_1|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULTED|range_of_10s|
+------+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+---------+------------+
|   530|    20000|  2|        2|       2| 21|   -1|   -1|    2|    2|   -2|   -2|        0|        0|        0|        0|        0|        0|       0|       0|       0|       0|  162000|       0|        0|          20|
|    38|    60000|  2|        2|       2| 22|    0|    0|    0|    0|   -2|   -2|        0|        0|        0|        0|   

### 7. average billed Amount.

In [36]:
df.select(avg(col("BILL_AMT1")+col("BILL_AMT2")+col("BILL_AMT3")+col("BILL_AMT4")+col("BILL_AMT5")+col("BILL_AMT6")).alias("avg_bill_amount")).show()

+---------------+
|avg_bill_amount|
+---------------+
|     259790.697|
+---------------+



### 8. average pay amount

In [37]:
df.select(avg(col("PAY_1")+col("PAY_2")+col("PAY_3")+col("PAY_4")+col("PAY_5")+col("PAY_6")).alias("avg_pay_amount")).show()

+--------------+
|avg_pay_amount|
+--------------+
|        -1.185|
+--------------+



### 9. Find average pay duration. Make sure numbers are rounded and negative values are eliminated

In [39]:
df1 =  df.groupBy('CUSTID').agg((when((sum( col("PAY_1")+col("PAY_2")+col("PAY_3")+col("PAY_4")+col("PAY_5")+col("PAY_6"))/6)<0,0).otherwise((sum( col("PAY_1")+col("PAY_2")+col("PAY_3")+col("PAY_4")+col("PAY_5")+col("PAY_6"))/6))).alias("AVG_PAY_DUR"))

In [40]:
df1.show()

+------+------------------+
|CUSTID|       AVG_PAY_DUR|
+------+------------------+
|    26|               0.5|
|   964|               0.0|
|   418|               0.0|
|   191|               0.0|
|   558|               0.0|
|   270|               0.5|
|   730|               0.0|
|   705|1.1666666666666667|
|   278|               0.0|
|   243|               0.0|
|   442|               0.0|
|   965|               0.0|
|    54|               0.0|
|   926|               0.0|
|   296|1.3333333333333333|
|   348|               0.0|
|   385|               0.0|
|   155|               1.5|
|   831|               0.0|
|   588|               0.0|
+------+------------------+
only showing top 20 rows



In [41]:
df = df.join(df1, "CUSTID" , 'inner').select(col("*"))

In [42]:
df.show()

+------+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+---------+------------+------------------+
|CUSTID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_1|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULTED|range_of_10s|       AVG_PAY_DUR|
+------+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+---------+------------+------------------+
|    26|    50000|  1|        3|       2| 23|    0|    0|    3|    2|   -1|   -1|     1018|     6377|       -2|      935|     4985|     1212|    6433|       2|     937|    4990|    1215|    9546|        0|          20|               0.5|
|   964|    30000|  1|        2|       2| 30|   

### 10. Add SEXNAME to the data using SQL Joins.

In [43]:
sex_name = sc.parallelize([['1','Male'], ['2', "Female"]])

In [44]:
sex_name.collect()

[['1', 'Male'], ['2', 'Female']]

In [45]:
sex_name_df = sex_name.toDF(["Id","SEX_NAME"])

In [46]:
sex_name_df.show()

+---+--------+
| Id|SEX_NAME|
+---+--------+
|  1|    Male|
|  2|  Female|
+---+--------+



In [47]:
df1 = df.join(sex_name_df,df.SEX == sex_name_df.Id ,'inner').select("*")

In [48]:
df1.show(5)

+------+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+---------+------------+-----------+---+--------+
|CUSTID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_1|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULTED|range_of_10s|AVG_PAY_DUR| Id|SEX_NAME|
+------+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+---------+------------+-----------+---+--------+
|    26|    50000|  1|        3|       2| 23|    0|    0|    3|    2|   -1|   -1|     1018|     6377|       -2|      935|     4985|     1212|    6433|       2|     937|    4990|    1215|    9546|        0|          20|        0.5|  1|    Male|
|   964|    30000|  1|  

## Converting DataFrame to RDD

In [49]:
rdd = df1.rdd.map(list)

In [50]:
rdd

PythonRDD[104] at RDD at PythonRDD.scala:53

In [51]:
rdd.take(5)

[[26,
  50000,
  '1',
  3,
  2,
  23,
  0,
  0,
  3,
  2,
  -1,
  -1,
  1018,
  6377,
  -2,
  935,
  4985,
  1212,
  6433,
  2,
  937,
  4990,
  1215,
  9546,
  0,
  20,
  0.5,
  '1',
  'Male'],
 [964,
  30000,
  '1',
  2,
  2,
  30,
  -1,
  0,
  2,
  -1,
  0,
  0,
  88948,
  79783,
  80575,
  92982,
  98502,
  103200,
  30000,
  10575,
  15000,
  10000,
  10000,
  20000,
  1,
  30,
  0.0,
  '1',
  'Male'],
 [191,
  310000,
  '1',
  1,
  2,
  32,
  0,
  0,
  0,
  0,
  0,
  0,
  19346,
  21708,
  22940,
  19068,
  16409,
  16383,
  3000,
  2000,
  2198,
  1000,
  3000,
  2399,
  1,
  30,
  0.0,
  '1',
  'Male'],
 [270,
  50000,
  '1',
  2,
  2,
  27,
  3,
  2,
  2,
  -2,
  -1,
  -1,
  6160,
  3274,
  6190,
  307,
  2870,
  670,
  3276,
  6246,
  307,
  2870,
  670,
  1362,
  0,
  20,
  0.5,
  '1',
  'Male'],
 [730,
  60000,
  '1',
  1,
  2,
  29,
  -1,
  2,
  -1,
  0,
  0,
  0,
  0,
  5396,
  10270,
  13576,
  13864,
  14636,
  5396,
  5000,
  3500,
  501,
  1000,
  2000,
  0,
  20,
  0

In [52]:
df1.take(5)

[Row(CUSTID=26, LIMIT_BAL=50000, SEX='1', EDUCATION=3, MARRIAGE=2, AGE=23, PAY_1=0, PAY_2=0, PAY_3=3, PAY_4=2, PAY_5=-1, PAY_6=-1, BILL_AMT1=1018, BILL_AMT2=6377, BILL_AMT3=-2, BILL_AMT4=935, BILL_AMT5=4985, BILL_AMT6=1212, PAY_AMT1=6433, PAY_AMT2=2, PAY_AMT3=937, PAY_AMT4=4990, PAY_AMT5=1215, PAY_AMT6=9546, DEFAULTED=0, range_of_10s=20, AVG_PAY_DUR=0.5, Id='1', SEX_NAME='Male'),
 Row(CUSTID=964, LIMIT_BAL=30000, SEX='1', EDUCATION=2, MARRIAGE=2, AGE=30, PAY_1=-1, PAY_2=0, PAY_3=2, PAY_4=-1, PAY_5=0, PAY_6=0, BILL_AMT1=88948, BILL_AMT2=79783, BILL_AMT3=80575, BILL_AMT4=92982, BILL_AMT5=98502, BILL_AMT6=103200, PAY_AMT1=30000, PAY_AMT2=10575, PAY_AMT3=15000, PAY_AMT4=10000, PAY_AMT5=10000, PAY_AMT6=20000, DEFAULTED=1, range_of_10s=30, AVG_PAY_DUR=0.0, Id='1', SEX_NAME='Male'),
 Row(CUSTID=191, LIMIT_BAL=310000, SEX='1', EDUCATION=1, MARRIAGE=2, AGE=32, PAY_1=0, PAY_2=0, PAY_3=0, PAY_4=0, PAY_5=0, PAY_6=0, BILL_AMT1=19346, BILL_AMT2=21708, BILL_AMT3=22940, BILL_AMT4=19068, BILL_AMT5=1640

### 11. 

In [53]:
# creating temporary table for sql querry
df1.createOrReplaceTempView("CCDATA")

In [54]:
spark.sql("SELECT SEX_NAME,count(*) as Total,SUM(DEFAULTED) as Defaults,  ROUND(SUM(DEFAULTED)*100/count(*)) as PER_DEFAULT FROM CCDATA GROUP BY SEX_NAME").show()

+--------+-----+--------+-----------+
|SEX_NAME|Total|Defaults|PER_DEFAULT|
+--------+-----+--------+-----------+
|  Female|  591|     218|       37.0|
|    Male|  409|     185|       45.0|
+--------+-----+--------+-----------+



In [55]:
df1.show(1)

+------+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+---------+------------+-----------+---+--------+
|CUSTID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_1|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULTED|range_of_10s|AVG_PAY_DUR| Id|SEX_NAME|
+------+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+---------+------------+-----------+---+--------+
|    26|    50000|  1|        3|       2| 23|    0|    0|    3|    2|   -1|   -1|     1018|     6377|       -2|      935|     4985|     1212|    6433|       2|     937|    4990|    1215|    9546|        0|          20|        0.5|  1|    Male|
+------+---------+---+--

In [56]:
x = rdd.map(lambda x:(x[-1],(1,x[-5])) )
x

PythonRDD[166] at RDD at PythonRDD.scala:53

In [57]:
x.take(5)

[('Male', (1, 0)),
 ('Male', (1, 1)),
 ('Male', (1, 1)),
 ('Male', (1, 0)),
 ('Male', (1, 0))]

In [58]:
x = x.reduceByKey(lambda x, y: ( (x[0] + y[0],x[1] + y[1]) ))
x.take(5)

[('Male', (409, 185)), ('Female', (591, 218))]

In [59]:
x = x.map(lambda x:(x[0], x[1][0], x[1][1] , ( x[1][1]/x[1][0]*100 )))
x.collect()

[('Male', 409, 185, 45.2322738386308), ('Female', 591, 218, 36.88663282571912)]

In [60]:
xx = x.toDF(['SEX_NAME','Total','Defaults','PER_DEFAULT']).sort(['SEX_NAME'])
xx.show()

+--------+-----+--------+-----------------+
|SEX_NAME|Total|Defaults|      PER_DEFAULT|
+--------+-----+--------+-----------------+
|  Female|  591|     218|36.88663282571912|
|    Male|  409|     185| 45.2322738386308|
+--------+-----+--------+-----------------+



In [61]:
xx.createOrReplaceTempView("xx")

In [62]:
spark.sql("select SEX_NAME, Total, Defaults, round(PER_DEFAULT) as PER_DEFAULT from xx").show()

+--------+-----+--------+-----------+
|SEX_NAME|Total|Defaults|PER_DEFAULT|
+--------+-----+--------+-----------+
|  Female|  591|     218|       37.0|
|    Male|  409|     185|       45.0|
+--------+-----+--------+-----------+



### 12. 

In [63]:
spark.sql("SELECT MARRIAGE, EDUCATION, count(*) as Total, SUM(DEFAULTED) as Defaults, ROUND(SUM(DEFAULTED) * 100 / count(*)) as PER_DEFAULT FROM CCDATA GROUP BY MARRIAGE,EDUCATION ORDER BY 1,2").show()

+--------+---------+-----+--------+-----------+
|MARRIAGE|EDUCATION|Total|Defaults|PER_DEFAULT|
+--------+---------+-----+--------+-----------+
|       1|        1|  123|      71|       58.0|
|       1|        2|  198|     105|       53.0|
|       1|        3|   87|      52|       60.0|
|       1|        4|    3|       2|       67.0|
|       2|        1|  268|      69|       26.0|
|       2|        2|  243|      65|       27.0|
|       2|        3|   55|      24|       44.0|
|       2|        4|    4|       2|       50.0|
|       3|        1|    4|       4|      100.0|
|       3|        2|    7|       3|       43.0|
|       3|        3|    8|       6|       75.0|
+--------+---------+-----+--------+-----------+



In [64]:
df1.show(1)

+------+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+---------+------------+-----------+---+--------+
|CUSTID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_1|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULTED|range_of_10s|AVG_PAY_DUR| Id|SEX_NAME|
+------+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+---------+------------+-----------+---+--------+
|    26|    50000|  1|        3|       2| 23|    0|    0|    3|    2|   -1|   -1|     1018|     6377|       -2|      935|     4985|     1212|    6433|       2|     937|    4990|    1215|    9546|        0|          20|        0.5|  1|    Male|
+------+---------+---+--

In [65]:
x1 = rdd.map(lambda x:((x[4],x[3]), (1, x[-5])))
x1.take(5)

[((2, 3), (1, 0)),
 ((2, 2), (1, 1)),
 ((2, 1), (1, 1)),
 ((2, 2), (1, 0)),
 ((2, 1), (1, 0))]

In [66]:
x1 = x1.reduceByKey(lambda x,y: (x[0] + y[0], x[1] + y[1]) )
x1.collect()

[((2, 3), (55, 24)),
 ((2, 2), (243, 65)),
 ((2, 1), (268, 69)),
 ((1, 2), (198, 105)),
 ((3, 2), (7, 3)),
 ((1, 3), (87, 52)),
 ((1, 1), (123, 71)),
 ((2, 4), (4, 2)),
 ((3, 1), (4, 4)),
 ((1, 4), (3, 2)),
 ((3, 3), (8, 6))]

In [67]:
# ( (2, 3), (55, 24) )
x1 = x1.map(lambda x:( x[0][0],x[0][1] , x[1][0] , x[1][1] , (x[1][1]/x[1][0]*100) ))
x1.collect()

[(2, 3, 55, 24, 43.63636363636363),
 (2, 2, 243, 65, 26.74897119341564),
 (2, 1, 268, 69, 25.74626865671642),
 (1, 2, 198, 105, 53.03030303030303),
 (3, 2, 7, 3, 42.857142857142854),
 (1, 3, 87, 52, 59.77011494252874),
 (1, 1, 123, 71, 57.72357723577236),
 (2, 4, 4, 2, 50.0),
 (3, 1, 4, 4, 100.0),
 (1, 4, 3, 2, 66.66666666666666),
 (3, 3, 8, 6, 75.0)]

In [68]:
xx1 = x1.toDF(['MARRIAGE','EDUCATION','Total','Defaults','PER_DEFAULT']).sort(['MARRIAGE','EDUCATION'])
xx1.show()

+--------+---------+-----+--------+------------------+
|MARRIAGE|EDUCATION|Total|Defaults|       PER_DEFAULT|
+--------+---------+-----+--------+------------------+
|       1|        1|  123|      71| 57.72357723577236|
|       1|        2|  198|     105| 53.03030303030303|
|       1|        3|   87|      52| 59.77011494252874|
|       1|        4|    3|       2| 66.66666666666666|
|       2|        1|  268|      69| 25.74626865671642|
|       2|        2|  243|      65| 26.74897119341564|
|       2|        3|   55|      24| 43.63636363636363|
|       2|        4|    4|       2|              50.0|
|       3|        1|    4|       4|             100.0|
|       3|        2|    7|       3|42.857142857142854|
|       3|        3|    8|       6|              75.0|
+--------+---------+-----+--------+------------------+



In [69]:
xx1.createOrReplaceTempView("xx1")

In [70]:
spark.sql("select MARRIAGE, EDUCATION, Total, round(PER_DEFAULT) as PER_DEFAULT from xx1").show()

+--------+---------+-----+-----------+
|MARRIAGE|EDUCATION|Total|PER_DEFAULT|
+--------+---------+-----+-----------+
|       1|        1|  123|       58.0|
|       1|        2|  198|       53.0|
|       1|        3|   87|       60.0|
|       1|        4|    3|       67.0|
|       2|        1|  268|       26.0|
|       2|        2|  243|       27.0|
|       2|        3|   55|       44.0|
|       2|        4|    4|       50.0|
|       3|        1|    4|      100.0|
|       3|        2|    7|       43.0|
|       3|        3|    8|       75.0|
+--------+---------+-----+-----------+



### 13. 

In [71]:
spark.sql("SELECT AVG_PAY_DUR, count(*) as Total, SUM(DEFAULTED) as Defaults, ROUND(SUM(DEFAULTED) * 100 / count(*)) as PER_DEFAULT FROM CCDATA GROUP BY AVG_PAY_DUR ORDER BY 1").show()

+-------------------+-----+--------+-----------+
|        AVG_PAY_DUR|Total|Defaults|PER_DEFAULT|
+-------------------+-----+--------+-----------+
|                0.0|  751|     281|       37.0|
|0.16666666666666666|   36|      17|       47.0|
| 0.3333333333333333|   70|      31|       44.0|
|                0.5|   37|      20|       54.0|
| 0.6666666666666666|   37|      18|       49.0|
| 0.8333333333333334|   20|      10|       50.0|
|                1.0|   14|       7|       50.0|
| 1.1666666666666667|   11|       6|       55.0|
| 1.3333333333333333|    8|       5|       63.0|
|                1.5|    5|       2|       40.0|
| 1.6666666666666667|    2|       2|      100.0|
| 1.8333333333333333|    3|       2|       67.0|
|                2.0|    1|       0|        0.0|
| 2.1666666666666665|    2|       0|        0.0|
| 2.3333333333333335|    1|       0|        0.0|
|                2.5|    1|       1|      100.0|
|                3.0|    1|       1|      100.0|
+-------------------

In [72]:
df1.show(1)

+------+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+---------+------------+-----------+---+--------+
|CUSTID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_1|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULTED|range_of_10s|AVG_PAY_DUR| Id|SEX_NAME|
+------+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+---------+------------+-----------+---+--------+
|    26|    50000|  1|        3|       2| 23|    0|    0|    3|    2|   -1|   -1|     1018|     6377|       -2|      935|     4985|     1212|    6433|       2|     937|    4990|    1215|    9546|        0|          20|        0.5|  1|    Male|
+------+---------+---+--

In [73]:
x2 = rdd.map(lambda x: (x[-3], (1,x[-5])))
x2.take(5)

[(0.5, (1, 0)), (0.0, (1, 1)), (0.0, (1, 1)), (0.5, (1, 0)), (0.0, (1, 0))]

In [74]:
x2 = x2.reduceByKey(lambda x,y : (x[0]+y[0], x[1]+y[1]))
x2.collect()

[(0.5, (37, 20)),
 (0.0, (751, 281)),
 (1.3333333333333333, (8, 5)),
 (0.3333333333333333, (70, 31)),
 (0.16666666666666666, (36, 17)),
 (0.6666666666666666, (37, 18)),
 (1.5, (5, 2)),
 (1.1666666666666667, (11, 6)),
 (1.0, (14, 7)),
 (0.8333333333333334, (20, 10)),
 (1.6666666666666667, (2, 2)),
 (1.8333333333333333, (3, 2)),
 (2.0, (1, 0)),
 (2.1666666666666665, (2, 0)),
 (2.3333333333333335, (1, 0)),
 (2.5, (1, 1)),
 (3.0, (1, 1))]

In [75]:
x2 = x2.map(lambda x:(x[0] , x[1][0] , x[1][1] , x[1][1]/x[1][0]*100 ) )
x2.collect()

[(0.5, 37, 20, 54.054054054054056),
 (0.0, 751, 281, 37.416777629826896),
 (1.3333333333333333, 8, 5, 62.5),
 (0.3333333333333333, 70, 31, 44.285714285714285),
 (0.16666666666666666, 36, 17, 47.22222222222222),
 (0.6666666666666666, 37, 18, 48.64864864864865),
 (1.5, 5, 2, 40.0),
 (1.1666666666666667, 11, 6, 54.54545454545454),
 (1.0, 14, 7, 50.0),
 (0.8333333333333334, 20, 10, 50.0),
 (1.6666666666666667, 2, 2, 100.0),
 (1.8333333333333333, 3, 2, 66.66666666666666),
 (2.0, 1, 0, 0.0),
 (2.1666666666666665, 2, 0, 0.0),
 (2.3333333333333335, 1, 0, 0.0),
 (2.5, 1, 1, 100.0),
 (3.0, 1, 1, 100.0)]

In [76]:
xx2 = x2.toDF(['AVG_PAY_DUR','Total','Defaults','PER_DEFAULT']).sort('AVG_PAY_DUR')
xx2.show()

+-------------------+-----+--------+------------------+
|        AVG_PAY_DUR|Total|Defaults|       PER_DEFAULT|
+-------------------+-----+--------+------------------+
|                0.0|  751|     281|37.416777629826896|
|0.16666666666666666|   36|      17| 47.22222222222222|
| 0.3333333333333333|   70|      31|44.285714285714285|
|                0.5|   37|      20|54.054054054054056|
| 0.6666666666666666|   37|      18| 48.64864864864865|
| 0.8333333333333334|   20|      10|              50.0|
|                1.0|   14|       7|              50.0|
| 1.1666666666666667|   11|       6| 54.54545454545454|
| 1.3333333333333333|    8|       5|              62.5|
|                1.5|    5|       2|              40.0|
| 1.6666666666666667|    2|       2|             100.0|
| 1.8333333333333333|    3|       2| 66.66666666666666|
|                2.0|    1|       0|               0.0|
| 2.1666666666666665|    2|       0|               0.0|
| 2.3333333333333335|    1|       0|            

In [77]:
xx2.createOrReplaceTempView("xx2")

In [78]:
spark.sql("select AVG_PAY_DUR, Total, Defaults, round(PER_DEFAULT) as PER_DEFAULT from xx2").show()

+-------------------+-----+--------+-----------+
|        AVG_PAY_DUR|Total|Defaults|PER_DEFAULT|
+-------------------+-----+--------+-----------+
|                0.0|  751|     281|       37.0|
|0.16666666666666666|   36|      17|       47.0|
| 0.3333333333333333|   70|      31|       44.0|
|                0.5|   37|      20|       54.0|
| 0.6666666666666666|   37|      18|       49.0|
| 0.8333333333333334|   20|      10|       50.0|
|                1.0|   14|       7|       50.0|
| 1.1666666666666667|   11|       6|       55.0|
| 1.3333333333333333|    8|       5|       63.0|
|                1.5|    5|       2|       40.0|
| 1.6666666666666667|    2|       2|      100.0|
| 1.8333333333333333|    3|       2|       67.0|
|                2.0|    1|       0|        0.0|
| 2.1666666666666665|    2|       0|        0.0|
| 2.3333333333333335|    1|       0|        0.0|
|                2.5|    1|       1|      100.0|
|                3.0|    1|       1|      100.0|
+-------------------

In [79]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.collect())
sum_result = rdd.fold(4,lambda x, y: x + y)
sum_result

[1, 2, 3, 4, 5]


51

In [80]:
2*2 + 2*1 + 1+3+4+5+2

21

In [101]:
rdd = sc.parallelize([[1, 2],[ 3, 4], [5,6]])

In [102]:
rdd.collect()

[[1, 2], [3, 4], [5, 6]]

In [99]:
rdd.flatMap(lambda x:x*2).collect()

[1, 2, 1, 2, 3, 4, 3, 4, 5, 6, 5, 6]

In [100]:
rdd.map(lambda x:x*2).collect()

[[1, 2, 1, 2], [3, 4, 3, 4], [5, 6, 5, 6]]

In [103]:
rdd = sc.parallelize((("A", 1), ("B", 2), ("A", 3), ("B", 4), ("C", 5)))
rdd.collect()

[('A', 1), ('B', 2), ('A', 3), ('B', 4), ('C', 5)]

In [106]:
rdd.aggregateByKey( (0,0) , lambda x,y: (x +  ) ,( y +  ) , lambda x,y: x+y ).collect()

[('B', 6), ('C', 5), ('A', 4)]