In [1]:

import time
import requests
from bs4 import BeautifulSoup
import re
from zipfile import ZipFile
import glob
import os
import fitz
import openai
import pandas as pd
from tenacity import retry, stop_after_attempt, wait_random_exponential
from transformers import GPT2Tokenizer

  from .autonotebook import tqdm as notebook_tqdm
None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.


In [None]:
def koosta_paring(url, timeout=30):
    """Send a GET request to ``url`` and return the response body as text.

    The request carries a User-Agent header that identifies the crawler.

    Args:
        url: Address to request.
        timeout: Seconds to wait for the server before giving up, so that a
            stalled connection cannot hang the notebook forever.

    Returns:
        The response text when the status code is 200; otherwise None (a
        message describing the failure is printed).
    """
    headers = {
        'User-Agent': 'my_crawler (brandon.loorits@ut.ee) / for_study_purpose',
    }  # Identify ourselves so the site knows who is sending the requests
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
    except requests.RequestException as error:
        # Connection problems and timeouts are reported the same way as bad status codes
        print(f'Päring lehele {url} ebaõnnestus. Viga: {error}')
        return None
    if response.status_code == 200:
        return response.text
    print(f'Päring lehele {url} ebaõnnestus. Staatuskood: {response.status_code}')
    return None

In [None]:
main_url = 'https://nasdaqbaltic.com/statistics/et/shares'
shares = []

# Collect the instrument-page links of the shares listed in the overview table
print(f'Külastan lehte: {main_url}')
time.sleep(5)  # Delay so that we do not overload the site
page_content = koosta_paring(main_url)
if not page_content:
    print('Algse lehe külastamine ebaõnnestus.')
else:
    soup = BeautifulSoup(page_content, 'html.parser')
    table = soup.find('table')
    if table is None:
        # Message previously contained a stray double quote
        print('Ei leitud tabelit.')
    else:
        # Keep only company-name anchors that point to an instrument page
        links = [
            a['href']
            for a in table.find_all('a', href=True, class_="text16 compname")
            if a['href'].startswith('/statistics/et/instrument')
        ]
        shares.extend(links)
    

In [None]:
# Visit every collected instrument page and remember the href of its "Aruanded" link
reports = []
for link in shares:
    absolute_link = f'https://nasdaqbaltic.com{link}'
    print(f'Külastan lehte: {absolute_link}')
    time.sleep(5)  # Wait 5 seconds before the next request so that we do not overload the site
    page_content = koosta_paring(absolute_link)
    if not page_content:
        print(f'Lehe külastamine ebaõnnestus: {absolute_link}')
        continue
    soup = BeautifulSoup(page_content, 'html.parser')
    link_element = soup.find('a', string="Aruanded")
    # find() returns None when the page has no such anchor; skip instead of crashing
    report_href = link_element.get('href') if link_element is not None else None
    if report_href:
        reports.append(report_href)
    else:
        print(f'Lehelt ei leitud aruannete linki: {absolute_link}')
print(len(reports))


In [None]:
base_link = 'https://nasdaqbaltic.com'
# Abbreviations of all issuers whose report is read from an XHTML file because
# no PDF is available. The value pairs are later unpacked by eralda_tekst_XHTML
# as (start_id, end_id) of the div elements to extract.
xhtml_nums = {
    'ako':('pf57','pf38_1'),
    'apg':('pf1','pf2c'),
    'ign':('pfa3','pf136'),
    'kne1':('pfa3','pf100'),
    'pzv':('pf57','pf70'),
    'rsu':('pf1','pf34'),
    'sab':('pf1','pf4c'),
    'vlp':('pf1','pf4b')
}
exclusion_list = xhtml_nums.keys()

# Same identifying header that koosta_paring sends, so file downloads are attributable too
_download_headers = {
    'User-Agent': 'my_crawler (brandon.loorits@ut.ee) / for_study_purpose',
}

# Create the output folders if they do not exist yet
os.makedirs('aruanded/aastaaruanded', exist_ok=True)
os.makedirs('alusfailid', exist_ok=True)


def _issuer_code(href):
    """Return the fifth '/'-separated component of ``href``, or None if the path is shorter."""
    parts = href.split('/')
    return parts[4] if len(parts) > 4 else None


def _vali_aruande_lingid(table, excluded):
    """Scan the table rows top-down and return ``(pdf_link, zip_link, esg_link)``.

    Scanning stops after the first row that yields an annual-report PDF or ZIP.
    PDFs of issuers listed in ``excluded`` are ignored.
    """
    pdf_link = zip_link = esg_link = None
    for row in table.find_all('tr'):
        for href in (anchor.get('href') for anchor in row.find_all('a')):
            if not href:
                continue  # anchor without a usable href
            file_part = href.split('/')[-1]
            is_annual = 'ar' in file_part
            if is_annual and href.endswith('.pdf') and _issuer_code(href) not in excluded:
                pdf_link = href
                # Exception for one report (dgr): a shortened version also
                # exists which we do not need, so stop at the full one
                if _issuer_code(href) == 'dgr' and href.endswith('ias.pdf'):
                    break
            elif is_annual and href.endswith('.zip') and not pdf_link:
                zip_link = href
            if 'esg' in file_part:
                esg_link = href
        if pdf_link or zip_link:
            break
    return pdf_link, zip_link, esg_link


def _laadi_alla(link, sihtfaili_mall, teate_mall):
    """Download ``link`` and write it to a file named after the URL.

    The two path components following ``/reports/`` (up to the first
    underscore) are joined with ``_`` and substituted into both templates
    (presumably issuer abbreviation and year - confirm against real URLs).

    Returns:
        True when the file was written, False otherwise.
    """
    found = re.search(r'/reports/([^_/]+/[^_/]+)_', link)
    if found is None:
        print(f'Lingist ei õnnestunud failinime tuletada: {link}')
        return False
    match = found.group(1).replace('/', '_')
    try:
        response = requests.get(link, headers=_download_headers, timeout=60)
    except requests.RequestException as error:
        print(f'Faili allalaadimine ebaõnnestus: {link} ({error})')
        return False
    if not response.ok:
        print(f'Faili allalaadimine ebaõnnestus: {link} (staatuskood {response.status_code})')
        return False
    print(teate_mall.format(match))
    with open(sihtfaili_mall.format(match), 'wb') as file:
        file.write(response.content)
    return True


# Find the required reports and save them into the matching folder
for report in reports:
    absolute_link = f'{base_link}{report}'
    print(f'Külastan lehte: {absolute_link}')
    time.sleep(5)  # Wait 5 seconds before the next request so that we do not overload the site
    page_content = koosta_paring(absolute_link)
    if not page_content:
        print(f'Lehe külastamine ebaõnnestus: {absolute_link}')
        continue
    soup = BeautifulSoup(page_content, 'html.parser')
    table = soup.find('tbody')
    if table is None:
        print(f'Ei leitud aruannete tabelit: {absolute_link}')
        continue

    pdf_link, zip_link, esg_link = _vali_aruande_lingid(table, exclusion_list)

    if pdf_link:
        if esg_link:
            # An ESG report was seen during the scan: store it instead of the annual report
            esg_link_full = f'{base_link}{esg_link}'
            print(f'ESG link: {esg_link_full}')
            _laadi_alla(esg_link_full,
                        'aruanded/aastaaruanded/esg_{}.pdf',
                        'Kirjutame esg {} pdfi maha')
        else:
            # Otherwise store the annual report PDF
            rep_link = f'{base_link}{pdf_link}'
            print(f'REP link: {rep_link}')
            _laadi_alla(rep_link,
                        'aruanded/aastaaruanded/rep_{}.pdf',
                        'Kirjutame aastaaruande pdfi {} maha')
    elif zip_link:
        # Keep the ZIP so that the XHTML annual report can be extracted from it later
        rep_link = f'{base_link}{zip_link}'
        print(f'REP link: {rep_link}')
        _laadi_alla(rep_link,
                    'alusfailid/rep_{}.zip',
                    'Kirjutame aastaaruande zipi {} maha')


In [None]:
def leia_aruanne(zip_path, target_dir_rep):
    """Extract the XHTML annual report from a ZIP archive.

    Every archive member whose name ends with ``.xhtml`` is written to
    ``target_dir_rep`` under the archive's own base name with the extension
    changed to ``.xhtml``. If the archive holds several such members they all
    go to the same output file, so the last one wins.

    Args:
        zip_path: Path of the ZIP archive ('/' or '\\' separated).
        target_dir_rep: Output folder; created when missing.
    """
    os.makedirs(target_dir_rep, exist_ok=True)
    # Derive the name from the last path component regardless of separator,
    # and swap only the extension (not every "zip" substring)
    archive_name = os.path.basename(zip_path.replace('\\', '/'))
    name = os.path.splitext(archive_name)[0] + '.xhtml'
    new_file_path = os.path.join(target_dir_rep, name)

    with ZipFile(zip_path, 'r') as zip_ref:
        for file_name in zip_ref.namelist():
            if not file_name.endswith('.xhtml'):
                continue
            content = zip_ref.read(file_name)
            print(new_file_path)
            # Write the member's bytes to the new file
            with open(new_file_path, 'wb') as new_file:
                new_file.write(content)
            print(f"Fail {name} on kirjutatud kausta {target_dir_rep}.")

In [None]:
target_dir_rep = 'aruanded/aruandedXHTML'

zip_files = glob.glob('alusfailid/*.zip')

# The loop variable is not called `zip`: that would shadow the builtin zip()
# for every cell executed afterwards in the same kernel.
for zip_path in zip_files:
    leia_aruanne(zip_path, target_dir_rep)

In [None]:
def eralda_tekst_XHTML(xhtml_files, id_ranges):
    """Extract the text between two div ids from XHTML reports into .txt files.

    For each file, the part of the base name between the first and second
    underscore is looked up in ``id_ranges``. Files without a matching entry
    (or without an underscore in the name) are skipped. For matching files,
    the text of every div carrying an id, from the one with the start id up to
    and including the one with the end id, is written to a text file whose
    path is the input path with ``.xhtml`` replaced by ``.txt`` and
    ``aruandedXHTML`` replaced by ``aastaaruanded``.

    Args:
        xhtml_files: Iterable of XHTML file paths.
        id_ranges: Mapping from name key to a ``(start_id, end_id)`` pair.
    """
    for xhtml_file in xhtml_files:
        # Parse the key from the last path component, whatever the separator
        base_name = os.path.basename(xhtml_file.replace('\\', '/'))
        name_parts = base_name.split('.')[0].split('_')
        if len(name_parts) < 2:
            continue  # not named like "prefix_key_..."; nothing to look up
        file_name = name_parts[1]
        if file_name not in id_ranges:
            continue
        start_id, end_id = id_ranges[file_name]

        with open(xhtml_file, 'r', encoding='utf-8') as file:
            content = file.read()

        parsed_html = BeautifulSoup(content, 'lxml')
        capture_text = False
        pieces = []  # collected per div and joined once at the end

        for div in parsed_html.find_all('div', id=True):
            div_id = div.get('id')
            if div_id == start_id:
                capture_text = True
            if capture_text:
                pieces.append(div.get_text(separator="\n") + "\n\n")
            if div_id == end_id:
                capture_text = False
        extracted_text = "".join(pieces)

        # Save the text next to the other annual reports
        output_file_path = xhtml_file.replace('.xhtml', '.txt').replace('aruandedXHTML','aastaaruanded')
        output_dir = os.path.dirname(output_file_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        with open(output_file_path, 'w', encoding='utf-8') as output_file:
            output_file.write(extracted_text)
        print(f"Tekst failist {file_name}.xhtml märgndite {start_id} ja {end_id} vahel on salvestatud kausta: {output_file_path}")


In [None]:
# Gather every file in the XHTML report folder and extract the configured
# id ranges from them as plain text.
aruandedXHTML = glob.glob(
    'aruanded/aruandedXHTML/*'
)

eralda_tekst_XHTML(xhtml_files=aruandedXHTML, id_ranges=xhtml_nums)

In [None]:

def loo_sisukordade_sonastik(paths):
    """Build a dictionary of table-of-contents page indices.

    Each document is opened with ``fitz`` and scanned page by page. The first
    page containing a line that, stripped and lower-cased, equals one of the
    table-of-contents headings is recorded.

    Args:
        paths: Iterable of document paths. The dictionary key is the part of
            the base file name between the first and second underscore; files
            whose name has no underscore are skipped. Files sharing that part
            share one key (the later match overwrites the earlier one).

    Returns:
        Dict mapping name key to the 0-based page index; documents without a
        matching page are absent.
    """
    # Headings searched for on each page (compared in lower case)
    toc_patterns = {"sisukord", "table of contents", "content", "contents"}
    toc_dict = {}
    for path in paths:
        name_parts = os.path.basename(path.replace('\\', '/')).split('.')[0].split('_')
        if len(name_parts) < 2:
            continue
        file_name = name_parts[1]
        doc = fitz.open(path)
        try:
            for page_num in range(len(doc)):
                text_lines = doc[page_num].get_text().splitlines()
                if any(line.strip().lower() in toc_patterns for line in text_lines):
                    toc_dict[file_name] = page_num
                    break  # first matching page found; no need to look further
        finally:
            doc.close()  # release the file handle even if text extraction fails
    return toc_dict

In [None]:
# Look up the table-of-contents page for every PDF in the annual report folder
# and show the resulting mapping together with its size.
aastaaruanded = glob.glob('aruanded/aastaaruanded/*.pdf')
toc = loo_sisukordade_sonastik(paths=aastaaruanded)
print('sisukorra lk nr:', toc)
print(len(toc))

In [None]:
def leia_lk_numbrid(path, keywords, toc=None):
    """Write the pages preceding a keyword's section to a text file.

    The table-of-contents page index for the document is looked up in ``toc``.
    On that page the first keyword followed by a dot leader and a number is
    searched for; the number minus one becomes the count of leading pages
    written to the text file (``path`` with ``.pdf`` replaced by ``.txt``).
    The text file is created (empty) even when nothing is found.

    Args:
        path: Path of the document to open with ``fitz``.
        keywords: Section titles to try, in priority order (case-insensitive).
        toc: Optional dict mapping the name key (the part of the base file
            name between the first and second underscore) to the 0-based
            table-of-contents page index. ``None`` is treated as empty.

    Returns:
        The number found in the table of contents minus one, or None when no
        table-of-contents page or keyword match is available. The value is
        returned as parsed even if it exceeds the document's page count (only
        existing pages are written).
    """
    name_parts = os.path.basename(path.replace('\\', '/')).split('.')[0].split('_')
    file_name = name_parts[1] if len(name_parts) > 1 else None
    text_output_path = path.replace('.pdf', '.txt')
    trim_start_page = None
    # Check whether a table-of-contents page is known for this document
    nbr = (toc or {}).get(file_name)

    doc = fitz.open(path)
    try:
        if nbr is not None:
            toc_text = doc[nbr].get_text().replace('\n', ' ')
            for keyword in keywords:
                # keyword, optional whitespace, a dot leader (dots possibly
                # separated by whitespace), then the number. Matches the same
                # strings as "\s*(?:\.+\s*)+" but without nested quantifiers,
                # so a long dot leader cannot cause exponential backtracking.
                pattern = fr"{re.escape(keyword)}\s*\.[.\s]*(\d+)"
                match = re.search(pattern, toc_text, re.IGNORECASE)
                if match:
                    trim_start_page = int(match.group(1)) - 1
                    break
        # Write the selected pages to the new file as text
        with open(text_output_path, "w", encoding="utf-8") as text_file:
            if trim_start_page is not None:
                # Never index past the last page, whatever number was parsed
                for page_num in range(min(trim_start_page, len(doc))):
                    text_file.write(doc[page_num].get_text())
    finally:
        doc.close()

    return trim_start_page

    

In [None]:
aastaaruanded = glob.glob('aruanded/aastaaruanded/*.pdf')

# Section titles searched for in each table of contents, in priority order
keywords = ['Konsolideeritud raamatupidamise aastaaruanne',
'Kontserni raamatupidamise aastaaruanne',
'Konsolideerimisgrupi raamatupidamise aastaaruanne',
'RAAMATUPIDAMISE AASTAARUANNE',
'Consolidated and separate financial statements',
'Financial Statements']
cuts = {}
for aruanne in aastaaruanded:
    print()
    print('ARUANNE:',aruanne)
    # Take the key from the file's base name so that the result does not
    # depend on the platform's path separator
    name_parts = os.path.basename(aruanne.replace('\\', '/')).split('.')[0].split('_')
    file_name = name_parts[1] if len(name_parts) > 1 else name_parts[0]
    cut = leia_lk_numbrid(aruanne,keywords,toc)
    print(cut)
    cuts[file_name] = cut

In [None]:
def trim_problematic_pdf(path, cut):
    """Write the text of the first ``cut`` pages of a document to a text file.

    Args:
        path: Path of the document to open with ``fitz``.
        cut: Number of leading pages to write; values larger than the page
            count are limited to the page count.

    Returns:
        Path of the written text file (``path`` with ``.pdf`` replaced by
        ``.txt``).
    """
    text_output_path = path.replace('.pdf', '.txt')
    doc = fitz.open(path)
    try:
        page_limit = min(cut, len(doc))  # never index past the last page
        with open(text_output_path, "w", encoding="utf-8") as text_file:
            for page_num in range(page_limit):
                text_file.write(doc[page_num].get_text())
    finally:
        doc.close()
    return text_output_path

In [None]:
# Annual reports for which the page range is hard to find from the table of
# contents because there is no pattern to search for. The value is passed to
# trim_problematic_pdf as the number of leading pages to keep.
problematic_reports = {
    'egr':89, 
    'hae':87, 
    'inf':14, 
    'lhv':81, 
    'ntu':41, 
    'saf':5, 
    'tel1':134, 
    'tsm':75
}

aastaaruanded = glob.glob('aruanded/aastaaruanded/*.pdf')

for aruanne in aastaaruanded:
    # Match on the firm code in the file name (between the first and second
    # underscore) rather than on any substring of the whole path, so a short
    # code cannot accidentally hit an unrelated file or folder name.
    name_parts = os.path.basename(aruanne.replace('\\', '/')).split('.')[0].split('_')
    firm = name_parts[1] if len(name_parts) > 1 else None
    cut = problematic_reports.get(firm)
    if cut is not None:
        trim_problematic_pdf(aruanne,cut)
    

In [None]:
def puhasta_tekst(file_path):
    """Clean a UTF-8 text file in place.

    Leading/trailing whitespace is removed, every run of whitespace becomes a
    single space, and words split by a hyphen plus whitespace (line-break
    hyphenation) are joined again.

    Args:
        file_path: Path of the text file; its content is overwritten.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        text = file.read()

    # Remove surplus whitespace
    text = re.sub(r'\s+', ' ', text.strip())

    # Join hyphenated words. The hyphen must sit directly after a word
    # character and be followed by whitespace and another word character, so
    # free-standing dashes such as "2021 - 2022" are kept.
    text = re.sub(r'(?<=\w)-\s+(?=\w)', '', text)

    # Overwrite the file content
    with open(file_path, 'w', encoding='utf-8') as file:
        file.write(text)
        

In [None]:
# Run the in-place text clean-up over every .txt file in the annual report folder
aastaaruanded = glob.glob('aruanded/aastaaruanded/*.txt')

for aruanne in aastaaruanded:
    puhasta_tekst(file_path=aruanne)

In [None]:
# Estimate the number of tokens per report (approximate: models tokenize differently).
# GPT2Tokenizer is already imported in the notebook's import cell.
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")

txt_files = glob.glob('aruanded/aastaaruanded/*.txt')
token_counts = {}

for file_path in txt_files:
    with open(file_path, 'r', encoding='utf-8') as file:
        text = file.read()

    # basename copes with the platform's separator, so keys are bare file names
    # on Windows too (splitting on '/' left the folder name in the key there)
    file_name = os.path.basename(file_path)
    token_counts[file_name] = len(tokenizer.tokenize(text))

print(f"Tokenite arv: {token_counts}")

In [None]:
# List the files from the highest token count to the lowest
sorted_token_counts = sorted(
    token_counts.items(),
    key=lambda name_and_count: name_and_count[1],
    reverse=True,
)
for entry in sorted_token_counts:
    file_name, token_count = entry
    print(f"{file_name}: {token_count}")

In [None]:
# esg_data_path = os.getenv('ESG_INPUT')
# esg_data = pd.read_excel(esg_data_path)
# grouped_data = esg_data.groupby(['Topic', 'Question'])['Answers'].apply(list).reset_index()
# order_of_topics = [
#     'Stakeholder engagement and reporting',
#     'Leadership commitment', 
#     'Impact assesment', 
#     'Planning', 
#     'Execution', 
#     'Monitoring', 
#     'Performance improvement', 
#     'Across all topics'
# ]

# grouped_data['Topic'] = pd.Categorical(grouped_data['Topic'], categories=order_of_topics, ordered=True)
# sorted_data = grouped_data.sort_values('Topic')
# sorted_data

In [None]:
# esg_val_path = os.getenv('ESG_VAL')
# esg_val = pd.read_excel(esg_val_path)
# esg_val

In [None]:
def get_answer_text(row, answer_column):
    """Translate a 1-based answer number into the matching answer text.

    Args:
        row: Mapping-like object (e.g. a DataFrame row) holding the answer
            number under ``answer_column`` and the answer options under
            ``'Answers'``.
        answer_column: Name of the column with the 1-based answer number.

    Returns:
        ``row['Answers'][number - 1]`` when the number is present and within
        range; None when the number is missing, out of range, or when
        ``row['Answers']`` is not a sized container (for example NaN left by a
        left merge for a question without answer options).
    """
    raw_number = row[answer_column]
    if pd.isna(raw_number):
        return None
    answers = row['Answers']
    try:
        answer_count = len(answers)
    except TypeError:
        return None  # no answer list available for this row
    answer_number = int(raw_number) - 1  # convert to a 0-based index
    if 0 <= answer_number < answer_count:
        return answers[answer_number]
    return None

In [None]:
# merged_data = esg_val.merge(sorted_data, on='Question', how='left')
# merged_data['SS Explained'] = None
# merged_data['GPT Explained'] = None
# column_order = ['Company', 'Abbreviation','Topic', 'Question', 'SS Answer', 'SS Explained', 'GPT ANSWER 1','GPT Explained',  'Answers']
# merged_data = merged_data[column_order]

# merged_data['SS Explained'] = merged_data.apply(lambda row: get_answer_text(row, 'SS Answer'), axis=1)

# merged_data.head()