Add dlpredictor
spyglass700 committed Feb 6, 2020
1 parent 2ad4815 commit 7f155309dcaaa422b671e4b7ccb23de3c08109de
Showing 41 changed files with 895 additions and 4 deletions.
@@ -0,0 +1,84 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "PySpark",
"type": "python",
"request": "launch",
"linux": {
"pythonPath": "/home/reza/opt/sparkclient/Spark/spark/bin/spark-submit",
"args": ["--jars","/home/reza/eshadoop/elasticsearch-hadoop-6.5.2/dist/elasticsearch-hadoop-6.5.2.jar"]
},
"program": "${file}",
"envFile": "${workspaceFolder}/dev.env",
"preLaunchTask": "kinit"
},
{
"name": "Python: Predictor",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"envFile": "${workspaceFolder}/dev.env",
"preLaunchTask": "kinit"
},
{
"name": "Python: Remote Attach",
"type": "python",
"request": "attach",
"port": 5678,
"host": "localhost",
"pathMappings": [
{
"localRoot": "${workspaceFolder}",
"remoteRoot": "."
}
]
},
{
"name": "Python: Module",
"type": "python",
"request": "launch",
"module": "enter-your-module-name-here",
"console": "integratedTerminal"
},
{
"name": "Python: Django",
"type": "python",
"request": "launch",
"program": "${workspaceFolder}/manage.py",
"console": "integratedTerminal",
"args": [
"runserver",
"--noreload",
"--nothreading"
],
"django": true
},
{
"name": "Python: Flask",
"type": "python",
"request": "launch",
"module": "flask",
"env": {
"FLASK_APP": "app.py"
},
"args": [
"run",
"--no-debugger",
"--no-reload"
],
"jinja": true
},
{
"name": "Python: Current File (External Terminal)",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "externalTerminal"
}
]
}
@@ -0,0 +1,10 @@
{
"python.linting.pylintEnabled": true,
"python.linting.pycodestyleEnabled": false,
"python.linting.enabled": true,
"python.linting.pycodestyleArgs": ["--max-line-length=200" ],
"python.pythonPath": "C:\\Python\\Python36\\python.exe",
"files.exclude": {
"**/*.pyc": true
}
}
@@ -0,0 +1,12 @@
{
// See https://go.microsoft.com/fwlink/?LinkId=733558
// for the documentation about the tasks.json format
"version": "2.0.0",
"tasks": [
{
"label": "kinit",
"type": "shell",
"command":"./dev.sh"
}
]
}
File renamed without changes.
File renamed without changes.
@@ -3,7 +3,7 @@ bucket_size: 1 # same as log_processor; about 5000 records per bucket
bucket_step: 1
es_host: '10.193.217.111'
es_port: '9200'
-es_predictions_index: 'predictions_02012020'
+es_predictions_index: 'predictions_02062020'
es_predictions_type: 'doc'
es_model_index: 'model_stats'
es_model_type: 'stat'
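These settings point the predictor at a new Elasticsearch index for the 02/06/2020 run. As a rough illustration only, here is a minimal sketch of how the values might be consumed, assuming the predictor loads conf/config.yml with PyYAML; the ESClient(host, port, index, type) call mirrors its usage in the comparison script added later in this commit.

# Hypothetical config-loading sketch (not part of this commit).
import yaml
from imscommon.es.ims_esclient import ESClient

with open('conf/config.yml') as f:
    cfg = yaml.safe_load(f)

# Build an ES client against the predictions index named above.
es = ESClient(cfg['es_host'], cfg['es_port'],
              cfg['es_predictions_index'], cfg['es_predictions_type'])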
File renamed without changes.
File renamed without changes.
@@ -59,7 +59,7 @@ def run(cfg, model_name, model_version, serving_url):

# Read factdata table
command = """
-select count_array,day,hour,uckey from {} where bucket_id between {} and {}'
+select count_array,day,hour,uckey from {} where bucket_id between {} and {} and not day='2018-03-29' and not day='2018-03-30' and not day='2018-03-31'
""".format(factdata, str(start_bucket), str(end_bucket))

start_bucket = end_bucket + 1
@@ -83,5 +83,5 @@ def get_model_stats(cfg, model_name, model_version):
doc = es.search(body)
if doc == None or len(doc) != 1:
raise Exception(
-'model/version {}{} not found'.format(model_name, model_version))
+'model/version {}/{} not valid'.format(model_name, model_version))
return doc[0]
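Two fixes here: the factdata query drops a stray trailing quote, and it now excludes 2018-03-29 through 2018-03-31 from the fact data the predictor reads, apparently so those three days can serve as a holdout that the comparison script added in this commit checks predictions against. As an illustration, assuming the first bucket range and the factdata3m2 table used in that script, the formatted query would render as:

# Illustration only: how the new SQL renders for one bucket range.
factdata, start_bucket, end_bucket = 'factdata3m2', 0, 1
command = """
select count_array,day,hour,uckey from {} where bucket_id between {} and {} and not day='2018-03-29' and not day='2018-03-30' and not day='2018-03-31'
""".format(factdata, str(start_bucket), str(end_bucket))
print(command)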
File renamed without changes.
File renamed without changes.
@@ -3,5 +3,5 @@
#Start the predictor
if true
then
-spark-submit --jars lib/elasticsearch-hadoop-6.5.2.jar dlpredictor/main_spark_es.py conf/config.yml 's32' '1' 'http://10.193.217.108:8501/v1/models/faezeh:predict'
+spark-submit --jars lib/elasticsearch-hadoop-6.5.2.jar dlpredictor/main_spark_es.py conf/config.yml 's32' '2' 'http://10.193.217.108:8501/v1/models/faezeh:predict'
fi
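The model version argument passed to main_spark_es.py is bumped from '1' to '2'; the other positional arguments stay the same: the config file, the model name 's32', and the TensorFlow Serving REST endpoint. A hypothetical sketch of the call this command amounts to, assuming the argv order shown here and a YAML config; the import path is inferred from the script path and is an assumption, and run() is the function shown in the hunk above.

# Hypothetical driver sketch (assumed, not taken from this commit).
import sys
import yaml
from dlpredictor.main_spark_es import run  # assumed import path

if __name__ == '__main__':
    config_path, model_name, model_version, serving_url = sys.argv[1:5]
    with open(config_path) as f:
        cfg = yaml.safe_load(f)
    run(cfg, model_name, model_version, serving_url)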
File renamed without changes.
@@ -0,0 +1,113 @@
from imscommon.es.ims_esclient import ESClient
from pyspark import SparkContext, SparkConf, Row
from pyspark.sql.functions import concat_ws, count, lit, col, udf, expr, collect_list
from pyspark.sql import HiveContext
from pyspark.sql.types import IntegerType, StringType
import math

# read es
es_host = '10.193.217.111'
es_port = '9200'
es_index = 'predictions_02052020'
es_type = 'doc'
es = ESClient(es_host, es_port, es_index, es_type)
hits = es.search({"size": 1000})

es_records = {}
for ucdoc in hits:
    uckey = ucdoc['uckey']
    predictions = ucdoc['ucdoc']['predictions']
    for day, hours in predictions.items():
        hour = -1
        for hour_doc in hours:
            hour += 1
            es_records[(uckey, day, hour, '0')] = hour_doc['h0']
            es_records[(uckey, day, hour, '1')] = hour_doc['h1']
            es_records[(uckey, day, hour, '2')] = hour_doc['h2']
            es_records[(uckey, day, hour, '3')] = hour_doc['h3']

# print(next(iter(es_records.items())))
# print('************')

sc = SparkContext()
hive_context = HiveContext(sc)
sc.setLogLevel('WARN')

# Reading the max bucket_id
bucket_size = 1
bucket_step = 1
factdata = 'factdata3m2'

start_bucket = 0
while True:

    end_bucket = min(bucket_size, start_bucket + bucket_step)

    if start_bucket > end_bucket:
        break

    # Read factdata table
    command = """
    select count_array,day,hour,uckey from {} where bucket_id between {} and {}
    """.format(factdata, str(start_bucket), str(end_bucket))

    command = """
    select * from trainready_tmp
    """

    start_bucket = end_bucket + 1

    df = hive_context.sql(command)
    #df = df.where('day="2018-03-29" or day="2018-03-30" or day="2018-03-31"')

    # statistics
    found_items = 0
    total_error = 0

    # for row in df.collect():
    #     uckey = row['uckey']
    #     day = row['day']
    #     hour = row['hour']
    #     for item in row['count_array']:
    #         parts = item.split(':')
    #         h = parts[0]
    #         count = int(parts[1])
    #         key = (uckey, day, hour, h)
    #         if key in es_records:
    #             # do not count real 0
    #             # if count == 0:
    #             #     continue
    #             found_items += 1
    #             pcount = es_records[key]  # predicted count
    #             error = abs(pcount-count)/(count+1)
    #             print('real:{}, predicted:{}, error:{}, uckey:{}, hour:{}, price_cat:{}'.format(count, pcount, error, uckey, hour, h))
    #             total_error += error

    # print('found_items', found_items)
    # print('avg_error', total_error/found_items)

    df_collect = df.collect()
    for row in df_collect:
        uckey = row['uckey']
        uph = row['uph']
        hour = row['hour']
        h = row['price_cat']
        ts_n = row['ts_n']
        days = [('2018-03-29', ts_n[-3]), ('2018-03-30',
                                           ts_n[-2]), ('2018-03-31', ts_n[-1])]

        for day, count in days:
            count = round(math.exp(count)-1)
            key = (uckey, day, hour, h)
            if key in es_records:
                found_items += 1
                pcount = es_records[key]  # predicted count
                error = abs(pcount-count)/(count+1)
                total_error += error
                print('real:{}, predicted:{}, error:{}, uph:{}, hour:{}, price_cat:{}, day:{}'.format(
                    count, pcount, error, uph, hour, h, day))

    print('found_items', found_items)
    print('avg_error', total_error/found_items)
    print('es hits:{},es_records:{},df_collect:{}'.format(
        str(len(hits)), str(len(es_records)), str(len(df_collect))))
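For reference, the actual counts in this script come from ts_n, which appears to hold log-normalized values: round(math.exp(x) - 1) inverts a log(1 + count) transform, and the per-record error is the absolute difference between predicted and actual counts divided by (actual + 1), the +1 guarding against division by zero. A small worked example of that metric, with hypothetical numbers:

# Worked example of the error metric above (values are made up).
import math

ts_value = math.log(25 + 1)                 # ts_n entry for an actual count of 25
actual = round(math.exp(ts_value) - 1)      # -> 25
predicted = 30                              # hypothetical value read from es_records
error = abs(predicted - actual) / (actual + 1)
print(actual, round(error, 3))              # 25 0.192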
File renamed without changes.
