In [None]:
import pyspark

In [None]:
from pyspark import SparkContext, SparkConf

In [None]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, DateType, FloatType
from pyspark.sql.functions import lit, array, explode, min as spark_min, max as spark_max, col

In [None]:
# Explicit schema: one nullable date column followed by five nullable float columns.
schema = StructType(
    [StructField('dateTime', DateType(), True)]
    + [
        StructField(field_name, FloatType(), True)
        for field_name in ('open', 'high', 'low', 'close', 'volume')
    ]
)

In [None]:
# Reuse the active Spark session (or start one) so this cell also works on a
# fresh kernel where `spark` has not been predefined.
spark = SparkSession.builder.getOrCreate()

# An explicit schema is supplied, so schema inference is not requested.
df = spark.read.csv(
    "/user/root/input/output_BTCUSDT.csv",
    sep=',',
    header=True,
    schema=schema,
)

In [None]:
# Preview the loaded DataFrame.
df.show()

In [None]:
#  check null & NaN
from pyspark.sql import functions as F
# Count missing values per column. isnan() is only applied to columns whose
# dtype is not timestamp/date; for those two dtypes only nulls are counted.
df.select(*[
    F.count(
        F.when(
            F.col(c).isNull() if t in ("timestamp", "date")
            else (F.isnan(c) | F.col(c).isNull()),
            c,
        )
    ).alias(c)
    for c, t in df.dtypes
]).show()

In [None]:
def length(data):
    """Return the number of items produced by iterating over ``data``."""
    total = 0
    # enumerate(..., start=1) leaves `total` equal to the item count after the
    # last iteration; for an empty iterable the loop never runs and 0 is kept.
    for total, _ in enumerate(data, start=1):
        pass
    return total
def sum(data):
    """Add up the items of ``data`` starting from integer 0 and return the total.

    Note: this definition shadows Python's built-in ``sum`` for the cells that
    run after it.
    """
    total = 0
    for item in data:
        total += item
    return total
def min(data):
    """Return the smallest item of ``data``.

    This definition shadows Python's built-in ``min`` for the cells that run
    after it, so it accepts any iterable (lists, tuples, generators), not only
    indexable sequences.

    Args:
        data: iterable of mutually comparable items.

    Returns:
        The smallest item according to ``>``.

    Raises:
        ValueError: if ``data`` yields no items.
    """
    iterator = iter(data)
    try:
        smallest = next(iterator)
    except StopIteration:
        raise ValueError("min() arg is an empty sequence") from None
    # Single pass; no separate counting pass is needed.
    for candidate in iterator:
        if smallest > candidate:
            smallest = candidate
    return smallest

def max(data):
    """Return the largest item of ``data``.

    This definition shadows Python's built-in ``max`` for the cells that run
    after it, so it accepts any iterable (lists, tuples, generators), not only
    indexable sequences.

    Args:
        data: iterable of mutually comparable items.

    Returns:
        The largest item according to ``<``.

    Raises:
        ValueError: if ``data`` yields no items.
    """
    iterator = iter(data)
    try:
        largest = next(iterator)
    except StopIteration:
        raise ValueError("max() arg is an empty sequence") from None
    # Single pass; no separate counting pass is needed.
    for candidate in iterator:
        if largest < candidate:
            largest = candidate
    return largest

def qsort(data):
    """Return a new list with the items of ``data`` in ascending order.

    Quicksort with a three-way partition, driven by an explicit work stack
    instead of recursion, so long sorted or constant inputs cannot hit Python's
    recursion limit. Items that compare neither smaller nor greater than the
    pivot (equal values, or NaN) are kept next to the pivot rather than being
    dropped.

    Args:
        data: iterable of mutually comparable items.

    Returns:
        A new list; ``data`` itself is not modified.
    """
    ordered = []
    # Each entry is (chunk, already_sorted). The stack is LIFO, so the
    # "greater" part is pushed first and the "less" part last, which makes
    # chunks come off the stack in ascending order.
    pending = [(list(data), False)]
    while pending:
        chunk, already_sorted = pending.pop()
        if already_sorted or len(chunk) <= 1:
            ordered.extend(chunk)
            continue

        # The middle element keeps already-sorted input balanced.
        pivot = chunk[len(chunk) // 2]
        less, equal, greater = [], [], []
        for value in chunk:
            if value < pivot:
                less.append(value)
            elif value > pivot:
                greater.append(value)
            else:
                equal.append(value)

        # `equal` always contains the pivot, so `less` and `greater` are
        # strictly shorter than `chunk` and the loop terminates.
        pending.append((greater, False))
        pending.append((equal, True))
        pending.append((less, False))
    return ordered

In [None]:
def describe(data):
    """Compute summary statistics for every column of ``data`` except the first.

    Args:
        data: Spark DataFrame; the first column is skipped and the remaining
            columns are expected to hold numeric values.

    Returns:
        dict mapping column name to a dict with the keys ``count``, ``mean``,
        ``std`` (sample standard deviation, NaN when there is a single row),
        ``min``, ``25%``, ``50%``, ``75%`` and ``max``. Quantiles are read from
        the sorted values at index ``int(q * (count - 1))``.

    Raises:
        ValueError: if ``data`` has no rows.
    """
    # Collect once and reuse the rows, instead of one Spark job per column.
    rows = data.collect()
    if not rows:
        raise ValueError("describe() requires at least one row")

    description = {}
    for name in data.columns[1:]:
        col_values = [row[name] for row in rows]
        count = length(col_values)
        mean = sum(col_values) / count
        if count > 1:
            std = (sum((x - mean) ** 2 for x in col_values) / (count - 1)) ** 0.5
        else:
            # The sample standard deviation is undefined for a single value.
            std = float('nan')
        sorted_values = qsort(col_values)

        description[name] = {
            'count': count,
            'mean': mean,
            'std': std,
            'min': min(col_values),
            '25%': sorted_values[int(0.25 * (count - 1))],
            '50%': sorted_values[int(0.5 * (count - 1))],
            '75%': sorted_values[int(0.75 * (count - 1))],
            'max': max(col_values),
        }

    return description

In [None]:
# Summary statistics of the raw (un-normalized) columns; the min/max values
# are reused later to map scaled predictions back to prices.
des_data = describe(df)

In [None]:
from tabulate import tabulate
def show(data):
    """Print a statistics table with one row per metric and one column per key.

    Args:
        data: dict mapping column name to a dict of ``{metric: value}``; every
            column is expected to provide the same metrics as the first one.
    """
    columns = list(data.keys())
    headers = ["Metric"] + columns

    # Take the metric names from the first column instead of a hard-coded
    # 'open' key, so the table works for any set of columns (or none).
    metrics = list(data[columns[0]].keys()) if columns else []
    rows = [
        [metric] + [data[column][metric] for column in columns]
        for metric in metrics
    ]

    print(tabulate(rows, headers=headers, tablefmt="grid"))

# Render the statistics computed by describe() as a grid table.
show(des_data)

In [None]:
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
# Plot the closing price by date.
# Both columns are collected in a single job so dates and prices stay
# row-aligned, and plain values (not Row objects) are passed to matplotlib.
price_rows = df.select('dateTime', 'close').collect()
fig = plt.figure(figsize=(10, 5))
plt.plot([row['dateTime'] for row in price_rows], [row['close'] for row in price_rows])
plt.xlabel('Date')
plt.ylabel('Bitcoin Price')
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m"))
plt.title('Bitcoin Price')
plt.show()

# Tiền xử lý dữ liệu

## Chuẩn hóa dữ liệu

In [None]:
# Min-max normalization helper.
def min_max_scaling(df, min_vals, max_vals):
    """Rescale every column of ``df`` except ``dateTime`` with min-max scaling.

    Each value becomes ``(value - min) / (max - min)``, which lies in [0, 1]
    when ``min_vals`` / ``max_vals`` hold the true column extremes.

    Args:
        df: Spark DataFrame to rescale.
        min_vals: mapping (e.g. a Row) from column name to its minimum.
        max_vals: mapping (e.g. a Row) from column name to its maximum.

    Returns:
        A new DataFrame with the same columns; ``dateTime`` is left unchanged.
    """
    for column in df.columns:
        if column == "dateTime":
            continue
        min_val = min_vals[column]
        span = max_vals[column] - min_val
        if span == 0:
            # Constant column: avoid dividing by zero and map every value to 0
            # (nulls stay null).
            scaled = col(column) - min_val
        else:
            scaled = (col(column) - min_val) / span
        df = df.withColumn(column, scaled)
    return df

In [None]:
# Compute the min and max of every value column. The date column is excluded;
# the previous filter compared against the misspelled name "datetTime " and
# therefore never excluded anything.
value_columns = [c for c in df.columns if c != "dateTime"]
min_values = df.select([spark_min(col(c)).alias(c) for c in value_columns]).first()
max_values = df.select([spark_max(col(c)).alias(c) for c in value_columns]).first()

In [None]:
# Apply min-max scaling to every column except dateTime.
dataset_norm = min_max_scaling(df, min_values, max_values)

In [None]:
# Preview the normalized rows.
dataset_norm.show()

## Vẽ sơ đồ

In [None]:
# Plot the normalized closing price by date.
# Both columns are collected in a single job so dates and values stay
# row-aligned, and plain values (not Row objects) are passed to matplotlib.
norm_rows = dataset_norm.select('dateTime', 'close').collect()
fig = plt.figure(figsize=(10, 5))
plt.plot([row['dateTime'] for row in norm_rows], [row['close'] for row in norm_rows])
plt.xlabel('Date')
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m"))
plt.title('Data Normalized')
plt.show()

## Chia dữ liệu

In [None]:
train_size = 0.8

# Sort chronologically before splitting: limit() on an unordered DataFrame
# gives no guarantee about which rows are returned.
ordered_norm = dataset_norm.orderBy('dateTime')

# count() computes the size on the cluster instead of collecting every row to
# the driver just to count them.
count_data = dataset_norm.count()
train_count = int(train_size * count_data)

training_data = ordered_norm.limit(train_count)

# NOTE: subtract() is a distinct set difference, so fully duplicated rows are
# collapsed and any later row identical to a training row is removed as well.
testing_data = ordered_norm.subtract(training_data).orderBy('dateTime')

## Vẽ biểu đồ các tập dữ liệu đã chia

In [None]:
# Train
# Both columns are collected in a single job so dates and values stay
# row-aligned, and plain values (not Row objects) are passed to matplotlib.
train_rows = training_data.select('dateTime', 'close').collect()
fig = plt.figure(figsize=(10, 5))
plt.plot([row['dateTime'] for row in train_rows], [row['close'] for row in train_rows])
plt.xlabel('Date')
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m"))
plt.title('Data Training')
plt.show()

In [None]:
# Test
# Both columns are collected in a single job so dates and values stay
# row-aligned, and plain values (not Row objects) are passed to matplotlib.
test_rows = testing_data.select('dateTime', 'close').collect()
fig = plt.figure(figsize=(10, 5))
plt.plot([row['dateTime'] for row in test_rows], [row['close'] for row in test_rows])
plt.xlabel('Date')
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m"))
plt.title('Data Testing')
plt.show()

## Find lags

In [None]:
# Preview the training rows before building the lag windows.
training_data.show()

In [None]:
def create_sliding_windows_rdd(rdd, lag, target_index=1):
    """Turn an RDD of rows into ``(window, target)`` training pairs.

    For every row position ``i >= lag`` inside a partition, the window holds
    the ``lag`` preceding rows (each without its first element) and the target
    is element ``target_index`` of row ``i``.

    Windows are built independently inside each partition, so no window spans
    a partition boundary and the first ``lag`` rows of every partition have no
    target of their own.

    Args:
        rdd: RDD of indexable rows whose element 0 is not a feature.
        lag: number of preceding rows in each window.
        target_index: position of the value to predict inside a row. The
            default of 1 keeps the previous behaviour.
            NOTE(review): with this notebook's column order index 1 is 'open',
            while later cells plot and rescale 'close' - confirm which target
            is intended.

    Returns:
        RDD of ``(window, target)`` tuples, where ``window`` is a list of
        ``lag`` feature lists.
    """
    def sliding_window(index, iterator):
        rows = list(iterator)
        for end in range(lag, len(rows)):
            window = [list(row[1:]) for row in rows[end - lag:end]]
            yield window, rows[end][target_index]

    return rdd.mapPartitionsWithIndex(sliding_window)

In [None]:
# Number of preceding rows used as input for each prediction.
lag=1

# Build (window, target) pairs for the training and testing splits.
training_windows_rdd = create_sliding_windows_rdd(training_data.rdd, lag)
testing_windows_rdd = create_sliding_windows_rdd(testing_data.rdd, lag)


# Bring the pairs to the driver and split them into parallel tuples of inputs
# and targets. zip(*...) needs at least one pair, so an empty split fails here.
x_train, y_train = zip(*training_windows_rdd.collect())
x_test, y_test = zip(*testing_windows_rdd.collect())



In [None]:
# Show only the first few windows; displaying the whole tuple floods the cell output.
x_train[:5]

In [None]:
def create_matrix(size1, size2):
    """Return a ``(size1, size2)`` array drawn uniformly from ``[-b, b)``.

    The bound is ``b = sqrt(6 / (size1 + size2))``. Values come from NumPy's
    global random state, so results depend on the current seed.
    """
    bound = np.sqrt(6 / (size1 + size2))
    shape = (size1, size2)
    return np.random.uniform(low=-bound, high=bound, size=shape)
    

In [None]:
import numpy as np
np.random.seed(1234)
class GRU:
    """Single GRU cell with a linear read-out layer, implemented with NumPy.

    All vectors are column vectors: an input step is ``(input_size, 1)`` and
    the hidden state is ``(hidden_size, 1)``.

    NOTE: ``train`` only updates the read-out parameters ``W_out`` / ``b_out``.
    The gate and candidate weights (``W_*``, ``U_*``, ``b_*``) are never
    changed after initialization, i.e. there is no backpropagation through
    the recurrent part.
    """

    def __init__(self, input_size, hidden_size, output_size):
        """Create the parameters.

        Weight matrices come from ``create_matrix``; biases start at zero.

        Args:
            input_size: number of features per time step.
            hidden_size: size of the hidden state.
            output_size: number of values predicted per time step.
        """
        self.hidden_size = hidden_size

        # Update gate
        self.W_z = create_matrix(hidden_size, input_size)
        self.U_z = create_matrix(hidden_size, hidden_size)
        self.b_z = np.zeros((hidden_size, 1))

        # Reset gate
        self.W_r = create_matrix(hidden_size, input_size)
        self.U_r = create_matrix(hidden_size, hidden_size)
        self.b_r = np.zeros((hidden_size, 1))

        # Candidate hidden state
        self.W_h = create_matrix(hidden_size, input_size)
        self.U_h = create_matrix(hidden_size, hidden_size)
        self.b_h = np.zeros((hidden_size, 1))

        # Linear read-out
        self.W_out = create_matrix(output_size, hidden_size)
        self.b_out = np.zeros((output_size, 1))

    def sigmoid(self, x):
        """Element-wise logistic function ``1 / (1 + exp(-x))``."""
        return 1 / (1 + np.exp(-x))

    def tanh(self, x):
        """Element-wise hyperbolic tangent."""
        return np.tanh(x)

    def forward(self, x, h_prev):
        """Run one GRU step.

        Args:
            x: input column vector, shape ``(input_size, 1)``.
            h_prev: previous hidden state, shape ``(hidden_size, 1)``.

        Returns:
            Tuple ``(y_t, h_t)``: the read-out for this step and the new
            hidden state.
        """
        z_t = self.sigmoid(np.dot(self.W_z, x) + np.dot(self.U_z, h_prev) + self.b_z)
        r_t = self.sigmoid(np.dot(self.W_r, x) + np.dot(self.U_r, h_prev) + self.b_r)
        h_hat_t = self.tanh(np.dot(self.W_h, x) + np.dot(self.U_h, r_t * h_prev) + self.b_h)
        h_t = (1 - z_t) * h_prev + z_t * h_hat_t

        y_t = np.dot(self.W_out, h_t) + self.b_out
        return y_t, h_t

    def train(self, X_train, y_train, learning_rate=0.001, epochs=100):
        """Fit the read-out layer with per-step gradient descent on squared error.

        Every time step of every sequence is compared with that sequence's
        target, and ``W_out`` / ``b_out`` are updated immediately. The average
        loss per sequence is printed after each epoch.

        Args:
            X_train: sequence of input windows; each window is a sequence of
                feature vectors.
            y_train: one target per window.
            learning_rate: gradient-descent step size.
            epochs: number of passes over ``X_train``.

        Raises:
            ValueError: if ``X_train`` is empty.
        """
        if len(X_train) == 0:
            raise ValueError("X_train must contain at least one sequence")

        for epoch in range(epochs):
            total_loss = 0
            for i, sequence in enumerate(X_train):
                h_prev = np.zeros((self.hidden_size, 1))
                # The target is the same for every step of the window, so it
                # is converted once per sequence.
                y_t = np.array(y_train[i]).reshape(-1, 1)
                for step in sequence:
                    x_t = np.array(step).reshape(-1, 1)

                    # After this call `h_prev` holds the current hidden state;
                    # it is reused as the previous state of the next step.
                    y_pred, h_prev = self.forward(x_t, h_prev)

                    # Squared-error loss for this step.
                    loss = np.mean((y_pred - y_t) ** 2)
                    total_loss += loss

                    # Gradient step on the read-out layer only.
                    grad_y_pred = 2 * (y_pred - y_t)
                    self.W_out -= learning_rate * np.dot(grad_y_pred, h_prev.T)
                    self.b_out -= learning_rate * grad_y_pred

            print(f'Epoch {epoch + 1}/{epochs}, Loss: {total_loss / len(X_train)}')

    def predict(self, X_test):
        """Predict one value per input window.

        Each window is fed step by step from a zero hidden state; the first
        component of the last step's read-out is the prediction.

        Args:
            X_test: sequence of input windows, as for ``train``.

        Returns:
            List with one scalar prediction per window.

        Raises:
            ValueError: if a window has no time steps.
        """
        predictions = []
        for sequence in X_test:
            if len(sequence) == 0:
                # Without this check the previous window's output would be
                # reused (or the name would be undefined for the first window).
                raise ValueError("each input sequence must contain at least one time step")

            h_prev = np.zeros((self.hidden_size, 1))
            for step in sequence:
                x_t = np.array(step).reshape(-1, 1)
                y_pred, h_prev = self.forward(x_t, h_prev)

            predictions.append(y_pred.flatten()[0])
        return predictions

# Five features per time step: each window row keeps every column after the
# first (open, high, low, close, volume).
input_size = 5
hidden_size = 120
output_size = 1
gru = GRU(input_size, hidden_size, output_size)

# Train the model
gru.train(x_train, y_train, learning_rate=0.001, epochs=100)


In [None]:
# Predict on the test windows (values are still in the normalized scale).
predictions = gru.predict(x_test)
print("Predictions:", np.array(predictions))

In [None]:
# Show only the first few targets; displaying the whole tuple floods the cell output.
y_test[:5]

In [None]:
# Dates of the test rows: element 0 of every collected row.
test_date = np.array([row[0] for row in testing_data.collect()])

In [None]:
# Drop the first `lag` dates, which have no prediction because each window
# needs `lag` preceding rows.
# NOTE: this reassigns test_date from itself, so re-running the cell trims
# another `lag` entries. It also presumably assumes the test rows were
# windowed as a single partition - TODO confirm.
test_date = test_date[lag:]

In [None]:
# Compare the normalized test targets (red) with the model predictions (blue).
plt.figure(num=None, figsize=(10, 4), dpi=80,facecolor='w', edgecolor='k')
plt.title('Graph Comparison Data Actual and Data Prediction')
plt.plot(test_date,y_test, color='red',label='Data Test')

plt.plot(test_date,predictions, color='blue',label='Prediction Results')
# Show the x-axis ticks as year-month.
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m"))
plt.xlabel('Day')
plt.ylabel('Price')
plt.legend()
plt.show()

In [None]:
# Error metrics on the normalized scale. The arrays are converted once and reused.
predicted = np.asarray(predictions, dtype=float)
actual = np.asarray(y_test, dtype=float)
errors = predicted - actual

mse = np.mean(errors ** 2)
mae = np.mean(np.abs(errors))
rmse = np.sqrt(mse)

# MAPE is undefined where the actual value is 0 (the column minimum after
# min-max scaling), so those points are left out instead of producing inf.
nonzero_actual = actual != 0
if nonzero_actual.any():
    mape = np.mean(np.abs(errors[nonzero_actual] / actual[nonzero_actual])) * 100
else:
    mape = float('nan')

In [None]:
# Report the error metrics computed in the previous cell.
print('mse: ', mse)
print('mae: ', mae)
print('rmse: ', rmse)
print('mape: ', mape)

In [None]:
def returnFromMinMax(scaled_data, col='close', stats=None):
    """Undo min-max scaling: map scaled values back to the original range.

    Args:
        scaled_data: iterable of scaled values.
        col: name of the column whose min/max are used.
        stats: mapping ``{column: {'min': ..., 'max': ...}}``. When omitted,
            the notebook-level ``des_data`` is used, as before.

    Returns:
        List of ``value * (max - min) + min`` for every input value.
    """
    if stats is None:
        stats = des_data
    min_v = stats[col]['min']
    max_v = stats[col]['max']
    return [value * (max_v - min_v) + min_v for value in scaled_data]

In [None]:
# Map the normalized predictions back to prices using the 'close' min/max.
# NOTE(review): the sliding-window targets are taken from row index 1, which
# looks like 'open' in this schema - confirm that rescaling with 'close' is intended.
predict_price = returnFromMinMax(predictions)

In [None]:
# Show only the first few rescaled predictions; displaying the whole list floods the cell output.
predict_price[:5]