In [None]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
# Switch for the optional plotly figure of the synthetic data; off by default.
PLOT_DIAGRAM: bool = False

In [None]:
def get_test_decending_then_ascending_mkt_data(
    dim: int = 100, step: int = 100, start: "datetime | None" = None
) -> pd.DataFrame:
    """Build a synthetic V-shaped hourly price series for testing.

    The "close" column falls by ``step`` per row until it reaches 1000 at row
    ``dim // 2`` and then rises by ``step`` per row.

    Args:
        dim: Number of rows (hourly timestamps) to generate.
        step: Absolute price change between two consecutive rows.
        start: Timestamp of the first row. Defaults to ``datetime.now()``;
            pass a fixed value to get a reproducible index.

    Returns:
        A DataFrame indexed by "timestamp" with the columns "inx" (row
        position), "close" (the V-shaped price) and "price_movement" (the
        row-to-row difference of "close"; NaN in the first row).
    """
    if start is None:
        start = datetime.now()

    positions = np.arange(dim)
    # Row at which the price bottoms out at 1000.
    mid = dim // 2
    # Distance from the mid point gives the V shape without a Python loop.
    close = np.abs(positions - mid) * step + 1000

    df = pd.DataFrame(
        data={
            "timestamp": pd.date_range(
                start=start, periods=dim, freq=timedelta(hours=1)
            ),
            "inx": positions,
        }
    )
    df = df.set_index("timestamp", drop=True)
    df["close"] = close
    df["price_movement"] = df["close"].diff()
    return df

In [None]:
# Synthetic V-shaped series (100 hourly rows) used by the cells below.
test_data = get_test_decending_then_ascending_mkt_data(dim=100)
# Passed as `dimension` to SMA_Cross_Feature in the cells below.
LOOK_BACK = 3

In [None]:
from preprocess_data.domains.features_gen import SMA_Cross_Feature
# Build the SMA-cross feature on the synthetic close prices (10 vs. 20 windows).
close_price = test_data["close"]
sma_cross = SMA_Cross_Feature(
    df_price=close_price,
    sma_window_1=10,
    sma_window_2=20,
    dimension=LOOK_BACK,
)

In [None]:
from preprocess_data.domains.indicators import calculate_simple_moving_average
# Recompute both moving averages with the windows stored on the feature
# object, so they can be plotted next to the close price.
sma_1, sma_2 = (
    calculate_simple_moving_average(close_price, window)
    for window in (sma_cross.sma_window_1, sma_cross.sma_window_2)
)

In [None]:
# Optional plotly figure: close price as a solid line, both SMAs dotted.

import plotly.graph_objects as go
if PLOT_DIAGRAM:
    fig = go.Figure()
    fig.add_trace(
        go.Scatter(x=test_data.index, y=test_data["close"], name="close", mode="lines")
    )
    # Both moving averages share the same styling; only data and label differ.
    for sma_name, sma_series in (("sma_1", sma_1), ("sma_2", sma_2)):
        fig.add_trace(
            go.Scatter(
                x=test_data.index,
                y=sma_series,
                name=sma_name,
                mode="lines",
                line_dash="dot",
            )
        )
    fig.show()

In [None]:
# NOTE(review): this calls the feature's private `_calculate` method directly.
sma_cross_values: np.ndarray = sma_cross._calculate()

In [None]:
# Row positions of the flagged crosses, shifted by `invalid_data_length`
# (presumably the number of leading rows without a valid SMA - TODO confirm).
sma_cross_indices = np.where(sma_cross_values == True)[0] + sma_cross.invalid_data_length
# Show the close prices at those rows. The values are integer positions, not
# timestamp labels, so index with .iloc: plain `series[int_array]` on a
# datetime-indexed Series relies on a deprecated positional fallback.
close_price.iloc[sma_cross_indices]

## Load market data

In [None]:
# Where to read the candles from and which market to load.
local_data_dir = "data"
test_exchange = "kraken"
test_symbol = "ETHUSD"

# Query window as epoch milliseconds.
# NOTE(review): these are naive datetimes, so .timestamp() interprets them in
# the machine's local timezone - confirm that this is what the data store expects.
from_time_ms: int = int(datetime(2023, 1, 1).timestamp() * 1000)
to_time_ms: int = int(datetime(2023, 3, 1).timestamp() * 1000)

In [None]:
from cryptomarketdata.port.db_client import   get_data_db_client, Database_Type
from cryptomarketdata.utility import resample_timeframe
from datetime import datetime, timedelta
import os
# Client for the parquet-backed candle store of the configured exchange.
db_client = get_data_db_client(
    exchange=test_exchange,
    database_type=Database_Type.PARQUET,
    data_directory=local_data_dir,
)

In [None]:
# Fetch the candles for the configured window and resample them to 15-minute
# bars in one step; the close column drives the feature below.
candles = resample_timeframe(
    data=db_client.get_candles(
        symbol=test_symbol, from_time=from_time_ms, to_time=to_time_ms
    ),
    tf="15Min",
)
close_price = candles["close"]

In [None]:
# Same feature as above, now on the market close prices with 20 / 100 windows.
sma_cross = SMA_Cross_Feature(
    df_price=close_price,
    sma_window_1=20,
    sma_window_2=100,
    dimension=LOOK_BACK,
)

# The feature's `shape` unpacks into two values.
num_features, dim = sma_cross.shape

In [None]:
# NOTE(review): the names still say 10/20, but the windows come from the
# feature object, which was created above with sma_window_1=20 and
# sma_window_2=100.
raw_sma_10 = calculate_simple_moving_average(
    close_price, sma_cross.sma_window_1
)
raw_sma_20 = calculate_simple_moving_average(
    close_price, sma_cross.sma_window_2
)
# NOTE(review): same call as raw_sma_20; kept because it rebinds `sma_2`.
sma_2 = calculate_simple_moving_average(
    close_price, sma_cross.sma_window_2
)
# Feature matrix; the cells below read its columns 0, 1 and 2.
sma_features = sma_cross.output_feature_array()


In [None]:
def padding_zeros_left(arr: np.ndarray, dim: int) -> np.ndarray:
    """Left-pad ``arr`` with zeros until it has ``dim`` elements.

    Args:
        arr: Array to pad.
        dim: Target length.

    Returns:
        A new zero-padded array when ``arr`` is shorter than ``dim``;
        otherwise ``arr`` itself, unchanged (it is never truncated).
    """
    missing = dim - len(arr)
    # Already long enough (or longer): hand the input back untouched.
    if missing <= 0:
        return arr
    return np.pad(arr, pad_width=(missing, 0))

In [None]:
# Attach the SMAs and the cross features to `candles` for plotting. Each
# series is left-padded with zeros up to the number of candle rows.
n_candles = len(candles)
candles["sma10"] = padding_zeros_left(raw_sma_10, n_candles)
candles["sma20"] = padding_zeros_left(raw_sma_20, n_candles)
# Feature columns 0, 1, 2 map to "cross", "cross-1", "cross-2".
for feature_col, column_name in enumerate(("cross", "cross-1", "cross-2")):
    candles[column_name] = padding_zeros_left(sma_features[:, feature_col], n_candles)

In [None]:
# With plotly, plot the close price, sma_10, sma_20 and the sma_cross flags:
# sma_10 as a dotted line, sma_20 as a dashed line, and one "x" marker series
# per cross column at the rows where that column is True.
fig = go.Figure()
fig.add_trace(go.Scatter(x=candles.index, y=candles["close"], name="close", mode="lines"))
fig.add_trace(go.Scatter(x=candles.index, y=candles["sma10"], name="sma_10", mode="lines", line_dash="dot"))
fig.add_trace(go.Scatter(x=candles.index, y=candles["sma20"], name="sma_20", mode="lines", line_dash="dash"))

# (column, trace name, marker colour) for each cross series.
for cross_column, trace_name, marker_color in (
    ("cross", "sma_cross", "red"),
    ("cross-1", "sma_cross-1", "blue"),
    ("cross-2", "sma_cross-2", "brown"),
):
    # np.where yields row positions, so select the prices with .iloc rather
    # than the label-based / deprecated positional `series[int_array]`.
    cross_indices = np.where(candles[cross_column] == True)[0]
    fig.add_trace(
        go.Scatter(
            x=candles.index[cross_indices],
            y=candles["close"].iloc[cross_indices],
            name=trace_name,
            mode="markers",
            marker=dict(color=marker_color, symbol="x"),
        )
    )

fig.show()

In [None]:
# Row positions at which the "cross" column is True.
np.where(candles["cross"] == True)

In [None]:
import numpy as np

In [None]:
# Small sample array for the reversal example below.
r = np.array(
    [
        0.08004271,
        0.08701138,
        0.09531018,
    ]
)

In [None]:
# Show the array in reverse order (a view; `r` itself is not modified).
np.flip(r)