<a href="https://colab.research.google.com/github/aryajani/algo-trading/blob/main/Hammer_trend.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Setup

# Install packages
!pip install smartapi-python
!pip install logzero
!pip install pyotp
!pip install websocket-client
!pip install ijson
!pip install mplfinance
!pip install pandas plotly

url = 'https://anaconda.org/conda-forge/libta-lib/0.4.0/download/linux-64/libta-lib-0.4.0-h166bdaf_1.tar.bz2'
!curl -L $url | tar xj -C /usr/lib/x86_64-linux-gnu/ lib --strip-components=1
!pip install conda-package-handling
!wget https://anaconda.org/conda-forge/ta-lib/0.5.1/download/linux-64/ta-lib-0.5.1-py311h9ecbd09_0.conda
!cph x ta-lib-0.5.1-py311h9ecbd09_0.conda
!mv ./ta-lib-0.5.1-py311h9ecbd09_0/lib/python3.11/site-packages/talib /usr/local/lib/python3.11/dist-packages/

# Set time zone to IST
!rm /etc/localtime
!ln -s /usr/share/zoneinfo/Asia/Kolkata /etc/localtime
!date


In [2]:

# package import statement
from SmartApi import SmartConnect #or from SmartApi.smartConnect import SmartConnect
import pyotp
from logzero import logger
from datetime import time, timedelta, datetime
import ijson
import json
import mplfinance as mpf
import time
import plotly.graph_objects as go
import requests
import pandas as pd
from talib.abstract import *

In [20]:
# Functions

def login(api_key, token, username, pwd):

    smartApi = SmartConnect(api_key)
    try:
        totp = pyotp.TOTP(token).now()
    except Exception as e:
        logger.error("Invalid Token: The provided token is not valid.")
        raise e

    correlation_id = "abcde"
    data = smartApi.generateSession(username, pwd, totp)


    if data['status'] == False:
        logger.error(data)

    else:
        # login api call
        # logger.info(f"You Credentials: {data}")
        authToken = data['data']['jwtToken']
        refreshToken = data['data']['refreshToken']
        # fetch the feedtoken
        feedToken = smartApi.getfeedToken()
        # fetch User Profile
        res = smartApi.getProfile(refreshToken)
        smartApi.generateToken(refreshToken)
        res=res['data']['exchanges']

    return smartApi

def fetch_price(exchange, symbol, token):
    data = smartApi.ltpData(exchange, symbol, token)
    return data['data']['ltp']

def to_nearest_point05(number):
    # Multiply by 20 to convert 0.05 steps to integer steps
    return round(number * 20) / 20

def find_nearest_expiry(url, strike_price):
    # Fetch the JSON data
    response = requests.get(url)
    data = json.loads(response.text)

    # Normalize the strike price
    normalized_strike = str(int(float(strike_price) * 100))

    # Separate lists for CE and PE options
    ce_options = []
    pe_options = []
    latest_ce = None
    latest_pe = None

    for entry in data:
        # Normalize the strike price from JSON
        json_strike = str(int(float(entry['strike'])))

        if (json_strike == normalized_strike and
            entry['exch_seg'] == "NFO" and
            entry['name'] == "NIFTY"):

            expiry_date = datetime.strptime(entry['expiry'], "%d%b%Y").date()

            if entry['symbol'][-2:] == "CE":
                if latest_ce is None or expiry_date < datetime.strptime(latest_ce['expiry'], "%d%b%Y").date():
                    latest_ce = entry
            elif entry['symbol'][-2:] == "PE":
                if latest_pe is None or expiry_date < datetime.strptime(latest_pe['expiry'], "%d%b%Y").date():
                    latest_pe = entry

    print(f"Latest CE: {latest_ce}")
    print(f"Latest PE: {latest_pe}")

    return {
        'CE': latest_ce,
        'PE': latest_pe
    }

def get_order_info(orderid):
    OrderBook = smartApi.orderBook()['data']
    for i in OrderBook:
        if i['orderid'] == orderid:
            return i

def trailing_stoploss(orderid_sl, trailing_stoploss, orderid): # trailing_stoploss =

    order_info_sl = get_order_info(orderid_sl)
    order_info = get_order_info(orderid)
    print(order_info_sl)

    new_stoploss = None

    symbol = order_info_sl['tradingsymbol']
    exchange = order_info_sl['exchange']
    token = order_info_sl['symboltoken']
    prev_price = order_info['averageprice']
    order_type = order_info_sl['transactiontype']
    prev_stoploss = order_info_sl['triggerprice']




    if order_type == "BUY":

        min_val = prev_price

        while True:

            curr_price = fetch_price(exchange, symbol, token)
            if curr_price < min_val:

                new_stoploss = prev_stoploss - (trailing_stoploss * (min_val - curr_price))
                modify_params = order_info_sl
                modify_params['triggerprice'] = to_nearest_point05(new_stoploss)
                modify_params['price'] = to_nearest_point05(new_stoploss * 1.05)
                smartApi.modifyOrder(modify_params)
                min_val = curr_price
                prev_stoploss = to_nearest_point05(new_stoploss)
                print("-----------STOPLOSS-------------")
                print(to_nearest_point05(new_stoploss))
                print("-----------STOPLOSS-------------")

            print("Curr price: ", curr_price)
            print("Min Val: ", min_val)
            time.sleep(15)


    else:

        max_val = prev_price

        while True:

            curr_price = fetch_price(exchange, symbol, token)
            if curr_price > max_val:

                new_stoploss = prev_stoploss + (trailing_stoploss * (curr_price - max_val))
                modify_params = order_info_sl
                modify_params['triggerprice'] = to_nearest_point05(new_stoploss)
                modify_params['price'] = to_nearest_point05(new_stoploss * .95)
                smartApi.modifyOrder(modify_params)
                max_val = curr_price

            time.sleep(15)


def get_historical_data(exc, token, interval, from_dt, to_dt):

    #Historic api
    try:
        historicParam={
        "exchange": exc,
        "symboltoken": token,
        "interval": interval,
        "fromdate": from_dt,
        "todate": to_dt
        }
        return smartApi.getCandleData(historicParam)
    except Exception as e:
        logger.exception(f"Historic Api failed: {e}")

def check_hammer(df, hammer_index):
    if hammer_index < 3:
        return None, None  # Not enough data to check previous three candles

    # Check if the previous three candles have decreasing highs
    if (df['high'][hammer_index - 1] < df['high'][hammer_index - 2] and
        df['high'][hammer_index - 2] < df['high'][hammer_index - 3]):

        # Buy signal: next candle closes above the hammer's high
        if hammer_index + 1 < len(df) and df['high'][hammer_index + 1] > df['high'][hammer_index]:
            return True

    return False


In [4]:
api_key = 'key'
username = 'user'
pwd = 'pwd'
token = "token"

smartApi = login(api_key, token, username, pwd)


[I 250130 09:18:06 smartConnect:124] in pool


In [31]:

# Current time
now = datetime.now()
end = now.strftime("%Y-%m-%d %H:%M")

s_token = "3045"
exchange = "NSE"
s_symbol = "SBIN-EQ"

# Time 2 days ago
two_days_ago = now - timedelta(days=15)
start = two_days_ago.strftime("%Y-%m-%d %H:%M")
res = get_historical_data(exchange, s_token, "ONE_MINUTE", start, end)

cols = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
df = pd.DataFrame(res['data'], columns=cols)
df['timestamp'] = pd.to_datetime(df['timestamp'])

# check for 3 consecutive falls
df['Hammer'] = CDLHAMMER(df['open'], df['high'], df['low'], df['close'])
df['RSI'] = RSI(df['close'], timeperiod=14)

is_open = False
profit = 0
profit_percent = 0
win_days = 0
loss_days = 0


for i in range(len(df)):
    if df['Hammer'][i] != 0:
        if not is_open:
            if check_hammer(df, i) and df['RSI'][i] > 50:
                is_open = True
                entry = df['close'][i]
                stoploss = df['low'][i]
    else:
        if is_open:
            if df['low'][i] < stoploss:
                is_open = False
                profit += entry - stoploss
                profit_percent += (profit / entry) * 100
                if profit > 0:
                    win_days += 1
                else:
                    loss_days += 1
            else:
                stoploss += (df['high'][i] - stoploss)*0.5


print(f"Profit: {profit}")
print(f"Profit Percent: {profit_percent}")
print(f"Number of wins: {win_days}")
print(f"Number of loss: {loss_days}")
print(f"Win %: {win_days/(win_days+loss_days)*100}")







Profit: -3.5312499999998863
Profit Percent: -1.0268484857579137
Number of wins: 1
Number of loss: 4
Win %: 20.0


In [29]:
# Create a candlestick chart
fig = go.Figure(data=[go.Candlestick(x=df['timestamp'],
                                      open=df['open'],
                                      high=df['high'],
                                      low=df['low'],
                                      close=df['close'],
                                      name='Candlestick')])

# Add markers for hammer patterns
hammer_dates = df[df['Hammer'] != 0]['timestamp']
hammer_values = df[df['Hammer'] != 0]['close']

# Add scatter points for hammers
fig.add_trace(go.Scatter(x=hammer_dates,
                         y=hammer_values,
                         mode='markers',
                         marker=dict(color='blue', size=10),
                         name='Hammer Pattern'))

# Update layout
fig.update_layout(title='Candlestick Chart with Hammer Patterns',
                  xaxis_title='Date',
                  yaxis_title='Price',
                  xaxis_rangeslider_visible=False)

# Show the figure
fig.show()