# HandsOn Week 6 RDD Spark
Selamat datang di HandsOn Week 6, yaitu tentang pemrosesan data terdistribusi menggunakan Spark. Untuk tujuan pembelajaran, seperti biasa, kita akan menggunakan *pseudo-distributed mode* (single node cluster) di VM yang telah disediakan. Dengan kode yang *similar* di cluster komputer dengan *n* workers, maka komputasi akan tersebar ke *n* workers tersebut. Adapun yang akan kita coba kali ini adalah melakukan komputasi menggunakan RDD dan DataFrame. Berikut catatan-catatan yang perlu kamu perhatikan dalam hands-on ini:
1. Untuk menjalankan Apache Spark dalam bahasa python di VM, ketikkan perintah ```pyspark``` di terminal.
2. Dari semua Milestone, data input yang digunakan adalah data "purchases.txt" yang telah diletakkan di HDFS. Oleh karena itu, pastikan hadoop service kamu berjalan (```start-dfs.sh```, ```start-yarn.sh```, ```jps```). Untuk membaca data dari HDFS, lihat kembali di slide perkuliahan.
3. Untuk Milestone 1, 2 dan 3, kalian perlu untuk mencatat waktu yang diperlukan saat melakukan MapReduce menggunakan hadoop streaming jar di hands-on sebelumnya. Waktu bisa dihitung dari selisih "waktu awal" dan "waktu akhir" yang tampak di terminal saat kalian selesain melakukan MapReduce -atau menggunakan cara lain yang masih *acceptable*-. (lihat ilustrasi di bawah).
4. Lakukan zip file jupyter notebook ini beserta gambar-gambar yang diperlukan -screenshot waktu proses MapReduce Hadoop jar-, dan submit ke portal kuliah EDUNEX dengan format nama "**HandsOnWeek10_NIM_NamaLengkap.zip**". Pastikan file jupyter notebook yang kamu zip dalam kondisi memiliki output per cellnya (tidak kosong karena belum dijalankan). <br>

<img title="Waktu Awal" align="left" src="waktu_awal.JPG" alt="Drawing" style="width: 600px;"/>
<img title="Waktu Akhir" align="left" src="waktu_akhir.JPG" alt="Drawing" style="width: 600px;"/>

## Milestone 1
Kerjakan Milestone 1 pada HandsOn Week 5(sebelumnya), akan tetapi menggunakan RDD Spark. Catat waktu (bandingkan) yang dibutuhkan (dalam detik) antara: "MapReduce menggunakan hadoop streaming jar" dengan yang akan kamu proses menggunakan RDD Spark ini.

In [22]:
from time import time

# Tuliskan code kamu di sini

start_time = time()

dist_data = sc.textFile("hdfs://localhost:9000/purchases/purchases.txt")

def mapper_milestone1(line):
    line = line.strip()
    datas = line.split('\t')
    if len(datas) == 6:
        date, time, location, product, price, payment = datas
        if 'Toys' in product or 'Consumer Electronics' in product:
            return [(product, float(price))]
    return []

RDD1 = dist_data.flatMap(mapper_milestone1).filter(lambda x: x) 
total_sales = RDD1.reduceByKey(lambda x, y: x+y)
print("total sales: ", total_sales.collect())

print("Waktu yang diperlukan dengan RDD Spark: %.2f detik" % (time() - start_time))
print("Waktu yang diperlukan dengan Hadoop:", "86 detik")

total sales:  [('Consumer Electronics', 57452374.12999931), ('Toys', 57463477.10999949)]
Waktu yang diperlukan dengan RDD Spark: 18.92 detik
Waktu yang diperlukan dengan Hadoop: 86 detik


<img title="Waktu Awal" align="left" src="waktu_awal_milestone1.png" alt="Drawing" style="width: 600px;"/>
<img title="Waktu Akhir" align="left" src="waktu_akhir_milestone1.png" alt="Drawing" style="width: 600px;"/>

## Milestone 2
Kerjakan Milestone 2 pada HandsOn Week5 (sebelumnya), akan tetapi menggunakan RDD Spark. Catat waktu (bandingkan) yang dibutuhkan (dalam detik) antara: "MapReduce menggunakan hadoop streaming jar" dengan yang akan kamu proses menggunakan RDD Spark ini.

In [23]:
from time import time
# Tuliskan code kamu di sini

start_time = time()

dist_data = sc.textFile("hdfs://localhost:9000/purchases/purchases.txt")

def mapper_milestone2(line):
    line = line.strip()
    datas = line.split('\t')
    if len(datas) == 6:
        date, time, location, product, price, payment = datas
        if location == 'Miami' or location == 'San Francisco' or location == 'Atlanta':
            return [(location, (float(price), product))]
    return []

RDD1 = dist_data.flatMap(mapper_milestone2).filter(lambda x: x)

def reducer_milestone2(a, b):
    if a[0] > b[0]:
        return a
    else:
        return b
    
total_sales_by_store = RDD1.reduceByKey(reducer_milestone2)
print("total sales by store: ", total_sales_by_store.collect())

print("Waktu yang diperlukan dengan RDD Spark: %.2f detik" % (time() - start_time))
print("Waktu yang diperlukan dengan Hadoop:", "59 detik")

total sales by store:  [('San Francisco', (499.97, "Men's Clothing")), ('Atlanta', (499.96, 'Pet Supplies')), ('Miami', (499.98, "Men's Clothing"))]
Waktu yang diperlukan dengan RDD Spark: 20.61 detik
Waktu yang diperlukan dengan Hadoop: 59 detik


<img title="Waktu Awal" align="left" src="waktu_awal_milestone2.png" alt="Drawing" style="width: 600px;"/>
<img title="Waktu Akhir" align="left" src="waktu_akhir_milestone2.png" alt="Drawing" style="width: 600px;"/>

## Milestone 3
Kerjakan Milestone 3 pada HandsOn Week5 (sebelumnya), akan tetapi menggunakan RDD Spark. Catat waktu (bandingkan) yang dibutuhkan (dalam detik) antara: "MapReduce menggunakan hadoop streaming jar" dengan yang akan kamu proses menggunakan RDD Spark ini.

In [30]:
from datetime import datetime, time
# Tuliskan code kamu di sini

start_time = datetime.now()

dist_data = sc.textFile("hdfs://localhost:9000/purchases/purchases.txt")

def mapper_milestone3(line):
    line = line.strip()
    datas = line.split('\t')
    if len(datas) == 6:
        date, time_str, location, product, price, payment = datas
        time_obj = datetime.strptime(time_str, '%H:%M').time()
        if time(9,1) <= time_obj <= time(10,0):
            return [('09:01-10:00',1)]
        elif time(10, 1) <= time_obj <= time(11,0):
            return [('10:01-11:00',1)] 
    return []

RDD1 = dist_data.flatMap(mapper_milestone3).filter(lambda x: x)    
count_sales_by_time = RDD1.reduceByKey(lambda x, y: x+y)
print("count sales by time: ", count_sales_by_time.collect())

print("Waktu yang diperlukan dengan RDD Spark: %.2f detik" % ((datetime.now() - start_time).total_seconds()))
print("Waktu yang diperlukan dengan Hadoop:", "103 detik")

count sales by time:  [('09:01-10:00', 459775), ('10:01-11:00', 459825)]
Waktu yang diperlukan dengan RDD Spark: 102.47 detik
Waktu yang diperlukan dengan Hadoop: 103 detik


<img title="Waktu Awal" align="left" src="waktu_awal_milestone3.png" alt="Drawing" style="width: 600px;"/>
<img title="Waktu Akhir" align="left" src="waktu_akhir_milestone3.png" alt="Drawing" style="width: 600px;"/>

## Milestone 4
Milestone ini dibagi menjadi 4.1, 4.2 dan 4.3 yang masing-masing secara berturut-turut adalah mengerjakan ulang Milestone 1, 2 dan 3 di atas (menggunakan RDD Spark), akan tetapi menggunakan trik "**persist() RDD**" untuk mempercepat prosesnya. Kamu bisa melakukan "**persist**" untuk RDD mana saja yang kamu anggap dapat memberikan waktu proses tercepat.

In [37]:
from pyspark import StorageLevel
from datetime import datetime, time

dist_data = sc.textFile("hdfs://localhost:9000/purchases/purchases.txt")

start_time1 = datetime.now()

filtered_data = dist_data.filter(lambda line: "Toys" in line or "Consumer Electronics" in line) 
filtered_data.persist(StorageLevel.MEMORY_AND_DISK)

def mapper_milestone1(line):
    line = line.strip()
    datas = line.split('\t')
    if len(datas) == 6:
        date, time, location, product, price, payment = datas
        return [(product, float(price))]
    return []

sales = filtered_data.flatMap(mapper_milestone1).filter(lambda x: x)

total_sales = sales.reduceByKey(lambda x, y: x+y)
print("total sales: ", total_sales.collect())

end_time1 = datetime.now()

print("Waktu Milestone 1:", "18.92 detik", " vs. Waktu Milestone 4.1: %.2f detik" % ((end_time1 - start_time1).total_seconds()))

start_time2 = datetime.now()

filtered_data_2 = dist_data.filter(lambda line: "Miami" in line or "San Francisco" in line or 'Atlanta' in line) 
filtered_data_2.persist()

def mapper_milestone2(line):
    line = line.strip()
    datas = line.split('\t')
    if len(datas) == 6:
        date, time, location, product, price, payment = datas
        return [(location, (float(price), product))]
    return []

RDD2 = filtered_data_2.flatMap(mapper_milestone2).filter(lambda x: x)

def reducer_milestone2(a, b):
    if a[0] > b[0]:
        return a
    else:
        return b
    
total_sales_by_store = RDD2.reduceByKey(reducer_milestone2)
print("total sales by store: ", total_sales_by_store.collect())

end_time2 = datetime.now()

print("Waktu Milestone 2:", "20.61 detik", " vs. Waktu Milestone 4.2: %.2f detik" % ((end_time2 - start_time2).total_seconds()))

start_time3 = datetime.now()

def filter_milestone3(line):
    line = line.strip()
    datas = line.split('\t')
    if len(datas) == 6:
        date, time_str, location, product, price, payment = datas
        time_obj = datetime.strptime(time_str, '%H:%M').time()
        if time(9,1) <= time_obj <= time(10,0):
            return line
        elif time(10, 1) <= time_obj <= time(11,0):
            return line 

filtered_data3 = dist_data.filter(filter_milestone3)
filtered_data3.persist()        

def mapper_milestone3(line):
    line = line.strip()
    datas = line.split('\t')
    if len(datas) == 6:
        date, time_str, location, product, price, payment = datas
        time_obj = datetime.strptime(time_str, '%H:%M').time()
        if time(9,1) <= time_obj <= time(10,0):
            return [('09:01-10:00',1)]
        elif time(10, 1) <= time_obj <= time(11,0):
            return [('10:01-11:00',1)] 
    return []

RDD3 = filtered_data3.flatMap(mapper_milestone3).filter(lambda x: x)
count_sales_by_time = RDD3.reduceByKey(lambda x, y: x+y)
print("count sales by time: ", count_sales_by_time.collect())

end_time3 = datetime.now()

print("Waktu Milestone 3:", "102.47 detik", " vs. Waktu Milestone 4.3: %.2f detik" % ((end_time3 - start_time3).total_seconds()))

total sales:  [('Consumer Electronics', 57452374.12999931), ('Toys', 57463477.10999949)]
Waktu Milestone 1: 18.92 detik  vs. Waktu Milestone 4.1: 8.17 detik
total sales by store:  [('San Francisco', (499.97, "Men's Clothing")), ('Atlanta', (499.96, 'Pet Supplies')), ('Miami', (499.98, "Men's Clothing"))]
Waktu Milestone 2: 20.61 detik  vs. Waktu Milestone 4.2: 7.14 detik
count sales by time:  [('09:01-10:00', 459775), ('10:01-11:00', 459825)]
Waktu Milestone 3: 102.47 detik  vs. Waktu Milestone 4.3: 54.05 detik


## Milestone 5
Milestone ini dibagi menjadi 5.1, 5.2 dan 5.3 yang masing-masing secara berturut-turut adalah mengerjakan ulang Milestone 1, 2 dan 3 di atas, akan tetapi menggunakan DataFrame dari Apache Spark. Catat waktu yang diperlukan untuk masing-masing proses (5.1, 5.2 dan 5.3).

In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import hour, minute, to_timestamp
from time import time

spark = SparkSession \
    .builder \
    .appName("dataframe-spark") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

purchases_df = spark.read.csv("hdfs://localhost:9000/purchases/purchases.txt", inferSchema=True, sep="\t")

purchases_df.createOrReplaceTempView("purchaseSql")

purchases_df = purchases_df.withColumn("_c1", to_timestamp("_c1", "HH:mm"))

start_time1 = time()

spark.sql("SELECT _c3 as product, SUM(_c4) as total_sales FROM purchaseSql WHERE _c3 LIKE '%Toys%' or _c3 LIKE '%Consumer Electronics%' GROUP BY _c3;").show()

end_time1 = time()

start_time2 = time()

spark.sql("SELECT location, price, product FROM (SELECT _c2 as location, _c4 as price, _c3 as product, ROW_NUMBER() OVER (PARTITION BY _c2 ORDER BY _c4 DESC) AS rank FROM purchaseSql WHERE _c2 IN ('Atlanta', 'Miami', 'San Francisco')) ranked WHERE rank = 1;").show()

end_time2 = time()

start_time3 = time()

spark.sql("""SELECT 
    CONCAT('09:01-10:00') AS `Time Range`,
    SUM(CASE WHEN (hour(_c1) = 9 AND minute(_c1) BETWEEN 1 AND 59 OR hour(_c1) = 10 AND minute(_c1) = 0) THEN 1 ELSE 0 END) AS `Count`
FROM purchaseSql
UNION ALL
SELECT 
    CONCAT('10:01-11:00') AS `Time Range`,
    SUM(CASE WHEN (hour(_c1) = 10 AND minute(_c1) BETWEEN 1 AND 59 OR hour(_c1) = 11 AND minute(_c1) = 0) THEN 1 ELSE 0 END) AS `Count`
FROM purchaseSql;""").show()

end_time3 = time()

print("Waktu Milestone 5.1: %.2f detik" % (end_time1 - start_time1))
print("Waktu Milestone 5.2: %.2f detik" % (end_time2 - start_time2))
print("Waktu Milestone 5.3: %.2f detik" % (end_time3 - start_time3))

+--------------------+-------------------+
|             product|        total_sales|
+--------------------+-------------------+
|Consumer Electronics|5.745237412999931E7|
|                Toys|5.746347710999949E7|
+--------------------+-------------------+

+-------------+------+--------------+
|     location| price|       product|
+-------------+------+--------------+
|San Francisco|499.97|Men's Clothing|
|      Atlanta|499.96|  Pet Supplies|
|        Miami|499.98|   Video Games|
+-------------+------+--------------+

+-----------+------+
| Time Range| Count|
+-----------+------+
|09:01-10:00|459775|
|10:01-11:00|459825|
+-----------+------+

Waktu Milestone 5.1: 15.87 detik
Waktu Milestone 5.2: 18.43 detik
Waktu Milestone 5.3: 41.95 detik
