In [174]:
import asyncio
import aiohttp
import aiomoex
import pandas as pd
from datetime import datetime, timedelta
import sqlite3
import nest_asyncio


nest_asyncio.apply()

class MoexDataRetrival():
    """Download MOEX share data through aiomoex and screen it for low-priced stocks.

    Every download method opens its own ``aiohttp.ClientSession``. Price
    histories are requested for the ``start_date``..``end_date`` range given to
    the constructor; the values are passed to aiomoex unchanged.
    """

    # ISS endpoint that lists the securities of the TQBR board.
    _SECURITIES_URL = "https://iss.moex.com/iss/engines/stock/markets/shares/boards/TQBR/securities.json"

    # Value of the "securities.columns" query argument. The trailing comma is
    # kept exactly as in the original request string.
    _SECURITIES_COLUMNS = (
        "SECID,"
        "REGNUMBER,"
        "LOTSIZE,"
        "SHORTNAME,"
        "BOARDID,"
        "LISTLEVEL,"
        "ISIN,"
        "ISSUECAPITALIZATION,"
        "FACEVALUE,"
        "ISSUESIZE,"
    )

    # Columns requested from the board history of a single ticker.
    _HISTORY_COLUMNS = (
        "BOARDID", "TRADEDATE", "SHORTNAME", "SECID", "NUMTRADES",
        "VALUE", "OPEN", "LOW", "HIGH", "LEGALCLOSEPRICE",
        "WAPRICE", "CLOSE", "VOLUME", "MARKETPRICE2", "MARKETPRICE3",
        "ADMITTEDQUOTE", "MP2VALTRD", "MARKETPRICE3TRADESVALUE",
        "ADMITTEDVALUE", "WAVAL",
    )

    # (ticker, last trade date to rescale, divisor): CLOSE values on or before
    # the date are divided by the divisor.
    # NOTE(review): presumably these compensate for stock splits - confirm.
    _CLOSE_RESCALES = (
        ("GMKN", "2024-04-01", 100),
        ("TRNFP", "2024-02-14", 100),
    )

    def __init__(self, start_date : str, end_date : str):
        """Store the date range used by the price-history requests.

        Parameters:
        start_date (str): First date of the range, e.g. '2024-05-27'.
        end_date (str): Last date of the range, in the same format.
        """
        self.start_date = start_date
        self.end_date = end_date

    async def _fetch_tqbr_securities(self) -> pd.DataFrame:
        """Request the TQBR securities table and return it as a DataFrame."""
        # A fresh dict per call, so the client never shares state between requests.
        arguments = {"securities.columns": self._SECURITIES_COLUMNS}
        async with aiohttp.ClientSession() as session:
            iss = aiomoex.ISSClient(session, self._SECURITIES_URL, arguments)
            data = await iss.get()
        return pd.DataFrame(data["securities"])

    #get a stock list from MOEX exchange
    async def get_stock_list(self) -> pd.Series:
        """Return the SECID column (ticker codes) of the TQBR securities table."""
        securities = await self._fetch_tqbr_securities()
        return securities['SECID']

    #get a stock list from MOEX exchange together with the descriptive columns
    async def get_stock_list_with_extra_data(self) -> pd.DataFrame:
        """Return the full TQBR securities table with all requested columns."""
        return await self._fetch_tqbr_securities()

    #get a price history for the ticker
    async def get_price_history(self, ticker : str) -> pd.DataFrame:
        """Download the board history of one ticker for the configured date range.

        No board is passed, so aiomoex uses its default one (the original
        comment says TQBR).

        Parameters:
        ticker (str): Security code, e.g. 'SBER'.

        Returns:
        pd.DataFrame: One row per record returned by aiomoex; empty when
        aiomoex returns no records.
        """
        async with aiohttp.ClientSession() as session:
            data = await aiomoex.get_board_history(
                session,
                ticker,
                columns=list(self._HISTORY_COLUMNS),
                start=self.start_date,
                end=self.end_date,
            )
        return pd.DataFrame(data)

    #get price histories for several tickers and combine them into one frame
    async def get_multiple_price_histories(self, tickers : pd.Series) -> pd.DataFrame:
        """Download the histories of all tickers concurrently and concatenate them.

        TRADEDATE is converted to datetime and the CLOSE prices listed in
        ``_CLOSE_RESCALES`` are divided by their divisor.
        NOTE(review): only CLOSE is rescaled; OPEN/HIGH/LOW and the other price
        columns keep their original scale - confirm this is intended.

        Parameters:
        tickers (pd.Series): Any iterable of security codes.

        Returns:
        pd.DataFrame: Combined history with a fresh integer index, or an empty
        DataFrame when there are no tickers or none of them has any data.
        """
        # The original called an undefined module-level function here; the
        # download lives on the instance, which already knows the date range.
        tasks = [self.get_price_history(ticker) for ticker in tickers]
        results = await asyncio.gather(*tasks)

        # Tickers without trades yield column-less frames; skip them so the
        # column accesses below cannot fail on an all-empty result.
        frames = [frame for frame in results if not frame.empty]
        if not frames:
            return pd.DataFrame()

        combined_df = pd.concat(frames, ignore_index=True)
        combined_df['TRADEDATE'] = pd.to_datetime(combined_df['TRADEDATE'])
        for secid, last_date, divisor in self._CLOSE_RESCALES:
            condition = (combined_df['SECID'] == secid) & (combined_df['TRADEDATE'] <= pd.Timestamp(last_date))
            combined_df.loc[condition, 'CLOSE'] = combined_df.loc[condition, 'CLOSE'] / divisor
        return combined_df

    #check the stocks which are trading near min for the last n years
    def get_tickers_which_we_can_buy_now(self, df : pd.DataFrame, years : int, percent : float) -> pd.DataFrame:
        """Select tickers whose latest close is within ``percent`` % of their low.

        Parameters:
        df (pd.DataFrame): Price history with SECID, TRADEDATE and CLOSE columns.
            It is not modified.
        years (int): Look-back window, counted as ``years * 365`` days from now.
        percent (float): Allowed distance above the window minimum, in percent.

        Returns:
        pd.DataFrame: Columns SECID, MIN_CLOSE, MAX_CLOSE and LATEST_CLOSE for
        the tickers that satisfy LATEST_CLOSE <= MIN_CLOSE * (1 + percent/100).
        """
        # assign() builds a new frame, so the caller's TRADEDATE column keeps
        # its original dtype (the original converted it in place).
        history = df.assign(TRADEDATE=pd.to_datetime(df['TRADEDATE']))

        # Keep only the rows inside the look-back window.
        n_years_ago = datetime.now() - timedelta(days=years*365)
        recent = history[history['TRADEDATE'] >= n_years_ago]

        # Minimum and maximum closing price per ticker over the window.
        price_range = (
            recent.groupby('SECID')['CLOSE']
            .agg(MIN_CLOSE='min', MAX_CLOSE='max')
            .reset_index()
        )

        # Closing price of the most recent row of each ticker.
        latest_prices = (
            recent.sort_values('TRADEDATE')
            .drop_duplicates('SECID', keep='last')[['SECID', 'CLOSE']]
            .rename(columns={'CLOSE': 'LATEST_CLOSE'})
        )

        merged_df = pd.merge(price_range, latest_prices, on='SECID')

        # Keep tickers whose latest close is at most percent % above the minimum.
        condition = merged_df['LATEST_CLOSE'] <= merged_df['MIN_CLOSE'] * (1+percent/100)
        return merged_df[condition]
    
    
class BiddingResultsDBLoader():
    """Append trading results to an SQLite table and keep the table free of duplicates.

    Rows count as duplicates when they share BOARDID, TRADEDATE and SECID.
    """

    #main_table_name is the name of the main table that holds the trading results
    def __init__(self, path_to_db : str, main_table_name : str):
        """Remember where the database lives and which table to write to.

        Parameters:
        path_to_db (str): Path to the SQLite database file.
        main_table_name (str): Name of the table that stores the trading results.
        """
        self.path_to_db = path_to_db
        self.main_table_name = main_table_name

    def _quoted_table_name(self) -> str:
        """Return the table name as a double-quoted SQL identifier."""
        # Table names cannot be bound as query parameters, so quote the
        # identifier and double any embedded quote character.
        return '"' + self.main_table_name.replace('"', '""') + '"'

    #remove duplicates from table
    def remove_duplicates(self):
        """
        Connects to the SQLite database and removes duplicate rows from the main table.

        For every (BOARDID, TRADEDATE, SECID) group the row with the smallest
        ROWID is kept and the others are deleted. SQLite errors are printed
        rather than raised.
        """
        table = self._quoted_table_name()

        # Same rows survive as with a ROW_NUMBER() window ordered by ROWID,
        # but this form needs no window-function support.
        delete_duplicates_query = f"""
        DELETE FROM {table}
        WHERE ROWID NOT IN (
            SELECT MIN(ROWID)
            FROM {table}
            GROUP BY BOARDID, TRADEDATE, SECID
        );
        """

        # The original read self.database_path, which is never set; the
        # attribute assigned by __init__ is path_to_db.
        conn = sqlite3.connect(self.path_to_db)
        try:
            conn.execute(delete_duplicates_query)
            conn.commit()
        except sqlite3.Error as error:
            print(f"Error while removing duplicates: {error}")
        finally:
            conn.close()

    def insert_data_to_database(self, df : pd.DataFrame):
        """Append ``df`` to the main table and then remove duplicate rows.

        Parameters:
        df (pd.DataFrame): Rows to append. An empty frame is ignored.
        """
        # Nothing to append means nothing new to deduplicate.
        if df.empty:
            return

        conn = sqlite3.connect(self.path_to_db)
        try:
            df.to_sql(self.main_table_name, conn, if_exists='append', index=False)
        finally:
            # Close the connection even when the insert fails.
            conn.close()

        # The original called an undefined module-level remove_duplicates();
        # deduplication is a method of this class.
        self.remove_duplicates()
    

In [175]:
# Trading day to download; it is used as both start and end of the range.
TRADE_DATE = '2024-05-27'
# SQLite file and table that receive the downloaded trading results.
DB_PATH = '/Users/antonvolkov/Desktop/MoexTrading/MoexData.db'
DB_TABLE = 'MOEXDATA'

# Download the ticker list, then the price history of every ticker.
moex_data_loader = MoexDataRetrival(TRADE_DATE, TRADE_DATE)
stock_list = asyncio.run(moex_data_loader.get_stock_list())
trade_data = asyncio.run(
    moex_data_loader.get_multiple_price_histories(stock_list)
)

# Store the combined history in the database.
db_loader = BiddingResultsDBLoader(DB_PATH, DB_TABLE)
db_loader.insert_data_to_database(trade_data)