In [1]:
import json
from  urllib import request
from time import time, sleep
from datetime import datetime
from itertools import islice
import pandas as pd

import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from sqlalchemy import create_engine

import asyncio, asyncpg
from asyncpg import create_pool

  """)


# Creating new database

In [2]:
# Name of the database used by every later cell in this notebook.
db = 'company_test'

# `con` is pre-bound so the cleanup below is safe even when connect() itself fails
# (previously that path hit a NameError on `con.close()`).
con = None
try:
    con = psycopg2.connect(
        dbname = 'postgres', user = 'postgres', host = 'localhost', password = 'qmzpqm'
    )
    # CREATE DATABASE cannot run inside a transaction block, so switch to autocommit.
    con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    with con.cursor() as cur:
        cur.execute('CREATE DATABASE {};'.format(db))
except psycopg2.Error:
    print('database already exist or there is a problem with connection')
finally:
    # Always release the connection, whether or not the database was created.
    if con is not None:
        con.close()

database already exist or there is a problem with connection


# Getting data and writing to PostgreSQL

In [3]:
#modifying data from dict to pandas dataframe
def dict_to_df(data):
    """
    Convert a daily time-series response dict into a pandas DataFrame.

    Parameters
    ----------
    data : dict
        Must contain ``data['Meta Data']['2. Symbol']`` and
        ``data['Time Series (Daily)']``; the latter maps a date string to a dict
        of numbered fields such as ``'1. open'`` or ``'5. volume'``.

    Returns
    -------
    pandas.DataFrame
        One row per date with the columns symbol, date, open, high, low, close,
        volume (followed by any additional fields present in the input).
        ``date`` holds ``datetime.date`` objects, ``volume`` is parsed with
        ``int`` and every other field with ``float``.
    """
    symbol = data['Meta Data']['2. Symbol']
    ts = data['Time Series (Daily)']
    columns = ['symbol', 'date', 'open', 'high', 'low', 'close', 'volume']

    # Collect plain dicts and build the frame once: appending to a DataFrame
    # row by row is quadratic, and DataFrame.append no longer exists in pandas 2.x.
    records = []
    for date, fields in ts.items():
        record = {'symbol': symbol, 'date': date}
        for key, value in fields.items():
            name = key[3:]  # strip the numeric prefix, e.g. '1. open' -> 'open'
            record[name] = (int if name == 'volume' else float)(value)
        records.append(record)

    df = pd.DataFrame(records)
    # Expected columns come first (created empty if absent); unexpected ones are kept after them.
    extra = [col for col in df.columns if col not in columns]
    df = df.reindex(columns = columns + extra)
    df['date'] = pd.to_datetime(df['date']).dt.date
    return df

#inserting data to table
def insert_data_to_postgres(
    df, table = 'test1', schema = 'public', db = 'company_test'
):
    """
    Replace one symbol's rows for the date range covered by ``df``.

    Existing rows for the symbol of the first row, between the minimum and
    maximum ``date`` in ``df`` (inclusive), are deleted first so that the
    following append does not collide with existing keys. The frame is then
    appended with ``DataFrame.to_sql``; no explicit ``dtype`` mapping is needed.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``symbol`` and ``date`` columns. An empty frame is a no-op.
    table, schema : str
        Target table and schema, used for BOTH the delete and the append
        (the delete used to be hard-coded to ``test1``).
    db : str
        Database name on the local PostgreSQL server.

    NOTE(review): the connection credentials are hard-coded here; consider
    reading them from the environment.
    """
    # Local import: psycopg2 is already a dependency of this notebook, the
    # `sql` submodule provides safe identifier quoting.
    from psycopg2 import sql

    if df.empty:
        return  # nothing to delete or insert; min()/iloc[0] would fail on an empty frame

    date_min = str(df['date'].min())
    date_max = str(df['date'].max())
    # Positional access: the frame's index is not guaranteed to start at label 0.
    symbol = str(df['symbol'].iloc[0])

    # Identifiers are quoted via psycopg2.sql and values are passed as query
    # parameters instead of being formatted into the statement text.
    target = sql.SQL('.').join([sql.Identifier(schema), sql.Identifier(table)])
    delete_stmt = sql.SQL(
        'DELETE FROM {} WHERE symbol = %s AND date >= %s AND date <= %s'
    ).format(target)

    #deleting data to prevent errors related to existing keys
    con = psycopg2.connect(
        dbname = db, user = 'postgres', host = 'localhost', password = 'qmzpqm'
    )
    try:
        with con.cursor() as cur:
            cur.execute(delete_stmt, (symbol, date_min, date_max))
        con.commit()
    finally:
        # Close even when the delete fails, so the connection is never leaked.
        con.close()

    #adding data
    engine = create_engine('postgresql://postgres:qmzpqm@localhost:5432/' + db)
    try:
        df.to_sql(table, engine, index = False, schema = schema, if_exists = 'append')
    finally:
        # Release the engine's pooled connections; a new engine is built per call.
        engine.dispose()

In [4]:
%%time
"""
I download daily stock data for Microsoft
Data is represented as JSON fine
After downloading it is stored in dict

This company has limit 5 requests per minute. That's why I sleep for a minute.
Actually in real situation it makes sense to validate format of data
"""
api_key = 'PDSA0HO9D4NKLBJM'
symbols = ['AAL', 'AAPL', 'ADBE', 'ADI', 'ADP', 'ADSK', 'ALGN', 'ALXN', 'AMAT', 'AMGN',
           'AMZN', 'ATVI', 'ASML', 'AVGO', 'BIDU', 'BIIB', 'BMRN', 'CDNS', 'CELG', 'MSFT']
for symbol in symbols:
    url_text = 'https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol={}&apikey={}'.format(symbol, api_key)
    while True:
        with request.urlopen(url_text) as url:
            data = json.loads(url.read().decode())
        if 'Meta Data' in data:
            break
        sleep(60)
    df = dict_to_df(data)
    insert_data_to_postgres(df)

CPU times: user 4.01 s, sys: 72.7 ms, total: 4.09 s
Wall time: 3min 39s


# Getting data from PostgreSQL

## 1. psycopg2

In [5]:
%%time
try:
    conn = psycopg2.connect(
        database = db, user = 'postgres', password = 'qmzpqm', host = 'localhost', port = 5432
    )
    query = """
        SELECT *
        FROM test1
        """
    df = pd.read_sql(query, conn)
    conn.close()
    print('downloaded')
except psycopg2.Error:
    raise SystemExit(
        'Failed to setup Postgres environment.\n{0}'.format(sys.exc_info())
    ) 

downloaded
CPU times: user 12.9 ms, sys: 136 µs, total: 13 ms
Wall time: 28.2 ms


## 2. asyncpg (NO pool connections)

In [6]:
"""
without it we will receive 
"RuntimeError: This event loop is already running"
"""
# The cells below call loop.run_until_complete() from inside the notebook;
# nest_asyncio.apply() is what makes that call possible here (see the note above).
import nest_asyncio
nest_asyncio.apply()

In [7]:
%%time
async def run():
    conn = await asyncpg.connect(
        user = 'postgres', password = 'qmzpqm', database = db, host = 'localhost'
    )
    values = await conn.fetch("""SELECT * FROM test1""")
    #print(values)
    await conn.close()
    return values

loop = asyncio.get_event_loop()
loop.run_until_complete(run())
#df = pd.DataFrame(list(map(dict, loop.run_until_complete(run()))))

CPU times: user 5.02 ms, sys: 148 µs, total: 5.17 ms
Wall time: 8.38 ms


## 3. asyncpg (pool connections)

In [8]:
def get_pool(db = 'company_test'):
    """
    Build an asyncpg connection pool for database `db` on the local server.

    The pool is created with 5 to 10 connections and a 60 second command
    timeout. Every new connection registers a codec for the `jsonb` type that
    encodes with ``json.dumps`` and decodes with ``json.loads``.

    Returns whatever ``create_pool`` returns; the callers in this notebook use
    it with ``async with``.
    """
    dsn = 'postgresql://postgres:qmzpqm@localhost:5432/' + db
    jsonb_codec = dict(encoder = json.dumps, decoder = json.loads, schema = 'pg_catalog')
    return create_pool(
        dsn,
        min_size = 5,
        max_size = 10,
        command_timeout = 60,
        # Per-connection initializer: install the jsonb codec.
        init = lambda conn: conn.set_type_codec('jsonb', **jsonb_codec),
    )

async def stream_tasks(tasks, batch_size = 5):
    """
    Run awaitables in consecutive batches and yield their results in order.

    Parameters
    ----------
    tasks : iterable of awaitables
        Any iterable (generator, list, tuple, ...). It is consumed lazily,
        `batch_size` items at a time.
    batch_size : int
        Number of awaitables gathered concurrently per batch.

    Yields
    ------
    The result of each awaitable, in the order the awaitables were supplied.
    """
    # Take a single iterator up front: islice() over a list or tuple would
    # restart from the first element on every pass and never make progress.
    pending = iter(tasks)
    while True:
        batch = list(islice(pending, batch_size))
        if not batch:
            break
        for item in await asyncio.gather(*batch):
            yield item

async def one(pool, symbol):
    """
    Fetch all rows of `test1` for a single symbol.

    A connection is acquired from `pool` for the duration of the query and
    `symbol` is passed as the `$1` query argument. Returns the result of
    ``conn.fetch``.
    """
    query = """select *
               from test1
               where symbol = $1
            """
    async with pool.acquire() as connection:
        return await connection.fetch(query, symbol)

async def main():
    """
    Query `test1` once per entry of the notebook-level `symbols` list through
    a connection pool and print every returned row.
    """
    async with get_pool() as pool:
        # Lazy generator: each query coroutine is created only when its batch is taken.
        pending = (one(pool, ticker) for ticker in symbols)
        async for rows in stream_tasks(pending):
            for row in rows:
                print(row)

In [9]:
# Run the pooled query demo (main) to completion on the current event loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

<Record symbol='AAL' date=datetime.date(2019, 1, 11) open=31.8 high=31.989 low=31.1 close=31.8 volume=6899555>
<Record symbol='AAL' date=datetime.date(2019, 1, 10) open=30.62 high=32.21 low=29.78 close=32.04 volume=19726307>
<Record symbol='AAL' date=datetime.date(2019, 1, 9) open=32.42 high=33.55 low=32.3 close=33.42 volume=6179397>
<Record symbol='AAL' date=datetime.date(2019, 1, 8) open=33.36 high=33.65 low=32.2 close=32.42 volume=7107729>
<Record symbol='AAL' date=datetime.date(2019, 1, 7) open=31.99 high=33.4804 low=31.24 close=32.95 volume=8010692>
<Record symbol='AAL' date=datetime.date(2019, 1, 4) open=30.44 high=32.09 low=30.4 close=32.04 volume=9369633>
<Record symbol='AAL' date=datetime.date(2019, 1, 3) open=31.69 high=31.8535 low=28.81 close=30.06 volume=16821991>
<Record symbol='AAL' date=datetime.date(2019, 1, 2) open=31.46 high=32.65 low=31.05 close=32.48 volume=5229460>
<Record symbol='AAL' date=datetime.date(2018, 12, 31) open=32.09 high=32.62 low=31.68 close=32.11 vol

<Record symbol='BIIB' date=datetime.date(2019, 1, 11) open=329.4 high=333.57 low=327.83 close=333.21 volume=1317277>
<Record symbol='BIIB' date=datetime.date(2019, 1, 10) open=325.62 high=331.37 low=324.2 close=331.19 volume=845819>
<Record symbol='BIIB' date=datetime.date(2019, 1, 9) open=325.76 high=330.7 low=324.46 close=326.63 volume=1223612>
<Record symbol='BIIB' date=datetime.date(2019, 1, 8) open=324.53 high=325.9 low=320.47 close=324.44 volume=1008127>
<Record symbol='BIIB' date=datetime.date(2019, 1, 7) open=319.28 high=327.26 low=317.41 close=322.4 volume=1316546>
<Record symbol='BIIB' date=datetime.date(2019, 1, 4) open=311.8 high=323.63 low=309.9 close=318.33 volume=1532580>
<Record symbol='BIIB' date=datetime.date(2019, 1, 3) open=306.76 high=316.69 low=305.92 close=307.0 volume=2148636>
<Record symbol='BIIB' date=datetime.date(2019, 1, 2) open=296.84 high=307.13 low=296.54 close=304.69 volume=1336591>
<Record symbol='BIIB' date=datetime.date(2018, 12, 31) open=294.56 high

<Record symbol='MSFT' date=datetime.date(2018, 11, 16) open=107.08 high=108.88 low=106.8 close=108.29 volume=33502121>
<Record symbol='MSFT' date=datetime.date(2018, 11, 15) open=104.99 high=107.8 low=103.91 close=107.28 volume=38505165>
<Record symbol='MSFT' date=datetime.date(2018, 11, 14) open=108.1 high=108.26 low=104.47 close=104.97 volume=39495141>
<Record symbol='MSFT' date=datetime.date(2018, 11, 13) open=107.55 high=108.74 low=106.64 close=106.94 volume=35374583>
<Record symbol='MSFT' date=datetime.date(2018, 11, 12) open=109.42 high=109.96 low=106.1 close=106.87 volume=33621807>
<Record symbol='MSFT' date=datetime.date(2018, 11, 9) open=110.85 high=111.45 low=108.76 close=109.57 volume=32039223>
<Record symbol='MSFT' date=datetime.date(2018, 11, 8) open=111.8 high=112.21 low=110.91 close=111.75 volume=25644105>
<Record symbol='MSFT' date=datetime.date(2018, 11, 7) open=109.44 high=112.24 low=109.4 close=111.96 volume=37901704>
<Record symbol='MSFT' date=datetime.date(2018, 11

# TASK: reading from Postgres and writing to Redis every 5 seconds

In [10]:
import redis
# Module-level Redis client used by to_redis() below (local server, database 0).
# NOTE(review): the password is hard-coded; consider reading it from the environment.
r = redis.StrictRedis(host = 'localhost', password = 'qmzpqm', port = 6379, db = 0)

In [11]:
""" 
I save to sorted sets
Key is symbol:date
"""
def to_redis(pg_element):
    """
    Store one price row in a Redis sorted set using the module-level client `r`.

    The key is ``'<symbol>:<date>'``. Each field is stored as one member of
    the form ``'<field>:<value>'`` with a fixed score:
    open=0, high=1, low=2, close=3, volume=4.

    The field name is part of the member because sorted-set members are
    unique: with bare values as members, two fields holding the same number
    (for example open == close) collapsed into one member and one field was
    silently lost.

    Parameters
    ----------
    pg_element : mapping
        Must provide 'symbol', 'date', 'open', 'high', 'low', 'close', 'volume'.
    """
    key = '{}:{}'.format(pg_element['symbol'], pg_element['date'])
    fields = ('open', 'high', 'low', 'close', 'volume')
    # Drop the members written by a previous run (scores 0..4) before re-adding.
    r.zremrangebyscore(key, 0, len(fields) - 1)
    values = {
        '{}:{}'.format(field, pg_element[field]): score
        for score, field in enumerate(fields)
    }
    r.zadd(key, values)

In [None]:
while True:
    print('program is running: {}'.format(str(datetime.now().time())[:8]))
    async with get_pool() as pool:
        tasks = (one(pool, symbol) for symbol in symbols)
        async for data in stream_tasks(tasks):
            for element in data:
                to_redis(element)
        await pool.close()
    print('program is paused: {}'.format(str(datetime.now().time())[:8]))
    sleep(5)
    print()

programm is running: 00:42:06
programm is paused: 00:42:06

programm is running: 00:42:11
programm is paused: 00:42:11

programm is running: 00:42:16
programm is paused: 00:42:17

programm is running: 00:42:22
programm is paused: 00:42:22
