In [43]:
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
import re

In [44]:
def finding_raw_urls_from_base(url):
    """Collect the unique Moneycontrol business-news links found on a page.

    The page is rendered in a headless Chrome session; the function waits up
    to 60 seconds for anchor elements to be present before reading the HTML.

    Args:
        url: Address of the page to scan for links.

    Returns:
        A de-duplicated list (order not guaranteed) of ``href`` values that
        start with ``https://www.moneycontrol.com/news/business/``.

    Raises:
        selenium.common.exceptions.TimeoutException: if no anchor element
            shows up within the wait window.
    """
    service = Service("./chromedriver/chromedriver.exe")
    options = Options()
    options.add_argument("--headless")
    options.add_argument("--disable-gpu")
    options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")

    driver = webdriver.Chrome(service=service, options=options)
    try:
        driver.get(url)

        WebDriverWait(driver, 60).until(
            EC.presence_of_all_elements_located((By.TAG_NAME, 'a'))
        )

        html = driver.page_source
    finally:
        # Always release the browser process, even when loading or waiting fails.
        driver.quit()

    soup = BeautifulSoup(html, 'html.parser')
    links = soup.find_all('a', href=True)
    raw_urls = [
        link['href']
        for link in links
        if link['href'].startswith("https://www.moneycontrol.com/news/business/")
    ]
    return list(set(raw_urls))
    

In [45]:
# Gather candidate article links from the stocks news listing page.
raw_urls = finding_raw_urls_from_base("https://www.moneycontrol.com/news/business/stocks/")
# raw_urls = finding_raw_urls_from_base("https://www.moneycontrol.com/news/business/stocks/page-3")

In [None]:
raw_urls

In [47]:
# Buckets filled by extract_urls(); re-running this cell resets them to empty.
stock_urls = []
market_urls = []
next_page_urls = []
other_urls = []
ipo_urls = []


def extract_urls(all_urls):
    """Sort URLs into the module-level bucket lists by their path prefix.

    Each URL is appended to exactly one of ``next_page_urls``, ``stock_urls``,
    ``market_urls``, ``ipo_urls`` or ``other_urls``. The pagination prefix is
    tested before the generic stocks prefix because it is a longer match of
    the same path.

    Args:
        all_urls: Iterable of URL strings to classify.

    Returns:
        None. The bucket lists are extended in place, so calling this twice
        without resetting them appends the URLs twice.
    """
    # Iterate the argument, not the global ``raw_urls``, so the function
    # classifies whatever the caller passes in.
    for link in all_urls:
        if link.startswith("https://www.moneycontrol.com/news/business/stocks/page"):
            next_page_urls.append(link)
        elif link.startswith("https://www.moneycontrol.com/news/business/stocks/"):
            stock_urls.append(link)
        elif link.startswith("https://www.moneycontrol.com/news/business/markets/"):
            market_urls.append(link)
        elif link.startswith("https://www.moneycontrol.com/news/business/ipo/"):
            ipo_urls.append(link)
        else:
            other_urls.append(link)

In [48]:
extract_urls(raw_urls)

In [None]:
market_urls

In [None]:
stock_urls

In [51]:

# Keep only stock URLs that look like articles: the path segment right after
# /stocks/ must contain a dot followed by 2-6 lowercase letters (e.g. ".html"),
# optionally followed by a further path or a query string.
regex = r'https:\/\/www\.moneycontrol\.com\/news\/business\/stocks\/[^\/\s]+(?:\.[a-z]{2,6})(?:[\/\?].*)?'

final_stocks_urls = list(filter(re.compile(regex).match, stock_urls))

In [None]:
final_stocks_urls

In [53]:
# One headless Chrome session shared by every extract_data() call below;
# it stays open until driver.quit() is run in a later cell.
options = Options()
options.add_argument("--headless")
options.add_argument("--disable-gpu")
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.5735.199 Safari/537.36")
service = Service("./chromedriver/chromedriver.exe")

driver = webdriver.Chrome(options=options, service=service)

In [54]:
def extract_data(url):
    """Scrape a single article page into a dict using the shared ``driver``.

    The module-level ``driver`` is reused across calls and is not closed here;
    the caller quits it once all pages have been scraped.

    Args:
        url: Address of the article page.

    Returns:
        A dict with these keys:
            ``title`` / ``desc``: stripped text of the ``h1.article_title`` and
                ``h2.article_desc`` elements, or ``""`` when the element is
                missing.
            ``date``: stripped text of the first ``span`` inside
                ``div.article_schedule``; omitted when either is missing.
            ``datetime``: stripped text of ``div.article_schedule``; omitted
                when the div is missing.
            ``content``: list of paragraph strings that passed the filters.
            ``stock_name``: stripped text of ``a.stock-name``; omitted when the
                element is missing.
    """
    driver.get(url)

    WebDriverWait(driver, 60).until(
        lambda d: d.execute_script("return document.readyState") == "complete"
    )

    soup = BeautifulSoup(driver.page_source, 'html.parser')
    news = {}

    # soup.find() returns None when nothing matches, so every lookup is guarded
    # instead of letting one unusual page abort the whole scraping loop.
    title = soup.find('h1', class_="article_title")
    news["title"] = title.text.strip() if title is not None else ""

    desc = soup.find('h2', class_='article_desc')
    news["desc"] = desc.text.strip() if desc is not None else ""

    date_time_div = soup.find('div', class_="article_schedule")
    if date_time_div is not None:
        span_tag = date_time_div.find('span')
        if span_tag is not None:
            news["date"] = span_tag.text.strip()
        # Kept inside the guard: the div's text is only available when the div exists.
        news["datetime"] = date_time_div.text.strip()

    paragraphs_list = []
    paragraphs_div = soup.find('div', class_="content_wrapper")
    if paragraphs_div is not None:
        for p in paragraphs_div.find_all('p'):
            para_text = p.text.strip()
            # Skip short fragments (fewer than 50 characters).
            if len(para_text) < 50:
                continue
            # Skip paragraphs containing any of these boilerplate keywords.
            if re.search(r"(click\s+here|disclaimer|modal|window|advertisement|investment\s+tips)", para_text, re.IGNORECASE):
                continue
            paragraphs_list.append(para_text)

    news["content"] = paragraphs_list

    stock_name = soup.find('a', class_="stock-name")
    if stock_name is not None:
        news["stock_name"] = stock_name.text.strip()

    return news

In [55]:
raw_news_data = []

In [None]:
# Scrape every filtered article URL with the shared driver and collect the
# resulting dicts; re-running this cell appends the same articles again.
for url in final_stocks_urls :
    data = extract_data(url)
    raw_news_data.append(data)

In [None]:
driver.quit()   

In [None]:
# raw_news_data

Processed data

In [None]:
from datetime import datetime

In [None]:
import re
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
# import spacy


In [None]:
import nltk
nltk.download('punkt')
nltk.download('punkt_tab')
nltk.download('stopwords')
nltk.download('wordnet')

In [None]:
lemmatizer = WordNetLemmatizer()

In [None]:
def preprocess_news(news):
    """Turn one scraped article into a cleaned, lemmatized token string.

    The title, description and content paragraphs are joined, stripped of
    everything except ASCII letters and whitespace, lower-cased, tokenized,
    filtered against the NLTK English stop-word list and lemmatized with the
    module-level ``lemmatizer``.

    Args:
        news: Dict with ``title``, ``desc`` and ``content`` (iterable of
            strings) entries.

    Returns:
        The surviving lemmas joined by single spaces.
    """
    body = ' '.join(news['content'])
    combined = f"{news['title']} {news['desc']} {body}"

    letters_only = re.sub(r"[^a-zA-Z\s]", "", combined)
    normalized = re.sub(r"\s+"," ", letters_only).strip().lower()

    words = word_tokenize(normalized)
    english_stops = set(stopwords.words('english'))

    lemmas = []
    for word in words:
        if word in english_stops:
            continue
        lemmas.append(lemmatizer.lemmatize(word))
    return " ".join(lemmas)

In [None]:
def preprocess_date(date):
    """Convert a date such as ``"January 02, 2025"`` to ``"2025-01-02"``.

    Args:
        date: Date text in ``"%B %d, %Y"`` form (full month name, day, year).

    Returns:
        The same date formatted as ``"%Y-%m-%d"``.

    Raises:
        ValueError: if ``date`` does not match the expected format.
    """
    # strptime parses the text into a datetime; strftime renders it back out.
    return datetime.strptime(date, "%B %d, %Y").strftime("%Y-%m-%d")

In [None]:
def preprocess_datetime(raw_datetime):
    """Convert ``"January 02, 2025 / 17:53 IST"`` to ``"2025-01-02 17:53"``.

    The text is split on ``/``: the first piece is parsed as a
    ``"%B %d, %Y"`` date, the second piece is kept as-is apart from removing
    the literal ``IST`` marker.

    Args:
        raw_datetime: Date and time text separated by ``/``.

    Returns:
        ``"<YYYY-MM-DD> <time>"`` with surrounding whitespace removed.

    Raises:
        ValueError: if the date piece does not match the expected format.
        IndexError: if there is no ``/``-separated time piece.
    """
    pieces = raw_datetime.split("/")
    iso_date = datetime.strptime(pieces[0].strip(), "%B %d, %Y").strftime("%Y-%m-%d")
    clock = pieces[1].strip().replace("IST","")
    # Removing "IST" leaves a trailing space behind; the final strip drops it.
    return (iso_date + " " + clock).strip()

# print(preprocess_datetime("January 02, 2025 / 17:53 IST"))

In [None]:
from yahooquery import search

def extract_ticker_name(company_name):
    """Look up a ticker symbol for a company name via ``yahooquery.search``.

    Args:
        company_name: Company name to search for; surrounding whitespace is
            ignored.

    Returns:
        The ``symbol`` of the first returned quote whose ``longname`` contains
        the company name (case-insensitive), or ``None`` when the name is
        blank or no quote matches.
    """
    wanted = company_name.strip()
    if not wanted:
        return None

    results = search(wanted)
    if not results or 'quotes' not in results:
        return None

    needle = wanted.lower()
    for quote in results['quotes']:
        # Only quotes carrying both fields can be compared and returned.
        if 'symbol' not in quote or 'longname' not in quote:
            continue
        if needle in quote['longname'].lower():
            return quote['symbol']
    return None

In [None]:
def get_complete_raw_news(news):
    """Join an article's title, description and paragraphs into one string.

    Args:
        news: Dict with ``title``, ``desc`` and ``content`` (iterable of
            strings) entries.

    Returns:
        ``"<title> <desc> <paragraphs joined by spaces>"``.
    """
    return "{} {} {}".format(news['title'], news['desc'], ' '.join(news['content']))

In [None]:
processed_data = []

In [None]:
# Build one processed record per scraped article. The ticker is only looked up
# when the article carried a stock name; otherwise both fields stay None.
for news in raw_news_data:
    processed_text = preprocess_news(news)
    formatted_date = preprocess_date(news['date'])
    formatted_datetime = preprocess_datetime(news['datetime'])
    raw_news = get_complete_raw_news(news)
    ticker_name = extract_ticker_name(news["stock_name"]) if "stock_name" in news else None

    processed_data.append({
        # "_id": news["_id"],
        "raw_news": raw_news,
        "processed_text": processed_text,
        "date": formatted_date,
        "datetime": formatted_datetime,
        "stock_name": news.get("stock_name"),
        "ticker_name": ticker_name,
    })

In [None]:
# processed_data

Manually updating bad records 

In [None]:
# Interactive clean-up: prompt for any stock or ticker name that is still
# missing, showing the full raw article text in the prompt.
for processed_news in processed_data:
    if not processed_news['stock_name']:
        stock_name = input(f"Enter stock name {processed_news['raw_news']}")
        processed_news["stock_name"] = str(stock_name)
    if not processed_news['ticker_name']:
        ticker_name = input(f"Enter ticker name for {processed_news['raw_news']}")
        processed_news["ticker_name"] = str(ticker_name)
        

In [None]:
processed_data

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers import pipeline

In [None]:
# Load the "yiyanghkust/finbert-tone" tokenizer and classification model and
# wrap them in a sentiment-analysis pipeline used by the cells below.
tokenizer = AutoTokenizer.from_pretrained("yiyanghkust/finbert-tone")
model = AutoModelForSequenceClassification.from_pretrained("yiyanghkust/finbert-tone")
nlp_pipeline = pipeline("sentiment-analysis", model=model, tokenizer=tokenizer)

In [None]:
def split_text(text, max_length=500):
    """Split text into chunks of at most ``max_length`` tokenizer tokens.

    Uses the module-level ``tokenizer`` both to tokenize the text and to turn
    each token window back into a string.

    Args:
        text: Text to split.
        max_length: Maximum number of tokens per chunk.

    Returns:
        List of chunk strings, in original order; empty when the text yields
        no tokens.
    """
    tokens = tokenizer.tokenize(text)
    windows = []
    for start in range(0, len(tokens), max_length):
        windows.append(tokens[start:start + max_length])
    return list(map(tokenizer.convert_tokens_to_string, windows))

In [None]:
# Score every record's raw text with FinBERT, chunk by chunk, then collapse
# neighbouring chunk results that share a label and store what remains under
# "finbert_analysis".
for news in processed_data:
    
    text = news['raw_news']
    
    text_chunks = split_text(text)
    sentiment_results = []
    
    # The pipeline result for each chunk is extended into one flat list
    # (presumably one {"label", "score"} dict per chunk — confirm).
    for chunk in text_chunks:
        sentiment_result = nlp_pipeline(chunk)
        sentiment_results.extend(sentiment_result)

    # Merge adjacent entries with the same label in place. After a merge the
    # index is NOT advanced, so the merged entry is compared with its new
    # neighbour; entries with the same label that are not adjacent stay separate.
    # NOTE(review): scores are averaged pairwise, so in a run of three or more
    # same-label chunks the later chunks weigh more than the earlier ones
    # (this is not the arithmetic mean of the run).
    i = 0
    while i < len(sentiment_results) - 1:
        if sentiment_results[i]['label'] == sentiment_results[i + 1]['label']:
            avg_score = (sentiment_results[i]['score'] + sentiment_results[i + 1]['score']) / 2
            sentiment_results[i] = {"label": sentiment_results[i]['label'], "score": avg_score}
            del sentiment_results[i + 1]  
        else:
            i += 1  
    news.update({"finbert_analysis":sentiment_results})
 
    # print("Updated Sentiment Results:", sentiment_results)

In [None]:
processed_data

In [None]:
finbert_list = []

In [None]:
# Keep only records whose sentiment collapsed to a single entry and that have
# both a stock name and a ticker.
for data in processed_data:
    if len(data['finbert_analysis']) != 1:
        continue
    if not (data['stock_name'] and data['ticker_name']):
        continue
    finbert_list.append(data)


In [None]:
finbert_list

In [None]:
import pandas as pd
import yfinance as yf
from datetime import datetime, timedelta

def get_stock_features(ticker, news_date):
    """Build price/volume features around a news date from Yahoo Finance data.

    History is requested from 5 calendar days before the news date up to
    (start/end as passed to ``Ticker.history``) 5 calendar days after it.

    Args:
        ticker: Symbol passed to ``yf.Ticker``.
        news_date: News date as a ``"%Y-%m-%d"`` string.

    Returns:
        ``None`` when the date cannot be parsed, the history cannot be fetched
        or is empty, or the computation fails. Otherwise a dict with keys:
            ``avg_price_5d``, ``volatility_5d``, ``avg_volume_5d``: mean close,
                standard deviation of close and mean volume over the rows
                dated before the news day.
            ``open_price_news_day``, ``close_price_news_day``,
                ``volume_news_day``: values of the news-day row, or ``None``
                when the history has no row for that date.
            ``price_movement_1d``, ``price_movement_3d``: percentage change
                from the news-day open to the close of the 1st / 3rd row after
                the news day, or ``None`` when unavailable.
            ``gap``: news-day open minus the last close before the news day,
                or ``None`` when either is unavailable.
    """
    try:
        news_date = datetime.strptime(news_date, "%Y-%m-%d")
        before_start = (news_date - timedelta(days=5)).strftime("%Y-%m-%d")
        after_end = (news_date + timedelta(days=5)).strftime("%Y-%m-%d")

        stock = yf.Ticker(ticker)
        try:
            historical_data = stock.history(start=before_start, end=after_end)
            # Drop the timezone so the naive news_date can be used against the index.
            historical_data.index = historical_data.index.tz_localize(None)
        except Exception as e:
            print(f"Error fetching data for ticker {ticker}: {e}")
            return None

        if historical_data.empty:
            return None

        # Rows dated up to the day before the news.
        before_news = historical_data.loc[:news_date - timedelta(days=1)]
        avg_price_5d = before_news['Close'].mean()
        vol_5d = before_news['Close'].std()
        avg_volume_5d = before_news['Volume'].mean()

        # News-day row; absent when the history has no row for that date.
        news_day_key = news_date.strftime("%Y-%m-%d")
        if news_day_key in historical_data.index:
            news_day = historical_data.loc[news_day_key]
            open_price = news_day['Open']
            close_price = news_day['Close']
            volume_news_day = news_day['Volume']
        else:
            open_price = close_price = volume_news_day = None

        # Gap versus the previous available close. The last row of
        # ``before_news`` is used instead of ``index.get_loc(news_date) - 1``,
        # which raised KeyError whenever the news date had no row and thereby
        # discarded every feature for that record.
        if open_price is not None and len(before_news) > 0:
            gap = open_price - before_news['Close'].iloc[-1]
        else:
            gap = None

        # Rows dated after the news day.
        after_news = historical_data.loc[news_date + timedelta(days=1):]

        def _pct_move(offset):
            """Percent change from the news-day open to the close ``offset`` rows later."""
            if len(after_news) > offset and open_price:
                return ((after_news['Close'].iloc[offset] - open_price) / open_price) * 100
            return None

        return {
            "avg_price_5d": avg_price_5d,
            "volatility_5d": vol_5d,
            "avg_volume_5d": avg_volume_5d,
            "open_price_news_day": open_price,
            "close_price_news_day": close_price,
            "volume_news_day": volume_news_day,
            "price_movement_1d": _pct_move(0),
            "price_movement_3d": _pct_move(2),
            "gap": gap,
        }
    except Exception as e:
        # Best-effort: report why this record has no features instead of hiding it.
        print(f"Error computing features for ticker {ticker}: {e}")
        return None


In [None]:
# Attach market features to every record under "stock_data"; the value is None
# when get_stock_features() could not produce features. The running index is
# printed as a simple progress indicator.
count = 0 
for finbert_added_data in finbert_list:
    ticker = finbert_added_data['ticker_name'].strip()
    date = finbert_added_data['date'].strip()
    
    data = get_stock_features(ticker, date)
    print(count)
    finbert_added_data.update({"stock_data": data})
    count += 1

In [None]:
finbert_list

In [None]:
presentation_data_df = pd.DataFrame(finbert_list)


In [None]:
# Take an independent copy: new columns are assigned to this frame in later
# cells, and assigning into a selection of presentation_data_df triggers
# pandas' SettingWithCopyWarning.
selected_column_df = presentation_data_df[['finbert_analysis','stock_data']].copy()

In [None]:
def extraction_finbert_data(finbert_list_dict):
    """Return the label and score of the first sentiment entry.

    Args:
        finbert_list_dict: Non-empty sequence of dicts with ``label`` and
            ``score`` entries.

    Returns:
        ``(label, score)`` taken from the first entry; a missing key yields
        ``None`` in that position.

    Raises:
        IndexError: if the sequence is empty.
    """
    first = finbert_list_dict[0]
    return first.get('label'), first.get('score')

In [None]:
# Expand each record's first sentiment entry into separate label and score columns.
selected_column_df[['finbert_label', 'finbert_score']] = selected_column_df['finbert_analysis'].apply(extraction_finbert_data).apply(pd.Series)

In [None]:
def extract_stock_data(stock_data):
    """Flatten a stock-feature dict into a fixed-order tuple of nine values.

    Args:
        stock_data: Mapping of feature names to values, or ``None`` (or any
            other object without a ``get`` method) when no features are
            available for the record.

    Returns:
        A 9-tuple in this order: ``avg_price_5d``, ``volatility_5d``,
        ``avg_volume_5d``, ``open_price_news_day``, ``close_price_news_day``,
        ``volume_news_day``, ``price_movement_1d``, ``gap``,
        ``stock_movement_on_news_day``. Missing keys, and every position when
        no mapping was supplied, are ``None``.
    """
    fields = (
        'avg_price_5d',
        'volatility_5d',
        'avg_volume_5d',
        'open_price_news_day',
        'close_price_news_day',
        'volume_news_day',
        'price_movement_1d',
        'gap',
        'stock_movement_on_news_day',
    )
    # Records without features carry None instead of a dict; return an
    # all-None row for them rather than failing on ``None.get``.
    getter = getattr(stock_data, 'get', None)
    if getter is None:
        return (None,) * len(fields)
    return tuple(getter(field) for field in fields)

In [None]:
# Expand each record's stock-feature dict into one column per feature.
selected_column_df[['avg_price_5d','volatility_5d','avg_volume_5d','open_price_news_day','close_price_news_day','volume_news_day','price_movement_1d','gap','stock_movement_on_news_day']] =selected_column_df['stock_data'].apply(extract_stock_data).apply(pd.Series)

In [None]:
dropped_column_df = selected_column_df.drop(columns=['finbert_analysis','stock_data'])

In [None]:
def convert_label_to_numeric(label):
    """Map a sentiment label to a number.

    Args:
        label: Label text; surrounding whitespace is ignored and matching is
            case-sensitive.

    Returns:
        ``1`` for ``"Positive"``, ``0`` for ``"Neutral"``, ``-1`` for
        ``"Negative"``, and ``None`` for anything else.
    """
    return {"Positive": 1, "Neutral": 0, "Negative": -1}.get(label.strip())

In [None]:
dropped_column_df['finbert_label'] = dropped_column_df['finbert_label'].apply(convert_label_to_numeric)

In [None]:
def convert_target_varible(output):
    """Map a stock-movement label to the binary target value.

    Args:
        output: Movement text; surrounding whitespace is ignored and matching
            is case-sensitive.

    Returns:
        ``1`` for ``"up"``, ``0`` for ``"down"``, and ``None`` for anything
        else.
    """
    return {"up": 1, "down": 0}.get(output.strip())

In [None]:
# dropped_column_df['stock_movement_on_news_day'] = dropped_column_df['stock_movement_on_news_day'].apply(convert_target_varible)

In [None]:
def round_out_value(value):
    """Round a value to three decimals, falling back to 0 when it cannot be rounded.

    Args:
        value: Number to round; ``None`` and other non-numeric values are
            accepted.

    Returns:
        ``round(value, 3)``, or ``0`` when rounding raises ``TypeError``,
        ``ValueError`` or ``OverflowError`` (for example for ``None`` or a
        string).
    """
    try:
        return round(value, 3)
    except (TypeError, ValueError, OverflowError):
        # Only rounding failures are mapped to 0; a bare ``except`` would also
        # swallow KeyboardInterrupt and SystemExit.
        return 0

In [None]:
# Round every numeric feature column to three decimals; cells that cannot be
# rounded are turned into 0 by round_out_value.
for _column in (
    'finbert_score',
    'avg_price_5d',
    'volatility_5d',
    'avg_volume_5d',
    'open_price_news_day',
    'close_price_news_day',
    'volume_news_day',
    'price_movement_1d',
    'gap',
):
    dropped_column_df[_column] = dropped_column_df[_column].apply(round_out_value)

In [None]:
dropped_column_df = dropped_column_df.fillna(0)


In [None]:

dropped_column_df =dropped_column_df.drop(columns='stock_movement_on_news_day')

Prediction

In [None]:
import dill
# Load the previously trained prediction model from disk.
# NOTE(review): dill.load, like pickle.load, can execute arbitrary code while
# deserializing — only load model files that come from a trusted source.
with open("./Stock_prediction_model.pkl", "rb") as file:
    loaded_model = dill.load(file)


In [None]:
# Predict one row at a time: each row is rebuilt as a single-row DataFrame,
# passed to the loaded model, and reported as "Up" when the prediction is 1
# and "Down" for any other value.
for i in range(len(dropped_column_df)):
    row_dict = dropped_column_df.iloc[i].to_dict()
    new_data_df = pd.DataFrame([row_dict])
    predicted_movement = loaded_model.predict(new_data_df)
    print("Predicted Stock Movement:", "Up" if predicted_movement[0] == 1 else "Down")
