# Tugas Besar Pemroseran Parallel Kelas B
## Analisis Data Games menggunakan MPI (reduce, filter), pyspark context (reduce, filter), dan sparksession (query) 

1. Rakasya Yoga Surya Prastama (F1D02310022)
2. Rakasya Yoga Surya Prastama (F1D02310022)
3. Rakasya Yoga Surya Prastama (F1D02310022)
4. Rakasya Yoga Surya Prastama (F1D02310022)
5. Rakasya Yoga Surya Prastama (F1D02310022)

Link Kaggle untuk dataset: https://www.kaggle.com/datasets/jummyegg/rawg-game-dataset

In [None]:
%pip install pyspark
%pip install mpi4py

Baris kode ini Menginstall 2 library yang akan digunakan, yaitu 'pyspark' dan juga 'mpi4py' dengan perintah '%pip install. 'pyspark' digunakan untuk menjalankan Spark di lingkungan Python, yang memungkinkan pemrosesan data besar secara paralel dan terdistribusi. Sedangkan 'mpi4py' adalah antarmuka Python untuk MPI (Message Passing Interface), yang digunakan untuk membagi dan mengelola tugas antar proses dalam pemrograman paralel manual. 

In [None]:
from mpi4py import MPI
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import csv

Baris kode ini memuat semua library yang akan digunakan untuk projek ini. 'mpi4py.MPI' digunakan untuk komunikasi antar proses saat menggunakan 'MPI', sedangkan 'SparkSession' dari 'pyspark.sql' digunakan untuk membaca dan memproses data tabular seperti database. Library tambahan seperti 'pandas', 'numpy', dan 'csv' juga diimpor, kemungkinan untuk manipulasi data dan pembacaan file CSV di luar konteks Spark.

In [None]:
import time
mulai_ukur = time.time()
akhir_ukur = time.time()

waktu_pengerjaan = akhir_ukur - mulai_ukur

print(waktu_pengerjaan)

Baris kode ini bertujuan mengukur durasi proses yang akan dijalankan. Namun, karena waktu mulai yaitu 'mulai_ukur' dan akhir yaitu 'akhir_ukur' dicatat tanpa proses di antaranya, nilai 'waktu_pengerjaan' akan mendekati nol. Jika nilai ditempatkan dengan benar di sekitar blok pemrosesan utama, pengukuran ini nantinya akan berguna untuk membandingkan kinerja MPI dan PySpark.

In [None]:
spark = SparkSession.builder.appName("Game Sales Analysis").getOrCreate()
data = spark.read.csv("game_info.csv", header=True, inferSchema=True)
data.show(10)

Pada baris kode ini 'SparkSession' dibuat dengan nama aplikasi "Game Sales Analysis", lalu digunakan untuk membaca file CSV 'game_info.csv'. File dibaca sebagai DataFrame dengan parameter 'header=True' agar baris pertama digunakan sebagai nama kolom dan 'inferSchema=True' agar tipe data dikenali otomatis. Kemudian, sepuluh baris pertama dari data ditampilkan untuk keperluan verifikasi.

In [None]:
mpi = MPI.COMM_WORLD
size = mpi.Get_size()

df = pd.read_csv("game_info.csv")
chunks = [df.iloc[i::size].reset_index(drop=True) for i in range(size)]

# Broadcast potongan data
scatter = mpi.scatter(chunks, root=0)

# FILTER: metacritic > 80
Filter = scatter[scatter["metacritic"] > 80]

# REDUCE: jumlah ratings_count
Reduce = Filter["ratings_count"].sum()
total_Reduce = mpi.reduce(Reduce, op=MPI.SUM, root=0)

print("Total ratings_count untuk metacritic > 80:", total_Reduce)


Pada baris Kode ini dengan Menggunakan mpi4py, data dibaca melalui 'pandas' dan dibagi menjadi beberapa potongan berdasarkan jumlah proses (size). Setiap proses mendapatkan bagian data untuk dianalisis secara paralel menggunakan scatter. Filter dilakukan untuk memilih data dengan nilai metacritic > 80, kemudian dilakukan reduce untuk menghitung total ratings_count, dan hasil akhirnya dikumpulkan di root process menggunakan mpi.reduce.

In [None]:
# Buat SparkSession dan SparkContext
sc = spark.sparkContext

# Baca file CSV sebagai RDD
rdd = sc.textFile("game_info.csv")

# Ambil header
header = rdd.first()

# Hilangkan header dan parsing CSV
data = rdd.filter(lambda x: x != header).map(lambda row: next(csv.reader([row])))

# FILTER: metacritic > 80 (kolom ke-4, index 3)
filtered = data.filter(lambda x: len(x) > 3 and x[3].isdigit() and int(x[3]) > 80)

# REDUCE: total ratings_count (kolom ke-13, index 12 — sesuaikan jika struktur berubah)
ratings_count_index = 12
ratings = filtered.map(lambda x: int(x[ratings_count_index]) if len(x) > ratings_count_index and x[ratings_count_index].isdigit() else 0)

# Totalkan dengan reduce
total = ratings.reduce(lambda a, b: a + b)

# Tampilkan hasil
print("Total ratings_count (PySpark RDD):", total)


Pada baris kode ini Data CSV dibaca sebagai RDD menggunakan 'SparkContext', dan 'header' dihapus dari 'data'. Setiap baris diubah menjadi list menggunakan parser CSV, kemudian difilter agar hanya menyisakan baris dengan metacritic > 80. Nilai 'ratings_count' dikonversi menjadi integer dan dijumlahkan menggunakan fungsi reduce. Hasil akhirnya adalah total 'ratings_count' dari data yang telah difilter.

In [None]:
# data = spark.read.csv("game_info.csv", header=True, inferSchema=True)
data.createOrReplaceTempView("games")

result = spark.sql("""
    SELECT name, rating, metacritic, platforms, released, genres, publishers 
    FROM games
    WHERE 
        rating IS NOT NULL AND 
        metacritic IS NOT NULL AND
        TRY_CAST(rating AS FLOAT) IS NOT NULL AND 
        metacritic IS NOT NULL
    ORDER BY rating  DESC
    LIMIT 20
""")

print("Hasil Query:")
result.toPandas()

Baris kode ini menggunakan fitur SQL dari Spark untuk menjalankan kueri terhadap DataFrame yang telah dibuat sebelumnya. Data difilter agar hanya menampilkan game yang memiliki nilai rating dan metacritic yang valid, lalu diurutkan berdasarkan rating secara menurun. Hasilnya dibatasi hanya 20 game teratas dan ditampilkan sebagai DataFrame 'pandas' agar lebih mudah dibaca.