# **1. Overview**

Data manipulation adalah proses mengubah atau memanipulasi data agar siap digunakan untuk analisis atau tujuan lainnya. Hal ini melibatkan pembersihan data, transformasi data, penggabungan data, pemfilteran data, dan pengurangan kompleksitas data. Data manipulation penting karena membantu memastikan keakuratan, kebersihan, dan kualitas data, sehingga memungkinkan pengambilan keputusan yang lebih baik dan analisis yang lebih efektif.

Namun sebelum melakukan proses data manipulasi dengan PySpark, akan dilakukan set up dan ekstraksi data terlebih dahulu

# **2. First Step : Set Up**

Langkah pertama untuk memulai spark adalah dengan melakukan *set-up* berikut :

* Instalasi library spark
* <i>Create</i> SparkSession
* Konfigurasi dasar


In [None]:
# Instalasi library spark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=674d99347ad344d2b2321968f61a19bb0c5fcb4ce12c8d46279bd4e6ef99cd08
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
# Import library
from pyspark.sql import SparkSession

# Proses pembuatan SparkSession
spark = SparkSession \
    .builder \
    .appName('Belajar PySpark : Manipulasi Data') \
    .master('local[*]') \
    .config('spark.executor.memory', '2g') \
    .config('spark.executor.cores', '4') \
    .getOrCreate()

# Properti untuk membuat format tabel
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

# Definisikan sparkContext
sc = spark.sparkContext

# Tampilkan hasil
display(spark)

# **3. Ekstraksi Data**



> Ekstrak data dari file csv

In [None]:
# Ekstraksi data csv
data_loan = spark.read.csv(
    'loan.csv',
    header = True,
    inferSchema = True,
    nullValue = 'NA'
)

# Tampilkan hasilnya
data_loan.show()

+-------+---------+---------+-----------+---------------+----------+--------+-----------+-----+---------+--------------------+----------+--------------+----------+-------------------+-------+-----------+----------+--------------------+--------------------+------------------+--------------------+--------+----------+-----+-----------+----------------+--------------+----------------------+----------------------+--------+-------+---------+----------+---------+-------------------+---------+-------------+-----------+---------------+---------------+-------------+------------------+----------+-----------------------+------------+---------------+------------+------------------+--------------------------+---------------------------+-----------+----------------+----------------+---------+-------------------------+--------------+------------+-----------+-----------+----------+-----------+-----------+------------------+------------+-------+-----------+-----------+----------+--------+---------------

> Periksa skema data / tipe data

In [None]:
# Periksa skema data
data_loan.printSchema()

root
 |-- id: integer (nullable = true)
 |-- member_id: integer (nullable = true)
 |-- loan_amnt: integer (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- funded_amnt_inv: double (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: string (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- issue_d: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- pymnt_plan: string (nullable = true)
 |-- url: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- title: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: str

> Periksa jumlah baris dan kolom

In [None]:
# Mendapatkan jumlah baris dan kolom pada DataFrame
jumlah_baris = data_loan.count()
jumlah_kolom = len(data_loan.columns)

# Tampilkan hasilnya
print(f'Jumlah baris pada data = {jumlah_baris} dengan banyak kolom = {jumlah_kolom}')

Jumlah baris pada data = 4118 dengan banyak kolom = 111


# **4. Data Manipulation**

Beberapa hal yang biasa dilakukan saat data manipulation adalah sebagai berikut :


<ul>1️⃣ Pemfilteran (<i>Filtering</i>)</ul>
<ul>2️⃣ Penyortiran (<i>Sorting</i>)</ul>
<ul>3️⃣ Penggabungan (<i>Merging / Joining</i>)</ul>
<ul>4️⃣ Pemisahan (<i>Splitting</i>)</ul>
<ul>5️⃣ Penggantian Nilai (<i>Value Replacement</i>)</ul>
<ul>6️⃣ Agregasi (<i>Aggregate</i>)</ul>
<ul>7️⃣ etc</ul>


## **4.1 Filtering**

Salah satu teknik paling sederhana untuk menemukan bagian mana dari kumpulan data yang menarik adalah menemukan subset baris yang cocok dengan beberapa kriteria untuk di-filter atau dipilih. Untuk melakukan hal ini cukup mudah yakni dengan menuliskan kondisi seperti berikut

```
DataFrame.filter(conditions)
```

Conditions dapat dibentuk dari 1 atau lebih kriteria boolean dan dihubungkan dengan operator logika and (&), or (|) atau not (~)

<br>


docs : <i>https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.filter.html</i>

> <b>Quest 1</b> : <i>Tampilkan semua data yang loan amount nya lebih dari 10000!</i>

In [None]:
# Proses filter dengan style penulisan 1
data_loan_morethan_10k = data_loan.filter('loan_amnt > 10000')

# Tampilkan hasilnya
data_loan_morethan_10k.show(5)

+-------+---------+---------+-----------+---------------+----------+--------+-----------+-----+---------+--------------------+----------+--------------+----------+-------------------+-------+-----------+----------+--------------------+--------------------+------------------+--------------------+--------+----------+-----+-----------+----------------+--------------+----------------------+----------------------+--------+-------+---------+----------+---------+-------------------+---------+-------------+-----------+---------------+---------------+-------------+------------------+----------+-----------------------+------------+---------------+------------+------------------+--------------------------+---------------------------+-----------+----------------+----------------+---------+-------------------------+--------------+------------+-----------+-----------+----------+-----------+-----------+------------------+------------+-------+-----------+-----------+----------+--------+---------------

In [None]:
# Proses filter dengan style penulisan 2
data_loan_morethan_10k = data_loan.filter(data_loan['loan_amnt'] > 10000)

# Tampilkan hasilnya
data_loan_morethan_10k.show(5)

+-------+---------+---------+-----------+---------------+----------+--------+-----------+-----+---------+--------------------+----------+--------------+----------+-------------------+-------+-----------+----------+--------------------+--------------------+------------------+--------------------+--------+----------+-----+-----------+----------------+--------------+----------------------+----------------------+--------+-------+---------+----------+---------+-------------------+---------+-------------+-----------+---------------+---------------+-------------+------------------+----------+-----------------------+------------+---------------+------------+------------------+--------------------------+---------------------------+-----------+----------------+----------------+---------+-------------------------+--------------+------------+-----------+-----------+----------+-----------+-----------+------------------+------------+-------+-----------+-----------+----------+--------+---------------

> <b>Quest 2</b> : <i>Tampilkan semua data yang loan amount nya lebih dari 10000 dengan status gradenya A!</i>

In [None]:
# Buat kondisi
condition1 = (data_loan['loan_amnt'] > 10000)
condition2 = (data_loan['grade'] == 'A')

# Lakukan filter dengan dua kondisi
data_loan_filtered = data_loan.filter(condition1 & condition2)

# Tampilkan hasil
data_loan_filtered.show(5)

+-------+---------+---------+-----------+---------------+----------+--------+-----------+-----+---------+--------------------+----------+--------------+----------+-------------------+-------+-----------+----------+--------------------+--------------------+------------------+--------------------+--------+----------+-----+-----------+----------------+--------------+----------------------+----------------------+--------+-------+---------+----------+---------+-------------------+---------+-------------+-----------+---------------+---------------+-------------+------------------+----------+-----------------------+------------+---------------+------------+------------------+--------------------------+---------------------------+-----------+----------------+----------------+---------+-------------------------+--------------+------------+-----------+-----------+----------+-----------+-----------+------------------+------------+-------+-----------+-----------+----------+--------+---------------

## **4.2 Sorting**

Menemukan data yang menarik dalam sebuah DataFrame seringkali lebih mudah jika dilakukan pengurutan baris datanya. Pengurutan dibagi menjadi dua yakni pengurutan secara `Ascending` (pengurutan naik A → Z) dan juga pengurutan secara `Descending` (pengurutan turun Z → A). Eksekusi sintaks berikut :

```
DataFrame.sort(by, ascending)
```

Jika ingin mengurutkan DataFrame pandas dengan memperhatikan argumen

<ul>
<li><b><i>by</i></b> sebagai satu atau lebih kolom yang ingin dijadikan patokan pengurutan</li>

<li><b><i>ascending</i></b> dapat diisikan True jika ingin terurut naik atau False jika ingin terurut turun (dafault : True)</li>

</ul><br>

docs : <i>https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.sort.html</i>


> <b>Quest 3</b> : <i>Tampilkan data <code>member_id</code>, <code>loan_amnt</code>, <code>grade</code> dan <code>annual_inc</code> pada data pinjaman dan diurutkan berdasarkan <code>annual_inc</code> dari yang paling kecil ke yang terbesar</i>

In [None]:
# Proses pengurutan secara ascending pada kolom annual_inc
data_loan \
  .select('member_id', 'loan_amnt', 'grade', 'annual_inc') \
  .sort('annual_inc', ascending = True) \
  .show()

+---------+---------+-----+----------+
|member_id|loan_amnt|grade|annual_inc|
+---------+---------+-----+----------+
|   518840|     2000|    C|    4000.0|
|   678265|     1400|    B|    4080.0|
|   647711|     1200|    C|    4200.0|
|   381474|     2750|    E|    4200.0|
|   728097|     1800|    D|    4800.0|
|   673328|     2000|    B|    4800.0|
|   562099|     1200|    C|    4800.0|
|   449996|     2400|    E|    4800.0|
|   507853|     1400|    C|    4888.0|
|   184901|     1000|    B|    5000.0|
|   440811|     1000|    A|    5500.0|
|  1232192|     1300|    A|    6000.0|
|   674234|     1000|    D|    6000.0|
|   941800|     1050|    A|    6000.0|
|   551624|     3250|    A|    6000.0|
|   556182|     1600|    D|    6000.0|
|   512743|     1000|    C|    7000.0|
|  1186189|     2200|    B|    7200.0|
|   643337|     2500|    C|    7200.0|
|   790171|     3250|    B|    7200.0|
+---------+---------+-----+----------+
only showing top 20 rows



> <b>Quest 4</b> : <i>Tampilkan data <code>member_id</code>, <code>loan_amnt</code>, <code>grade</code> dan <code>annual_inc</code> pada data pinjaman dan diurutkan berdasarkan <code>annual_inc</code> dari yang paling kecil ke yang terbesar dan urutkan juga <code>loan_amnt</code> dari yang terbesar ke yang terkecil</i>

In [None]:
# Proses pengurutan secara ascending pada kolom annual_inc dan secara descending pada loan_amnt
data_loan \
  .select('member_id', 'loan_amnt', 'grade', 'annual_inc') \
  .sort(['annual_inc', 'loan_amnt'], ascending = [True, False]) \
  .show()

+---------+---------+-----+----------+
|member_id|loan_amnt|grade|annual_inc|
+---------+---------+-----+----------+
|   518840|     2000|    C|    4000.0|
|   678265|     1400|    B|    4080.0|
|   381474|     2750|    E|    4200.0|
|   647711|     1200|    C|    4200.0|
|   449996|     2400|    E|    4800.0|
|   673328|     2000|    B|    4800.0|
|   728097|     1800|    D|    4800.0|
|   562099|     1200|    C|    4800.0|
|   507853|     1400|    C|    4888.0|
|   184901|     1000|    B|    5000.0|
|   440811|     1000|    A|    5500.0|
|   551624|     3250|    A|    6000.0|
|   556182|     1600|    D|    6000.0|
|  1232192|     1300|    A|    6000.0|
|   941800|     1050|    A|    6000.0|
|   674234|     1000|    D|    6000.0|
|   512743|     1000|    C|    7000.0|
|   790171|     3250|    B|    7200.0|
|   643337|     2500|    C|    7200.0|
|  1186189|     2200|    B|    7200.0|
+---------+---------+-----+----------+
only showing top 20 rows



## **4.3 Adding New Columns**

Jangan pernah terjebak hanya dengan data yang tersedia. Maka menambahkan kolom baru ke DataFrame bisa dilakukan pada spark DataFrame sehingga informasi yang diperoleh juga bisa semakin kaya.

Untuk menambahkan kolom baru pada spark DataFrame gunakan sintaks
```
DataFrame.withColumn(colName, value)
```
Nilai (value) yang dimasukan bisa dilakukan dengan cara berikut :      
* <code>lit(value_input)</code> : Jika nilai yang ditambahkan bersifat serentak dan sama (default value)
* <code>when(condition, value1).otherwise(value2)</code> : Jika nilai yang ditambahkan dengan kondisi tertentu
* <code>column operation</code> : Jika nilai yang ingin ditambahkan merupakan hasil operasi matematis dari kolom yang <i>existing</i>  
<br>

docs : <i>https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withColumns.html</i>

> <b>Quest 5</b> : Pada data loan, tambahkan kolom persentase <i>debt-to-income</i> yang diberi nama 'dti_ratio' dimana kolom tersebut memuat perbandingan <code>loan_amnt</code> dengan <code>annual_inc</code>, lalu tampilkan hasilnya! (<i>gunakan pembulatan 4 angka dibelakang koma</i>)

In [None]:
# Import library untuk memanggil fungsi round
from pyspark.sql.functions import round

# Proses penambahan kolom
data_loan = data_loan.withColumn(
    'dti_ratio',
    round(data_loan['loan_amnt']/data_loan['annual_inc'], 4)
)

# Tampilkan hasilnya
data_loan.select('member_id', 'loan_amnt', 'annual_inc', 'dti_ratio').show()

+---------+---------+----------+---------+
|member_id|loan_amnt|annual_inc|dti_ratio|
+---------+---------+----------+---------+
|  1296599|     5000|   24000.0|   0.2083|
|  1314167|     2500|   30000.0|   0.0833|
|  1313524|     2400|   12252.0|   0.1959|
|  1277178|    10000|   49200.0|   0.2033|
|  1311748|     3000|   80000.0|   0.0375|
|  1311441|     5000|   36000.0|   0.1389|
|  1304742|     7000|   47004.0|   0.1489|
|  1288686|     3000|   48000.0|   0.0625|
|  1306957|     5600|   40000.0|     0.14|
|  1306721|     5375|   15000.0|   0.3583|
|  1305201|     6500|   72000.0|   0.0903|
|  1305008|    12000|   75000.0|     0.16|
|  1298717|     9000|   30000.0|      0.3|
|  1304956|     3000|   15000.0|      0.2|
|  1303503|    10000|  100000.0|      0.1|
|  1304871|     1000|   28000.0|   0.0357|
|  1299699|    10000|   42000.0|   0.2381|
|  1304884|     3600|  110000.0|   0.0327|
|  1294539|     6000|   84000.0|   0.0714|
|  1304855|     9200|  77385.19|   0.1189|
+---------+

> <b>Quest 6</b> : Dari hasil perhitungan dti_ratio diatas buat kolom baru <code>dti_ratio_type</code> dengan ketentuan :          
* Jika dti_ratio_type < 20% maka digolongkan 'low'
* Jika 20% <= dti_ratio_type <= 35% maka digolongkan 'moderate'
* Jika dti_ratio_type > 35% maka digolongkan 'high'
<br><br>Lalu tampilkan hasilnya!


In [None]:
# Import library untuk memanggil fungsi when
from pyspark.sql.functions import when

# Membuat kolom baru berdasarkan tiga kondisi
data_loan = data_loan.withColumn(
    'dti_ratio_type',
    when(data_loan['dti_ratio'] < 0.20, 'low')\
    .when(data_loan['dti_ratio'].between(0.20, 0.35), 'moderate')\
    .otherwise('high')
)

# Menampilkan DataFrame dengan kolom baru
data_loan\
    .select('member_id', 'loan_amnt', 'annual_inc', 'dti_ratio', 'dti_ratio_type')\
    .show()

+---------+---------+----------+---------+--------------+
|member_id|loan_amnt|annual_inc|dti_ratio|dti_ratio_type|
+---------+---------+----------+---------+--------------+
|  1296599|     5000|   24000.0|   0.2083|      moderate|
|  1314167|     2500|   30000.0|   0.0833|           low|
|  1313524|     2400|   12252.0|   0.1959|           low|
|  1277178|    10000|   49200.0|   0.2033|      moderate|
|  1311748|     3000|   80000.0|   0.0375|           low|
|  1311441|     5000|   36000.0|   0.1389|           low|
|  1304742|     7000|   47004.0|   0.1489|           low|
|  1288686|     3000|   48000.0|   0.0625|           low|
|  1306957|     5600|   40000.0|     0.14|           low|
|  1306721|     5375|   15000.0|   0.3583|          high|
|  1305201|     6500|   72000.0|   0.0903|           low|
|  1305008|    12000|   75000.0|     0.16|           low|
|  1298717|     9000|   30000.0|      0.3|      moderate|
|  1304956|     3000|   15000.0|      0.2|      moderate|
|  1303503|   

## **4.4 Drop Columns**

Tidak semua data biasanya diolah lebih lanjut, pun demikian kadang perlu untuk melakukan efisiensi memory, menyederhanakan data, penghapusan data sensitif, dll. Sehingga perlu untuk dilakukan penghapusan kolom

```
DataFrame.drop(kolom_dihapus)
```

docs : <i>https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.drop.html</i>

> <b>Quest 7</b> : Pada data loan, hapus kolom <code>desc</code> dan <code>url</code>!

In [None]:
# Proses penghapusan data
data_loan = data_loan.drop('desc', 'url')

# Tampilkan nama kolom untuk crosscheck
print(data_loan.columns)

['id', 'member_id', 'loan_amnt', 'funded_amnt', 'funded_amnt_inv', 'term', 'int_rate', 'installment', 'grade', 'sub_grade', 'emp_title', 'emp_length', 'home_ownership', 'annual_inc', 'verification_status', 'issue_d', 'loan_status', 'pymnt_plan', 'purpose', 'title', 'zip_code', 'addr_state', 'dti', 'delinq_2yrs', 'earliest_cr_line', 'inq_last_6mths', 'mths_since_last_delinq', 'mths_since_last_record', 'open_acc', 'pub_rec', 'revol_bal', 'revol_util', 'total_acc', 'initial_list_status', 'out_prncp', 'out_prncp_inv', 'total_pymnt', 'total_pymnt_inv', 'total_rec_prncp', 'total_rec_int', 'total_rec_late_fee', 'recoveries', 'collection_recovery_fee', 'last_pymnt_d', 'last_pymnt_amnt', 'next_pymnt_d', 'last_credit_pull_d', 'collections_12_mths_ex_med', 'mths_since_last_major_derog', 'policy_code', 'application_type', 'annual_inc_joint', 'dti_joint', 'verification_status_joint', 'acc_now_delinq', 'tot_coll_amt', 'tot_cur_bal', 'open_acc_6m', 'open_il_6m', 'open_il_12m', 'open_il_24m', 'mths_si

## **4.5 Rename Column**

Penamaan kolom pada DataFrame spark dapat diubah sesuai kebutuhan dengan me-running sintaks
```
DataFrame.withColumnsRenamed({kolom_lama : kolom_baru})
```
docs : <i>https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withColumnsRenamed.html</i>

> <b>Quest 8</b> : Pada data loan, ubah penamaan kolom berikut :     

<ol>
  <ol>
  <table>
    <tr>
      <th>Nama Kolom Awal</th>
      <th>Nama Kolom Pengganti</th>
    </tr>
    <tr>
      <td>loan_amnt</td>
      <td>loan_amount</td>
    </tr>
    <tr>
      <td>annual_inc</td>
      <td>annual_income</td>
    </tr>
    <tr>
      <td>pymnt_plan</td>
      <td>payment_plan</td>
    </tr>
  </table>
  </ol>
</ol>


In [None]:
# Proses mengubah nama kolom
data_loan = data_loan.withColumnsRenamed({
    'loan_amnt' : 'loan_amount',
    'annual_inc' : 'annual_income',
    'pymnt_plan' : 'payment_plan'
})

# Tampilkan nama kolom untuk crosscheck
print(data_loan.columns)

['id', 'member_id', 'loan_amount', 'funded_amnt', 'funded_amnt_inv', 'term', 'int_rate', 'installment', 'grade', 'sub_grade', 'emp_title', 'emp_length', 'home_ownership', 'annual_income', 'verification_status', 'issue_d', 'loan_status', 'payment_plan', 'url', 'desc', 'purpose', 'title', 'zip_code', 'addr_state', 'dti', 'delinq_2yrs', 'earliest_cr_line', 'inq_last_6mths', 'mths_since_last_delinq', 'mths_since_last_record', 'open_acc', 'pub_rec', 'revol_bal', 'revol_util', 'total_acc', 'initial_list_status', 'out_prncp', 'out_prncp_inv', 'total_pymnt', 'total_pymnt_inv', 'total_rec_prncp', 'total_rec_int', 'total_rec_late_fee', 'recoveries', 'collection_recovery_fee', 'last_pymnt_d', 'last_pymnt_amnt', 'next_pymnt_d', 'last_credit_pull_d', 'collections_12_mths_ex_med', 'mths_since_last_major_derog', 'policy_code', 'application_type', 'annual_inc_joint', 'dti_joint', 'verification_status_joint', 'acc_now_delinq', 'tot_coll_amt', 'tot_cur_bal', 'open_acc_6m', 'open_il_6m', 'open_il_12m', '

## **4.6 Convert Data Type**

Selain menambah kolom, fungsi lain dari withColumn pada Spark DataFrame juga dapat digunakan untuk mengkonversi tipe data dari sebuah kolom.

Untuk dapat mengkonversi tipe data pada sebuah kolom dapat menggunakan template berikut

```
from pyspark.sql.functions import col

DataFrame = DataFrame.withColumn(
    nama_kolom,
    col(nama_kolom).cast(tipe_data)
)
```

> <b>Quest 9</b> : Pada data loan, konversi kolom <code>member_id</code> yang semula mempunyai tipe data integer menjadi string! Hal ini dilakukan karena angka pada member_id merupakan sebuah kode sehingga tidak bisa dilakukan operasi matematis



In [None]:
# Import library untuk memanggil fungsi col
from pyspark.sql.functions import col

# Proses konversi tipe data
data_loan = data_loan.withColumn(
    'member_id',
    col('member_id').cast('string')
)

# Periksa Skema
data_loan.printSchema()

root
 |-- id: integer (nullable = true)
 |-- member_id: string (nullable = true)
 |-- loan_amount: integer (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- funded_amnt_inv: double (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: string (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: double (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- issue_d: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- payment_plan: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- title: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: string (nullable = true)
 |-- delinq_2yrs: string (nullable = true)


## **4.7 Aggregation**

Mirip dengan klausa "GROUP BY" dalam SQL, fungsi <code>groupBy()</code> dalam Spark digunakan untuk mengumpulkan data yang identik ke dalam kelompok pada DataFrame/Dataset dan melakukan fungsi agregasi pada data yang dikelompokkan

* <code>count()</code> - Menghitung jumlah data
* <code>mean()</code> - Menghitung rataan data
* <code>max()</code> - Menghitung nilai maksimum
* <code>min()</code> - Menghitung nilai minimum
* <code>sum()</code> - Menghitung total data

* agg() - Using agg() function, we can calculate more than one aggregate at a time.



> <b>Quest 10</b> : Pada data loan, hitung rata-rata annual income tiap grade, kemudian urutkan berdasarkan annual income paling tinggi ke rendah!

In [None]:
# Import library untuk memanggil fungsi agregasi
from pyspark.sql.functions import *

# Proses agregasi
data_loan\
  .groupBy('grade')\
  .agg(avg(col('annual_income')).alias('rataan_income'))\
  .orderBy('rataan_income', ascending = False)\
  .show()

+-----+-------------+
|grade|rataan_income|
+-----+-------------+
|    G|     94386.47|
|    F|     84602.27|
|    E|     77989.41|
|    D|     68524.87|
|    C|      67939.2|
|    B|     67639.96|
|    A|     66648.85|
+-----+-------------+



> <b>Quest 11</b> : Pada data loan, hitung rata-rata annual income dan rata-rata loan amount tiap grade, kemudian urutkan berdasarkan annual income paling tinggi ke rendah!

In [None]:
# Import library untuk memanggil fungsi agregasi
from pyspark.sql.functions import *

# Proses agregasi
data_loan\
  .groupBy('grade')\
  .agg(avg(col('annual_income')).alias('rataan_income'),
       avg(col('loan_amount')).alias('rataan_loan_amt'))\
  .orderBy('rataan_income', ascending = False)\
  .show()

+-----+-------------+---------------+
|grade|rataan_income|rataan_loan_amt|
+-----+-------------+---------------+
|    G|     94386.47|       20226.82|
|    F|     84602.27|        18363.3|
|    E|     77989.41|       15847.26|
|    D|     68524.87|        12278.2|
|    C|      67939.2|       11004.67|
|    B|     67639.96|       11119.08|
|    A|     66648.85|        8624.93|
+-----+-------------+---------------+



# **5. Intro Data Visualization**

Visualisasi data adalah representasi grafis atau grafik dari informasi atau data. Hal ini dilakukan untuk membantu menyajikan dan memahami data dengan lebih baik melalui elemen visual seperti grafik, diagram, peta, atau grafik lainnya.

Tujuan : mempermudah analisis, identifikasi pola, tren, atau hubungan dalam data, serta untuk menjelaskan dan mengkomunikasikan temuan kepada orang lain dengan cara yang lebih jelas dan mudah dipahami.

Beberapa library visualisasi data belum mensupport tipe data DataFrame spark dalam proses inputannya. Alternatifnya bisa mengubah DataFrame Spark menjadi DataFrame Pandas (library untuk olah data juga yang sangat terkenal). Eksekusi sintaks :     

```
SparkDataFrame.toPandas()
```

Untuk mengubah dataframe spark menjadi dataframe pandas

docs : https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.toPandas.html

> <b>Quest 12</b> : Pada data loan, hitung rata-rata annual income tiap grade, kemudian visualisasikan hasil perhitungan tersebut!

In [None]:
# Import library untuk memanggil fungsi agregasi
from pyspark.sql.functions import *

# Proses agregasi
ann_inc_per_grade = data_loan\
  .groupBy('grade')\
  .agg(avg(col('annual_income')).alias('rataan_income'))\
  .orderBy('rataan_income', ascending = True)

print(ann_inc_per_grade)

+-----+-------------+
|grade|rataan_income|
+-----+-------------+
|    A|     66648.85|
|    B|     67639.96|
|    C|      67939.2|
|    D|     68524.87|
|    E|     77989.41|
|    F|     84602.27|
|    G|     94386.47|
+-----+-------------+



In [None]:
# Ubah menjadi dataframe pandas
ann_inc_per_grade_pandas = ann_inc_per_grade.toPandas()

# Periksa hasil
print(f'Type Data sebelum konversi : {type(ann_inc_per_grade)}')
print(f'Type Data setelah konversi : {type(ann_inc_per_grade_pandas)}')

Type Data sebelum konversi : <class 'pyspark.sql.dataframe.DataFrame'>
Type Data setelah konversi : <class 'pandas.core.frame.DataFrame'>


Untuk membuat visualisasi data pada Python dapat menggunakan berbagai library yang tersedia seperti matplotlib, seaborn, plotly, dll. Namun kali ini akan dipilih library plotly untuk melakukan visualisasi data

Plotly adalah sebuah library grafik interaktif yang dapat digunakan dengan berbagai bahasa pemrograman, termasuk Python. Plotly menyediakan fungsi-fungsi dan kelas-kelas untuk membuat beragam jenis grafik dan visualisasi data dengan mudah seperti <i>line plots, scatter plots, area charts, bar charts, error bars, box plots, histograms, heatmaps, subplots, multiple-axes, polar charts,</i> dan <i>bubble charts</i>.

docs : https://plotly.com/python/

In [None]:
# Import library visualisasi data
import plotly.express as px

# Plot bar chart
fig = px.bar(
    ann_inc_per_grade_pandas,
    x = 'rataan_income',
    y = 'grade',
    orientation = 'h',
    text_auto = True,
    text = 'rataan_income'
)

# Konfigurasi tampilan
fig.update_layout(
    width = 800,
    height = 500,
    title = dict(
        text = '<b>Rata-rata Annual Income per Grade Status</b>',
        font = dict(
            size = 25
        )
    ),
    xaxis_title = '',
    yaxis_title = '',
    showlegend = False,
    paper_bgcolor = 'rgb(255, 255, 255, 1)',
    plot_bgcolor = 'rgb(255, 255, 255, 0)',
    xaxis_showticklabels = False
)

fig.update_traces(
    texttemplate='%{text}',
    textposition='inside'
)

# Tampilkan grafik
fig.show()