In [214]:
import scripts.stock_plots as stock_plt
from datetime import datetime, timedelta
import numpy as np
import yfinance as yf
import plotly.graph_objects as go

import pandas as pd
from plotly.subplots import make_subplots



import time

In [161]:
tdy = datetime.now()
past_days = 360
ago = datetime.now() - timedelta(days=past_days)

# tickers = ["AAPL", "META", "TSLA"]
ticker = "TSLA"

df = yf.download(ticker, ago, tdy, progress=False)



The 'unit' keyword in TimedeltaIndex construction is deprecated and will be removed in a future version. Use pd.to_timedelta instead.



In [222]:
def calculate_psar(stock_data, af_step=0.02, af_max=0.2):
    """
    Calculate the Parabolic SAR for a given stock data.

    Parameters:
    - stock_data: DataFrame with columns 'High' and 'Low'.
    - af_step: Acceleration Factor step, default 0.02.
    - af_max: Maximum Acceleration Factor, default 0.2.

    Returns:
    - A DataFrame with the original columns and an additional 'PSAR' column.
    """
    data = stock_data.copy()
    n = len(data)
    
    # Initialize columns
    psar_array = np.zeros(len(data))
    af_array = np.full(len(data), af_step)

    # Initial values
    bull = True
    psar = data.iloc[0]['Low']
    ep = data.iloc[0]['High']

    for i in range(1, n):
        if bull:
            psar = psar + af_array[i-1] * (ep - psar)
            if data.iloc[i]['Low'] < psar:
                bull = False
                psar = ep
                ep = data.iloc[i]['Low']
                af_array[i] = af_step
            else:
                if data.iloc[i]['High'] > ep:
                    ep = data.iloc[i]['High']
                    af_array[i] = min(af_array[i-1] + af_step, af_max)
                else:
                    af_array[i] = af_array[i-1]
        else:
            psar = psar - af_array[i-1] * (psar - ep)
            if data.iloc[i]['High'] > psar:
                bull = True
                psar = ep
                ep = data.iloc[i]['High']
                af_array[i] = af_step
            else:
                if data.iloc[i]['Low'] < ep:
                    ep = data.iloc[i]['Low']
                    af_array[i] = min(af_array[i-1] + af_step, af_max)
                else:
                    af_array[i] = af_array[i-1]
        
        psar_array[i] = psar
        
    psar_array[0] = data.iloc[0]['Close']

    data["psar_diff"] = data['Close'] - psar_array

    return psar_array, data




In [223]:
psar, new_data = calculate_psar(df, af_step=0.02, af_max=0.2)

In [209]:
# build complete timepline from start date to end date
dt_all = pd.date_range(start=df.index[0],end=df.index[-1])

# retrieve the dates that ARE in the original datset
dt_obs = [d.strftime("%Y-%m-%d") for d in pd.to_datetime(df.index)]

# define dates with missing values
dt_breaks = [d for d in dt_all.strftime("%Y-%m-%d").tolist() if not d in dt_obs]

In [228]:
# Plot
fig = make_subplots(
    rows=2,
    cols=1,
    shared_xaxes=True,
    # subplot_titles=("Candle Chart", "Volume"),
    vertical_spacing=0,
    row_width=[0.2, 0.8],
)

candle = go.Candlestick(x=df.index, open=df['Open'], high=df['High'], low=df['Low'], close=df['Close'])
    
sar = go.Scatter(x=df.index, y=psar, mode='markers', marker=dict(color="black", size=1))

fig.add_trace(
            candle,
            row=1,
            col=1,
        )

fig.add_trace(
            sar,
            row=1,
            col=1,
        )


# Compare each number with the previous one and set the color flag accordingly
green_mask = new_data["psar_diff"] > new_data["psar_diff"].shift()
new_data["psar_color"] = ""
new_data.loc[green_mask, "psar_color"] = "green"
new_data.loc[~green_mask, "psar_color"] = "red"

fig.add_trace(
            go.Bar(
                x=df.index,
                y=new_data["psar_diff"],
                # marker_color="#689be3",
                marker_color=new_data["psar_color"],
                showlegend=False,
            ),
            row=2,
            col=1,
        )

# hide dates with no values
fig.update_xaxes(rangebreaks=[dict(values=dt_breaks)])
fig.update_layout(autosize=True, xaxis_rangeslider_visible=False)


fig.show()

In [181]:
def loop_psar(df_original):
    df = df_original.copy()
    # Initialize variables
    para_sar = np.array(df["Close"])
    psar = df["Close"].iloc[4]
    df["10d_min"] = df["Low"].rolling(window=5).min()
    df["10d_max"] = df["High"].rolling(window=5).min()
    af_start = af_val = af_step = 0.02
    af_max = 0.2
    trend_type = 1 # 1 for uptrend and 0 for downtrend


    af_val_df = np.zeros(len(df))
    trend_type_df = np.zeros(len(df))



    def up_trend(psar_pre, af_pre, af_max, i):
        psar = psar_pre + af_pre * (df["10d_max"].iloc[i-1] - psar_pre)
        if (df["10d_max"].iloc[i] > df["10d_max"].iloc[i-1] and (af_pre + af_step) < af_max):
            # print(af_pre, af_max, "up", i)
            af_pre += af_step
        return psar, af_pre

    def down_trend(psar_pre, af_pre, af_max, i):
        psar = psar_pre - af_pre * (psar_pre - df["10d_min"].iloc[i-1])
        if (df["10d_min"].iloc[i] > df["10d_min"].iloc[i-1] and (af_pre + af_step) < af_max):
            # print(af_pre, af_max, "down", i)
            af_pre += af_step

        return psar, af_pre

    for i in range(10, len(df)-10):
        # print(psar)
        if psar < para_sar[i]:
            if trend_type != 1:
                af_val = af_start
                trend_type = 1
            psar, af_val = up_trend(psar, af_val, af_max, i)
        else:
            if trend_type != 0:
                af_val = af_start
                trend_type = 0
            psar, af_val = down_trend(psar, af_val, af_max, i)

        para_sar[i] = psar

        af_val_df[i] = af_val
        trend_type_df[i] = trend_type



    df["af_val"] = af_val_df
    df["trend_type"] = trend_type_df
    df["psar"] = para_sar

    return para_sar, df

In [182]:
psar_loop, result = loop_psar(df)

In [183]:
# Plot    
fig = go.Figure([
    go.Candlestick(x=df.index, open=df['Open'], high=df['High'],
                low=df['Low'], close=df['Close']),
    
    go.Scatter(x=df.index, y=result["psar"], mode='markers')
])


fig.show()

In [19]:
def vector_psar(df):
    # Initialize SAR arrays
    psar = np.zeros(len(df))
    af = np.full(len(df), 0.02) 
    hp = np.zeros(len(df))
    lp = np.zeros(len(df))

    # Vectorized logical index masks
    long = df['Close'] > psar[:-1]
    short = df['Close'] < psar[:-1]

    # AF update  
    af[~long] = np.minimum(0.2, af[~long] + 0.02)
    af[short] = np.minimum(0.2, af[short] + 0.02) 

    # HP/LP update
    hp[1:][long[1:]] = np.maximum(hp[1:][long[1:]], df['High'].iloc[:-1][long[:-1]])  
    lp[1:][short[1:]] = np.minimum(lp[1:][short[1:]], df['Low'].iloc[:-1][short[:-1]])

    # PSAR update 
    psar[1:] = psar[:-1] + af[1:] * (hp[1:] - psar[:-1])  

    # Mask where PSAR crossed high
    mask = psar > df['High'][:-1]  
    psar[1:][mask] = lp[1:][mask]
    af[1:][mask] = np.minimum(0.2, af[1:][mask] + 0.02)

    return psar

In [None]:
# Plot    
fig = go.Figure([
    go.Candlestick(x=df.index, open=df['Open'], high=df['High'],
                low=df['Low'], close=df['Close']),
    
    go.Scatter(x=df.index, y=psar_vec)
])

fig.show()