In [3]:
import pymongo
import pandas as pd
import requests as req
import logging
import json
import torch
from tqdm import tqdm #For Progress Bars
from bs4 import BeautifulSoup as bs
import re
import xml.etree.ElementTree as et
import numpy as np
import torch

In [4]:
head = {
    "User-Agent": "Digital-Alpha SEC Explorer/1.0",
    "Connection": "keep-alive"
}

company_data = pd.read_csv('./data/company_summary.csv')

logging.basicConfig(filename='error.log', encoding='utf-8', level=logging.DEBUG)

In [5]:
def has_keypoints(str, list):
    for key in list:
        if key in str.lower():
            return True
    return False

In [6]:
mongo_url = "mongodb+srv://huntrag:killsasuke@cluster0.staij.mongodb.net/myFirstDatabase?authSource=admin&replicaSet=atlas-a852oq-shard-0&w=majority&readPreference=primary&appname=MongoDB%20Compass&retryWrites=true&ssl=true"
try:
    client = pymongo.MongoClient(mongo_url)
    print("Connected to MongoDb Successfully")
except:
    logging.error("DB CONN ERROR: Couldn't connect to DB successfully")

mydb = client['tech-meet']

form_sentiment = mydb['form-sentiment']

Connected to MongoDb Successfully


In [7]:
from transformers import BertTokenizer, BertForSequenceClassification
finbert = BertForSequenceClassification.from_pretrained('yiyanghkust/finbert-tone',num_labels=3)
tokenizer = BertTokenizer.from_pretrained('yiyanghkust/finbert-tone')

gpu = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
cpu = "cpu"

finbert.eval()

BertForSequenceClassification(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30873, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0): BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12, element

In [8]:
labels = {0:'neutral', 1:'positive',2:'negative'}

def getSentimentDict(root,base_url):
    trigger_one = 'DISCLOSURE'
    trigger_list = ['ACCOUNTING', 'LEASES', 'DEBT','COMMITMENTS','STOCK',] # List of triggers to look for
    sentiment_dict = {}
    for report in root.iter('Report'):
        for trigger_word in trigger_list:
            partname = report.find('LongName').text
            if (trigger_word.lower() in partname.lower() and trigger_one.lower() in partname.lower()):
                if ("table" in partname.lower() or "details" in partname.lower()):
                    continue
                txtlist = getParaDat(base_url + '/' + report.find('HtmlFileName').text)
                if not txtlist:
                    continue
                #Get result in result as {'neutral':x, 'positive':y, 'negative':z}
                sentiment_dict[partname] = getSentimentWhole(txtlist)
                logging.info("Added another {} to dict".format(partname))
    
    logging.info("Done with the dict for {}".format(base_url))
    return sentiment_dict

def getSentimentWhole(txt):
    try:
        encoded_text = tokenizer(txt, return_tensors="pt", padding=True)
        sentiment_tensor = finbert(**encoded_text)[0].detach().numpy()
        minval = np.min(sentiment_tensor)
        for i in range(3):
            sentiment_tensor[i] = sentiment_tensor[i] - minval
    except:
        sentiment_tensor = np.zeros(3)
        logging.warning("Error in calculating output, setting output to [0., 0., 0.]")
    return {
        'neutral':float(sentiment_tensor[0]),
        'positive':float(sentiment_tensor[1]),
        'negative':float(sentiment_tensor[2])
    }
def getSentimentAvg(txt_list):
    try:
        encoded_text = tokenizer(txt_list, return_tensors="pt", padding=True)
        sentiment_tensor = finbert(**encoded_text)[0].detach().numpy()
        refined_sentiment = [i for i in sentiment_tensor if np.argmax(i) != 0]
        if refined_sentiment:
            average_sentiment = np.average( refined_sentiment, axis=0)
        else:
            average_sentiment = np.average( sentiment_tensor, axis=0)
        minval = np.min(average_sentiment)
        for i in range(3):
            average_sentiment[i] = average_sentiment[i] - minval
    except:
        average_sentiment = np.zeros(3)
        logging.warning("Error in calculating output, setting output to [0., 0., 0.]")
    return {
        'neutral':float(average_sentiment[0]),
        'positive':float(average_sentiment[1]),
        'negative':float(average_sentiment[2])
    }

def getParaDat(url):
    '''
    Function to get the text data from the url
    '''
    try:
        soup = bs(
            req.get(
                url,
                headers=head
            ).text,
            "html"
        )
        return soup.text.strip()
    except:
        logging.error("Error in getting text data from url")
        return ""

def getParaList(url):
    '''
    Function to get the text data from the url
    '''
    try:
        soup = bs(
            req.get(
                url,
                headers=head
            ).text,
            "html"
        )
        txtlist = re.split(r' *[\n\.\?!][\'"\)\]]* *',soup.text.strip())
        return [ txt for txt in txtlist if len(txt) > 100 ]
    except:
        logging.error("Error in getting text data from url")
        return []

In [9]:
for i in tqdm(company_data.index, desc="Filling in the MongoDb"):
    try:
        company_summary = json.loads(company_data["HISTORY"][i])
        file_shelf = company_summary["filings"]["recent"] #Holds the db of files
        cik_num = company_summary["cik"]
        tickers = company_summary['tickers']
        file_num = len(file_shelf['accessionNumber'])
    except:
        logging.error("Unknown Error @ Company ", cik_num)
        continue
    for k in range(file_num):
    #For every file in that company
        if file_shelf['form'][k].replace("-","").lower() in ["10k","10q"]:
            
            data_point = {}

            data_point['cik'] = int(cik_num)
            data_point['accession-number'] = file_shelf['accessionNumber'][k]
            
            #the root folder of the document
            base_url = "https://www.sec.gov/Archives/edgar/data/" + cik_num + "/" + file_shelf['accessionNumber'][k].replace("-", "")
            #Get its filings summary
            res = req.get(base_url + "/FilingSummary.xml", headers=head)
            try:
                root = et.fromstring(res.text)
            except:
                logging.error("Error in generating HTML Root at @", cik_num, " , " , file_shelf['accessionNumber'][k])
                pass
            # Store in dict format
            data_point['sentiment-list'] = getSentimentDict(root, base_url)
            try:
                form_sentiment.insert_one(data_point)
                logging.info("Pushed a ",cik_num," data to mongo\n")
            except:
                logging.error("Error in inserting data for ", cik_num, " , " , file_shelf['accessionNumber'][k])
                pass

Filling in the MongoDb:   0%|          | 0/292 [00:00<?, ?it/s]--- Logging error ---
Traceback (most recent call last):
  File "C:\Python310\lib\logging\__init__.py", line 1100, in emit
    msg = self.format(record)
  File "C:\Python310\lib\logging\__init__.py", line 943, in format
    return fmt.format(record)
  File "C:\Python310\lib\logging\__init__.py", line 678, in format
    record.message = record.getMessage()
  File "C:\Python310\lib\logging\__init__.py", line 368, in getMessage
    msg = msg % self.args
TypeError: not all arguments converted during string formatting
Call stack:
  File "C:\Python310\lib\runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "C:\Python310\lib\runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "C:\Users\Kaushik Dey\AppData\Roaming\Python\Python310\site-packages\ipykernel_launcher.py", line 16, in <module>
    app.launch_new_instance()
  File "C:\Users\Kaushik Dey\AppData\Roaming\Python\P

## Analysis Point

Check what went wrong

In [None]:
txtlist = getParaDat('https://www.sec.gov/Archives/edgar/data/1459417/000145941721000014/R13.htm')
encoded_text = tokenizer(txtlist, return_tensors="pt", padding=True)
encoded_text
sentiment_tensor = finbert(**encoded_text)[0].detach().numpy()
refined_sentiment = [i for i in sentiment_tensor if np.argmax(i) != 0]
if refined_sentiment:
    average_sentiment = np.average( refined_sentiment, axis=0)
else:
    average_sentiment = np.average( sentiment_tensor, axis=0)
minval = np.min(average_sentiment)
for i in range(3):
    average_sentiment[i] = average_sentiment[i] - minval
print(refined_sentiment)
# except:
    # average_sentiment = np.zeros(3)
    # logging.warning("Error in calculating output, setting output to [0., 0., 0.]")
dict =  {
    'neutral':average_sentiment[0],
    'positive':average_sentiment[1],
    'negative':average_sentiment[2]
}
dict