# Membaca data file ke DataFrame dan mengolahnya
Berikut adalah kode untuk membaca data ke dalam DataFrame Spark.

In [None]:
#import module/package yang dibutuhkan
from pyspark.sql import SparkSession
from pyspark.sql.types import *

#membuat session, untuk mengakses semua fungsi spark dan DataFrame API
spark = SparkSession \
    .builder \
    .appName("Pengenalan DataFrame Spark") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

#mendefinisikan skema dari data yang kita baca
purchaseSchema = StructType([
    StructField("Tanggal", DateType(), True),
    StructField("Jam", StringType(), True),
    StructField("Kota", StringType(), True),
    StructField("Item", StringType(), True),
    StructField("Total", FloatType(), True),
    StructField("Pembayaran", StringType(), True),
])    

#membaca file csv menggunakan skema yang kita buat sebelumya, 
#dan dengan pemisah kolom "tab" (\t)
purchaseDataframe = spark.read.csv(
    "dataset/purchases.csv", 
    header=True, schema=purchaseSchema, sep="\t")
#menampilkan 3 baris DataFrame
purchaseDataframe.show(3)

Pertama kita perlu mengimport module/package yang dibutuhkan, yaitu SparkSession dan semua sql data type. Untuk mengakses semua fungsi Spark dan DataFrame API, kita harus membuat SparkSession. Kemudian kita akan membuat skema untuk data kita. Skema ini mencakup susunan kolom dari data kita, jenis data per kolomnya, dan apakah diperbolehkan jenis data null (kosong) atau tidak. Kita mendefinisikan skema data menggunakan "StructType" seperti kode di atas. Adapun "StructType" sudah kita import dari "from pyspark.sql.types import \*", dimana "\*" berarti import semua yang tersedia, termasuk "StructType". Setelah itu kita dapat membaca data CSV kita ke DataFrame Spark. Default pembacaan file CSV adalah melakukan pemisahan kolom dengan pemisahnya adalah tanda koma ",". Dan karena data kita dipisahkan menggunakan "tab", maka kita tambahkan parameter "sep="\t"", dan juga kita isi Header=True karena file kita memiliki header kolom. Kemudian, kenapa kita harus menggunakan skema? Karena ini akan memudahkan untuk proses selanjutnya. Dengan skema, kita dapat menentukan jenis data dari setiap kolom. Misalnya di kolom "Total", kita isikan tipe data float, sehingga nanti kita dapat memfilter DataFrame ini, misalnya hanya menampilkan nilai di atas batas nilai tertentu. Terakhir, kita tampilkan tiga baris DataFrame dengan perintah ".show(3)".

Selanjutnya kita dapat menghitung jumlah baris data kita dengan perintah ".count()" seperti kode di bawah ini. Kita juga dapat melihat skema kita dengan perintah "printSchema()". Untuk melihat statistik dari DataFrame, kita dapat melakukannya dengan perintah ".describe()". Kita dapat menentukan kolom mana yang ingin kita lihat, di bawah ini misanya kolom "Total".

In [None]:
#menghitung baris dari DataFrame kita, dan menge-print untuk menampilkan
jumlahBaris = purchaseDataframe.count()
print("jumlah baris: ", jumlahBaris)
#melihat skema dari dataframe kita
purchaseDataframe.printSchema()
purchaseDataframe.describe('Total').show()

Kita dapat membuat DataFrame baru dari kolom data yang kita inginkan saja, yakni dengan menggunakan perintah ".select(nama_DataFrame['nama_kolom'])", seperti kode di bawah ini.

In [None]:
#membuat DataFrame baru dengan mengambil data kolom kota 
#dan total pembayaran saja
kotaTotalDataframe = purchaseDataframe.select(purchaseDataframe['Kota'], 
                                              purchaseDataframe['Total'])
kotaTotalDataframe.show(3); #menampilkan 3 baris DataFrame baru kita
kotaTotalDataframe.printSchema() #print skema dari DataFrame baru kita

Kita dapat menambahkan konstanta nilai tertentu pada kolom yang kita inginkan, seperti kode di bawah ini. Coba lihat nilai setelah kita munculkan tabelnya, maka semua nilai di kolom "Total" akan ditambah dengan 10. 

In [None]:
#menambahkan nilai 10 untuk nilai di kolom 'Total' DataFrame kita
kotaTotalDataframe.select(kotaTotalDataframe['Kota'],
                          kotaTotalDataframe['Total']+10).show(3)

Kita dapat memfilter data berdasarkan syarat kondisional tertentu seperti kode di bawah ini. Kode di bawah ini akan memembuat DataFrame baru dari baris data yang nilai di kolom "Total"nya lebih dari 200.

In [5]:
#memfilter data dimana nilai kolom 'Total' > 200
kotaTotalDataframe.filter(kotaTotalDataframe['Total'] > 200).show(3)

+----------+------+
|      Kota| Total|
+----------+------+
|  San Jose|214.05|
|Pittsburgh|493.51|
|     Omaha|235.63|
+----------+------+
only showing top 3 rows



Untuk melakukan pengurutan/sorting data berdasarkan kolom tertentu, kita dapat melakukannya dengan fungsi ".orderBy('nama_kolom')".

In [6]:
orderByKotaDataframe = purchaseDataframe.orderBy('Kota').show(4)

+----------+-----+-----------+----------------+------+----------+
|   Tanggal|  Jam|       Kota|            Item| Total|Pembayaran|
+----------+-----+-----------+----------------+------+----------+
|2012-10-07|11:11|Albuquerque|    Pet Supplies| 308.7|      Visa|
|2012-10-07|11:40|Albuquerque|            Toys|299.63|MasterCard|
|2012-10-07|11:13|Albuquerque|Women's Clothing|419.49|  Discover|
|2012-10-07|10:39|Albuquerque|    Pet Supplies| 401.3|MasterCard|
+----------+-----+-----------+----------------+------+----------+
only showing top 4 rows



Jika kita ingin mengetahui berapa jumlah transaksi di tiap kota, kita dapat menggunakan kode di bawah ini. Pertama kita grupkan dahulu berdasarkan kolom "kota", kemudian kita hitung dengan fungsi ".count()".

In [7]:
jumlahPembelianByKota = purchaseDataframe.groupBy("Kota").count()
jumlahPembelianByKota.show(5)

+---------------+-----+
|           Kota|count|
+---------------+-----+
|North Las Vegas|40013|
|        Phoenix|40333|
|          Omaha|40209|
|      Anchorage|39806|
|        Anaheim|40086|
+---------------+-----+
only showing top 5 rows



# Cara akses data dari DataFrame
DataFrame Spark adalah data yang terdistribusi di klaster, sehingga kita tidak dapat mengakses komponen dataframe dengan indeks (baris,kolom) seperti layaknya kita dapat lakukan di DataFrame pada pandas. Untuk mengakses data berdasarkan baris, kita dapat mengakalinya dengan cara menambahkan satu kolom berupa "incremental ID". Kemudian kita dapat memilih baris data yang kita inginkan dengan menggunakan fungsi ".filter()". Berikut ini contohnya.

In [8]:
#mengimport fungsi monotonically_increasing_id
from pyspark.sql.functions import monotonically_increasing_id

purchaseTambahKolomIdDataframe = purchaseDataframe.withColumn(
    "indeks", monotonically_increasing_id())
purchaseTambahKolomIdDataframe.show(4)
baris2Sampai4 = purchaseTambahKolomIdDataframe.filter((purchaseTambahKolomIdDataframe['indeks']<=4) & 
                                                      (purchaseTambahKolomIdDataframe['indeks']>=2))
baris2Sampai4.show()

+----------+-----+----------+----------------+------+----------+------+
|   Tanggal|  Jam|      Kota|            Item| Total|Pembayaran|indeks|
+----------+-----+----------+----------------+------+----------+------+
|2012-01-01|09:00|  San Jose|  Men's Clothing|214.05|      Amex|     0|
|2012-01-01|09:00|Fort Worth|Women's Clothing|153.57|      Visa|     1|
|2012-01-01|09:00| San Diego|           Music| 66.08|      Cash|     2|
|2012-01-01|09:00|Pittsburgh|    Pet Supplies|493.51|  Discover|     3|
+----------+-----+----------+----------------+------+----------+------+
only showing top 4 rows

+----------+-----+----------+-------------------+------+----------+------+
|   Tanggal|  Jam|      Kota|               Item| Total|Pembayaran|indeks|
+----------+-----+----------+-------------------+------+----------+------+
|2012-01-01|09:00| San Diego|              Music| 66.08|      Cash|     2|
|2012-01-01|09:00|Pittsburgh|       Pet Supplies|493.51|  Discover|     3|
|2012-01-01|09:00|     O

Kemudian jika ingin mengakses nilai berdasakan kolom, mudah saja, yakni dengan menggunakan fungsi ".select()" seperti yang kita telah lakukan sebelumnya. Berikut contohnya.

In [9]:
dataBaris2KolomTotal = purchaseTambahKolomIdDataframe.filter(
    purchaseTambahKolomIdDataframe['indeks']==2).select('Total')
dataBaris2KolomTotal.show()

+-----+
|Total|
+-----+
|66.08|
+-----+



# Membuat DataFrame dari data yang kita isikan secara manual
Kita dapat membuat DataFrame yang berisi data yang kita buat secara manual, tidak dari file. Untuk melakukannya, salah satunya, kita dapat mebuat data tipe "Row", kemudian kita akan gabungkan beberapa data "Row" menggunakan perintah "sc.parallelize". Hasil dari perintah tersebut adalah berupa RDD. Untuk mengubahnya ke DataFrame, kita dapat menggunakan perintah ".toDF()". Berikut contoh programnya.

In [11]:
#mengimport tipe data Row
from pyspark.sql import Row

sc = SparkContext.getOrCreate()
df = sc.parallelize([ \
     Row(nama='Rony', umur=27, tinggi=168), \
     Row(nama='Andy', umur=26, tinggi=165), \
     Row(nama='Syeril', umur=27, tinggi=168)]).toDF()
df.show()


+------+------+----+
|  nama|tinggi|umur|
+------+------+----+
|  Rony|   168|  27|
|  Andy|   165|  26|
|Syeril|   168|  27|
+------+------+----+



# Memanipulasi DataFrame menggunakan bahasa SQL

Mungkin di antara kita ada yang sudah familiar menggunakan bahasa SQL. Kita dapat memperoses DataFrame menggunakan bahasa SQL dengan tetap mengembalikan nilainya dalam tipe data DataFrame. Untuk melakukan hal tersebut, pertama kita harus membuat SQL temporary view. Kode di bawah ini contoh memilih data dari kolom "Total" dari DataFrame kita.

In [12]:
#membuat sql temporary view
purchaseDataframe.createOrReplaceTempView("purchaseSql")

#memilih hanya kolom Total dan Pembayaran dari sql view kita
TotalPembayaran = spark.sql("SELECT Total, Pembayaran FROM purchaseSql")
TotalPembayaran.show(3)

+------+----------+
| Total|Pembayaran|
+------+----------+
|214.05|      Amex|
|153.57|      Visa|
| 66.08|      Cash|
+------+----------+
only showing top 3 rows



Kemudian kode di bawah ini adalah kode untuk mengurutkan/sorting DataFrame kita berdasarkan kolom "Kota" dengan bahasa SQL.

In [13]:
#mengurutkan berdasarkan 'Kota' secara alfabetis
orderByKota = spark.sql("SELECT * FROM purchaseSql ORDER BY Kota")
orderByKota.show(5)

+----------+-----+-----------+----------------+------+----------+
|   Tanggal|  Jam|       Kota|            Item| Total|Pembayaran|
+----------+-----+-----------+----------------+------+----------+
|2012-10-07|11:11|Albuquerque|    Pet Supplies| 308.7|      Visa|
|2012-10-07|11:41|Albuquerque|           Music|365.64|      Visa|
|2012-10-07|11:13|Albuquerque|Women's Clothing|419.49|  Discover|
|2012-10-07|10:39|Albuquerque|    Pet Supplies| 401.3|MasterCard|
|2012-10-07|11:18|Albuquerque|          Crafts|475.77|      Visa|
+----------+-----+-----------+----------------+------+----------+
only showing top 5 rows



Terakhir, kita akan mencoba memfilter baris data yang nilai di kolom "Total"nya lebih dari 200, dan kita urutkan berdasarkan metode pembayaran (kolom "Pembayaran"). Berikut adalah kodenya.

In [14]:
#filter nilai kolom Total>50 dan urutkan berdasarkan cara pembayaran
contohFilter = spark.sql("SELECT * FROM purchaseSql WHERE Total>200 ORDER BY Pembayaran")
contohFilter.show(4)

+----------+-----+--------------+-------------------+------+----------+
|   Tanggal|  Jam|          Kota|               Item| Total|Pembayaran|
+----------+-----+--------------+-------------------+------+----------+
|2012-10-07|10:34|      Richmond|Children's Clothing|252.45|      Amex|
|2012-10-07|10:36|San Bernardino|               Toys|272.91|      Amex|
|2012-10-07|10:34|     Baltimore|              Books|299.94|      Amex|
|2012-10-07|10:33|       Lincoln|       Pet Supplies|359.44|      Amex|
+----------+-----+--------------+-------------------+------+----------+
only showing top 4 rows

