In [None]:
import pyspark
import os.path

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from getpass import getpass
import pandas as pd
import pickle

In [None]:
# Locations of the e-commerce CSV exports, relative to the working directory.
fpathA = 'ecommerce/order_items_dataset.csv'
fpathB = 'ecommerce/customers_dataset.csv'
fpathC = 'ecommerce/geolocation_dataset.csv'
fpathD = 'ecommerce/order_payments_dataset.csv'
fpathE = 'ecommerce/orders_dataset.csv'
fpathF = 'ecommerce/products_dataset.csv'
fpathG = 'ecommerce/sellers_dataset.csv'
fpathH = 'ecommerce/product_category_name_translation.csv'
fpathI = 'ecommerce/customer_reviews_dataset.csv'

# `spark` was previously assumed to be injected by the kernel. getOrCreate()
# returns the already-running session when there is one and builds a new one
# otherwise, so this cell also works after Restart Kernel -> Run All.
spark = SparkSession.builder.getOrCreate()

# Only four of the nine files are used by the analysis below.
orders = spark.read.csv(fpathE, inferSchema=True, header=True)
orderItems = spark.read.csv(fpathA, inferSchema=True, header=True)
customerReviews = spark.read.csv(fpathI, inferSchema=True, header=True)
products = spark.read.csv(fpathF, inferSchema=True, header=True)

In [None]:
# Report the number of rows in each of the four loaded tables.
for label, frame in (
    ('orderItems rows: ', orderItems),
    ('order rows: ', orders),
    ('customerReviews rows: ', customerReviews),
    ('products rows: ', products),
):
    print(label, frame.count())

# Data preprocessing

In [None]:
# Duration = estimated delivery time minus actual delivery time, in seconds
# (unix_timestamp returns seconds), so a positive value means the order
# arrived before the estimate.
timeFmt = "yyyy-MM-dd HH:mm:ss"
timeDiff = (f.unix_timestamp('order_estimated_delivery_date', format=timeFmt)
            - f.unix_timestamp('order_customer_delivery_date', format=timeFmt))
# Only orders that have both timestamps can get a Duration.
diff = orders['order_id', 'order_customer_delivery_date', 'order_estimated_delivery_date'].dropna(how='any')
diff = diff.withColumn("Duration", timeDiff)
diff.show(3)
# Join back only the new column. Joining the full `diff` frame would leave two
# copies of each delivery-date column in mergedOrders, making any later
# reference to them ambiguous.
mergedOrders = orders.join(diff.select('order_id', 'Duration'), on=['order_id'], how='inner')

In [None]:
# Columns kept from each table for the modelling data set.
colNames_order = ['order_id', 'order_status', 'Duration']
colNames_items = ['order_id', 'product_id', 'price', 'freight_value']
colNames_reviews = ['order_id', 'survey_score']
colNames_product = ['product_id', 'product_category_name']

# Narrow every table to the columns listed above.
mergedOrdersDF = mergedOrders.select(*colNames_order)
orderItemsDF = orderItems.select(*colNames_items)
customerReviewsDF = customerReviews.select(*colNames_reviews)
productDF = products.select(*colNames_product)

# Row counts of the narrowed tables.
for label, frame in (
    ('# of rows in mergedOrdersDF: ', mergedOrdersDF),
    ('# of rows in orderItemsDF: ', orderItemsDF),
    ('# of rows in customerReviewsDF: ', customerReviewsDF),
    ('# of rows in productDF: ', productDF),
):
    print(label, frame.count())

In [None]:
# Attach the product category to every order item (inner join on product_id).
tempDF = orderItemsDF.join(productDF, on='product_id', how='inner')
# Attach order status and Duration to every item (inner join on order_id).
tempDF2 = mergedOrdersDF.join(tempDF, on='order_id', how='inner')
# Attach the review score (inner join on order_id).
mergedDF = customerReviewsDF.join(tempDF2, on='order_id', how='inner')
print('# of rows in mergedDF: ', mergedDF.count())

### Clean mergedDF

In [None]:
# Discard every row that has a null in any column.
cleanDF = mergedDF.na.drop(how='any')
print('# of rows remaining: ', cleanDF.count())

### Data analytics

In [None]:
from pyspark.sql.types import DoubleType, IntegerType

# The label has to be numeric for the regressors below; re-running this cell
# is harmless because casting a double column to double is a no-op.
survey_score_as_double = cleanDF["survey_score"].cast(DoubleType())
cleanDF = cleanDF.withColumn("survey_score", survey_score_as_double)
print('Unbalanced data')
# Uncomment to plot the label distribution:
# cleanDF.select('survey_score').toPandas().hist()

# Supervised learning

In [None]:
# Predict survey_score from freight_value, Duration and price.
featureCols = ['freight_value', 'Duration', 'price']
labelCol = 'survey_score'

# Keep only the model columns and drop rows that still contain a null.
feature_data = cleanDF.select(*featureCols, labelCol).dropna(how='any')
print('Number of features: ', len(featureCols))
print('Number of rows in cleaned DataFrame: ', feature_data.count())

In [None]:
# Index every feature column, one-hot encode the indices and assemble the
# encoded vectors into a single "features" column.
# NOTE(review): featureCols holds numeric columns here, so each distinct value
# becomes its own category - confirm this is intended rather than feeding the
# raw numbers to VectorAssembler.
# NOTE(review): the pipeline is fitted on feature_data before the train/test
# split below, so the index built here has seen the test rows.
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

cols = featureCols

indexers = []
encoders = []
for name in cols:
    indexed_name = "{0}_indexed".format(name)
    indexers.append(StringIndexer(inputCol=name, outputCol=indexed_name))
    encoders.append(
        OneHotEncoder(inputCol=indexed_name,
                      outputCol="{0}_encoded".format(indexed_name)))

assembler = VectorAssembler(
    inputCols=[stage.getOutputCol() for stage in encoders],
    outputCol="features",
)

pipeline = Pipeline(stages=indexers + encoders + [assembler])
fitted_pipeline = pipeline.fit(feature_data)
vector_data = fitted_pipeline.transform(feature_data).select('features', labelCol)
vector_data.printSchema()

### Split Data

In [None]:
# 85 % / 15 % split with a fixed seed.
split_parts = vector_data.randomSplit([0.85, 0.15], seed=12345)
trainDF = split_parts[0]
testDf = split_parts[1]

# Preview the first rows of each part.
for title, part in (('trainDF', trainDF), ('testDf', testDf)):
    print(title)
    part.show(5)

### Down Sampling

In [None]:
from sklearn.utils import resample
from numpy.random import randint
from pyspark.sql.functions import isnan, when, count, col, rand, isnull, avg, stddev, udf, lit

In [None]:
def dowm_resample(base_features, ratio, class_field, base_class,
                  classes=(1.0, 5.0, 3.0, 4.0, 2.0), seed=None):
    """Down-sample every non-base class to about ``ratio`` times the base class size.

    All rows whose ``class_field`` equals ``base_class`` are kept. Every other
    label in ``classes`` is sampled without replacement with
    ``fraction = base_count * ratio / class_count``.

    Args:
        base_features: Spark DataFrame holding the labelled rows.
        ratio: Target size of each other class relative to the base class.
        class_field: Name of the label column.
        base_class: Label value that is kept in full.
        classes: All label values to consider. ``base_class`` is skipped if it
            appears here, so any label can serve as the base class. With the
            default and ``base_class=2.0`` the other classes are handled in
            the order 1.0, 5.0, 3.0, 4.0.
        seed: Optional seed passed to ``DataFrame.sample``; ``None`` leaves
            the sampling unseeded.

    Returns:
        The union of the sampled classes and the complete base class.
    """
    label = base_features[class_field]
    pos = base_features.filter(label == base_class)
    target = float(pos.count() * ratio)

    balanced = pos
    fractions = []
    for value in classes:
        if value == base_class:
            # The base class is already included in full.
            continue
        subset = base_features.filter(label == value)
        total = subset.count()
        if total == 0:
            # Nothing to sample; also avoids dividing by zero.
            fractions.append(0.0)
            continue
        # Sampling without replacement only accepts fractions in [0, 1], so a
        # class smaller than the target is kept whole.
        fraction = min(1.0, target / float(total))
        fractions.append(fraction)
        sampled = subset.sample(withReplacement=False, fraction=fraction, seed=seed)
        balanced = sampled.union(balanced)

    print(*fractions)
    return balanced

# Balance the training split around label 2.0, aiming for equally sized classes.
ratio = 1
resampleDF = dowm_resample(
    base_features=trainDF,
    ratio=ratio,
    class_field='survey_score',
    base_class=2.0,
)

## Regression (Predict score)

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor, GBTRegressor, GeneralizedLinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

### GBTRegressor (Unbalanced Data)

In [None]:
# Gradient-boosted trees fitted on the original (unbalanced) training split
# and scored by RMSE on the test split.
dt = GBTRegressor(labelCol=labelCol, featuresCol='features')
dt_model = dt.fit(trainDF)
dt_predictions = dt_model.transform(testDf)
dt_evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol=labelCol,
    predictionCol="prediction",
)
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [None]:
# Histogram of the predicted scores from the most recent model.
predicted_scores = dt_predictions.select('prediction').toPandas()
predicted_scores.hist()

### GBTRegressor (Balanced Data)

In [None]:
# Gradient-boosted trees fitted on the down-sampled (balanced) training data
# and scored by RMSE on the same test split.
dt = GBTRegressor(labelCol=labelCol, featuresCol='features')
dt_model = dt.fit(resampleDF)
dt_predictions = dt_model.transform(testDf)
dt_evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol=labelCol,
    predictionCol="prediction",
)
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [None]:
# Show predictions next to the true label and the feature vector.
preview_cols = ["prediction", labelCol, "features"]
dt_predictions.select(*preview_cols).show()

### DecisionTreeRegressor (Unbalanced Data)

In [None]:
# Single decision tree fitted on the original (unbalanced) training split
# and scored by RMSE on the test split.
dt = DecisionTreeRegressor(labelCol=labelCol, featuresCol='features')
dt_model = dt.fit(trainDF)
dt_predictions = dt_model.transform(testDf)
dt_evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol=labelCol,
    predictionCol="prediction",
)
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [None]:
# Show predictions next to the true label and the feature vector.
preview_cols = ["prediction", labelCol, "features"]
dt_predictions.select(*preview_cols).show()

### DecisionTreeRegressor (Balanced Data)

In [None]:
# Single decision tree fitted on the down-sampled (balanced) training data
# and scored by RMSE on the same test split.
dt = DecisionTreeRegressor(labelCol=labelCol, featuresCol='features')
dt_model = dt.fit(resampleDF)
dt_predictions = dt_model.transform(testDf)
dt_evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol=labelCol,
    predictionCol="prediction",
)
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [None]:
# Histogram of the predicted scores from the most recent model.
predicted_scores = dt_predictions.select('prediction').toPandas()
predicted_scores.hist()

## MLP and CNN

In [None]:
# Same CSV locations as in the Spark section, repeated so this section can be
# read on its own.
fpathA = 'ecommerce/order_items_dataset.csv'
fpathB = 'ecommerce/customers_dataset.csv'
fpathC = 'ecommerce/geolocation_dataset.csv'
fpathD = 'ecommerce/order_payments_dataset.csv'
fpathE = 'ecommerce/orders_dataset.csv'
fpathF = 'ecommerce/products_dataset.csv'
fpathG = 'ecommerce/sellers_dataset.csv'
fpathH = 'ecommerce/product_category_name_translation.csv'
fpathI = 'ecommerce/customer_reviews_dataset.csv'

# From here on these four names refer to pandas DataFrames; they rebind the
# Spark DataFrames of the same name loaded earlier in the notebook.
orderItems = pd.read_csv(fpathA)
orders = pd.read_csv(fpathE)
products = pd.read_csv(fpathF)
customerReviews = pd.read_csv(fpathI)

In [None]:
from datetime import  datetime

# Parse both delivery timestamps. Re-running is safe because to_datetime
# leaves already-parsed datetime columns unchanged.
orders['order_estimated_delivery_date'] = pd.to_datetime(orders['order_estimated_delivery_date'])
orders['order_customer_delivery_date'] = pd.to_datetime(orders['order_customer_delivery_date'])

# Delivery delay in seconds (actual minus estimated; positive = late).
# The previous `.astype(int)` could not represent a missing delivery date:
# depending on the pandas version it either raises on NaT or silently stores
# the int64 NaT sentinel, which the later dropna() cannot remove.
# total_seconds() keeps those rows as NaN so they are dropped with the other
# incomplete rows.
# NOTE(review): the unit changes from nanoseconds to seconds, and the sign is
# the opposite of the Spark section's `Duration` - confirm the downstream
# model does not depend on the raw scale.
duration = (orders['order_customer_delivery_date']
            - orders['order_estimated_delivery_date']).dt.total_seconds()
orders['duration'] = duration

In [None]:
# Narrow each table to the columns used by the neural-network model.
mergedOrdersDF = orders.loc[:, ['order_id', 'order_status', 'duration']]
orderItemsDF = orderItems.loc[:, ['order_id', 'product_id', 'price', 'freight_value']]
customerReviewsDF = customerReviews.loc[:, ['order_id', 'survey_score']]
productDF = products.loc[:, ['product_id', 'product_category_name']]

# Outer merges keep unmatched rows from both sides; they show up as NaN.
tempDF = pd.merge(orderItemsDF, productDF, on=['product_id'], how='outer')
tempDF2 = pd.merge(mergedOrdersDF, tempDF, on=['order_id'], how='outer')
mergedDF = pd.merge(customerReviewsDF, tempDF2, on=['order_id'], how='outer')

# Non-null count per column.
mergedDF.count()

In [None]:
# Drop every row with a missing value, then keep the model columns
# (order_id is no longer needed after the merges).
model_columns = ['survey_score', 'order_status', 'duration', 'product_id',
                 'price', 'freight_value', 'product_category_name']
mergedDF = mergedDF.dropna(how='any')[model_columns]

# Non-null count per column after cleaning.
mergedDF.count()

In [None]:
from keras import Model
from keras.layers import Dense
from keras.layers import Dropout
from keras_pandas.Automater import Automater
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np
import warnings
warnings.filterwarnings("ignore")

In [None]:
# Hold out 0.1 % of the rows as test observations (fixed random_state).
# Each part is copied so it is an independent frame rather than a view.
split_parts = train_test_split(mergedDF, test_size=0.001, random_state=42)
train_observations, test_observations = (part.copy() for part in split_parts)
train_observations.head()

In [None]:
# Transform the data set, using keras_pandas
# Every modelled column is assigned a data type; the output variable
# ('survey_score') is listed among the numerical variables as well.
categorical_vars = ['order_status', 'product_category_name']
numerical_vars = ['duration', 'price', 'freight_value', 'survey_score']
text_vars = ['product_id']

data_type_dict = {'numerical': numerical_vars, 'categorical': categorical_vars, 'text': text_vars}
output_var = 'survey_score'
auto = Automater(data_type_dict=data_type_dict, output_var=output_var)
# NOTE(review): fit_transform() below is called on the same frame, so this
# separate fit() looks redundant - confirm before removing.
auto.fit(train_observations)

# Transform data
train_X, train_y = auto.fit_transform(train_observations)
test_X, test_y = auto.transform(test_observations)

# Create and fit keras (deep learning) model.
# Start from the input nub provided by the Automater.
x = auto.input_nub

# Fill in your own hidden layers
# x = Dense(14, kernel_initializer='normal')(x)
# x = Dropout(rate= 0.1)(x)
# Two ReLU hidden layers (12 and 6 units), each followed by 20 % dropout,
# then a single linear unit.
x = Dense(12, activation = 'relu', kernel_initializer = 'uniform')(x)
x = Dropout(rate= 0.2)(x)
x = Dense(6, activation='relu', kernel_initializer = 'uniform')(x)
x = Dropout(rate= 0.2)(x)
x = Dense(1, activation='linear', kernel_initializer = 'uniform')(x)

# Pass the network through the Automater's output nub.
x = auto.output_nub(x)

# Compile with the loss suggested by the Automater and train for 10 epochs,
# holding out 10 % of the training data for validation.
model = Model(inputs=auto.input_layers, outputs=x)
model.compile(loss=auto.suggest_loss(), optimizer='adam', metrics=['mse','mae'])
history = model.fit(train_X, train_y, epochs=10, batch_size=150, validation_split=.1)

In [None]:
# List the metrics recorded during training, then plot training and
# validation loss per epoch.
print(history.history.keys())
import matplotlib.pyplot as plt

for curve in ('loss', 'val_loss'):
    plt.plot(history.history[curve])
plt.title('CNN model loss')
plt.xlabel('epoch')
plt.ylabel('loss')
plt.legend(['train', 'validation'], loc='upper right')
plt.show()

# Another Approach (Classification)

In [None]:
import pandas as pd
import os.path

import pyspark
from pyspark.sql import SparkSession

from pyspark.sql.functions import udf, col, avg, mean, stddev
from pyspark.sql.types import BooleanType
from pyspark.ml.stat import Correlation

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as f
from getpass import getpass
import pandas as pd
import pickle
from pprint import pprint

# import data_dict_loader
%matplotlib inline

In [None]:
# Header-less CSV: Spark names the columns _c0, _c1, ... and infers the types.
# The frame is cached because several later cells read it.
result2 = '../result2.csv'
result2_reader = spark.read.format("csv").option("inferSchema", "true").option("header", "false")
result2_df = result2_reader.load(result2).cache()
result2_df.show()

In [None]:
# Discard every row that has a null in any column.
cleaned_df2 = result2_df.na.drop(how='any')

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer


def _index_and_encode(df, input_col, index_col, vec_col):
    """Add a string-index column and its one-hot vector to ``df``.

    The two stages run through a Pipeline so the code works whether
    OneHotEncoder is a Transformer (Spark 2.x) or an Estimator that must be
    fitted first (Spark 3.x). Calling ``OneHotEncoder(...).transform``
    directly only works on Spark 2.x.
    """
    stages = [
        StringIndexer(inputCol=input_col, outputCol=index_col),
        OneHotEncoder(inputCol=index_col, outputCol=vec_col),
    ]
    return Pipeline(stages=stages).fit(df).transform(df)


# _c8 -> categoryIndex / categoryVec (categoryIndex is the label used later).
category_encoded = _index_and_encode(cleaned_df2, "_c8", "categoryIndex", "categoryVec")
category_encoded.show()

# _c1 -> stateIndex / stateVec.
encoded = _index_and_encode(category_encoded, "_c1", "stateIndex", "stateVec")
encoded.show()

In [None]:
from pyspark.ml.feature import Bucketizer

# Bucket _c7 into unit-wide bins between its integer-truncated min and max.
# NOTE(review): values above int(max) fall outside the splits when the maximum
# is not a whole number - confirm _c7 only holds whole numbers.
score_range = encoded.agg(f.min('_c7'), f.max('_c7')).first()
min_sc = score_range[0]
max_sc = score_range[1]
splits = list(range(int(min_sc), int(max_sc) + 1))
bucketizer = Bucketizer(splits=splits, inputCol='_c7', outputCol="Bucketed")
bucket_df = bucketizer.transform(encoded)
bucket_df.show(5)

In [None]:
# Features and label for the classifier. These names rebind the featureCols
# and labelCol used by the regression section.
featureCols = ['stateIndex', '_c2','_c5','_c6','_c4', '_c3','_c9']
labelCol = 'categoryIndex'

from pyspark.ml.feature import VectorAssembler

# Pack the feature columns into one vector and keep only vector and label.
assembler = VectorAssembler(outputCol="features", inputCols=featureCols)
assembled_df = assembler.transform(encoded).select('features', 'categoryIndex')
assembled_df.show(5)

In [None]:
# Print the column names and types of the assembled frame.
assembled_df.printSchema()

In [None]:
# 75 % / 25 % split with seed 24.
tmp = assembled_df.randomSplit([0.75, 0.25], 24)
train_df, test_df = tmp[0], tmp[1]

# Sanity check: the two parts add up to the total.
n_train = train_df.count()
n_test = test_df.count()
print('{} = {} + {}'.format(n_train + n_test, n_train, n_test))

train_df.show(5)
test_df.show(5)

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

# Untuned decision tree (maxDepth=15) predicting categoryIndex.
model = DecisionTreeClassifier(
    featuresCol="features",
    labelCol='categoryIndex',
    maxDepth=15,
)
model_no_tune = model.fit(train_df)
print(model_no_tune)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the untuned tree on both splits to compare train and test accuracy.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="categoryIndex",
    predictionCol="prediction",
)
train_pred = model_no_tune.transform(train_df)
test_pred = model_no_tune.transform(test_df)
train_accu = evaluator.evaluate(train_pred)
test_accu = evaluator.evaluate(test_pred)
print('Train accuracy:', train_accu)
print('Test accuracy:', test_accu)