In [1]:
import os
# Pin JAVA_HOME to a local JDK 8 install, overriding any value already set
# (presumably so the JVM started for Spark uses this JDK - confirm).
# NOTE(review): macOS-specific absolute path; it only exists on a machine with
# this exact JDK build installed.
os.environ.update(
    JAVA_HOME='/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home',
)

In [2]:
import pyspark as ps
from pyspark import SparkContext
from pyspark.ml import Pipeline,PipelineModel
from pyspark.sql import functions as f
from pyspark.sql import types as t
from pyspark.sql import SparkSession
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [3]:
# Load the saved Pipeline definition from disk; trainNew() calls
# pipeline.fit(...) on it whenever a new model is trained.
# NOTE(review): absolute, user-specific path - the notebook only runs on a
# machine that has this directory.
path = "/Users/mineryang/Desktop/Team09_GamePopularity/RatingModelTraining/my_pipeline"
# NOTE(review): this runs before the SparkSession is built in the next cell;
# presumably Spark creates a default session here - confirm that the appName
# and config set in that cell still take effect.
pipeline = Pipeline.load(path)

In [4]:
# Shared SparkSession: trainNew() reads the training parquet through it and the
# /predict route uses it to build the single-row input DataFrame.
# NOTE(review): "ServerTraing" looks like a typo for "ServerTraining", and
# "spark.some.config.option" looks like a placeholder setting - confirm both.
ss = SparkSession \
    .builder \
    .appName("ServerTraing") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [5]:
class MyModel:
    """Mutable holder pairing a fitted model with its accuracy score.

    Attributes:
        newmodel: the model object used for predictions.
        accuracy: the accuracy value reported alongside predictions.
    """

    def __init__(self, newmodel, accuracy):
        # Both attributes are plain instance attributes and may be reassigned later.
        self.newmodel, self.accuracy = newmodel, accuracy

In [6]:
# training a new model
def trainNew(p1,p2,seed=None):
    """Fit the loaded pipeline on a fresh random split and score it.

    The cleaned dataset is read from parquet, split into a training and a test
    part with relative weights ``p1`` and ``p2``, the module-level ``pipeline``
    is fitted on the training part, and accuracy is computed on the test part.

    Args:
        p1: relative weight of the training split (must be a finite number > 0).
        p2: relative weight of the test split (must be a finite number > 0).
        seed: optional seed forwarded to ``randomSplit`` so a split can be
            reproduced; ``None`` (the default) keeps the unseeded behaviour.

    Returns:
        MyModel wrapping the fitted model and its accuracy on the test split.

    Raises:
        ValueError: if either weight is missing, non-numeric, NaN, infinite or
            not strictly positive.
    """
    # Validate before any Spark work: a missing weight (e.g. an omitted request
    # argument arriving as None) would otherwise fail deep inside randomSplit,
    # and a zero weight would leave nothing to train on or nothing to evaluate.
    weights = []
    for name, value in (("p1", p1), ("p2", p2)):
        try:
            weight = float(value)
        except (TypeError, ValueError):
            raise ValueError("%s must be a number, got %r" % (name, value))
        # The chained comparison is also False for NaN.
        if not 0 < weight < float("inf"):
            raise ValueError("%s must be a finite number > 0, got %r" % (name, value))
        weights.append(weight)

    print("begin to train a new model")
    origindf = ss.read.parquet("/Users/mineryang/Desktop/Team09_GamePopularity/RatingModelTraining/cleandata.parquet")
    training, test = origindf.randomSplit(weights, seed=seed)
    model = pipeline.fit(training)
    print("training completed")
    # compute accuracy on the test set
    predictions = model.transform(test)
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    return MyModel(model,accuracy)

In [7]:
from pyspark.sql.functions import split
def parseData(df):
    """Split the ';'-delimited text columns of `df` into array columns.

    The columns developer, publisher, platforms, categories and tags are each
    replaced, in that order, by the result of splitting the corresponding
    column of the input frame on ";". All other columns are left as they are.

    Args:
        df: DataFrame holding the five delimited string columns.

    Returns:
        A new DataFrame with those five columns replaced by their split form.
    """
    parsed = df
    for column in ("developer", "publisher", "platforms", "categories", "tags"):
        # Each split reads the column from the input frame, as the result is built up.
        parsed = parsed.withColumn(column, split(df[column], ";"))
    return parsed

In [8]:
def parseResult(df,colname):
    """Return the value of column `colname` from the first row of `df`.

    Only the first row is fetched (via ``first()``) instead of collecting the
    whole column to the driver and discarding everything but element 0.

    Args:
        df: DataFrame to read from.
        colname: name of the column whose first value is wanted.

    Returns:
        The value stored under `colname` in the first row.

    Raises:
        IndexError: if `df` has no rows (the same exception type the previous
            list-indexing implementation raised, now with a clear message).
    """
    first_row = df.select(df[colname]).first()
    if first_row is None:
        # first() yields None for an empty DataFrame.
        raise IndexError("cannot read column %r: DataFrame has no rows" % (colname,))
    return first_row[colname]

In [9]:
# Initialise the baseline prediction model served by /predict until /train
# replaces it.
# NOTE(review): absolute, user-specific path to the saved PipelineModel.
mypath = "/Users/mineryang/Desktop/Team09_GamePopularity/RatingModelTraining/best_model"
loadmodel = PipelineModel.load(mypath)
# 0.7 is a hard-coded accuracy for the loaded model; it is not computed here,
# so the "Test set accuracy" printed below is this constant, not a measurement.
predictmodel = MyModel(loadmodel, 0.7)
# Alternative kept for reference: train from scratch at start-up instead.
# predictmodel = trainNew(0.6,0.4)
print("Test set accuracy = " + str(predictmodel.accuracy))

Test set accuracy = 0.7


In [10]:
# Schema of the single-row DataFrame built from a /predict request:
# five string columns followed by a double `price` column.
_string_fields = ("developer", "publisher", "platforms", "categories", "tags")
schema = t.StructType(
    [t.StructField(field_name, t.StringType()) for field_name in _string_fields]
    + [t.StructField("price", t.DoubleType())]
)

In [11]:
# start real time server
from flask import Flask
from flask import request, redirect, url_for
from flask_cors import CORS
from flask_restful import reqparse, abort, Api, Resource

# create a Flask instance
app = Flask(__name__)
# Enable cross-origin requests with flask_cors defaults (presumably what answers
# the browser's OPTIONS preflight requests seen in the server log - confirm).
CORS(app)
# flask_restful Api wrapper; no Resource is registered on it in the visible
# cells - every endpoint is a plain @app.route view.
api = Api(app)

In [12]:
# Request parser for /predict: five string arguments plus a float `price`.
parser = reqparse.RequestParser()
for _text_field in ("developer", "publisher", "platforms", "categories", "tags"):
    parser.add_argument(_text_field, type=str)
parser.add_argument("price",type=float)

<flask_restful.reqparse.RequestParser at 0x11427be50>

In [13]:
# Request parser for /train: a string `command` plus the two float split
# weights `p1` and `p2`.
parser2 = reqparse.RequestParser()
parser2.add_argument("command", type=str)
for _weight_field in ("p1", "p2"):
    parser2.add_argument(_weight_field, type=float)

<flask_restful.reqparse.RequestParser at 0x11428b5d0>

In [14]:
@app.route('/')
def index():
    """Landing route: returns the plain-text greeting "welcome"."""
    return "welcome"

@app.route('/init',methods=['GET'])
def init():
    """Return the module-level `jsontext` payload, if one has been defined.

    `jsontext` is not assigned by any active code in the visible notebook (the
    only assignment sits in commented-out lines), so referencing it directly
    raised NameError and turned every GET /init into a 500. The global is now
    looked up defensively.

    Returns:
        The `jsontext` value when it exists; otherwise a plain-text message
        with HTTP status 503.
    """
    payload = globals().get("jsontext")
    if payload is None:
        return "feature selection lists are not loaded", 503
    return payload

@app.route('/predict',methods=['GET', 'POST'])
def predict():
    """Score one game description with the currently served model.

    On POST the arguments declared on `parser` are read, turned into a
    single-row DataFrame using `schema`, parsed with parseData(), printed with
    show(), and passed through `predictmodel.newmodel`. The response body is
    "<predicted label>,<model accuracy>". Any other method returns the string
    "in get situation".
    """
    if request.method != 'POST':
        return "in get situation"

    args = parser.parse_args()
    feature_names = ('developer', 'publisher', 'platforms', 'categories', 'tags', 'price')
    record = {name: args[name] for name in feature_names}

    raw_df = ss.createDataFrame([record], schema)
    parsed_df = parseData(raw_df)
    parsed_df.show()

    scored = predictmodel.newmodel.transform(parsed_df)
    label = str(parseResult(scored, 'prediction'))
    accuracy_text = str(predictmodel.accuracy)
    print("result: "+ label+", accuracy: "+ accuracy_text)
    return label + "," + accuracy_text

@app.route('/train',methods=['GET', 'POST'])
def train():
    """Optionally retrain the served model and report its accuracy.

    On POST the arguments declared on `parser2` are read. When `command` equals
    'true', a new model is trained with split weights `p1`/`p2` and swapped into
    `predictmodel`; otherwise nothing is retrained. In both cases the accuracy
    of the currently served model is returned as text. Any other method returns
    the string "in get situation".

    Returns:
        The accuracy as a string, or a plain-text error with HTTP status 400
        when retraining is requested without two positive split weights.
    """
    if request.method != 'POST':
        return "in get situation"

    args = parser2.parse_args()
    cmd = args["command"]
    if cmd == 'true':
        p1 = args["p1"]
        p2 = args["p2"]
        # Arguments missing from the request are parsed as None; reject those
        # (and non-positive weights) up front instead of letting the training
        # call fail and surface as a 500.
        if p1 is None or p2 is None or not (p1 > 0 and p2 > 0):
            return "p1 and p2 must both be provided as positive numbers", 400
        mymodel = trainNew(p1,p2)
        predictmodel.newmodel = mymodel.newmodel
        predictmodel.accuracy = mymodel.accuracy
        print("new model accuracy: "+ str(predictmodel.accuracy))
    return str(predictmodel.accuracy)

In [None]:
# Start Flask's built-in development server; the call does not return while the
# server is running, so this cell stays busy and serves the requests logged below.
# NOTE(review): debug=True turns on the Werkzeug interactive debugger, which
# must never be reachable by untrusted clients. use_reloader=False is presumably
# required because the auto-reloader cannot restart a notebook kernel - confirm.
if __name__ == "__main__":
    app.run(debug=True, use_reloader=False)

 * Serving Flask app "__main__" (lazy loading)
 * Environment: production
   Use a production WSGI server instead.
 * Debug mode: on


 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
127.0.0.1 - - [14/Apr/2020 16:27:28] "OPTIONS /predict HTTP/1.1" 200 -


+---------+---------+--------------------+--------------------+--------------------+-----+
|developer|publisher|           platforms|          categories|                tags|price|
+---------+---------+--------------------+--------------------+--------------------+-----+
|  [Valve]|  [Valve]|[linux, mac, wind...|[Multi-player, Cr...|[Massively Multip...| 7.19|
+---------+---------+--------------------+--------------------+--------------------+-----+



127.0.0.1 - - [14/Apr/2020 16:27:31] "POST /predict HTTP/1.1" 200 -


result: 3.0, accuracy: 0.7


127.0.0.1 - - [14/Apr/2020 16:28:00] "OPTIONS /train HTTP/1.1" 200 -


begin to train a new model
training completed


127.0.0.1 - - [14/Apr/2020 16:29:19] "POST /train HTTP/1.1" 200 -


new model accuracy: 0.6981225296442688


127.0.0.1 - - [14/Apr/2020 16:30:03] "OPTIONS /predict HTTP/1.1" 200 -


+---------+---------+--------------------+--------------------+---------------+-----+
|developer|publisher|           platforms|          categories|           tags|price|
+---------+---------+--------------------+--------------------+---------------+-----+
|  [Valve]|  [Valve]|[linux, mac, wind...|[Multi-player, Cr...|[Multiplayer, ]| 7.09|
+---------+---------+--------------------+--------------------+---------------+-----+



127.0.0.1 - - [14/Apr/2020 16:30:04] "POST /predict HTTP/1.1" 200 -


result: 3.0, accuracy: 0.6981225296442688


In [None]:
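# NOTE: this whole cell is disabled. `jsontext`, which the /init route returns,
# is only assigned here (in the commented-out json.dumps call below), so /init
# cannot serve the selection lists until this code is restored and run before
# the server is started.
# import json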
# def readfeature(fpath):
#     f = open(fpath, "r")
#     return f.read()
# devstr = readfeature("/Users/mineryang/Desktop/Team09_GamePopularity/RatingModelTraining/selectionFile/select_developer.txt")
# pubstr = readfeature("/Users/mineryang/Desktop/Team09_GamePopularity/RatingModelTraining/selectionFile/select_publisher.txt")
# pltstr = readfeature("/Users/mineryang/Desktop/Team09_GamePopularity/RatingModelTraining/selectionFile/select_platforms.txt")
# catstr = readfeature("/Users/mineryang/Desktop/Team09_GamePopularity/RatingModelTraining/selectionFile/select_categories.txt")
# tagstr = readfeature("/Users/mineryang/Desktop/Team09_GamePopularity/RatingModelTraining/selectionFile/select_tags.txt")
# # jsontext = json.dumps({'devstr': devstr, 
# #                           'pubstr': pubstr, 
# #                           'pltstr': pltstr,
# #                           'catstr': catstr,
# #                           'tagstr': tagstr})

# print(devstr.split(";"))