Merge branch 'apache-migration' into master
radibnia77 committed Feb 24, 2021
2 parents 2e83de0 + c4b3819 commit 43bd43f0338bc1518637219be5a43d1d1d7833f2
Showing 116 changed files with 4,025 additions and 4,026 deletions.
@@ -1,5 +1,4 @@

IMService/gradlew
IMService/gradlew.bat
IMService/gradle/wrapper/gradle-wrapper.jar
IMService/gradle/wrapper/gradle-wrapper.properties


@@ -1,32 +1,32 @@
### What is predictor_dl_model?
predictor_dl_model is a suite of offline processes to forecast traffic inventory. The suite contains the following modules; more information is provided in each module's directory.

1. datagen: This module generates the factdata table, which contains the traffic data.
2. trainer: This module builds and trains a deep learning model based on the factdata table.
3. pipeline: This module processes the factdata table into training-ready data used to train the neural network.

### Prerequisites
Cluster: Spark 2.3/HDFS 2.7/YARN 2.3/MapReduce 2.7/Hive 1.2
Driver: Python 3.6, Spark Client 2.3, HDFS Client, tensorflow-gpu 1.10

To install the dependencies, run:
pip install -r requirements.txt


### Install and Run
1. Download the blue-martin/models project.
2. Transfer the predictor_dl_model directory to ~/code/predictor_dl_model/ on a GPU machine that also has a Spark client.
3. cd predictor_dl_model
4. Run pip install -r requirements.txt to install the required packages. These packages are installed on top of Python using pip.
5. Run python setup.py install to install the predictor_dl_model package.
6. (optional) Run python setup.py bdist_egg to create an .egg file to provide to spark-submit.
7. Follow the steps in ~/code/predictor_dl_model/datagen/README.md to generate the data.
8. Go to the directory ~/code/predictor_dl_model/predictor_dl_model.
9. Run run.sh, or run each script individually.


### Documentation
Documentation is provided through comments in config.yml and in the README files.

### Note
To inspect an exported model, run:
saved_model_cli show --dir <model_dir>/<version> --all
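
For programmatic inspection, the exported SavedModel can also be loaded from Python. A minimal sketch, assuming TensorFlow 1.10 (per the prerequisites) and using the same placeholder path as the command above:

```python
import tensorflow as tf

model_dir = '<model_dir>/<version>'  # placeholder path, as in the command above

with tf.Session(graph=tf.Graph()) as sess:
    # Load the SavedModel under the 'serve' tag and list its signatures,
    # similar to what saved_model_cli prints.
    meta_graph = tf.saved_model.loader.load(
        sess, [tf.saved_model.tag_constants.SERVING], model_dir)
    print(list(meta_graph.signature_def.keys()))
```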


@@ -1,3 +1,3 @@
{
  "python.pythonPath": "/home/reza/anaconda3/envs/py27/bin/python"
}
@@ -1,21 +1,21 @@
### Pipeline Steps
Pipeline takes the following steps:

1. Reads factdata from the Hive table, from day(-365) (configurable) to day(-1); the input is day(-1), where day(0) is today and day(-1) is yesterday (see the sketch after this list).
2. Processes the data using Spark and writes the results into tfrecords, e.g. factdata.tfrecords.<date> (configurable).
3. Starts the trainer, which reads the tfrecords and creates the model.
4. Writes the model into a local directory.
5. Compares the new model with the old model (new-model evaluation) (future).
6. Sets the predictor to use the new model; the predictor reads the name of the model it uses from Elasticsearch (future).

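To make the day-indexing convention in step 1 concrete, here is a minimal sketch; the function name is illustrative and not part of the pipeline:

```python
from datetime import date, timedelta

def read_window(today, lookback_days=365):
    # The pipeline reads day(-lookback_days) .. day(-1);
    # day(0) is today and is excluded from the window.
    end = today - timedelta(days=1)                # day(-1), yesterday
    start = today - timedelta(days=lookback_days)  # day(-365) by default
    return start, end

start, end = read_window(date(2021, 2, 24))
print(start, end)  # 2020-02-25 2021-02-23 (2020 is a leap year)
```
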
### UCKEY Elements
A uckey consists of the following items; a short parsing sketch follows the list.

ucdoc.m = parts[0] #media-type
ucdoc.si = parts[1] #slot-id
ucdoc.t = parts[2] #connection-type
ucdoc.g = parts[3] #gender
ucdoc.a = parts[4] #age
ucdoc.pm = parts[5] #price-model
ucdoc.r = parts[6] #resident-location
ucdoc.ipl = parts[7] #ip-location
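
The comma delimiter and the sample value below are assumptions for illustration; the pipeline's own parsing code is authoritative. The sketch shows how a uckey string maps onto the eight attributes above:

```python
UCKEY_FIELDS = ['m', 'si', 't', 'g', 'a', 'pm', 'r', 'ipl']

def parse_uckey(uckey, delimiter=','):
    # Split a uckey into its eight parts: media-type, slot-id,
    # connection-type, gender, age, price-model, resident-location,
    # ip-location.
    parts = uckey.split(delimiter)
    return dict(zip(UCKEY_FIELDS, parts))

doc = parse_uckey('native,72bcd2,4G,g_m,4,CPM,40,40')  # hypothetical value
print(doc['si'])  # slot-id -> '72bcd2'
```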
@@ -1,18 +1,18 @@
# Copyright 2019, Futurewei Technologies
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# * "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
@@ -1,108 +1,108 @@
# Copyright 2019, Futurewei Technologies
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# * "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import json
import requests
from elasticsearch import Elasticsearch
import yaml
import argparse

class ESClient:
    """Thin wrapper around one Elasticsearch index/doc_type pair."""

    def __init__(self, host, port, es_index, es_type):
        self.es_index = es_index
        self.es_type = es_type
        self.es = Elasticsearch([{'host': host, 'port': port}])

    def __put(self, uckey, dict=dict):
        # Index a document under the given uckey; body may be a dict or a JSON string.
        dict_res = self.es.index(index=self.es_index, doc_type=self.es_type, id=uckey, body=dict)
        return dict_res

    def __get(self, uckey):
        dict_res = self.es.get(index=self.es_index, doc_type=self.es_type, id=uckey)
        return dict_res

    def put(self, doc_id, ucdoc):
        # Serialize the ucdoc object graph to JSON before indexing.
        json_doc = json.dumps(ucdoc, default=lambda x: x.__dict__)
        return self.__put(doc_id, json_doc)

    def index(self, id, doc):
        return self.es.index(index=self.es_index, doc_type=self.es_type, id=id, body=doc)

    def does_exist(self, uckey):
        try:
            return self.es.exists(index=self.es_index, doc_type=self.es_type, id=uckey)
        except Exception:
            return False

    def get(self, uckey):
        dict_res = self.__get(uckey)
        return dict_res

    def get_source(self, uckey):
        # Return the document body only, or None if the response has no '_source'.
        dict_res = self.__get(uckey)
        if '_source' in dict_res:
            return dict_res['_source']
        return None

    def refresh_indices(self):
        self.es.indices.refresh(index=self.es_index)

    def get_last_update(self, uckey):
        res = self.__get(uckey)
        js = res['_source']
        if 'lastUpdate' in js.keys():
            return js['lastUpdate']
        return None

    def partial_update(self, uckey, key, value):
        # Elasticsearch partial updates wrap the changed fields in a {'doc': ...} envelope.
        to_be_updated = {key: value}
        doc = {'doc': to_be_updated}
        str_to_be_updated = json.dumps(doc, default=lambda x: x.__dict__)
        res = self.es.update(index=self.es_index, doc_type=self.es_type, id=uckey, body=str_to_be_updated)
        return res

    def update_doc_by_query(self, id, body_str):
        res = self.es.update(index=self.es_index, doc_type=self.es_type, id=id, body=body_str)
        return res

    def post_date(self, uckey, date_str):
        to_be_updated = {'date': date_str}
        return self.partial_update(uckey, 'lastUpdate', to_be_updated)
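
As a hedged illustration of the ESClient API above, it might be used like this; the host, port, index, doc type, and document values are placeholders, not configuration from this repository:

```python
es_client = ESClient('localhost', 9200, 'predictions', 'doc')  # placeholder connection
uckey = 'native,72bcd2,4G,g_m,4,CPM,40,40'  # hypothetical uckey

# Index a document, make it searchable, read it back, then patch one field.
es_client.index(uckey, {'days': {}})
es_client.refresh_indices()
if es_client.does_exist(uckey):
    print(es_client.get_source(uckey))
es_client.post_date(uckey, '2021-02-23')  # sets lastUpdate = {'date': ...}
```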