# In [None]:  (notebook cell marker left by the export; commented out so the file parses as Python)
#!/usr/bin/python
import argparse
import importlib
import time
import datetime
import sys
import os
import pyspark.sql.functions as F
import tensorflow as tf

from bigdl.optim.optimizer import *
from zoo import init_nncontext, init_spark_conf
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, col, udf, lit
from pyspark.sql.types import FloatType,DoubleType,ArrayType
from zoo.orca.learn.tf.estimator import Estimator
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType


# Put the job code location at the front of the module search path:
# 'jobs.zip' when that file exists relative to the current working directory,
# otherwise the './jobs' directory.
# NOTE(review): the modules below are imported as "jobs.<name>", so the
# './jobs' entry only helps if the parent of 'jobs' is importable too
# (presumably the working directory already is) -- confirm.
sys.path.insert(0, 'jobs.zip' if os.path.exists('jobs.zip') else './jobs')

# Resolve the feature and model helper modules by name at run time.
ncf_features = importlib.import_module("jobs.ncf_features")
ncf_model = importlib.import_module("jobs.ncf_model")


__author__ = 'suqiang.song@mastercard.com'

if __name__ == '__main__':
    # End-to-end driver: load the CSV, build the per-window DataFrames, fit a
    # Keras model through the Orca Estimator, save it, predict on the
    # inference window and write the result as parquet.

    # ---- Job configuration (hard-coded for this run) ----------------------
    app_name = "NCF_DL"

    # Input data and output locations.
    data_source_path = "/opt/work/data/pcard.csv"
    model_file_name = app_name + '.h5'
    # Despite the "_dir" suffix this is the full path of the .h5 model file.
    save_model_dir = "/opt/work/model/" + model_file_name
    predict_output_path = "/opt/work/output/"
    log_dir = "/opt/work/logs/"

    # Limits passed to load_csv / genData / getKerasModel, and the two output
    # sizes passed to getKerasModel (presumably embedding widths -- confirm
    # in jobs.ncf_model).
    u_limit = 10000
    m_limit = 200
    u_output = 50
    m_output = 50

    # Extra arguments forwarded unchanged to every genData call.
    # presumably a negative-sampling rate and a window length -- verify in
    # jobs.ncf_features.
    neg_rate = 5
    sliding_length = 1

    # Training settings handed to the Estimator.
    max_epoch = 5
    batch_size = 400

    # Start/end markers for each data window (the strings look like YYYYMM;
    # how genData interprets the bounds is not visible here).
    # NOTE(review): validation_end and test_start are both "201403", while no
    # other pair of windows shares a boundary value -- confirm this overlap
    # is intended.
    train_start = "201307"
    train_end = "201401"
    validation_start = "201402"
    validation_end = "201403"
    test_start = "201403"
    test_end = "201404"
    inference_start = "201405"
    inference_end = "201406"

    # ---- Spark / Analytics Zoo context -------------------------------------
    sparkConf = init_spark_conf()
    sc = init_nncontext(sparkConf)
    spark = (
        SparkSession.builder
        .appName(app_name)
        .getOrCreate()
    )

    # Everything from here to the final print is included in the timing.
    start = time.time()

    # ---- Feature generation -------------------------------------------------
    # uDF and mDF are not read again by the active code in this block; only
    # tDF feeds the window generation below.
    uDF, mDF, tDF = ncf_features.load_csv(spark, data_source_path, u_limit, m_limit)

    def _gen_window(first_period, last_period):
        """Call ``ncf_features.genData`` for one window with the shared settings."""
        return ncf_features.genData(
            tDF, sc, spark, first_period, last_period,
            neg_rate, sliding_length, u_limit, m_limit
        )

    trainingDF = _gen_window(train_start, train_end)
    validationDF = _gen_window(validation_start, validation_end)
    validationDF.show(5)
    # NOTE(review): testDF is built but never read later in this block.
    testDF = _gen_window(test_start, test_end)
    inferenceDF = _gen_window(inference_start, inference_end)

    # ---- Training -----------------------------------------------------------
    model = ncf_model.getKerasModel(u_limit, m_limit, u_output, m_output, log_dir)
    est = Estimator.from_keras(model, model_dir=log_dir)
    est.fit(
        data=trainingDF,
        batch_size=batch_size,
        epochs=max_epoch,
        feature_cols=['features'],
        label_cols=['labels'],
        validation_data=validationDF
    )

    # Persist the trained Keras model, then report the model's metric names.
    est.save_keras_model(save_model_dir)
    print(model.metrics_names)

    # ---- Inference ----------------------------------------------------------
    # The Estimator predicts directly on a Spark DataFrame; it only needs the
    # batch size and the feature column names.
    prediction_df = est.predict(inferenceDF, batch_size=batch_size, feature_cols=['features'])
    prediction_df.show(5)

    def _pick_class(pred):
        """Return 0.0 when the first score exceeds the second, otherwise 1.0."""
        if pred[0] > pred[1]:
            return 0.0
        return 1.0

    score_udf = udf(_pick_class, FloatType())
    prediction_df = prediction_df.withColumn('prediction2', score_udf('prediction'))
    prediction_df.show(10)

    # ---- Output -------------------------------------------------------------
    # Write only the id columns and the derived label, replacing any previous
    # output at the target path.
    (
        prediction_df
        .select('uid', 'mid', 'prediction2')
        .write
        .mode('overwrite')
        .parquet(predict_output_path)
    )

    end = time.time()
    print("Took time:" + str(end - start))