In [None]:
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark import SparkConf, Row
from pyspark.sql import SparkSession

import tensorflow as tf
from tensorflow import keras
import tensorflow.keras.backend as K
from tensorflow.keras.layers import Input, Embedding, Concatenate, Dense, Flatten, Reshape, BatchNormalization, Dropout

import horovod.spark.keras as hvd
from horovod.spark.common.backend import SparkBackend
from horovod.spark.common.store import Store
from pyspark.ml.feature import Normalizer, MinMaxScaler, MaxAbsScaler, StandardScaler

from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

import csv
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, FloatType
from pyspark.sql.functions import col

# Run-time settings shared by the rest of the notebook.
args = {
    'master': "local[{}]",          # Spark master URL template; the slot takes 'num-proc'
    'learning_rate': 0.001,
    'batch-size': 100,
    'epochs': 10,
    'num-proc': 4,
    'data-file': "/opt/train_data/auto-mpg.data",
}

# Spark configuration and session used for data preparation.
conf = (
    SparkConf()
    .setAppName('Keras linear model example')
    .set('spark.sql.shuffle.partitions', '16')
    .setMaster(args["master"].format(args["num-proc"]))
)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [None]:
# Stage banner: three lines written to stdout.
print('================', 'Data preparation', '================', sep='\n')

def normalize_pytorch_dataset(dataset, 
                              output_columns : list = None, 
                              model = None):
    """Standardize the columns of ``dataset`` as ``(value - mean) / std``.

    The per-column statistics come from a scaler model. When ``model`` is
    ``None`` a ``StandardScaler`` is fitted on a vector assembled from *all*
    columns of ``dataset`` (in ``dataset.columns`` order); otherwise the given
    model is reused, which lets a test set be scaled with training statistics.
    Statistics are matched to columns by position, so a reused model must have
    been fitted on a dataset with the same column order.

    Args:
        dataset: DataFrame-like object exposing ``columns``, ``__getitem__``
            and ``withColumn``.
        output_columns: Names of columns to leave untouched (e.g. labels).
            ``None`` (the default) excludes nothing.
        model: Optional fitted scaler exposing indexable ``mean`` and ``std``.

    Returns:
        A ``(normalized_dataset, model_scaler)`` tuple, where ``model_scaler``
        is the scaler that supplied the statistics.

    Raises:
        ValueError: If the scaler statistics do not have one entry per column
            of ``dataset``.
    """
    feature_columns = dataset.columns
    # ``None`` replaces the former mutable ``[]`` default; both mean
    # "no column is excluded from normalization".
    skipped_columns = output_columns if output_columns is not None else ()

    if model is None:
        assembler = VectorAssembler(inputCols=feature_columns, outputCol="Vectors")
        pipeline = Pipeline(stages=[assembler])
        assembled = pipeline.fit(dataset).transform(dataset)
        standardScaler = StandardScaler()
        standardScaler.setInputCol("Vectors")
        standardScaler.setOutputCol("scaled")
        model_scaler = standardScaler.fit(assembled)
    else:
        model_scaler = model

    means = model_scaler.mean
    stds = model_scaler.std
    # Statistics are looked up by column position, so a size mismatch would
    # silently pair columns with the wrong mean/std.
    if len(means) != len(feature_columns) or len(stds) != len(feature_columns):
        raise ValueError(
            "scaler statistics cover {} mean / {} std entries but the dataset has {} columns".format(
                len(means), len(stds), len(feature_columns)))

    normalized_dataset = dataset
    for position, column_name in enumerate(feature_columns):
        if column_name in skipped_columns:
            continue
        # Plain Python floats keep the column arithmetic independent of the
        # numeric type the scaler stores its statistics in.
        mean = float(means[position])
        std = float(stds[position])
        # A constant column has zero spread; dividing by it would not yield a
        # usable number, so such columns are only centered (scale of 1.0).
        scale = std if std > 0.0 else 1.0
        normalized_dataset = normalized_dataset.withColumn(
            column_name, (dataset[column_name] - mean) / scale)
    return normalized_dataset, model_scaler

# Read the raw data file as an RDD of text lines.
target = spark.sparkContext.textFile(args['data-file'])

# Keep only the text before the first tab of each line and drop every record
# containing '?' (presumably the file's missing-value marker -- TODO confirm).
df = target.map(lambda line: line.split("\t")[0]).filter(lambda line: '?' not in line)
# Split the space-separated fields per partition; NUL characters are removed
# before the lines reach the csv parser.
df = df.mapPartitions(lambda partition: csv.reader([line.replace('\0','') for line in partition], delimiter=' ',
                                                    skipinitialspace=True, quotechar='"',))

column_names = ['MPG','Cylinders','Displacement','Horsepower','Weight',
                'Acceleration', 'Model Year', 'Origin']

# Every field is loaded as a nullable string and cast to float further down.
cust_schema = StructType([StructField(name, StringType(), True) for name in column_names])

df = spark.createDataFrame(data=df, schema=cust_schema)

# One-hot encode the origin code into one boolean column per region. The
# column is addressed by its declared name "Origin" so the lookup does not
# depend on Spark's case-insensitive column resolution.
for region_name, origin_code in (("USA", "1"), ("Europe", "2"), ("Japan", "3")):
    df = df.withColumn(region_name, df["Origin"] == origin_code)
df = df.drop("Origin")

# Cast every remaining column (including the boolean indicators) to float.
df = df.select(*[col(c).cast("float") for c in df.columns])
dataset = df

# Replace spaces in column names with underscores (e.g. "Model Year").
for column_name in list(dataset.columns):
    new_name = column_name.replace(' ','_')
    if column_name != new_name:
        dataset = dataset.withColumnRenamed(column_name, new_name)

print(dataset.columns)

# 80/20 train/test split with a fixed seed.
splits_data = dataset.randomSplit([0.8, 0.2], 0)
train_dataset = splits_data[0]
test_dataset = splits_data[1]

# Fit the scaler on the training split only and reuse it for the test split;
# the label column "MPG" is left unscaled in both.
n_train, model_train = normalize_pytorch_dataset(train_dataset, output_columns=["MPG"])
n_test, _ = normalize_pytorch_dataset(test_dataset, output_columns=["MPG"], model=model_train)

In [None]:
# Stage banner: three lines written to stdout.
print('=============================', 'Tensorflow model construction', '=============================', sep='\n')

def build_model(columns):
  """Build an (uncompiled) Keras regression model with one input per column.

  Each name in ``columns`` becomes a scalar ``Input`` layer carrying that name.
  The inputs are concatenated, passed through two 16-unit ReLU ``Dense``
  layers and reduced to a single output unit.

  Args:
    columns: Iterable of names, one per model input.

  Returns:
    A ``tf.keras.Model`` mapping the list of inputs to the 1-unit output.
    No optimizer or loss is attached here.
  """
  feature_inputs = []
  for column_name in columns:
    feature_inputs.append(Input(shape=(1,), name=column_name))

  hidden = Concatenate()(feature_inputs)
  # Two identical hidden layers, created in sequence.
  for _ in range(2):
    hidden = Dense(16, activation='relu')(hidden)
  prediction = Dense(1)(hidden)

  return tf.keras.Model(feature_inputs, prediction)


# Every column of the normalized training set except the label "MPG"
# becomes a model input.
features = [name for name in n_train.columns if name != "MPG"]
model = build_model(features)
model.summary()

In [None]:
print('======================')
print('Horovod model training')
print('======================')

# Optimizer, loss and metrics handed to the estimator below.
optimizer = tf.keras.optimizers.RMSprop(args['learning_rate'])
loss='mse'
metrics=['mae', 'mse']

# Location of the Horovod store. It can be overridden with args['store-path'];
# the fallback is the path this notebook has always used.
store = Store.create(args.get('store-path', '/home/daniel/Documentos/AWS/apache_spark/datasets/'))

# Model inputs: every normalized training column except the label "MPG".
features = [i for i in n_train.columns if i not in ["MPG"]]
print(features)

# num_proc is deliberately not passed, so the estimator's own default applies.
keras_estimator = hvd.KerasEstimator(
    store=store,
    model=model,
    optimizer=optimizer,
    loss=loss,
    metrics=metrics,
    feature_cols=features,
    label_cols=['MPG'],
    batch_size=args['batch-size'],
    epochs=args['epochs'],
    )

# Train on the normalized training set; the fitted model writes its
# predictions to an output column named 'MPG'.
keras_model = keras_estimator.fit(n_train).setOutputCols(['MPG'])

In [None]:
# Stage banner: three lines written to stdout.
print("----------------", "Final prediction", "----------------", sep="\n")

# Drop the label from the normalized test set before inference (presumably so
# it does not collide with the model's 'MPG' output column -- TODO confirm).
n_test = n_test.drop("MPG")

# Apply the trained model to the test features, then show the resulting
# schema and the first five rows.
pred_df = keras_model.transform(n_test)
pred_df.printSchema()
pred_df.show(5)