In [20]:
import requests as req
from bs4 import BeautifulSoup
import pandas as pd

### Constant Declaration

#### Global

In [21]:
stock_number = 100
minimum_score = 75

th = {
    "rsi" : {
        "upper" : 60,
        "lower" : 70
    },
    "10di-" : {
        "upper" : 13,
        "lower" : 18
    },
    "25di-" : {
        "upper" : 13,
        "lower" : 18
    },
    "10di+" : {
        "upper" : 30,
        "lower" : 25
    },
    "25di+":{
        "upper" : 30,
        "lower" : 20
    },
    "25adx":{
        "upper" : 30,
        "lower" : 20
    },
    "10adx":{
        "upper" : 30,
        "lower" : 25
    },
    "macd":{
        "upper" : 3,
        "lower" : 0
    },
}

#### Option : TradingView

In [22]:
headers = {'User-Agent': 'Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Mobile Safari/537.36'}

tv_url = "https://id.tradingview.com/markets/stocks-indonesia/market-movers-active/"
tv_index_class = "apply-common-tooltip tickerNameBox-GrtoTeat tickerName-GrtoTeat"

yh_url = "https://finance.yahoo.com/quote/"

### Acquiring Stock Index

#### Option : TradingView

In [23]:
def fetch_index():
    tv_page = req.get(tv_url,headers=headers)
    soup = BeautifulSoup(tv_page.content, 'html.parser')
    
    main_table = soup.find('table')
    stock_rows = main_table.find_all('a',class_="apply-common-tooltip tickerNameBox-GrtoTeat tickerName-GrtoTeat")
    
    return [row.get_text() for row in stock_rows]

#### Option : IDX List

In [24]:
def read_index():
    stocks = pd.read_csv("stock_list.csv",delimiter=";")
    return stocks['Kode'].tolist()

#### Implementation

In [25]:
get_stock_index = {"TV":fetch_index, "IDX":read_index}

In [26]:
stocks = get_stock_index['TV']()

### Stock Price Mining

#### Global

##### Fetching Functions Declaration

In [27]:
def convert_text_to_float(text):
    return float(text.replace(",",""))

def parse_price_per_row(columns):
    return [convert_text_to_float(col.get_text()) for col in columns]

In [28]:
def fetch_stock_rows(stock):
    yh_page = req.get(yh_url+stock+".JK/history",headers=headers)
    soup = BeautifulSoup(yh_page.content, 'html.parser')
    
    main_table = soup.find('table')
    price_rows = main_table.find_all('tr')[1:50]
    
    return price_rows

def get_stock_df(price_rows):
    data_rows = []
    
    for row in price_rows:
        columns = row.find_all('td')
        columns = columns[1:5]
    
        try : 
            open, high, low, close = parse_price_per_row(columns)
            data_rows.append({'Open':open,'High':high,'Low':low,'Close':close})
        except :
            pass

    df = pd.DataFrame(data_rows)
    reversed_df = df.iloc[::-1].reset_index(drop=True)
        
    return reversed_df

def print_progress(total,fetched,passed):
    print("Total : ",total)
    print("Fetched : ",fetched)
    print("Passed : ",passed,"\n")

##### Assessment Functions Declaration

In [29]:
def get_true_range(df):
    tr_1 = df['High'] - df['Low']
    tr_2 = abs(df['High'] - df['Close'].shift())
    tr_3 = abs(df['Low'] - df['Close'].shift())
    
    tr_final = pd.concat([tr_1,tr_2,tr_3],axis=1).max(axis=1)
    return tr_final

In [30]:
def get_rsi(df, period=14):
    delta = df['Close'].diff()
    
    gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        
    avg_gain = gain.ewm(com=period-1, min_periods=period).mean()
    avg_loss = loss.ewm(com=period-1, min_periods=period).mean()
    
    rs = avg_gain / avg_loss
    rsi = 100 - (100 / (1 + rs))
        
    return rsi

def get_atr(df, period=14):
    tr = get_true_range(df)
    return tr.rolling(window=period).mean()

def get_dis(df, period=14):
    atr = get_atr(df, period)
    
    up = df['High'] - df['High'].shift()
    down = df['Low'].shift() - df['Low']
    
    plus_dm = up.where((up > down) & (up > 0), 0)
    minus_dm = down.where((down > up) & (down > 0), 0)
    
    plus_dm_smooth = plus_dm.ewm(alpha=1/period, min_periods=period).mean()
    minus_dm_smooth = minus_dm.ewm(alpha=1/period, min_periods=period).mean()
    
    plus_di = 100 * (plus_dm_smooth / atr)
    minus_di = 100 * (minus_dm_smooth / atr)
    
    return plus_di, minus_di

def get_adx(df, period=14):
    plus_di, minus_di = get_dis(df, period)
    
    dx = 100 * (abs(plus_di - minus_di) / (plus_di + minus_di))
    
    adx = dx.ewm(alpha=1/period, min_periods=period).mean()
        
    return adx

def get_macd_histogram(df,period=12):
    ema_12 = df['Close'].ewm(span=period, adjust=False).mean()
    ema_26 = df['Close'].ewm(span=26, adjust=False).mean()
    
    macd = ema_12 - ema_26
    signal = macd.ewm(span=9, adjust=False).mean()
    
    return macd - signal


In [31]:
def get_threshold_score(value,indicator):
    score = 0
    
    score = (value - th[indicator]['lower'])/(th[indicator]['upper'] - th[indicator]['lower']) * 100
    score = max(0, min(100, score))
    
    return score

In [32]:
def assess_rsi(df,period=14):
    rsi = get_rsi(df, period)
    score = get_threshold_score(rsi.iloc[-1],"rsi")
    return score

def assess_dis(df,period=14):
    plus_di, minus_di = get_dis(df, period)
    score = (get_threshold_score(plus_di.iloc[-1],"10di+") + get_threshold_score(minus_di.iloc[-1],"10di-"))/2
    return score

def assess_adx(df,period=14):
    adx = get_adx(df, period)
    
    th = "10adx" if period <15 else "25adx"
    score = get_threshold_score(adx.iloc[-1],th)
    return score

def assess_macd(df,period=12):
    histogram = get_macd_histogram(df,period)

    increasement = 0
    for i in range(th['macd']['upper'],0,-1):
        if histogram.iloc[-i] < histogram.iloc[-i+1]:
            increasement += 1
    
    score = get_threshold_score(increasement,"macd")
    return score

#### Option : Conservative

In [33]:
def assess_conservative(df):
    rsi = assess_rsi(df)
    
    short_dis = assess_dis(df,period=10)
    long_dis = assess_dis(df,period=25)
    
    short_adx = assess_adx(df,period=10)
    long_adx = assess_adx(df,period=25)
    
    return rsi, short_dis, long_dis, short_adx, long_adx

In [34]:
def export_passed_stock_conservative(stock,score,assessment):
    return {"Stock":stock,"Score":score,"RSI":assessment[0],"10DI+":assessment[1],"25DI+":assessment[2],"10ADX":assessment[3],"25ADX":assessment[4]}

#### Option : Aggressive

In [35]:
def assess_aggresive(df):
    rsi = assess_rsi(df)
    
    short_dis = assess_dis(df,period=10)
    short_adx = assess_adx(df,period=10)
    
    macd = assess_macd(df)
    
    return rsi, short_dis, short_adx, macd

In [36]:
def export_passed_stock_aggresive(stock,score,assessment):
    return {"Stock":stock,"Score":score,"RSI":assessment[0],"10DI+":assessment[1],"10ADX":assessment[2],"MACD":assessment[3]}

####  Implementation

In [37]:
assess_stock = {"conservative":assess_conservative, "aggresive":assess_aggresive}
export_stock = {"conservative":export_passed_stock_conservative, "aggresive":export_passed_stock_aggresive}

In [38]:
passed_stock = []
fetched_stock = 0

for index,stock in enumerate(stocks) : 
    if len(passed_stock) >= stock_number:
        break
    
    if (index+1) % 10 == 0:
        print_progress(index+1,fetched_stock,len(passed_stock))
    
    try:
        stock_price_rows = fetch_stock_rows(stock)
        stock_df = get_stock_df(stock_price_rows)
            
        fetched_stock += 1
            
        assessment = assess_stock["aggresive"](stock_df)

        score = sum(assessment)/len(assessment)
        if score >= minimum_score:
            new_line = export_stock["aggresive"](stock,score,assessment)
            passed_stock.append(new_line)
                    
    except:
        pass
    
final_df = pd.DataFrame(passed_stock)

Total :  10
Fetched :  9
Passed :  0 

Total :  20
Fetched :  19
Passed :  1 

Total :  30
Fetched :  29
Passed :  3 

Total :  40
Fetched :  39
Passed :  4 

Total :  50
Fetched :  49
Passed :  6 

Total :  60
Fetched :  59
Passed :  7 

Total :  70
Fetched :  69
Passed :  9 

Total :  80
Fetched :  79
Passed :  9 

Total :  90
Fetched :  89
Passed :  9 

Total :  100
Fetched :  99
Passed :  9 



In [39]:
final_df = final_df.sort_values(by=['Score'],ascending=False)
final_df.to_csv("stock_assessment.csv",index=False)