Nama: Dika Elsaputra

NPM: 2320506032

**Tugas 1**: Pelajari bagaimana MapReduce bekerja dengan dataset sederhana dan coba implementasikan konsep key-value pair.

### Bagaimana MapReduce Bekerja

1. **Map Phase**: 
   - **Input**: Dataset dibagi menjadi bagian-bagian kecil yang disebut splits.
   - **Proses**: Fungsi *Map* dipanggil untuk setiap split. Fungsi ini menghasilkan pasangan kunci-nilai (key-value pairs) dari data input. Misalnya, dalam kasus analisis teks, pasangan kunci-nilai bisa berupa kata sebagai kunci dan angka 1 sebagai nilai.
   - **Output**: Pasangan kunci-nilai ini kemudian dikelompokkan oleh kunci.

2. **Shuffle and Sort Phase**:
   - **Pengelompokan**: Pasangan kunci-nilai yang dihasilkan dari fase Map dikelompokkan berdasarkan kunci.
   - **Sort**: Pasangan kunci-nilai diurutkan berdasarkan kunci.

3. **Reduce Phase**:
   - **Input**: Fase ini menerima pasangan kunci-nilai yang telah dikelompokkan dan diurutkan.
   - **Proses**: Fungsi *Reduce* dijalankan untuk setiap kelompok kunci. Fungsi ini menggabungkan nilai-nilai terkait dengan kunci yang sama.
   - **Output**: Hasil akhir dari fase Reduce adalah output akhir dari proses MapReduce.

### Contoh Implementasi Key-Value Pair

Mari kita lihat implementasi sederhana dari MapReduce dengan dataset teks kecil. Misalkan kita memiliki teks sebagai berikut dan ingin menghitung jumlah kemunculan setiap kata.

**Dataset Input:**
```
hello world
hello MapReduce
hello MapReduce world
```

#### Fungsi Map

Fungsi Map akan mengambil setiap baris teks dan menghasilkan pasangan kunci-nilai untuk setiap kata.

In [None]:
def map_function(text):
    results = []
    for word in text.split():
        results.append((word, 1))
    return results

input_text = "hello world hello MapReduce hello MapReduce world"
mapped_results = map_function(input_text)
print(mapped_results)

#### Fungsi Reduce

Fungsi Reduce akan menggabungkan nilai-nilai yang memiliki kunci yang sama.

In [None]:
from collections import defaultdict

def reduce_function(pairs):
    reduced_result = defaultdict(int)
    for key, value in pairs:
        reduced_result[key] += value
    return dict(reduced_result)

reduced_results = reduce_function(mapped_results)
print(reduced_results)

### Penjelasan

1. **Map Function**: Mengambil baris teks dan membagi setiap kata menjadi pasangan kunci-nilai dengan nilai 1.
2. **Reduce Function**: Mengumpulkan dan menjumlahkan nilai-nilai yang memiliki kunci yang sama untuk memberikan jumlah total kemunculan setiap kata.

Proses MapReduce ini akan memproses data besar dengan cara terdistribusi dan paralel, yang sangat berguna untuk analisis data besar dalam skala besar.

**Tugas 2**: Implementasikan fungsi `map_function` dan `reduce_function` pada dataset teks sederhana, lalu hitung jumlah kata.

In [None]:
def map_function(text):
  for word in text.split():
    yield (word, 1)

from collections import defaultdict

def reduce_function(pairs):
  result = defaultdict(int)
  for word, count in pairs:
    result[word] += count
  return result

text = "hello world hello"
mapped = list(map_function(text))
result = dict(reduce_function(mapped))
print(result)

**Tugas 3**: Upload file teks ke HDFS, jalankan perintah MapReduce Word Count, dan tampilkan hasilnya.

```batch
hdfs dfs -mkdir /user
hdfs dfs -mkdir /user/figuran04
hdfs dfs -mkdir /user/figuran04/input
cd Downloads
hdfs dfs -put input.txt /user/figuran04/input/
hadoop jar C:/hadoop-3.3.6/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.6.jar wordcount /user/figuran04/input/ /user/figuran04/output/
hdfs dfs -cat /user/figuran04/output/part-r-00000
```

```batch
Duis    1
Excepteur       1
Lorem   1
Ut      1
ad      1
adipiscing      1
aliqua. 1
aliquip 1
amet,   1
anim    1
aute    1
cillum  1
commodo 1
consectetur     1
consequat.      1
culpa   1
cupidatat       1
deserunt        1
do      1
dolor   2
dolore  2
ea      1
eiusmod 1
elit,   1
enim    1
esse    1
est     1
et      1
eu      1
ex      1
exercitation    1
fugiat  1
id      1
in      3
incididunt      1
ipsum   1
irure   1
labore  1
laboris 1
laborum.        1
magna   1
minim   1
mollit  1
nisi    1
non     1
nostrud 1
nulla   1
occaecat        1
officia 1
pariatur.       1
proident,       1
qui     1
quis    1
reprehenderit   1
sed     1
sint    1
sit     1
sunt    1
tempor  1
ullamco 1
ut      2
velit   1
veniam, 1
voluptate       1
```

**Tugas 4**: Cari dataset besar, jalankan MapReduce untuk menghitung kata, dan buat laporan analisis hasil.

#### 1. **File System Counters**
- **FILE: Number of bytes read=6925**  
  Mengacu pada jumlah total byte yang dibaca oleh sistem file lokal selama eksekusi. Ini termasuk data yang dibaca selama fase Map dan Reduce dari disk lokal.
  
- **FILE: Number of bytes written=569429**  
  Ini adalah total byte yang ditulis ke sistem file lokal selama eksekusi tugas. Hasil ini menunjukkan besar output yang dihasilkan pada node lokal setelah seluruh proses pemrosesan.

- **HDFS: Number of bytes read=13621**  
  Mengacu pada total byte yang dibaca dari HDFS, baik pada fase Map maupun Reduce. Data ini termasuk input yang diberikan untuk Map dan output antar fase dari Map ke Reduce.

- **HDFS: Number of bytes written=6151**  
  Jumlah byte yang ditulis ke HDFS sebagai hasil akhir dari proses Reduce. Nilai ini biasanya lebih kecil dibandingkan input, karena data sudah terproses dan direduksi.

- **HDFS: Number of read/write operations**  
  Terdapat 8 operasi pembacaan dan 2 operasi penulisan di HDFS selama eksekusi, menunjukkan aktivitas I/O pada cluster HDFS selama proses MapReduce.

#### 2. **Job Counters**
- **Launched map tasks=1, Launched reduce tasks=1**  
  Menunjukkan bahwa hanya satu tugas Map dan satu tugas Reduce yang diluncurkan untuk proses ini, karena dataset relatif kecil.

- **Data-local map tasks=1**  
  Mengonfirmasi bahwa tugas Map diluncurkan pada node yang sama dengan tempat data disimpan, meningkatkan efisiensi dengan meminimalkan transfer data di jaringan.

- **Total time spent by all map tasks (ms)=17091, Total time spent by all reduce tasks (ms)=18621**  
  Waktu total yang dihabiskan oleh tugas Map dan Reduce untuk menyelesaikan eksekusi. Map membutuhkan waktu 17091 ms, sedangkan Reduce memerlukan waktu 18621 ms. Ini mengindikasikan bahwa sebagian besar waktu dihabiskan di fase Reduce.

- **Total megabyte-milliseconds taken by all map tasks=17501184**  
  Jumlah total megabyte yang diproses oleh tugas Map dalam milidetik. Angka ini membantu mengukur throughput pemrosesan data selama fase Map.

#### 3. **Map-Reduce Framework**
- **Map input records=88**  
  Sebanyak 88 catatan yang diproses oleh tugas Map dari dataset.

- **Map output records=721**  
  Setelah diproses, Map menghasilkan 721 record output. Hal ini disebabkan oleh pemecahan dataset ke dalam unit key-value yang lebih kecil.

- **Reduce input records=196, Reduce output records=196**  
  Fase Reduce menerima 196 record dan menghasilkan jumlah yang sama setelah reduksi. Hal ini menunjukkan proses penggabungan key-value yang efisien tanpa ada penghilangan data.

- **Spilled Records=392**  
  Record yang dituliskan ke disk selama proses Shuffle (saat output dari Map dipindahkan ke Reduce). Semakin besar dataset, semakin banyak catatan yang akan tertumpah ke disk selama proses Shuffle.

- **Shuffle Errors (BAD_ID, CONNECTION, IO_ERROR, dll.)**  
  Tidak ada error yang terjadi selama proses shuffle, yang berarti seluruh transfer data antar node berjalan lancar.

#### 4. **File Input Format Counters**
- **Bytes Read=13493**  
  Ukuran data yang dibaca oleh framework dari HDFS selama fase Map, menunjukkan total byte dari file input yang diproses.

#### 5. **File Output Format Counters**
- **Bytes Written=6151**  
  Jumlah byte yang ditulis ke HDFS sebagai output akhir dari tugas Reduce.


**Tugas Tambahan**: Buat algoritma MapReduce lainnya, seperti menghitung rata-rata nilai, atau menghitung frekuensi kemunculan elemen tertentu di dalam dataset.

In [None]:
def map_function_combined(data):
    total = 0
    count = 0
    for element in data:
        total += element if isinstance(element, (int, float)) else 0
        count += 1 if isinstance(element, (int, float)) else 0
        yield ('frequency', (element, 1))
    yield ('average', (total, count))

from collections import defaultdict

def reduce_function_combined(pairs):
    total_sum = 0
    total_count = 0
    frequency_result = defaultdict(int)
    for key, value in pairs:
        if key == 'average':
            partial_sum, partial_count = value
            total_sum += partial_sum
            total_count += partial_count
        elif key == 'frequency':
            element, count = value
            frequency_result[element] += count
    average = total_sum / total_count if total_count > 0 else 0
    return {'average': average, 'frequency': dict(frequency_result)}

data = [10, 20, 'apple', 30, 'banana', 'apple', 40, 'banana', 'apple', 50]
mapped = map_function_combined(data)  
reduced = reduce_function_combined(mapped)  
print(f"Hasil Rata-rata dan Frekuensi Kemunculan Elemen: {reduced}")
