# Alpaca Trading Workbook

### Data Scraper

In [104]:
import alpaca_trade_api as tradeapi
from alpaca_trade_api.rest import TimeFrame, TimeFrameUnit
import sqlite3
import pandas as pd
import ta
import time
import random
from datetime import datetime, timedelta

class AlpacaTradingBot:
    """Download Alpaca bar data and persist it in a local SQLite database.

    The class bundles three concerns:

    * scraping historical bars one calendar day at a time (``download_bar_data``),
    * enriching staged rows with technical indicators (``calculate_indicators`` /
      ``transfer_stage``),
    * small SQLite helpers (``db_write``, ``db_append``, ``db_append_no_duplicates``).

    Every database helper opens its own short-lived connection to
    ``self.database_path`` and closes it again, even when a statement fails.
    """

    # Format of the start/end date strings accepted by download_bar_data.
    _DATE_FORMAT = "%Y-%m-%d"

    def __init__(self, keys_file_path='alpaca_keys.txt', base_url='https://paper-api.alpaca.markets', database_path=r'D:\Scripts\alpaca\alpaca_algo_trading\alpaca_data.db'):
        """Read the API credentials and create the REST client.

        Args:
            keys_file_path: Text file whose first line is the API key and whose
                second line is the API secret.
            base_url: Alpaca endpoint passed to the REST client.
            database_path: Location of the SQLite database file used by the
                ``db_*`` helpers and ``transfer_stage``.
        """
        with open(keys_file_path, 'r') as file:
            self.api_key = file.readline().strip()
            self.api_secret = file.readline().strip()
        self.base_url = base_url
        self.api = tradeapi.REST(self.api_key, self.api_secret, base_url=base_url)
        self.database_path = database_path

    @staticmethod
    def _quote_identifier(name):
        """Return ``name`` as a double-quoted SQLite identifier."""
        return '"' + str(name).replace('"', '""') + '"'

    def download_bar_data(self, stock, timeframe, start_date, end_date):
        """Fetch bars for ``stock`` one day at a time between two dates (inclusive).

        Saturdays and Sundays are skipped without calling the API. Days for which
        the API returns no bars (e.g. market holidays) are skipped as well; the
        loop always advances to the next day, so an empty day can no longer cause
        the same date to be requested forever.

        Args:
            stock: Ticker symbol; also written into the ``symbol`` column.
            timeframe: Bar size, passed straight through to ``api.get_bars``.
            start_date: First day to fetch, as a ``YYYY-MM-DD`` string.
            end_date: Last day to fetch, as a ``YYYY-MM-DD`` string.

        Returns:
            A DataFrame with ``symbol`` as the first column followed by the reset
            index and the bar columns, or an empty DataFrame when nothing was
            downloaded.
        """
        fmt = self._DATE_FORMAT
        first_day = datetime.strptime(start_date, fmt)
        day = first_day
        all_data = []

        # ISO dates sort lexicographically, so the string comparison is a date comparison.
        while day.strftime(fmt) <= end_date:
            day_str = day.strftime(fmt)
            next_day = day + timedelta(days=1)

            # Weekend: nothing to request.
            if day.weekday() >= 5:
                day = next_day
                continue

            # Get data for the current day
            bars = self.api.get_bars(stock, timeframe, start=day_str, end=day_str, limit=1000).df
            if not bars.empty:
                bars['symbol'] = stock  # Add the stock symbol column
                all_data.append(bars)

            # Always move on, whether or not the day produced any bars.
            day = next_day

            # Random pause every fifth calendar day from the start to stay under the
            # API rate limits. Only matters for the initial bulk pull.
            if (day - first_day).days % 5 == 0:
                pause_duration = random.uniform(3, 5)
                time.sleep(pause_duration)
                print(f"Scraped a 1 day increment, pausing for {pause_duration} seconds...")

        if not all_data:
            return pd.DataFrame()

        # Reset the index so the bar timestamp becomes a regular column.
        combined_data = pd.concat(all_data).reset_index()
        return combined_data[['symbol'] + [col for col in combined_data.columns if col not in ['symbol']]]

    # This method calculates various indicators on a dataframe
    def calculate_indicators(self, stock_data):
        """Add RSI, SMA and Bollinger band columns computed from ``CLOSE``.

        The frame is modified in place and also returned.

        NOTE(review): the indicators are computed over the whole ``CLOSE`` column
        in its current row order; if the frame holds more than one symbol the
        rolling windows will span symbols - confirm the input is single-symbol
        and time-ordered.

        Args:
            stock_data: DataFrame with a ``CLOSE`` column.

        Returns:
            The same DataFrame with ``rsi``, ``sma_50``, ``sma_200``,
            ``bollinger_hband`` and ``bollinger_lband`` columns added.
        """
        stock_data['rsi'] = ta.momentum.rsi(stock_data['CLOSE'], window=14)
        stock_data['sma_50'] = ta.trend.sma_indicator(stock_data['CLOSE'], window=50)
        stock_data['sma_200'] = ta.trend.sma_indicator(stock_data['CLOSE'], window=200)
        stock_data['bollinger_hband'] = ta.volatility.bollinger_hband(stock_data['CLOSE'], window=20, window_dev=2)
        stock_data['bollinger_lband'] = ta.volatility.bollinger_lband(stock_data['CLOSE'], window=20, window_dev=2)
        return stock_data

    # This method cleans the scraped data & runs indicator calculations on it
    def transfer_stage(self, input_table, output_table):
        """Copy a staging table into the output table with indicators attached.

        Reads every row of ``input_table``, derives an Eastern-time timestamp and
        a regular-trading-hours flag (09:30-16:00 inclusive), computes the
        indicators, and appends the result to ``output_table``.

        Args:
            input_table: Name of the table to read; must have ``TIMESTAMP`` and
                ``CLOSE`` columns.
            output_table: Name of the table to append to.

        Returns:
            The DataFrame that was appended, with the symbol, Eastern timestamp
            and trading-hours flag as the leading columns.
        """
        # Read the entire table, releasing the connection even if the read fails.
        conn = sqlite3.connect(self.database_path)
        try:
            input_data = pd.read_sql(f"SELECT * FROM {input_table}", conn)
        finally:
            conn.close()

        # Convert 'TIMESTAMP' to datetime and localize it to UTC
        input_data['timestamp_utc'] = pd.to_datetime(input_data['TIMESTAMP'], utc=True)
        input_data['timestamp_est'] = input_data['timestamp_utc'].dt.tz_convert('US/Eastern')
        market_open = datetime.strptime('09:30', '%H:%M').time()
        market_close = datetime.strptime('16:00', '%H:%M').time()
        local_time = input_data['timestamp_est'].dt.time
        input_data['trading_hours_ind'] = (local_time >= market_open) & (local_time <= market_close)

        # Calculate indicators
        indicators_data = self.calculate_indicators(input_data)

        # Drop the raw timestamp columns and move the identifying columns to the
        # front. Names are matched case-insensitively because the staging table
        # may declare them in upper case while the derived columns are lower case.
        indicators_data = indicators_data.drop(columns=['TIMESTAMP', 'timestamp_utc'])
        columns_by_lower_name = {str(col).lower(): col for col in indicators_data.columns}
        leading = [
            columns_by_lower_name[name]
            for name in ('symbol', 'timestamp_est', 'trading_hours_ind')
            if name in columns_by_lower_name
        ]
        indicators_data = indicators_data[leading + [col for col in indicators_data.columns if col not in leading]]

        # Upload data to output table
        self.db_append_no_duplicates(output_table, indicators_data)

        return indicators_data

    # This method executes a SQL statement against our database.
    def db_write(self, sql_statement):
        """Execute a single SQL statement and commit it.

        Args:
            sql_statement: SQL text to run against ``self.database_path``.
        """
        conn = sqlite3.connect(self.database_path)
        try:
            conn.cursor().execute(sql_statement)
            conn.commit()
        finally:
            # Close even when the statement raises so the file is not left locked.
            conn.close()

    # This method appends records to our database.
    def db_append(self, table_name, data_frame):
        """Append every row of ``data_frame`` to ``table_name``.

        Args:
            table_name: Target table.
            data_frame: Rows to append; the index is not written.
        """
        conn = sqlite3.connect(self.database_path)
        try:
            data_frame.to_sql(table_name, conn, schema='main', if_exists='append', index=False)
            conn.commit()
        finally:
            conn.close()

    # Modification of db_append that checks primary key & ensures we aren't inserting a duplicate value
    def db_append_no_duplicates(self, table_name, data_frame):
        """Append rows to ``table_name``, skipping rows that would violate a constraint.

        When the table declares no primary key this behaves exactly like
        ``db_append``. Otherwise the frame is written to a temporary staging
        table and copied across with ``INSERT OR IGNORE``, so SQLite itself
        decides which keys already exist. That handles composite keys, column
        names that differ only in case, and values (such as timestamps) whose
        stored form differs from their pandas form. Rows that violate any other
        uniqueness or NOT NULL constraint of the table are skipped too.

        The caller's ``data_frame`` is never modified.

        Args:
            table_name: Target table.
            data_frame: Rows to append; the index is not written.
        """
        conn = sqlite3.connect(self.database_path)
        try:
            # The sixth field of table_info is 0 for non-key columns and the
            # 1-based position within the key otherwise, so test "> 0" to see
            # every column of a composite key.
            table_info = conn.execute(f"PRAGMA table_info({self._quote_identifier(table_name)})").fetchall()
            has_primary_key = any(column[5] > 0 for column in table_info)

            # If no primary key columns found, fall back to a plain append
            if not has_primary_key:
                data_frame.to_sql(table_name, conn, schema='main', if_exists='append', index=False)
                conn.commit()
                return

            # Nothing to stage for an empty frame.
            if data_frame.empty:
                return

            staging_table = f"{table_name}__incoming"
            data_frame.to_sql(staging_table, conn, schema='main', if_exists='replace', index=False)
            try:
                column_list = ', '.join(self._quote_identifier(col) for col in data_frame.columns)
                conn.execute(
                    f"INSERT OR IGNORE INTO {self._quote_identifier(table_name)} ({column_list}) "
                    f"SELECT {column_list} FROM {self._quote_identifier(staging_table)}"
                )
                conn.commit()
            except Exception:
                conn.rollback()
                raise
            finally:
                # The staging table is scratch space; remove it on every exit path.
                conn.execute(f"DROP TABLE IF EXISTS {self._quote_identifier(staging_table)}")
                conn.commit()
        finally:
            conn.close()


### Schema DDL

In [102]:
# Schema DDL
# Rebuilds both tables from scratch: each table is dropped (discarding any
# rows it holds) and then re-created empty.
#
# db_write has no return statement, so `sql_statement` is always None after
# each call below.
#
# NOTE(review): the columns are tagged `PRIMARY_KEY` (with an underscore).
# SQLite appears to read that as part of the column's declared type rather
# than as a PRIMARY KEY constraint, in which case neither table has a key and
# db_append_no_duplicates takes its "no primary key" fallback (plain append,
# no de-duplication). Confirm with `PRAGMA table_info(...)`; a table-level
# `PRIMARY KEY (SYMBOL, TIMESTAMP)` is probably what was intended, but the
# append logic must be able to handle a composite key before switching.
atb = AlpacaTradingBot()

# This table will house the raw scraped data
sql_statement = atb.db_write('''DROP TABLE IF EXISTS STG_SYMBOL_DATA''')
sql_statement = atb.db_write('''CREATE TABLE IF NOT EXISTS STG_SYMBOL_DATA (
    SYMBOL                  TEXT        PRIMARY_KEY,
    TIMESTAMP               TIMESTAMP   PRIMARY_KEY,
    CLOSE                   DECIMAL,
    HIGH                    DECIMAL,
    LOW                     DECIMAL,
    TRADE_COUNT             INTEGER,
    OPEN                    DECIMAL,
    VOLUME                  INTEGER,
    VWAP                    DECIMAL

)
''')

# This table will house the scraped data + computed indicators
# (same bar columns as the staging table, plus the Eastern-time timestamp,
# the trading-hours flag and the indicator columns).
sql_statement = atb.db_write('''DROP TABLE IF EXISTS SYMBOL_DATA''')
sql_statement = atb.db_write('''CREATE TABLE IF NOT EXISTS SYMBOL_DATA (
    SYMBOL                  TEXT        PRIMARY_KEY,
    TIMESTAMP_EST           TIMESTAMP   PRIMARY_KEY,
    TRADING_HOURS_IND       BOOLEAN,                   
    CLOSE                   DECIMAL,
    HIGH                    DECIMAL,
    LOW                     DECIMAL,
    TRADE_COUNT             INTEGER,
    OPEN                    DECIMAL,
    VOLUME                  INTEGER,
    VWAP                    DECIMAL,
    RSI                     DECIMAL,
    SMA_50                  DECIMAL,
    SMA_200                 DECIMAL,
    BOLLINGER_HBAND         DECIMAL,
    BOLLINGER_LBAND         DECIMAL
)
''')





### Calling the class

In [105]:
# Bot instance using the default key file, endpoint and database path
atb = AlpacaTradingBot()

# Symbols to pull
stocks = ["SPY"]

# Historical window: fixed start date through today
start_date = "2020-01-01"
# end_date = "2020-01-15"
end_date = datetime.now().strftime("%Y-%m-%d")

# Pull 5-minute bars for each symbol, keyed by ticker
data = {}
for stock in stocks:
    data[stock] = atb.download_bar_data(
        stock,
        TimeFrame(5, TimeFrameUnit.Minute),
        start_date,
        end_date,
    )

# Persist every symbol's bars into the staging table
for stock, stock_data in data.items():
    atb.db_append_no_duplicates('STG_SYMBOL_DATA', stock_data)

# atb.transfer_stage('STG_SYMBOL_DATA','SYMBOL_DATA')


Scraped a 5 day increment, pausing for 8.823637889260677 seconds...
Scraped a 5 day increment, pausing for 7.297139144290808 seconds...


sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/SPY/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/SPY/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/SPY/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/SPY/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/SPY/bars 3 more time(s)...


KeyboardInterrupt: 