# **Apache Spark Case Study**

| Name | NRP |
| -------- | ------- |
| Nathan Kho Pancras | 5027231002 |
| Michael Kenneth Salim | 5027231002 |

## **Importing Libraries and Downloading the Dataset**

In [50]:
import kagglehub
import os
from pyspark.sql import SparkSession

path = kagglehub.dataset_download("sudalairajkumar/cryptocurrencypricehistory")

print("Path to dataset files:", path)

Path to dataset files: /kaggle/input/cryptocurrencypricehistory


## **Initializing the Spark Session**

In [51]:
spark = SparkSession.builder.getOrCreate()

In [52]:
spark

## **Data Pre-Processing**

### Reading the Data

In [54]:
Coins = ["BTC", "ETH", "DOGE", "USD", "Stellar"]

Read_BTC = spark.read.csv(f"{path}/coin_Bitcoin.csv", header=True, inferSchema=True)
Read_ETH = spark.read.csv(f"{path}/coin_Ethereum.csv", header=True, inferSchema=True)
Read_DOGE = spark.read.csv(f"{path}/coin_Dogecoin.csv", header=True, inferSchema=True)
Read_USD = spark.read.csv(f"{path}/coin_USDCoin.csv", header=True, inferSchema=True)
Read_Stellar = spark.read.csv(f"{path}/coin_Stellar.csv", header=True, inferSchema=True)

print("Data has been successfully read")

Data has been successfully read
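
The five reads above differ only in the file name, so they could be driven from a single mapping. The sketch below is one possible refactor, not the notebook's original code: `COIN_FILES`, `csv_paths`, and `read_all` are hypothetical names, and `read_all` assumes it is given the `spark` session and the `path` defined earlier.

```python
import os

# Hypothetical mapping: view name used later in the notebook -> CSV file name.
COIN_FILES = {
    "BTC": "coin_Bitcoin.csv",
    "ETH": "coin_Ethereum.csv",
    "DOGE": "coin_Dogecoin.csv",
    "USD": "coin_USDCoin.csv",
    "Stellar": "coin_Stellar.csv",
}


def csv_paths(base_path):
    """Return {view_name: full CSV path} for every coin in COIN_FILES."""
    return {name: os.path.join(base_path, fname) for name, fname in COIN_FILES.items()}


def read_all(spark, base_path):
    """Read every coin CSV with the same options as the cell above."""
    return {
        name: spark.read.csv(csv_path, header=True, inferSchema=True)
        for name, csv_path in csv_paths(base_path).items()
    }
```

In the notebook this would be called as `frames = read_all(spark, path)`, after which `frames["BTC"]` plays the role of `Read_BTC`.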


### Ensuring the Date Column Is a Date Type and the Data Is Sorted Chronologically

In [55]:
print("Bitcoin Data")
Read_BTC.show(5)

print("Ethereum Data")
Read_ETH.show(5)

print("Dogecoin Data")
Read_DOGE.show(5)

print("USDCoin Data")
Read_USD.show(5)

print("Stellar Data")
Read_Stellar.show(5)

Bitcoin Data
+---+-------+------+-------------------+------------------+------------------+------------------+------------------+------+---------------+
|SNo|   Name|Symbol|               Date|              High|               Low|              Open|             Close|Volume|      Marketcap|
+---+-------+------+-------------------+------------------+------------------+------------------+------------------+------+---------------+
|  1|Bitcoin|   BTC|2013-04-29 23:59:59|147.48800659179688|             134.0|134.44400024414062| 144.5399932861328|   0.0| 1.6037688645E9|
|  2|Bitcoin|   BTC|2013-04-30 23:59:59|146.92999267578125| 134.0500030517578|             144.0|             139.0|   0.0|  1.542813125E9|
|  3|Bitcoin|   BTC|2013-05-01 23:59:59|139.88999938964844|107.72000122070312|             139.0|116.98999786376953|   0.0|1.29895459375E9|
|  4|Bitcoin|   BTC|2013-05-02 23:59:59| 125.5999984741211| 92.28189849853516|116.37999725341797|105.20999908447266|   0.0|1.16851749525E9|
|  5|Bi
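
The preview above shows `Date` values such as `2013-04-29 23:59:59`, but it does not by itself confirm the column type or the ordering; in PySpark, `Read_BTC.printSchema()` reports the inferred type. As a plain-Python illustration of the ordering check, the sketch below parses the four Bitcoin `Date` values visible above and tests that they are strictly increasing. The helper names are hypothetical, and the format string is an assumption based on the preview.

```python
from datetime import datetime

# Assumed format, inferred from the preview rows (e.g. "2013-04-29 23:59:59").
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

# The four Bitcoin Date values visible in the preview above.
SAMPLE_DATES = [
    "2013-04-29 23:59:59",
    "2013-04-30 23:59:59",
    "2013-05-01 23:59:59",
    "2013-05-02 23:59:59",
]


def parse_dates(values):
    """Parse each string; raises ValueError if one does not match DATE_FORMAT."""
    return [datetime.strptime(v, DATE_FORMAT) for v in values]


def is_chronological(values):
    """Return True when every date is strictly later than the one before it."""
    parsed = parse_dates(values)
    return all(earlier < later for earlier, later in zip(parsed, parsed[1:]))


print(is_chronological(SAMPLE_DATES))
```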

## **Price Trend Analysis**

### Loading the Data into Temporary SQL Views

In [62]:
Read_BTC.createOrReplaceTempView("BTC")
Read_ETH.createOrReplaceTempView("ETH")
Read_DOGE.createOrReplaceTempView("DOGE")
Read_USD.createOrReplaceTempView("USD")
Read_Stellar.createOrReplaceTempView("Stellar")

### Computing the Monthly Average Closing Price (Close) for Each Cryptocurrency

In [80]:
BTC_monthly_query = """
SELECT
    'Bitcoin' AS Coin,
    SUBSTRING(Date, 1, 7) AS Month,
    AVG(Close) AS Avg_Close
FROM BTC
GROUP BY SUBSTRING(Date, 1, 7)
ORDER BY Month
"""
BTC_monthly = spark.sql(BTC_monthly_query)
print("Bitcoin Monthly Averages:")
BTC_monthly.show(5)

ETH_monthly_query = """
SELECT
    'Ethereum' AS Coin,
    SUBSTRING(Date, 1, 7) AS Month,
    AVG(Close) AS Avg_Close
FROM ETH
GROUP BY SUBSTRING(Date, 1, 7)
ORDER BY Month
"""
ETH_monthly = spark.sql(ETH_monthly_query)
print("Ethereum Monthly Averages:")
ETH_monthly.show(5)

doge_monthly_query = """
SELECT
    'Dogecoin' AS Coin,
    SUBSTRING(Date, 1, 7) AS Month,
    AVG(Close) AS Avg_Close
FROM DOGE
GROUP BY SUBSTRING(Date, 1, 7)
ORDER BY Month
"""
doge_monthly = spark.sql(doge_monthly_query)
print("Dogecoin Monthly Averages:")
doge_monthly.show(5)

USD_monthly_query = """
SELECT
    'USDCoin' AS Coin,
    SUBSTRING(Date, 1, 7) AS Month,
    AVG(Close) AS Avg_Close
FROM USD
GROUP BY SUBSTRING(Date, 1, 7)
ORDER BY Month
"""
USD_monthly = spark.sql(USD_monthly_query)
print("USDCoin Monthly Averages:")
USD_monthly.show(5)

Stellar_monthly_query = """
SELECT
    'Stellar' AS Coin,
    SUBSTRING(Date, 1, 7) AS Month,
    AVG(Close) AS Avg_Close
FROM Stellar
GROUP BY SUBSTRING(Date, 1, 7)
ORDER BY Month
"""
Stellar_monthly = spark.sql(Stellar_monthly_query)
print("Stellar Monthly Averages:")
Stellar_monthly.show()

Bitcoin Monthly Averages:
+-------+-------+------------------+
|   Coin|  Month|         Avg_Close|
+-------+-------+------------------+
|Bitcoin|2013-04| 141.7699966430664|
|Bitcoin|2013-05|119.99274124637726|
|Bitcoin|2013-06|107.76140670776367|
|Bitcoin|2013-07|   90.512206539031|
|Bitcoin|2013-08|113.90548435334236|
+-------+-------+------------------+
only showing top 5 rows

Ethereum Monthly Averages:
+--------+-------+------------------+
|    Coin|  Month|         Avg_Close|
+--------+-------+------------------+
|Ethereum|2015-08| 1.259648747742176|
|Ethereum|2015-09| 0.988966029882431|
|Ethereum|2015-10|0.6611039330882411|
|Ethereum|2015-11|0.9313520669937134|
|Ethereum|2015-12| 0.886259263561618|
+--------+-------+------------------+
only showing top 5 rows

Dogecoin Monthly Averages:
+--------+-------+--------------------+
|    Coin|  Month|           Avg_Close|
+--------+-------+--------------------+
|Dogecoin|2013-12|5.005079392503831E-4|
|Dogecoin|2014-01|8.277584448449254
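
To make the grouping concrete, the sketch below reproduces `SUBSTRING(Date, 1, 7)` and `AVG(Close)` in plain Python on the four Bitcoin rows shown in the earlier preview. It is an illustration on a tiny sample, not a replacement for the Spark query, and `monthly_average` is a hypothetical helper.

```python
from collections import defaultdict

# (Date, Close) pairs copied from the Bitcoin preview rows.
SAMPLE_ROWS = [
    ("2013-04-29 23:59:59", 144.5399932861328),
    ("2013-04-30 23:59:59", 139.0),
    ("2013-05-01 23:59:59", 116.98999786376953),
    ("2013-05-02 23:59:59", 105.20999908447266),
]


def monthly_average(rows):
    """Group rows by the first 7 characters of Date (YYYY-MM) and average Close."""
    buckets = defaultdict(list)
    for date, close in rows:
        buckets[date[:7]].append(close)  # same key as SUBSTRING(Date, 1, 7)
    return {month: sum(v) / len(v) for month, v in sorted(buckets.items())}


for month, avg_close in monthly_average(SAMPLE_ROWS).items():
    print(month, round(avg_close, 4))
```

The `2013-04` bucket averages to about 141.77, in line with the first row of the Bitcoin output, because the data begins on 2013-04-29 and that month contains only these two rows. The `2013-05` bucket covers only two days in this sample, so it differs from the full-month value.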

### Identifying the Months with the Highest and Lowest Average Closing Price for Each Cryptocurrency

In [83]:
BTC_highest_lowest_query = """
SELECT
    'Bitcoin' AS Coin,
    MAX(Avg_Close) AS Highest_Avg_Close,
    MIN(Avg_Close) AS Lowest_Avg_Close,
    (SELECT Month FROM (
        SELECT Month, Avg_Close,
        RANK() OVER (ORDER BY Avg_Close DESC) as rank
        FROM (
            SELECT SUBSTRING(Date, 1, 7) AS Month, AVG(Close) AS Avg_Close
            FROM BTC
            GROUP BY SUBSTRING(Date, 1, 7)
        )
    ) WHERE rank = 1) AS Highest_Month,
    (SELECT Month FROM (
        SELECT Month, Avg_Close,
        RANK() OVER (ORDER BY Avg_Close ASC) as rank
        FROM (
            SELECT SUBSTRING(Date, 1, 7) AS Month, AVG(Close) AS Avg_Close
            FROM BTC
            GROUP BY SUBSTRING(Date, 1, 7)
        )
    ) WHERE rank = 1) AS Lowest_Month
FROM (
    SELECT SUBSTRING(Date, 1, 7) AS Month, AVG(Close) AS Avg_Close
    FROM BTC
    GROUP BY SUBSTRING(Date, 1, 7)
)
"""
BTC_highest_lowest = spark.sql(BTC_highest_lowest_query)
print("Bitcoin Highest and Lowest Months:")
BTC_highest_lowest.show(truncate=False)

ETH_highest_lowest_query = """
SELECT
    'Ethereum' AS Coin,
    MAX(Avg_Close) AS Highest_Avg_Close,
    MIN(Avg_Close) AS Lowest_Avg_Close,
    (SELECT Month FROM (
        SELECT Month, Avg_Close,
        RANK() OVER (ORDER BY Avg_Close DESC) as rank
        FROM (
            SELECT SUBSTRING(Date, 1, 7) AS Month, AVG(Close) AS Avg_Close
            FROM ETH
            GROUP BY SUBSTRING(Date, 1, 7)
        )
    ) WHERE rank = 1) AS Highest_Month,
    (SELECT Month FROM (
        SELECT Month, Avg_Close,
        RANK() OVER (ORDER BY Avg_Close ASC) as rank
        FROM (
            SELECT SUBSTRING(Date, 1, 7) AS Month, AVG(Close) AS Avg_Close
            FROM ETH
            GROUP BY SUBSTRING(Date, 1, 7)
        )
    ) WHERE rank = 1) AS Lowest_Month
FROM (
    SELECT SUBSTRING(Date, 1, 7) AS Month, AVG(Close) AS Avg_Close
    FROM ETH
    GROUP BY SUBSTRING(Date, 1, 7)
)
"""
ETH_highest_lowest = spark.sql(ETH_highest_lowest_query)
print("Ethereum Highest and Lowest Months:")
ETH_highest_lowest.show(truncate=False)

DOGE_highest_lowest_query = """
SELECT
    'Dogecoin' AS Coin,
    MAX(Avg_Close) AS Highest_Avg_Close,
    MIN(Avg_Close) AS Lowest_Avg_Close,
    (SELECT Month FROM (
        SELECT Month, Avg_Close,
        RANK() OVER (ORDER BY Avg_Close DESC) as rank
        FROM (
            SELECT SUBSTRING(Date, 1, 7) AS Month, AVG(Close) AS Avg_Close
            FROM DOGE
            GROUP BY SUBSTRING(Date, 1, 7)
        )
    ) WHERE rank = 1) AS Highest_Month,
    (SELECT Month FROM (
        SELECT Month, Avg_Close,
        RANK() OVER (ORDER BY Avg_Close ASC) as rank
        FROM (
            SELECT SUBSTRING(Date, 1, 7) AS Month, AVG(Close) AS Avg_Close
            FROM DOGE
            GROUP BY SUBSTRING(Date, 1, 7)
        )
    ) WHERE rank = 1) AS Lowest_Month
FROM (
    SELECT SUBSTRING(Date, 1, 7) AS Month, AVG(Close) AS Avg_Close
    FROM DOGE
    GROUP BY SUBSTRING(Date, 1, 7)
)
"""
DOGE_highest_lowest = spark.sql(DOGE_highest_lowest_query)
print("Dogecoin Highest and Lowest Months:")
DOGE_highest_lowest.show(truncate=False)

USD_highest_lowest_query = """
SELECT
    'USDCoin' AS Coin,
    MAX(Avg_Close) AS Highest_Avg_Close,
    MIN(Avg_Close) AS Lowest_Avg_Close,
    (SELECT Month FROM (
        SELECT Month, Avg_Close,
        RANK() OVER (ORDER BY Avg_Close DESC) as rank
        FROM (
            SELECT SUBSTRING(Date, 1, 7) AS Month, AVG(Close) AS Avg_Close
            FROM USD
            GROUP BY SUBSTRING(Date, 1, 7)
        )
    ) WHERE rank = 1) AS Highest_Month,
    (SELECT Month FROM (
        SELECT Month, Avg_Close,
        RANK() OVER (ORDER BY Avg_Close ASC) as rank
        FROM (
            SELECT SUBSTRING(Date, 1, 7) AS Month, AVG(Close) AS Avg_Close
            FROM USD
            GROUP BY SUBSTRING(Date, 1, 7)
        )
    ) WHERE rank = 1) AS Lowest_Month
FROM (
    SELECT SUBSTRING(Date, 1, 7) AS Month, AVG(Close) AS Avg_Close
    FROM USD
    GROUP BY SUBSTRING(Date, 1, 7)
)
"""
USD_highest_lowest = spark.sql(USD_highest_lowest_query)
print("USDCoin Highest and Lowest Months:")
USD_highest_lowest.show(truncate=False)


Bitcoin Highest and Lowest Months:
+-------+-----------------+----------------+-------------+------------+
|Coin   |Highest_Avg_Close|Lowest_Avg_Close|Highest_Month|Lowest_Month|
+-------+-----------------+----------------+-------------+------------+
|Bitcoin|57206.72022674067|90.512206539031 |2021-04      |2013-07     |
+-------+-----------------+----------------+-------------+------------+

Ethereum Highest and Lowest Months:
+--------+------------------+------------------+-------------+------------+
|Coin    |Highest_Avg_Close |Lowest_Avg_Close  |Highest_Month|Lowest_Month|
+--------+------------------+------------------+-------------+------------+
|Ethereum|3140.7307596877413|0.6611039330882411|2021-05      |2015-10     |
+--------+------------------+------------------+-------------+------------+

Dogecoin Highest and Lowest Months:
+--------+-------------------+---------------------+-------------+------------+
|Coin    |Highest_Avg_Close  |Lowest_Avg_Close     |Highest_Month|Lowes
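
Each query above recomputes the same monthly aggregate three times and differs between coins only in the view name and label. One possible simplification, assuming Spark 3.0 or later (where the SQL functions `max_by` and `min_by` are available), is to compute the aggregate once in a common table expression and generate the statement per coin. `highest_lowest_query` and `COINS` are hypothetical names; view names are interpolated directly into the SQL, so they should come only from a fixed, trusted list.

```python
def highest_lowest_query(view, label):
    """Build the highest/lowest-month SQL for one temp view."""
    return f"""
WITH monthly AS (
    SELECT SUBSTRING(Date, 1, 7) AS Month, AVG(Close) AS Avg_Close
    FROM {view}
    GROUP BY SUBSTRING(Date, 1, 7)
)
SELECT
    '{label}' AS Coin,
    MAX(Avg_Close) AS Highest_Avg_Close,
    MIN(Avg_Close) AS Lowest_Avg_Close,
    MAX_BY(Month, Avg_Close) AS Highest_Month,
    MIN_BY(Month, Avg_Close) AS Lowest_Month
FROM monthly
"""


# View name -> display label, matching the four queries above.
COINS = {"BTC": "Bitcoin", "ETH": "Ethereum", "DOGE": "Dogecoin", "USD": "USDCoin"}

queries = {view: highest_lowest_query(view, label) for view, label in COINS.items()}
print(queries["BTC"])
```

In the notebook, each statement would be run with `spark.sql(queries[view]).show(truncate=False)`. One behavioral difference to keep in mind: if two months tie for the extreme average, `MAX_BY` and `MIN_BY` return one of them, whereas the `RANK()`-based scalar subquery would return more than one row and fail.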

## **Volatility Analysis**

In [86]:
BTC_volatility_query = """
SELECT
    Date,
    High - Low AS Daily_Volatility
FROM BTC
ORDER BY Date
"""
BTC_volatility = spark.sql(BTC_volatility_query)
print("Sample of Bitcoin Daily Volatility:")
BTC_volatility.show(5)

Sample of Bitcoin Daily Volatility:
+-------------------+------------------+
|               Date|  Daily_Volatility|
+-------------------+------------------+
|2013-04-29 23:59:59|13.488006591796875|
|2013-04-30 23:59:59|12.879989624023438|
|2013-05-01 23:59:59| 32.16999816894531|
|2013-05-02 23:59:59| 33.31809997558594|
|2013-05-03 23:59:59|29.027999877929688|
+-------------------+------------------+
only showing top 5 rows
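
`Daily_Volatility` here is simply the day's trading range, `High - Low`, in price units. As a quick check, the sketch below recomputes it in plain Python from the `High` and `Low` values in the Bitcoin preview rows shown earlier; `daily_range` is a hypothetical helper.

```python
# (Date, High, Low) copied from the Bitcoin preview rows.
SAMPLE_ROWS = [
    ("2013-04-29 23:59:59", 147.48800659179688, 134.0),
    ("2013-04-30 23:59:59", 146.92999267578125, 134.0500030517578),
    ("2013-05-01 23:59:59", 139.88999938964844, 107.72000122070312),
    ("2013-05-02 23:59:59", 125.5999984741211, 92.28189849853516),
]


def daily_range(rows):
    """Return (Date, High - Low) per row, mirroring the SQL expression."""
    return [(date, high - low) for date, high, low in rows]


for date, volatility in daily_range(SAMPLE_ROWS):
    print(date, round(volatility, 4))
```

The four results agree with the first four rows of the output above (about 13.488, 12.88, 32.17, and 33.3181).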

