
# Hands-On Session 3: Data Processing with Apache Spark

## Objectives:
- Understand and practice data processing with Apache Spark.
- Use Spark to perform efficient operations on large datasets.
- Apply advanced Spark techniques to real-world use cases.

### 1. Introduction to Spark DataFrames
A Spark DataFrame organizes data into named columns, much like a Pandas DataFrame or a table in an RDBMS, but it is distributed across the cluster and Spark optimizes its operations for large-scale data processing.

- **Task 1**: Create a simple DataFrame in Spark and explore some of the basic functions it provides.

In [None]:
# Example: create a simple DataFrame and run basic operations
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('HandsOnPertemuan3').getOrCreate()

data = [('James', 'Sales', 3000),
        ('Michael', 'Sales', 4600),
        ('Robert', 'Sales', 4100),
        ('Maria', 'Finance', 3000)]
columns = ['EmployeeName', 'Department', 'Salary']

df = spark.createDataFrame(data, schema=columns)
df.show()

In [1]:
# TASK 1
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('HandsOnPertemuan3').getOrCreate()

data = [('James', 'Sales', 3000),
        ('Michael', 'Sales', 4600),
        ('Robert', 'Sales', 4100),
        ('Maria', 'Finance', 3000),
        ('Wilio', 'IT', 4500),
        ('Sean', 'Marketing', 4200),
        ('Alluna', 'HR', 3900)]

columns = ['EmployeeName', 'Department', 'Salary']

df = spark.createDataFrame(data, schema=columns)
df.show()


+------------+----------+------+
|EmployeeName|Department|Salary|
+------------+----------+------+
|       James|     Sales|  3000|
|     Michael|     Sales|  4600|
|      Robert|     Sales|  4100|
|       Maria|   Finance|  3000|
|       Wilio|        IT|  4500|
|        Sean| Marketing|  4200|
|      Alluna|        HR|  3900|
+------------+----------+------+



### 2. Basic Transformations with DataFrames
Data processing involves transformations such as filtering, selection, and aggregation. Spark provides efficient ways to carry out these operations.

- **Task 2**: Use the filter, select, and groupBy operations to extract information from the data, and aggregate it with functions such as mean, max, and sum to gain insight into the dataset.

In [None]:
# Example DataFrame transformations (each call is made on the DataFrame df)
df.select('EmployeeName', 'Salary').show()
df.filter(df['Salary'] > 3000).show()
df.groupBy('Department').avg('Salary').show()

In [5]:
# FILTER OPERATION: Sales employees earning more than 3500
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('HandsOnPertemuan3').getOrCreate()

data = [('James', 'Sales', 3000),
        ('Michael', 'Sales', 4600),
        ('Robert', 'Sales', 4100),
        ('Maria', 'Finance', 3000),
        ('Wilio', 'IT', 4500),
        ('Sean', 'Marketing', 4200),
        ('Alluna', 'HR', 3900)]

columns = ['EmployeeName', 'Department', 'Salary']

df = spark.createDataFrame(data, schema=columns)
df.filter((df['Department'] == 'Sales') & (df['Salary'] > 3500)).show()


+------------+----------+------+
|EmployeeName|Department|Salary|
+------------+----------+------+
|     Michael|     Sales|  4600|
|      Robert|     Sales|  4100|
+------------+----------+------+



In [6]:
# SELECT OPERATION
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('HandsOnPertemuan3').getOrCreate()

data = [('James', 'Sales', 3000),
        ('Michael', 'Sales', 4600),
        ('Robert', 'Sales', 4100),
        ('Maria', 'Finance', 3000),
        ('Wilio', 'IT', 4500),
        ('Sean', 'Marketing', 4200),
        ('Alluna', 'HR', 3900)]

columns = ['EmployeeName', 'Department', 'Salary']

df = spark.createDataFrame(data, schema=columns)
df.select('EmployeeName', 'Salary').show()


+------------+------+
|EmployeeName|Salary|
+------------+------+
|       James|  3000|
|     Michael|  4600|
|      Robert|  4100|
|       Maria|  3000|
|       Wilio|  4500|
|        Sean|  4200|
|      Alluna|  3900|
+------------+------+



In [7]:
# GROUPBY OPERATION: average salary per department
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('HandsOnPertemuan3').getOrCreate()

data = [('James', 'Sales', 3000),
        ('Michael', 'Sales', 4600),
        ('Robert', 'Sales', 4100),
        ('Maria', 'Finance', 3000),
        ('Wilio', 'IT', 4500),
        ('Sean', 'Marketing', 4200),
        ('Alluna', 'HR', 3900)]

columns = ['EmployeeName', 'Department', 'Salary']

df = spark.createDataFrame(data, schema=columns)
df.groupBy('Department').avg('Salary').show()

+----------+-----------+
|Department|avg(Salary)|
+----------+-----------+
|     Sales|     3900.0|
|        HR|     3900.0|
|   Finance|     3000.0|
| Marketing|     4200.0|
|        IT|     4500.0|
+----------+-----------+



In [10]:
# AGGREGATION
from pyspark.sql import SparkSession
from pyspark.sql import functions as FC

spark = SparkSession.builder.appName('HandsOnPertemuan3').getOrCreate()

data = [('James', 'Sales', 3000),
        ('Michael', 'Sales', 4600),
        ('Robert', 'Sales', 4100),
        ('Maria', 'Finance', 3000),
        ('Wilio', 'IT', 4500),
        ('Sean', 'Marketing', 4200),
        ('Alluna', 'HR', 3900)]

columns = ['EmployeeName', 'Department', 'Salary']

df = spark.createDataFrame(data, schema=columns)

# Combine several aggregations into a single table
df.groupBy('Department').agg(
    FC.sum('Salary').alias('Total_Gaji'),
    FC.max('Salary').alias('Max_Gaji'),
    FC.min('Salary').alias('Min_Gaji'),
    FC.mean('Salary').alias('Mean_Gaji'),
    FC.count('Salary').alias('Count_Gaji')
).show()

+----------+----------+--------+--------+---------+----------+
|Department|Total_Gaji|Max_Gaji|Min_Gaji|Mean_Gaji|Count_Gaji|
+----------+----------+--------+--------+---------+----------+
|     Sales|     11700|    4600|    3000|   3900.0|         3|
|        HR|      3900|    3900|    3900|   3900.0|         1|
|   Finance|      3000|    3000|    3000|   3000.0|         1|
| Marketing|      4200|    4200|    4200|   4200.0|         1|
|        IT|      4500|    4500|    4500|   4500.0|         1|
+----------+----------+--------+--------+---------+----------+



### 3. Working with Complex Data Types
Spark supports complex data types such as maps, arrays, and structs, which let a single column hold nested or multi-valued data and make more sophisticated operations possible.

- **Task 3**: Explore how to process complex data types in Spark DataFrames.

In [None]:
# Example: derive new columns with withColumn
# (assign each result so the next step can refer to SalaryBonus)
df = df.withColumn('SalaryBonus', df['Salary'] * 0.1)
df = df.withColumn('TotalCompensation', df['Salary'] + df['SalaryBonus'])
df.show()

In [11]:
# TASK 3
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('HandsOnPertemuan3').getOrCreate()

data = [('James', 'Sales', 3000),
        ('Michael', 'Sales', 4600),
        ('Robert', 'Sales', 4100),
        ('Maria', 'Finance', 3000),
        ('Wilio', 'IT', 4500),
        ('Sean', 'Marketing', 4200),
        ('Alluna', 'HR', 3900)]

columns = ['EmployeeName', 'Department', 'Salary']

df = spark.createDataFrame(data, schema=columns)

# Add a SalaryBonus column (10% of Salary)
df = df.withColumn('SalaryBonus', df['Salary'] * 0.1)

# Add a TotalCompensation column (Salary + SalaryBonus)
df = df.withColumn('TotalCompensation', df['Salary'] + df['SalaryBonus'])
df.show()

+------------+----------+------+-----------+-----------------+
|EmployeeName|Department|Salary|SalaryBonus|TotalCompensation|
+------------+----------+------+-----------+-----------------+
|       James|     Sales|  3000|      300.0|           3300.0|
|     Michael|     Sales|  4600|      460.0|           5060.0|
|      Robert|     Sales|  4100|      410.0|           4510.0|
|       Maria|   Finance|  3000|      300.0|           3300.0|
|       Wilio|        IT|  4500|      450.0|           4950.0|
|        Sean| Marketing|  4200|      420.0|           4620.0|
|      Alluna|        HR|  3900|      390.0|           4290.0|
+------------+----------+------+-----------+-----------------+



### 4. Advanced Data Operations
Spark can also be used for advanced operations such as window functions and user-defined functions (UDFs), as well as for query optimization.

- **Task 4**: Implement a window function to compute running totals or rankings.

In [None]:
# Example: using window functions
from pyspark.sql.window import Window
from pyspark.sql import functions as F

windowSpec = Window.partitionBy('Department').orderBy('Salary')
df.withColumn('Rank', F.rank().over(windowSpec)).show()

In [15]:
# TASK 4
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as FC

spark = SparkSession.builder.appName('HandsOnPertemuan3').getOrCreate()

data = [('James', 'Sales', 3000),
        ('Michael', 'Sales', 4600),
        ('Robert', 'Sales', 4100),
        ('Maria', 'Finance', 3000),
        ('Wilio', 'IT', 4500),
        ('Sean', 'Marketing', 4200),
        ('Alluna', 'HR', 3900)]

columns = ['EmployeeName', 'Department', 'Salary']
df = spark.createDataFrame(data, schema=columns)

# WindowSpec: partition the rows by department and order each partition by salary (highest first)
windowSpec = Window.partitionBy('Department').orderBy(FC.desc('Salary'))

# Add a ranking column
df.withColumn('Rank', FC.rank().over(windowSpec)).show()

+------------+----------+------+----+
|EmployeeName|Department|Salary|Rank|
+------------+----------+------+----+
|       Maria|   Finance|  3000|   1|
|      Alluna|        HR|  3900|   1|
|       Wilio|        IT|  4500|   1|
|        Sean| Marketing|  4200|   1|
|     Michael|     Sales|  4600|   1|
|      Robert|     Sales|  4100|   2|
|       James|     Sales|  3000|   3|
+------------+----------+------+----+



### 5. Conclusion and Further Exploration
Review what you have learned about data processing with Spark, and explore further techniques for optimizing your data processing.
<br>**Task 5**:
- Download a large dataset from [Kaggle](https://www.kaggle.com/) or another source.
- Upload the downloaded CSV file, then load it into PySpark.
- Once the data has been loaded with PySpark, manipulate it to obtain the information you need.

In [18]:
# TASK 5: upload the CSV file to the Colab session
from google.colab import files
uploaded = files.upload()

Saving amazon.csv to amazon.csv


DATA MANIPULATION

In [22]:
# Load Dataset
from pyspark.sql import SparkSession

# Initialize Spark
spark = SparkSession.builder.appName("AmazonSalesAnalysis").getOrCreate()

# Load CSV
df = spark.read.csv("amazon.csv", header=True, inferSchema=True)

# Inspect the schema and the first rows
df.printSchema()
df.show()


root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- discounted_price: string (nullable = true)
 |-- actual_price: string (nullable = true)
 |-- discount_percentage: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- rating_count: string (nullable = true)
 |-- about_product: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- review_title: string (nullable = true)
 |-- review_content: string (nullable = true)
 |-- img_link: string (nullable = true)
 |-- product_link: string (nullable = true)

+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|product_

In [25]:
# SELECT KOLOM

df.select('product_name','category','discounted_price','rating').show()

+--------------------+--------------------+----------------+------+
|        product_name|            category|discounted_price|rating|
+--------------------+--------------------+----------------+------+
|Wayona Nylon Brai...|Computers&Accesso...|            ₹399|   4.2|
|Ambrane Unbreakab...|Computers&Accesso...|            ₹199|   4.0|
|Sounce Fast Phone...|Computers&Accesso...|            ₹199|   3.9|
|boAt Deuce USB 30...|Computers&Accesso...|            ₹329|   4.2|
|Portronics Konnec...|Computers&Accesso...|            ₹154|   4.2|
|pTron Solero TB30...|Computers&Accesso...|            ₹149|   3.9|
|boAt Micro USB 55...|Computers&Accesso...|         ₹176.63|   4.1|
|MI Usb Type-C Cab...|Computers&Accesso...|            ₹229|   4.3|
|TP-Link USB WiFi ...|Computers&Accesso...|            ₹499|   4.2|
|Ambrane Unbreakab...|Computers&Accesso...|            ₹199|   4.0|
|Portronics Konnec...|Computers&Accesso...|            ₹154|   4.3|
|boAt Rugged v3 Ex...|Computers&Accesso...|     

In [34]:
# FILTER PRODUCTS WITH A RATING ABOVE 4.8

df.filter(df['rating'] > 4.8).show()

+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|product_id|        product_name|            category|discounted_price|actual_price|discount_percentage|rating|rating_count|       about_product|             user_id|           user_name|           review_id|        review_title|      review_content|            img_link|        product_link|
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|B0BP7XLX48|Syncwire LTG to U...|Computers&Accesso...|            ₹399|      ₹1,999|                80%|   5.0|          

In [47]:
# COUNT THE ROWS

df.count()

1465

In [48]:
# STRIP THE CURRENCY SYMBOL (₹) AND THOUSANDS SEPARATORS, THEN CAST TO FLOAT
# (only discounted_price and rating are cleaned here)

from pyspark.sql.functions import regexp_replace

df = df.withColumn("discounted_price",
                   regexp_replace("discounted_price", "[^0-9.]", "").cast("float"))

df = df.withColumn("rating",
                   regexp_replace("rating", "[^0-9.]", "").cast("float"))


In [49]:
# SUMMARY PER CATEGORY

from pyspark.sql import functions as FC
df.groupBy("category").agg(
    FC.avg("discounted_price").alias("avg_discounted_price"),
    FC.count("*").alias("total_products"),
    FC.max("rating").alias("max_rating"),
    FC.min("rating").alias("min_rating")
).show(10)


+--------------------+--------------------+--------------+----------+----------+
|            category|avg_discounted_price|total_products|max_rating|min_rating|
+--------------------+--------------------+--------------+----------+----------+
|           reminders|                NULL|             1|      90.0|      90.0|
|Computers&Accesso...|               199.0|             1|       4.1|       4.1|
|OfficeProducts|Of...|              141.25|             4|       4.5|       4.2|
|OfficeProducts|Of...|  227.14285714285714|             7|       4.5|       4.1|
|Electronics|Camer...|               299.0|             1|       3.8|       3.8|
|Computers&Accesso...|               549.0|             1|       4.3|       4.3|
|      TWS Connection|                NULL|             3|    3999.0|    3999.0|
|     123 Sports Mode|                 8.0|             1|      NULL|      NULL|
|Computers&Accesso...|               558.0|             5|       4.5|       4.0|
|Computers&Accesso...|      