update dlpredictor
radibnia77 committed Apr 21, 2021
1 parent ecf42a5 commit 669cae028f642e0c4a31614a18a9d70a29d696b6
Showing 11 changed files with 363 additions and 99 deletions.
@@ -14,7 +14,7 @@ pip install -r requirements.txt
2. Transfer the dlpredictor directory to ~/code/dlpredictor on a machine that also has a Spark client installed.
3. cd dlpredictor
4. pip install -r requirements.txt (to install required packages)
5. python setup install (to install predictor_dl_model package)
5. python setup.py install (to install predictor_dl_model package)
6. Run run.sh

### Documentation
@@ -1,12 +1,18 @@
log_level: 'INFO'
product_tag: 'dlpm'
pipeline_tag: '11092020'
factdata_table: 'factdata_hq_09222020_bucket_0'
distribution_table: '{product_tag}_{pipeline_tag}_tmp_distribution'
norm_table: '{product_tag}_{pipeline_tag}_trainready'
model_stat_table: '{product_tag}_{pipeline_tag}_model_stat'
log_level: 'WARN'
product_tag: 'dlpredictor'
pipeline_tag: '04212021'

#input tables from dlpm pipeline
factdata_table: 'factdata_hq_09222020_r_ipl_mapped_11052020'  # produced by the dlpm pipeline; normally tagged like dlpm_03182021
distribution_table: 'dlpm_03182021_tmp_distribution'
norm_table: 'dlpm_03182021_trainready'
model_stat_table: 'dlpm_03182021_model_stat'
bucket_size: 10
bucket_step: 1

yesterday: '2020-05-31'
serving_url: 'http://10.193.217.105:8505/v1/models/dlpm6:predict'

es_host: '10.213.37.41'
es_port: '9200'
es_predictions_index: '{product_tag}_{pipeline_tag}_predictions'
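
The new config hard-codes the dlpm input tables but keeps `{product_tag}_{pipeline_tag}` placeholders for the Elasticsearch index name; these are expanded at load time by the `resolve_placeholder` helper added in this commit. A minimal sketch of the expansion, using the values from this config:

```python
# Minimal sketch of how the placeholders in this config resolve; the real
# expansion is done by resolve_placeholder (added in this commit, see below).
cfg = {
    'product_tag': 'dlpredictor',
    'pipeline_tag': '04212021',
    'es_predictions_index': '{product_tag}_{pipeline_tag}_predictions',
}
# After resolve_placeholder(cfg) the index name becomes:
#   'dlpredictor_04212021_predictions'
```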
@@ -0,0 +1,75 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0.html

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from airflow import DAG
import datetime as dt
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator

default_args = {
'owner': 'dlpredictor',
'depends_on_past': False,
'start_date': dt.datetime(2021, 3, 15),
'retries': 0,
'retry_delay': timedelta(minutes=1),
}

dag = DAG(
'dlpredictor',
default_args=default_args,
schedule_interval=None,

)

def sparkOperator(
file,
task_id,
executor_cores=5,
num_executors=10,
**kwargs
):
return SparkSubmitOperator(
application='/home/airflow/airflow-apps/dlpredictor/dlpredictor/{}'.format(file),
application_args=['/home/airflow/airflow-apps/dlpredictor/conf/config.yml'],
conn_id='spark_default',
executor_memory='16G',
conf={'spark.driver.maxResultSize': '8g'},
driver_memory='16G',
executor_cores=executor_cores,
num_executors=num_executors,
task_id=task_id,
dag=dag,
**kwargs
)


show_config = sparkOperator('show_config.py', 'show_config')

dlpredictor = sparkOperator('main_spark_es.py',
'dlpredictor',
py_files='/home/airflow/airflow-apps/dlpredictor/dist/dlpredictor-1.6.0-py2.7.egg,/home/airflow/airflow-apps/dlpredictor/lib/imscommon-2.0.0-py2.7.egg,/home/airflow/airflow-apps/dlpredictor/lib/predictor_dl_model-1.6.0-py2.7.egg',
jars='/home/airflow/airflow-apps/dlpredictor/lib/elasticsearch-hadoop-6.8.0.jar')

es_push = sparkOperator('main_es_push.py',
'es_push',
3,
3,
py_files='/home/airflow/airflow-apps/dlpredictor/dist/dlpredictor-1.6.0-py2.7.egg,/home/airflow/airflow-apps/dlpredictor/lib/imscommon-2.0.0-py2.7.egg,/home/airflow/airflow-apps/dlpredictor/lib/predictor_dl_model-1.6.0-py2.7.egg',
jars='/home/airflow/airflow-apps/dlpredictor/lib/elasticsearch-hadoop-6.8.0.jar')

show_config >> dlpredictor >> es_push
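
The DAG runs three SparkSubmitOperator tasks in sequence, with `es_push` scaled down to 3 executor cores and 3 executors via the positional arguments to `sparkOperator`. A quick way to sanity-check the wiring once the file is on the Airflow host (the import path below is hypothetical and depends on where the DAG file is deployed):

```python
# Hypothetical check of the task chain; adjust the import to the actual
# module name of this DAG file under the Airflow dags folder.
from dlpredictor_dag import dag

for task in dag.topological_sort():
    print(task.task_id, '->', sorted(task.downstream_task_ids))
# Expected output:
#   show_config -> ['dlpredictor']
#   dlpredictor -> ['es_push']
#   es_push -> []
```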
@@ -0,0 +1,38 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0.html

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re

def resolve_placeholder(in_dict):
stack = []
for key in in_dict.keys():
stack.append((in_dict, key))
while len(stack) > 0:
(_dict, key) = stack.pop()
value = _dict[key]
if type(value) == dict:
for _key in value.keys():
stack.append((value, _key))
elif type(value) == str:
z = re.findall(r'\{(.*?)\}', value)
if len(z) > 0:
new_value = value
for item in z:
if item in in_dict and type(in_dict[item]) == str:
new_value = new_value.replace(
'{'+item+'}', in_dict[item])
_dict[key] = new_value
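
`resolve_placeholder` walks the config dict (including nested dicts) and replaces any `{name}` token whose name is a top-level string key. A small usage sketch with made-up keys:

```python
# Usage sketch; the keys below are illustrative, not from the real config.
cfg = {
    'product_tag': 'dlpredictor',
    'pipeline_tag': '04212021',
    'es': {'index': '{product_tag}_{pipeline_tag}_predictions'},
}
resolve_placeholder(cfg)
print(cfg['es']['index'])  # dlpredictor_04212021_predictions
```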

@@ -17,9 +17,13 @@
import logging.config
import os

# Get the base path of this installation.
# Assuming that this file is packaged in a .egg file.
basedir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
path = 'conf/log.conf'
fullpath = os.path.join(basedir, path)

logging.config.fileConfig(path)
logging.config.fileConfig(fullpath)
logger_operation = logging.getLogger('operation')
logger_run = logging.getLogger('run')
logger_security = logging.getLogger('security')
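
The four nested `dirname` calls climb from this module up to the installation root, so `conf/log.conf` is resolved relative to where the .egg was unpacked rather than the current working directory. An illustration of the path arithmetic under a hypothetical layout:

```python
import os

# Hypothetical install layout: <root>/dlpredictor.egg/dlpredictor/log/__init__.py
f = '/home/user/app/dlpredictor.egg/dlpredictor/log/__init__.py'
basedir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(f))))
print(os.path.join(basedir, 'conf/log.conf'))  # /home/user/app/conf/log.conf
```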
@@ -0,0 +1,83 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0.html

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import sys
import yaml

from pyspark import SparkContext
from pyspark.sql import HiveContext

from dlpredictor.configutil import *
from dlpredictor.log import *
from dlpredictor import transform


def run (cfg):
sc = SparkContext()
hive_context = HiveContext(sc)
sc.setLogLevel(cfg['log_level'])

# Load the data frame from Hive.
table_name = cfg['es_predictions_index']
command = """select * from {}""".format(table_name)
df = hive_context.sql(command)

# Select the columns to push to elasticsearch.
rdd = df.rdd.map(lambda x: transform.format_data(x, 'ucdoc'))

# Write the data frame to elasticsearch.
es_write_conf = {"es.nodes": cfg['es_host'],
"es.port": cfg['es_port'],
"es.resource": cfg['es_predictions_index']+'/'+cfg['es_predictions_type'],
"es.batch.size.bytes": "1000000",
"es.batch.size.entries": "100",
"es.input.json": "yes",
"es.mapping.id": "uckey",
"es.nodes.wan.only": "true",
"es.write.operation": "upsert"}
rdd.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf)

sc.stop()


if __name__ == '__main__':

# Get the execution parameters.
parser = argparse.ArgumentParser(description='Prepare data')
parser.add_argument('config_file')
args = parser.parse_args()

# Load config file.
try:
with open(args.config_file, 'r') as ymlfile:
cfg = yaml.load(ymlfile, Loader=yaml.FullLoader)
resolve_placeholder(cfg)
logger_operation.info("Successfully opened {}".format(args.config_file))
except IOError as e:
logger_operation.error("Open config file unexpected error: I/O error({0}): {1}".format(e.errno, e.strerror))
except:
logger_operation.error("Unexpected error:{}".format(sys.exc_info()[0]))
raise

# Run this module.
run(cfg)
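
With `es.input.json` set to `"yes"`, elasticsearch-hadoop expects each RDD element to be a pair whose value is a JSON string, and `es.mapping.id` tells it to take the document id from the `uckey` field of that JSON. The real conversion is done by `dlpredictor.transform.format_data`, which is not part of this diff; the sketch below only illustrates the expected shape, not the actual implementation:

```python
import json

def format_data_sketch(row, doc_type):
    # Hypothetical stand-in for dlpredictor.transform.format_data: emit a
    # (key, json-string) pair for EsOutputFormat. The pair key is ignored here;
    # es.mapping.id picks the Elasticsearch _id from the 'uckey' field.
    doc = row.asDict(recursive=True)
    return (doc.get('uckey'), json.dumps(doc))
```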
