<a href="https://colab.research.google.com/github/Jose80010/My-notebook/blob/main/RSI_and_OBV_Divergence_Screener_Project_for_Github.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## RSI and OBV Divergence Screener for Binance
Project: The following code scans cryptocurrencies from Binance to detect price divergences with respect to the OBV and RSI indicators. The time frame intervals can be selected according to TradingView's basic functionality: 1m, 3m, 5m, 15m, 30m, 45m, 1H, 2H, 3H, 4H, 1D, 1W, 1M, 3M, 6M, 12M.






Install libraries

In [None]:
!pip install TA-lib-binary
#!pip install tvDatafeed  # the PyPI package was removed; the next line installs it from GitHub instead
!pip install --upgrade --no-cache-dir git+https://github.com/StreamAlpha/tvdatafeed.git
!pip install colorama

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting git+https://github.com/StreamAlpha/tvdatafeed.git
  Cloning https://github.com/StreamAlpha/tvdatafeed.git to /tmp/pip-req-build-jg4j72y2
  Running command git clone --filter=blob:none --quiet https://github.com/StreamAlpha/tvdatafeed.git /tmp/pip-req-build-jg4j72y2
  Resolved https://github.com/StreamAlpha/tvdatafeed.git to commit a7034f04509b67224618917c29272796e4fff858
  Preparing metadata (setup.py) ... [?25l[?25hdone
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


Load libraries

In [None]:
import pandas as pd
import talib
import numpy as np

Define the indicator functions (RSI and OBV) and how to identify a divergence





In [None]:
from tabulate import tabulate
from tvDatafeed import TvDatafeed,Interval
from datetime import datetime
from scipy.signal import argrelextrema
from copy import deepcopy
from colorama import Style, Fore


def populate_indicators(symbol_data_df, indicator_list=None):
    """Add the RSI and OBV indicator columns to a candle DataFrame.

    Args:
        symbol_data_df: DataFrame with at least ``close`` and ``volume``
            columns. It is modified in place.
        indicator_list: Currently unused; kept so existing callers keep
            working. Defaults to ``None`` rather than a shared mutable list.

    Returns:
        The same DataFrame object, now carrying ``RSI`` and ``OBV`` columns.
    """
    # 1. Relative Strength Index (RSI), 14 periods, rounded to 2 decimals
    # TV: rsi = rsi(close, 14) // RSI
    # TALIB: real = RSI(close, timeperiod=14)
    symbol_data_df['RSI'] = talib.RSI(symbol_data_df['close'], timeperiod=14).round(2)

    # 2. On Balance Volume (OBV)
    # TV: Obv = obv // OBV
    # TALIB: real = OBV(close, volume)
    symbol_data_df['OBV'] = talib.OBV(symbol_data_df['close'], symbol_data_df['volume'])

    return symbol_data_df


# Functions to check Regular Divergences and Hidden Divergences
# ----------------------------------------------------------------
# Function to check positive regular or negative hidden divergence
def positive_regular_negative_hidden_divergence(time_frame_str,
                                                rev_symbol_data_df,
                                                indicator_name,
                                                condition,
                                                src_close_or_highlow="Close",
                                                dont_wait_for_confirmation=False,
                                                max_pivot_points_to_check=10,
                                                pivot_period=5,
                                                max_bars_to_check=100
                                                ):
    """Look for a divergence between the newest bar and recent pivot lows.

    The frame is expected newest-first (row 0 is the latest candle), which is
    how ``main`` prepares it. All look-ups are positional, so the frame's
    index labels do not matter.

    Args:
        time_frame_str: Time-frame label. Not used by the check itself; kept
            for interface compatibility.
        rev_symbol_data_df: Newest-first candle data with a ``close`` column,
            the ``indicator_name`` column and, for the "High/Low" source, a
            ``low`` column. A ``Low_Peak`` column ('T' on pivot lows, ''
            elsewhere) is written to it as a side effect.
        indicator_name: Column holding the indicator values (e.g. 'RSI').
        condition: 1 => indicator above the pivot's indicator value while the
            price is below the pivot's price (called "positive regular"
            here); 2 => the opposite pair of inequalities (called "negative
            hidden" here). Any other value never matches.
            NOTE(review): ``main`` labels condition 2 as 'Pos Hidden';
            confirm which name is intended.
        src_close_or_highlow: "Close" uses closes as pivot source, anything
            else uses lows.
        dont_wait_for_confirmation: Compare from row 0 instead of row 1 and
            skip the "price turned up" gate.
        max_pivot_points_to_check: Maximum number of pivots to examine.
        pivot_period: Bars required on each side of a pivot.
        max_bars_to_check: Stop once a pivot is further back than this.

    Returns:
        True if a divergence is found whose connecting lines are not crossed
        by the indicator or the close in between, otherwise False. Frames
        with fewer than two rows always give False.
    """
    row_count = rev_symbol_data_df.shape[0]
    if row_count < 2:
        # Not enough bars to compare anything; keep the column side effect.
        rev_symbol_data_df['Low_Peak'] = ''
        return False

    # Plain arrays make every access positional, matching the positions
    # returned by argrelextrema regardless of the DataFrame index.
    close = rev_symbol_data_df['close'].to_numpy()
    if src_close_or_highlow == 'Close':
        pivot_source = close
    else:
        pivot_source = rev_symbol_data_df['low'].to_numpy()
    indicator = rev_symbol_data_df[indicator_name].to_numpy()

    low_peak_positions = argrelextrema(pivot_source, np.less, order=pivot_period)[0]

    # Flag the pivot lows on the frame.
    peak_flags = np.full(row_count, '', dtype=object)
    peak_flags[low_peak_positions] = 'T'
    rev_symbol_data_df['Low_Peak'] = peak_flags

    # Unless told not to wait, require the latest bar to be higher than the
    # previous one (pivot source or close) before looking for a divergence.
    confirmed = (dont_wait_for_confirmation
                 or pivot_source[0] > pivot_source[1]
                 or close[0] > close[1])
    if not confirmed:
        return False

    start_point = 0 if dont_wait_for_confirmation else 1  # don't check last candle

    for low_peak_position in low_peak_positions[:max(max_pivot_points_to_check, 0)]:
        pivot_index = int(low_peak_position)

        # Pivots are ordered newest to oldest, so everything after is too old.
        if pivot_index > max_bars_to_check:
            break
        if pivot_index <= pivot_period:
            continue

        if condition == 1:
            diverges = (indicator[start_point] > indicator[pivot_index]
                        and pivot_source[start_point] < pivot_source[pivot_index])
        elif condition == 2:
            diverges = (indicator[start_point] < indicator[pivot_index]
                        and pivot_source[start_point] > pivot_source[pivot_index])
        else:
            diverges = False
        if not diverges:
            continue

        # Walk the straight lines joining the start bar to the pivot; the
        # divergence only counts if neither series dips below its line.
        span = pivot_index - start_point
        slope_indicator = (indicator[start_point] - indicator[pivot_index]) / span
        slope_close = (close[start_point] - close[pivot_index]) / span
        virtual_line_indicator = indicator[start_point] - slope_indicator
        virtual_line_close = close[start_point] - slope_close

        arrived = True
        for y in range(start_point + 1, pivot_index):
            if indicator[y] < virtual_line_indicator or close[y] < virtual_line_close:
                arrived = False
                break
            virtual_line_indicator = virtual_line_indicator - slope_indicator
            virtual_line_close = virtual_line_close - slope_close

        if arrived:
            return True

    return False

# Function to check negative regular or positive hidden divergence
def negative_regular_positive_hidden_divergence(time_frame_str,
                                                rev_symbol_data_df,
                                                indicator_name,
                                                condition,
                                                src_close_or_highlow="Close",
                                                dont_wait_for_confirmation=False,
                                                max_pivot_points_to_check=10,
                                                pivot_period=5,
                                                max_bars_to_check=100
                                                ):
    """Look for a divergence between the newest bar and recent pivot highs.

    The frame is expected newest-first (row 0 is the latest candle), which is
    how ``main`` prepares it. All look-ups are positional, so the frame's
    index labels do not matter.

    Args:
        time_frame_str: Time-frame label. Not used by the check itself; kept
            for interface compatibility.
        rev_symbol_data_df: Newest-first candle data with a ``close`` column,
            the ``indicator_name`` column and, for the "High/Low" source, a
            ``high`` column. A ``High_Peak`` column ('T' on pivot highs, ''
            elsewhere) is written to it as a side effect.
        indicator_name: Column holding the indicator values (e.g. 'RSI').
        condition: 1 => indicator below the pivot's indicator value while the
            price is above the pivot's price (called "negative regular"
            here); 2 => the opposite pair of inequalities (called "positive
            hidden" here). Any other value never matches.
            NOTE(review): ``main`` labels condition 2 as 'Neg Hidden';
            confirm which name is intended.
        src_close_or_highlow: "Close" uses closes as pivot source, anything
            else uses highs.
        dont_wait_for_confirmation: Compare from row 0 instead of row 1 and
            skip the "price turned down" gate.
        max_pivot_points_to_check: Maximum number of pivots to examine.
        pivot_period: Bars required on each side of a pivot.
        max_bars_to_check: Stop once a pivot is further back than this.

    Returns:
        True if a divergence is found whose connecting lines are not crossed
        by the indicator or the close in between, otherwise False. Frames
        with fewer than two rows always give False.
    """
    row_count = rev_symbol_data_df.shape[0]
    if row_count < 2:
        # Not enough bars to compare anything; keep the column side effect.
        rev_symbol_data_df['High_Peak'] = ''
        return False

    # Plain arrays make every access positional, matching the positions
    # returned by argrelextrema regardless of the DataFrame index.
    close = rev_symbol_data_df['close'].to_numpy()
    if src_close_or_highlow == 'Close':
        pivot_source = close
    else:
        pivot_source = rev_symbol_data_df['high'].to_numpy()
    indicator = rev_symbol_data_df[indicator_name].to_numpy()

    high_peak_positions = argrelextrema(pivot_source, np.greater, order=pivot_period)[0]

    # Flag the pivot highs on the frame.
    peak_flags = np.full(row_count, '', dtype=object)
    peak_flags[high_peak_positions] = 'T'
    rev_symbol_data_df['High_Peak'] = peak_flags

    # Unless told not to wait, require the latest bar to be lower than the
    # previous one (pivot source or close) before looking for a divergence.
    confirmed = (dont_wait_for_confirmation
                 or pivot_source[0] < pivot_source[1]
                 or close[0] < close[1])
    if not confirmed:
        return False

    start_point = 0 if dont_wait_for_confirmation else 1  # don't check last candle

    for high_peak_position in high_peak_positions[:max(max_pivot_points_to_check, 0)]:
        pivot_index = int(high_peak_position)

        # Pivots are ordered newest to oldest, so everything after is too old.
        if pivot_index > max_bars_to_check:
            break
        if pivot_index <= pivot_period:
            continue

        if condition == 1:
            diverges = (indicator[start_point] < indicator[pivot_index]
                        and pivot_source[start_point] > pivot_source[pivot_index])
        elif condition == 2:
            diverges = (indicator[start_point] > indicator[pivot_index]
                        and pivot_source[start_point] < pivot_source[pivot_index])
        else:
            diverges = False
        if not diverges:
            continue

        # Walk the straight lines joining the start bar to the pivot; the
        # divergence only counts if neither series rises above its line.
        span = pivot_index - start_point
        slope_indicator = (indicator[start_point] - indicator[pivot_index]) / span
        slope_close = (close[start_point] - close[pivot_index]) / span
        virtual_line_indicator = indicator[start_point] - slope_indicator
        virtual_line_close = close[start_point] - slope_close

        arrived = True
        for y in range(start_point + 1, pivot_index):
            if indicator[y] > virtual_line_indicator or close[y] > virtual_line_close:
                arrived = False
                break
            virtual_line_indicator = virtual_line_indicator - slope_indicator
            virtual_line_close = virtual_line_close - slope_close

        if arrived:
            return True

    return False



## Tradingview API

In [None]:

##################################################################################################################
######  T R A D I N G    V I E W
######################################################################################################################
def get_historical_data(tv_obj, exchange, symbol, time_frame, nbars=1000):
    """Fetch candles through ``tv_obj.get_hist``, retrying on failure.

    Up to five attempts are made. An attempt counts as failed when the call
    raises, returns ``None`` or returns an empty frame.
    NOTE(review): tvDatafeed appears to return ``None`` when it has no data
    for a symbol; the ``None`` check guards against that. Confirm against the
    library version in use.

    Args:
        tv_obj: Object exposing ``get_hist(symbol, exchange, interval, n_bars)``.
        exchange: Exchange name passed to ``get_hist``.
        symbol: Symbol name passed to ``get_hist``.
        time_frame: Interval value passed to ``get_hist``.
        nbars: Number of bars requested.

    Returns:
        The fetched DataFrame with its index moved into a regular column, or
        an empty DataFrame (after ``reset_index``) if every attempt failed.
    """
    max_no_of_retries = 5
    df = pd.DataFrame()

    for attempt in range(1, max_no_of_retries + 1):
        try:
            fetched = tv_obj.get_hist(symbol, exchange, time_frame, nbars)
        except Exception as err:  # best-effort fetch: report and retry
            print(f'{symbol} attempt {attempt}/{max_no_of_retries} failed: {err}')
            continue

        # A None result has no ``.empty`` attribute, so test for it first.
        if fetched is not None and not fetched.empty:
            df = fetched
            break

    return df.reset_index()


def get_historical_data_nbars_converted_for_time_zone(tv_obj, exchange, symbol, time_frame, nbars=1000, required_timezone='Asia/Kolkata', current_timezone='Asia/Kolkata'):
    """Fetch candles and express their ``datetime`` column in another time zone.

    The ``datetime`` values returned by ``get_historical_data`` are localized
    to ``current_timezone`` and then converted to ``required_timezone``. An
    empty result is handed back untouched.
    """
    candles_df = get_historical_data(tv_obj, exchange, symbol, time_frame, nbars)
    if candles_df.empty:
        return candles_df

    localized = candles_df['datetime'].dt.tz_localize(current_timezone)
    candles_df['datetime'] = localized.dt.tz_convert(required_timezone)
    return candles_df



## Define the main function of the project

In [None]:

##################################################################################################################
######  M A I N
######################################################################################################################
def main():
    """Scan a fixed list of Binance symbols for RSI/OBV divergences.

    For every symbol and time frame, candles are downloaded, the indicators
    are computed, and the four divergence checks are run per indicator (the
    first one that matches wins). Rows with at least one divergence are
    printed as a table and written to ``Crypto_fichero_<date>.csv``; when
    running inside Google Colab the file is also offered for download.

    TradingView credentials are read from the ``TV_USERNAME`` and
    ``TV_PASSWORD`` environment variables. If either is missing, an
    anonymous ``TvDatafeed()`` session is used instead. Credentials must
    never be committed to the notebook.
    """
    # Local imports keep this cell self-contained.
    import os
    from datetime import datetime

    indicator_name_list = ['RSI', 'OBV']
    exchange = 'BINANCE'

    symbol_list = ['BCHBTC', 'BCHUSDT', 'BEAMUSDT', 'BETABTC', 'BETAUSDT', 'BICOBTC', 'BNBBTC',
                   'BNBUSDT', 'BNTBTC', 'BNTUSDT', 'BNXBTC', 'BNXUSDT', 'BTCUSDT']

    # Time intervals to scan (uncomment to add more).
    time_frame_list = {
        # '1H': Interval.in_1_hour,
        # '4H': Interval.in_4_hour,
        '1D': Interval.in_daily,
        '1W': Interval.in_weekly,
    }

    # (label, detector, condition) in priority order: the first hit wins.
    divergence_checks = (
        ('Pos Regular', positive_regular_negative_hidden_divergence, 1),
        ('Pos Hidden', positive_regular_negative_hidden_divergence, 2),
        ('Neg Regular', negative_regular_positive_hidden_divergence, 1),
        ('Neg Hidden', negative_regular_positive_hidden_divergence, 2),
    )

    # Log in only when credentials are supplied through the environment.
    username = os.environ.get('TV_USERNAME')
    password = os.environ.get('TV_PASSWORD')
    if username and password:
        tv_obj = TvDatafeed(username, password)
    else:
        tv_obj = TvDatafeed()

    result_rows = []

    for symbol in symbol_list:
        print(symbol)

        for time_frame_str, time_frame in time_frame_list.items():
            symbol_data_df = get_historical_data_nbars_converted_for_time_zone(
                tv_obj, exchange, symbol, time_frame, required_timezone='UTC')
            if symbol_data_df.empty:
                # Nothing was downloaded; indicators cannot be computed.
                print(f'No data for {symbol} ({time_frame_str}); skipping')
                continue
            symbol_data_df = populate_indicators(symbol_data_df)

            # The divergence checks walk from the latest candle back to the
            # oldest one, so work on a reversed copy with a fresh index.
            rev_symbol_data_df = symbol_data_df.iloc[::-1].reset_index(drop=True)
            rev_symbol_data_df['datetime'] = rev_symbol_data_df['datetime'].dt.tz_localize(None)

            liga = 'https://es.tradingview.com/chart/NpHgohJD/?symbol=BINANCE%3A' + symbol
            divergence_result_dict = {
                'Symbol': '=HYPERLINK("{}", "{}")'.format(liga, symbol),
                'TF': time_frame_str,
            }

            for indicator_name in indicator_name_list:
                # The generator is lazy, so later checks only run when the
                # earlier ones found nothing; '' means no divergence.
                divergence_result_dict[indicator_name] = next(
                    (label
                     for label, detector, condition in divergence_checks
                     if detector(time_frame_str, rev_symbol_data_df, indicator_name, condition)),
                    '',
                )

            total = sum(1 for indicator_name in indicator_name_list
                        if divergence_result_dict[indicator_name] != '')
            divergence_result_dict['Total'] = total

            if total > 0:
                result_rows.append(divergence_result_dict)

    # Build the frame once instead of concatenating inside the loop.
    final_divergence_df = pd.DataFrame(
        result_rows, columns=['Symbol', 'TF', *indicator_name_list, 'Total'])

    print(tabulate(final_divergence_df, headers='keys', tablefmt='fancy_grid', stralign='center', showindex=False))

    csv_file_name = 'Crypto_fichero_' + str(datetime.now().date()) + '.csv'
    final_divergence_df.to_csv(csv_file_name)

    # The download helper only exists inside Google Colab.
    try:
        from google.colab import files
    except ImportError:
        print('Not running in Google Colab; results saved to ' + csv_file_name)
    else:
        files.download(csv_file_name)


Run the project !

In [None]:
# Run the screener only when this file is executed as the entry point.
if __name__ == "__main__":
    main()


# Print a completion timestamp (this line sits outside the guard above).
print(f"Done : {datetime.now()}")

ERROR:tvDatafeed.main:error while signin


BCHBTC
BCHUSDT
BEAMUSDT
BETABTC
BETAUSDT
BICOBTC
BNBBTC
BNBUSDT
BNTBTC
BNTUSDT
BNXBTC
BNXUSDT
BTCUSDT
╒════════════════════════════════════════════════════════════════════════════════════════════════╤══════╤═════════════╤═══════╤═════════╕
│                                             Symbol                                             │  TF  │     RSI     │  OBV  │   Total │
╞════════════════════════════════════════════════════════════════════════════════════════════════╪══════╪═════════════╪═══════╪═════════╡
│  =HYPERLINK("https://es.tradingview.com/chart/NpHgohJD/?symbol=BINANCE%3ABCHUSDT", "BCHUSDT")  │  1W  │ Neg Hidden  │       │       1 │
├────────────────────────────────────────────────────────────────────────────────────────────────┼──────┼─────────────┼───────┼─────────┤
│ =HYPERLINK("https://es.tradingview.com/chart/NpHgohJD/?symbol=BINANCE%3ABEAMUSDT", "BEAMUSDT") │  1D  │ Pos Regular │       │       1 │
├─────────────────────────────────────────────────────────────────────

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Done : 2023-01-20 02:13:06.623075
