In [None]:
import os
import datetime
import logging
import pandas as pd
from dateutil import parser
from datetime import datetime, timedelta
from sqlalchemy import create_engine, text
from dotenv import load_dotenv
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import r2_score
import numpy as np
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.stattools import acf, pacf

load_dotenv()

logging.basicConfig(level=logging.INFO)

def get_hist_klines(conn, symbol, table, client_interval, client):
    """ Return the symbol's historical klines data from Binance
    Parameters 
    ---------------------------------------
        conn : Engine()
                the engine/connection of MySQL
        symbol : str
                symbol (pair of crypto)
        table : str
                name of the table holding the klines already stored; it is
                interpolated into the SQL text, so it must come from trusted
                configuration and never from user input
        client_interval : str
                interval of klines
        client : Binance Client
                object exposing get_historical_klines(symbol, interval, start)
    Return
    ---------------------------------------
            client.get_historical_klines() : json
                The json symbol data klines
    """
    # The symbol is sent as a bound parameter instead of being pasted into the SQL text.
    params = {"symbol": symbol}

    # Check if no symbol's data in table, download data from the oldest date available e.g 1 Aug 2017 (Binance founded date)
    stored_rows = conn.execute(
        text(f"SELECT COUNT(*) FROM {table} WHERE symbol = :symbol"), params
    ).scalar()
    if stored_rows == 0:
        logging.info(f"No data for {symbol}, currently getting {symbol} data from 1 Aug 2017 ...")
        return client.get_historical_klines(symbol, client_interval, "1 Aug 2017")

    # Get the most recent date of the data if the table is not empty in order to download from the most recent date
    most_recent_date_in_db = conn.execute(
        text(f"SELECT max(open_time) FROM {table} WHERE symbol = :symbol"), params
    ).scalar()
    # Start one hour after the newest stored kline, as the log message below states.
    # (The previous code started from "now + 1 hour", i.e. always in the future.)
    newest_open_time = parser.parse(str(most_recent_date_in_db)).replace(microsecond=0)
    one_hour_from_db_date = newest_open_time + timedelta(hours=1)
    logging.info(f"{symbol} data already present, getting {symbol} data from {most_recent_date_in_db} + 1 hour if exists")
    return client.get_historical_klines(symbol, client_interval, str(one_hour_from_db_date))

def create_con(user, pw, ip, port, db):
    """ Return the engine (the connection) to interact with MySQL
    Parameters 
    ---------------------------------------
            user : str
                    the user name
            pw : str
                    the user password
            ip : str
                    host name or IP address of the MySQL server
            port : int or str
                    port of the MySQL server
            db : str
                    the database name
    Return
    ---------------------------------------
            engine : Engine(mysql+pymysql://{user}:{pw}@{ip}:{port}/{db})
    """
    # Local import keeps this function self-contained.
    from urllib.parse import quote_plus

    # Credentials are percent-encoded so characters such as "@", ":" or "/"
    # in the user name or password cannot corrupt the URL.
    safe_user = quote_plus(str(user))
    safe_pw = quote_plus(str(pw))
    engine = create_engine(f"mysql+pymysql://{safe_user}:{safe_pw}@{ip}:{port}/{db}")
    logging.info(f"Connection at {engine} : created !")
    return engine

def export_data(conn, data, schema, table):
    """ Append a DataFrame to a SQL table
    Parameters 
    ---------------------------------------
            conn : Engine()
                    the engine/connection handed to DataFrame.to_sql
            data : pandas.DataFrame()
                    symbols data
            schema : str
                    the schema (database) name handed to DataFrame.to_sql
            table : str
                    the table name
    Return
    ---------------------------------------
            Nothing
    """
    # Rows are always appended; an existing table is never replaced.
    write_options = {
        "con": conn,
        "schema": schema,
        "name": table,
        "if_exists": "append",
    }
    data.to_sql(**write_options)
        

def process_hist_data(data, symbol):
    """ Return DataFrame of the data processed 
    Parameters 
    ---------------------------------------
            data : json
                    symbol data: a list of 12-item rows in the column order
                    listed below
            symbol : str
                    symbol (pair of crypto)
    Return
    ---------------------------------------
            df : pandas.Dataframe()
                symbol data processed into DataFrame, indexed by open_time,
                without the "ignore" column and with a "symbol" column added
    """
    kline_columns = [
        "open_time", "open", "high", "low", "close", "volume", "close_time",
        "quote_asset_volume", "number_of_trades", "taker_buy_base_asset_volume",
        "taker_buy_quote_asset_volume", "ignore",
    ]
    df = pd.DataFrame(data, columns=kline_columns)
    df.drop("ignore", axis=1, inplace=True)

    # The raw timestamps are epoch milliseconds. Parsing them as milliseconds
    # avoids the float division by 1000, which could lose precision on values
    # such as close_time.
    df["open_time"] = pd.to_datetime(df["open_time"], unit="ms")
    df["close_time"] = pd.to_datetime(df["close_time"], unit="ms")

    numeric_columns = ['open', 'high', 'low', 'close', 'volume', 'quote_asset_volume', 'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume']
    # Convert column by column rather than row by row (axis=1).
    df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric)

    df.set_index("open_time", inplace=True)

    df["symbol"] = symbol
    df = df.astype({"symbol": "string"})
    return df

def etl(client, symbols=["ETHUSDT"]):
    """ Extract, transform and load the symbol's klines data processed
    Parameters 
    ---------------------------------------
        client : Binance Client
        symbols : list
                the list of symbol to treat; pass None to process every
                symbol already present in the historical_klines table.
                The default list is only read, never mutated.
    Return
    ---------------------------------------
        Nothing
    """
    database = os.environ["MYSQL_DATABASE"]
    conn = create_con(
        user=os.environ["MYSQL_USER"],
        pw=os.environ["MYSQL_PASSWORD"],
        ip=os.environ["MYSQL_IP"],
        port=os.environ["MYSQL_PORT"],
        db=database,
    )
    if symbols is None:
        logging.info("No symbols")
        # NOTE(review): this query names historical_klines directly while the
        # loop below uses KLINES_TABLE - confirm both refer to the same table.
        rows = conn.execute(text("SELECT DISTINCT symbol FROM historical_klines")).fetchall()
        # fetchall() yields one-column rows; take the value itself, otherwise
        # str(row) would produce "('ETHUSDT',)" instead of "ETHUSDT".
        symbols = [row[0] for row in rows]
    for crypto in symbols:
        symbol = str(crypto)
        historical_data = get_hist_klines(conn, symbol, os.environ["KLINES_TABLE"], client.KLINE_INTERVAL_1DAY, client=client)
        logging.info(f"{symbol} downloaded")
        df = process_hist_data(historical_data, symbol)
        logging.info(f"{symbol} processed")
        export_data(conn=conn, data=df, schema=database, table=os.environ["KLINES_TABLE"])
        logging.info(f"{symbol} pushed to database")

In [None]:
# Connection used by the exploratory cells below. Host and port are given
# literally here rather than read from the environment.
conn = create_con(
    user=os.environ["MYSQL_USER"],
    pw=os.environ["MYSQL_PASSWORD"],
    ip="localhost",
    port=3307,
    db=os.environ["MYSQL_DATABASE"],
)

In [None]:
# Load every stored ETHUSDT kline, index the frame by parsed close_time,
# drop the columns not used below and order the rows by that index.
raw_klines = pd.read_sql("SELECT * FROM historical_klines WHERE symbol='ETHUSDT'", con=conn)
raw_klines["close_time"] = pd.to_datetime(raw_klines["close_time"])
df = (
    raw_klines
    .set_index("close_time")
    .drop(["symbol", "number_of_trades", "open_time"], axis=1)
    .sort_index()
)
df.head()

In [None]:
# Series of closing prices, in the order of df's index.
data = df["close"]
# Chronological split: the first 80 % of the observations form the training
# set. A random sample would shuffle the observations and leave gaps, which
# breaks the ordering the time-series models below depend on.
train_size = int(len(data) * 0.8)
train_data = data.iloc[:train_size]
data.head()

In [None]:
# Hold-out set: every observation whose index label is not in the training set.
in_training = data.index.isin(train_data.index)
test_data = data.loc[~in_training]
test_data.head()

In [None]:
# Number of observations in the training set.
train_data.shape

In [None]:
# Number of observations in the hold-out set.
test_data.shape

In [None]:
# The models below are fitted on the natural log of the training prices.
ts = train_data
ts_logtransformed = np.log(ts)

In [None]:
# Decompose the log series into trend, seasonal and residual components.
# NOTE(review): period=1 declares a cycle of a single observation - confirm
# this is intended rather than e.g. a weekly or yearly period.
decomposition = seasonal_decompose(ts_logtransformed, period=1)

trend = decomposition.trend
seasonal = decomposition.seasonal
residual = decomposition.resid

In [None]:
# decomposed_TS is another name for the same object as `residual`, so the
# in-place dropna below removes the missing values from `residual` as well.
decomposed_TS = residual
decomposed_TS.dropna(inplace=True)

In [None]:
# Autoregressive model: 8 AR lags on the once-differenced log series, no MA terms.
model = ARIMA(ts_logtransformed, order=(8, 1, 0))
results_AR = model.fit()
# In-sample errors. The fitted values of this ARIMA implementation are on the
# scale of the series passed in (log prices), so they are compared with
# ts_logtransformed directly. The name ts_diff_logtrans used before is not
# defined anywhere in this notebook.
RSS = results_AR.fittedvalues - ts_logtransformed
RSS.dropna(inplace=True)

In [None]:
# Smaller autoregressive model: 2 AR lags on the once-differenced log series.
# This overwrites the `model`, `results_AR` and `RSS` names of the previous fit.
model = ARIMA(ts_logtransformed, order=(2, 1, 0))
results_AR = model.fit()
# In-sample errors. The fitted values of this ARIMA implementation are on the
# scale of the series passed in (log prices), so they are compared with
# ts_logtransformed directly. The name ts_diff_logtrans used before is not
# defined anywhere in this notebook.
RSS = results_AR.fittedvalues - ts_logtransformed
RSS.dropna(inplace=True)

In [None]:
# Print the fit summary of the most recently fitted AR model.
print(results_AR.summary())

In [None]:
# Moving-average model: 18 MA lags on the once-differenced log series, no AR terms.
model = ARIMA(ts_logtransformed, order=(0, 1, 18))
results_MA = model.fit()
# In-sample errors. The fitted values of this ARIMA implementation are on the
# scale of the series passed in (log prices), so they are compared with
# ts_logtransformed directly. The name ts_diff_logtrans used before is not
# defined anywhere in this notebook.
RSS = results_MA.fittedvalues - ts_logtransformed
RSS.dropna(inplace=True)

In [None]:
# Print the fit summary of the MA model.
print(results_MA.summary())

In [None]:
# Combined model: 8 AR lags and 18 MA lags on the once-differenced log series.
model = ARIMA(ts_logtransformed, order=(8, 1, 18))
results_ARIMA = model.fit()

# In-sample errors. The fitted values of this ARIMA implementation are on the
# scale of the series passed in (log prices), so they are compared with
# ts_logtransformed directly. The name ts_diff_logtrans used before is not
# defined anywhere in this notebook.
RSS = results_ARIMA.fittedvalues - ts_logtransformed
RSS.dropna(inplace=True)


In [None]:
# Print the fit summary of the combined ARIMA model.
print(results_ARIMA.summary())

In [None]:
# One-step predicted log changes. The fitted values are predicted log *levels*,
# so the predicted change at each step is the fitted level minus the previous
# observed level. The first step has no previous observation and is dropped.
# Using the fitted levels themselves as differences would make the cumulative
# reconstruction in the following cells meaningless.
previous_log_level = ts_logtransformed.shift(1)
predictions_ARIMA_diff = (results_ARIMA.fittedvalues - previous_log_level).dropna()
print(predictions_ARIMA_diff.head())

In [None]:
# Running total of the values in predictions_ARIMA_diff.
predictions_ARIMA_diff_cumsum = predictions_ARIMA_diff.cumsum()
print(predictions_ARIMA_diff_cumsum.head())

In [None]:
# Start from a constant series holding the first log value at every index label.
predictions_ARIMA_log = pd.Series(ts_logtransformed.iloc[0], index=ts_logtransformed.index)
# Add the running totals; labels missing on either side count as 0.
predictions_ARIMA_log = predictions_ARIMA_log.add(predictions_ARIMA_diff_cumsum,fill_value=0)
predictions_ARIMA_log.head()

In [None]:
# Undo the log transform to bring the values back to the price scale.
predictions_ARIMA = np.exp(predictions_ARIMA_log)

In [1]:
import datetime
import pandas as pd

In [2]:
# Midnight of the next calendar day, wrapped in a list so it can be used as a
# one-element index.
next_day = datetime.date.today() + datetime.timedelta(days=1)
tomorow = [pd.Timestamp(next_day)]
tomorow

[Timestamp('2023-10-24 00:00:00')]

In [None]:
# Midnight of the next calendar day, wrapped in a list so it can be used as a
# one-element index.
next_day = datetime.date.today() + datetime.timedelta(days=1)
tomorow = [pd.Timestamp(next_day)]
tomorow

In [None]:
# Fixed list of seven consecutive days; it is not used by the forecast below.
dates = [
    pd.Timestamp(day)
    for day in (
        '2023-10-23', '2023-10-24', '2023-10-25', '2023-10-26',
        '2023-10-27', '2023-10-28', '2023-10-29',
    )
]
tomorow = [pd.Timestamp(datetime.date.today() + datetime.timedelta(days=1))]

# Single-step forecast from the fitted model, relabelled with tomorrow's date.
next_step_values = results_ARIMA.forecast(steps=1).to_list()
forecast = pd.Series(data=next_step_values, index=tomorow)

In [None]:
# Show the one-step forecast.
print(forecast)

In [None]:
# Persist the fitted ARIMA results to the file ./model.
results_ARIMA.save("./model")

In [None]:
# ARIMAResults comes from the same statsmodels module as the ARIMA class used
# to fit the model, not from the legacy statsmodels.tsa.arima_model module.
from statsmodels.tsa.arima.model import ARIMAResults
import pickle 

# NOTE: pickle.load runs arbitrary code from the file - only load model files
# produced by this project.
model_path = "/home/omar/DevWSL/binance-data-trader/airflow/models/2023-10-23_arima_model.pkl"
# The context manager closes the file handle once the model is loaded.
with open(model_path, 'rb') as model_file:
    model = pickle.load(model_file)
# model = ARIMAResults.load("../airflow/models/2023-10-23_arima_model.pkl")
tomorow = [pd.Timestamp(datetime.date.today() + datetime.timedelta(days=1))]
# Single-step forecast from the loaded model, relabelled with tomorrow's date.
forecast = pd.Series(data=model.forecast(steps=1).to_list(), index=tomorow)


In [None]:
# Display the forecast produced by the loaded model.
forecast

In [None]:
# Scratch cell: prints the integers 0 through 9, one per line.
for i in range(10):
    print(i)

In [1]:
import os
import datetime
import logging
import pandas as pd
from dateutil import parser
from datetime import datetime, timedelta
from sqlalchemy import create_engine, text
from dotenv import load_dotenv


In [4]:
# Local wall-clock time one hour from now (naive datetime, no timezone).
one_hour = timedelta(hours=1)
a = datetime.now() + one_hour
a

datetime.datetime(2023, 11, 4, 21, 14, 14, 530037)