In [24]:
from concurrent.futures import ThreadPoolExecutor
import pandas as pd
from datetime import datetime
import os
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
import re
import numpy as np
import psycopg2
from scipy.stats import norm
import time
import scipy.stats as stats
from sqlalchemy import create_engine
import zipfile

connection = psycopg2.connect(host="10.5.1.20", database="marketdata", user="elliott", password="scalp", port='5432')
connection2 = psycopg2.connect(host="10.7.8.59", database="theoretical", user="scalp", password="QAtr@de442", port='5433')

m_root = 'RBadmin'
m_password = '$Calp123'
m_host = '10.5.1.32'
m_db = 'rbandits2'
uri = f"mysql+mysqldb://{m_root}:{m_password}@{m_host}/{m_db}"
mydb = create_engine(uri, connect_args={'ssl_mode': 'DISABLED'})
mapping = {0: "Vertical",5: "Straddle", 6: "Strangle"}

def identify_inverted(row):
    # Ignore the rows where either 'BBOAsk' or 'BBOBid' is zero
    if row['BBOAsk'] == 0 or row['BBOBid'] == 0:
        return ''
    # Check if 'BBOAsk' is less than 'BBOBid'
    elif row['BBOAsk'] < row['BBOBid']:
        return 'Inverted'
    else:
        return ''
def identify_option_type(row):
    # Check if the 'sprdtype' is 'Vertical'
    if row['sprdtype'] == 'Vertical':
        # Check for 'C' or 'P' surrounded by digits in 'formatted_symbol'
        matches = re.findall(r'\d(C|P)\d', row['formatted_symbol'])
        if matches:
            # Take the first match and check the middle character
            if 'C' in matches[0]:
                return 'Call Vertical'
            else:
                
                return 'Put Vertical'
    # If 'sprdtype' is not 'Vertical', return the original 'sprdtype'
    return row['sprdtype']
def get_data(date):
    query = f"""
                SELECT ts as Time, underly, sprdsym as Spread, price as LastPrice, sprdtype
                FROM trdsprd
                WHERE ts >= DATE_SUB('{date}', INTERVAL 1 DAY)
                AND DAYOFWEEK(ts) BETWEEN 2 AND 6
                AND sprdtype in ('0', '5', '6')
                AND underly NOT LIKE 'QQQ%%'
                AND underly NOT LIKE 'SPY%%'
                AND underly NOT LIKE 'IWM%%'
                ORDER BY ts DESC
            """
    df = pd.read_sql_query(query, mydb)
    return df
def convert_to_new_format(option):
    if pd.isnull(option):
        return ''  # Return an empty string for NaN values

    parts = option.split('_')  # Split the spread into individual options
    new_parts = []
    for part in parts:
        # Identify the beginning of the date substring
        date_start = None
        for i in range(len(part) - 5):  # Subtract 5 to avoid running off the end of the string
            if part[i:i+2].isdigit() and part[i+2:i+4].isdigit() and part[i+4:i+6].isdigit():
                date_start = i
                break
        # If a date substring couldn't be found, treat the part as a non-option
        if date_start is None:
            new_parts.append(part)
            continue
        # Extract underlying, date, call/put, strike, and suffix
        underlying = part[:date_start]
        date = '20' + part[date_start:date_start+6]  # Convert YY to YYYY
        cp = part[date_start+6]
        strike_start = date_start + 7
        strike_end = strike_start
        for char in part[strike_start:]:
            if char.isdigit() or char == '.':
                strike_end += 1
            else:
                break
        strike = part[strike_start:strike_end]
        # Append decimal and trailing zeros if necessary
        if '.' not in strike:
            strike += '.00'
        elif len(strike.split('.')[1]) == 1:
            strike += '0'
        suffix = part[strike_end:]
        new_part = underlying + date + cp + strike + suffix
        new_parts.append(new_part)
    return '_'.join(new_parts)  # Join the options back into a spread
def get_batch_data2(df, current_date):
    # Extract relevant parameters from the dataframe
    symbols = df['underly'].unique()
    
    # Convert the current date string to a datetime.date object
    current_date_obj = datetime.strptime(current_date, '%Y%m%d').date()
    
    # Calculate the target times
    target_time_915 = datetime.combine(current_date_obj, pd.Timestamp("9:15:00").time())
    target_time_914 = datetime.combine(current_date_obj, pd.Timestamp("9:14:00").time())
    
    # Convert symbols list to a format suitable for SQL IN clause
    symbols_placeholder = ",".join(["%s"] * len(symbols))

    # Construct the query
    query = f"""
                SELECT symbol, tradets, tradevolume, tradelast
                FROM equity_trades_new
                WHERE 
                    symbol IN ({symbols_placeholder}) AND
                    (tradets BETWEEN %s AND %s)
            """
    
    cursor = connection.cursor()

    # Execute the query using parameters
    cursor.execute(query, (*symbols, target_time_914, target_time_915))
    results = cursor.fetchall()

    cursor.close()

    # Convert results to a dataframe for easier manipulation
    columns = ['Symbol', 'Timestamp', 'Volume', 'Last_Trade']
    batch_data_df = pd.DataFrame(results, columns=columns)

    return batch_data_df
def get_batch_data1(df):
    # Extract relevant parameters from the dataframe
    min_time = df['Time'].min()
    max_time = df['Time'].max()
    symbols = df['underly'].unique()

    # Convert symbols list to a format suitable for SQL IN clause
    symbols_placeholder = ",".join(["%s"] * len(symbols))

    # Construct the query
    query = f"""
                SELECT symbol, tradets, tradevolume, tradelast
                FROM equity_trades_new
                WHERE 
                    symbol IN ({symbols_placeholder}) AND
                    tradets BETWEEN %s AND %s
            """

    cursor = connection.cursor()

    # Execute the query using parameters
    cursor.execute(query, (*symbols, min_time, max_time))
    results = cursor.fetchall()

    cursor.close()

    # Convert results to a dataframe for easier manipulation
    columns = ['Symbol', 'Timestamp', 'Volume', 'Last_Trade']
    batch_data_df = pd.DataFrame(results, columns=columns)

    return batch_data_df
def assign_closest_prices(original_df, batch_data_df):
    # Create a new column for prc_eqt_at_time
    original_df = original_df.copy() 
    original_df['prc_eqt_at_time'] = None

    # Process each unique symbol
    for symbol in original_df['underly'].unique():
        # Work on a copy of the slice
        original_subset = original_df[original_df['underly'] == symbol].copy()
        batch_subset = batch_data_df[batch_data_df['Symbol'] == symbol]
        
        # If there are no matching records in batch_subset, skip this symbol
        if batch_subset.empty:
            continue

        # For each row in the original subset, find the closest timestamp in the batch subset
        for idx, row in original_subset.iterrows():
            time_diffs = (batch_subset['Timestamp'] - row['Time']).abs()
            
            # If there are no time differences, skip this row
            if time_diffs.empty:
                continue

            closest_idx = time_diffs.idxmin()
            closest_trade_price = batch_subset.loc[closest_idx, 'Last_Trade']

            # Assign the closest trade price to the prc_eqt_at_time column
            original_subset.at[idx, 'prc_eqt_at_time'] = closest_trade_price

        # Update the original DataFrame
        original_df.update(original_subset)

    return original_df
def send_email(names, subject, attachment_path):
    fromaddr = "reports@scalptrade.com"
    toaddr = names

    msg = MIMEMultipart()
    msg['From'] = fromaddr
    msg['To'] = ", ".join(toaddr)
    msg['Subject'] = '{}'.format(subject)

    # Attach the CSV file
    attachment = open(attachment_path, 'rb')
    part = MIMEBase('application', 'octet-stream')
    part.set_payload(attachment.read())
    encoders.encode_base64(part)
    part.add_header('Content-Disposition', f'attachment; filename="{os.path.basename(attachment_path)}"')
    msg.attach(part)

    s = smtplib.SMTP('smtp.gmail.com', 587)
    s.starttls()
    s.login(fromaddr, "sc@lptrade")
    text = msg.as_string()
    s.sendmail(fromaddr, toaddr, text)
def round_to_two_decimals(df, columns):
    """Format specified columns in a dataframe to two decimal places as strings."""
    for column in columns:
        df[column] = df[column].apply(lambda x: "{:.2f}".format(float(x)) if pd.notna(x) else None)
    return df
def assign_equity_prices_at_915(original_df, batch_data_df, current_date):
    # Create a new column for price at 915
    original_df['price_at_915'] = None
    
    # Get a unique set of symbols
    unique_symbols = original_df['underly'].unique()

    # Process each unique symbol
    for symbol in unique_symbols:
        # Filter batch data for the current symbol
        batch_subset = batch_data_df[batch_data_df['Symbol'] == symbol]

        # If there are no matching records in batch_subset, skip this symbol
        if batch_subset.empty:
            continue
        
        # Convert the current_date string to a datetime.date object
        current_date_obj = datetime.strptime(current_date, '%Y%m%d').date()
        
        # Calculate the target time
        target_time = datetime.combine(current_date_obj, pd.Timestamp("9:15:00").time())
        
        time_diffs = batch_subset['Timestamp'].apply(lambda x: abs(datetime.combine(current_date_obj, x.time()) - target_time))
        closest_idx = time_diffs.idxmin()
        price_at_915 = batch_subset.loc[closest_idx, 'Last_Trade']
                
        # Assign the price to all rows with the matching symbol
        original_df.loc[original_df['underly'] == symbol, 'price_at_915'] = price_at_915
    return original_df
def generate_and_execute_sql(result_df):
    new_df = result_df[['Symbol', 'underly']].copy()
    new_df['Symbol'] = new_df['Symbol'].str.split('_')
    exploded_df = new_df.explode('Symbol', ignore_index=True)
    unique_underly_values = exploded_df['underly'].unique()

    unique_underly_str = ','.join(f"'{symbol}%'" for symbol in unique_underly_values)
    unique_underly_str = f"ARRAY[{unique_underly_str}]"

    # Database connection
    connection = psycopg2.connect(host="10.7.8.59", database="theoretical", user="scalp", password="QAtr@de442", port='5433')
    cursor = connection.cursor()

    query = f"""
        SELECT theo_rate, brents_vol, s.symbol, underlying
        FROM public.theo_rates r, public.theo_opt_sym_chain s
        WHERE r.theo_symbol_id = s.theo_symbol_id
        AND symbol LIKE ANY({unique_underly_str})
        AND r.create_time > timestamp '2023-08-28 16:00:00'
        ORDER BY strike, r.create_time DESC;
    """
    
    cursor.execute(query)
    query_results = cursor.fetchall()
    cursor.close()
    connection.close()

    df = pd.DataFrame(query_results, columns=['theo_rate', 'volatility', 'symbol', 'underlying'])

    result_dict = {}

    for index, row in df.iterrows():
        symbol = row['symbol']
        theo_rate = row['theo_rate']
        volatility = row['volatility']
    
        result_dict[symbol] = {'theo_rate': theo_rate, 'volatility': volatility}

    return result_dict
def lookup_rates_and_vols(row, rates_vol_dict):
    symbols = row['Symbol'].split('_')
    modified_symbols = [symbol[:-2] for symbol in symbols]
    
    if len(modified_symbols) != 2:
        #print(f"Skipping row with symbols: {modified_symbols}")
        return row  # or set specific columns to NaN or some default value
    
    try:
        symbol1, symbol2 = modified_symbols
        row['rate1'] = rates_vol_dict[symbol1]['theo_rate']
        row['rate2'] = rates_vol_dict[symbol2]['theo_rate']
        row['vol1'] = rates_vol_dict[symbol1]['volatility']
        row['vol2'] = rates_vol_dict[symbol2]['volatility']
    except KeyError:
        #print(f"Symbols {modified_symbols} not found in rates_vol_dict.")
        row['rate1'], row['rate2'], row['vol1'], row['vol2'] = [np.nan]*4
    
    return row
def black_scholes(S, X, t, r, q, sigma, option_type='call'):
    d1 = (np.log(S / X) + (r - q + sigma ** 2 / 2) * t) / (sigma * np.sqrt(t))
    d2 = d1 - sigma * np.sqrt(t)
    
    if option_type == 'call':
        option_price = S * np.exp(-q * t) * stats.norm.cdf(d1) - X * np.exp(-r * t) * stats.norm.cdf(d2)
    elif option_type == 'put':
        option_price = X * np.exp(-r * t) * stats.norm.cdf(-d2) - S * np.exp(-q * t) * stats.norm.cdf(-d1)
    else:
        raise ValueError("Invalid option_type. Use 'call' or 'put'")
        
    return option_price
def black_scholes_delta(S, X, t, r, q, sigma, option_type='call'):
    d1 = (np.log(S / X) + (r - q + sigma ** 2 / 2) * t) / (sigma * np.sqrt(t))
    
    if option_type == 'call':
        delta = np.exp(-q * t) * stats.norm.cdf(d1)
    elif option_type == 'put':
        delta = np.exp(-q * t) * (stats.norm.cdf(d1) - 1)
    else:
        raise ValueError("Invalid option_type. Use 'call' or 'put'")
    
    return delta
def calculate_black_scholes_and_delta(row):
    S = float(row['prc_eqt_at_time'])  # Convert to float
    X1 = float(row['Symbol'].split('_')[0].split('C')[-1].split('P')[-1][:-2])  # Convert to float
    X2 = float(row['Symbol'].split('_')[1].split('C')[-1].split('P')[-1][:-2])  # Convert to float
    t = 30 / 365  # Assuming 30 days to expiration (replace with actual value)
    r1 = row['rate1']
    r2 = row['rate2']
    q = 0  # Assuming no dividend yield
    sigma1 = row['vol1']
    sigma2 = row['vol2']
    
    bs_price1 = black_scholes(S, X1, t, r1, q, sigma1)
    bs_price2 = black_scholes(S, X2, t, r2, q, sigma2)
    
    delta1 = black_scholes_delta(S, X1, t, r1, q, sigma1)
    delta2 = black_scholes_delta(S, X2, t, r2, q, sigma2)
    
    row['BS_Price1'] = bs_price1
    row['BS_Price2'] = bs_price2
    row['Delta1'] = delta1
    row['Delta2'] = delta2
    row['Spread_Delta'] = delta1 - delta2
    
    return row
def process_put_verticals(row):
    if row['sprdtype'] == 'Put Vertical':
        # Swap BBOAsk and BBOBid
        row['BBOAsk'], row['BBOBid'] = row['BBOBid'], row['BBOAsk']
        
        # Swap BBOAskSize and BBOBidSize (assuming you have these columns)
        row['BBOAskSize'], row['BBOBidSize'] = row['BBOBidSize'], row['BBOAskSize']

        # Take absolute values of LastPrice, BBOBid, and BBOAsk
        row['LastPrice'] = abs(row['LastPrice'])
        row['BBOBid'] = abs(row['BBOBid'])
        row['BBOAsk'] = abs(row['BBOAsk'])
        
    return row
def get_data_parallel(dates, io_threads):
    with ThreadPoolExecutor(max_workers=io_threads) as executor:
        results = list(executor.map(get_data, dates))
    return pd.concat(results, ignore_index=True)
def get_batch_data1_parallel(df, io_threads):
    unique_symbols = df['underly'].unique()
    with ThreadPoolExecutor(max_workers=io_threads) as executor:
        results = list(executor.map(get_batch_data1, [df[df['underly'] == symbol] for symbol in unique_symbols]))
    return pd.concat(results, ignore_index=True)
def assign_closest_prices_parallel(original_df, batch_data_df, io_threads):
    unique_symbols = original_df['underly'].unique()
    with ThreadPoolExecutor(max_workers=io_threads) as executor:
        results = list(executor.map(assign_closest_prices, 
                                    [original_df[original_df['underly'] == symbol] for symbol in unique_symbols],
                                    [batch_data_df[batch_data_df['Symbol'] == symbol] for symbol in unique_symbols]))
    return pd.concat(results, ignore_index=True)
def generate_and_execute_sql_parallel(result_df, cpu_threads):
    unique_underly_values = result_df['underly'].unique()
    with ThreadPoolExecutor(max_workers=cpu_threads) as executor:
        results = list(executor.map(generate_and_execute_sql, [result_df[result_df['underly'] == symbol] for symbol in unique_underly_values]))
    return {k: v for d in results for k, v in d.items()}
def set1(df):
    condition1 = (
        (df['PriceDelta'] > 0) & 
        (df['sprdtype'] == 'Call Vertical') & 
        (df['OfferEdge'] > 0)
    ) | (
        (df['PriceDelta'] > 0) & 
        (df['sprdtype'] == 'Put Vertical') & 
        (df['BidEdge'] > 0)
    )
    
    condition2 = (
        (df['PriceDelta'] < 0) & 
        (df['sprdtype'] == 'Call Vertical') & 
        (df['BidEdge'] > 0)
    ) | (
        (df['PriceDelta'] < 0) & 
        (df['sprdtype'] == 'Put Vertical') & 
        (df['OfferEdge'] > 0)
    )
    
    return df[condition1 | condition2]
def set2(df):
    condition = (
        df['equity_change_pct'].abs() > 1
    ) & ((df['Spread_Delta'].abs() > .25))
    return df[condition]
def run():
    start_time_1 = time.perf_counter()
    date = datetime.now().strftime('%Y%m%d')
    
    main_df = pd.read_csv(f'/home/elliott/Development/scripts/jupyter_notebooks/spreads-{date}0915.csv')
    main_df = main_df[['Symbol', 'BBOAsk', 'BBOBid', 'BBOBidSize', 'BBOAskSize']].copy()
    main_df = main_df[(main_df['BBOBid'] != 0) | (main_df['BBOAsk'] != 0)].copy()
    main_df['formatted_symbol'] = main_df['Symbol'].apply(convert_to_new_format)
    end_time_1 = time.perf_counter()
    print(f"time1: {end_time_1 - start_time_1}")

    start_time_2 = time.perf_counter()
    db_df = get_data_parallel([date])
    merged_df = main_df.merge(db_df, how='left', left_on='formatted_symbol', right_on='Spread').copy()
    merged_df.dropna(inplace=True)
    merged_df['sprdtype'] = merged_df['sprdtype'].map(mapping)
    merged_df['sprdtype'] = merged_df.apply(identify_option_type, axis=1)
    merged_df['Inverted'] = merged_df.apply(identify_inverted, axis=1)
    merged_df = merged_df.apply(process_put_verticals, axis=1)
    merged_df.sort_values(by='Time', ascending=False, inplace=True)
    merged_df.drop_duplicates(subset='formatted_symbol', keep='first', inplace=True)
    end_time_2 = time.perf_counter()
    print(f"time2: {end_time_2 - start_time_2}")

    start_time_3 = time.perf_counter()
    prices915_df = get_batch_data2(merged_df,date)
    assign_equity_prices_at_915(merged_df, prices915_df, date)  # This line was added
    end_time_3 = time.perf_counter()
    print(f"time3: {end_time_3 - start_time_3}")

    # Measure time for get_batch_data1
    start_time_get_batch = time.perf_counter()
    pricesattime_df = get_batch_data1_parallel(merged_df)
    end_time_get_batch = time.perf_counter()
    print(f"Time taken for get_batch_data1: {end_time_get_batch - start_time_get_batch} seconds")

    # Measure time for assign_closest_prices
    start_time_assign_closest = time.perf_counter()
    merged_prices_df = assign_closest_prices_parallel(merged_df, pricesattime_df)
    merged_prices_df['equity_change_pct'] = ((merged_prices_df['prc_eqt_at_time'] - merged_prices_df['price_at_915']) / merged_prices_df['price_at_915']) * 100
    end_time_assign_closest = time.perf_counter()
    print(f"Time taken for assign_closest_prices: {end_time_assign_closest - start_time_assign_closest} seconds")


    start_time_5 = time.perf_counter()
    column_order = ['formatted_symbol', 'Symbol', 'Inverted', 'sprdtype', 'LastPrice', 'BBOBid', 'BBOBidSize',
                    'BBOAsk', 'BBOAskSize', 'Time', 'underly', 'prc_eqt_at_time', 'price_at_915']
    result_df = (merged_prices_df.loc[:, column_order]
                .dropna(subset=['price_at_915', 'prc_eqt_at_time'])
                .assign(PriceDelta=lambda df: df['price_at_915'].astype(float) - df['prc_eqt_at_time'],
                        BidEdge=lambda df: np.where(df['BBOBid'] != 0, df['BBOBid'] - df['LastPrice'], np.nan),
                        OfferEdge=lambda df: np.where(df['BBOAsk'] != 0, df['LastPrice'] - df['BBOAsk'], np.nan))
            ).copy()
    result_df = round_to_two_decimals(result_df, columns=['BidEdge', 'OfferEdge', 'prc_eqt_at_time', 'price_at_915', 
                                                        'PriceDelta', 'LastPrice', 'BBOBid', 'BBOAsk'])
    result_df['BidEdge'] = result_df['BidEdge'].astype(float)
    result_df['OfferEdge'] = result_df['OfferEdge'].astype(float)
    result_df = result_df.loc[(result_df['BidEdge'] < -0.20) | (result_df['OfferEdge'] < -0.20)]
    end_time_5 = time.perf_counter()
    print(f"time5: {end_time_5 - start_time_5}")

    start_time_6 = time.perf_counter()
    vol_rates_dict = generate_and_execute_sql_parallel(result_df)
    result_df = result_df.apply(lookup_rates_and_vols, axis=1, args=(vol_rates_dict,))
    result_df.dropna(subset=['rate1'], inplace=True)
    result_df = result_df.apply(calculate_black_scholes_and_delta, axis=1)
    end_time_6 = time.perf_counter()
    print(f"time6: {end_time_6 - start_time_6}")
    

    start_time_7 = time.perf_counter()
    result_df.drop(columns=["Symbol"], inplace=True)
    result_df['PriceDelta'] = pd.to_numeric(result_df['PriceDelta'], errors='coerce')
    result_df['OfferEdge'] = pd.to_numeric(result_df['OfferEdge'], errors='coerce')
    result_df['BidEdge'] = pd.to_numeric(result_df['BidEdge'], errors='coerce')
    verticals_df = result_df[result_df['sprdtype'].str.contains("Vertical")].copy()
    verticals_df['prc_eqt_at_time'] = pd.to_numeric(verticals_df['prc_eqt_at_time'], errors='coerce')
    verticals_df['price_at_915'] = pd.to_numeric(verticals_df['price_at_915'], errors='coerce')
    verticals_df['equity_change_pct'] = ((verticals_df['price_at_915'] - verticals_df['prc_eqt_at_time']) / verticals_df['prc_eqt_at_time']) * 100
    verticals_df.drop(columns=['rate1', 'rate2', 'vol1', 'vol2', 'BS_Price1', 'BS_Price2'], inplace=True)
    verticals_df.to_csv(f"verticals_{date}.csv")
    verticles_set1 = set1(verticals_df)
    verticles_set1.to_csv(f"verticles_set_1_{date}.csv")
    verticles_set2 = set2(verticals_df)
    verticles_set2.to_csv(f"verticles_set_2_{date}.csv")
    verticles_set3 = verticals_df[(verticals_df['BidEdge'] > 0.50) | (verticals_df['OfferEdge'] > 0.50)]
    verticles_set3.to_csv(f"verticles_set_3_{date}.csv")
    straddle_df = result_df[result_df['sprdtype'].str.contains("Straddle")].copy()
    straddle_df.to_csv(f"straddles_{date}.csv")
    strangle_df = result_df[result_df['sprdtype'].str.contains("Strangle")].copy()
    strangle_df.to_csv(f"strangles_{date}.csv")
    end_time_7 = time.perf_counter()
    print(f"time7: {end_time_7 - start_time_7}")

    zip_filename = f"e.watchlist_{date}.zip"
    straddle_csv = f"straddles_{date}.csv"
    vertical_csv = f"verticals_{date}.csv"
    verticles_set1 = f"verticles_set_1_{date}.csv"
    verticles_set2 = f"verticles_set_2_{date}.csv"
    verticles_set3 = f"verticles_set_3_{date}.csv"
    strangles_csv = f"strangles_{date}.csv"

    with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        zip_file.write(straddle_csv, arcname=os.path.basename(straddle_csv))
        zip_file.write(vertical_csv, arcname=os.path.basename(vertical_csv))
        zip_file.write(verticles_set1, arcname=os.path.basename(verticles_set1))
        zip_file.write(verticles_set2, arcname=os.path.basename(verticles_set2))
        zip_file.write(verticles_set3, arcname=os.path.basename(verticles_set3))
        zip_file.write(strangles_csv, arcname=os.path.basename(strangles_csv))

    #Send the zip archive via email
    send_email(['ewashington@scalptrade.com', 'sleland@scalptrade.com', 'aiacullo@scalptrade.com', 'jfeng@scalptrade.com', 'jthakkar@scalptrade.com ', 'jwood@scalptrade.com'], 
            'Open Rotation Watchlist', 
            attachment_path=zip_filename)

if __name__ == "__main__":
    start = time.perf_counter()
    for i in range(40)
        run()
    end = time.perf_counter()
    print(f"Total runtime: {end - start}")

time1: 1.1686136450152844
time2: 11.002577741979621
time3: 37.640348635963164
Time taken for get_batch_data1: 63.76835818600375 seconds


KeyboardInterrupt: 

In [23]:
from concurrent.futures import ThreadPoolExecutor
import pandas as pd
from datetime import datetime
import os
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
import re
import numpy as np
import psycopg2
from scipy.stats import norm
import time
import scipy.stats as stats
from sqlalchemy import create_engine
import zipfile

connection = psycopg2.connect(host="10.5.1.20", database="marketdata", user="elliott", password="scalp", port='5432')
connection2 = psycopg2.connect(host="10.7.8.59", database="theoretical", user="scalp", password="QAtr@de442", port='5433')

m_root = 'RBadmin'
m_password = '$Calp123'
m_host = '10.5.1.32'
m_db = 'rbandits2'
uri = f"mysql+mysqldb://{m_root}:{m_password}@{m_host}/{m_db}"
mydb = create_engine(uri, connect_args={'ssl_mode': 'DISABLED'})
mapping = {0: "Vertical",5: "Straddle", 6: "Strangle"}

def identify_inverted(row):
    # Ignore the rows where either 'BBOAsk' or 'BBOBid' is zero
    if row['BBOAsk'] == 0 or row['BBOBid'] == 0:
        return ''
    # Check if 'BBOAsk' is less than 'BBOBid'
    elif row['BBOAsk'] < row['BBOBid']:
        return 'Inverted'
    else:
        return ''
def identify_option_type(row):
    # Check if the 'sprdtype' is 'Vertical'
    if row['sprdtype'] == 'Vertical':
        # Check for 'C' or 'P' surrounded by digits in 'formatted_symbol'
        matches = re.findall(r'\d(C|P)\d', row['formatted_symbol'])
        if matches:
            # Take the first match and check the middle character
            if 'C' in matches[0]:
                return 'Call Vertical'
            else:
                
                return 'Put Vertical'
    # If 'sprdtype' is not 'Vertical', return the original 'sprdtype'
    return row['sprdtype']
def get_data(date):
    query = f"""
                SELECT ts as Time, underly, sprdsym as Spread, price as LastPrice, sprdtype
                FROM trdsprd
                WHERE ts >= DATE_SUB('{date}', INTERVAL 1 DAY)
                AND DAYOFWEEK(ts) BETWEEN 2 AND 6
                AND sprdtype in ('0', '5', '6')
                AND underly NOT LIKE 'QQQ%%'
                AND underly NOT LIKE 'SPY%%'
                AND underly NOT LIKE 'IWM%%'
                ORDER BY ts DESC
            """
    df = pd.read_sql_query(query, mydb)
    return df
def convert_to_new_format(option):
    if pd.isnull(option):
        return ''  # Return an empty string for NaN values

    parts = option.split('_')  # Split the spread into individual options
    new_parts = []
    for part in parts:
        # Identify the beginning of the date substring
        date_start = None
        for i in range(len(part) - 5):  # Subtract 5 to avoid running off the end of the string
            if part[i:i+2].isdigit() and part[i+2:i+4].isdigit() and part[i+4:i+6].isdigit():
                date_start = i
                break
        # If a date substring couldn't be found, treat the part as a non-option
        if date_start is None:
            new_parts.append(part)
            continue
        # Extract underlying, date, call/put, strike, and suffix
        underlying = part[:date_start]
        date = '20' + part[date_start:date_start+6]  # Convert YY to YYYY
        cp = part[date_start+6]
        strike_start = date_start + 7
        strike_end = strike_start
        for char in part[strike_start:]:
            if char.isdigit() or char == '.':
                strike_end += 1
            else:
                break
        strike = part[strike_start:strike_end]
        # Append decimal and trailing zeros if necessary
        if '.' not in strike:
            strike += '.00'
        elif len(strike.split('.')[1]) == 1:
            strike += '0'
        suffix = part[strike_end:]
        new_part = underlying + date + cp + strike + suffix
        new_parts.append(new_part)
    return '_'.join(new_parts)  # Join the options back into a spread
def get_batch_data2(df, current_date):
    # Extract relevant parameters from the dataframe
    symbols = df['underly'].unique()
    
    # Convert the current date string to a datetime.date object
    current_date_obj = datetime.strptime(current_date, '%Y%m%d').date()
    
    # Calculate the target times
    target_time_915 = datetime.combine(current_date_obj, pd.Timestamp("9:15:00").time())
    target_time_914 = datetime.combine(current_date_obj, pd.Timestamp("9:14:00").time())
    
    # Convert symbols list to a format suitable for SQL IN clause
    symbols_placeholder = ",".join(["%s"] * len(symbols))

    # Construct the query
    query = f"""
                SELECT symbol, tradets, tradevolume, tradelast
                FROM equity_trades_new
                WHERE 
                    symbol IN ({symbols_placeholder}) AND
                    (tradets BETWEEN %s AND %s)
            """
    
    cursor = connection.cursor()

    # Execute the query using parameters
    cursor.execute(query, (*symbols, target_time_914, target_time_915))
    results = cursor.fetchall()

    cursor.close()

    # Convert results to a dataframe for easier manipulation
    columns = ['Symbol', 'Timestamp', 'Volume', 'Last_Trade']
    batch_data_df = pd.DataFrame(results, columns=columns)

    return batch_data_df
def get_batch_data1(df):
    # Extract relevant parameters from the dataframe
    min_time = df['Time'].min()
    max_time = df['Time'].max()
    symbols = df['underly'].unique()

    # Convert symbols list to a format suitable for SQL IN clause
    symbols_placeholder = ",".join(["%s"] * len(symbols))

    # Construct the query
    query = f"""
                SELECT symbol, tradets, tradevolume, tradelast
                FROM equity_trades_new
                WHERE 
                    symbol IN ({symbols_placeholder}) AND
                    tradets BETWEEN %s AND %s
            """

    cursor = connection.cursor()

    # Execute the query using parameters
    cursor.execute(query, (*symbols, min_time, max_time))
    results = cursor.fetchall()

    cursor.close()

    # Convert results to a dataframe for easier manipulation
    columns = ['Symbol', 'Timestamp', 'Volume', 'Last_Trade']
    batch_data_df = pd.DataFrame(results, columns=columns)

    return batch_data_df
def assign_closest_prices(original_df, batch_data_df):
    # Create a new column for prc_eqt_at_time
    original_df = original_df.copy() 
    original_df['prc_eqt_at_time'] = None

    # Process each unique symbol
    for symbol in original_df['underly'].unique():
        # Work on a copy of the slice
        original_subset = original_df[original_df['underly'] == symbol].copy()
        batch_subset = batch_data_df[batch_data_df['Symbol'] == symbol]
        
        # If there are no matching records in batch_subset, skip this symbol
        if batch_subset.empty:
            continue

        # For each row in the original subset, find the closest timestamp in the batch subset
        for idx, row in original_subset.iterrows():
            time_diffs = (batch_subset['Timestamp'] - row['Time']).abs()
            
            # If there are no time differences, skip this row
            if time_diffs.empty:
                continue

            closest_idx = time_diffs.idxmin()
            closest_trade_price = batch_subset.loc[closest_idx, 'Last_Trade']

            # Assign the closest trade price to the prc_eqt_at_time column
            original_subset.at[idx, 'prc_eqt_at_time'] = closest_trade_price

        # Update the original DataFrame
        original_df.update(original_subset)

    return original_df
def send_email(names, subject, attachment_path):
    fromaddr = "reports@scalptrade.com"
    toaddr = names

    msg = MIMEMultipart()
    msg['From'] = fromaddr
    msg['To'] = ", ".join(toaddr)
    msg['Subject'] = '{}'.format(subject)

    # Attach the CSV file
    attachment = open(attachment_path, 'rb')
    part = MIMEBase('application', 'octet-stream')
    part.set_payload(attachment.read())
    encoders.encode_base64(part)
    part.add_header('Content-Disposition', f'attachment; filename="{os.path.basename(attachment_path)}"')
    msg.attach(part)

    s = smtplib.SMTP('smtp.gmail.com', 587)
    s.starttls()
    s.login(fromaddr, "sc@lptrade")
    text = msg.as_string()
    s.sendmail(fromaddr, toaddr, text)
def round_to_two_decimals(df, columns):
    """Format specified columns in a dataframe to two decimal places as strings."""
    for column in columns:
        df[column] = df[column].apply(lambda x: "{:.2f}".format(float(x)) if pd.notna(x) else None)
    return df
def assign_equity_prices_at_915(original_df, batch_data_df, current_date):
    # Create a new column for price at 915
    original_df['price_at_915'] = None
    
    # Get a unique set of symbols
    unique_symbols = original_df['underly'].unique()

    # Process each unique symbol
    for symbol in unique_symbols:
        # Filter batch data for the current symbol
        batch_subset = batch_data_df[batch_data_df['Symbol'] == symbol]

        # If there are no matching records in batch_subset, skip this symbol
        if batch_subset.empty:
            continue
        
        # Convert the current_date string to a datetime.date object
        current_date_obj = datetime.strptime(current_date, '%Y%m%d').date()
        
        # Calculate the target time
        target_time = datetime.combine(current_date_obj, pd.Timestamp("9:15:00").time())
        
        time_diffs = batch_subset['Timestamp'].apply(lambda x: abs(datetime.combine(current_date_obj, x.time()) - target_time))
        closest_idx = time_diffs.idxmin()
        price_at_915 = batch_subset.loc[closest_idx, 'Last_Trade']
                
        # Assign the price to all rows with the matching symbol
        original_df.loc[original_df['underly'] == symbol, 'price_at_915'] = price_at_915
    return original_df
def generate_and_execute_sql(result_df):
    new_df = result_df[['Symbol', 'underly']].copy()
    new_df['Symbol'] = new_df['Symbol'].str.split('_')
    exploded_df = new_df.explode('Symbol', ignore_index=True)
    unique_underly_values = exploded_df['underly'].unique()

    unique_underly_str = ','.join(f"'{symbol}%'" for symbol in unique_underly_values)
    unique_underly_str = f"ARRAY[{unique_underly_str}]"

    # Database connection
    connection = psycopg2.connect(host="10.7.8.59", database="theoretical", user="scalp", password="QAtr@de442", port='5433')
    cursor = connection.cursor()

    query = f"""
        SELECT theo_rate, brents_vol, s.symbol, underlying
        FROM public.theo_rates r, public.theo_opt_sym_chain s
        WHERE r.theo_symbol_id = s.theo_symbol_id
        AND symbol LIKE ANY({unique_underly_str})
        AND r.create_time > timestamp '2023-08-28 16:00:00'
        ORDER BY strike, r.create_time DESC;
    """
    
    cursor.execute(query)
    query_results = cursor.fetchall()
    cursor.close()
    connection.close()

    df = pd.DataFrame(query_results, columns=['theo_rate', 'volatility', 'symbol', 'underlying'])

    result_dict = {}

    for index, row in df.iterrows():
        symbol = row['symbol']
        theo_rate = row['theo_rate']
        volatility = row['volatility']
    
        result_dict[symbol] = {'theo_rate': theo_rate, 'volatility': volatility}

    return result_dict
def lookup_rates_and_vols(row, rates_vol_dict):
    symbols = row['Symbol'].split('_')
    modified_symbols = [symbol[:-2] for symbol in symbols]
    
    if len(modified_symbols) != 2:
        #print(f"Skipping row with symbols: {modified_symbols}")
        return row  # or set specific columns to NaN or some default value
    
    try:
        symbol1, symbol2 = modified_symbols
        row['rate1'] = rates_vol_dict[symbol1]['theo_rate']
        row['rate2'] = rates_vol_dict[symbol2]['theo_rate']
        row['vol1'] = rates_vol_dict[symbol1]['volatility']
        row['vol2'] = rates_vol_dict[symbol2]['volatility']
    except KeyError:
        #print(f"Symbols {modified_symbols} not found in rates_vol_dict.")
        row['rate1'], row['rate2'], row['vol1'], row['vol2'] = [np.nan]*4
    
    return row
def black_scholes(S, X, t, r, q, sigma, option_type='call'):
    d1 = (np.log(S / X) + (r - q + sigma ** 2 / 2) * t) / (sigma * np.sqrt(t))
    d2 = d1 - sigma * np.sqrt(t)
    
    if option_type == 'call':
        option_price = S * np.exp(-q * t) * stats.norm.cdf(d1) - X * np.exp(-r * t) * stats.norm.cdf(d2)
    elif option_type == 'put':
        option_price = X * np.exp(-r * t) * stats.norm.cdf(-d2) - S * np.exp(-q * t) * stats.norm.cdf(-d1)
    else:
        raise ValueError("Invalid option_type. Use 'call' or 'put'")
        
    return option_price
def black_scholes_delta(S, X, t, r, q, sigma, option_type='call'):
    d1 = (np.log(S / X) + (r - q + sigma ** 2 / 2) * t) / (sigma * np.sqrt(t))
    
    if option_type == 'call':
        delta = np.exp(-q * t) * stats.norm.cdf(d1)
    elif option_type == 'put':
        delta = np.exp(-q * t) * (stats.norm.cdf(d1) - 1)
    else:
        raise ValueError("Invalid option_type. Use 'call' or 'put'")
    
    return delta
def calculate_black_scholes_and_delta(row):
    S = float(row['prc_eqt_at_time'])  # Convert to float
    X1 = float(row['Symbol'].split('_')[0].split('C')[-1].split('P')[-1][:-2])  # Convert to float
    X2 = float(row['Symbol'].split('_')[1].split('C')[-1].split('P')[-1][:-2])  # Convert to float
    t = 30 / 365  # Assuming 30 days to expiration (replace with actual value)
    r1 = row['rate1']
    r2 = row['rate2']
    q = 0  # Assuming no dividend yield
    sigma1 = row['vol1']
    sigma2 = row['vol2']
    
    bs_price1 = black_scholes(S, X1, t, r1, q, sigma1)
    bs_price2 = black_scholes(S, X2, t, r2, q, sigma2)
    
    delta1 = black_scholes_delta(S, X1, t, r1, q, sigma1)
    delta2 = black_scholes_delta(S, X2, t, r2, q, sigma2)
    
    row['BS_Price1'] = bs_price1
    row['BS_Price2'] = bs_price2
    row['Delta1'] = delta1
    row['Delta2'] = delta2
    row['Spread_Delta'] = delta1 - delta2
    
    return row
def process_put_verticals(row):
    if row['sprdtype'] == 'Put Vertical':
        # Swap BBOAsk and BBOBid
        row['BBOAsk'], row['BBOBid'] = row['BBOBid'], row['BBOAsk']
        
        # Swap BBOAskSize and BBOBidSize (assuming you have these columns)
        row['BBOAskSize'], row['BBOBidSize'] = row['BBOBidSize'], row['BBOAskSize']

        # Take absolute values of LastPrice, BBOBid, and BBOAsk
        row['LastPrice'] = abs(row['LastPrice'])
        row['BBOBid'] = abs(row['BBOBid'])
        row['BBOAsk'] = abs(row['BBOAsk'])
        
    return row
def get_data_parallel(dates):
    with ThreadPoolExecutor(max_workers=16) as executor:
        results = list(executor.map(get_data, dates))
    return pd.concat(results, ignore_index=True)
def get_batch_data1_parallel(df):
    unique_symbols = df['underly'].unique()
    with ThreadPoolExecutor(max_workers=16) as executor:
        results = list(executor.map(get_batch_data1, [df[df['underly'] == symbol] for symbol in unique_symbols]))
    return pd.concat(results, ignore_index=True)
def assign_closest_prices_parallel(original_df, batch_data_df):
    unique_symbols = original_df['underly'].unique()
    with ThreadPoolExecutor(max_workers=8) as executor:
        results = list(executor.map(assign_closest_prices, 
                                    [original_df[original_df['underly'] == symbol] for symbol in unique_symbols],
                                    [batch_data_df[batch_data_df['Symbol'] == symbol] for symbol in unique_symbols]))
    return pd.concat(results, ignore_index=True)
def generate_and_execute_sql_parallel(result_df):
    unique_underly_values = result_df['underly'].unique()
    with ThreadPoolExecutor(max_workers=16) as executor:
        results = list(executor.map(generate_and_execute_sql, [result_df[result_df['underly'] == symbol] for symbol in unique_underly_values]))
    return {k: v for d in results for k, v in d.items()}
def set1(df):
    condition1 = (
        (df['PriceDelta'] > 0) & 
        (df['sprdtype'] == 'Call Vertical') & 
        (df['OfferEdge'] > 0)
    ) | (
        (df['PriceDelta'] > 0) & 
        (df['sprdtype'] == 'Put Vertical') & 
        (df['BidEdge'] > 0)
    )
    
    condition2 = (
        (df['PriceDelta'] < 0) & 
        (df['sprdtype'] == 'Call Vertical') & 
        (df['BidEdge'] > 0)
    ) | (
        (df['PriceDelta'] < 0) & 
        (df['sprdtype'] == 'Put Vertical') & 
        (df['OfferEdge'] > 0)
    )
    
    return df[condition1 | condition2]
def set2(df):
    condition = (
        df['equity_change_pct'].abs() > 1
    ) & ((df['Spread_Delta'].abs() > .25))
    return df[condition]
def run():
    # Time Block 1
    start_time_1 = time.perf_counter()
    date = datetime.now().strftime('%Y%m%d')
    
    dtype_map = {'Symbol': str, 'BBOAsk': float, 'BBOBid': float, 'BBOBidSize': int, 'BBOAskSize': int}
    main_df = pd.read_csv(f'/home/elliott/Development/scripts/jupyter_notebooks/spreads-{date}0915.csv', usecols=['Symbol', 'BBOAsk', 'BBOBid', 'BBOBidSize', 'BBOAskSize'], dtype=dtype_map)
    main_df = main_df[(main_df['BBOBid'] != 0) | (main_df['BBOAsk'] != 0)]
    
    # Assuming convert_to_new_format is a function you have defined elsewhere
    main_df['formatted_symbol'] = main_df['Symbol'].apply(convert_to_new_format)
    
    end_time_1 = time.perf_counter()
    print(f"time1: {end_time_1 - start_time_1}")

    # Time Block 2
    start_time_2 = time.perf_counter()
    
    # Assuming get_data_parallel is a function you have defined elsewhere
    db_df = get_data_parallel([date])
    merged_df = main_df.merge(db_df, how='left', left_on='formatted_symbol', right_on='Spread')
    merged_df.dropna(inplace=True)
    
    # Assuming mapping is a dictionary you have defined elsewhere
    merged_df['sprdtype'].replace(mapping, inplace=True)
    
    # Assuming identify_option_type, identify_inverted, and process_put_verticals are functions you have defined elsewhere
    merged_df['sprdtype'] = merged_df.apply(identify_option_type, axis=1)
    merged_df['Inverted'] = merged_df.apply(identify_inverted, axis=1)
    merged_df.apply(process_put_verticals, axis=1)
    
    merged_df.sort_values(by='Time', ascending=False, inplace=True)
    merged_df.drop_duplicates(subset='formatted_symbol', keep='first', inplace=True)
    
    end_time_2 = time.perf_counter()
    print(f"time2: {end_time_2 - start_time_2}")

    start_time_3 = time.perf_counter()
    prices915_df = get_batch_data2(merged_df,date)
    assign_equity_prices_at_915(merged_df, prices915_df, date)  # This line was added
    end_time_3 = time.perf_counter()
    print(f"time3: {end_time_3 - start_time_3}")

    # Measure time for get_batch_data1
    start_time_get_batch = time.perf_counter()
    pricesattime_df = get_batch_data1_parallel(merged_df)
    end_time_get_batch = time.perf_counter()
    print(f"Time taken for get_batch_data1: {end_time_get_batch - start_time_get_batch} seconds")

    # Measure time for assign_closest_prices
    start_time_assign_closest = time.perf_counter()
    merged_prices_df = assign_closest_prices_parallel(merged_df, pricesattime_df)
    merged_prices_df['equity_change_pct'] = ((merged_prices_df['prc_eqt_at_time'] - merged_prices_df['price_at_915']) / merged_prices_df['price_at_915']) * 100
    end_time_assign_closest = time.perf_counter()
    print(f"Time taken for assign_closest_prices: {end_time_assign_closest - start_time_assign_closest} seconds")


    start_time_5 = time.perf_counter()
    column_order = ['formatted_symbol', 'Symbol', 'Inverted', 'sprdtype', 'LastPrice', 'BBOBid', 'BBOBidSize',
                    'BBOAsk', 'BBOAskSize', 'Time', 'underly', 'prc_eqt_at_time', 'price_at_915']
    result_df = (merged_prices_df.loc[:, column_order]
                .dropna(subset=['price_at_915', 'prc_eqt_at_time'])
                .assign(PriceDelta=lambda df: df['price_at_915'].astype(float) - df['prc_eqt_at_time'],
                        BidEdge=lambda df: np.where(df['BBOBid'] != 0, df['BBOBid'] - df['LastPrice'], np.nan),
                        OfferEdge=lambda df: np.where(df['BBOAsk'] != 0, df['LastPrice'] - df['BBOAsk'], np.nan))
            ).copy()
    result_df = round_to_two_decimals(result_df, columns=['BidEdge', 'OfferEdge', 'prc_eqt_at_time', 'price_at_915', 
                                                        'PriceDelta', 'LastPrice', 'BBOBid', 'BBOAsk'])
    result_df['BidEdge'] = result_df['BidEdge'].astype(float)
    result_df['OfferEdge'] = result_df['OfferEdge'].astype(float)
    result_df = result_df.loc[(result_df['BidEdge'] < -0.20) | (result_df['OfferEdge'] < -0.20)]
    end_time_5 = time.perf_counter()
    print(f"time5: {end_time_5 - start_time_5}")

    start_time_6_1 = time.perf_counter()
    vol_rates_dict = generate_and_execute_sql_parallel(result_df)
    end_time_6_1 = time.perf_counter()
    print(f"Time taken to generate_and_execute_sql_parallel: {end_time_6_1 - start_time_6_1} seconds")

    start_time_6_2 = time.perf_counter()
    result_df = result_df.apply(lookup_rates_and_vols, axis=1, args=(vol_rates_dict,))
    result_df.dropna(subset=['rate1'], inplace=True)
    result_df = result_df.apply(calculate_black_scholes_and_delta, axis=1)
    end_time_6_2 = time.perf_counter()
    print(f"time6.2: {end_time_6_2 - start_time_6_2}")
    

    start_time_7 = time.perf_counter()
    result_df.drop(columns=["Symbol"], inplace=True)
    result_df['PriceDelta'] = pd.to_numeric(result_df['PriceDelta'], errors='coerce')
    result_df['OfferEdge'] = pd.to_numeric(result_df['OfferEdge'], errors='coerce')
    result_df['BidEdge'] = pd.to_numeric(result_df['BidEdge'], errors='coerce')
    verticals_df = result_df[result_df['sprdtype'].str.contains("Vertical")].copy()
    verticals_df['prc_eqt_at_time'] = pd.to_numeric(verticals_df['prc_eqt_at_time'], errors='coerce')
    verticals_df['price_at_915'] = pd.to_numeric(verticals_df['price_at_915'], errors='coerce')
    verticals_df['equity_change_pct'] = ((verticals_df['price_at_915'] - verticals_df['prc_eqt_at_time']) / verticals_df['prc_eqt_at_time']) * 100
    verticals_df.drop(columns=['rate1', 'rate2', 'vol1', 'vol2', 'BS_Price1', 'BS_Price2'], inplace=True)
    verticals_df.to_csv(f"verticals_{date}.csv")
    verticles_set1 = set1(verticals_df)
    verticles_set1.to_csv(f"verticles_set_1_{date}.csv")
    verticles_set2 = set2(verticals_df)
    verticles_set2.to_csv(f"verticles_set_2_{date}.csv")
    verticles_set3 = verticals_df[(verticals_df['BidEdge'] > 0.50) | (verticals_df['OfferEdge'] > 0.50)]
    verticles_set3.to_csv(f"verticles_set_3_{date}.csv")
    straddle_df = result_df[result_df['sprdtype'].str.contains("Straddle")].copy()
    straddle_df.to_csv(f"straddles_{date}.csv")
    strangle_df = result_df[result_df['sprdtype'].str.contains("Strangle")].copy()
    strangle_df.to_csv(f"strangles_{date}.csv")
    end_time_7 = time.perf_counter()
    print(f"time7: {end_time_7 - start_time_7}")

    start_time_8 = time.perf_counter()
    zip_filename = f"e.watchlist_{date}.zip"
    straddle_csv = f"straddles_{date}.csv"
    vertical_csv = f"verticals_{date}.csv"
    verticles_set1 = f"verticles_set_1_{date}.csv"
    verticles_set2 = f"verticles_set_2_{date}.csv"
    verticles_set3 = f"verticles_set_3_{date}.csv"
    strangles_csv = f"strangles_{date}.csv"

    with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        zip_file.write(straddle_csv, arcname=os.path.basename(straddle_csv))
        zip_file.write(vertical_csv, arcname=os.path.basename(vertical_csv))
        zip_file.write(verticles_set1, arcname=os.path.basename(verticles_set1))
        zip_file.write(verticles_set2, arcname=os.path.basename(verticles_set2))
        zip_file.write(verticles_set3, arcname=os.path.basename(verticles_set3))
        zip_file.write(strangles_csv, arcname=os.path.basename(strangles_csv))
    
    end_time_8 = time.perf_counter()
    print(f"time8: {end_time_8 - start_time_8}")

    #Send the zip archive via email
    # send_email(['ewashington@scalptrade.com', 'sleland@scalptrade.com', 'aiacullo@scalptrade.com', 'jfeng@scalptrade.com', 'jthakkar@scalptrade.com ', 'jwood@scalptrade.com'], 
    #         'Open Rotation Watchlist', 
    #         attachment_path=zip_filename)

if __name__ == "__main__":
    start = time.perf_counter()
    run()
    end = time.perf_counter()
    print(f"Total runtime: {end - start}")

time1: 0.7300206760410219
time2: 11.038635996053927
time3: 19.872425372013822


KeyboardInterrupt: 

In [20]:
from concurrent.futures import ThreadPoolExecutor
import pandas as pd
from datetime import datetime
import os
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
import re
import numpy as np
import psycopg2
from scipy.stats import norm
import time
import scipy.stats as stats
from sqlalchemy import create_engine
import zipfile

connection = psycopg2.connect(host="10.5.1.20", database="marketdata", user="elliott", password="scalp", port='5432')
connection2 = psycopg2.connect(host="10.7.8.59", database="theoretical", user="scalp", password="QAtr@de442", port='5433')

m_root = 'RBadmin'
m_password = '$Calp123'
m_host = '10.5.1.32'
m_db = 'rbandits2'
uri = f"mysql+mysqldb://{m_root}:{m_password}@{m_host}/{m_db}"
mydb = create_engine(uri, connect_args={'ssl_mode': 'DISABLED'})
mapping = {0: "Vertical",5: "Straddle", 6: "Strangle"}

def identify_inverted(row):
    # Ignore the rows where either 'BBOAsk' or 'BBOBid' is zero
    if row['BBOAsk'] == 0 or row['BBOBid'] == 0:
        return ''
    # Check if 'BBOAsk' is less than 'BBOBid'
    elif row['BBOAsk'] < row['BBOBid']:
        return 'Inverted'
    else:
        return ''
def identify_option_type(row):
    # Check if the 'sprdtype' is 'Vertical'
    if row['sprdtype'] == 'Vertical':
        # Check for 'C' or 'P' surrounded by digits in 'formatted_symbol'
        matches = re.findall(r'\d(C|P)\d', row['formatted_symbol'])
        if matches:
            # Take the first match and check the middle character
            if 'C' in matches[0]:
                return 'Call Vertical'
            else:
                
                return 'Put Vertical'
    # If 'sprdtype' is not 'Vertical', return the original 'sprdtype'
    return row['sprdtype']
def get_data(date):
    query = f"""
                SELECT ts as Time, underly, sprdsym as Spread, price as LastPrice, sprdtype
                FROM trdsprd
                WHERE ts >= DATE_SUB('{date}', INTERVAL 1 DAY)
                AND DAYOFWEEK(ts) BETWEEN 2 AND 6
                AND sprdtype in ('0', '5', '6')
                AND underly NOT LIKE 'QQQ%%'
                AND underly NOT LIKE 'SPY%%'
                AND underly NOT LIKE 'IWM%%'
                ORDER BY ts DESC
            """
    df = pd.read_sql_query(query, mydb)
    return df
def convert_to_new_format(option):
    if pd.isnull(option):
        return ''  # Return an empty string for NaN values

    parts = option.split('_')  # Split the spread into individual options
    new_parts = []
    for part in parts:
        # Identify the beginning of the date substring
        date_start = None
        for i in range(len(part) - 5):  # Subtract 5 to avoid running off the end of the string
            if part[i:i+2].isdigit() and part[i+2:i+4].isdigit() and part[i+4:i+6].isdigit():
                date_start = i
                break
        # If a date substring couldn't be found, treat the part as a non-option
        if date_start is None:
            new_parts.append(part)
            continue
        # Extract underlying, date, call/put, strike, and suffix
        underlying = part[:date_start]
        date = '20' + part[date_start:date_start+6]  # Convert YY to YYYY
        cp = part[date_start+6]
        strike_start = date_start + 7
        strike_end = strike_start
        for char in part[strike_start:]:
            if char.isdigit() or char == '.':
                strike_end += 1
            else:
                break
        strike = part[strike_start:strike_end]
        # Append decimal and trailing zeros if necessary
        if '.' not in strike:
            strike += '.00'
        elif len(strike.split('.')[1]) == 1:
            strike += '0'
        suffix = part[strike_end:]
        new_part = underlying + date + cp + strike + suffix
        new_parts.append(new_part)
    return '_'.join(new_parts)  # Join the options back into a spread
def get_batch_data2(df, current_date):
    # Extract relevant parameters from the dataframe
    symbols = df['underly'].unique()
    
    # Convert the current date string to a datetime.date object
    current_date_obj = datetime.strptime(current_date, '%Y%m%d').date()
    
    # Calculate the target times
    target_time_915 = datetime.combine(current_date_obj, pd.Timestamp("9:15:00").time())
    target_time_914 = datetime.combine(current_date_obj, pd.Timestamp("9:14:00").time())
    
    # Convert symbols list to a format suitable for SQL IN clause
    symbols_placeholder = ",".join(["%s"] * len(symbols))

    # Construct the query
    query = f"""
                SELECT symbol, tradets, tradevolume, tradelast
                FROM equity_trades_new
                WHERE 
                    symbol IN ({symbols_placeholder}) AND
                    (tradets BETWEEN %s AND %s)
            """
    
    cursor = connection.cursor()

    # Execute the query using parameters
    cursor.execute(query, (*symbols, target_time_914, target_time_915))
    results = cursor.fetchall()

    cursor.close()

    # Convert results to a dataframe for easier manipulation
    columns = ['Symbol', 'Timestamp', 'Volume', 'Last_Trade']
    batch_data_df = pd.DataFrame.from_records(results, columns=columns)

    return batch_data_df
def get_batch_data1(df):
    # Extract relevant parameters from the dataframe
    min_time = df['Time'].min()
    max_time = df['Time'].max()
    symbols = df['underly'].unique()

    # Convert symbols list to a format suitable for SQL IN clause
    symbols_placeholder = ",".join(["%s"] * len(symbols))

    # Construct the query
    query = f"""
                SELECT symbol, tradets, tradevolume, tradelast
                FROM equity_trades_new
                WHERE 
                    symbol IN ({symbols_placeholder}) AND
                    tradets BETWEEN %s AND %s
            """

    cursor = connection.cursor()

    # Execute the query using parameters
    cursor.execute(query, (*symbols, min_time, max_time))
    results = cursor.fetchall()

    cursor.close()

    # Convert results to a dataframe for easier manipulation
    columns = ['Symbol', 'Timestamp', 'Volume', 'Last_Trade']
    batch_data_df = pd.DataFrame.from_records(results, columns=columns)

    return batch_data_df
def assign_closest_prices(original_df, batch_data_df):
    # Create a new column for prc_eqt_at_time
    original_df = original_df.copy() 
    original_df['prc_eqt_at_time'] = None

    # Process each unique symbol
    for symbol in original_df['underly'].unique():
        # Work on a copy of the slice
        original_subset = original_df[original_df['underly'] == symbol].copy()
        batch_subset = batch_data_df[batch_data_df['Symbol'] == symbol]
        
        # If there are no matching records in batch_subset, skip this symbol
        if batch_subset.empty:
            continue

        # For each row in the original subset, find the closest timestamp in the batch subset
        for idx, row in original_subset.iterrows():
            time_diffs = (batch_subset['Timestamp'] - row['Time']).abs()
            
            # If there are no time differences, skip this row
            if time_diffs.empty:
                continue

            closest_idx = time_diffs.idxmin()
            closest_trade_price = batch_subset.loc[closest_idx, 'Last_Trade']

            # Assign the closest trade price to the prc_eqt_at_time column
            original_subset.at[idx, 'prc_eqt_at_time'] = closest_trade_price

        # Update the original DataFrame
        original_df.update(original_subset)

    return original_df
def send_email(names, subject, attachment_path):
    fromaddr = "reports@scalptrade.com"
    toaddr = names

    msg = MIMEMultipart()
    msg['From'] = fromaddr
    msg['To'] = ", ".join(toaddr)
    msg['Subject'] = '{}'.format(subject)

    # Attach the CSV file
    attachment = open(attachment_path, 'rb')
    part = MIMEBase('application', 'octet-stream')
    part.set_payload(attachment.read())
    encoders.encode_base64(part)
    part.add_header('Content-Disposition', f'attachment; filename="{os.path.basename(attachment_path)}"')
    msg.attach(part)

    s = smtplib.SMTP('smtp.gmail.com', 587)
    s.starttls()
    s.login(fromaddr, "sc@lptrade")
    text = msg.as_string()
    s.sendmail(fromaddr, toaddr, text)
def round_to_two_decimals(df, columns):
    """Format specified columns in a dataframe to two decimal places as strings."""
    for column in columns:
        df[column] = df[column].apply(lambda x: "{:.2f}".format(float(x)) if pd.notna(x) else None)
    return df
def assign_equity_prices_at_915(original_df, batch_data_df, current_date):
    # Create a new column for price at 915
    original_df['price_at_915'] = None
    
    # Get a unique set of symbols
    unique_symbols = original_df['underly'].unique()

    # Process each unique symbol
    for symbol in unique_symbols:
        # Filter batch data for the current symbol
        batch_subset = batch_data_df[batch_data_df['Symbol'] == symbol]

        # If there are no matching records in batch_subset, skip this symbol
        if batch_subset.empty:
            continue
        
        # Convert the current_date string to a datetime.date object
        current_date_obj = datetime.strptime(current_date, '%Y%m%d').date()
        
        # Calculate the target time
        target_time = datetime.combine(current_date_obj, pd.Timestamp("9:15:00").time())
        
        time_diffs = batch_subset['Timestamp'].apply(lambda x: abs(datetime.combine(current_date_obj, x.time()) - target_time))
        closest_idx = time_diffs.idxmin()
        price_at_915 = batch_subset.loc[closest_idx, 'Last_Trade']
                
        # Assign the price to all rows with the matching symbol
        original_df.loc[original_df['underly'] == symbol, 'price_at_915'] = price_at_915
    return original_df
def generate_and_execute_sql(result_df):
    new_df = result_df[['Symbol', 'underly']].copy()
    new_df['Symbol'] = new_df['Symbol'].str.split('_')
    exploded_df = new_df.explode('Symbol', ignore_index=True)
    unique_underly_values = exploded_df['underly'].unique()

    unique_underly_str = ','.join(f"'{symbol}%'" for symbol in unique_underly_values)
    unique_underly_str = f"ARRAY[{unique_underly_str}]"

    # Database connection
    connection = psycopg2.connect(host="10.7.8.59", database="theoretical", user="scalp", password="QAtr@de442", port='5433')
    cursor = connection.cursor()

    query = f"""
        SELECT theo_rate, brents_vol, s.symbol, underlying
        FROM public.theo_rates r, public.theo_opt_sym_chain s
        WHERE r.theo_symbol_id = s.theo_symbol_id
        AND symbol LIKE ANY({unique_underly_str})
        AND r.create_time > timestamp '2023-08-28 16:00:00'
        ORDER BY strike, r.create_time DESC;
    """
    
    cursor.execute(query)
    query_results = cursor.fetchall()
    cursor.close()
    connection.close()

    df = pd.DataFrame(query_results, columns=['theo_rate', 'volatility', 'symbol', 'underlying'])

    result_dict = {}

    for index, row in df.iterrows():
        symbol = row['symbol']
        theo_rate = row['theo_rate']
        volatility = row['volatility']
    
        result_dict[symbol] = {'theo_rate': theo_rate, 'volatility': volatility}

    return result_dict
def lookup_rates_and_vols(row, rates_vol_dict):
    symbols = row['Symbol'].split('_')
    modified_symbols = [symbol[:-2] for symbol in symbols]
    
    if len(modified_symbols) != 2:
        #print(f"Skipping row with symbols: {modified_symbols}")
        return row  # or set specific columns to NaN or some default value
    
    try:
        symbol1, symbol2 = modified_symbols
        row['rate1'] = rates_vol_dict[symbol1]['theo_rate']
        row['rate2'] = rates_vol_dict[symbol2]['theo_rate']
        row['vol1'] = rates_vol_dict[symbol1]['volatility']
        row['vol2'] = rates_vol_dict[symbol2]['volatility']
    except KeyError:
        #print(f"Symbols {modified_symbols} not found in rates_vol_dict.")
        row['rate1'], row['rate2'], row['vol1'], row['vol2'] = [np.nan]*4
    
    return row
def black_scholes(S, X, t, r, q, sigma, option_type='call'):
    d1 = (np.log(S / X) + (r - q + sigma ** 2 / 2) * t) / (sigma * np.sqrt(t))
    d2 = d1 - sigma * np.sqrt(t)
    
    if option_type == 'call':
        option_price = S * np.exp(-q * t) * stats.norm.cdf(d1) - X * np.exp(-r * t) * stats.norm.cdf(d2)
    elif option_type == 'put':
        option_price = X * np.exp(-r * t) * stats.norm.cdf(-d2) - S * np.exp(-q * t) * stats.norm.cdf(-d1)
    else:
        raise ValueError("Invalid option_type. Use 'call' or 'put'")
        
    return option_price
def black_scholes_delta(S, X, t, r, q, sigma, option_type='call'):
    d1 = (np.log(S / X) + (r - q + sigma ** 2 / 2) * t) / (sigma * np.sqrt(t))
    
    if option_type == 'call':
        delta = np.exp(-q * t) * stats.norm.cdf(d1)
    elif option_type == 'put':
        delta = np.exp(-q * t) * (stats.norm.cdf(d1) - 1)
    else:
        raise ValueError("Invalid option_type. Use 'call' or 'put'")
    
    return delta
def calculate_black_scholes_and_delta(row):
    S = float(row['prc_eqt_at_time'])  # Convert to float
    X1 = float(row['Symbol'].split('_')[0].split('C')[-1].split('P')[-1][:-2])  # Convert to float
    X2 = float(row['Symbol'].split('_')[1].split('C')[-1].split('P')[-1][:-2])  # Convert to float
    t = 30 / 365  # Assuming 30 days to expiration (replace with actual value)
    r1 = row['rate1']
    r2 = row['rate2']
    q = 0  # Assuming no dividend yield
    sigma1 = row['vol1']
    sigma2 = row['vol2']
    
    bs_price1 = black_scholes(S, X1, t, r1, q, sigma1)
    bs_price2 = black_scholes(S, X2, t, r2, q, sigma2)
    
    delta1 = black_scholes_delta(S, X1, t, r1, q, sigma1)
    delta2 = black_scholes_delta(S, X2, t, r2, q, sigma2)
    
    row['BS_Price1'] = bs_price1
    row['BS_Price2'] = bs_price2
    row['Delta1'] = delta1
    row['Delta2'] = delta2
    row['Spread_Delta'] = delta1 - delta2
    
    return row
def process_put_verticals(row):
    if row['sprdtype'] == 'Put Vertical':
        # Swap BBOAsk and BBOBid
        row['BBOAsk'], row['BBOBid'] = row['BBOBid'], row['BBOAsk']
        
        # Swap BBOAskSize and BBOBidSize (assuming you have these columns)
        row['BBOAskSize'], row['BBOBidSize'] = row['BBOBidSize'], row['BBOAskSize']

        # Take absolute values of LastPrice, BBOBid, and BBOAsk
        row['LastPrice'] = abs(row['LastPrice'])
        row['BBOBid'] = abs(row['BBOBid'])
        row['BBOAsk'] = abs(row['BBOAsk'])
        
    return row
def get_data_parallel(dates):
    with ThreadPoolExecutor() as executor:
        results = list(executor.map(get_data, dates))
    return pd.concat(results, ignore_index=True)
def get_batch_data1_parallel(df):
    unique_symbols = df['underly'].unique()
    with ThreadPoolExecutor() as executor:
        results = list(executor.map(get_batch_data1, [df[df['underly'] == symbol] for symbol in unique_symbols]))
    return pd.concat(results, ignore_index=True)
def assign_closest_prices_parallel(original_df, batch_data_df):
    unique_symbols = original_df['underly'].unique()
    with ThreadPoolExecutor() as executor:
        results = list(executor.map(assign_closest_prices, 
                                    [original_df[original_df['underly'] == symbol] for symbol in unique_symbols],
                                    [batch_data_df[batch_data_df['Symbol'] == symbol] for symbol in unique_symbols]))
    return pd.concat(results, ignore_index=True)
def generate_and_execute_sql_parallel(result_df):
    unique_underly_values = result_df['underly'].unique()
    with ThreadPoolExecutor() as executor:
        results = list(executor.map(generate_and_execute_sql, [result_df[result_df['underly'] == symbol] for symbol in unique_underly_values]))
    return {k: v for d in results for k, v in d.items()}
def set1(df):
    condition1 = (
        (df['PriceDelta'] > 0) & 
        (df['sprdtype'] == 'Call Vertical') & 
        (df['OfferEdge'] > 0)
    ) | (
        (df['PriceDelta'] > 0) & 
        (df['sprdtype'] == 'Put Vertical') & 
        (df['BidEdge'] > 0)
    )
    
    condition2 = (
        (df['PriceDelta'] < 0) & 
        (df['sprdtype'] == 'Call Vertical') & 
        (df['BidEdge'] > 0)
    ) | (
        (df['PriceDelta'] < 0) & 
        (df['sprdtype'] == 'Put Vertical') & 
        (df['OfferEdge'] > 0)
    )
    
    return df[condition1 | condition2]
def set2(df):
    condition = (
        df['equity_change_pct'].abs() > 1
    ) & ((df['Spread_Delta'].abs() > .25))
    return df[condition]
def run():
    # Time Block 1
    start_time_1 = time.perf_counter()
    date = datetime.now().strftime('%Y%m%d')
    
    dtype_map = {'Symbol': str, 'BBOAsk': float, 'BBOBid': float, 'BBOBidSize': int, 'BBOAskSize': int}
    main_df = pd.read_csv(f'/home/elliott/Development/scripts/jupyter_notebooks/spreads-{date}0915.csv', usecols=['Symbol', 'BBOAsk', 'BBOBid', 'BBOBidSize', 'BBOAskSize'], dtype=dtype_map)
    main_df = main_df[(main_df['BBOBid'] != 0) | (main_df['BBOAsk'] != 0)]
    
    # Assuming convert_to_new_format is a function you have defined elsewhere
    main_df['formatted_symbol'] = main_df['Symbol'].apply(convert_to_new_format)
    
    end_time_1 = time.perf_counter()
    print(f"time1: {end_time_1 - start_time_1}")

    # Time Block 2
    start_time_2 = time.perf_counter()
    
    # Assuming get_data_parallel is a function you have defined elsewhere
    db_df = get_data_parallel([date])
    merged_df = main_df.merge(db_df, how='left', left_on='formatted_symbol', right_on='Spread')
    merged_df.dropna(inplace=True)
    
    # Assuming mapping is a dictionary you have defined elsewhere
    merged_df['sprdtype'].replace(mapping, inplace=True)
    
    # Assuming identify_option_type, identify_inverted, and process_put_verticals are functions you have defined elsewhere
    merged_df['sprdtype'] = merged_df.apply(identify_option_type, axis=1)
    merged_df['Inverted'] = merged_df.apply(identify_inverted, axis=1)
    merged_df.apply(process_put_verticals, axis=1)
    
    merged_df.sort_values(by='Time', ascending=False, inplace=True)
    merged_df.drop_duplicates(subset='formatted_symbol', keep='first', inplace=True)
    
    end_time_2 = time.perf_counter()
    print(f"time2: {end_time_2 - start_time_2}")

    start_time_3 = time.perf_counter()
    prices915_df = get_batch_data2(merged_df,date)
    assign_equity_prices_at_915(merged_df, prices915_df, date)  # This line was added
    end_time_3 = time.perf_counter()
    print(f"time3: {end_time_3 - start_time_3}")

    # Measure time for get_batch_data1
    start_time_get_batch = time.perf_counter()
    pricesattime_df = get_batch_data1_parallel(merged_df)
    end_time_get_batch = time.perf_counter()
    print(f"Time taken for get_batch_data1: {end_time_get_batch - start_time_get_batch} seconds")

    # Measure time for assign_closest_prices
    start_time_assign_closest = time.perf_counter()
    merged_prices_df = assign_closest_prices_parallel(merged_df, pricesattime_df)
    merged_prices_df['equity_change_pct'] = ((merged_prices_df['prc_eqt_at_time'] - merged_prices_df['price_at_915']) / merged_prices_df['price_at_915']) * 100
    end_time_assign_closest = time.perf_counter()
    print(f"Time taken for assign_closest_prices: {end_time_assign_closest - start_time_assign_closest} seconds")


    start_time_5 = time.perf_counter()
    column_order = ['formatted_symbol', 'Symbol', 'Inverted', 'sprdtype', 'LastPrice', 'BBOBid', 'BBOBidSize',
                    'BBOAsk', 'BBOAskSize', 'Time', 'underly', 'prc_eqt_at_time', 'price_at_915']
    result_df = (merged_prices_df.loc[:, column_order]
                .dropna(subset=['price_at_915', 'prc_eqt_at_time'])
                .assign(PriceDelta=lambda df: df['price_at_915'].astype(float) - df['prc_eqt_at_time'],
                        BidEdge=lambda df: np.where(df['BBOBid'] != 0, df['BBOBid'] - df['LastPrice'], np.nan),
                        OfferEdge=lambda df: np.where(df['BBOAsk'] != 0, df['LastPrice'] - df['BBOAsk'], np.nan))
            ).copy()
    result_df = round_to_two_decimals(result_df, columns=['BidEdge', 'OfferEdge', 'prc_eqt_at_time', 'price_at_915', 
                                                        'PriceDelta', 'LastPrice', 'BBOBid', 'BBOAsk'])
    result_df['BidEdge'] = result_df['BidEdge'].astype(float)
    result_df['OfferEdge'] = result_df['OfferEdge'].astype(float)
    result_df = result_df.loc[(result_df['BidEdge'] < -0.20) | (result_df['OfferEdge'] < -0.20)]
    end_time_5 = time.perf_counter()
    print(f"time5: {end_time_5 - start_time_5}")

    start_time_6_1 = time.perf_counter()
    vol_rates_dict = generate_and_execute_sql_parallel(result_df)
    end_time_6_1 = time.perf_counter()
    print(f"Time taken to generate_and_execute_sql_parallel: {end_time_6_1 - start_time_6_1} seconds")

    start_time_6_2 = time.perf_counter()
    result_df = result_df.apply(lookup_rates_and_vols, axis=1, args=(vol_rates_dict,))
    result_df.dropna(subset=['rate1'], inplace=True)
    result_df = result_df.apply(calculate_black_scholes_and_delta, axis=1)
    end_time_6_2 = time.perf_counter()
    print(f"time6.2: {end_time_6_2 - start_time_6_2}")
    

    start_time_7 = time.perf_counter()
    result_df.drop(columns=["Symbol"], inplace=True)
    result_df['PriceDelta'] = pd.to_numeric(result_df['PriceDelta'], errors='coerce')
    result_df['OfferEdge'] = pd.to_numeric(result_df['OfferEdge'], errors='coerce')
    result_df['BidEdge'] = pd.to_numeric(result_df['BidEdge'], errors='coerce')
    verticals_df = result_df[result_df['sprdtype'].str.contains("Vertical")].copy()
    verticals_df['prc_eqt_at_time'] = pd.to_numeric(verticals_df['prc_eqt_at_time'], errors='coerce')
    verticals_df['price_at_915'] = pd.to_numeric(verticals_df['price_at_915'], errors='coerce')
    verticals_df['equity_change_pct'] = ((verticals_df['price_at_915'] - verticals_df['prc_eqt_at_time']) / verticals_df['prc_eqt_at_time']) * 100
    verticals_df.drop(columns=['rate1', 'rate2', 'vol1', 'vol2', 'BS_Price1', 'BS_Price2'], inplace=True)
    verticals_df.to_csv(f"verticals_{date}.csv")
    verticles_set1 = set1(verticals_df)
    verticles_set1.to_csv(f"verticles_set_1_{date}.csv")
    verticles_set2 = set2(verticals_df)
    verticles_set2.to_csv(f"verticles_set_2_{date}.csv")
    verticles_set3 = verticals_df[(verticals_df['BidEdge'] > 0.50) | (verticals_df['OfferEdge'] > 0.50)]
    verticles_set3.to_csv(f"verticles_set_3_{date}.csv")
    straddle_df = result_df[result_df['sprdtype'].str.contains("Straddle")].copy()
    straddle_df.to_csv(f"straddles_{date}.csv")
    strangle_df = result_df[result_df['sprdtype'].str.contains("Strangle")].copy()
    strangle_df.to_csv(f"strangles_{date}.csv")
    end_time_7 = time.perf_counter()
    print(f"time7: {end_time_7 - start_time_7}")

    start_time_8 = time.perf_counter()
    zip_filename = f"e.watchlist_{date}.zip"
    straddle_csv = f"straddles_{date}.csv"
    vertical_csv = f"verticals_{date}.csv"
    verticles_set1 = f"verticles_set_1_{date}.csv"
    verticles_set2 = f"verticles_set_2_{date}.csv"
    verticles_set3 = f"verticles_set_3_{date}.csv"
    strangles_csv = f"strangles_{date}.csv"

    with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        zip_file.write(straddle_csv, arcname=os.path.basename(straddle_csv))
        zip_file.write(vertical_csv, arcname=os.path.basename(vertical_csv))
        zip_file.write(verticles_set1, arcname=os.path.basename(verticles_set1))
        zip_file.write(verticles_set2, arcname=os.path.basename(verticles_set2))
        zip_file.write(verticles_set3, arcname=os.path.basename(verticles_set3))
        zip_file.write(strangles_csv, arcname=os.path.basename(strangles_csv))
    
    end_time_8 = time.perf_counter()
    print(f"time8: {end_time_8 - start_time_8}")

    #Send the zip archive via email
    # send_email(['ewashington@scalptrade.com', 'sleland@scalptrade.com', 'aiacullo@scalptrade.com', 'jfeng@scalptrade.com', 'jthakkar@scalptrade.com ', 'jwood@scalptrade.com'], 
    #         'Open Rotation Watchlist', 
    #         attachment_path=zip_filename)

if __name__ == "__main__":
    start = time.perf_counter()
    run()
    end = time.perf_counter()
    print(f"Total runtime: {end - start}")

time1: 0.8111178110120818
time2: 10.887649960001
time3: 32.07651446701493
Time taken for get_batch_data1: 59.351601448026486 seconds
Time taken for assign_closest_prices: 30.249853296088986 seconds
time5: 0.03559299104381353
Time taken to generate_and_execute_sql_parallel: 71.40892083500512 seconds
time6.2: 9.475634948932566
time7: 0.06371712498366833
time8: 0.024298794101923704
Total runtime: 214.4538668569876
