# Lab 02 - Pemrosesan Data Dengan DataFrame

Notebook ini menunjukkan bagaimana melakukan pemrosesan data menggunakan pyspark DataFrame. 

Operasi yang akan dilakukan adalah : 
1. [Membuat Dataframe](#01.Membuat-DataFrame)
2. [Memeriksa sekilas sebuah DataFrame](#2.1-Memeriksa-sekilas-DataFrame)
3. [Memfilter dataframe berdasar kolom dan baris](#2.2-Filtering)
4. [Menampilkan unique value](#2.3-Unique-value)
5. [Agregasi](#2.4-Agregasi)
6. [Transformasi DataFrame](#2.5-Transformasi-DataFrame)
7. [Data Enrichment : Join DataFrame](#2.6-Data-enrichment---Join)

Import package yang dibutuhkan

In [11]:
import pyspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

The history saving thread hit an unexpected error (OperationalError('attempt to write a readonly database')).History will not be written to the database.


Create spark session

In [12]:
spark = SparkSession.builder.appName('Eksplorasi DataFrame').getOrCreate()

# 01.Membuat DataFrame

DataFrame dapat dibuat dengan banyak cara, di antaranya :
- Dari python object, misalnya array/list, dictionary, pandas dataframe, dll
- Dari file : csv, json, dll
- Dari HDFS
- Dari RDD
- dll.

## 1.1. Create From Array

In [9]:
mydata = (('DKI JAKARTA',15328),
('JAWA BARAT',1320),
('JAWA TENGAH',1030),
('DI YOGYAKARTA',1174),
('JAWA TIMUR',813),
('BANTEN',1237))

df_from_array = spark.createDataFrame(mydata).toDF("province", "density")

df_from_array.show()

+-------------+-------+
|     province|density|
+-------------+-------+
|  DKI JAKARTA|  15328|
|   JAWA BARAT|   1320|
|  JAWA TENGAH|   1030|
|DI YOGYAKARTA|   1174|
|   JAWA TIMUR|    813|
|       BANTEN|   1237|
+-------------+-------+



## 1.2. Create from Pandas DataFrame

Dowload file

In [10]:
!wget -P dataset https://raw.githubusercontent.com/urfie/SparkSQL-dengan-Hive/main/datasets/penduduk2015.csv

--2025-08-06 04:54:38--  https://raw.githubusercontent.com/urfie/SparkSQL-dengan-Hive/main/datasets/penduduk2015.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.111.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 120 [text/plain]
Saving to: ‘dataset/penduduk2015.csv.2’


2025-08-06 04:54:38 (2.23 MB/s) - ‘dataset/penduduk2015.csv.2’ saved [120/120]



Create pandas dataframe dari file csv tersebut

In [None]:
import pandas as pd

pddf = pd.read_csv('dataset/penduduk2015.csv')
pddf

Ubah ke Spark dataframe

In [None]:
df_from_pandas = spark.createDataFrame(pddf)
df_from_pandas.show()

## 1.3. Create from csv

Kita juga bisa me-load langsung file csv tersebut ke Spark dataframe

In [None]:
df_from_csv = spark.read.csv("dataset/penduduk2015.csv", header=True)
df_from_csv.show()

## 1.4. Create from JSON

In [None]:
!wget -P dataset https://raw.githubusercontent.com/urfie/SparkSQL-dengan-Hive/main/datasets/penduduk2015.json

Tampilkan isi file dengan perintah `cat`

In [None]:
!cat dataset/penduduk2015.json

Untuk membaca multiline JSON, set parameter `multiline` = True

In [None]:
dfj = spark.read.json("dataset/penduduk2015.json", multiLine=True)
dfj.show()

# 2.Explorasi DataFrame

Dalam latihan ini kita akan mencoba berbagai operasi pada Spark DataFrame untuk melakukan eksplorasi data.

Kita akan menggunakan data kepadatan penduduk per propinsi.

In [None]:
!wget -P dataset https://raw.githubusercontent.com/urfie/SparkSQL-dengan-Hive/main/datasets/indonesia2013-2015.csv

Kita gunakan magic command untuk melihat ukuran dan isi file (karena file kita cukup kecil).

In [None]:
%ls -al dataset/indonesia2013-2015.csv

In [None]:
%cat dataset/indonesia2013-2015.csv

Karena data yang kita load sudah bersih, kita akan set inferSchema = True agar Spark menyesuaikan tipe kolom dengan datanya.

In [None]:
df = spark.read.csv("dataset/indonesia2013-2015.csv",header=True,inferSchema=True)

### 2.1 Memeriksa sekilas DataFrame

#### Menampilkan beberapa baris

Biasanya kita menampilkan beberapa baris data untuk mengecek format dan konten dataframe yang kita buat.

Untuk menampilkan beberapa baris dari dataframe, kita bisa gunakan perintah ``show(n)`` untuk menampilkan n baris pertama, atau ``first()`` untuk menampilkan 1 baris pertama saja.

In [None]:
df.show(5)
df.first()

#### Menampilkan jumlah kolom

In [None]:
df.columns

#### Menampilkan total records

In [None]:
df.count()

#### Menampilkan skema

Untuk menampilkan skema dataframe, gunakan fungsi `printSchema()`



In [None]:
df.printSchema()

Akses atribut `columns` untuk menampilkan list nama kolom


Akses atribut `dtypes` untuk menampilkan list nama kolom beserta data type masing-masing kolom tersebut

In [None]:
df.dtypes

#### Menampilkan summary statistik

Fungsi `describe()` digunakan untuk menampilkan summary statistik dari seluruh kolom.

Jangan lupa memanggil fungsi `show()` untuk menampilkan hasilnya.

In [None]:
df.describe().show()

Untuk menampilkan statistik dari salah satu kolom saja, gunakan nama kolom yang akan ditampilkan sebagai parameter. Misalnya `describe('column1')`

In [None]:
df.describe("density").show()

### 2.2 Filtering



Kita dapat melakukan filtering terhadap spark dataframe, berdasar kolom atau baris

#### Memilih kolom tertentu

Untuk menampilkan kolom tertentu, digunakan fungsi `select('nama_kolom')`


In [None]:
df.select("province").show(10)

Untuk memilih beberapa kolom, gunakan tanda koma sebagai pemisah

In [None]:
df.select("province","density").show(10)

#### Memilih records / baris

Untuk memilih baris dengan kondisi tertentu, gunakan fungsi `filter(<kondisi>)`



In [None]:
df.filter(df.density > 1000).show()

Untuk menggunakan kondisi berupa operasi string, dapat digunakan fungsi-fungsi dari `pyspark.sql.Column` yang terkait string, misalnya `contains()`, `startswith()`, `endswith()`

In [None]:
df.filter(df.province.contains('TENGGARA')).show(5)
df.filter(df.province.startswith('SU')).show(10)
df.filter(df.province.endswith('BARAT')).show(5)

Tersedia juga fungsi `like()` yang serupa dengan SQL statement *like*

In [None]:
df.filter(df.province.like('SU%')).show(5)

Atau dapat juga menggunakan regex, dengan fungsi `rlike()`

In [None]:
df.filter(df.province.rlike('[A-Z]*TA$')).show()

Dapat juga menggunakan filter berdasar list, dengan fungsi `isin()`

In [None]:
df.filter(df.timezone.isin('WIT','WITA')).show()

In [None]:
df.filter(df.province.isin('BALI','PAPUA')).show()

Untuk menggunakan beberapa kondisi sekaligus, menggunakan tanda `&` untuk AND dan `|` untuk OR, dengan masing-masing kondisi dilingkupi tanda kurung `()`

In [None]:
df.filter((df.timezone.isin('WIT','WITA')) & (df.year == 2013)).show()

### 2.3 Unique value

Untuk menampilkan nilai unik dari dataframe, digunakan fungsi `distinct()`.

Nilai unik di sini adalah kombinasi nilai dari seluruh kolom.

In [None]:
df.distinct().show()

Untuk menampilkan nilai unik dari kolom tertentu, tulis nama kolom yang dimaksud sebagai parameter.

In [None]:
df.select('timezone').distinct().show()

In [None]:
df.select('year','timezone').distinct().show()

#### Menghapus duplikasi data

Untuk menghapus record duplikat, gunakan `dropDuplicates(subset)` atau `drop_duplicates(subset)`.

In [None]:
df.dropDuplicates(['province', 'timezone']).show(100)

### 2.4 Agregasi

#### Group by column

Untuk mengelompokkan berdasar kolom, gunakan perintah `groupBy('nama_kolom')`

Untuk mengelompokkan berdasar lebih dari 1 kolom, gunakan tanda koma sebagai pemisah nama kolom.

In [None]:
df.groupBy("timezone")

In [None]:
df.groupBy("timezone","year")

Perintah `groupBy()` menghasilkan obyek `GroupedData` yang belum bisa ditampilkan.

Biasanya setelah pengelompokan, kita melakukan operasi sumarisasi data. Kita terapkan operasi tersebut pada objek hasil groupBy dengan memanggil fungsi yang dibutuhkan. Misalnya `count()` atau `max('namakolom')`

In [None]:
df.groupBy('timezone').count().show()

In [None]:
df.groupBy('timezone').max('density').show()

Kita juga bisa menggunakan fungsi `agg()` untuk melakukan agregasi. Terutama jika kita ingin melakukan lebih dari 1 operasi agregat.

Kita bisa menggunakan fungsi `alias()` untuk memberi nama kolom hasil agregasi.

In [None]:
df.groupBy("timezone").agg(F.max('density')).show()

In [None]:
df.groupBy("timezone").agg(F.avg('density').alias('avg_density'), \
                           F.min('density').alias('min_density'), \
                           F.max('density').alias('max_density')).show()

#### Order By

In [None]:
df.groupBy("timezone") \
  .mean("density") \
  .orderBy("timezone",ascending=False).show()

#### Agregasi dengan filter / kondisi

In [None]:
df.groupBy("timezone") \
  .mean("density") \
  .where(df.timezone.contains('WIT')).show()

#### Filter hasil agregat (SQL stat **HAVING**)

Untuk memfilter berdasar hasil agregasi (semacam perintah **HAVING** di SQL), lakukan dalam 2 langkah.
1. Lakukan `groupBy` + `agg` dan beri nama kolom hasil agregat dengan `alias`
2. gunakan fungsi `filter(kondisi)` pada kolom hasil agregasi.

In [None]:
df_agg = df.groupBy("timezone", "province") \
  .agg(F.avg("density").alias("avg_density")) \
  .where(df.timezone.contains('WIT'))

df_agg.filter(df_agg.avg_density > 50).show()

### 2.5 Transformasi DataFrame

#### Kolom baru berupa nilai konstan

Untuk menambahkan kolom baru ke dalam dataframe, kita bisa menggunakan perintah `withColumn()`

Sedangkan untuk menambahkan sebuah nilai konstan, kita bisa menggunakan fungsi `lit(nilai_konstan)` dari `pyspark.sql.functions`, yang berfungsi membuat kolom dari nilai literal/konstan.

Misalnya kita ingin menambahkan kolom **status** yang bernilai 1

In [None]:
df.withColumn('status', F.lit(1)).show()

#### Kolom baru dari kolom yang ada



In [None]:
df.withColumn('tahun-1', df.year-1).show()

In [None]:
df.withColumn('year', df.year-1).show()

#### Kondisional

Untuk menambahkan kolom berdasar beberapa kondisi, gunakan `when` dan `otherwise` (jika perlu).

Perhatikan bahwa `when` yang pertama adalah fungsi dalam `pyspark.sql.functions`, sedangkan `when` yang berikutnya adalah fungsi `when` pada object kolom (`pyspark.sql.Column`)

Fungsi `otherwise` adalah kondisi *else* atau kondisi selain yang disebutkan pada *when*.

In [None]:
df.withColumn('timezone_code', F.when(df.timezone == 'WIT', 1).
              when(df.timezone == 'WITA', 2).
              otherwise(3)).show(50)

### 2.6 Data enrichment - Join

Perintah ntuk melakukan join adalah sebagai berikut :

`df1.join(df2, on=[columname], how=’left’)`

Where :
-    `df1` − Dataframe1.
-    `df2` – Dataframe2.
-    `on` − nama kolom yang akan digunakan untuk join.
-    `how` – type of join needs to be performed – `left`, `right`, `outer`, `inner`. Defaultnya adalah inner join.


Create dataframe yang akan digunakan untuk join

In [None]:
damdata = (("SUMATERA SELATAN",2),
("SULAWESI TENGAH",2),
("SULAWESI SELATAN",2),
("SUMATERA BARAT",3),
("RIAU",3),
("LAMPUNG",3),
("NUSA TENGGARA TIMUR",4),
("BENGKULU",8),
("SUMATERA UTARA",10),
("JAWA TIMUR",12),
("JAWA TENGAH",35),
("JAWA BARAT",49))

df_dam = spark.createDataFrame(damdata).toDF("province", "dam_num")

df_dam.show()

Join dataframe kepadatan penduduk dengan dataframe jumlah bendungan, berdasarkan nama propinsi.

In [None]:
df.join(df_dam, on=['province'], how='left').show(50)

Untuk melakukan left join, gunakan parameter `how`

In [None]:
df.join(df_dam, on=['province'], how='left').show(40)