In [40]:
!pip install -q tensorflow
!pip install -q tensorflow_ranking

In [1]:
import numpy as np
import pandas as pd
import datetime as dt
import os

PATH = os.getcwd()

In [42]:
import tensorflow_ranking as tfr
import tensorflow as tf
from tensorflow_serving.apis import input_pb2

## Import Tables

In [87]:
tables_names = ['big_log_ret','big_RCV'] #, 'big_RVT', 'big_positivePartscr', 'big_negativePartscr', 'big_splogscr', 'big_linscr']
tables_dict = {}
for name in tables_names:
    table = pd.read_csv(os.path.join(PATH, 'Tables', name+'.csv'))
    table['Date'] = pd.to_datetime(table['Date']).dt.date
    table.set_index('Date', inplace=True)
    tables_dict[name] = table

tables_dict['big_RCV'].head()

Unnamed: 0_level_0,AAL,AAPL,ABBV,ABC,ABT,ADP,AIG,AMD,AMZN,AXP,...,UAL,UNH,UPS,USB,V,VZ,WFC,WMT,WY,XOM
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2012-01-08,8.476,13.162167,,0.404,-24.6072,,,,-3.37,,...,12.493714,,-9.7618,8.849167,,8.0314,7.3165,33.6786,,1.957143
2012-01-15,,11.965,,-4.760143,-3.706,,-11.584333,16.8346,-18.815571,-10.443167,...,12.935143,11.4355,-1.28,-17.410857,-33.0858,-10.427667,21.078333,15.6566,,-18.02
2012-01-22,9.9768,-10.776667,,-17.240429,-22.233833,,-10.8984,,15.001167,0.860833,...,2.058714,17.2168,-8.6162,-24.0268,,-31.776,-0.7585,-14.170714,,-19.522714
2012-01-29,-20.94,-9.334,,11.74,-8.1658,-42.6148,-22.226,,-7.894167,-33.329167,...,9.101,,22.8662,-52.884,,8.221,-20.2015,0.0368,,6.625571
2012-02-05,,-33.438857,,-2.313,-30.0204,-39.871,-18.9598,-30.327667,-3.668714,-20.110167,...,-5.529429,-36.037833,4.4648,-42.687167,14.249667,-47.767143,-7.9482,-45.108167,-28.786,8.348429


## Merging Tables - Building the query-stock final table

In [38]:
final_table = tables_dict['big_log_ret'].stack() #Stack DF to a Series so that is grouped by Date and then by Ticker
final_table.index.names=('Query','Ticker') #Rename 'Date' to 'Query'
final_table = final_table.reset_index()
final_table.rename(columns={0: 'big_log_ret'}, inplace=True)
resting_tables_dict = dict(tables_dict) #Make a copy of the dictionary
del resting_tables_dict['big_log_ret'] #Remove the log_ret table

for i, t_name in enumerate(resting_tables_dict):
    table = tables_dict[t_name].stack()
    table.index.names=('Query','Ticker')
    table = table.reset_index()
    table.rename(columns={0: t_name}, inplace=True)
    final_table = pd.merge(final_table, table, on=['Query','Ticker'])

final_table

Unnamed: 0,Query,Ticker,big_log_ret,big_RCV
0,2012-01-08,AAL,0.063980,54.733580
1,2012-01-08,AAPL,-0.006150,48.380133
2,2012-01-08,ABC,-0.020684,65.547120
3,2012-01-08,ABT,0.000864,27.375300
4,2012-01-08,AMZN,-0.023212,40.688020
...,...,...,...,...
35351,2021-11-28,TSLA,-0.063878,31.423967
35352,2021-11-28,TSN,0.021006,45.611180
35353,2021-11-28,UAL,-0.039827,14.679940
35354,2021-11-28,UPS,-0.022512,46.677617


In [None]:
'''
Aqui tenemos q añadir un filtro para eliminar aquellas filas q tengan un NaN y tambien si queremos hacer algun preprocessing previo
'''

In [75]:
'''
Protobuffers are extensible structures suitable for storing data in a serialized format, either locally or in a distributed manner.
TF ranking has a couple of pre-defined protobufs such as ELWC which make it easier to integrate and formalize data ingestion into
the ranking pipeline.

Protocol buffers and the tf.data API is a set of utilities that provide a mechanism to read and store data for efficient loading
and preprocessing in a way that's fast and scalable.
'''

# Class I wrote to organize code resposible for parsing our dataframe data and 
# creating compressed TF-Records in ELWC protobuf format so that they used by 
# most rankers in TF-Ranking and be compatible with future rankers or new methods

class DFToELWCProto():
    """ Class to parse Dataframe ranking data in ELWC proto TFRecords"""
    
    def __init__(self, dir:str=".", use_compression:bool=False):
        assert isinstance(dir,str)
        if not os.path.isdir(dir):
            os.mkdir(dir)
        self.input_path = dir
        assert isinstance(use_compression,bool)
        self.compress = use_compression
        if self.compress:
            self.compress_type = 'GZIP'
        else:
            self.compress_type = None


    # Helper functions (see also https://www.tensorflow.org/tutorials/load_data/tf_records)
    def _bytes_feature(self,value_list):
        """Returns a bytes_list from a string / byte."""
        if isinstance(value_list, type(tf.constant(0))):
            value_list = value_list.numpy()
        return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value_list]))

    def _float_feature(self,value_list):
        """Returns a float_list from a float / double."""
        return tf.train.Feature(float_list=tf.train.FloatList(value=[value_list]))

    def _int64_feature(self,value_list):
        """Returns an int64_list from a bool / enum / int / uint."""
        return tf.train.Feature(int64_list=tf.train.Int64List(value=[value_list]))


    def read_and_print_topn_tfrecord(self, target_filename, num_of_examples_to_read):
        filenames = [target_filename]
        tf_record_dataset = tf.data.TFRecordDataset(filenames,
                                                    compression_type=self.compress_type)

        for raw_record in tf_record_dataset.take(num_of_examples_to_read):
            example_list_with_context = input_pb2.ExampleListWithContext()
            example_list_with_context.ParseFromString(raw_record.numpy())
            print(example_list_with_context)

    def df_to_TFrecord(self, df, file_name):

        """ 
        for reading and converting directly from arrays in memory

        """    
        file_name = os.path.basename(file_name)

        if self.compress:
            print('Using GZIP compression for writing ELWC TFRecord Dataset')
            opts = tf.io.TFRecordOptions(compression_type = self.compress_type)
            file_name = f"{file_name}.gzipped_tfrecord"
        else:
            file_name = f"{file_name}.tfrecord"
        save_path = f"{self.input_path}/{file_name}"

        with tf.io.TFRecordWriter(f"{save_path}", options=opts) as writer:
            
            input_array = np.array(df)
            col_names = df.columns

            ELWC = input_pb2.ExampleListWithContext()
            prev_qid = None

            for i in range(input_array.shape[0]):

                qid, doc, r, features = str(input_array[i,0]), input_array[i,1], input_array[i,2], input_array[i,3:]

                example_proto_dict = {
                              f"{col_names[f_n+3]}":self._float_feature((f_v))
                                  for (f_n, f_v) in enumerate(features)
                          }
                example_proto_dict['rel'] = self._int64_feature(int(r))
                #example_proto_dict['doc_name'] = self._bytes_feature(str(doc))

                example_proto = tf.train.Example(
                            features=tf.train.Features(feature=example_proto_dict))
                if qid != prev_qid:
                    if prev_qid is not None:
                        writer.write(ELWC.SerializeToString())
                    prev_qid = qid
                    ELWC = input_pb2.ExampleListWithContext()
                    ELWC.examples.append(example_proto)
                else:
                    ELWC.examples.append(example_proto)

            # final write for the last query grp
            writer.write(ELWC.SerializeToString())

In [78]:
ELWC_converter = DFToELWCProto(dir ="./Tables/LTR-tfrecords",
                              use_compression=True)
ELWC_converter.df_to_TFrecord(final_table,"LTR_dataset")

Using GZIP compression for writing ELWC TFRecord Dataset


In [None]:
# Store the paths to files containing training and test instances.
_DATA_PATH = f"./Tables/LTR-tfrecords.gzip_tfrecord"

# The maximum number of documents per query in the dataset.
# Document lists are padded or truncated to this size.
_LIST_SIZE = 200

# The document relevance label.
_LABEL_FEATURE_NAME = "rel"
_NUM_FEATURES = 136

# Padding labels are set negative so that the corresponding examples can be
# ignored in loss and metrics.
_PADDING_LABEL = -1

# Learning rate for optimizer.
_LEARNING_RATE = 0.05

# Parameters to the scoring function.
_BATCH_SIZE = 128
_DROPOUT_RATE = 0.5

# Location of model directory and number of training steps.
_MODEL_DIR = f"./Models/model_{dt.datetime.now().strftime('%m-%d-%Y_%H-%M-%S')}"

# setting as shell env for tensorboard stuff
os.environ["models_dir"] = _MODEL_DIR

In [86]:
def create_dataset_from_tfrecords(input_path:str,
                                  batch_sz:int,
                                  shuffle:bool = True,
                                  num_epochs:int = None,
                                  data_format:str = "ELWC",
                                  compression_type:str = ''):

    context_feature_columns, example_feature_columns = create_feature_columns()


    context_feature_spec = tf.feature_column.make_parse_example_spec(
      context_feature_columns.values())
    label_column = tf.feature_column.numeric_column(
      _LABEL_FEATURE_NAME, dtype=tf.int64, default_value=_PADDING_LABEL)
    example_feature_spec = tf.feature_column.make_parse_example_spec(
      list(example_feature_columns.values()) + [label_column])

    _reader_arg_list = []
    if compression_type:
        assert compression_type in ["", "GZIP","ZLIB"]
        _reader_arg_list = [compression_type]


    dataset = tfr.data.build_ranking_dataset(
      file_pattern=input_path,
      data_format=tfr.data.ELWC,
      batch_size=batch_sz,
      list_size=_LIST_SIZE,
      context_feature_spec=context_feature_spec,
      example_feature_spec=example_feature_spec,
      reader=tf.data.TFRecordDataset,
      reader_args= _reader_arg_list,
      shuffle=shuffle,
      num_epochs=num_epochs,
      )

    def _log1p_transform(features):
    '''computes elementwise log_e(|x|)*sign(x) '''
    transformed_feats = {
        f:tf.math.multiply(
            tf.math.log1p(
                tf.math.abs(features[f])
                ),
            tf.math.sign(features[f])
            )
        for f in features}
    return transformed_feats

    def _split_label_and_transform_features(features):
    label = tf.squeeze(features.pop(_LABEL_FEATURE_NAME), axis=2)
    label = tf.cast(label, tf.float32)
    features = _log1p_transform(features)

    return features, label

    dataset = dataset.map(_split_label_and_transform_features)

    return dataset

IndentationError: expected an indented block (Temp/ipykernel_72848/3741735708.py, line 38)