# Получение данных из разных источников и хранение в разных БД

Задачи:

1. [+] Обработка YAML-файла настроек: проверка, создание, чтение.
2. [+] Скачивание архива Quandl.WIKI и парсинг.
3. [+] Класс для работы с HDF5.
4. [+] Запись цен в БД, обновление и чтение.
5. [+] Класс работы с IEX.
6. [-] Класс работы с MySQL.
7. [-] Класс работы с PostgreSQL.

In [1]:
import os.path
import io
import yaml
import numpy as np
import pandas as pd
import tqdm
import aiohttp
import asyncio
import zipfile
from datetime import date


# Настройки

In [2]:
class Config(object):
    
    filename = None
    settings = {}
    def __init__(self, filename='config.yaml', edit=False):
        self.filename = filename
        
        # file exists
        if not os.path.isfile(self.filename) or edit:
            self.create_config()
        else:
            with open(self.filename, 'r') as f:
                self.settings = yaml.load(f)
                print('Settings loaded.')
                
        print(self.settings)
                
    def create_config(self):
        options = {
            'quandl-api': 'Your API key in Quandl: ',
            'quandl-full-zip': 'Filename in which download prices from Quandl (quandl-full.zip): ',
            'hdf-storage': 'Filename of your HDF5-storage (storage.h5): ',
        }
        print('Fill settings, please.')
        for n, desc in options.items():
            self.settings[n] = input(desc)
        
        if not self.settings['quandl-full-zip']:
            self.settings['quandl-full-zip'] = 'quandl-full.zip'
            
        if not self.settings['hdf-storage']:
            self.settings['hdf-storage'] = 'storage.h5'
        
        with open(self.filename, 'w') as f:
            yaml.dump(self.settings, f)
            print('Settings saved in {0}.'.format(self.filename))
            

In [3]:
config = Config()

Settings loaded.
{'quandl-api': 'Mco3q1YVma1jfgbqbHqo', 'quandl-full-zip': 'quandl-full.zip', 'hdf-storage': 'storage.h5'}


# Хранилище HDF5

In [4]:
class StorageHDF(object):
    settings = None
    prefix = ''
    
    def __init__(self, settings, prefix='', debug=False):
        self.settings = settings
        
        self.hdf = pd.HDFStore(self.settings['hdf-storage'])
        self.prefix = prefix
        self.debug = debug
        
    def close(self):
        self.hdf.close()
    
    def save(self, key, data):
        if data.empty:
            if self.debug:
                print('Income data is empty.')
            return
                
        db_key = self.prefix + '/' + key
        
        self.hdf.put(db_key, data, format='table', data_columns=True)
    
    def save_symbols(self, data):
        db_key = self.prefix + '/symbols'
        self.hdf.put(db_key, pd.Series(data), format='table', data_columns=True)
    
    def get_symbols(self):
        db_key = self.prefix + '/symbols'
        return self.hdf[db_key].values
    
    def read(self, symbols=[], start=None, end=date.today()):
        data = {}
        if not len(symbols):
            return data
        else:
            for s in symbols:
                db_key = self.prefix + '/' + s
                
                if db_key in self.hdf:
                    data[s] = self.hdf[db_key]
        return data
    
    def drop(self, key):
        db_key = self.prefix + '/' + key
        if db_key in self.hdf:
            del self.hdf[db_key]
            
    def close(self):
        self.hdf.close()


# Скачиваем цены с Quandl

In [5]:
class QuandlBundle(object):
    loop = None
    settings = {}
    def __init__(self, settings, loop=None):
        self.settings = settings
        self.loop = loop
        
        if not self.loop:
            self.loop = asyncio.get_event_loop()
            
    async def download(self, url, filename=None):
        print('download', url, filename)        
        
        async with aiohttp.ClientSession() as ses:
            async with ses.get(url) as resp:
                if resp.status < 300:
                    if filename:
                        with open(filename, 'wb') as fd:
                            total_size = int(resp.headers.get('content-length', 0));
                            chunk_size = 4096
                            bar = tqdm.tqdm(total=total_size)
                            while True:
                                chunk = await resp.content.read(chunk_size)                                
                                if not chunk:
                                    break
                                fd.write(chunk)
                                bar.update(chunk_size)
                            bar.close()
                        return True
                    else:
                        return await resp.json()
     
    async def _async_get_full(self):
        # download urls
        url = 'https://www.quandl.com/api/v3/datatables/WIKI/PRICES/delta.json?api_key={0}'.format(
            self.settings['quandl-api'])
        urls = await self.download(url)

        if urls['data']['latest_full_data']:
            url = urls['data']['latest_full_data']['full_data']
            if not await self.download(url, self.settings['quandl-full-zip']):
                print('Error: Data was not downloaded.')
            else:
                print('Data succesfully downloaded to {0}.'.format(self.settings['quandl-full-zip']))

    def get_full(self):       
        self.loop.run_until_complete(self._async_get_full())

    def parse_to_dataframe(self):
        fn = self.settings['quandl-full-zip']
        # file exists
        if not os.path.isfile(fn):
            self.get_full()
        
        with zipfile.ZipFile(fn) as opened_zip:
            with opened_zip.open(opened_zip.namelist()[0]) as prices_file:
                df = pd.read_csv(io.TextIOWrapper(prices_file))

        cols = ['ticker', 'date', 'open', 'high', 'low', 'close', 'volume', 'adj_close']
        return df[cols]
    
    def save_to_db(self, storage, limit=None):
        df = self.parse_to_dataframe()
        
        storage.save_symbols(df['ticker'].unique())
        for s in tqdm.tqdm(df['ticker'].unique()):
            fltr = df['ticker'] == s    
            storage.save(s, df[fltr].set_index('date'))
            
            if limit is not None:
                limit -= 1
                if limit < 0:
                    break

In [6]:
qb = QuandlBundle(config.settings)
#qb.get_full()
#df = qb.parse_to_dataframe()
storage = StorageHDF(config.settings, prefix='Quandl', debug=True)
qb.save_to_db(storage, limit=100)

  3%|▎         | 100/3190 [02:00<1:02:09,  1.21s/it]

In [7]:
symbols = storage.get_symbols()[:2]

data = storage.read(symbols=symbols)
symbols, data[symbols[0]].tail()

(array(['A', 'AA'], dtype=object),
            ticker   open   high    low  close     volume  adj_close
 date                                                               
 2017-07-17      A  61.60  61.84  61.40  61.55  1332749.0      61.55
 2017-07-18      A  61.43  61.54  60.95  61.21  1520053.0      61.21
 2017-07-19      A  61.32  61.64  61.17  61.38  1509141.0      61.38
 2017-07-20      A  61.45  61.55  60.81  60.98  1925629.0      60.98
 2017-07-21      A  60.83  61.60  60.76  61.37  1275630.0      61.37)

# Скачиваем цены с IEX

API: https://www.iextrading.com/developer/docs/

Symbols: https://api.iextrading.com/1.0/ref-data/symbols

5-years prices: https://api.iextrading.com/1.0/stock/aapl/chart/5y

In [6]:
class IEXPrices(object):
    loop = None
    settings = {}
    limit = None
    symbols = None
    data = {}
    def __init__(self, settings, loop=None):
        self.settings = settings
        self.loop = loop
        
        if not self.loop:
            self.loop = asyncio.get_event_loop()
            
    async def download(self, url):
        #print('download', url)        
        
        async with aiohttp.ClientSession() as ses:
            async with ses.get(url) as resp:
                if resp.status < 300:
                    return await resp.json()
     
    async def _async_get(self):
        # download symbols
        url = 'https://api.iextrading.com/1.0/ref-data/symbols'
        self.symbols = pd.DataFrame(await self.download(url))

        cols = ['date', 'open', 'high', 'low', 'close', 'volume', 'unadjustedClose']
        
        if len(self.symbols):
            for s in tqdm.tqdm(self.symbols['symbol'].values):
                url = 'https://api.iextrading.com/1.0/stock/{0}/chart/5y'.format(s)
                self.data[s] = pd.DataFrame(await self.download(url))[cols]
                
                if self.limit is not None:
                    self.limit -= 1
                    if self.limit < 0:
                        break

    def get_prices(self):       
        self.loop.run_until_complete(self._async_get())
        
        return self.data

    def save_to_db(self, storage, limit=None):
        self.limit = limit
        self.get_prices()
        
        storage.save_symbols(list(self.data.keys()))
        for s in tqdm.tqdm(self.data.keys()):            
            storage.save(s, self.data[s].set_index('date'))            
            

In [7]:
iex = IEXPrices(config.settings)

storage = StorageHDF(config.settings, prefix='IEX', debug=True)
iex.save_to_db(storage, limit=10)

  0%|          | 10/8440 [00:16<4:36:30,  1.97s/it]
  0%|          | 0/11 [00:00<?, ?it/s][A
 18%|█▊        | 2/11 [00:00<00:00, 16.84it/s][A
 36%|███▋      | 4/11 [00:00<00:00, 16.59it/s][A
 55%|█████▍    | 6/11 [00:00<00:00, 16.32it/s][A
 73%|███████▎  | 8/11 [00:00<00:00, 16.17it/s][A
 91%|█████████ | 10/11 [00:00<00:00, 15.83it/s][A
100%|██████████| 11/11 [00:00<00:00, 15.89it/s][A

In [9]:
storage.get_symbols()
storage.read(symbols=['AAL'])['AAL'].tail(5)

Unnamed: 0_level_0,open,high,low,close,volume,unadjustedClose
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2017-07-17,54.21,54.28,53.85,53.87,3727804,53.87
2017-07-18,53.83,53.84,53.02,53.15,4101431,53.15
2017-07-19,52.26,53.19,51.78,52.61,5774713,52.61
2017-07-20,52.72,52.78,52.1,52.34,4836234,52.34
2017-07-21,52.13,52.55,51.45,51.91,4544423,51.91
