In [1]:
!ls

AAPL  docker-compose.yml  tinkoff  tinkoff.cfg	tinkoff.ipynb


In [2]:
# The API token is the first line of the local `tinkoff` file; surrounding
# whitespace (including the trailing newline) is stripped.
with open('tinkoff', 'r') as f:
    first_line = f.readline()
token = first_line.strip()

In [3]:
# Holds the most recent streaming event; the streaming `main()` coroutine
# assigns it through `global ev`.
ev = None

In [75]:
import asyncio
import csv
import os
import warnings
from datetime import date, datetime, timedelta
from io import StringIO
from typing import Optional

import pandas as pd
import psycopg2
import sqlalchemy
import tinvest

In [54]:
# Connection settings for the PostgreSQL instance.
# Each value can be overridden through the standard libpq environment
# variables, so real credentials never have to be committed with the notebook.
# The defaults are the values that were previously hard-coded here.
login = os.environ.get('PGUSER', 'airflow')
password = os.environ.get('PGPASSWORD', 'airflow')
host = os.environ.get('PGHOST', 'localhost')
port = int(os.environ.get('PGPORT', '5432'))
schema = os.environ.get('PGDATABASE', 'airflights')  # same value as `dbname` in DB_URL

In [67]:
# libpq DSN used by every psycopg2.connect() call in this notebook.
# It is assembled from the connection settings defined in the previous cell
# so the credentials live in exactly one place.
DB_URL = f'user={login} password={password} host={host} port={port} dbname={schema}'

In [68]:
def get_data_from_table(table_name: str, query: Optional[str] = None) -> Optional[pd.DataFrame]:
    """
    Read a table (or the result of a custom query) into a DataFrame.

    Parameters:
    table_name: str - name of the table; it is interpolated into the SQL text
        as-is, so it must come from trusted code, never from user input
    query: str, optional - custom SQL; any ``{}`` placeholders in it are
        filled with ``table_name``. Defaults to ``SELECT * FROM <table_name>``

    Returns:
    pd.DataFrame with the query result, or None when formatting or running
    the query fails (a RuntimeWarning describing the failure is emitted).

    Errors raised while opening the connection are not caught and propagate.
    """
    if query is None:
        # Kept as a template so the table name is substituted exactly once
        query = 'SELECT * FROM {}'

    conn = psycopg2.connect(dsn=DB_URL)
    try:
        return pd.read_sql(query.format(table_name, table_name), conn)
    except Exception as exc:
        # Best-effort contract: callers receive None on failure, but the
        # reason is surfaced instead of being silently discarded.
        warnings.warn(
            f'Could not read data for table {table_name!r}: {exc!r}',
            RuntimeWarning,
            stacklevel=2,
        )
        return None
    finally:
        # Runs on success, on failure and on KeyboardInterrupt alike
        conn.close()


def load_data_to_staging(df: pd.DataFrame, table_name: str) -> None:
    """
    Bulk-load a DataFrame into a staging table with PostgreSQL ``COPY``.

    The frame is serialised into an in-memory, pipe-delimited buffer
    (no header row, floats written with 8 decimals, missing values written
    as ``NUL``) and streamed to the server with ``COPY ... FROM STDIN``.

    Parameters:
    df: pd.DataFrame - data to load; its column labels become the target
        column list of the COPY statement
    table_name: str - target table; interpolated into the SQL text as-is,
        so it must come from trusted code

    Any database error propagates to the caller; in that case the transaction
    is rolled back and the connection is still closed.
    """
    buffer = StringIO()
    df.to_csv(buffer, index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
              header=False, float_format='%.8f', doublequote=False, escapechar='\\')
    buffer.seek(0)

    # str() so that non-string column labels do not break the join
    column_list = ', '.join(str(column) for column in df.columns)
    copy_query = f"""
                COPY {table_name}({column_list})
                FROM STDIN
                DELIMITER '|'
                NULL 'NUL'
    """

    conn = psycopg2.connect(dsn=DB_URL)
    try:
        # `with conn` commits on success and rolls back on error; it does not
        # close the connection, hence the explicit close in `finally`.
        with conn, conn.cursor() as cursor:
            cursor.copy_expert(copy_query, buffer)
    finally:
        conn.close()

In [6]:
# Synchronous sandbox client. The library is imported as `tinvest` (there is
# no `ti` alias in the import cell), so it is referenced by its full name.
client = tinvest.SyncClient(token, use_sandbox=True)

In [8]:
# Look up instruments with ticker AAPL; the response is kept because later
# cells read the FIGI from `aapl.payload.instruments[0]`.
aapl = client.get_market_search_by_ticker('AAPL')

In [12]:
# FIGI of the first instrument returned by the ticker search.
aapl.payload.instruments[0].figi

'BBG000B9XRY4'

In [13]:
# Inspect the enum member used for daily candles. The library is imported as
# `tinvest`, not `ti`.
tinvest.schemas.CandleResolution.day

<CandleResolution.day: 'day'>

In [18]:
# Current local timestamp (scratch value; `now` is rebound to a plain date in
# a later cell).
now = datetime.now()

In [36]:
# Scratch check: the timestamp 365 days before the current moment.
datetime.today() - timedelta(days=365)

datetime.datetime(2020, 2, 5, 12, 40, 32, 178188)

In [38]:
# Scratch check: round-tripping through a '%Y-%m-%d' string drops the
# time-of-day and yields midnight of the same day.
datetime.strptime(now.strftime('%Y-%m-%d'), '%Y-%m-%d')

datetime.datetime(2021, 2, 4, 0, 0)

In [39]:
# Rebind `now` to today's date (no time component); the candle request below
# derives its date range from this value.
now = date.today()

In [69]:
# Daily AAPL candles from 365 days ago up to yesterday. Each bound goes
# through a '%Y-%m-%d' round trip so the API receives a midnight datetime.
# The library is imported as `tinvest`, not `ti`.
candles = client.get_market_candles(
    aapl.payload.instruments[0].figi,
    datetime.strptime((now - timedelta(days=365)).strftime('%Y-%m-%d'), '%Y-%m-%d'),
    datetime.strptime((now - timedelta(days=1)).strftime('%Y-%m-%d'), '%Y-%m-%d'),
    tinvest.schemas.CandleResolution.day
)

In [70]:
# Keep only the candle timestamp and closing price ('c') of every candle.
# NOTE: this rebinds `candles` from the API response to a DataFrame, so the
# cell cannot be re-run without re-running the request cell first.
candles = pd.DataFrame(
    [(row['time'], row['c']) for row in candles.dict()['payload']['candles']],
    columns=('time', 'close_price'),
)

In [71]:
# Copy the (time, close_price) rows into the `aapl` table.
# NOTE(review): re-running this cell presumably loads the same rows again
# unless the table enforces uniqueness - confirm against the table definition.
load_data_to_staging(candles, 'aapl')

In [99]:
def get_close_price_by_date(figi: str, date_: datetime) -> pd.DataFrame:
    """
    Fetch the most recent daily closing price available at ``date_``.

    Daily candles are requested for the window that starts at midnight of the
    day before ``date_`` and ends at midnight of ``date_``; the last candle of
    the response is used.

    Parameters:
    figi: str - FIGI of the instrument
    date_: datetime - reference moment; only its calendar date is used

    Returns:
    pd.DataFrame with a single row and the columns ``time`` and ``close_price``.

    Raises:
    IndexError - when the API returns no candle for the window
    """
    # Only the calendar date matters: truncate to midnight, then step back a day
    window_end = datetime.combine(date_, datetime.min.time())
    window_start = window_end - timedelta(days=1)

    client = tinvest.SyncClient(token, use_sandbox=True)
    response = client.get_market_candles(
        figi,
        window_start,
        window_end,
        tinvest.schemas.CandleResolution.day
    )
    candles = response.dict()['payload']['candles']
    if not candles:
        # Same exception type as the bare `[-1]` lookup, with a usable message
        raise IndexError(
            f'no daily candle returned for figi {figi!r} '
            f'between {window_start} and {window_end}'
        )
    candle = candles[-1]

    return pd.DataFrame(
        [[candle['time'], candle['c']]],
        columns=('time', 'close_price')
    )

In [100]:
# Smoke test: latest daily close for FIGI BBG000B9XRY4 (the AAPL FIGI shown
# earlier) as of the current moment.
get_close_price_by_date('BBG000B9XRY4', datetime.now())

Unnamed: 0,time,close_price
0,2021-02-03 07:00:00+00:00,133.94


In [6]:



async def main():
    """
    Subscribe to 1-minute candles for FIGI BBG000B9XRY4 and print every event.

    The latest event is also stored in the module-level ``ev`` so it can be
    inspected from another cell. The loop only ends when the stream ends or
    the cell is interrupted.
    """
    global ev
    # The library is imported as `tinvest`; there is no `ti` alias
    async with tinvest.Streaming(token) as streaming:
        await streaming.candle.subscribe('BBG000B9XRY4', tinvest.CandleResolution.min1)
        # Other subscriptions tried during exploration, left disabled:
#         await streaming.orderbook.subscribe('BBG0013HGFT4', 5)
#         await streaming.instrument_info.subscribe('BBG0013HGFT4')
        async for event in streaming:
            ev = event
            print(event)


# Top-level `await` relies on the notebook's event loop; the captured output
# below ends with CancelledError, i.e. the run was stopped before the stream ended.
await main()

event=<Event.candle: 'candle'> time=datetime.datetime(2021, 1, 27, 11, 8, 18, 685791, tzinfo=datetime.timezone.utc) payload=Candle(c=Decimal('144.47'), figi='BBG000B9XRY4', h=Decimal('144.47'), interval=<CandleResolution.min1: '1min'>, l=Decimal('144.36'), o=Decimal('144.41'), time=datetime.datetime(2021, 1, 27, 11, 8, tzinfo=datetime.timezone.utc), v=86)
event=<Event.candle: 'candle'> time=datetime.datetime(2021, 1, 27, 11, 8, 28, 548646, tzinfo=datetime.timezone.utc) payload=Candle(c=Decimal('144.44'), figi='BBG000B9XRY4', h=Decimal('144.47'), interval=<CandleResolution.min1: '1min'>, l=Decimal('144.36'), o=Decimal('144.41'), time=datetime.datetime(2021, 1, 27, 11, 8, tzinfo=datetime.timezone.utc), v=87)
event=<Event.candle: 'candle'> time=datetime.datetime(2021, 1, 27, 11, 8, 29, 905483, tzinfo=datetime.timezone.utc) payload=Candle(c=Decimal('144.44'), figi='BBG000B9XRY4', h=Decimal('144.47'), interval=<CandleResolution.min1: '1min'>, l=Decimal('144.36'), o=Decimal('144.41'), time=d

CancelledError: 

In [7]:
# JSON form of the last event stored in `ev` by the streaming loop.
ev.json()

'{"event": "candle", "time": "2021-01-27T11:08:33.419956+00:00", "payload": {"c": 144.44, "figi": "BBG000B9XRY4", "h": 144.47, "interval": "1min", "l": 144.36, "o": 144.41, "time": "2021-01-27T11:08:00+00:00", "v": 92}}'

In [38]:
# Placeholder for a shared async client.
# NOTE(review): none of the cells visible here ever assigns a client to `cl`,
# so it stays None - confirm whether it is still needed.
cl = None

In [50]:
async def main():
    """
    Look up the AAPL ticker with the async sandbox client and print the FIGI
    of the first instrument found.

    The client session is closed before returning, including when the request
    or the lookup fails.
    """
    # The library is imported as `tinvest`; there is no `ti` alias
    client = tinvest.AsyncClient(token, use_sandbox=True)
    try:
        # Market-search response: the instruments are under payload.instruments
        response = await client.get_market_search_by_ticker("AAPL")
        print(response.payload.instruments[0].figi)
    finally:
        await client.close()

# Run the coroutine on the notebook's event loop (top-level `await`).
await main()

BBG000B9XRY4


In [45]:
# NOTE(review): `cl` is only ever set to None in the cells visible here, and
# the captured output below ("Session is closed") comes from an earlier kernel
# state - this cell presumably cannot succeed as written; confirm and remove.
await cl.get_market_search_by_ticker("AAPL")

RuntimeError: Session is closed