In [None]:
# pip install tabula-py

In [None]:
# pip install tabulate

In [None]:
from tabula import read_pdf
from tabulate import tabulate
from os import listdir
from os.path import isfile, join
import pandas as pd

In [None]:
def get_raw_prediction_dfs(path):
    dfs = read_pdf(
        path,
        pages='all', 
        lattice=True,
        pandas_options = {'header':None},
        multiple_tables = True,
        silent=True
    )
    
    # usually data df has many columns, tunable. 
    valid_col_num = 7
    valid_dfs = filter(lambda df: df.shape[-1] > valid_col_num, dfs)
    raw_prediction_dfs = list(map(lambda df: df.dropna(how='all'), valid_dfs))
    return raw_prediction_dfs

In [None]:
raw_prediction_dfs = get_raw_prediction_dfs('./data_es/LikeFolioSundayEarningsSheet20210313.pdf')
raw_prediction_dfs[0].head()

In [None]:
LINE_SEPARATOR = '\r'

In [None]:
import datetime as dt
def get_date(data):
    fmts = ('%B %d %Y','%m/%d/%Y','%m/%d/%y','%b %Y','%B%Y','%b %d,%Y', 
            '%m-%d-%Y', '%Y-%m-%d')
    for fmt in fmts:
        try:
            return dt.datetime.strptime(data, fmt)
        except:
            pass
    return None

In [None]:
get_date('2021-05-10')

### Determine type of column
This is scalable as new formats coming later on. 

In [None]:
from enum import Enum
class ColType(Enum):
    MORE_LINE_TICKER = 1
    THREE_LINE_TICKER = 2
    NAN = 3
    DATE = 4
    OTHER = 5
    
def col_type(col):
    if pd.isna(col):
        return ColType.NAN
    else:
        values = col.split(LINE_SEPARATOR)        
        if len(values) == 3 and values[1].isupper() and values[2].startswith('$'):
            return ColType.THREE_LINE_TICKER
        elif len(values) in [4, 5] and values[1].isupper() and values[2].startswith('$'):
            # eg.'Wal-Mart\rWMT\r$148.23\rTue before market'
            return ColType.MORE_LINE_TICKER
        elif len(values) == 2 and get_date(values[1]) :
            # eg. 'Week 5\r11/16/2020'
            return ColType.DATE
        else:
            return ColType.OTHER

In [None]:
test_cols = [float('nan'), 'Kodak\rKODK\r$9.76\rTue after market', 'Week 9\r2021-03-15']
for col in test_cols:
    print(col_type(col))

In [None]:
def get_week_and_df_builder(raw_df):
    # once seen this, following data should not be used, even for the following DF in 
    # the same file
    stop_token = 'Unconfirmed Earnings'
    is_unconfirmed = False
    df_builder = []
    week = ''
    for idx, row in raw_df.iterrows():
#         print(row.tolist())
        if is_unconfirmed:
            break
        row_builder = []
        cols = row.tolist()
        # pointer to col
        idx = 0
        while idx < len(cols):
            cur_col = cols[idx]
            cur_type = col_type(cur_col)
#             print(cur_type)

            if cur_type == ColType.DATE:
                week = get_date(cur_col.split(LINE_SEPARATOR)[-1])
                break
            elif cur_type == ColType.THREE_LINE_TICKER:
                values = cur_col.split(LINE_SEPARATOR)
                # Company, ticker
                row_builder = [values[0], values[1]]
                # Get weekday info 
                row_builder.append(cols[idx + 1])
                # Get earning score
                row_builder.append(cols[idx + 2])
                df_builder.append(row_builder)
                break
            elif cur_type == ColType.MORE_LINE_TICKER:
                values = cur_col.split(LINE_SEPARATOR)
                # Company, ticker, weekday
                row_builder = [values[0], values[1], values[3]]
                # Get earning score
                row_builder.append(cols[idx + 1])
                df_builder.append(row_builder)
                break
            elif stop_token in str(cur_col):
                is_unconfirmed = True
                break
            idx += 1
        
    
    return week, df_builder, is_unconfirmed

In [None]:
def earning_prediction_df_from_file(path):
    raw_prediction_dfs = get_raw_prediction_dfs(path)
    week_value = None 
    final_df_builder = []
    for df in raw_prediction_dfs:
        week, df_builder, is_following_unconfirmed = get_week_and_df_builder(df)
        if week:
            week_value = week
        final_df_builder.extend(df_builder)
        if is_following_unconfirmed:
            break
    final_df = pd.DataFrame(final_df_builder, columns = ['Company', 'Ticker', 'Earning Time', 'Earning Score'])
    final_df['Starting Week'] = week_value
    
    return final_df

In [None]:
test_df = earning_prediction_df_from_file('./data_es\LikeFolioSundayEarningsSheet_20201115.pdf')
test_df.head()

### Iterate the whole folder and merge data into one dataframe

In [None]:
DATA_FOLDER = './data_es'
PATHS = [join(DATA_FOLDER, f) for f in listdir(DATA_FOLDER) if isfile(join(DATA_FOLDER, f))]
earning_prediction_dfs = []
merged_df = None
success_counter = 0
for path in PATHS:
    print(f'Processing {path} ...')
    try:
        earning_prediction_df = earning_prediction_df_from_file(path)
        print(earning_prediction_df.shape)
        if earning_prediction_df.shape[0] > 0: 
            earning_prediction_dfs.append(earning_prediction_df)
            success_counter += 1
        else:
            print(f'[Warning]: Empty dataframe extracted {path}')
    except:
        print(f'Cannot process {path}')
if len(earning_prediction_dfs) > 0:
    merged_df = pd.concat(earning_prediction_dfs)
print(f'Processed {success_counter} files out of {len(PATHS)} raw files')

merged_df

In [None]:
# Further filter to remove data of unconfirmed date
print(f'Before trimming there are {merged_df.shape[0]} rows')
df_trim_unconfirmed = merged_df[merged_df['Earning Time'].apply(len) > 5] 
final_df = df_trim_unconfirmed.drop_duplicates()
print(f'After trimming there are {final_df.shape[0]} rows')

In [None]:
import datetime
from dateutil import parser

def er_date(starting_week_col, weekday_col):
    weekday = weekday_col.split(' ')[0]
    offset_map = {
        'Mon': 0,
        'Tue': 1,
        'Wed': 2,
        'Thu': 3,
        'Fri': 4
    }
    if weekday in offset_map:
        er_dt = starting_week_col + datetime.timedelta(offset_map[weekday])
        return er_dt
    else:
        return None

In [None]:
date_time_str = '2018-06-29'
dt = datetime.datetime.strptime(date_time_str, '%Y-%m-%d')
er_date(dt, 'Mon asdf')

In [None]:
final_df['Earning Date'] = final_df.apply(lambda x: er_date(x['Starting Week'], x['Earning Time']), axis=1)

In [None]:
PREMARKET = 'PREMARKET'
AFH = 'AFH'

def market_time(earning_time_col):
    if 'before' in str(earning_time_col):
        return PREMARKET
    if 'after' in str(earning_time_col):
        return AFH
    else:
        return None

In [None]:
final_df['Market Time'] = final_df.apply(lambda x: market_time(x['Earning Time']), axis=1)

In [None]:
final_df.to_csv('earning_prediction.csv', index=False)

### Use yahoo API to get stock quotes

There's a parallel query version script called 'parallel_query_yahoo.py', use this for large data processing. You need use python to run rather than from notebook here!

In [None]:
# pip install --user yfinance 

In [None]:
import yfinance as yf  

def get_quotes(row):
    ticker, earning_dt, market_time = row['Ticker'], row['Earning Date'], row['Market Time']
    if earning_dt and market_time:
        # actual is end_dt - 1, need +1 to offset API requirment
        if market_time == PREMARKET:
            start_dt = earning_dt + datetime.timedelta(-1)
            end_dt = earning_dt + datetime.timedelta(1)
        elif market_time == AFH:
            start_dt = earning_dt
            end_dt = earning_dt + datetime.timedelta(2)
        else:
            raise 
        start_dt = str(start_dt).split(' ')[0]
        end_dt = str(end_dt).split(' ')[0]
        print(start_dt, end_dt)
    
        try:
            stock = yf.Ticker(ticker)
            quotes = stock.history(start=start_dt, end=end_dt).round(2)
            start_quotes = quotes.iloc[0]
            end_quotes = quotes.iloc[1]
            row['Left-day Open'] = start_quotes.Open
            row['Right-day Open'] = end_quotes.Open
            row['Left-day Close'] = start_quotes.Close
            row['Right-day Close'] = end_quotes.Close
            row['Left-day High'] = start_quotes.High
            row['Right-day High'] = end_quotes.High
            row['Left-day Low'] = start_quotes.Low
            row['Right-day Low'] = end_quotes.Low
        except:
            pass

    return row