In [None]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from keras.models import Sequential, load_model
from keras.layers import LSTM, Dense, Dropout
from keras import backend as K
import gc, os, datetime
import plotly.express as px
from dotenv import load_dotenv
import os

In [None]:
# Create (or reuse) the Spark session used for all JDBC reads below.
# The PostgreSQL JDBC driver is pulled in via spark.jars.packages.
# Note: driver memory only applies when the driver JVM is first launched;
# re-running this cell returns the already-running session unchanged.
# NOTE(review): pgjdbc 42.2.23 is an old release — consider a newer 42.x.
spark = (
    SparkSession.builder
    .appName("LSTM_Index_Incremental")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.2.23")
    .config("spark.driver.memory", "6g")
    .getOrCreate()
)

In [None]:
# Load connection settings from a local .env file into the environment.
# load_dotenv() does not override variables that are already set.
load_dotenv()

# Fail fast with a readable message; a None value in the JDBC properties would
# otherwise surface as an opaque Java-side error during the first read.
missing_env = [
    name
    for name in ("RAW_DB_URL", "DW_DB_URL", "DB_USER", "DB_PASSWORD", "DB_DRIVER")
    if os.getenv(name) is None
]
if missing_env:
    raise RuntimeError(f"Missing required environment variables: {', '.join(missing_env)}")

raw_db_url = os.getenv("RAW_DB_URL")
raw_db_properties = {
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "driver": os.getenv("DB_DRIVER"),
}

# The warehouse uses the same user/password/driver as the raw DB. Copying the
# dict avoids re-typing the env var names (the previous "D_PASSWORD" typo left
# the DW password unset).
dw_db_url = os.getenv("DW_DB_URL")
dw_db_properties = dict(raw_db_properties)


def read_dw_table(table):
    """Read one table from the data-warehouse database as a Spark DataFrame.

    Args:
        table: Schema-qualified table name, e.g. "dim.date".
    """
    return spark.read.jdbc(url=dw_db_url, table=table, properties=dw_db_properties)


# Dimension tables used to enrich the raw index stream.
date_dw_df = read_dw_table("dim.date")
time_dw_df = read_dw_table("dim.time")
index_dw_df = read_dw_table("dim.index")
exchange_dw_df = read_dw_table("dim.exchange")
session_dw_df = read_dw_table("dim.tradingsession")


In [None]:
# Raw streaming index ticks from the operational database.
index_raw_df = spark.read.jdbc(
    url=raw_db_url,
    table="streaming.index_data",
    properties=raw_db_properties,
)

# Attach each dimension by its natural key. All joins are left joins, so raw
# rows without a matching dimension row are kept with nulls on that side.
fact_index = (
    index_raw_df.alias("ir")
    .join(date_dw_df.alias("d"), on=col("ir.trading_date") == col("d.tradingdate"), how="left")
    .join(time_dw_df.alias("t"), on=col("ir.time") == col("t.time_hh_mm_ss"), how="left")
    .join(index_dw_df.alias("i"), on=col("ir.index_name") == col("i.index_name"), how="left")
    .join(exchange_dw_df.alias("e"), on=col("ir.exchange") == col("e.exchange_name"), how="left")
    .join(session_dw_df.alias("s"), on=col("ir.trading_session") == col("s.trading_session"), how="left")
)

In [None]:
# Project the joined frame onto the fact-table layout and drop duplicate rows.
fact_indexmarket_df = fact_index.select(
    # Dimension attributes, keeping the dimension tables' column names.
    col("d.tradingdate"),
    col("t.time_hh_mm_ss"),
    col("i.index_name"),
    col("ir.index_value").alias("index_value"),
    # NOTE(review): output name is "prio_index_value" (no "r"); confirm it
    # matches the target fact-table schema before renaming it.
    col("ir.prior_index_value").alias("prio_index_value"),
    # Remaining raw measures are copied under their own names, in this order.
    *[
        col(f"ir.{measure}").alias(measure)
        for measure in (
            "change", "ratio_change",
            "total_qtty", "total_value",
            "total_qtty_pt", "total_value_pt",
            "advances", "nochanges", "declines",
            "ceilings", "floors",
        )
    ],
    # Surrogate keys from the exchange and trading-session dimensions.
    col("e.exchange_key").alias("exchange_key"),
    col("s.trading_session_key").alias("trading_session_key"),
).distinct()

In [None]:
# Collect the de-duplicated fact rows to the driver as a pandas DataFrame.
# toPandas() materialises every row in driver memory.
pandas_df = (
    fact_indexmarket_df
    .toPandas()
)

In [None]:
# Preview the first rows; as the cell's last expression this renders as a
# rich HTML table instead of print()'s plain text.
pandas_df.head()

In [None]:
# Index to plot; any value present in pandas_df["index_name"] works.
index_name = "VNINDEX"

# Rows carry a time of day (time_hh_mm_ss) as well as a date, so one trading
# date can have several rows. Sort by date AND time so points within a day are
# drawn in chronological order; sorting by date alone leaves them in arbitrary
# order and the line zig-zags.
df_index = (
    pandas_df[pandas_df["index_name"] == index_name]
    .sort_values(["tradingdate", "time_hh_mm_ss"])
)

fig = px.line(df_index, x='tradingdate', y='index_value', title=f'Chỉ số {index_name}')
fig.show()