In [32]:
# %%writefile crypto_sql_connection.py
import getpass
import time
import pandas as pd
import sqlalchemy
from binance.client import Client
from binance import BinanceSocketManager
import numpy as np
import snowflake.connector
import sqlite3
import yaml

columns=["OPEN_TIME",
"OPEN",
"HIGH",
"LOW",
"CLOSE",
"VOLUME",
"CLOSE TIME",
"QUOTE_ASSET_VOLUME",
"NUMBER_OF_TRADES",
"TAKER_BUY_BASE_ASSET",
"TAKERY_BUY_QUOTE_ASSET_VOLUME",
"NA",
"COIN"]


def get_klines(client, coin):
    if isinstance(coin, str):
        curr_klines = pd.DataFrame(client.get_klines(symbol=coin, interval="1m", limit=1))
    elif isinstance(coin, list):
        curr_klines = []
        for cn in coin:
            curr_klines += client.get_klines(symbol=cn, interval="1m", limit=1)
        curr_klines = pd.DataFrame(curr_klines)
    curr_klines.columns = columns[:-1]
    curr_klines["COIN"] = coin
    return curr_klines


def generate_df_from_query(query, snowflake_yaml="snowflake_key.yaml", account="kga72450.us-east-1", conn=None):
    if conn is None:
        with open(snowflake_yaml, "r") as stream:
            try:
                sf_key = yaml.safe_load(stream)
            except yaml.YAMLError as exc:
                print(exc)

        sf_user = sf_key["username"]
        sf_passwd = sf_key["passwd"]
        # sf_user = getpass.getpass("Snowflake User: ")
        # sf_passwd = getpass.getpass("Snowflake Passwd: ")

        conn  = snowflake.connector.connect(user=sf_user,
                                           password=sf_passwd,
                                           account=account)

        conn.cursor().execute("USE WAREHOUSE COMPUTE_WH")
        conn.cursor().execute("USE DATABASE CRYPTO")
    else:
        pass
    
    # Saving cursor object to new var
    cur = conn.cursor()

    # Forming our query & executing
    cur.execute(query)

    # Getting query results as dataframe
    return cur.fetch_pandas_all()


# Reading in binance credentials
with open("binance_key.yaml", "r") as stream:
    try:
        key = yaml.safe_load(stream)
    except yaml.YAMLError as exc:
        print(exc)

# api_key = getpass.getpass("API Key: ")
# api_secret = getpass.getpass("API Secret: ")
api_key = key["API_Key"]
api_secret = key["Secret"]
client = Client(api_key, api_secret)
interval = 10# Number of seconds between each request

# Creating SQLite engine
engine = sqlalchemy.create_engine("sqlite:///currentstream.db".format(coin))

# Reading in list of coins for analysis
df_coins = generate_df_from_query("SELECT DISTINCT COIN FROM COINS", snowflake_yaml="snowflake_key.yaml")

# Starting the data collection
start_time = time.time()
count = 0
verbose = False
while True:
    current_klines = pd.concat([get_klines(client, coin) for coin in coins])
    current_klines.to_sql("ALL_COINS", engine, if_exists="append", index=False)        
    time.sleep(interval)
    count += 1
    if verbose:
        print("Elapsed: {}s".format(time.time() - start_time))
        print("Requests made: {}".format(count))