### Сборка

In [1]:
! pip install wget
! pip install requests_html

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
from pathlib import Path
import wget
from bs4 import BeautifulSoup 
from requests_html import HTMLSession, AsyncHTMLSession

import re, os

from time import sleep
from tqdm import tqdm

import ipywidgets as widgets
from IPython.display import display
from ipywidgets import Button, HBox, VBox
import requests

In [3]:
# Script-wide settings.
base_url = 'https://www.finam.ru/profile/moex-akcii/gazprom/export/' # URL of the page that hosts the export form
temp_dir = Path('./__temp__') # directory for temporary files (it is not removed automatically)
sleep_time = 0.5 # pause between two consecutive downloads, in seconds (passed to time.sleep)

In [4]:
# Make sure the scratch directory exists and derive the two working file paths from it.
temp_dir.mkdir(parents=True, exist_ok=True)
html_path = temp_dir / 'export.html'
icharts_path = temp_dir / 'icharts.js'

# Remove a copy left over from a previous run before the page is downloaded again.
if html_path.exists():
    os.unlink(html_path)

In [5]:
# Download the export page to disk, then read it back as cp1251 text.
wget.download(base_url, out=str(html_path))

with open(html_path, 'r', encoding='cp1251') as f:
    html_data = f.read()

# Reset first, so that a value left over from an earlier run of this cell
# can never be reused silently when the lookup below fails.
charts_rel_url = None
script_name = icharts_path.name
for line in html_data.split('\n'):
    if script_name not in line:
        continue
    print(line)
    # The script path sits inside a double-quoted attribute, so splitting on '"' isolates it.
    for piece in line.split('"'):
        if script_name in piece:
            # Drop one leading slash: the URL is re-assembled with '/'.join below.
            if piece.startswith('/'):
                piece = piece[1:]
            print(piece)
            charts_rel_url = piece
            break
    # Only the first line that mentions the script is inspected.
    break

if charts_rel_url is None:
    raise RuntimeError(f"no reference to {script_name} found in the page at {base_url}")

# NOTE(review): this relies on the wget module exposing a `urlparse` attribute — confirm for the installed version.
parsed_url = wget.urlparse.urlparse(base_url)
site_name = parsed_url.scheme + '://' + parsed_url.netloc
if icharts_path.exists():
    os.unlink(icharts_path)

wget.download('/'.join([site_name, charts_rel_url]), out=str(icharts_path))

		<script src="/cache/N72Hgd54/icharts/icharts.js" type="text/javascript"></script>
cache/N72Hgd54/icharts/icharts.js


'__temp__/icharts.js'

In [6]:
# Turn the `var ... = ...` declarations of icharts.js into a Python dict (js_dict).
# Every declaration except `aEmitentUrls` is rewritten into one big `dict(name = value, ...)`
# expression and evaluated; `aEmitentUrls` is split by hand into key/value pairs instead.
# NOTE(review): eval() runs text that was downloaded from the network as Python code.
# Consider a real parser or ast.literal_eval per declaration — confirm the file format first.
with open(icharts_path,'r', encoding='cp1251') as f:
    js_to_dict = '''dict('''
    for line in f.readlines():     
        # Only lines that begin with a `var` declaration are considered.
        if line[:3] == 'var':
            if line.startswith('var aEmitentUrls'):
                aEmitentUrls = dict()
                # Drops the first 21 and the last 2 characters of the line, presumably the
                # declaration prefix and the closing brace/newline — TODO confirm against the file.
                pairs = [pair.split(':') for pair in line[21:-2].split(',')]
                for p in pairs:
                    if len(p) == 2:
                        # Strip both quote styles from key and value; spaces are removed from the value only.
                        p[0] = p[0].replace('"', '').replace("'", '')
                        p[1] = p[1].replace('"', '').replace("'", '').replace(" ", '')
                        aEmitentUrls[p[0]] = p[1]
                    else:
                        # A chunk that did not split into exactly key:value is reported and skipped.
                        print('!!!!', p)
            else:
                # Drop the first 4 characters (the `var` keyword plus one more character) and the
                # last 2 (presumably ';' and the newline — TODO confirm), leaving `name = value`.
                js_to_dict =js_to_dict+line[4:-2]+', '
            
    js_to_dict =  js_to_dict +')'
    # Empty double-quoted strings become empty single-quoted strings; every remaining
    # double quote is then removed before the text is evaluated.
    js_to_dict = js_to_dict.replace('""', "''")
    js_to_dict = js_to_dict.replace('"', '')
    js_dict = eval(js_to_dict)
    
    # NOTE(review): aEmitentUrls is only bound if the file contained a `var aEmitentUrls` line.
    js_dict['aEmitentUrls'] = aEmitentUrls

In [7]:
# Collect one record per emitent, combining the parallel arrays parsed from icharts.js.
emitents_dict = {}
for idx, emitent_id in enumerate(js_dict['aEmitentIds']):
    # An id that was already used as a key is bumped by 0.01 until it is unique,
    # so repeated ids do not overwrite each other; the real id is kept in 'emId'.
    unique_key = emitent_id
    while unique_key in emitents_dict:
        unique_key += 0.01

    id_as_text = str(emitent_id)
    record = dict()
    emitents_dict[unique_key] = record
    record['emId'] = emitent_id
    record['name'] = js_dict['aEmitentNames'][idx]
    record['code'] = js_dict['aEmitentCodes'][idx]
    record['market'] = js_dict['aEmitentMarkets'][idx]
    # 'decp' is looked up by the numeric id and 'url' by its string form; both fall back to None.
    if emitent_id in js_dict['aEmitentDecp']:
        record['decp'] = js_dict['aEmitentDecp'][emitent_id]
    else:
        record['decp'] = None
    record['child'] = js_dict['aEmitentChild'][idx]
    if id_as_text in js_dict['aEmitentUrls']:
        record['url'] = js_dict['aEmitentUrls'][id_as_text]
    else:
        record['url'] = None

### Parse HTML

In [8]:
class link_maker:
    """Scrapes the Finam export form and builds download links from its options.

    Typical use: create an instance, call ``extract_fields(html)`` once to fill
    ``fields_values`` and ``text_fields``, then call ``get_link(...)`` for every
    file that should be downloaded.
    """

    def __init__(self):
        # Prefix of every generated link; the query string is appended to it.
        self.pref = "https://export.finam.ru/export9.out?"

        # Query parameters in the exact order in which they are written into the link.
        self.ordered_fields_names = ["market", "em", "token", "code", "apply", "df", "mf", "yf", "from",
                                     "dt", "mt", "yt", "to", "p", "f", "e", "cn", "dtf", "tmf", "MSOR",
                                     "mstime", "mstimever", "sep", "sep2", "datf", "at", "fsp"]

        # Form fields that extract_fields() looks for, mapped to the caption shown to the user.
        self.fields_description = {
            "market": "Рынок",
            "apply": "Какой-то параметр",
            "dt": "конец диапазона (день)",
            "mt": "конец диапазона (месяц)",
            "yt": "конец диапазона (год)",
            "df": "начало диапазона (день)",
            "mf": "начало диапазона (месяц)",
            "yf": "начало диапазона (год)",
            "p": "интервал измерения",
            "e": "формат файла",
            # "cn" (contract name) is commented out here, so it is not scraped from the form.
            "dtf": "формат даты",
            "tmf": "формат времени",
            "MSOR": "выдавать время (да - окончан. свечи, нет - начало свечи)",
            "mstime": "московское время",
            "mstimever": "московское время",
            "sep": "Разделитель полей",
            "sep2": "Разделитель разрядов",
            "datf": "Формат записи в файл",
            "at": "Добавить заголовок файла",
            "fsp": "Заполнять периоды без сделок",
        }

        # Start of the page line that carries the list of markets.
        self.market_info_row = 'Finam.IssuerProfile.Main.setMarkets'

    def extract_fields(self, html_data):
        """Read the available options of every known form field from the page.

        Args:
            html_data: HTML text of the export page.

        Side effects:
            Sets ``self.fields_values`` (field name -> {caption: value}) and
            ``self.text_fields`` (names of fields rendered as free-text inputs).

        Raises:
            IndexError: a field has neither a table cell nor an input on the page.
            NameError: a line yields more than one value/caption match.
        """
        soup = BeautifulSoup(html_data, 'html.parser')
        match = soup.find("div", {"id": "issuer-profile-export-form"})
        # Re-parse only the form container, so that later searches stay inside it.
        soup = BeautifulSoup(str(match), 'html.parser')

        attr_values = dict()
        text_fields = list()

        for attr_name in self.fields_description:
            attr_values[attr_name] = dict()
            name_marker = f'name="{attr_name}"'

            # Table cells whose markup mentions this field name.
            attr_blocks = [m for m in soup.find_all("td") if name_marker in str(m)]

            if attr_blocks:
                ischeckbox = False
                for attr_block in attr_blocks:
                    attr_block = str(attr_block)

                    if 'type="checkbox"' in attr_block or 'checked="checked"' in attr_block:
                        ischeckbox = True

                    for line in attr_block.splitlines():
                        # Captures the value attribute and the visible text on the same line.
                        finds = re.findall(r'.*value="(.*)".*>(.*)<', line)
                        if len(finds) == 1:
                            attr_values[attr_name][finds[0][1]] = finds[0][0]
                        elif len(finds) > 1:
                            raise NameError(f"ambiguous value/caption match for field {attr_name!r}: {line!r}")

                        if 'type="text"' in line:
                            text_fields.append(attr_name)

                # A checkbox that exposed a single value is offered as a yes/no choice.
                if ischeckbox and len(attr_values[attr_name]) == 1:
                    attr_values[attr_name] = {'да': 1, 'нет': 0}

            else:
                # No table cell: look for a standalone input and keep the value of a hidden one.
                inputs = [m for m in soup.find_all("input") if name_marker in str(m)]
                if not inputs:
                    raise IndexError(f"form field {attr_name!r} was not found on the export page")
                line = str(inputs[0])
                if 'type="hidden"' in line:
                    value = re.findall(r'.*value="(.*)".*', line)[0]
                    attr_values[attr_name] = {'hidden_value': value}

        # The market list comes from the first page line starting with market_info_row
        # and replaces whatever the form scan found for "market".
        for line in html_data.splitlines():
            if line.startswith(self.market_info_row):
                attr_values["market"] = dict()
                # Cut the call prefix plus one character and the last two characters,
                # presumably "(" and ");" — TODO confirm against the page.
                line = line[len(self.market_info_row) + 1:-2]
                line = line.replace('value', '"value"').replace('title', '"title"')
                # NOTE(review): eval() executes text taken from a downloaded page; consider
                # ast.literal_eval or a JSON parser once the exact format is confirmed.
                for pair in eval(line):
                    attr_values["market"][pair['title']] = pair['value']
                break

        self.fields_values = attr_values
        self.text_fields = text_fields

    def get_link(self, market, em, code, apply, df, mf, yf, dt, mt, yt, p, f, e, cn, dtf, tmf, MSOR, mstime, mstimever, sep, sep2, datf, at, fsp):
        """Build the download link for one export request.

        ``mf`` and ``mt`` are month numbers counted from 1. The link carries
        them counted from 0 in ``mf``/``mt`` and as two-digit numbers counted
        from 1 inside ``from``/``to`` (day.month.year).

        Returns:
            ``self.pref`` followed by ``name=value`` pairs joined with ``&`` in
            the order given by ``self.ordered_fields_names``.

        Raises:
            ValueError: ``mf`` or ``mt`` cannot be converted to an integer.
        """
        month_from = int(mf)
        month_to = int(mt)

        # NOTE(review): `at` is always sent as 1 and the argument is ignored — confirm this is intended.
        fields_dict = dict(market=market, em=em, code=code, apply=apply,
                           df=df, mf=str(month_from - 1), yf=yf,
                           dt=dt, mt=str(month_to - 1), yt=yt,
                           p=p, f=f, e=e, cn=cn,
                           dtf=dtf, tmf=tmf, MSOR=MSOR, mstime=mstime, mstimever=mstimever,
                           sep=sep, sep2=sep2, datf=datf, at=1, fsp=fsp)

        fields_dict['token'] = ""
        # Pad the month counted from 1: padding decided on the 0-based value turned October into "010".
        fields_dict['from'] = f"{df}.{month_from:02d}.{yf}"
        fields_dict['to'] = f"{dt}.{month_to:02d}.{yt}"

        return self.pref + '&'.join(str(n) + "=" + str(fields_dict[n]) for n in self.ordered_fields_names)

In [9]:
# Link builder shared by the form cells and the download handler below.
lm = link_maker()

In [10]:
# Fill lm.fields_values and lm.text_fields from the export page downloaded earlier.
lm.extract_fields(html_data)

### Draw Interface

In [11]:
# Build the input form: one captioned row per field, then the output-folder box,
# the download button and a status line. `fields_values` maps each field name to
# the widget that holds the user's choice.
fields_values = dict()
rows = list()


def _field_caption(field_name):
    """Return the caption widget for a field: centred in 31 columns, spaces shown as '_'."""
    return widgets.Label("[{:^31}]".format(lm.fields_description[field_name]).replace(' ', '_'))


# First pass: every field that is not a free-text input becomes a dropdown of (caption, value).
for field in lm.fields_values:
    if field not in lm.text_fields:
        field_drop_down = widgets.Dropdown(options=list(lm.fields_values[field].items()))
        rows.append(HBox([_field_caption(field), field_drop_down]))
        fields_values[field] = field_drop_down

# Second pass: free-text fields follow all dropdowns, so the row order stays the same.
for field in lm.fields_values:
    if field in lm.text_fields:
        field_text = widgets.Text(size=10)
        rows.append(HBox([_field_caption(field), field_text]))
        fields_values[field] = field_text

rows.append(widgets.Label('_'*90))

result_dir = widgets.Text(size=10)
rows.append(HBox([widgets.Label('куда положить результат?'), result_dir]))

download_button = widgets.Button(description='Скачать')
rows.append(download_button)
label = widgets.Label('Пусто')
rows.append(label)

def get_field_values():
    """Return the current value of every form widget, keyed by field name."""
    return {name: fields_values[name].value for name in fields_values}

### Try to download multiple stocks

In [12]:
def download_file(url, file_path, timeout=60):
    """Stream the response for `url` into `file_path`.

    Args:
        url: address to download.
        file_path: destination file; it is opened in binary write mode.
        timeout: seconds passed to requests as the timeout, so that a server
            which stops responding raises instead of blocking forever.

    Returns:
        `file_path`, unchanged.

    Raises:
        requests.HTTPError: the server answered with an error status.
        requests.Timeout: the server did not respond within `timeout`.
    """
    # Browser-like request headers.
    # NOTE(review): "If-Modified-Since" may allow a 304 answer with an empty body, and "br" in
    # "Accept-Encoding" may yield undecoded content if brotli support is missing — confirm both.
    headers = {
        "User-Agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:96.0) Gecko/20100101 Firefox/96.0",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "Accept-Encoding": "gzip, deflate, br",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "cross-site",
        "If-Modified-Since": "Wed, 19 Jan 2022 16:13:15 GMT",
        "Cache-Control": "max-age=0",
    }
    # stream=True lets the body be written in chunks as it arrives.
    with requests.get(url, stream=True, headers=headers, timeout=timeout) as r:
        r.raise_for_status()
        with open(file_path, 'wb') as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)
    return file_path

def download_data(obj):
    """Download one export file for every emitent of the market selected in the form.

    Args:
        obj: ignored; present so the function can be used as a button click handler.

    Returns:
        dict mapping each destination path to the link it was downloaded from.

    Raises:
        ValueError: one of the date fields does not hold a whole number.
    """
    label.value = "Подгатавливаем ссылки"

    args = get_field_values()

    # Check the date parts up front: get_link() converts the months with int() and would
    # otherwise fail with a message that does not say which field is wrong.
    for date_key in ('df', 'mf', 'yf', 'dt', 'mt', 'yt'):
        try:
            int(args[date_key])
        except (TypeError, ValueError):
            raise ValueError(f"date field {date_key!r} must be a whole number, got {args[date_key]!r}") from None

    selected_market = str(args['market'])
    emitents_for_market = [em for em in emitents_dict if str(emitents_dict[em]['market']) == selected_market]

    # Output folder: <result dir>/<market caption>_<from day_month_year>_<to day_month_year>.
    market_caption = [m for m in lm.fields_values['market'] if str(lm.fields_values['market'][m]) == selected_market][0]
    date_suffix = '_'.join([args['df'], args['mf'], args['yf'], args['dt'], args['mt'], args['yt']])
    res_dir = Path(result_dir.value) / (market_caption + '_' + date_suffix)

    links_and_files_dicts = dict()
    for emId in emitents_for_market:
        info = emitents_dict[emId]
        # 'url' is None for emitents that have no entry in aEmitentUrls; treat that as empty text.
        url_part = (info['url'] or '').replace('/', '_')
        file_name = (info['name'].lower() + '_' + info['code'].lower() + '_'
                     + str(info['decp']) + '_' + str(info['child']) + '_'
                     + url_part + args['e'])

        # Per-emitent copy, so that the values read from the form are not overwritten.
        link_args = dict(args, em=info['emId'], code=info['code'], cn=info['code'], f=file_name)
        links_and_files_dicts[Path(res_dir) / file_name] = lm.get_link(**link_args)

    total = len(links_and_files_dicts)
    label.value = f"Загружено {0}/{total}"
    for done, (target_path, link) in enumerate(links_and_files_dicts.items(), start=1):
        target_path.parent.mkdir(exist_ok=True, parents=True)

        download_file(link, target_path)
        # Pause between requests.
        sleep(sleep_time)

        label.value = f"Загружено {done}/{total}"

    return links_and_files_dicts


# Run download_data every time the download button is clicked.
download_button.on_click(download_data)

In [31]:
# Mount Google Drive at /content/drive (available on Google Colab only).
# NOTE(review): the execution counts of this and the following cells are out of order — confirm
# that the notebook still works after Restart & Run All.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [32]:
# Touch the Drive root folder; presumably a check that the mount is usable — TODO confirm.
! touch "/content/drive/MyDrive/"

In [33]:
import os

In [13]:
# Display the form: all rows built above, stacked vertically.
VBox(rows)

VBox(children=(HBox(children=(Label(value='[_____________Рынок_____________]'), Dropdown(options=(('Акции Герм…

In [35]:
# Start the download directly instead of clicking the button; the argument is not used by download_data.
# NOTE(review): the recorded run of this cell ended with a ValueError — presumably the date
# fields of the form were still empty; confirm before relying on this cell.
download_data(1)

ValueError: ignored