In [14]:
import concurrent.futures
import itertools
import multiprocessing
#import os
import pickle
import sys
from queue import Queue

import numpy as np
import pandas as pd
from pyarrow import feather
from scipy import sparse
from tqdm import tqdm

In [15]:
# Load the two feather inputs: the US-equity snapshot (only its `ticker` column
# is used below) and the news table (only its `tickers` column is used below).
df_snap = feather.read_feather("local_snapshot_us_equity")
news_df = feather.read_feather("local_us_equity_news")

In [16]:
# Sorted array of US-equity ticker symbols; the snapshot frame is released as
# soon as the column has been pulled out of it.
ticker_arr = df_snap.ticker.to_numpy()
del df_snap
ticker_arr.sort()
n = len(ticker_arr)
valid_ticker_set = set(ticker_arr)
# ticker symbol -> position in the sorted array, for quick index lookups
ticker_maping = dict(zip(ticker_arr, range(n)))

In [22]:
%%time
news_lc = news_df.tickers.apply(lambda tickers: np.asarray(set(ticker_maping[tick] for tick in tickers if tick in valid_ticker_set))).values

CPU times: user 10.7 s, sys: 168 ms, total: 10.9 s
Wall time: 10.9 s


In [17]:
# Dense n x n accumulator (np.zeros defaults to float64); fill_matrix below
# increments knn_ticker[i, j] once per co-mention of tickers i and j.
knn_ticker = np.zeros((n,n))
def fill_matrix(A):
    """Add pairwise co-occurrence counts to the global `knn_ticker` matrix.

    Parameters
    ----------
    A : iterable
        One entry per article. Each entry is an iterable of integer ticker
        indices; a 0-d object array wrapping such an iterable (what
        `np.asarray(set(...))` produces) is unwrapped first.

    For every pair of positions i < j in an entry,
    `knn_ticker[entry[i], entry[j]]` is incremented by one. Returns nothing.
    """
    for sublist in tqdm(A):
        if isinstance(sublist, np.ndarray) and sublist.ndim == 0:
            sublist = sublist.item()
        # Iterate the combinations directly: np.asarray(<iterator>) would wrap the
        # iterator in a 0-d object array, which raises TypeError when looped over.
        for row, col in itertools.combinations(sublist, 2):
            knn_ticker[row, col] += 1

In [18]:
%%time
# Accumulate the pair counts of every article in news_lc into knn_ticker.
fill_matrix(news_lc)

100%|██████████████████████████████████████████████████████████████████████████████████████████████████| 819163/819163 [31:28<00:00, 433.80it/s]

CPU times: user 31min 25s, sys: 2.89 s, total: 31min 28s
Wall time: 31min 28s





In [22]:
# Persist the filled matrix so the long fill_matrix run does not have to be repeated.
np.save('knn_ticker.npy', knn_ticker)

In [23]:
# In-memory size of the dense matrix in MiB (bytes / 1024 / 1024).
knn_ticker.nbytes/1024/1024

1124.6023635864258

In [2]:
# Reload the matrix written by the np.save cell above.
knn_ticker = np.load('knn_ticker.npy')

In [28]:
# Worker count: two per CPU reported by the OS.
num_cores = multiprocessing.cpu_count()
max_workers = num_cores * 2

def fill_array(knn_ticker_flat, news_lc):
    """Accumulate pair counts into a flattened, row-major n*n sequence.

    For each entry of `news_lc` and each pair of positions i < j in it, the
    element at flat index `entry[i] * n + entry[j]` of `knn_ticker_flat` is
    incremented by one (`n` is the module-level matrix side length).
    Returns nothing.
    """
    for article in tqdm(news_lc):
        for pair in itertools.combinations(article, 2):
            knn_ticker_flat[pair[0] * n + pair[1]] += 1

In [29]:
%%time
with multiprocessing.Manager() as manager:
    knn_ticker_flat = manager.list([0] * (n * n))
    process_list = []
    chunk_size = len(news_lc)//max_workers
    for i in range(max_workers):
        if i == max_workers-1:
            chunk = news_lc[i*chunk_size:]
        else:
            chunk = news_lc[i*chunk_size:(i+1)*chunk_size]
        p = multiprocessing.Process(target=fill_array, args=(knn_ticker_flat, chunk))
        process_list.append(p)
        p.start()

    for p in tqdm(process_list):
        p.join()

  0%|                                                                                                                    | 0/24 [43:28<?, ?it/s]Process Process-17:
Process Process-10:
Process Process-5:
Process Process-12:
Process Process-21:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/home/peterzerg/miniconda3/envs/rapids-22.12/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/home/peterzerg/miniconda3/envs/rapids-22.12/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
Process Process-14:
  File "/home/peterzerg/miniconda3/envs/rapids-22.12/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/peterzerg/miniconda3/envs/rapids-22.12/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run(

KeyboardInterrupt: 

In [None]:
#ticker_pairs = news_tickers.apply(lambda tickers: set(itertools.combinations(tickers,2)))# generate ticker pairs

In [None]:
# Stack the pairs of every non-empty entry of ticker_pairs into one array.
# NOTE(review): ticker_pairs is only produced by the commented-out cell above.
pair_blocks = [np.array(list(pairs)) for pairs in ticker_pairs if pairs]
all_news_ticker_pairs = np.concatenate(pair_blocks)

In [None]:
# Two-column frame of pairs: t0 and t1 hold the two members of each pair.
df = pd.DataFrame(all_news_ticker_pairs, columns=['t0', 't1'])

In [None]:
# Translate both ticker columns into their integer positions via ticker_maping
# (a ticker missing from the mapping raises KeyError).
for source_col, index_col in (("t0", "t0_index"), ("t1", "t1_index")):
    df[index_col] = df[source_col].apply(lambda ticker: ticker_maping[ticker])

In [None]:
# Count how often each (t0_index, t1_index) combination occurs ...
grouped = (
    df.groupby(['t0_index', 't1_index'])
      .size()
      .reset_index(name='count')
)
row_indices, col_indices, counts = (
    grouped[column].to_numpy() for column in ('t0_index', 't1_index', 'count')
)
# ... and write the counts into a fresh dense n x n matrix in one indexed assignment.
knn_ticker = np.zeros((n,n))
knn_ticker[row_indices, col_indices] = counts

In [None]:
# Inverse mapping: integer position -> ticker symbol.
d = dict(zip(ticker_maping.values(), ticker_maping.keys()))
# Relabel the columns through the inverse mapping and replace the index with the
# ticker symbols in mapping order.
# NOTE(review): knn_news_df is not created in the visible cells; this presumably
# expects its rows to be ordered by ticker position. Confirm.
knn_news_df.rename(columns=d, inplace=True)
knn_news_df.index = d.values()

In [None]:
# Show the relabelled frame as the cell output.
knn_news_df

In [16]:
# Inputs for the queue-based experiment below.
# NOTE(review): target_arr is not defined in the visible cells; presumably one
# array of ticker symbols per news article. Confirm.
batch = target_arr
ref = ticker_arr  # reference tickers used to filter each article
q = Queue()  # collects the ticker pairs generated further down
# Worker count: two per CPU reported by the OS.
num_cores = multiprocessing.cpu_count()
max_workers = 2 * num_cores
print("max_workers: ",max_workers)

max_workers:  24


In [18]:
# same as valid_batch = np.array([array[np.isin(array, ref)] for array in tqdm(batch) if np.any(np.isin(array, ref))]) but save 50% time
# estimated runtime for 5000 news is 2 mins on windows conda, same for ubuntu wsl2
# the tqdm only displayed half of the time taken for 5000 news
%%time
def get_valid_array(array):
    return array[np.isin(array, ref)]

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    valid_batch0 = np.array(list(tqdm(executor.map(get_valid_array, batch), total=len(batch))), dtype=object)

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████| 5000/5000 [01:08<00:00, 72.64it/s]

CPU times: user 1min 48s, sys: 12.2 s, total: 2min
Wall time: 1min 54s





In [26]:
#check = np.array([np.all(np.equal(valdi_batch, valdi_batch0)) for valdi_batch, valdi_batch0 in zip(valid_batch, valid_batch0)])
#print(np.all(check))

True


In [17]:
%%time
# estimated runtime for 5000 news is 1min (ubuntu 37s), faster than the combo_gen(x) which costs 2min20s
for array in tqdm(valid_batch):
    for combo in itertools.combinations(array, 2):
        q.put(combo)
print("queue size: ",q.qsize())

100%|██████████████████████████████████████████████████████████████████████████████████████████████████████| 5000/5000 [00:36<00:00, 136.07it/s]

queue size:  22577985
CPU times: user 36 s, sys: 729 ms, total: 36.8 s
Wall time: 36.8 s





In [14]:
# Copy the queued pairs (q.queue is the queue's underlying container) into a
# two-column DataFrame; the queue itself is left untouched.
df_q = pd.DataFrame(list(q.queue), columns=['t0', 't1'])

# Write the DataFrame to the feather file "queue.feather"
feather.write_feather(df_q, "queue.feather")

In [15]:
def df_modify(tuple_pair, df, ref_map):
    """
    Count one co-mention of two tickers.

    The first two items of `tuple_pair` are translated through `ref_map` into a
    row and a column position, and `df[row, col]` is incremented by 1.
    `df` is modified in place; nothing is returned.
    """
    row = ref_map[tuple_pair[0]]
    col = ref_map[tuple_pair[1]]
    df[row, col] += 1

In [None]:
%%time
df = knn_ticker
ref_map = ticker_maping

def process_tuple(tuple_pair, df, ref_map):
    df_modify(tuple_pair, df, ref_map)

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = [executor.submit(process_tuple, tuple_pair, df, ref_map) for tuple_pair in tqdm(list(q.queue))]
    concurrent.futures.wait(futures)

In [41]:
# from_spmatrix expects a scipy sparse matrix, while knn_ticker is built as a dense
# ndarray in this notebook. Convert first; csr_matrix also accepts a matrix that
# is already sparse.
knn_df = pd.DataFrame.sparse.from_spmatrix(
    sparse.csr_matrix(knn_ticker), columns=ticker_arr, index=ticker_arr
)

1.0

In [27]:
#CUDF

In [1]:
import cuda
import numba
import cudf
import cupy
from scipy import sparse

In [2]:
# cuDF variant of the load step: same two feather files, with the news table
# limited to its first 5000 rows.
df_snap = cudf.read_feather("local_snapshot_us_equity")
news_df = cudf.read_feather("local_us_equity_news").head(5000)



In [25]:
# Ticker column sorted alphabetically, with a fresh 0..n-1 position exposed as
# the `index` column next to `ticker`.
df = (
    df_snap[["ticker"]]
    .sort_values(by='ticker')
    .reset_index()[["ticker"]]
    .reset_index()
)

In [27]:
# n is the number of tickers in the snapshot frame (not the number of news rows).
n = df.shape[0]
# ticker symbol -> value of the `index` column built in the previous cell
ticker_maping = df.set_index('ticker').to_dict()['index']

In [33]:
# Set of known ticker symbols, used for membership tests in to_set.
valid_ticker_set = set(ticker_maping.keys())

In [59]:
# Move the tickers column to pandas, map each article's symbols to indices, then
# wrap the result in a cuDF frame again.
data_df = news_df[["tickers"]].to_pandas()
def to_set(tickers, ticker_maping):
    """Map the symbols in `tickers` that are in `valid_ticker_set` to their indices.

    NOTE(review): np.asarray applied to a set yields a 0-d object array holding
    the set rather than a 1-D array of indices; a sorted list is probably what
    is wanted here. Confirm how cuDF converts the current return value.
    """
    return np.asarray(set(ticker_maping[tick] for tick in tickers if tick in valid_ticker_set))
data_df = cudf.DataFrame(data_df['tickers'].apply(lambda tickers: to_set(tickers, ticker_maping)))
# Rebuild the index and keep only the tickers column.
data_df.reset_index(inplace=True)
data_df = data_df[["tickers"]]

In [70]:
# n x n count matrix allocated with cupy.
knn_ticker = cupy.zeros((n,n))

In [82]:
from numba import cuda
def combo_gen(tickers):
    """Return all 2-element combinations of `tickers` as a list of tuples.

    This is deliberately a plain host-side function: a `cuda.jit` kernel cannot
    return a value and cannot use itertools or Python lists, so decorating it
    makes every call fail.
    """
    return list(itertools.combinations(tickers, 2))

In [84]:
# NOTE(review): in the recorded output this call raised
# "ValueError: user defined function compilation failed."
data_df.tickers.apply(combo_gen)

ValueError: user defined function compilation failed.

In [92]:
%%time
df_snap = feather.read_feather("local_snapshot_us_equity")
ticker_arr = df_snap.ticker.to_numpy() # us equity tickers
ticker_arr.sort()
valid_ticker_set = set(ticker_arr)
ticker_maping = {k: v for v, k in enumerate(ticker_arr)} # dictionary allows for quick lookups of the integer index corresponding to a particular ticker symbol.

CPU times: user 19 ms, sys: 18 ms, total: 37 ms
Wall time: 26.7 ms


In [None]:
# Walk over the news rows, not over range(n): n is the number of tickers, while
# data_df has one row per article, so range(n) indexes the wrong axis and can run
# past the end of data_df.
for i in tqdm(range(len(data_df))):
    tickers = data_df.tickers.iloc[i]  # presumably a list of ticker indices; TODO confirm
    # Generate the pairs on the host with itertools; combo_gen as defined with
    # @cuda.jit cannot be called like a normal function.
    for row, col in itertools.combinations(tickers, 2):
        knn_ticker[row, col] += 1