In [1]:
from app.crawler_news.config import BASE_URL, HEADERS
from bs4 import BeautifulSoup
from datetime import timedelta, date, datetime
import aiohttp
import requests
from io import BytesIO
import fitz
import re
from googletrans import Translator
from docx import Document

from app.crawler_news.config import BASE_URL, HEADERS, CONTENT_URL, ATTACHMENT_URL

In [2]:
from datetime import datetime

def convert_to_iso(date_str):
    """Convert a ``MM/DD/YYYY`` date string into ISO ``YYYY-MM-DD`` form.

    Raises:
        ValueError: propagated from ``datetime.strptime`` when ``date_str``
            does not match the ``%m/%d/%Y`` format.
    """
    return datetime.strptime(date_str, '%m/%d/%Y').strftime('%Y-%m-%d')

In [3]:
def extract_news_links(page_content):
    """Collect the date and content URL of every news item on an issuer page.

    Anchors are taken from ``#seiNetIssuerLatestNews .container-seinet a``.
    For each anchor the news id is the last path segment of its ``href`` and
    the date is the first whitespace-separated token of its ``<h4>`` text.

    Args:
        page_content: HTML of the issuer page.

    Returns:
        A list of dicts, in document order, with the keys ``"Date"``
        (ISO ``YYYY-MM-DD`` string) and ``"ContentURL"``
        (``CONTENT_URL`` + ``"/"`` + news id).

    Raises:
        ValueError: from ``convert_to_iso`` when a heading's first token is
            not a ``MM/DD/YYYY`` date.
    """
    soup = BeautifulSoup(page_content, 'html.parser')
    news_items = []
    for anchor in soup.select('#seiNetIssuerLatestNews .container-seinet a'):
        href = anchor.get("href")
        heading = anchor.select_one('h4')
        # Anchors without a target or a heading cannot describe a news item;
        # skip them instead of failing on the whole page.
        if not href or heading is None:
            continue
        heading_tokens = heading.text.split()
        if not heading_tokens:
            continue

        news_id = href.split("/")[-1]
        news_items.append({
            "Date": convert_to_iso(str(heading_tokens[0])),
            "ContentURL": CONTENT_URL + f"/{news_id}"
        })

    return news_items

In [4]:
def format_html_text(html_content):
    """Reduce an HTML fragment to plain text with single-space separation.

    The markup is parsed with ``html.parser``, text nodes are joined with a
    space, every whitespace run is collapsed to one space, and any remaining
    non-breaking space is replaced by a regular space.
    """
    raw_text = BeautifulSoup(html_content, 'html.parser').get_text(separator=" ", strip=True)
    single_spaced = re.sub(r'\s+', ' ', raw_text).strip()
    return single_spaced.replace("\xa0", " ")

In [5]:
def format_PDF_text(text):
    """Flatten extracted document text onto one line.

    Every run of whitespace (spaces, tabs, newlines) becomes a single space
    and leading/trailing whitespace is removed.
    """
    # Collapse whitespace runs first; the newline replacement is kept from the
    # previous implementation even though no newline survives the regex.
    collapsed = re.sub(r'\s+', ' ', text)
    return collapsed.replace("\n", " ").strip()

In [6]:
def extract_text_from_docx(byte_code):
    """Return the text of every paragraph in a DOCX, each followed by a newline.

    Args:
        byte_code: Source handed to ``Document(...)``; callers in this file
            pass a ``BytesIO`` holding the downloaded attachment.
    """
    paragraphs = Document(byte_code).paragraphs
    return "".join(paragraph.text + "\n" for paragraph in paragraphs)

In [7]:
def extract_text_from_pdf(byte_code, max_pages=2):
    """Extract plain text from the leading pages of an in-memory PDF.

    Args:
        byte_code: PDF data accepted by ``fitz.open(stream=...)``; callers in
            this file pass a ``BytesIO``.
        max_pages: Maximum number of pages to read, counted from the first
            page. The default of 2 keeps the previous hard-coded behaviour;
            ``None`` reads every page.

    Returns:
        The text of the selected pages concatenated in page order.
    """
    # The type is stated explicitly because a stream has no file name to infer
    # it from; the context manager closes the document even if a page fails.
    with fitz.open(stream=byte_code, filetype="pdf") as pdf:
        page_total = pdf.page_count
        page_limit = page_total if max_pages is None else min(max_pages, page_total)
        return "".join(
            pdf.load_page(page_num).get_text("text")
            for page_num in range(page_limit)
        )

In [8]:
async def fetch_attachment_staro(session, attachment_id, file_type):
    """Asynchronously download an attachment and return its flattened PDF text.

    Args:
        session: Object whose ``get(url)`` is an async context manager yielding
            a response with ``status`` and an awaitable ``read()``
            (presumably an ``aiohttp.ClientSession`` - confirm with callers).
        attachment_id: Identifier appended to ``ATTACHMENT_URL``.
        file_type: Accepted but not used; the payload is always parsed as PDF.

    Returns:
        The extracted text, or ``None`` when the response status is not 200.
    """
    attachment_url = ATTACHMENT_URL + f"/{attachment_id}"
    async with session.get(attachment_url) as response:
        if response.status != 200:
            return None
        payload = await response.read()
        raw_text = extract_text_from_pdf(BytesIO(payload))
        return format_PDF_text(raw_text)

In [9]:
def fetch_attachment(attachment_id, file_type, timeout=30):
    """Download an attachment and return its text flattened onto one line.

    Args:
        attachment_id: Identifier appended to ``ATTACHMENT_URL``.
        file_type: ``"pdf"`` or ``"docx"``; selects the text extractor.
        timeout: Seconds passed to ``requests.get`` so a stalled server cannot
            block the caller indefinitely.

    Returns:
        The extracted text, or ``None`` when ``file_type`` is not supported or
        the server does not answer with HTTP 200.
    """
    # Unsupported types are rejected before any download. Previously they fell
    # through both branches and hit `return text` with `text` unbound.
    if file_type not in ("pdf", "docx"):
        return None

    attachment_url = ATTACHMENT_URL + f"/{attachment_id}"
    response = requests.get(attachment_url, timeout=timeout)
    if response.status_code != 200:
        return None

    payload = BytesIO(response.content)
    if file_type == "pdf":
        raw_text = extract_text_from_pdf(payload)
    else:
        raw_text = extract_text_from_docx(payload)
    return format_PDF_text(raw_text)

In [10]:
def extract_first_link_from_href(text):
    """Return the first link in ``text`` that targets a seinet.com.mk document.

    Args:
        text: HTML fragment to search; ``None`` or an empty string is allowed.

    Returns:
        The ``href`` (surrounding whitespace removed) of the first ``<a>``
        whose target starts with ``https://seinet.com.mk/document/`` or
        ``https://www.seinet.com.mk/document/``, or ``None`` when there is no
        such link.
    """
    # Nothing to parse: callers may hand over a missing "content" field.
    if not text:
        return None

    document_prefixes = (
        'https://seinet.com.mk/document/',
        'https://www.seinet.com.mk/document/',
    )
    soup = BeautifulSoup(text, 'lxml')
    link = soup.find(
        'a',
        href=lambda href: href and href.strip().startswith(document_prefixes),
    )

    # The match is done on the stripped value, so return the stripped value too;
    # otherwise whitespace ends up in ids derived by splitting on "/".
    return link['href'].strip() if link else None

In [11]:
def fetch_content(response_json, is_redirect=False, timeout=30):
    """Resolve the readable text of a news item from its content-API JSON.

    Resolution order:
      1. If ``data.attachments`` is non-empty and its first entry has an
         ``attachmentId``: return the attachment text for PDF/DOCX MIME types
         and ``None`` for any other MIME type.
      2. Otherwise, if the HTML in ``data.content`` links to another
         seinet.com.mk document, fetch that document, resolve it the same way
         and return the result translated from Macedonian to English.
      3. Otherwise return ``data.content`` reduced to plain text.

    Args:
        response_json: Decoded JSON body of the content endpoint.
        is_redirect: ``True`` when this call resolves a document that was
            reached through a link. Such a call never follows another link,
            which caps the chain at one hop (no unbounded recursion on
            self-referencing documents) and makes translation run only once.
        timeout: Seconds passed to ``requests.get`` for the linked document.

    Returns:
        The resolved text, or ``None`` when nothing usable is available.
    """
    # `or {}` also covers an explicit JSON null, which .get(key, {}) does not.
    data = response_json.get("data") or {}

    attachments = data.get("attachments")
    if attachments:
        first_attachment = attachments[0]
        attachment_id = first_attachment.get("attachmentId")
        mime_type = (first_attachment.get("attachmentType") or {}).get("mimeType", "")
        if attachment_id is not None:
            if mime_type == "application/pdf":
                return fetch_attachment(attachment_id, "pdf")
            if mime_type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
                return fetch_attachment(attachment_id, "docx")
            return None

    html = data.get("content")
    if html is None:
        return None

    # Follow a document link only from the top-level call.
    if not is_redirect:
        linked_url = extract_first_link_from_href(html)
        if linked_url:
            news_id = linked_url.strip().split("/")[-1]
            redirect_json = requests.get(CONTENT_URL + f"/{news_id}", timeout=timeout).json()
            content_mk = fetch_content(redirect_json, is_redirect=True, timeout=timeout)
            return translate_mk_to_en(translator, content_mk)

    return format_html_text(html)

In [12]:
def normalize_cyrillic(text):
    """Replace common misencoded Cyrillic characters with their Cyrillic form.

    Each listed ASCII letter or punctuation mark is substituted by the mapped
    Cyrillic character; characters without a mapping are left unchanged.
    """
    latin_to_cyrillic = {
        # punctuation
        '~': 'ч', '{': 'ш', '}': 'ж', '`': 'ѓ', ']': 'љ', '[': 'њ', '|': 'и',
        '@': 'џ', '^': 'ќ',
        # upper-case letters
        'A': 'А', 'B': 'Б', 'V': 'В', 'G': 'Г', 'D': 'Д', 'E': 'Е', 'Z': 'З',
        'I': 'И', 'J': 'Ј', 'K': 'К', 'L': 'Л', 'M': 'М', 'N': 'Н', 'O': 'О',
        'P': 'П', 'R': 'Р', 'S': 'С', 'T': 'Т', 'U': 'У', 'F': 'Ф', 'H': 'Х',
        'C': 'Ц', 'Y': 'Ч', 'X': 'Џ', 'Q': 'Ш',
        # lower-case letters
        'a': 'а', 'b': 'б', 'v': 'в', 'g': 'г', 'd': 'д', 'e': 'е', 'z': 'з',
        'i': 'и', 'j': 'ј', 'k': 'к', 'l': 'л', 'm': 'м', 'n': 'н', 'o': 'о',
        'p': 'п', 'r': 'р', 's': 'с', 't': 'т', 'u': 'у', 'f': 'ф', 'h': 'х',
        'c': 'ц', 'y': 'ч', 'x': 'џ', 'q': 'ш'
    }
    # No replacement character is itself a key, so one translate pass gives the
    # same result as applying the substitutions one after another.
    return text.translate(str.maketrans(latin_to_cyrillic))

In [13]:
# Module-level googletrans client; fetch_content reads this global and passes
# it to translate_mk_to_en, so it must exist before fetch_content is called.
translator = Translator()

def translate_mk_to_en(translator, text):
    """Translate Macedonian text to English after Cyrillic normalisation.

    Args:
        translator: Object exposing ``translate(text, src=..., dest=...)``
            whose result has a ``.text`` attribute.
        text: Text to translate.

    Returns:
        ``None`` when ``text`` is not a string, is blank, or contains
        ``"..."``. Otherwise the translated text, or the normalised
        (untranslated) text if the translation call raises.
    """
    if not isinstance(text, str) or not text.strip():
        return None
    if "..." in text:
        return None

    normalized_text = normalize_cyrillic(text)
    try:
        return translator.translate(normalized_text, src='mk', dest='en').text
    except Exception as e:
        # Best effort: report the failure and fall back to the normalised text.
        print(f"Translation error: {e}")
        return normalized_text

In [14]:
def detect_garbled_text(text, threshold=10):
    """Heuristically flag text that contains many letter/digit hybrid tokens.

    A token counts when it is letters followed by digits, optionally followed
    by more letters (for example ``ab12`` or ``ab12cd``).

    Args:
        text: Text to inspect; ``None`` or an empty string is never garbled.
        threshold: The text is flagged when it contains more than this many
            hybrid tokens. The default of 10 keeps the previous behaviour.

    Returns:
        ``True`` when the number of hybrid tokens exceeds ``threshold``.
    """
    if not text:
        return False
    mixed_tokens = re.findall(r'\b[A-Za-z]+\d+[A-Za-z]*\b', text)
    return len(mixed_tokens) > threshold

In [15]:
# Helper function for missing data fetch in Filter 3
def fetch_company_news_data(company_name, start_date, timeout=30):
    """Fetch the news items of one issuer that are newer than ``start_date``.

    Args:
        company_name: Issuer code appended to ``BASE_URL`` (e.g. ``"TKPR"``).
        start_date: ISO ``YYYY-MM-DD`` string; only items dated strictly later
            are downloaded. ``None`` means 3650 days before today. Dates are
            compared as strings, so the ISO format is required.
        timeout: Seconds passed to every ``requests.get`` call so a stalled
            server cannot block the crawl indefinitely.

    Returns:
        ``(None, None, None)`` when the listing page does not answer with
        HTTP 200, lists no news, or its newest item is dated exactly
        ``start_date``. Otherwise ``(company_name, latest_date, data_to_append)``
        where ``latest_date`` is the newest listed date if it is later than
        ``start_date`` (else ``start_date``) and ``data_to_append`` is a list of
        dicts with the keys ``company``, ``date``, ``content`` and ``sentiment``
        (always ``""``). Items whose content is empty, missing or flagged by
        ``detect_garbled_text`` are left out.

    Raises:
        Whatever ``requests.get`` or ``response.json()`` raise for an item;
        per-item failures are not suppressed.
    """
    if start_date is None:
        start_date = (date.today() - timedelta(days=365 * 10)).isoformat()

    response = requests.get(BASE_URL + f"{company_name}", timeout=timeout)
    if response.status_code != 200:
        print(f"Failed to fetch data. Status code: {response.status_code}")
        return None, None, None

    news_date_link = extract_news_links(response.text)
    if not news_date_link:
        return None, None, None

    max_date = max(entry['Date'] for entry in news_date_link)
    if max_date == start_date:
        return None, None, None

    latest_date = start_date
    data_to_append = []
    if max_date > start_date:
        latest_date = max_date
        for entry in news_date_link:
            if not entry['Date'] > start_date:
                continue
            response_json = requests.get(entry['ContentURL'], timeout=timeout).json()
            text = fetch_content(response_json)
            if text == "" or text is None or detect_garbled_text(text):
                continue
            data_to_append.append({
                "company": company_name,
                "date": entry['Date'],
                "content": text,
                "sentiment": ""
            })

    return company_name, latest_date, data_to_append

In [18]:
# Manual run: fetch TKPR news dated after 2015-01-01
# (fetch_company_news_data issues live HTTP requests).
company_name, latest_date, data_to_append = fetch_company_news_data("TKPR", "2015-01-01")

In [278]:
from sqlalchemy.orm import Session
from app.database.connection import get_db
from app.models.stock import StockData, LatestDate

def get_all_companies(db: "Session | None" = None, limit=15):
    """Return the distinct company names stored in the ``LatestDate`` table.

    Args:
        db: Session to query. When omitted, a session is taken from
            ``get_db()`` for this call only and the provider is closed
            afterwards. (A ``next(get_db())`` default would be evaluated once,
            at definition time, and shared by every later call.)
        limit: Maximum number of names to return. The default of 15 keeps the
            existing cap; pass ``None`` to really fetch all companies.

    Returns:
        A list of company names, or ``[]`` if obtaining the session or running
        the query raises.
    """
    print("Fetching all unique company names...")
    db_provider = None
    try:
        if db is None:
            db_provider = get_db()
            db = next(db_provider)

        # Distinct company names, optionally capped
        query = db.query(LatestDate.company_name).distinct()
        if limit is not None:
            query = query.limit(limit)

        # Extract company names from the result rows
        company_names = [stock.company_name for stock in query.all()]

        print(f"Found {len(company_names)} unique companies")
        return company_names

    except Exception as e:
        print(f"Error fetching company names: {e}")
        return []

    finally:
        # Closing a generator-based provider runs its cleanup code;
        # presumably that is where get_db releases the session - confirm.
        close_provider = getattr(db_provider, "close", None)
        if callable(close_provider):
            close_provider()

In [279]:
# Query the database for the distinct company names
# (get_all_companies prints its progress as a side effect).
company_names = get_all_companies()

Fetching all unique company names...
Found 15 unique companies


In [283]:
# Show the fetched company names as the cell output.
company_names

['ADIN',
 'ALK',
 'ALKB',
 'AMBR',
 'AMEH',
 'APTK',
 'ATPP',
 'AUMK',
 'BANA',
 'BGOR',
 'BIKF',
 'BIM',
 'BLTU',
 'CBNG',
 'CDHV']