In [1]:
import os
import pandas as pd
import numpy as np
from lightweight_charts import Chart
import asyncio
import yfinance as yf
import nest_asyncio
nest_asyncio.apply()

### download stock data

In [2]:
# input the stock code
stock_code = input("Enter stock code (e.g., AAPL): ").strip().upper()
if not stock_code:
    print("No stock code provided. Exiting.")
    exit()

# initialization
stock_data_path = "./data"
start_date = '2024-06-09'
end_date = '2024-07-26'
interval = '1d'
save_flag = True
excel_extensions = ['.xlsx', '.xls', '.xlsm', 'csv']

In [3]:
def contains_excel_file(path: str, filename: str) -> bool:
    if not os.path.isdir(path):
        print("Invalid directory path.")
        return False

    for root, dirs, files in os.walk(path):
        for file in files:
            if filename.lower() == file.lower():
                return True

    print("No Excel files found.")
    return False

def download_yf_data(stock_code: str, stock_data_path: str, start_date: str, end_date: str, interval: str = '1d', save_flag: bool = True) -> pd.DataFrame:
    """
    Download stock data from Yahoo Finance.
    
    :param stock_code: Stock ticker symbol.
    :param start_date: Start date for the data in 'YYYY-MM-DD' format.
    :param end_date: End date for the data in 'YYYY-MM-DD' format.
    :param interval: Data interval (default is '1d').
    :return: DataFrame with stock data.
    """

    data = yf.download([stock_code], start=start_date, end=end_date, interval=interval)
    data = preprocess_stock_data(data)
    if save_flag:
        data.to_csv(f"{stock_data_path}/{stock_code}_{end_date}.csv")
    return data

def get_bar_data(stock_code: str, stock_data_path: str, start_date: str, end_date: str, interval: str = '1d', save_flag: bool = True) -> pd.DataFrame:
    """    Get bar data for a given stock symbol.
    :param symbol: Stock ticker symbol.
    :return: DataFrame with stock data or an empty DataFrame if no data is found
    """

    if contains_excel_file(stock_data_path, f"{stock_code}_{end_date}.csv") is False:
        print(f'No data for "{stock_code}" download it from Yahoo Finance')
        return download_yf_data(stock_code=stock_code, stock_data_path=stock_data_path, start_date=start_date, end_date=end_date, interval=interval, save_flag=save_flag)
    print(f'Get data for "{stock_code}" from {stock_data_path}')
    return pd.read_csv(f'{stock_data_path}/{stock_code}_{end_date}.csv')

# twist the stock data for downstream processing
def preprocess_stock_data(data: pd.DataFrame) -> pd.DataFrame:
    """
    Preprocess stock data to have a consistent format.
    
    :param data: DataFrame with stock data.
    :return: Preprocessed DataFrame.
    """
    data = data.reset_index()
    data.columns = data.columns.droplevel(1)
    data.rename(columns={
        'Date': 'time',
        'Open': 'open',
        'High': 'high',
        'Low': 'low',
        'Close': 'close',
        'Volume': 'volume'
    }, inplace=True)
    return data

def on_search(chart, searched_string):  # Called when the user searches.
    new_data = get_bar_data(searched_string, stock_data_path, start_date, end_date, interval, save_flag)
    if new_data.empty:
        return
    chart.topbar['symbol'].set(searched_string)
    chart.set(new_data)

def on_timeframe_selection(chart):  # Called when the user changes the timeframe.
    new_data = get_bar_data(stock_code, stock_data_path, start_date, end_date, interval, save_flag)
    if new_data.empty:
        return
    chart.set(new_data, True)

def on_timeframe_selection(chart):  # Called when the user changes the timeframe.
    new_data = get_bar_data(stock_code, stock_data_path, start_date, end_date, interval, save_flag)
    if new_data.empty:
        return
    chart.set(new_data, True)

def on_horizontal_line_move(chart, line):
    print(f'Horizontal line moved to: {line.price}')


In [4]:
data = get_bar_data(stock_code, stock_data_path, start_date, end_date, interval, save_flag)

No Excel files found.
No data for "AAPL" download it from Yahoo Finance


  data = yf.download([stock_code], start=start_date, end=end_date, interval=interval)
[*********************100%***********************]  1 of 1 completed


### Plot TradingView Chart

In [5]:
chart = Chart()

# UI setting
chart.grid(vert_enabled = True, horz_enabled = True)
chart.layout(background_color='#131722', font_family='Trebuchet MS', font_size = 16)
# chart.candle_style(up_color='#2962ff', down_color='#e91e63',
#                    border_up_color='#2962ffcb', border_down_color='#e91e63cb',
#                    wick_up_color='#2962ffcb', wick_down_color='#e91e63cb')
chart.volume_config(up_color='#2962ffcb', down_color='#e91e63cb')
chart.name = stock_code
chart.legend(visible=True, font_family='Trebuchet MS', ohlc=True, percent=True)
chart.events.search += on_search
chart.topbar.textbox('symbol', stock_code)
chart.horizontal_line(200, func=on_horizontal_line_move)
chart.set(data)

### Implement SMA

In [6]:
# implement the sma
def calculate_sma_df(df, period: int = 50):
    return pd.DataFrame({
        'time': df['time'],
        f'SMA {period}': df['close'].rolling(window=period).mean()
    }).fillna(0)

sma9_line = chart.create_line(name='SMA 9', color='#ffeb3b', width=1, price_label=False)
sma9_data = calculate_sma_df(data, period=9)
sma9_line.set(sma9_data)

sma4_line = chart.create_line(name='SMA 4', color ="#5f3bff", width=1, price_label=False)
sma4_data = calculate_sma_df(data, period=4)
sma4_line.set(sma4_data)

### Implement Donchian Channels

In [7]:
# implement the doncian channels to lightweight_charts Chart object
def calculate_donchian_channels_df(df, period: int = 20):
    high = df['high'].rolling(window=period).max()
    low = df['low'].rolling(window=period).min()
    mean = (high + low) / 2
    return pd.DataFrame({
        'time': df['time'],
        f'Upper Donchian {period}': high.fillna(0),
        f'Mean Donchian {period}': mean.fillna(0),
        f'Lower Donchian {period}': low.fillna(0)
    })

donchian20_data = calculate_donchian_channels_df(data, period=20)
donchian20_upper_line = chart.create_line(name='Upper Donchian 20', color="#00ff11", width=1, price_line=False, price_label=False)
donchian20_lower_line = chart.create_line(name='Lower Donchian 20', color="#ff4800", width=1, price_line=False, price_label=False)
donchian20_mean_line = chart.create_line(name='Mean Donchian 20', color="#ffffff", width=1, price_line=False, price_label=False)
donchian20_upper_line.set(donchian20_data)
donchian20_lower_line.set(donchian20_data)
donchian20_mean_line.set(donchian20_data)

### Implement Bollinger Bands

In [8]:
# implement the bollinger bands to lightweight_charts Chart object
def calculate_bollinger_bands_df(df, period: int = 20, num_std_dev: int = 2):
    sma = df['close'].rolling(window=period).mean()
    rolling_std = df['close'].rolling(window=period).std()
    upper_band = sma + (rolling_std * num_std_dev)
    lower_band = sma - (rolling_std * num_std_dev)      
    return pd.DataFrame({
        'time': df['time'],
        f'Upper Bollinger {period}': upper_band.fillna(0),
        f'Mean Bollinger {period}': sma.fillna(0),
        f'Lower Bollinger {period}': lower_band.fillna(0)
    })

bollinger20_data = calculate_bollinger_bands_df(data, period=20, num_std_dev=2)
bollinger20_upper_line = chart.create_line(name='Upper Bollinger 20', color="#00fff2", width=1, price_line=False, price_label=False)
bollinger20_lower_line = chart.create_line(name='Lower Bollinger 20', color="#ff0000", width=1, price_line=False, price_label=False)
bollinger20_mean_line = chart.create_line(name='Mean Bollinger 20', color="#ffffff", width=1, price_line=False, price_label=False)
bollinger20_upper_line.set(bollinger20_data)
bollinger20_lower_line.set(bollinger20_data)
bollinger20_mean_line.set(bollinger20_data)

### Implement RSI

In [9]:
# implement the RSI
def calculate_rsi_df(df, period=14, close_col='close'):
    df['close_new'] = df[close_col]  # Adjust close prices
    delta = df[close_col].diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
    rs = gain / loss
    rsi = 100 - (100 / (1 + rs))

    # plot 30% and 70% lines
    return pd.DataFrame({
        'time': df['time'],
        'RSI': rsi.fillna(0),
        'RSI 30%': [30] * len(df),
        'RSI 70%': [70] * len(df)
    })

rsi_data = calculate_rsi_df(data, period=14)
rsi_line = chart.create_line(name='RSI', color="#ff00ff", width=1, price_line=False, price_label=False)
rsi_30_line = chart.create_line(name='RSI 30%', color="#ff0000", width=1, price_line=False, price_label=False)
rsi_70_line = chart.create_line(name='RSI 70%', color="#00ff00", width=1, price_line=False, price_label=False)
rsi_30_line.set(rsi_data)
rsi_70_line.set(rsi_data)
rsi_line.set(rsi_data)  

### Implement Stochastic Oscillator

In [10]:
# implement the stochastic oscillator
def calculate_stochastic_oscillator_df(df, period=14):

    df['low_new'] = df['low']
    df['high_new'] = df['high']
    df['close_new'] = df['close']
    low_min = df['low_new'].rolling(window=period).min()
    high_max = df['high_new'].rolling(window=period).max()
    k_percent = 100 * ((df['close_new'] - low_min) / (high_max - low_min))
    d_percent = k_percent.rolling(window=3).mean()

    return pd.DataFrame({
        'time': df['time'],
        '%K': k_percent.fillna(0),
        '%D': d_percent.fillna(0),
        'Stochastic 20%': [20] * len(df),
        'Stochastic 80%': [80] * len(df)
    })

stochastic_data = calculate_stochastic_oscillator_df(data, period=14)
stochastic_k_line = chart.create_line(name='%K', color="#ff00ff", width=1, price_line=False, price_label=False)
stochastic_d_line = chart.create_line(name='%D', color="#00ffff", width=1, price_line=False, price_label=False)
stochastic_20_line = chart.create_line(name='Stochastic 20%', color="#ff0000", width=1, price_line=False, price_label=False)
stochastic_80_line = chart.create_line(name='Stochastic 80%', color="#00ff00", width=1, price_line=False, price_label=False)
stochastic_k_line.set(stochastic_data)
stochastic_d_line.set(stochastic_data)
stochastic_20_line.set(stochastic_data)
stochastic_80_line.set(stochastic_data)

### show the graph

In [11]:
chart.show(block=True)

Get data for "TSM" from ./data


# Test showing graphs into different separated parts

In [3]:
!export PYTHONPATH=.

In [1]:
import os
import pandas as pd
import numpy as np
from lightweight_charts import Chart
import yfinance as yf
import datetime
from dateutil.relativedelta import relativedelta
import sys
sys.path.append('/Users/tongcc/dev/projects/trading')

# create a class to wrap the above script and plot the chart. Also use stochastic_oscillator.py and rsi.py to add these 2 indicators
from src.trading_funcs.indicators.rsi import RSI
from src.trading_funcs.indicators.stochastic_oscillator import StochasticOscillator
from src.trading_funcs.indicators.sma import SMA
from src.trading_funcs.indicators.donchian_channels import DonchianChannels
from src.trading_funcs.indicators.bollinger_bands import BollingerBands

In [2]:
# input the stock code
stock_code = input("Enter stock code (e.g., AAPL): ").strip().upper()
if not stock_code:
    print("No stock code provided. Exiting.")
    exit()

# initialization
stock_data_path = "./data"
start_date = '2024-06-09'
end_date = '2024-07-26'
interval = '1d'
save_flag = True
excel_extensions = ['.xlsx', '.xls', '.xlsm', 'csv']

In [3]:
def contains_excel_file(path: str, filename: str) -> bool:
    if not os.path.isdir(path):
        print("Invalid directory path.")
        return False

    for root, dirs, files in os.walk(path):
        for file in files:
            if filename.lower() == file.lower():
                return True

    print("No Excel files found.")
    return False

def download_yf_data(stock_code: str, stock_data_path: str, start_date: str, end_date: str, interval: str = '1d', save_flag: bool = True) -> pd.DataFrame:
    """
    Download stock data from Yahoo Finance.
    
    :param stock_code: Stock ticker symbol.
    :param start_date: Start date for the data in 'YYYY-MM-DD' format.
    :param end_date: End date for the data in 'YYYY-MM-DD' format.
    :param interval: Data interval (default is '1d').
    :return: DataFrame with stock data.
    """

    data = yf.download([stock_code], start=start_date, end=end_date, interval=interval)
    data = preprocess_stock_data(data)
    if save_flag:
        data.to_csv(f"{stock_data_path}/{stock_code}_{end_date}.csv")
    return data

def get_bar_data(stock_code: str, stock_data_path: str, start_date: str, end_date: str, interval: str = '1d', save_flag: bool = True) -> pd.DataFrame:
    """    Get bar data for a given stock symbol.
    :param symbol: Stock ticker symbol.
    :return: DataFrame with stock data or an empty DataFrame if no data is found
    """

    if contains_excel_file(stock_data_path, f"{stock_code}_{end_date}.csv") is False:
        print(f'No data for "{stock_code}" download it from Yahoo Finance')
        return download_yf_data(stock_code=stock_code, stock_data_path=stock_data_path, start_date=start_date, end_date=end_date, interval=interval, save_flag=save_flag)
    print(f'Get data for "{stock_code}" from {stock_data_path}')
    return pd.read_csv(f'{stock_data_path}/{stock_code}_{end_date}.csv')

# twist the stock data for downstream processing
def preprocess_stock_data(data: pd.DataFrame) -> pd.DataFrame:
    """
    Preprocess stock data to have a consistent format.
    
    :param data: DataFrame with stock data.
    :return: Preprocessed DataFrame.
    """
    data = data.reset_index()
    data.columns = data.columns.droplevel(1)
    data.rename(columns={
        'Date': 'time',
        'Open': 'open',
        'High': 'high',
        'Low': 'low',
        'Close': 'close',
        'Volume': 'volume'
    }, inplace=True)
    return data

def on_search(chart, searched_string):  # Called when the user searches.
    new_data = get_bar_data(searched_string, stock_data_path, start_date, end_date, interval, save_flag)
    if new_data.empty:
        return
    chart.topbar['symbol'].set(searched_string)
    chart.set(new_data)

def on_timeframe_selection(chart):  # Called when the user changes the timeframe.
    new_data = get_bar_data(stock_code, stock_data_path, start_date, end_date, interval, save_flag)
    if new_data.empty:
        return
    chart.set(new_data, True)

def on_timeframe_selection(chart):  # Called when the user changes the timeframe.
    new_data = get_bar_data(stock_code, stock_data_path, start_date, end_date, interval, save_flag)
    if new_data.empty:
        return
    chart.set(new_data, True)

def on_horizontal_line_move(chart, line):
    print(f'Horizontal line moved to: {line.price}')


In [4]:
data = get_bar_data(stock_code, stock_data_path, start_date, end_date, interval, save_flag)

Get data for "AAPL" from ./data


In [5]:
ohlc_chart = Chart(toolbox=True)
ohlc_chart.layout(background_color='#131722', font_family='Trebuchet MS', font_size=16)
ohlc_chart.volume_config(up_color='#2962ffcb', down_color='#e91e63cb')
ohlc_chart.name = stock_code
ohlc_chart.set(data)
# ohlc_chart.show(block=False)

In [6]:
# implement the RSI
def calculate_rsi_df(df, period=14, close_col='close'):
    df['close_new'] = df[close_col]  # Adjust close prices
    delta = df[close_col].diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
    rs = gain / loss
    rsi = 100 - (100 / (1 + rs))

    # plot 30% and 70% lines
    return pd.DataFrame({
        'time': df['time'],
        'RSI': rsi.fillna(0),
        'RSI 30%': [30] * len(df),
        'RSI 70%': [70] * len(df)
    })


rsi_chart = Chart(toolbox=True)
rsi_chart.layout(background_color='#131722', font_family='Trebuchet MS', font_size=16)
rsi_chart.name = f"{stock_code} (RSI)"
rsi = RSI()
rsi_data = rsi.calculate_indicator_df(data)
rsi_chart.create_line(name='RSI', color="#ff4500", width=1).set(rsi_data)
# rsi_chart.show(block=False)

In [7]:
def _show_chart(chart: "Chart") -> None:  # type: ignore
    """Show a lightweight_charts Chart in a dedicated process."""
    chart.show(block=True)

In [8]:
import multiprocessing as mp

processes = [
            mp.Process(target=_show_chart, args=(ohlc_chart,)),
            mp.Process(target=_show_chart, args=(rsi_chart,)),
]

In [9]:
for p in processes:
    p.start()
for p in processes:
    p.join()

AttributeError: Can't get local object 'Chart.__init__.<locals>.<lambda>'

In [None]:
Chart.wait_for_close()