## Testing wide deep model example 

This notebook follows the [original paper](https://arxiv.org/pdf/1606.07792.pdf) and, primarily, this [workshop](https://noelkonagai.github.io/Workshops/tensorflow_pt2_widedeep/). Additional resources:
- [How to build a wide and deep model with Keras and TF2](https://towardsdatascience.com/how-to-build-a-wide-and-deep-model-using-keras-in-tensorflow-2-0-2f7a236b5a4b) and its [code](https://github.com/GoogleCloudPlatform/data-science-on-gcp/blob/master/09_cloudml/flights_model_tf2.ipynb)
- [Feature crossing tutorial](https://developers.google.com/machine-learning/crash-course/feature-crosses/crossing-one-hot-vectors)

Other implementations to look at:  
[1](https://github.com/Mohit67/Movie_recommendar/blob/master/wide_n_deep_tutorial.py),[2](https://github.com/wangby511/Recommendation_System), [3](https://github.com/AmoghM/Yelp-Restaurants-RecSys), [4](https://github.com/rajaharsha/Wide-Deep-Neural-Networks), [5](https://github.com/wang-henry4/wide-and-deep-recommender-model),[6](https://devblogs.nvidia.com/accelerating-wide-deep-recommender-inference-on-gpus/), [7](https://humboldt-wi.github.io/blog/research/information_systems_1718/08recommendation/),[8](https://www.youtube.com/watch?v=m_AZrITxs5M&t=0s)

In [0]:
from google.colab import drive
drive.mount('/content/drive/')
prefix = './drive/My Drive/ND_CSE/Year_1/Research:Care-Net/JBDF_CareNet/Care-Net Backend/code_and_data'
!echo "Project dir contents:" && ls "$prefix/"
!echo -e "\nColab Notebook home dir:" && ls

In [0]:
# !pip uninstall -y dask
!pip install -U ipykernel tensorflow-text==2.2.0rc2 plot-keras-history nest_asyncio dask-ml==1.0.0 dask[complete] distributed==1.25.1

In [0]:
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Download and clean the Census Income Dataset."""

import argparse
import os
import shutil

try:
    %tensorflow_version 2.x
except Exception as e:
    print(e)
import tensorflow as tf

import pandas as pd
import json
import csv
import pickle
import numpy as np
import os
from tqdm import tqdm
import tensorflow_hub as hub
import random
import multiprocessing
import ast
from dask_ml.model_selection import train_test_split
from dask_ml.preprocessing import StandardScaler
# import tensorflow_text

import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
from dask import delayed
from dask import compute

cluster = LocalCluster(processes=False)
client = Client(cluster)
client

In [0]:
# Try to connect to a TPU whose address is read from the COLAB_TPU_ADDR
# environment variable and build a TPUStrategy for it.
# NOTE(review): any failure (including a missing COLAB_TPU_ADDR key) is only
# printed, and `strategy` is then never bound — later code must not assume it exists.
try: 
    resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='grpc://' + os.environ['COLAB_TPU_ADDR'])
    tf.config.experimental_connect_to_cluster(resolver)
    # This is the TPU initialization code that has to be at the beginning.
    tf.tpu.experimental.initialize_tpu_system(resolver)
    strategy = tf.distribute.experimental.TPUStrategy(resolver)
except Exception as e:
    print(e)

#### Default Args and names

In [0]:
# tf.enable_eager_execution()
# PREFIX = '/afs/crc.nd.edu/user/a/aveganog/ND_Care_Net'
PREFIX = './drive/My Drive/ND_CSE/Year_1/Research:Care-Net/JBDF_CareNet/Care-Net Backend/code_and_data'
TRAINING_FILE = 'care-net.train'
EVAL_FILE = 'care-net.test'

NODE_TYPE = 'services'

# needed for file name below
MODEL = 'USE'
# MODEL_URL = 'https://tfhub.dev/google/universal-sentence-encoder-multilingual-large/3'
# MODEL_URL = 'https://tfhub.dev/google/universal-sentence-encoder-multilingual/3'
MODEL_URL = 'https://tfhub.dev/google/universal-sentence-encoder/4'
MODEL_TYPE = MODEL_URL.split('/')[-2]
MODEL_VER = MODEL_URL.split('/')[-1]

!mkdir -p "$prefix/embeddings/$MODEL/"
print("Using embeddings from Model {}_{}_v{}".format(MODEL, MODEL_TYPE, MODEL_VER))

DATA_DIR = os.path.join(PREFIX, 'data', 'wide_deep_dataset_chunks')

# help = 'Base directory for the model.')
# MODEL_TYP = 'wide-deep-USE'
MODEL_TYP = 'deep-USE'
# MODEL_TYP = 'wide-USE'
MODEL_DIR = os.path.join(PREFIX, 'models', MODEL_TYP)

# help = 'Number of training epochs.')
TRAIN_EPOCHS = 40

# help = 'The number of training epochs to run between evaluations.')
EPOCHS_PER_EVAL =2 

# help = 'Number of examples per batch.')
BATCH_SIZE = 256
QBAT_SIZE = 512

# parser.add_argument(
#     '--train_data', type=str, default='/tmp/census_data/adult.data',
#     help='Path to the training data.')

# parser.add_argument(
#     '--test_data', type=str, default='/tmp/census_data/adult.test',
#     help='Path to the test data.')

TRAIN_PCNT = 0.7 
TEST_PCNT = 1 - TRAIN_PCNT

CSV_COLUMNS = [
            'Distance',
            'Embedding_Similarity',
            'Month',
            'Selected_Service_Embedding',
            'Candidate_Service_Embedding', 
            'Previously_Recommended',
            'Query',
            'Label'
]

CSV_COLUMN_DEFAULTS = [[0.0], [0.0], [0.0], [0], [''], [0], [0], [0]]

In [0]:
"" # load auxiliary data (JSON lookups and the query list) from PREFIX/data
# tf.enable_eager_execution()
# JSON nodes of 211 IN services
with open(PREFIX + '/data/services_nodes.json') as sn:
    serv_nodes = json.loads(sn.read())

# our heterogeneous information network (HIN).
# we're only using the services data
with open(PREFIX + '/data/HIN_nodes.json') as taxo:
    hin_nodes = json.loads(taxo.read())

# map service_ids to node numbers in our graph
with open(PREFIX + '/data/service_id_to_node_num.json') as sn:
    serv_trans = json.loads(sn.read())

# map 211 taxonomy codes to their node number in our graph
with open(PREFIX + '/data/code_to_node_num.json') as ct:
    code_trans = json.loads(ct.read())

# tagged texts
# NOTE(review): the previous comment here was a copy of the one above; the
# structure of tagged_texts.json is not visible in this notebook — confirm.
with open(PREFIX + '/data/tagged_texts.json') as ct:
    tagged_texts = json.loads(ct.read())

# load list of queries/keywords from taxonomy data
queries_path = os.path.join(PREFIX, 'data', 'HIN_references.csv')
with open(queries_path) as qf:
  # the file is read whole and split on commas (no CSV quoting is honoured)
  queries = qf.read()
  queries = queries.split(',')
  # cleanup some bad data: drop empty entries, then put a single '' first.
  # TODO: Fix in notebook that generates the data
  queries = [''] + [q for q in queries if q != '']

# with open(os.path.join(PREFIX, 'data', 'service_recommendations.json')) as rf:
#     tagged_recs = json.loads(rf.read())

### Now we can build, run, and eval the test wide deep model

#### Build input feature columns

In [0]:
def build_model_columns():
    """Create the feature columns consumed by the wide model.

    Returns:
        list: in order, the numeric ``Distance`` and ``Embedding_Similarity``
        columns, a one-hot ``Month`` column (13 buckets, default 0), one-hot
        ``Selected_Service_ID`` / ``Candidate_Service_ID`` columns sized by
        ``len(serv_nodes)``, and a 2048-dimensional embedding of the hashed
        cross of the two service-id features (40000 hash buckets).
    """
    fc = tf.feature_column
    n_services = len(serv_nodes)

    # Continuous features.
    numeric_cols = [
        fc.numeric_column(key, shape=(1,), dtype=tf.float32)
        for key in ('Distance', 'Embedding_Similarity')
    ]

    # Month of referral, one-hot encoded.
    month_onehot = fc.indicator_column(
        fc.categorical_column_with_identity('Month', num_buckets=13, default_value=0)
    )

    # One-hot encodings of the selected and candidate service ids.
    id_onehots = [
        fc.indicator_column(
            fc.categorical_column_with_identity(key, num_buckets=n_services)
        )
        for key in ('Selected_Service_ID', 'Candidate_Service_ID')
    ]

    # Hashed cross of the two service ids, embedded densely.
    # To combine embeddings into one feature later, see:
    # https://www.tensorflow.org/api_docs/python/tf/feature_column/shared_embeddings
    pair_cross = fc.crossed_column(
        ['Selected_Service_ID', 'Candidate_Service_ID'], hash_bucket_size=40000
    )
    pair_embedding = fc.embedding_column(pair_cross, 2048)

    return numeric_cols + [month_onehot] + id_onehots + [pair_embedding]

In [0]:
# Paths (DATA_DIR joined with each entry name) of everything inside DATA_DIR.
data_files = [os.path.join(DATA_DIR, entry) for entry in os.listdir(DATA_DIR)]

In [0]:
# position of every HIN node id, in the iteration order of hin_nodes
nid_idxs = {nid: pos for pos, nid in enumerate(hin_nodes)}

# save indexes of services to use in our dataset for embedding lookup
sid_idxs = {sid: pos for pos, sid in enumerate(serv_trans)}

In [0]:
dne_cnt = 0

def preproc_query(emb_key):
    """Return the embedding index of a query key, registering it on first use.

    The key is normalised to ``str`` once and that string is used for every
    lookup. (Previously membership was tested with the raw key while entries
    were stored under ``str(key)``, so non-string keys were re-registered on
    every call and silently received a new index.)

    Relies on module-level state that is not defined in this cell:
    ``q_idxs`` (key -> index), ``tagged_embeds`` (key -> embedding) and
    ``extract_embed`` (called with a one-element list of the key).

    Args:
        emb_key: the query/keyword; any object, converted with ``str``.

    Returns:
        int: position of the key in ``tagged_embeds`` (insertion order).
    """
    key = str(emb_key)
    if key in q_idxs:
        return q_idxs[key]

    if key not in tagged_embeds:
        # New entry: it is appended, so its position is the last one.
        tagged_embeds[key] = extract_embed([key])
        idx = len(tagged_embeds) - 1
    else:
        # Already embedded but not indexed yet: use its actual position
        # instead of the position of whatever happens to be last.
        idx = next(pos for pos, known in enumerate(tagged_embeds) if known == key)

    q_idxs[key] = idx
    return idx

# def get_recs(nid):
#     if str(nid) in serv_nodes:
#         sid = hin_nodes[str(nid)]['serv_id']
#         return tagged_recs[str(sid).split('.')[0]]
#     else:
#         global dne_cnt
#         dne_cnt += 1
#         return [0]*len(serv_nodes.keys())

def proc_recs(row):
    """Return the global ``shared_recs`` object; ``row`` is ignored.

    NOTE(review): no module-level assignment of ``shared_recs`` is visible in
    this notebook — confirm it is defined before this is called, otherwise
    the call raises NameError.
    """
    return shared_recs

def preproc_lbl(lbl):
    """Convert a raw label to ``int``, printing the converted value first."""
    as_int = int(lbl)
    print(as_int)
    return as_int

In [0]:
CSV_COLUMNS_OG = [
            'Distance',
            'Embedding_Similarity',
            'Month',
            'Selected_Service_Embedding',
            'Candidate_Service_Embedding', 
            'Shared_Recommendations',
            'Query',
            'Label'
]

DEEP_COLUMNS = [
            'Selected_Service_Embedding',
            'Candidate_Service_Embedding', 
            'Query'
]

WIDE_COLUMNS = [
            'Distance',
            'Embedding_Similarity',
            'Month',
            # 'Shared_Recommendations'
            'Selected_Service_ID',
            'Candidate_Service_ID'
]

CSV_COLUMNS = [
            'Distance',
            'Embedding_Similarity',
            'Month',
            'Selected_Service_Embedding',
            'Candidate_Service_Embedding', 
            'Shared_Recommendations',
            'Query',
            'Label'
]

from tqdm import tqdm
from sklearn import preprocessing

LABEL_NAME = CSV_COLUMNS[-1]
serv_ids = None
def load_df(files, has_checkpoints=False, nrows=25000):
    """Load the dataset chunks and split them into train / hold-out arrays.

    Args:
        files: paths of the chunk files. Entries whose basename starts with
            ``'.'`` (hidden files) are skipped.
        has_checkpoints: when true the files are read with ``pd.read_parquet``
            and only the ``Label`` column is separated; otherwise they are read
            with ``pd.read_csv`` and pre-processed (see below).
        nrows: maximum number of rows read from each CSV file; ignored for
            parquet checkpoints.

    Returns:
        The 4-tuple returned by ``train_test_split(features, labels,
        train_size=0.8, random_state=19, shuffle=True)``.

    Raises:
        ValueError: if ``has_checkpoints`` is true and the loaded frame has no
            ``Label`` column. (Previously this path always failed with an
            unbound ``labels`` variable.)

    Side effects (CSV path only):
        * ``tqdm.pandas()`` is registered.
        * the global ``CSV_COLUMNS`` loses ``'Label'`` and gains the two
          service-id column names; ``WIDE_COLUMNS`` gains whichever of the two
          it does not already contain. Both updates are idempotent, so the
          function can be called repeatedly without duplicating columns.
    """
    global WIDE_COLUMNS

    frames = []
    tot = len(files)
    for i, f in enumerate(files):
        fname = os.path.basename(f)
        # skip hidden files
        if fname.startswith('.'):
            continue
        if has_checkpoints:
            frames.append(pd.read_parquet(f))
        else:
            frames.append(pd.read_csv(f, nrows=nrows))
            print("loaded file {} {} of {}".format(fname, i+1, tot))
    data_df = pd.concat(frames)

    if has_checkpoints:
        # assumes checkpoints still carry the label column — TODO confirm
        if 'Label' not in data_df.columns:
            raise ValueError("checkpoint data has no 'Label' column to split on")
        labels = data_df.pop('Label')
    else:
        # the CSV header is replaced wholesale by the canonical column names
        data_df.columns = CSV_COLUMNS_OG

        # Every missing value becomes -1.
        # NOTE(review): the original followed this with per-column fillna
        # calls (0.0 for similarity, 0 for month, '' for the text columns);
        # they could never match anything after this blanket fill and were
        # removed as dead code. Decide whether those defaults were intended.
        data_df = data_df.fillna(-1)
        tqdm.pandas()

        labels = data_df.pop('Label')

        # The raw service columns hold the ids; expose them as int32 features.
        sel_col, cand_col = 'Selected_Service_ID', 'Candidate_Service_ID'
        data_df[sel_col] = np.array(data_df[CSV_COLUMNS_OG[3]].values).astype(np.int32)
        data_df[cand_col] = np.array(data_df[CSV_COLUMNS_OG[4]].values).astype(np.int32)

        # Keep the module-level column lists in sync without duplicating names.
        if 'Label' in CSV_COLUMNS:
            CSV_COLUMNS.remove('Label')
        for col in (sel_col, cand_col):
            if col not in CSV_COLUMNS:
                CSV_COLUMNS.append(col)
        WIDE_COLUMNS = WIDE_COLUMNS + [
            col for col in (sel_col, cand_col) if col not in WIDE_COLUMNS
        ]

        # z-score the distance feature
        z_score_scaler = StandardScaler()
        data_df[CSV_COLUMNS_OG[0]] = z_score_scaler.fit_transform(
            data_df[CSV_COLUMNS_OG[0]].values.reshape(-1,1))

        # only the wide-model features are passed on
        data_df = data_df[WIDE_COLUMNS]

    return train_test_split(data_df.values, labels.values, train_size=0.8, random_state=19, shuffle=True)

In [0]:
import glob

has_checkpoints = False
X_train, X_val, y_train, y_val = load_df(data_files, has_checkpoints)
X_train, X_test, y_train, y_test = train_test_split(X_train, y_train, train_size=(0.6/0.8), random_state=19, shuffle=True)
# X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, train_size=0.7/0.9, random_state=19, shuffle=True)
print("Using train data with shape: {}".format(X_train.shape))
print("Using test data with shape: {}".format(X_test.shape))

In [0]:
import dask.array as da
dtypes = [np.float32,np.float32,str,str,str,np.array([]),str,str]
dtype_ex = pd.Series([0.0,0.0,'','','',[],'',''])

X_tr = dd.from_dask_array(da.from_array(X_train, chunks=(2000,5)))
X_tr.columns = WIDE_COLUMNS
# X_tr['Shared_Recommendations'] = X_tr['Shared_Recommendations'].apply(lambda x: pd.arrays.SparseArray(x).astype(np.int32), meta=dtype_ex).compute()
X_vl = dd.from_dask_array(da.from_array(X_val, chunks=(2000,5)))
X_vl.columns = WIDE_COLUMNS
# X_vl['Shared_Recommendations'] = X_vl['Shared_Recommendations'].apply(lambda x: pd.arrays.SparseArray(x).astype(np.int32), meta=dtype_ex).compute()
X_te = dd.from_dask_array(da.from_array(X_test, chunks=(2000,5)))
X_te.columns = WIDE_COLUMNS
# X_te['Shared_Recommendations'] = X_te['Shared_Recommendations'].apply(lambda x: pd.arrays.SparseArray(x).astype(np.int32), meta=dtype_ex).compute()

print("Using train dataset with shape: {}".format(X_tr.shape))
print("Using validation dataset with shape: {}".format(X_vl.shape))
print("Using test dataset with shape: {}".format(X_te.shape))
X_tr.head()

In [0]:
CSV_COLUMNS = [
            'Distance',
            'Embedding_Similarity',
            'Month',
            'Selected_Service_Embedding',
            'Candidate_Service_Embedding', 
            'Shared_Recommendations',
            'Query',
            'Selected_Service_ID',
            'Candidate_Service_ID'
]

WIDE_COLUMNS = [
            'Distance',
            'Embedding_Similarity',
            'Month',
            # 'Shared_Recommendations',
            'Selected_Service_ID',
            'Candidate_Service_ID'
]

# following: https://towardsdatascience.com/how-to-build-a-wide-and-deep-model-using-keras-in-tensorflow-2-0-2f7a236b5a4b

# initialize inputs for model with appropriate keras layers
# One keras Input per wide feature, keyed by column name. Positions refer to
# CSV_COLUMNS: 0 and 1 are fed as float32; 2, 7 and 8 as int32; every other
# position gets no input layer.
wide_in = {}
for i, col_name in enumerate(CSV_COLUMNS):
    if i < 2:
        print("Wide Input: Added {} input layer as float32".format(col_name))
        wide_in[col_name] = tf.keras.layers.Input(name=col_name, shape=(1,), dtype=tf.float32)
    elif i in (2, 7, 8):
        print("Wide Input: Added {} input layer as int32".format(col_name))
        wide_in[col_name] = tf.keras.layers.Input(name=col_name, shape=(1,), dtype=tf.int32)
print(wide_in)

In [0]:
from tensorflow.keras import layers
from tensorflow.keras import Model

wide_columns = build_model_columns()
# following: https://blog.tensorflow.org/2018/04/predicting-price-of-wine-with-keras-api-tensorflow.html
def make_wide_model(linear_columns, linear_inputs):
    """Build and compile the wide model.

    Args:
        linear_columns: feature columns handed to ``layers.DenseFeatures``.
        linear_inputs: dict of feature name -> keras ``Input``; the dict itself
            feeds ``DenseFeatures`` and its values become the model inputs.

    Returns:
        A compiled keras ``Model`` whose single sigmoid output layer is named
        ``'Pred'``; trained with MSE loss and an FTRL optimizer (L1 = L2 = 0.5).
    """
    dense_features = layers.DenseFeatures(linear_columns)
    sigmoid_head = layers.Dense(1, activation='sigmoid', name='Pred')
    pred = sigmoid_head(dense_features(linear_inputs))

    ftrl = tf.keras.optimizers.Ftrl(
        l1_regularization_strength=0.5,
        l2_regularization_strength=0.5,
    )
    wide_model = Model(inputs=list(linear_inputs.values()), outputs=pred)
    wide_model.compile(
        loss='mse',
        optimizer=ftrl,
        metrics=['accuracy','AUC', 'Recall', 'Precision'],
    )
    return wide_model

wide_model = make_wide_model(wide_columns, wide_in)

In [0]:
wide_model.summary()

In [0]:
 tf.keras.utils.plot_model(wide_model, os.path.join(PREFIX, 'figures', 'deep_keras_model.png'), show_shapes=True, rankdir='LR')

In [0]:
from tensorflow.python.keras.utils.data_utils import Sequence
cnt = 0

def dask_gen(X, y):
    """Endlessly yield one ``(features, labels)`` batch per partition.

    Fixes over the previous version: the loop bound no longer uses the row
    count of the global ``X_train`` (it walks partitions until the accessor
    raises ``IndexError``), the reference to the undefined name ``f`` is gone,
    and the column dtypes match the ones ``DaskGenerator`` uses.

    Args:
        X: object indexable by integer partition number (e.g.
            ``X_tr.partitions``); each item must provide ``compute()``
            returning a frame that has every column in ``WIDE_COLUMNS``.
        y: matching accessor for the labels; items must provide ``compute()``.

    Yields:
        tuple: ``(features, labels)`` where ``features`` maps each name in
        ``WIDE_COLUMNS`` to a NumPy array and ``labels`` is ``{'Pred': array}``.
        After the last partition the generator starts over from the first.
        It stops only if there is no partition at all.

    Side effects:
        Increments the global counter ``cnt`` once per yielded batch.
    """
    global cnt
    # positional dtypes for WIDE_COLUMNS: two floats, then three int features
    col_types = [np.float32, np.float32, np.int32, np.int32, np.int32]
    while True:
        idx = 0
        while True:
            try:
                # only the (lazy) indexing is guarded, not the computation
                x_part, y_part = X[idx], y[idx]
            except IndexError:
                break  # ran past the last partition: this epoch is done
            feats = x_part.compute()
            features = {
                col: feats[col].values.astype(col_types[i])
                for i, col in enumerate(WIDE_COLUMNS)
            }
            labels = {'Pred': np.array(y_part.compute())}
            cnt += 1
            idx += 1
            yield features, labels
        if idx == 0:
            # nothing to serve; avoid spinning forever
            return

# taken from: https://anaconda.org/defusco/keras-dask/notebook
class DaskGenerator(Sequence):
    """Keras ``Sequence`` that serves one Dask partition per batch.

    Each batch is computed on demand from the delayed partitions of the
    samples and classes collections passed to the constructor.
    """

    def __init__(self, samples, classes):
        """Store the delayed partitions of ``samples`` and ``classes``.

        Args:
            samples: collection providing ``to_delayed()``; each delayed item
                must compute to a frame with every column in ``WIDE_COLUMNS``.
            classes: collection providing ``to_delayed()`` for the labels.

        NOTE(review): the two partition lists are not checked for equal
        length — confirm the callers guarantee it.
        """
        super().__init__()
        self.sample_batches = samples.to_delayed()
        self.class_batches = classes.to_delayed()

    def __len__(self):
        '''Total number of batches, i.e. the number of sample partitions.'''
        return len(self.sample_batches)

    def __getitem__(self, idx):
        '''Compute batch ``idx`` and return it as ``(features, labels)``.

        ``features`` maps each name in ``WIDE_COLUMNS`` to a NumPy array and
        ``labels`` is ``{'Pred': array}``. Increments the global ``cnt``.
        '''
        global cnt
        feats = self.sample_batches[idx].compute()
        # positional dtypes for WIDE_COLUMNS: two floats, then three int features
        col_types = [np.float32, np.float32, np.int32, np.int32, np.int32]
        features = {
            col: feats[col].values.astype(col_types[i])
            for i, col in enumerate(WIDE_COLUMNS)
        }
        labels = {'Pred': np.array(self.class_batches[idx].compute())}
        cnt += 1
        return features, labels

In [0]:
X_parts = X_tr.partitions
y_parts = dd.from_array(y_train).partitions
data_iter = dask_gen(X_parts,y_parts)
# features, labels = next(data_iter)
# with strategy.scope():
    # needs to be created inside here for TPU to work
    # wide_model = make_wide_model(wide_columns, wide_in)
    # train_ds = wide_input_fn(X_tr, y_train)
    # wide_model.compile(loss='mse', optimizer=tf.keras.optimizers.Ftrl(), metrics=['accuracy','AUC', 'Recall', 'Precision'])
# train_gen = DaskGenerator(X_tr,dd.from_array(y_train))

feed_dict = {}
val_dict = {}
for col in WIDE_COLUMNS:
    feed_dict[col] = X_tr[col].compute()
    val_dict[col] = X_vl[col].compute()
labels = {'Pred': y_train}
dataset = tf.data.Dataset.from_tensor_slices((feed_dict, labels))
dataset = dataset.shuffle(buffer_size=len(X_train))
dataset = dataset.repeat(TRAIN_EPOCHS)
dataset = dataset.batch(BATCH_SIZE)
dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

model_hist = wide_model.fit(dataset, verbose=1,use_multiprocessing=False, validation_data=(list(val_dict.values()), y_val))

In [0]:
from plot_keras_history import plot_history
import matplotlib.pyplot as plt

# plot keras history metrics 
plot_history(model_hist.history)
plt.show()
plot_history(model_hist.history, path="standard.png")
plt.close()

# follow for full model report
# https://www.kaggle.com/danbrice/keras-plot-history-full-report-and-grid-search
# https://www.machinecurve.com/index.php/2019/10/08/how-to-visualize-the-training-process-in-keras/

In [0]:
dataset = tf.data.Dataset.from_generator(lambda: dask_gen(X_tr,dd.from_array(y_train)), output_types=[tf.float32, tf.float32, tf.string, None,tf.string])
# dataset = dataset.map(parse_line, num_parallel_calls=4)
if shuffle:
    dataset = dataset.shuffle(buffer_size=len(X))
    dataset = dataset.repeat(TRAIN_EPOCHS)
dataset = dataset.batch(BATCH_SIZE)
dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

In [0]:
MODEL_DIR = os.path.join(PREFIX, 'models', MODEL_TYP, "init_baseline_deep-only")#.format(BATCH_SIZE)) #+ str(random.randint(1,1001))
print("Using model dir {}".format(MODEL_DIR))

config = tf.estimator.RunConfig(model_dir=MODEL_DIR, tf_random_seed=42, save_summary_steps=100,
    save_checkpoints_steps=250, session_config=None, keep_checkpoint_max=50, 
    keep_checkpoint_every_n_hours=1,session_creation_timeout_secs=7200)

def rec_line_gen(R):
    """Yield every row of ``R`` as a NumPy array.

    Before each row, ``R.shape`` and the row index are printed.
    """
    row_count = len(R)
    r = 0
    while r < row_count:
        print(R.shape)
        print(r)
        yield np.asarray(R[r])
        r += 1

def wide_input_generator(X,y):
    """Endlessly yield one wide-model sample at a time.

    The previous version looped over row indices but sliced *columns*
    (``X[:, i]``), so the 4-way unpacking only worked when ``X`` happened to
    have exactly four rows. Rows are now read as rows.

    Args:
        X: array-like of shape (n_samples, 4); converted once with ``np.array``.
        y: labels, indexable by sample position.

    Yields:
        tuple: ``(float32, float32, str, str, label)`` for sample ``i``; after
        the last sample the sequence starts again from the first. Nothing is
        yielded for empty input.
    """
    rows = np.array(X)  # converted once instead of once per epoch
    if len(rows) == 0:
        # avoid an endless loop that never yields
        return
    while True:
        for i, row in enumerate(rows):
            # WIDE:
            x1, x2, x3, x4 = row
            yield np.float32(x1), np.float32(x2), str(x3), str(x4), y[i]

def wide_input_fn(X, y, shuffle=True):
    """Build the batched ``tf.data`` pipeline for the wide model.

    The previous body could not run (``.copmute()`` typo, undefined
    ``pandas`` name, ``Dataset.from_generator()`` without arguments, six
    zipped datasets fed to a five-argument parser, and a
    ``Shared_Recommendations`` column that is not part of ``WIDE_COLUMNS``).
    It now follows the same recipe as the training cell of this notebook:
    a dict of per-column arrays sliced together with the labels.

    Args:
        X: frame-like object indexable by column name (Dask or pandas); must
            contain every column in ``WIDE_COLUMNS``.
        y: labels aligned with the rows of ``X`` (array-like, or an object
            with ``compute()``).
        shuffle: when true the samples are shuffled over the full dataset and
            repeated ``TRAIN_EPOCHS`` times.

    Returns:
        tf.data.Dataset: batches of ``BATCH_SIZE`` elements shaped
        ``(features_dict, {'Pred': label})`` with prefetching enabled.
    """
    def _materialize(values):
        # Dask collections need an explicit compute(); pandas/NumPy do not.
        if hasattr(values, 'compute'):
            values = values.compute()
        return np.asarray(values)

    # dtypes mirror the keras Input layers of the wide model:
    # int32 for the categorical features, float32 for the rest
    int_cols = ('Month', 'Selected_Service_ID', 'Candidate_Service_ID')
    features = {}
    for col in WIDE_COLUMNS:
        col_type = np.int32 if col in int_cols else np.float32
        features[col] = _materialize(X[col]).astype(col_type)

    labels = {'Pred': _materialize(y)}

    dataset = tf.data.Dataset.from_tensor_slices((features, labels))
    if shuffle:
        # shuffle() requires a positive buffer size
        dataset = dataset.shuffle(buffer_size=max(len(labels['Pred']), 1))
        dataset = dataset.repeat(TRAIN_EPOCHS)
    dataset = dataset.batch(BATCH_SIZE)
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    return dataset

ds = wide_input_fn(X_tr, y_train)
X_tr.head()

In [0]:
train_dict = {}
for col in WIDE_COLUMNS + ['Selected_Service_ID']:
    if col in ('Month', 'Selected_Service_ID'):
        train_dict[col] = X_tr[col].values.astype('U6')
        print(train_dict[col].dtype)
    else:
        train_dict[col] = X_tr[col].values

val_dict = {}
for col in WIDE_COLUMNS + ['Selected_Service_ID']:
    if col in ('Month', 'Selected_Service_ID'):
        val_dict[col] = X_vl[col].values.astype('U6')
        # print(val_dict[col].dtype)
    else:
        val_dict[col] = X_vl[col].values

# with strategy.scope():
    # needs to be created inside here for TPU to work
wide_model = make_wide_model(wide_columns, wide_in)
    # train_ds = wide_input_fn(X_tr, y_train)
wide_model.compile(loss='mse', optimizer=tf.keras.optimizers.Ftrl(), metrics=['accuracy','AUC', 'Recall', 'Precision'])
train_gen = DaskGenerator(X_tr,dd.from_array(y_train))
os.environ['TF_KERAS'] = '1'

# model_hist = wide_model.fit(train_gen, verbose=1,use_multiprocessing)#, validation_data=(list(val_dict.values()), y_val))

In [0]:
print(cnt)

In [0]:
model_estimator = tf.keras.estimator.model_to_estimator(wide_model)

In [0]:
feed_dict = {}
for col in WIDE_COLUMNS + ['Selected_Service_ID']:
    feed_dict[col] = X_te[col].values
# feed_dict.update({'Pred': y_train})
wide_model.evaluate(x=list(feed_dict.values()), y=y_test, batch_size=BATCH_SIZE, verbose=1)

In [0]:
MODEL_TYP = 'wide-deep-USE'
MODEL_DIR = os.path.join(PREFIX, 'models', 'wide_and_deep_{}'.format(MODEL), MODEL_TYP)
# make sure the target directory exists before saving into it
os.makedirs(MODEL_DIR, exist_ok=True)
# (the closing parenthesis of os.path.join was missing here)
model_fpath = os.path.join(MODEL_DIR, '{}-model.h5'.format(MODEL_TYP))
# NOTE(review): `deep_model` is not defined in the visible part of this
# notebook (only `wide_model` is) — confirm which model should be saved.
deep_model.save(model_fpath)

In [0]:
%load_ext tensorboard
%tensorboard --logdir "$MODEL_DIR/" #--debugger_port 6969 

In [0]:
train_spec = tf.estimator.TrainSpec(input_fn=lambda: input_fn(X_tr, y_train))
eval_spec = tf.estimator.EvalSpec(input_fn=lambda: input_fn(X_te, y_test), name='wide-deep-care-net_batch-size_{}'.format(BATCH_SIZE),
                                  start_delay_secs=2, throttle_secs=10)
evals, exports = tf.estimator.train_and_evaluate(model_estimator, train_spec, eval_spec)

In [0]:
train_spec = tf.estimator.TrainSpec(input_fn=lambda: input_fn(X_tr, y_train))
eval_spec = tf.estimator.EvalSpec(input_fn=lambda: input_fn(X_te, y_test), name='wide-deep-care-net_batch-size_{}'.format(BATCH_SIZE),
                                  start_delay_secs=2, throttle_secs=10)
evals, exports = tf.estimator.train_and_evaluate(wide_deep, train_spec, eval_spec)

In [0]:
X_tr['Selected_Service_Embedding'].values

In [0]:
import math
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
from sklearn.metrics import roc_auc_score
from sklearn.metrics import confusion_matrix

# train_ds = input_fn(X_tr, y_train)
# test_ds = input_fn(X_te, y_test, shuffle=False)
num_5k_steps = math.ceil(len(X_te) / 5000) 
train_steps = math.ceil(len(X_tr) / num_5k_steps)

accs, precs, recs, f1s = [],[],[],[]
for i in range(num_5k_steps):
    print('='*20, "Train/Eval round {}/{}".format(i,num_5k_steps), '='*20)
    eval_er.train(input_fn=lambda: input_fn(X_tr, y_train), steps=train_steps)

    start_idx = i * train_steps
    end_idx = start_idx + train_steps
    print("Using train samples {} to {}".format(start_idx,end_idx))
    X_metr = None
    preds = None
    y_metr = None
    if end_idx <= len(X_train):
        X_metr = pd.DataFrame(X_train).iloc[start_idx:end_idx]
        print("Using subset of train data with size {}".format(len(X_metr)))
        X_metr.columns = CSV_COLUMNS
        y_metr = y_train[start_idx:end_idx]
        preds = eval_er.predict(input_fn=lambda: input_fn(X_metr, y_metr))
    else:
        X_metr = pd.DataFrame(X_train).iloc[start_idx::]
        X_metr.columns = CSV_COLUMNS
        y_metr = y_train[start_idx::]
        preds = eval_er.predict(input_fn=lambda: input_fn(X_metr, y_metr))
    yhat = []
    for i in range(train_steps):
        pred = next(preds)
        print(pred.keys())
        yhat.append(pred['Pred'])
    print("Going through {} predictions for training subset".format(len(yhat)))
    yhat_classes = list(map(lambda x: np.round(x),yhat))
    # accuracy: (tp + tn) / (p + n)
    accuracy = accuracy_score(y_metr, yhat_classes)
    print('Accuracy: %f' % accuracy)
    # precision tp / (tp + fp)
    precision = precision_score(y_metr, yhat_classes)
    print('Precision: %f' % precision)
    # recall: tp / (tp + fn)
    recall = recall_score(y_metr, yhat_classes)
    print('Recall: %f' % recall)
    # f1: 2 tp / (2 tp + fp + fn)
    f1 = f1_score(y_metr, yhat_classes)
    print('F1 score: %f' % f1)
    accs.append(accuracy)
    precs.append(precision)
    recs.append(recall)
    f1s.append(f1)

    eval_er.evaluate(input_fn=lambda: input_fn(X_te, y_test, shuffle=False), steps=5000, name='5kEvals')

In [0]:
    cfname = os.path.join(MODEL_DIR, 'label_metadata.tsv')
# Write one metadata row (index, display name, type) per embedded key.
# The path is bound to `fname` here because the line above this block assigns
# it to `cfname`, which left `fname` undefined for the open() call.
fname = os.path.join(MODEL_DIR, 'label_metadata.tsv')
with open(fname, 'w', encoding='utf-8') as f:
    f.write("Index\tLabel\tRecommended\n")
    sids = list(tagged_embeds)
    for idx, sid in enumerate(sids):
        if sid in hin_nodes:
            # keys that are HIN node ids are labelled with the node's name
            name = hin_nodes[sid]['name']
            typ = 'Service'
        else:
            # everything else is treated as a free-text query
            name = sid
            typ = 'Query'
        f.write("{}\t{}\t{}\n".format(idx, name, typ))

In [0]:
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
from sklearn.metrics import roc_auc_score
from sklearn.metrics import confusion_matrix

# Run the estimator over the test set, keeping the rows in their given order.
preds = eval_er.predict(lambda: input_fn(X_t, y_test, shuffle=False))
# Each prediction is a dict; collect only its 'Pred' entry.
yhat = [single['Pred'] for single in preds]
# Round every raw output to obtain the predicted class.
yhat_classes = [np.round(raw) for raw in yhat]

# Accuracy = (tp + tn) / (p + n)
accuracy = accuracy_score(y_test, yhat_classes)
print('Accuracy: %f' % accuracy)

# Precision = tp / (tp + fp)
precision = precision_score(y_test, yhat_classes)
print('Precision: %f' % precision)

# Recall = tp / (tp + fn)
recall = recall_score(y_test, yhat_classes)
print('Recall: %f' % recall)

# F1 = 2 tp / (2 tp + fp + fn)
f1 = f1_score(y_test, yhat_classes)
print('F1 score: %f' % f1)


In [0]:
import datetime
# Wrap the test features in a DataFrame labelled with the CSV column names.
# (Alternative labelings tried here: DEEP_COLUMNS, or WIDE_COLUMNS with an
# optional extra 'Selected_Service_ID' column.)
X_t = pd.DataFrame(X_test)
X_t.columns = CSV_COLUMNS

# Evaluate on the test set without shuffling; `preds` holds whatever
# evaluate() returns.
preds = eval_er.evaluate(
    input_fn=lambda: input_fn(X_t, y_test, shuffle=False),
    name='evalShuffled',
)
# To inspect raw predictions instead:
# preds = eval_er.predict(lambda: input_fn(X_t, y_test,shuffle=False))
# print(next(preds))

In [0]:
    !rm -rf MODEL_DIR

In [0]:
# Build the export path first so the log line reports the directory that is
# actually loaded (printing before the assignment raised NameError on a fresh
# kernel, or showed a stale value otherwise).
export_path = os.path.join('/tmp/census_model', "1585107419")
print("Loading model as keras model from {}".format(export_path))
# compile=False: load the model without compiling it.
keras_model = tf.keras.models.load_model(export_path, compile=False)

In [0]:
# Show the `asset_paths` attribute of the loaded model object.
print(keras_model.asset_paths)

In [0]:
# Report the loaded model's `tensorflow_version` attribute. A bare expression
# that is not the last statement of a cell is never displayed, so print it.
print(keras_model.tensorflow_version)
# Render the model graph to 'census_test_model.png', laid out left-to-right
# and without tensor shapes.
tf.keras.utils.plot_model(keras_model, 'census_test_model.png', show_shapes=False, rankdir='LR')

In [0]:
# List the contents of export_path's parent directory.
!ls "$export_path/../"

In [0]:
# FLAGS, unparsed = parser.parse_known_args()
# tf.app.run(argv=[sys.argv[0]] + unparsed)

# if not tf.compat.v1.gfile.Exists(FLAGS.data_dir):
#     tf.compat.v1.gfile.MkDir(FLAGS.data_dir)

# data_df = pd.DataFrame()
# data_df, labels = load_df(DATA_DIR)


In [0]:
# dataset = complete_dataset(data_df)
# TEST_PCT = 0.3
# test_size = int(TEST_PCT * len(dataset))
# train_size = int(len(dataset) - test_size)

# test_data = dataset.take(test_size)
# train_data = dataset.skip(test_size)

In [0]:
# def build_estimator(model_dir, model_type):
#   """Build an estimator appropriate for the given model type."""
#   wide_columns, deep_columns = build_model_columns()
#   hidden_units = [100, 75, 50, 25]

#   # Create a tf.estimator.RunConfig to ensure the model is run on GPU
#   run_config = tf.estimator.RunConfig().replace(
#       session_config=tf.compat.v1.ConfigProto(device_count={'GPU': 1}))

#   if LEARN_TYPE == 'wide':
#     return tf.estimator.LinearClassifier(
#         model_dir=model_dir,
#         feature_columns=wide_columns,
#         config=run_config)
#   elif LEARN_TYPE == 'deep':
#     return tf.estimator.DNNClassifier(
#         model_dir=model_dir,
#         feature_columns=deep_columns,
#         hidden_units=hidden_units,
#         config=run_config)
#   else:
#     return tf.estimator.DNNLinearCombinedClassifier(
#         model_dir=model_dir,
#         linear_feature_columns=wide_columns,
#         dnn_feature_columns=deep_columns,
#         dnn_hidden_units=hidden_units,
#         config=run_config)

In [0]:
# NOTE: disabled draft of a CSV-file based input_fn. The header is commented
# out together with the body: a `def` with no statements does not parse, and
# defining it would shadow the input_fn(X, y, shuffle=...) that the evaluation
# cells call.
# def input_fn(data_files, num_epochs, shuffle, batch_size):
#     """Generate an input function for the Estimator."""
#     # assert tf.compat.v1.gfile.Exists(data_file), (
#     #     '%s not found. Please make sure you have either run data_download.py or '
#     #     'set both arguments --train_data and --test_data.' % data_file)

#     def parse_dataset_lines(value):
#         # print('Parsing dataset...')
#         columns = tf.io.decode_csv(value, record_defaults=CSV_COLUMN_DEFAULTS, select_cols=[0,1,2,3,4,6,7])
#         print(columns)
#         print()
#         # refactor our data. TODO change this directly when creating dataset
#         lbl = columns.pop(-1)
#         labels = {'Label': tf.py_function(func=preproc_lbl,
#                                                 inp=[lbl],
#                                                 Tout=tf.int32)}
#         # labels = lbl
#         CSV_COLUMNS[-1] = 'Selected_Service_ID'
#         columns = columns + [columns[3]]
#         print("Pass1")
#         features = dict(zip(CSV_COLUMNS, columns))
#         features.pop(CSV_COLUMNS[5])
#         features[CSV_COLUMNS[-1]] = columns[3]
#         print("Pass2")
#         for i in (3,4):
#             feat = features[CSV_COLUMNS[i]]
#             features[CSV_COLUMNS[i]] = tf.py_function(func=preproc_serv,
#                                                       inp=[feat],
#                                                       Tout=(tf.float32))
#         features[CSV_COLUMNS[5]] = tf.py_function(func=preproc_query,
#                                                       inp=[feat],
#                                                       Tout=(tf.float32))
#         print(features)
#         # features[CSV_COLUMNS[5]] = tf.py_function(func=preproc_rec,
#                                                             #  inp=[features[CSV_COLUMNS[5]]],
#         print("Pass3")                                        #  Tout=(tf.int32))
#         return features, labels

#     # Extract lines from input files using the Dataset API.
#     # data_files = tf.io.matching_files(tf.convert_to_tensor(data_files))

#     dfs = tf.data.Dataset.from_tensor_slices(data_files)
#     # dataset = tf.data.TextLineDataset(data_files)
#     dataset = tf.data.TextLineDataset(data_files)
#     print(dataset)
#     if shuffle:
#         dataset = dataset.shuffle(buffer_size=TRAIN_SIZE)
#     dataset = dataset.map(parse_dataset_lines, num_parallel_calls=multiprocessing.cpu_count())
#     print("Pass4")
#     # dataset = dataset.map(fn, num_parallel_calls=multiprocessing.cpu_count())
#     # dataset = dataset.map(parse_dataset, num_parallel_calls=multiprocessing.cpu_count())

#     # We call repeat after shuffling, rather than before, to prevent separate
#     # epochs from blending together.
#     print(dataset)
#     dataset = dataset.repeat(TRAIN_EPOCHS)
#     dataset = dataset.batch(batch_size)
#     dataset = dataset.prefetch(buffer_size=BATCH_SIZE)
#     print("Pass5")
#     return dataset

In [0]:
# tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.INFO)

# # Clean up the model directory if present
# shutil.rmtree(MODEL_DIR, ignore_errors=True)
# model = build_estimator(MODEL_DIR, LEARN_TYPE)

In [0]:

# for n in range(TRAIN_EPOCHS // EPOCHS_PER_EVAL):
#     model.train(input_fn=lambda: input_fn(
#         data_files=train_files,
#         num_epochs=EPOCHS_PER_EVAL,
#         shuffle=True,
#         batch_size=BATCH_SIZE))

#     results = model.evaluate(input_fn=lambda: input_fn(
#         data_files=test_files,
#         num_epochs=1,
#         shuffle=False,
#         batch_size=BATCH_SIZE))

#     # Display evaluation metrics
#     print('Results at epoch', (n + 1) * EPOCHS_PER_EVAL)
#     print('-' * 60)

#     for key in sorted(results):
#         print('%s: %s' % (key, results[key]))