In [57]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [58]:
import pandas as pd
import random
import os

# Membaca file CSV
file1_path = "/content/drive/MyDrive/tugas/Major_eruption.csv"
file2_path = "/content/drive/MyDrive/tugas/volcano_indo.csv"

# Pastikan file ada
if not os.path.exists(file1_path):
    raise FileNotFoundError(f"File not found: {file1_path}")
if not os.path.exists(file2_path):
    raise FileNotFoundError(f"File not found: {file2_path}")

try:
    df1 = pd.read_csv(file1_path)
    df2 = pd.read_csv(file2_path)
except pd.errors.EmptyDataError:
    raise pd.errors.EmptyDataError("One of the CSV files is empty.")
except pd.errors.ParserError:
    raise pd.errors.ParserError("Error parsing one of the CSV files. Check the file format.")


# Fungsi untuk memperbanyak data
def augment_data_with_conditions(df1, df2, target_size=100):
    if target_size <= 0:
        raise ValueError("Target size must be a positive integer.")

    df1_augmented = pd.concat([df1] * (target_size // len(df1) + 1), ignore_index=True)
    df1_augmented = df1_augmented.sample(target_size, replace=len(df1) < target_size).reset_index(drop=True)

    df2_augmented = pd.concat([df2] * (target_size // len(df2) + 1), ignore_index=True)
    df2_augmented = df2_augmented.sample(target_size, replace=len(df2) < target_size).reset_index(drop=True)

    # Menambahkan missing values ke df2 (10% data)
    num_missing = int(0.1 * len(df2_augmented))
    missing_indices = random.sample(df2_augmented.index.to_list(), num_missing)
    for idx in missing_indices:
        col = random.choice(df2_augmented.columns)
        df2_augmented.loc[idx, col] = None

    # Menambahkan duplikasi pada sebagian df2 (20% data)
    num_duplicates = int(0.2 * len(df2_augmented))
    if num_duplicates > 0:
        duplicates = df2_augmented.sample(num_duplicates, replace=False)
        df2_augmented = pd.concat([df2_augmented, duplicates], ignore_index=True).reset_index(drop=True)


    return df1_augmented, df2_augmented

# Memperbanyak data sesuai kondisi
try:
    augmented_df1, augmented_df2 = augment_data_with_conditions(df1, df2, target_size=100)

    # Menyimpan hasil ke file lokal
    output_dir = "/content/drive/MyDrive/Colab Notebooks/PAD/"
    os.makedirs(output_dir, exist_ok=True)

    augmented_df1.to_csv(os.path.join(output_dir, "Major_eruption.csv"), index=False)
    augmented_df2.to_csv(os.path.join(output_dir, "volcano_indo.csv"), index=False)

    print("File berhasil disimpan:")
    print(f"- {os.path.join(output_dir, 'Major_eruption.csv')} (tanpa missing atau duplikat)")
    print(f"- {os.path.join(output_dir, 'volcano_indo.csv')} (dengan missing value dan beberapa duplikat)")

except ValueError as e:
    print(f"Error: {e}")
except FileNotFoundError as e:
    print(f"Error: {e}")
except pd.errors.EmptyDataError as e:
    print(f"Error: {e}")
except pd.errors.ParserError as e:
    print(f"Error: {e}")
except Exception as e: # Tangkap semua exception
    print(f"An unexpected error occurred: {e}")

File berhasil disimpan:
- /content/drive/MyDrive/Colab Notebooks/PAD/Major_eruption.csv (tanpa missing atau duplikat)
- /content/drive/MyDrive/Colab Notebooks/PAD/volcano_indo.csv (dengan missing value dan beberapa duplikat)


In [59]:
df = pd.read_csv("/content/drive/MyDrive/Colab Notebooks/PAD/Major_eruption.csv")
df.duplicated().sum()

76

In [60]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Buat SparkSession
spark = SparkSession.builder \
    .appName("Major_eruption Analysis") \
    .getOrCreate()

# Tentukan skema untuk DataFrame agar cocok dengan CSV asli
vol_schema = StructType([
    StructField("index", IntegerType(), True),
    StructField("Eruption date", StringType(), True),
    StructField("Volcano", StringType(), True),
    StructField("Cessation date", StringType(), True),
    StructField("VEI", IntegerType(), True),
    StructField("Characteristics", StringType(), True),
    StructField("Tsunami", StringType(), True),
    StructField("Tephra volume", StringType(), True),
    StructField("Fatality", StringType(), True)
])

# Baca file CSV ke dalam DataFrame
vol_df = spark.read.csv(
    "/content/drive/MyDrive/Colab Notebooks/PAD/Major_eruption.csv",
    header=True,
    schema=vol_schema
)

# Tampilkan DataFrame
vol_df.show()
vol_df.createOrReplaceTempView('VolcanoEruptions')

# Total fatalities
spark.sql("""
    SELECT
        Volcano,
        MAX(CAST(REPLACE(Fatality, ',', '') AS DECIMAL)) as MaxFatalities
    FROM VolcanoEruptions
    WHERE Fatality != 'N/A'
    GROUP BY Volcano
    ORDER BY MaxFatalities DESC
    LIMIT 10
""").show()

# Analisis distribusi VEI
spark.sql("""
    SELECT
        VEI,
        COUNT(*) as EruptionCount
    FROM VolcanoEruptions
    GROUP BY VEI
    ORDER BY VEI
""").show()

+-----+----------------+----------+---------------+---+-----------------+-------+-------------+--------+
|index|   Eruption date|   Volcano| Cessation date|VEI|  Characteristics|Tsunami|Tephra volume|Fatality|
+-----+----------------+----------+---------------+---+-----------------+-------+-------------+--------+
|    9|   17 March 1963|     Agung|27 January 1964|  5|      cv,pf,lf,lm|     no|        1 km3|   1,148|
|   14|  26 August 1883|  Krakatoa|  February 1884|  6|cv,se,pf,fa,lm,cc|15–42 m|    5–8.5 km3|  36,600|
|   20|  12 August 1772|Papandayan| 12 August 1772|  3|            cv,ph|     no|         NULL|   2,957|
|   16|    2 March 1856|       Awu|  17 March 1856|  3|         cv,pf,lm|    yes|0.51±0.50 km3|   2,806|
|   16|    2 March 1856|       Awu|  17 March 1856|  3|         cv,pf,lm|    yes|0.51±0.50 km3|   2,806|
|    9|   17 March 1963|     Agung|27 January 1964|  5|      cv,pf,lf,lm|     no|        1 km3|   1,148|
|    9|   17 March 1963|     Agung|27 January 1964|  5|

In [61]:
#sel terpisah
jumlahBaris = len(df)
print(f"Jumlah baris dalam DataFrame: {jumlahBaris}")

Jumlah baris dalam DataFrame: 100


In [62]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

vol_df.printSchema()
pandas_df = vol_df.toPandas()
print(pandas_df.dtypes)

root
 |-- index: integer (nullable = true)
 |-- Eruption date: string (nullable = true)
 |-- Volcano: string (nullable = true)
 |-- Cessation date: string (nullable = true)
 |-- VEI: integer (nullable = true)
 |-- Characteristics: string (nullable = true)
 |-- Tsunami: string (nullable = true)
 |-- Tephra volume: string (nullable = true)
 |-- Fatality: string (nullable = true)

index               int32
Eruption date      object
Volcano            object
Cessation date     object
VEI                 int32
Characteristics    object
Tsunami            object
Tephra volume      object
Fatality           object
dtype: object


In [63]:
# contoh sql manipulation
vol_df.createOrReplaceTempView('VolcanoEruptions')

# Gunakan SELECT untuk membuat DataFrame baru dengan nilai yang diperbarui
updated_vol_df = spark.sql("""
    SELECT *,
           CASE WHEN `Tephra volume` IS NULL THEN 'Unknown'
                ELSE `Tephra volume`
           END as updated_Tephra_volume
    FROM VolcanoEruptions
""")

# Ganti DataFrame asli dengan yang diperbarui
updated_vol_df = updated_vol_df.drop("Tephra volume").withColumnRenamed("updated_Tephra_volume", "Tephra volume")

# Buat ulang tampilan sementara dengan DataFrame yang diperbarui
updated_vol_df.createOrReplaceTempView('VolcanoEruptions')

# Verifikasi perubahannya
spark.sql("SELECT * FROM VolcanoEruptions WHERE `Tephra volume` = 'Unknown'").show()

+-----+----------------+-------------+---------------+---+-----------------+-------+--------+-------------+
|index|   Eruption date|      Volcano| Cessation date|VEI|  Characteristics|Tsunami|Fatality|Tephra volume|
+-----+----------------+-------------+---------------+---+-----------------+-------+--------+-------------+
|   20|  12 August 1772|   Papandayan| 12 August 1772|  3|            cv,ph|     no|   2,957|      Unknown|
|   23|  September 1257|      Samalas|        unknown|  7|             NULL|   NULL|    NULL|      Unknown|
|    3| 3 November 2010|       Merapi|8 November 2010|  4|      cv,pf,ld,lm|     no|     353|      Unknown|
|   20|  12 August 1772|   Papandayan| 12 August 1772|  3|            cv,ph|     no|   2,957|      Unknown|
|   13|     7 June 1892|          Awu|   12 June 1892|  3|         cv,pf,lm|    yes|   1,532|      Unknown|
|    5|    18 July 1983|         Colo|  December 1983|  4|         cv,pf,ph|     no|       0|      Unknown|
|    1|21 December 2018|Anak

In [64]:
from pyspark.sql.functions import lower, trim
# Buat SparkSession
spark = SparkSession.builder.appName("VolcanoJoin").getOrCreate()

# skema untuk volcano_indo_df
indo_schema = StructType([
    StructField("Volcano_ID", IntegerType(), True),
    StructField("Volcano_Name", StringType(), True),
    StructField("Last_Eruption", StringType(), True),
    StructField("Status", StringType(), True),
    StructField("Region", StringType(), True)
])

# Baca file CSV ke dalam volcano_indo_df
volcano_indo_df = spark.read.csv(
    "/content/drive/MyDrive/tugas/volcano_indo.csv",
    header=True,
    schema=indo_schema
)

# Standarisasi nama gunung berapi
vol_df = vol_df.withColumn("Volcano", lower(trim(vol_df["Volcano"])))
volcano_indo_df = volcano_indo_df.withColumn("Volcano_Name", lower(trim(volcano_indo_df["Volcano_Name"])))

# Gabungkan DataFrame
joined_df = vol_df.join(volcano_indo_df, vol_df["Volcano"] == volcano_indo_df["Volcano_Name"], "left")

# Tampilkan hasil
joined_df.show()

+-----+----------------+----------+---------------+---+-----------------+-------+-------------+--------+----------+------------+-------------+--------------------+--------------------+
|index|   Eruption date|   Volcano| Cessation date|VEI|  Characteristics|Tsunami|Tephra volume|Fatality|Volcano_ID|Volcano_Name|Last_Eruption|              Status|              Region|
+-----+----------------+----------+---------------+---+-----------------+-------+-------------+--------+----------+------------+-------------+--------------------+--------------------+
|    9|   17 March 1963|     agung|27 January 1964|  5|      cv,pf,lf,lm|     no|        1 km3|   1,148|        82|       agung|stratovolcano|3,031 metres (9,9...|      2019 (ongoing)|
|   14|  26 August 1883|  krakatoa|  February 1884|  6|cv,se,pf,fa,lm,cc|15–42 m|    5–8.5 km3|  36,600|      NULL|        NULL|         NULL|                NULL|                NULL|
|   20|  12 August 1772|papandayan| 12 August 1772|  3|            cv,ph|  

In [65]:
BarisJoin = vol_df.join(volcano_indo_df, vol_df["Volcano"] == volcano_indo_df["Volcano_Name"], "left")
print(f'Jumlah Baris setalah Join : {BarisJoin.count()}')

Jumlah Baris setalah Join : 100


In [66]:
VolND = vol_df.dropDuplicates()
print(f'Jumlah Baris tanpa duplikat : {VolND.count()}')

Jumlah Baris tanpa duplikat : 24


In [67]:
VolNoMissing = VolND.dropna(subset='Fatality')
VolNoMissing.show()

+-----+----------------+-------------+----------------+---+-----------------+-------+-------------+--------------------+
|index|   Eruption date|      Volcano|  Cessation date|VEI|  Characteristics|Tsunami|Tephra volume|            Fatality|
+-----+----------------+-------------+----------------+---+-----------------+-------+-------------+--------------------+
|   16|    2 March 1856|          awu|   17 March 1856|  3|         cv,pf,lm|    yes|0.51±0.50 km3|               2,806|
|    8|   26 April 1966|        kelut|   27 April 1966|  4|      cv,cl,pf,lm|     no|    0.089 km3|                 212|
|   15|   15 April 1872|       merapi|   21 April 1872|  4|            cv,pf|     no|     0.33 km3|                 200|
|    9|   17 March 1963|        agung| 27 January 1964|  5|      cv,pf,lf,lm|     no|        1 km3|               1,148|
|    6|    5 April 1982|   galunggung|  8 January 1983|  4|      cv,pf,lf,lm|     no|   0.37 km3 +|                  68|
|    4|10 February 1990|        

In [68]:
# Melihat Statistik data Clean
VolNoMissing.describe().show()

+-------+------------------+-------------+-------+---------------+------------------+---------------+-------+-------------+--------------------+
|summary|             index|Eruption date|Volcano| Cessation date|               VEI|Characteristics|Tsunami|Tephra volume|            Fatality|
+-------+------------------+-------------+-------+---------------+------------------+---------------+-------+-------------+--------------------+
|  count|                23|           23|     23|             23|                23|             23|     23|           17|                  23|
|   mean|12.043478260869565|       1586.0|   NULL|           NULL| 4.217391304347826|           NULL|   NULL|         NULL|   209.8181818181818|
| stddev| 6.858824845472406|         NULL|   NULL|           NULL|1.3469332756346388|           NULL|   NULL|         NULL|  291.61372333339125|
|    min|                 1|10 April 1815|  agung|10 January 2019|                 2|       cf,cl,lm|15–42 m|   0.0017 km3|       

In [69]:
from pyspark.sql.functions import regexp_replace, col

# Transmisi 'Fatality' ke numerik setelah menghapus koma dan menangani nilai non-numerik
VolNoMissing = VolNoMissing.withColumn(
    "Fatality",
    regexp_replace(col("Fatality"), ",", "").cast("double")
)

KorelasiVol = VolNoMissing.corr('VEI', 'Fatality')
print(f'Korelasi VEI dan Fatality: {KorelasiVol}')

Korelasi VEI dan Fatality: 0.26924193927746376
