**Data Preprocessing with PySpark**

The data preprocessing techniques covered in this material are as follows.

- Scaling
- Label Encoding
- One-Hot Encoding
- PCA

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('learn_data_cleansing').getOrCreate()


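# inferSchema tells Spark to infer column types; Spark ignores unrecognized
# option names, so a misspelling leaves every column typed as string.
customers_df = spark.read.format('csv')\
    .option("inferSchema", "true")\
    .option("header", "true")\
    .load("customers.csv")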

customers_df.createOrReplaceTempView("customers_table")

In [4]:
# DATA TYPES & SUMMARY STATISTICS

# printSchema() prints the schema itself and returns None, so it is not wrapped in print()
customers_df.printSchema()
customers_df.summary().show()

root
 |-- customer_id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)
 |-- home_address: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)

+-------+------------------+-------------+-----------------+------------------+--------------------+-----------------+---------+--------------------+---------+
|summary|       customer_id|customer_name|           gender|               age|        home_address|         zip_code|     city|               state|  country|
+-------+------------------+-------------+-----------------+------------------+--------------------+-----------------+---------+--------------------+---------+
|  count|              1007|         1007|              989|              1007|                1007|             1007|     1007|                1007|   

In [5]:
from pyspark.sql.functions import isnull, isnan

customers_df.where(isnull('gender') | isnan('gender')).show()

print("Number of missing values:", customers_df.where(isnull('gender') | isnan('gender')).count())

+-----------+-------------+------+---+--------------------+--------+--------------------+--------------------+---------+
|customer_id|customer_name|gender|age|        home_address|zip_code|                city|               state|  country|
+-----------+-------------+------+---+--------------------+--------+--------------------+--------------------+---------+
|         39|     fulan 39|  NULL| 80|7440 Cameron Esta...|    4622|North Victoriache...|  Northern Territory|Australia|
|        168|    fulan 168|  NULL| 27|2781 Berge MallSu...|    1975|      North Leoburgh|   Western Australia|Australia|
|        322|    fulan 322|  NULL| 30|593 Becker Circle...|    1640|          Jacobiview|   Western Australia|Australia|
|        393|    fulan 393|  NULL| 34|5158 Levi HillSui...|    1474|          Johnsburgh|          Queensland|Australia|
|        442|    fulan 442|  NULL| 26|5157 Feil RoadApt...|    7249|          Port Chloe|     New South Wales|Australia|
|        720|    fulan 720|  NUL

In [6]:
print("Number of duplicates:", customers_df.count() - customers_df.distinct().count())

Number of duplicates: 6


In [7]:

# CHANGE DATA TYPES WITH cast()

from pyspark.sql.functions import col

new_customers_df = customers_df.withColumn("customer_id", col("customer_id").cast("string"))
new_customers_df = new_customers_df.withColumn("zip_code", col("zip_code").cast("string"))

new_customers_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)
 |-- home_address: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)



In [8]:
new_customers_df.na.drop()

DataFrame[customer_id: string, customer_name: string, gender: string, age: string, home_address: string, zip_code: string, city: string, state: string, country: string]

In [9]:
# KEEP ONLY THE ROWS WITHOUT MISSING VALUES

new_customers_df = new_customers_df.na.drop()

print("Number of missing values:", new_customers_df.where(isnull('gender') | isnan('gender')).count())

Number of missing values: 0


In [10]:

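# Alternative to dropping rows: fill missing gender values instead.
# The result is not assigned, so customers_df itself is unchanged.
customers_df.na.fill("prefer not to say", subset=["gender"])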

DataFrame[customer_id: string, customer_name: string, gender: string, age: string, home_address: string, zip_code: string, city: string, state: string, country: string]

In [None]:
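import pyspark.pandas as ps

# pandas API on Spark: linearly interpolate missing values in the age column
customers_df_pandas = ps.read_csv("customers.csv")
customers_df_pandas["age"].interpolate(method='linear')


# Look for implausible ages (outliers)
new_customers_df.where(new_customers_df.age > 100).show()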

In [12]:
from pyspark.sql.functions import when

# Correct implausible ages that look like data-entry errors (700 -> 70, 500 -> 50)
new_customers_df = new_customers_df.withColumn(
    "age",
    when(new_customers_df.age == 700, 70)
    .when(new_customers_df.age == 500, 50)
    .otherwise(new_customers_df.age))

new_customers_df.summary().show()

+-------+------------------+-------------+-----------------+------------------+--------------------+-----------------+---------+--------------------+---------+
|summary|       customer_id|customer_name|           gender|               age|        home_address|         zip_code|     city|               state|  country|
+-------+------------------+-------------+-----------------+------------------+--------------------+-----------------+---------+--------------------+---------+
|  count|               989|          989|              989|               989|                 989|              989|      989|                 989|      989|
|   mean|498.27805864509605|         NULL|             NULL|49.876643073811934|                NULL|5026.199191102123|     NULL|                NULL|     NULL|
| stddev|287.67376465771207|         NULL|             NULL|17.651855611617894|                NULL|2880.569897954812|     NULL|                NULL|     NULL|
|    min|                 1|      fulan 

In [13]:

new_customers_df = new_customers_df.dropDuplicates()
print("Number of duplicates:", new_customers_df.count() - new_customers_df.distinct().count())

Number of duplicates: 0


## Scaling

#### Standard Scaling

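Standard scaling rescales numeric data so that it has a mean of 0 and a standard deviation of 1. PySpark's `StandardScaler` defaults to `withMean=False` and `withStd=True`, so the cell below only divides each value by the standard deviation (for example, 22 becomes about 1.246); pass `withMean=True` to center the data as well.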
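To make the arithmetic concrete, here is a minimal pure-Python sketch of standard scaling. The helper `standard_scale` and its `with_mean` flag are hypothetical names introduced only for this illustration, and the four ages are sample values rather than the full dataset. It assumes the sample standard deviation (n - 1 denominator).

```python
from statistics import mean, stdev

def standard_scale(values, with_mean=True):
    """Scale values to unit sample standard deviation, optionally centering them first."""
    mu = mean(values) if with_mean else 0.0
    sigma = stdev(values)  # sample standard deviation (n - 1 denominator)
    return [(v - mu) / sigma for v in values]

ages = [22, 72, 71, 27]  # illustrative values only

centered = standard_scale(ages, with_mean=True)      # mean 0, standard deviation 1
scaled_only = standard_scale(ages, with_mean=False)  # standard deviation 1, mean not 0

print(centered)
print(scaled_only)
```

With `with_mean=False` the values keep their sign and relative position but are not centered, which is why scaled ages stay positive when only the standard deviation is applied.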

In [16]:
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Cast the age column to IntegerType so it can be assembled into a vector
new_customers_df = new_customers_df.withColumn("age", col("age").cast(IntegerType()))

# Convert the values in the age column into a vector
assembler = VectorAssembler().setInputCols(['age']).setOutputCol('vec_age')
preprocess_customers_df = assembler.transform(new_customers_df)

standard_scaler = StandardScaler().setInputCol("vec_age").setOutputCol('StandardScaler_age')
preprocess_customers_df = standard_scaler.fit(preprocess_customers_df).transform(preprocess_customers_df)
preprocess_customers_df.show(5)

+-----------+-------------+-----------------+---+--------------------+--------+----------------+--------------------+---------+-------+--------------------+
|customer_id|customer_name|           gender|age|        home_address|zip_code|            city|               state|  country|vec_age|  StandardScaler_age|
+-----------+-------------+-----------------+---+--------------------+--------+----------------+--------------------+---------+-------+--------------------+
|        353|    fulan 353|             Male| 22|1800 George RoadS...|    9711|      Stantonton|          Queensland|Australia| [22.0]|[1.2460803649043537]|
|        666|    fulan 666|Prefer not to say| 72|20 Caitlin Parade...|    3159| West Nathanstad|  Northern Territory|Australia| [72.0]|  [4.07808119423243]|
|        712|    fulan 712|Prefer not to say| 71|749 Klein SummitS...|    5372|    Lefflermouth|Australian Capita...|Australia| [71.0]| [4.021441177645869]|
|        457|    fulan 457|             Male| 27|85 Mitche

#### Min-max Scaling

Min-max scaling rescales numeric data so that it falls within the range 0 to 1.
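The formula behind this is `(x - min) / (max - min)`. The sketch below applies it by hand; `min_max_scale` is a hypothetical helper, and the bounds 20 and 80 are an assumption inferred from the values recorded in the notebook output (22 maps to about 0.0333 and 72 to about 0.8667), not something stated in the source.

```python
def min_max_scale(value, lo, hi):
    """Map value from the range [lo, hi] onto [0, 1]."""
    if hi == lo:
        raise ValueError("min and max must differ")
    return (value - lo) / (hi - lo)

# Assumed bounds, inferred from the recorded outputs rather than given in the source
AGE_MIN, AGE_MAX = 20, 80

scaled_22 = min_max_scale(22, AGE_MIN, AGE_MAX)
scaled_72 = min_max_scale(72, AGE_MIN, AGE_MAX)

print(scaled_22, scaled_72)
```

Raising an error when the minimum equals the maximum is a choice made for this sketch only; a library scaler may handle that case differently.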

In [17]:
from pyspark.ml.feature import MinMaxScaler

min_max_scaler = MinMaxScaler().setInputCol("vec_age").setOutputCol('MinMaxScaler_age')
preprocess_customers_df = min_max_scaler.fit(preprocess_customers_df).transform(preprocess_customers_df)

preprocess_customers_df.show(10)


+-----------+-------------+-----------------+---+--------------------+--------+-------------------+--------------------+---------+-------+--------------------+--------------------+
|customer_id|customer_name|           gender|age|        home_address|zip_code|               city|               state|  country|vec_age|  StandardScaler_age|    MinMaxScaler_age|
+-----------+-------------+-----------------+---+--------------------+--------+-------------------+--------------------+---------+-------+--------------------+--------------------+
|        353|    fulan 353|             Male| 22|1800 George RoadS...|    9711|         Stantonton|          Queensland|Australia| [22.0]|[1.2460803649043537]|[0.03333333333333...|
|        666|    fulan 666|Prefer not to say| 72|20 Caitlin Parade...|    3159|    West Nathanstad|  Northern Territory|Australia| [72.0]|  [4.07808119423243]|[0.8666666666666667]|
|        712|    fulan 712|Prefer not to say| 71|749 Klein SummitS...|    5372|       Lefflermo

## Label Encoding

Label encoding creates an integer index that represents each category of a categorical feature.
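As a rough illustration of the idea, the pure-Python sketch below builds a category-to-index mapping ordered by descending frequency, which is also the default ordering of `StringIndexer`. The helper `fit_label_index`, the sample data, and the alphabetical tie-break are assumptions made for this example.

```python
from collections import Counter

def fit_label_index(values):
    """Return a {category: index} mapping, most frequent category first."""
    counts = Counter(values)
    # Sort by descending frequency; ties are broken alphabetically (a choice of this sketch)
    ordered = sorted(counts, key=lambda c: (-counts[c], c))
    return {category: float(i) for i, category in enumerate(ordered)}

# Made-up sample, not taken from customers.csv
genders = ["Male", "Female", "Male", "Prefer not to say", "Male", "Female"]

mapping = fit_label_index(genders)
encoded = [mapping[g] for g in genders]

print(mapping)
print(encoded)
```

Because the indices follow frequency, they carry no natural order between categories, which is one reason a one-hot step often follows label encoding.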

In [18]:
from pyspark.ml.feature import StringIndexer
label_encoder = StringIndexer().setInputCol("gender").setOutputCol("label_gender")
preprocess_customers_df = label_encoder.fit(preprocess_customers_df).transform(preprocess_customers_df)

preprocess_customers_df.show(10)


+-----------+-------------+-----------------+---+--------------------+--------+-------------------+--------------------+---------+-------+--------------------+--------------------+------------+
|customer_id|customer_name|           gender|age|        home_address|zip_code|               city|               state|  country|vec_age|  StandardScaler_age|    MinMaxScaler_age|label_gender|
+-----------+-------------+-----------------+---+--------------------+--------+-------------------+--------------------+---------+-------+--------------------+--------------------+------------+
|        353|    fulan 353|             Male| 22|1800 George RoadS...|    9711|         Stantonton|          Queensland|Australia| [22.0]|[1.2460803649043537]|[0.03333333333333...|         1.0|
|        666|    fulan 666|Prefer not to say| 72|20 Caitlin Parade...|    3159|    West Nathanstad|  Northern Territory|Australia| [72.0]|  [4.07808119423243]|[0.8666666666666667]|         0.0|
|        712|    fulan 712|Pre

In [19]:
# TO MAP THE INDICES BACK TO THE ORIGINAL LABELS

from pyspark.ml.feature import IndexToString
labelReverse = IndexToString().setInputCol("label_gender").setOutputCol("reverse_label")
labelReverse.transform(preprocess_customers_df).show(5)

+-----------+-------------+-----------------+---+--------------------+--------+----------------+--------------------+---------+-------+--------------------+--------------------+------------+-----------------+
|customer_id|customer_name|           gender|age|        home_address|zip_code|            city|               state|  country|vec_age|  StandardScaler_age|    MinMaxScaler_age|label_gender|    reverse_label|
+-----------+-------------+-----------------+---+--------------------+--------+----------------+--------------------+---------+-------+--------------------+--------------------+------------+-----------------+
|        353|    fulan 353|             Male| 22|1800 George RoadS...|    9711|      Stantonton|          Queensland|Australia| [22.0]|[1.2460803649043537]|[0.03333333333333...|         1.0|             Male|
|        666|    fulan 666|Prefer not to say| 72|20 Caitlin Parade...|    3159| West Nathanstad|  Northern Territory|Australia| [72.0]|  [4.07808119423243]|[0.86666

## One-Hot Encoding

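In this method, each category is represented by a vector of binary values (1 and 0). PySpark's `OneHotEncoder` defaults to `dropLast=True`, so the last category is encoded as an all-zero vector and the output vectors have one slot fewer than the number of categories.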
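The sparse notation in the notebook output, such as `(2,[1],[1.0])`, reads as "a vector of size 2 with the value 1.0 at position 1". The following pure-Python sketch produces the equivalent dense vectors; `one_hot` and its `drop_last` parameter are hypothetical names used only here.

```python
def one_hot(index, num_categories, drop_last=True):
    """Return a dense one-hot list for a category index.

    With drop_last=True the last category is encoded as all zeros,
    so the vector has num_categories - 1 slots.
    """
    if not 0 <= index < num_categories:
        raise ValueError("index out of range")
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector

# Three categories (indices 0, 1, 2), as produced by a label-encoding step
for idx in range(3):
    print(idx, one_hot(idx, 3), one_hot(idx, 3, drop_last=False))
```

Dropping the last slot avoids a column that is fully determined by the others, at the cost of making the last category implicit.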

In [20]:
from pyspark.ml.feature import OneHotEncoder

one_hot_encoder = OneHotEncoder().setInputCol("label_gender").setOutputCol("one_hot_gender")
one_hot_encoder.fit(preprocess_customers_df).transform(preprocess_customers_df).show(5)



+-----------+-------------+-----------------+---+--------------------+--------+----------------+--------------------+---------+-------+--------------------+--------------------+------------+--------------+
|customer_id|customer_name|           gender|age|        home_address|zip_code|            city|               state|  country|vec_age|  StandardScaler_age|    MinMaxScaler_age|label_gender|one_hot_gender|
+-----------+-------------+-----------------+---+--------------------+--------+----------------+--------------------+---------+-------+--------------------+--------------------+------------+--------------+
|        353|    fulan 353|             Male| 22|1800 George RoadS...|    9711|      Stantonton|          Queensland|Australia| [22.0]|[1.2460803649043537]|[0.03333333333333...|         1.0| (2,[1],[1.0])|
|        666|    fulan 666|Prefer not to say| 72|20 Caitlin Parade...|    3159| West Nathanstad|  Northern Territory|Australia| [72.0]|  [4.07808119423243]|[0.8666666666666667]

## PCA

PCA (principal component analysis) reduces the number of features (columns) used to train a model and helps address multicollinearity in the data.
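To show why PCA helps with multicollinearity, the sketch below runs a two-feature PCA by hand using the closed-form eigenvalues of a 2x2 covariance matrix. The function `first_principal_component` and both input features are hypothetical; the features are deliberately built as linear functions of the same ages, so they are perfectly correlated.

```python
import math

def first_principal_component(xs, ys):
    """Return (unit direction, explained variance ratio) of the first component of 2-D data."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    # Sample covariance matrix [[sxx, sxy], [sxy, syy]]
    sxx = sum((x - mx) ** 2 for x in xs) / (n - 1)
    syy = sum((y - my) ** 2 for y in ys) / (n - 1)
    sxy = sum((x - mx) * (y - my) for x, y in zip(xs, ys)) / (n - 1)
    # Largest eigenvalue of a symmetric 2x2 matrix, in closed form
    half_trace = (sxx + syy) / 2
    root = math.sqrt(((sxx - syy) / 2) ** 2 + sxy ** 2)
    largest = half_trace + root
    # Eigenvector belonging to the largest eigenvalue
    if sxy != 0:
        vx, vy = largest - syy, sxy
    else:
        vx, vy = (1.0, 0.0) if sxx >= syy else (0.0, 1.0)
    norm = math.hypot(vx, vy)
    return (vx / norm, vy / norm), largest / (sxx + syy)

ages = [22, 72, 71, 27]                    # illustrative values only
feature_a = [a / 17.65 for a in ages]      # hypothetical feature: age scaled by a constant
feature_b = [(a - 20) / 60 for a in ages]  # hypothetical feature: min-max style scaling
direction, ratio = first_principal_component(feature_a, feature_b)

print(direction, ratio)
```

Because both features are linear in age, the explained variance ratio comes out as 1.0 up to floating-point rounding: a single component carries all of the information, which is the situation PCA with `k=1` exploits.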

In [21]:
from pyspark.ml.feature import PCA

# BUILD AN ARTIFICIAL FEATURE VECTOR
assembler = VectorAssembler().setInputCols(['StandardScaler_age', 'MinMaxScaler_age']).setOutputCol('features')
preprocess_customers_df = assembler.transform(preprocess_customers_df)


pca = PCA().setInputCol("features").setOutputCol("PCA_age").setK(1)

preprocess_customers_df = pca.fit(preprocess_customers_df).transform(preprocess_customers_df)
preprocess_customers_df.show(5, truncate=False)



+-----------+-------------+-----------------+---+---------------------------+--------+----------------+----------------------------+---------+-------+--------------------+---------------------+------------+----------------------------------------+---------------------+
|customer_id|customer_name|gender           |age|home_address               |zip_code|city            |state                       |country  |vec_age|StandardScaler_age  |MinMaxScaler_age     |label_gender|features                                |PCA_age              |
+-----------+-------------+-----------------+---+---------------------------+--------+----------------+----------------------------+---------+-------+--------------------+---------------------+------------+----------------------------------------+---------------------+
|353        |fulan 353    |Male             |22 |1800 George RoadSuite 097  |9711    |Stantonton      |Queensland                  |Australia|[22.0] |[1.2460803649043537]|[0.0333333333333333