In [None]:
import datetime
import os
import sys
import time
import numpy as np
import pandas as pd
import tensorflow as tf

from sklearn.preprocessing import MinMaxScaler, OrdinalEncoder
from sklearn.model_selection import train_test_split

In [None]:
# Record the interpreter and TensorFlow versions used for this run.
for version_line in (sys.version, f'tf version : {tf.__version__}'):
    print(version_line)

# Preparing data

Data is from https://www.kaggle.com/competitions/h-and-m-personalized-fashion-recommendations/data

In [None]:
# Raw inputs: the customer table (parquet) and the transaction log (csv).
hm_data_path = '../datasets/h&m/'
customers_file = os.path.join(hm_data_path, 'customers.parquet')
transactions_file = os.path.join(hm_data_path, 'transactions_train.csv')

customer_df = pd.read_parquet(customers_file)
int_df = pd.read_csv(transactions_file)

In [None]:
# Footprint of the transactions frame: total bytes, then (rows, columns).
int_df_bytes = int_df.memory_usage().sum()
print(f"{int_df_bytes:,} bytes")
print(int_df.shape)

In [None]:
# Footprint of the customers frame: total bytes, then (rows, columns).
customer_df_bytes = customer_df.memory_usage().sum()
print(f"memory(bytes) : {customer_df_bytes:,}")
print(customer_df.shape)

In [None]:
# Keep only the customer attributes used downstream, drop customers without
# an age, and collapse both 'NONE' and missing values into the label "None".
cust_cols = ['customer_id', 'club_member_status', 'fashion_news_frequency', 'age']
customers_with_age = customer_df[cust_cols].dropna(subset=['age'])
customer_df = customers_with_age.replace(['NONE', None], "None")

In [None]:
# Attach customer attributes to every transaction.
int_cols = ['t_dat', 'customer_id', 'sales_channel_id','price']
# Inner join on purpose: customers without an age were dropped from
# customer_df, and a left join would bring their transactions back with NaN
# age / club_member_status / fashion_news_frequency, which would then flow
# through the scaler and encoder into the model inputs.
df = pd.merge(int_df[int_cols], customer_df, on=['customer_id'], how='inner')

Methods for one-hot encoding (OHE): https://www.thetestspecimen.com/posts/one-hot-encoding/

In [None]:
# Parse the transaction date, treat the sales channel as a categorical string,
# then order the rows chronologically.
df['t_dat'] = pd.to_datetime(df['t_dat'])
df['sales_channel_id'] = df['sales_channel_id'].astype('string')
df = df.sort_values("t_dat", ascending=True)

In [None]:
# Positional 80/20 split of df: the first 80% of rows become the training
# set and the remainder the test set. Both are independent copies.
train_index = int(len(df)*0.8)
train_df, test_df = df.iloc[:train_index].copy(), df.iloc[train_index:].copy()

In [None]:
ordinal_cols = ['club_member_status', 'fashion_news_frequency', 'sales_channel_id']
num_cols = ['age']

mm_scaler, ord_enc = MinMaxScaler(), OrdinalEncoder()

# Fit both transformers on the training split only, then apply the fitted
# transformers to the train and test splits alike.
fit_mm_scaler = mm_scaler.fit(train_df[num_cols])
fit_oe = ord_enc.fit(train_df[ordinal_cols].values)

for split_df in (train_df, test_df):
    split_df[num_cols] = fit_mm_scaler.transform(split_df[num_cols])
    split_df[ordinal_cols] = fit_oe.transform(split_df[ordinal_cols])

# Saving data

## parquet

In [None]:
# ==== Save train/test into single parquet file
single_parquet_path = '../datasets/h&m/pp_parquets/'

def save_single_parquet():
    """Write train_df and test_df as one parquet file each under single_parquet_path.

    The output directory is created first so the write does not fail on a
    fresh checkout. The row index is not written.

    NOTE(review): an earlier comment here mentioned converting datetime to
    string to mirror the cloud data, but no conversion is performed; `t_dat`
    is written with whatever dtype the frames currently hold.
    """
    os.makedirs(single_parquet_path, exist_ok=True)
    for file_name, frame in (('train_df.parquet', train_df), ('test_df.parquet', test_df)):
        frame.to_parquet(os.path.join(single_parquet_path, file_name), index=False)
save_single_parquet()

# ==== Save train/test into multiple parquet with 128mb each
# Train : single file is 595,894kb -> 580mb -> 116mb*5
# Test : 151366 -> 147mb -> already in fairly optimal size.
def save_multi_parquets(df, folder_path, chunk_size, dataset_type='train'):
    """Split ``df`` row-wise into parquet files named ``{dataset_type}_df_{idx}.parquet``.

    Args:
        df: DataFrame to write.
        folder_path: Destination directory; created if it does not exist.
        chunk_size: Number of files to produce (a chunk *count*, despite the
            name). Rows are divided as evenly as ``np.array_split`` allows.
        dataset_type: File-name prefix, e.g. ``'train'`` or ``'test'``.
    """
    os.makedirs(folder_path, exist_ok=True)
    # Split row positions rather than the DataFrame itself: np.array_split on
    # a DataFrame goes through the deprecated DataFrame.swapaxes.
    row_positions = np.arange(len(df))
    for idx, positions in enumerate(np.array_split(row_positions, chunk_size)):
        df.iloc[positions].to_parquet(f'{folder_path}/{dataset_type}_df_{idx}.parquet', index=False)
# For each split, write an "optimal" layout (few large files) and a "multi"
# layout (many small files): (frame, file prefix, optimal count, multi count).
for split_frame, split_name, n_optimal, n_multi in (
    (train_df, 'train', 5, 200),
    (test_df, 'test', 2, 100),
):
    save_multi_parquets(split_frame, os.path.join(single_parquet_path, 'optimal'),
                        chunk_size=n_optimal, dataset_type=split_name)
    save_multi_parquets(split_frame, os.path.join(single_parquet_path, 'multi'),
                        chunk_size=n_multi, dataset_type=split_name)

## TFRecord files

In [None]:
def _bytes_feature(value):
    """Wrap one bytes value in a ``tf.train.Feature`` holding a one-element BytesList.

    An eager tensor is converted with ``.numpy()`` first, because BytesList
    won't unpack a string from an EagerTensor.
    """
    eager_tensor_cls = type(tf.constant(0))
    raw_value = value.numpy() if isinstance(value, eager_tensor_cls) else value
    bytes_list = tf.train.BytesList(value=[raw_value])
    return tf.train.Feature(bytes_list=bytes_list)

def _float_feature(value):
    """Wrap one float value in a ``tf.train.Feature`` holding a one-element FloatList."""
    float_list = tf.train.FloatList(value=[value])
    return tf.train.Feature(float_list=float_list)

def _int64_feature(value):
    """Wrap one integer value in a ``tf.train.Feature`` holding a one-element Int64List."""
    int64_list = tf.train.Int64List(value=[value])
    return tf.train.Feature(int64_list=int64_list)

In [None]:
%%time
# Whole train / test frames to one GZIP-compressed TFRecord file each.
# The training file is written here as well because the TFRecord training
# section reads '../datasets/h&m/tfrecords/train_data.tfrecord'.
options = tf.io.TFRecordOptions(compression_type='GZIP', )
single_tfrecord_path = '../datasets/h&m/tfrecords/'
os.makedirs(single_tfrecord_path, exist_ok=True)

for file_name, frame in (('train_data.tfrecord', train_df), ('test_data.tfrecord', test_df)):
    with tf.io.TFRecordWriter(os.path.join(single_tfrecord_path, file_name),
                              options=options) as writer:
        for row in frame.itertuples(index=False):
            feature_dict = {
                # t_dat is a pandas Timestamp after pd.to_datetime, so it must
                # be turned into text before it can be encoded to bytes.
                't_dat': _bytes_feature(str(row.t_dat).encode('utf-8')),
                'customer_id': _bytes_feature(row.customer_id.encode('utf-8')),
                'sales_channel_id': _float_feature(row.sales_channel_id),
                'club_member_status': _float_feature(row.club_member_status),
                'fashion_news_frequency': _float_feature(row.fashion_news_frequency),
                'age': _float_feature(row.age),
                'price': _float_feature(row.price),
            }
            example_proto = tf.train.Example(features=tf.train.Features(feature=feature_dict))
            writer.write(example_proto.SerializePartialToString())

In [None]:
"""
For saving multiple/optimal tfrecord files

single file tfrecord 
- train = 725mb -> 7 tfrecords
- test = 182mb -> 2 tfrecords
"""
def write_tfrecord(df, file_path, folder_path):
    """Serialize every row of ``df`` into one TFRecord file.

    Args:
        df: Frame with columns ``t_dat``, ``customer_id``, ``sales_channel_id``,
            ``club_member_status``, ``fashion_news_frequency``, ``age`` and
            ``price``.
        file_path: File name of the TFRecord, joined onto ``folder_path``.
        folder_path: Destination directory; created if it does not exist.

    The writer uses the module-level ``options`` (TFRecordOptions). ``t_dat``
    and ``customer_id`` are stored as UTF-8 bytes, all other columns as floats.
    """
    os.makedirs(folder_path, exist_ok=True)
    with tf.io.TFRecordWriter(os.path.join(folder_path, file_path), options=options) as writer:
        for row in df.itertuples(index=False):
            feature_dict = {
                # str() first: t_dat may be a pandas Timestamp, which has no
                # .encode(); for values that are already str this is a no-op.
                't_dat': _bytes_feature(str(row.t_dat).encode('utf-8')),
                'customer_id': _bytes_feature(row.customer_id.encode('utf-8')),
                'sales_channel_id': _float_feature(row.sales_channel_id),
                'club_member_status': _float_feature(row.club_member_status),
                'fashion_news_frequency': _float_feature(row.fashion_news_frequency),
                'age': _float_feature(row.age),
                'price': _float_feature(row.price),
            }
            example_proto = tf.train.Example(features=tf.train.Features(feature=feature_dict))
            writer.write(example_proto.SerializePartialToString())

# "Optimal" layout: 7 training files and 2 test files.
optimal_tfrecord_folder_path = '../datasets/h&m/tfrecords/optimal'
# Split row positions instead of the DataFrame: np.array_split on a DataFrame
# goes through the deprecated DataFrame.swapaxes.
for idx, positions in enumerate(np.array_split(np.arange(len(train_df)), 7)):
    write_tfrecord(train_df.iloc[positions], f'train_data_{idx}.tfrecord', optimal_tfrecord_folder_path)
for idx, positions in enumerate(np.array_split(np.arange(len(test_df)), 2)):
    write_tfrecord(test_df.iloc[positions], f'test_data_{idx}.tfrecord', optimal_tfrecord_folder_path)

In [None]:
# "Multi" layout: 200 training files and 100 test files.
multi_tfrecord_folder_path = '../datasets/h&m/tfrecords/multi'
# Split row positions instead of the DataFrame: np.array_split on a DataFrame
# goes through the deprecated DataFrame.swapaxes.
for idx, positions in enumerate(np.array_split(np.arange(len(train_df)), 200)):
    write_tfrecord(train_df.iloc[positions], f'train_data_{idx}.tfrecord', multi_tfrecord_folder_path)

for idx, positions in enumerate(np.array_split(np.arange(len(test_df)), 100)):
    write_tfrecord(test_df.iloc[positions], f'test_data_{idx}.tfrecord', multi_tfrecord_folder_path)

# Training

In [None]:
# Small regressor: one 4-wide input, three 4-unit ReLU layers, one linear output.
inputs = tf.keras.layers.Input(shape=(4,))
dense = inputs
for _ in range(3):
    dense = tf.keras.layers.Dense(4, activation='relu')(dense)
outputs = tf.keras.layers.Dense(1, activation=None)(dense)

model = tf.keras.Model(inputs=inputs, outputs=outputs)
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    loss=tf.keras.losses.mean_squared_error,
    metrics=[tf.keras.metrics.RootMeanSquaredError()],
)

## petastorm

In [None]:
"""
Configurations for petastorm

"""
from petastorm import make_reader, make_batch_reader
from petastorm.tf_utils import make_petastorm_dataset

# Model inputs / target and the training-loop sizes.
features = ['age','sales_channel_id', 'club_member_status', 'fashion_news_frequency']
target_col = 'price'
batch_size = 512
n_epochs = 3

# Reader settings are identical for training and validation except that only
# the training reader shuffles row groups.
tr_reader_kwargs, val_reader_kwargs = (
    {
        'reader_pool_type': 'thread',
        'schema_fields': features + ['price'],
        'shuffle_rows': False,
        'shuffle_row_groups': shuffle_groups,
        'workers_count': 10,
    }
    for shuffle_groups in (True, False)
)

curr_datetime = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
# Extra keyword arguments forwarded to model.fit.
model_configs = dict(
    use_multiprocessing=False,
    workers=1,
#     callbacks=[tf.keras.callbacks.TensorBoard(log_dir=f'tb_logs/{curr_datetime}', profile_batch=(10, 15))]
)

In [None]:
def parse_dataset(e):
    """Turn one petastorm batch into an ``(X, y)`` pair for Keras.

    ``e`` exposes each column as an attribute. The columns listed in the
    module-level ``features`` are stacked along axis 1 and cast to float32;
    the ``target_col`` attribute is returned unchanged as ``y``.
    """
    feature_columns = [getattr(e, col_nm) for col_nm in features]
    stacked = tf.stack(feature_columns, axis=1)
    return tf.cast(stacked, tf.float32), getattr(e, target_col)

def train_model_from_petastorm(tr_parquet_path, val_parquet_path, model):
    """Fit ``model`` on parquet data read through petastorm and print the elapsed time.

    Args:
        tr_parquet_path: URL (or list of URLs) of the training parquet file(s).
        val_parquet_path: URL (or list of URLs) of the validation parquet file(s).
        model: Compiled Keras model; trained for ``n_epochs`` epochs.

    NOTE(review): petastorm readers may not support being iterated more than
    once; confirm that ``n_epochs > 1`` works with these reader settings.
    """
    def _as_batches(reader):
        # Re-batch the reader's row-group-sized batches into fixed-size batches.
        return make_petastorm_dataset(reader).unbatch().batch(batch_size).map(parse_dataset)

    start_time = time.perf_counter()
    with make_batch_reader(tr_parquet_path, **tr_reader_kwargs) as tr_reader, \
            make_batch_reader(val_parquet_path, **val_reader_kwargs) as val_reader:
        tr_dataset = _as_batches(tr_reader)
        val_dataset = _as_batches(val_reader)
        model.fit(tr_dataset, validation_data=val_dataset, epochs=n_epochs,
                  **model_configs)
    print(f"Elapsed time: {time.perf_counter() - start_time}")

In [None]:
folder_path = '../datasets/h&m/pp_parquets/'
# Derive the file:// URL from folder_path instead of hard-coding one user's
# absolute path, so the URLs always point at the directory that is listed below.
base_url = f"file://{os.path.abspath(folder_path)}"

def _list_parquet_urls(layout, prefix):
    """Return sorted file:// URLs of files in ``folder_path/layout`` whose name starts with ``prefix``."""
    file_names = sorted(
        f for f in os.listdir(os.path.join(folder_path, layout)) if f.startswith(prefix)
    )
    return [f"{base_url}/{layout}/{f}" for f in file_names]

single_tr_parquet_path = f"{base_url}/train_df.parquet"
single_val_parquet_path = f"{base_url}/test_df.parquet"

optimal_tr_parquet_path = _list_parquet_urls("optimal", "train")
optimal_val_parquet_path = _list_parquet_urls("optimal", "test")
multi_tr_parquet_path = _list_parquet_urls("multi", "train")
multi_val_parquet_path = _list_parquet_urls("multi", "test")

In [None]:
# single_tr_parquet_path is one URL string, so len() would report its
# character count; print the number of files like the sibling cells do.
print(1)
train_model_from_petastorm(single_tr_parquet_path, single_val_parquet_path, model)

In [None]:
# Number of "optimal" training files, followed by the timed training run.
n_optimal_parquet_files = len(optimal_tr_parquet_path)
print(n_optimal_parquet_files)
train_model_from_petastorm(
    tr_parquet_path=optimal_tr_parquet_path,
    val_parquet_path=optimal_val_parquet_path,
    model=model,
)

In [None]:
# Number of "multi" training files, followed by the timed training run.
n_multi_parquet_files = len(multi_tr_parquet_path)
print(n_multi_parquet_files)
train_model_from_petastorm(
    tr_parquet_path=multi_tr_parquet_path,
    val_parquet_path=multi_val_parquet_path,
    model=model,
)

## TFrecords

In [None]:
# This only works with a dictionary of named inputs (at work, both forms
# seemed to work).
features = ['age','sales_channel_id', 'club_member_status', 'fashion_news_frequency']

# One scalar Input per feature, named after the feature.
inputs = [tf.keras.layers.Input(shape=(1,), name=feature_name) for feature_name in features]

concat_input = tf.keras.layers.concatenate(inputs)
dense = concat_input
for _ in range(3):
    dense = tf.keras.layers.Dense(4, activation='relu')(dense)
outputs = tf.keras.layers.Dense(1, activation=None)(dense)

model = tf.keras.Model(inputs=inputs, outputs=outputs)
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    loss=tf.keras.losses.mean_squared_error,
    metrics=[tf.keras.metrics.RootMeanSquaredError()],
)

In [None]:
"""
Configurations for tfrecords
"""
def parse_tfrecord(serialized_example):
    """Parse a batch of serialized examples into ``(features_dict, price)``.

    Parsing uses the module-level ``feature_desc``. TFRecord offers no column
    projection, so the unused ``t_dat`` and ``customer_id`` entries are dropped
    by hand, and ``price`` is split off as the target.
    """
    # parse_example handles a batch; parse_single_example would be the
    # per-record equivalent.
    parsed = tf.io.parse_example(serialized_example, feature_desc)

    for unused_key in ("t_dat", "customer_id"):
        parsed.pop(unused_key)

    target = parsed.pop('price')
    return parsed, target

# Parsing schema: two scalar string fields followed by five scalar float fields.
feature_desc = {
    string_key: tf.io.FixedLenFeature([], tf.string)
    for string_key in ("t_dat", 'customer_id')
}
feature_desc.update({
    float_key: tf.io.FixedLenFeature([], tf.float32)
    for float_key in ('sales_channel_id', 'club_member_status',
                      'fashion_news_frequency', 'age', 'price')
})

In [None]:
def train_model_from_tfrecords(tr_data_paths, val_data_paths, model):
    """Fit ``model`` on GZIP-compressed TFRecord files and print the elapsed time.

    Args:
        tr_data_paths: Path or list of paths of the training TFRecord file(s).
        val_data_paths: Path or list of paths of the validation TFRecord file(s).
        model: Compiled Keras model; trained for ``n_epochs`` epochs.

    Training records are shuffled with a 100,000-record buffer before
    batching; validation records are only batched.
    """
    start_time = time.perf_counter()

    tr_dataset = (
        tf.data.TFRecordDataset(tr_data_paths, compression_type='GZIP')
        .shuffle(100_000)
        .batch(batch_size)
        .map(parse_tfrecord)
    )
    val_dataset = (
        tf.data.TFRecordDataset(val_data_paths, compression_type='GZIP')
        .batch(batch_size)
        .map(parse_tfrecord)
    )

    model.fit(tr_dataset, validation_data=val_dataset, epochs=n_epochs,
              **model_configs)
    print(f"Elapsed time: {time.perf_counter() - start_time}")

In [None]:
# Single-file layout: one path each for training and validation.
tr_single_tfrecord, val_single_tfrecord = (
    '../datasets/h&m/tfrecords/train_data.tfrecord',
    '../datasets/h&m/tfrecords/test_data.tfrecord',
)

# "Optimal" layout: every train_* / test_* file in the optimal folder.
tr_optimal_tfrecord, val_optimal_tfrecord = (
    tf.io.gfile.glob(pattern)
    for pattern in ('../datasets/h&m/tfrecords/optimal/train_*.tfrecord',
                    '../datasets/h&m/tfrecords/optimal/test_*.tfrecord')
)

# "Multi" layout: every train_* / test_* file in the multi folder.
tr_multi_tfrecord, val_multi_tfrecord = (
    tf.io.gfile.glob(pattern)
    for pattern in ('../datasets/h&m/tfrecords/multi/train_*.tfrecord',
                    '../datasets/h&m/tfrecords/multi/test_*.tfrecord')
)

In [None]:
# Timed training run on the single-file TFRecord layout.
train_model_from_tfrecords(
    tr_data_paths=tr_single_tfrecord,
    val_data_paths=val_single_tfrecord,
    model=model,
)

In [None]:
# Number of "optimal" training files, followed by the timed training run.
n_optimal_tfrecord_files = len(tr_optimal_tfrecord)
print(n_optimal_tfrecord_files)
train_model_from_tfrecords(
    tr_data_paths=tr_optimal_tfrecord,
    val_data_paths=val_optimal_tfrecord,
    model=model,
)

In [None]:
# Number of "multi" training files, followed by the timed training run.
n_multi_tfrecord_files = len(tr_multi_tfrecord)
print(n_multi_tfrecord_files)
train_model_from_tfrecords(
    tr_data_paths=tr_multi_tfrecord,
    val_data_paths=val_multi_tfrecord,
    model=model,
)

----

# Others

## spark-tensorflow-connector

Writing tfrecords from pyspark dataframe.

In [None]:
# Absolute location of the connector jar, resolved against the working directory.
connector_jar_name = 'spark-tensorflow-connector-1.0.0-s_2.11.jar'
working_dir = os.getcwd()
full_path = os.path.join(working_dir, connector_jar_name)

In [None]:
from pyspark.sql import SparkSession

# Spark session with the spark-tensorflow-connector jar on its jar list.
session_builder = SparkSession.builder.appName('stc-test')
session_builder = session_builder.config('spark.jars', 'spark-tensorflow-connector-1.0.0-s_2.11.jar')
spark = session_builder.getOrCreate()
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')

In [None]:
import pyspark
# Report the installed PySpark version.
pyspark_version = pyspark.__version__
print(pyspark_version)

In [None]:
# Convert the pandas training frame into a Spark DataFrame.
train_pdf = spark.createDataFrame(data=train_df)

In [None]:
# Preview the first two rows of the Spark DataFrame.
train_pdf.show(n=2)

In [None]:
# Connector reference: http://spark.apache.org/third-party-projects.html
# Write the Spark DataFrame as TFRecords to "/tfrecords" using the connector's
# 'tfrecords' format with local write locality.
tfrecord_writer = train_pdf.write.format('tfrecords')
tfrecord_writer = tfrecord_writer.option('writeLocality', 'local')
tfrecord_writer.save("/tfrecords")

## Reviews

In [None]:
# Dataset built from a single random tensor.
ds2 = tf.data.Dataset.from_tensor_slices(tf.random.uniform([4,10]))

print("- - - - - - - - ds2 - - - - - - - -")
print(tf.random.uniform([4,10]))

# Dataset built from a tuple of two random tensors (floats and ints).
ds3_tensors = (
    tf.random.uniform([4]),
    tf.random.uniform([4, 4], maxval=100, dtype=tf.int32),
)

print("- - - - - - - - ds3 - - - - - - - -")
print(ds3_tensors)
ds3 = tf.data.Dataset.from_tensor_slices(ds3_tensors)

for inspected_ds in (ds2, ds3):
    print(inspected_ds.element_spec)

# Two integer ranges zipped together, then batched in tens.
inc_ds, inc_ds2 = tf.data.Dataset.range(100), tf.data.Dataset.range(100, 200)
comb_ds = tf.data.Dataset.zip((inc_ds, inc_ds2))
batch_ds = comb_ds.batch(10)
list(comb_ds)[:3]

# Plain iteration would continue through every element; show only the first batch.
print(next(iter(batch_ds)))

# take(n) limits iteration to the first n elements of the dataset.
for e in batch_ds.take(2):
    print(e)
    
"""
Since each element may be different length, especially in sequential models. tf provide Dataset.padded_batch
function
"""
# dataset = tf.data.Dataset.range(100)
# dataset = dataset.map(lambda x: tf.fill([tf.cast(x, tf.int32)], x))
# dataset = dataset.padded_batch(4, padded_shapes=(None,))

# for batch in dataset.take(2):
#     print(batch.numpy())
#     print()