<a href="https://colab.research.google.com/github/fitriMD/Big_Data_2022/blob/main/PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Spark is written in the Scala programming language and needs a Java Virtual Machine (JVM) to run. As a first step, install Java with the command below.

In [20]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
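Before moving on, it can help to confirm that a `java` binary is actually reachable. The check below is plain Python and not part of the original notebook. It assumes the apt package registered `java` on `PATH`, and `java_version` is a hypothetical helper name.

```python
import shutil
import subprocess


def java_version():
    """Return the `java -version` banner, or None if no java binary is on PATH."""
    java = shutil.which("java")
    if java is None:
        return None
    # `java -version` writes its banner to stderr, not stdout.
    result = subprocess.run([java, "-version"], capture_output=True, text=True)
    return result.stderr.strip()


print(java_version() or "java not found on PATH")
```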

### To install Apache Spark, download the release archive with wget and then extract it with tar. Copy the following commands to perform the installation.

In [21]:
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
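The same two steps can be sketched with only the Python standard library, which may be handy outside a notebook shell. `spark_download_url` and `download_and_extract` are hypothetical helpers, and the URL layout simply mirrors the wget command above. The Apache download CDN generally keeps only current releases, so if this URL stops working, the same path under `https://archive.apache.org/dist/spark/` is the usual fallback.

```python
import tarfile
import urllib.request

# Mirror taken from the wget command above (assumption: still reachable).
BASE_URL = "https://dlcdn.apache.org/spark"


def spark_download_url(spark_version, hadoop_version):
    """Build the URL of a prebuilt Spark archive, e.g. spark-3.2.1-bin-hadoop3.2.tgz."""
    name = f"spark-{spark_version}-bin-hadoop{hadoop_version}.tgz"
    return f"{BASE_URL}/spark-{spark_version}/{name}"


def download_and_extract(spark_version, hadoop_version, dest="."):
    """Download the archive (like `wget -q`) and unpack it (like `tar xf`)."""
    url = spark_download_url(spark_version, hadoop_version)
    archive_path, _ = urllib.request.urlretrieve(url, url.rsplit("/", 1)[-1])
    with tarfile.open(archive_path, "r:gz") as archive:
        archive.extractall(dest)
    return archive_path
```

Calling `download_and_extract("3.2.1", "3.2")` would leave a `spark-3.2.1-bin-hadoop3.2/` directory in the current working directory.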

### Next, the Java and Spark home directories must be configured. This can be done from Python by setting environment variables. Add the following code to the notebook.

In [22]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"
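A wrong path here usually surfaces only later, when findspark or the SparkSession fails to start. A small guard such as the hypothetical `set_home` helper below (plain Python, not from the original notebook) fails early instead.

```python
import os


def set_home(var, path):
    """Set an environment variable to a directory, failing early if it is missing."""
    if not os.path.isdir(path):
        raise FileNotFoundError(f"{var}: directory not found: {path}")
    os.environ[var] = path
    return path
```

In this notebook it would be called as `set_home("JAVA_HOME", "/usr/lib/jvm/java-8-openjdk-amd64")` and `set_home("SPARK_HOME", "/content/spark-3.2.1-bin-hadoop3.2")`.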

### PySpark can be configured by installing the findspark library, which locates the Spark installation on the system and makes it importable from Python. Install it with pip using the command below.

In [23]:
!pip install -q findspark

### Once the installation succeeds, import findspark and initialize it. Copy the following code into the notebook.

In [24]:
import findspark
findspark.init()
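Roughly speaking, `findspark.init()` looks up `SPARK_HOME` and prepends Spark's bundled Python sources and its py4j archive to `sys.path`, so that `import pyspark` picks up this installation. The sketch below reproduces that idea in simplified form. It is not findspark's actual code, and `init_spark_path` is a hypothetical name.

```python
import glob
import os
import sys


def init_spark_path(spark_home=None):
    """Simplified sketch of the idea behind findspark.init() (not its real code)."""
    spark_home = spark_home or os.environ.get("SPARK_HOME")
    if not spark_home:
        raise ValueError("SPARK_HOME is not set")
    spark_python = os.path.join(spark_home, "python")
    # Spark ships py4j as a zip under python/lib, e.g. py4j-<version>-src.zip
    py4j_zips = glob.glob(os.path.join(spark_python, "lib", "py4j-*.zip"))
    if not py4j_zips:
        raise FileNotFoundError(f"no py4j zip found under {spark_python}/lib")
    # Prepend both entries so `import pyspark` resolves to this installation.
    sys.path[:0] = [spark_python, py4j_zips[0]]
    return spark_python, py4j_zips[0]
```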

### A connection to Spark is made through a SparkSession. Copy the following code, which starts a local session and serves the Spark web UI on port 4050.

In [25]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
.master("local")\
.appName("Colab")\
.config('spark.ui.port', '4050')\
.getOrCreate()
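`spark.ui.port` only sets the port of the Spark web UI. If 4050 is already taken, Spark tries the following ports (up to `spark.port.maxRetries`, 16 by default), and `spark.sparkContext.uiWebUrl` reports the address actually used. The plain-Python check below, built around the hypothetical helper `port_is_free`, is one way to see beforehand whether the port is available. It only tests 127.0.0.1, so treat it as an approximation.

```python
import socket


def port_is_free(port, host="127.0.0.1"):
    """Return True if a TCP socket can currently be bound to host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError:
            return False
        return True


print("port 4050 free:", port_is_free(4050))
```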

### Spark has reader modules for several data formats and automatically infers the data type of each column. The dataset can be downloaded with wget, as shown in the following command.

In [36]:
!wget --continue https://github.com/dhanifudin/pyspark-demo -O sample_books.json

--2022-06-06 06:16:28--  https://github.com/dhanifudin/pyspark-demo
Resolving github.com (github.com)... 140.82.112.3
Connecting to github.com (github.com)|140.82.112.3|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/html]
Saving to: ‘sample_books.json’

sample_books.json       [ <=>                ] 143.14K  --.-KB/s    in 0.01s   

2022-06-06 06:16:28 (9.73 MB/s) - ‘sample_books.json’ saved [146580]
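Spark's JSON reader expects JSON Lines by default: one complete JSON object per line. A quick way to confirm what was actually downloaded is to check how many of the first lines parse on their own. `json_lines_ratio` is a hypothetical helper written for illustration. A result near 1.0 suggests JSON Lines, while a value near 0.0, as expected for an HTML page, suggests the wrong file was fetched.

```python
import json


def json_lines_ratio(path, max_lines=50):
    """Fraction of the first non-empty lines that parse as standalone JSON objects."""
    total = valid = 0
    with open(path, encoding="utf-8", errors="replace") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue  # ignore blank lines
            total += 1
            try:
                if isinstance(json.loads(line), dict):
                    valid += 1
            except json.JSONDecodeError:
                pass  # not JSON on its own, e.g. an HTML tag
            if total >= max_lines:
                break
    return valid / total if total else 0.0
```

For this notebook the call would be `json_lines_ratio("sample_books.json")`.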



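### The downloaded data can be read with the following code. Note that wget reported the response as `text/html`: the URL points to the GitHub repository page rather than to a raw JSON file, so most lines of the downloaded file are not valid JSON.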

In [37]:
df = spark.read.json("sample_books.json")

### Before analyzing the dataset, we need to know the schema of the data to be processed. The DataFrame's `printSchema()` method displays it, as shown below.

In [38]:
df.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- author: string (nullable = true)
 |-- edition: string (nullable = true)
 |-- price: double (nullable = true)
 |-- title: string (nullable = true)
 |-- year_written: long (nullable = true)
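The `_corrupt_record` column appears because Spark reads JSON in `PERMISSIVE` mode by default: a line that cannot be parsed becomes a row whose data columns are null and whose raw text is stored in `_corrupt_record`. The plain-Python function below imitates that behavior for illustration only; it is not Spark's parser, and `read_json_lines_permissive` is a hypothetical name. With the real reader, `spark.read.option("mode", "DROPMALFORMED").json("sample_books.json")` drops such lines instead, and `"FAILFAST"` raises an error on the first one.

```python
import json

# Spark's default name for the corrupt-record column (columnNameOfCorruptRecord).
CORRUPT = "_corrupt_record"


def read_json_lines_permissive(lines):
    """Rough plain-Python imitation of spark.read.json in PERMISSIVE mode."""
    rows = []
    for line in lines:
        text = line.rstrip("\n")
        if not text.strip():
            continue  # this sketch simply skips blank lines
        try:
            record = json.loads(text)
        except json.JSONDecodeError:
            record = None
        if isinstance(record, dict):
            rows.append({**record, CORRUPT: None})
        else:
            # Unparseable line: no data fields, raw text kept for inspection.
            rows.append({CORRUPT: text})
    return rows
```

Since Spark 2.3, a query on raw JSON that references only the corrupt-record column (for example, counting rows where `_corrupt_record` is not null) is rejected unless the DataFrame is cached first.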



In [39]:
df.show(4,False)

+---------------+---------------+--------------+-----+----------------+------------+
|_corrupt_record|author         |edition       |price|title           |year_written|
+---------------+---------------+--------------+-----+----------------+------------+
|null           |Austen, Jane   |Penguin       |18.2 |Northanger Abbey|1814        |
|null           |Tolstoy, Leo   |Penguin       |12.7 |War and Peace   |1865        |
|null           |Tolstoy, Leo   |Penguin       |13.5 |Anna Karenina   |1875        |
|null           |Woolf, Virginia|Harcourt Brace|25.0 |Mrs. Dalloway   |1925        |
+---------------+---------------+--------------+-----+----------------+------------+
only showing top 4 rows



In [40]:
df.count()

1267

In [41]:
df.select("title", "price", "year_written").show(5)

+----------------+-----+------------+
|           title|price|year_written|
+----------------+-----+------------+
|Northanger Abbey| 18.2|        1814|
|   War and Peace| 12.7|        1865|
|   Anna Karenina| 13.5|        1875|
|   Mrs. Dalloway| 25.0|        1925|
|       The Hours|12.35|        1999|
+----------------+-----+------------+
only showing top 5 rows



In [42]:
# Get books that are written after 1950 & cost greater than $10

df_filtered = df.filter("year_written > 1950 AND price > 10 AND title IS NOT NULL")
df_filtered.select("title", "price", "year_written").show(50, False)

+-----------------------------+-----+------------+
|title                        |price|year_written|
+-----------------------------+-----+------------+
|The Hours                    |12.35|1999        |
|Harry Potter                 |19.95|2000        |
|One Hundred Years of Solitude|14.0 |1967        |
+-----------------------------+-----+------------+
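The SQL-string condition can also be written with Column expressions, e.g. `df.filter((df.year_written > 1950) & (df.price > 10) & df.title.isNotNull())`; the parentheses are required because `&` binds more tightly than `>` in Python. The plain-Python sketch below spells out the row-level logic, including the fact that a comparison with a missing (null) value never passes the filter. `filter_books` is a hypothetical name, and the sample rows in the test come from the outputs above.

```python
def filter_books(rows):
    """Plain-Python equivalent of
    df.filter("year_written > 1950 AND price > 10 AND title IS NOT NULL").

    A SQL comparison with NULL is never true, so rows with a missing year
    or price are dropped; the explicit None checks mirror that rule.
    """
    return [
        r for r in rows
        if r.get("year_written") is not None and r["year_written"] > 1950
        and r.get("price") is not None and r["price"] > 10
        and r.get("title") is not None
    ]
```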



In [43]:
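# Import the module under an alias so Python's built-in max() is not shadowed
from pyspark.sql import functions as F

# Find the costliest book among the filtered rows
maxValue = df_filtered.agg(F.max("price")).collect()[0][0]
print("maxValue: ", maxValue)

df_filtered.select("title", "price").filter(df_filtered.price == maxValue).show(20, False)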

maxValue:  19.95
+------------+-----+
|title       |price|
+------------+-----+
|Harry Potter|19.95|
+------------+-----+
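The two-step pattern above (compute the maximum, then filter on it) returns every book that shares the top price. An alternative such as `df_filtered.orderBy(F.desc("price")).limit(1)`, with `F` being `pyspark.sql.functions`, returns exactly one row and silently drops ties. The sketch below restates the tie-preserving version in plain Python; `costliest` is a hypothetical helper.

```python
def costliest(rows):
    """Return every row sharing the highest non-null price (ties included),
    mirroring agg(max("price")) followed by filter(price == maxValue)."""
    prices = [r["price"] for r in rows if r.get("price") is not None]
    if not prices:
        return []
    top = max(prices)  # built-in max, ignoring missing prices
    return [r for r in rows if r.get("price") == top]
```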



# **ASSIGNMENT**

In [44]:
# 1
# Import the module under an alias so Python's built-in min() is not shadowed
from pyspark.sql import functions as F

# Find the book(s) with the lowest price in the whole dataset
minValue = df.agg(F.min("price")).collect()[0][0]
print("minValue: ", minValue)

df.select("author", "edition", "price", "title", "year_written").filter(df.price == minValue).show(20, False)

minValue:  5.75
+----------------+------------+-----+-----------+------------+
|author          |edition     |price|title      |year_written|
+----------------+------------+-----+-----------+------------+
|Dickens, Charles|Random House|5.75 |Bleak House|1870        |
+----------------+------------+-----+-----------+------------+



In [48]:
# 2
import pyspark.sql.functions as f

# Number of books per year_written, newest year first
(df.groupBy("year_written")
   .count()
   .select("year_written", f.col("count").alias("jumlah_buku"))
   .sort(f.desc("year_written"))
   .show())

+------------+-----------+
|year_written|jumlah_buku|
+------------+-----------+
|        2000|          1|
|        1999|          1|
|        1967|          1|
|        1937|          1|
|        1925|          1|
|        1922|          1|
|        1875|          1|
|        1870|          1|
|        1865|          2|
|        1862|          1|
|        1814|          1|
|        1603|          1|
|        null|       1254|
+------------+-----------+
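The `null` group holds the 1254 rows whose `year_written` is missing, most likely the rows that failed to parse (their `_corrupt_record` is set). Adding `.filter("year_written IS NOT NULL")` before `groupBy` would exclude them. In descending order Spark places nulls last by default, which is why `null` appears at the bottom. The plain-Python sketch below reproduces the counting and ordering; `books_per_year` is a hypothetical name.

```python
from collections import Counter


def books_per_year(rows):
    """Count rows per year_written, newest first and the null group last,
    like groupBy("year_written").count().sort(desc("year_written"))."""
    counts = Counter(r.get("year_written") for r in rows)
    years = sorted((y for y in counts if y is not None), reverse=True)
    ordered = [(y, counts[y]) for y in years]
    if None in counts:
        ordered.append((None, counts[None]))  # desc() puts nulls last by default
    return ordered
```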



In [46]:
# 3
import pyspark.sql.functions as f

# Highest price per year_written. "Price" resolves to the price column because
# Spark column names are case-insensitive by default.
df.groupBy("year_written").agg({"Price" : "max"}).sort(f.desc("year_written")).show()


+------------+----------+
|year_written|max(Price)|
+------------+----------+
|        2000|     19.95|
|        1999|     12.35|
|        1967|      14.0|
|        1937|     27.45|
|        1925|      25.0|
|        1922|      29.0|
|        1875|      13.5|
|        1870|      5.75|
|        1865|      12.7|
|        1862|      7.75|
|        1814|      18.2|
|        1603|      7.95|
|        null|      null|
+------------+----------+



In [47]:
# 4
import pyspark.sql.functions as f

# Lowest price per year_written (same case-insensitive column lookup as above)
df.groupBy("year_written").agg({"Price" : "min"}).sort(f.desc("year_written")).show()

+------------+----------+
|year_written|min(Price)|
+------------+----------+
|        2000|     19.95|
|        1999|     12.35|
|        1967|      14.0|
|        1937|     27.45|
|        1925|      25.0|
|        1922|      29.0|
|        1875|      13.5|
|        1870|      5.75|
|        1865|      5.76|
|        1862|      7.75|
|        1814|      18.2|
|        1603|      7.95|
|        null|      null|
+------------+----------+
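Tasks 3 and 4 can be computed in a single pass with `df.groupBy("year_written").agg(F.min("price").alias("min_price"), F.max("price").alias("max_price"))`, where `F` is `pyspark.sql.functions`. Both aggregates ignore nulls, so a group with no prices at all (the `null` year here) yields null, and a year with a single book gets the same minimum and maximum. The sketch below mirrors that logic in plain Python; `price_range_per_year` is a hypothetical helper.

```python
def price_range_per_year(rows):
    """(min, max) of non-null prices per year_written, as Spark's min()/max() do:
    nulls are ignored, and a group with no non-null price gets (None, None)."""
    groups = {}
    for r in rows:
        groups.setdefault(r.get("year_written"), []).append(r.get("price"))
    result = {}
    for year, prices in groups.items():
        present = [p for p in prices if p is not None]
        result[year] = (min(present), max(present)) if present else (None, None)
    return result
```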

