Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
Merge pull request #76 from Faezehvaseghi/master
Update test
  • Loading branch information
xun-hu-at-futurewei-com committed Oct 27, 2020
2 parents 6e11063 + 1fb6ce1 commit c0a95527f608a47ed084ba1822a7dfc9e8d1a1b4
Showing 5 changed files with 237 additions and 111 deletions.

This file was deleted.

@@ -0,0 +1,36 @@
test:
table_name: "factdata_hq_09222020"
timer: 60

pipeline:
time_series: # This is done on whole bucketized data
factdata_table_name: "unittest_factdata_10122020"
conditions: []
yesterday: "2020-03-22" # data is used for training from -<prepare_past_days> to -1(yesterday)
prepare_past_days: 2
bucket_size: 10 # maximum number of buckets to process starting from 0
bucket_step: 1 # size of bucket batch that is processed in one iteration
output_table_name: 'unittest_pipeline_ts_10122020' # name of the hive table that keeps cleansed and normalized data before writing it into tfrecords, over-writes the existing table
uckey_clustring: # This is done on whole data, not slicing on buckets
pre_cluster_table_name: 'unittest_pipeline_pre_cluster_10122020'
create_pre_cluster_table: True
output_table_name: 'unittest_pipeline_cluster_10122020'
cluster_size:
number_of_virtual_clusters: 0
datapoints_min_th: 0.15
datapoints_th_uckeys: 0.5
datapoints_th_clusters: 0.5
popularity_norm: 0.01
popularity_th: 5
median_popularity_of_dense: 2
normalization: # This is done on whole data, not slicing on buckets
output_table_name: 'unittest_trainready_10132020'
columns: {
'price_cat':['1','2','3'],
'a': ['','1','2','3','4','5','6'],
'g':['','g_f','g_m','g_x'],
't':['UNKNOWN','3G','4G','WIFI','2G'],
'r':['', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '80', '81', '82'],
'si':['d4d7362e879511e5bdec00163e291137', 'b6le0s4qo8', 'd47737w664', 'd971z9825e', '72bcd2720e5011e79bc8fa163e05184e', 'j1430itab9wj3b', 'w3wx3nv9ow5i97', 'g9iv6p4sjy', '71bcd2720e5011e79bc8fa163e05184e', '7b0d7b55ab0c11e68b7900163e3e481d', 'm1040xexan', 'x2fpfbm8rt', '05', '66bcd2720e5011e79bc8fa163e05184e', 'g7m2zuits8', 'l2d4ec6csv', 'a8syykhszz', 'w9fmyd5r0i', 'a47eavw7ex', 'p7gsrebd4m', 'q4jtehrqn2', '03', 'l03493p0r3', 's4z85pd1h8', 'f1iprgyl13', '17dd6d8098bf11e5bdec00163e291137', 'e351de37263311e6af7500163e291137', '68bcd2720e5011e79bc8fa163e05184e', '5cd1c663263511e6af7500163e291137', 'k4werqx13k', 'x0ej5xhk60kjwq', '04', 'a290af82884e11e5bdec00163e291137', '15e9ddce941b11e5bdec00163e291137', 'z041bf6g4s', 'd9jucwkpr3', 'c4n08ku47t']
}
holidays: ['2019-11-09', '2019-11-10', '2019-11-11', '2019-11-25', '2019-11-26', '2019-11-27','2019-11-28', '2019-12-24','2019-12-25', '2019-12-26','2019-12-31', '2020-01-01', '2020-01-02', '2020-01-19','2020-01-20', '2020-01-21', '2020-01-22', '2020-01-23', '2020-01-24', '2020-01-25', '2020-02-08']
@@ -0,0 +1,41 @@
# Copyright 2020, Futurewei Technologies
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# * "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import unittest
import yaml
from pyspark import SparkContext
from pyspark.sql import HiveContext


class TestBase(unittest.TestCase):

@classmethod
def setUpClass(cls):
with open('config.yml', 'r') as ymlfile:
cfg = yaml.load(ymlfile)
cls.cfg = cfg
sc = SparkContext().getOrCreate()
sc.setLogLevel('warn')
cls.hive_context = HiveContext(sc)
cls.timeout = cfg['test']['timer']

def compare_dfs(self, df1, df2):
col1 = sorted(df1.collect())
col2 = sorted(df2.collect())
return col1 == col2
@@ -0,0 +1,78 @@
# Copyright 2020, Futurewei Technologies
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# * "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import unittest
import multiprocessing
import yaml, threading

from pyspark import SparkContext
from pyspark.sql import HiveContext

def load_df(hive_context, table_name, bucket_id):
command = """select * from {} where bucket_id = {}""".format(table_name, bucket_id)
return hive_context.sql(command)

class TestBase(unittest.TestCase):

@classmethod
def setUpClass(cls):
with open('config.yml', 'r') as ymlfile:
cfg = yaml.load(ymlfile)
cls.cfg = cfg

sc = SparkContext().getOrCreate()
sc.setLogLevel('warn')
cls.hive_context = HiveContext(sc)
cls.table_name = cfg['test']['table_name']
cls.df = load_df(hive_context=cls.hive_context, table_name=cls.table_name, bucket_id=1)
cls.df2 = load_df(hive_context=cls.hive_context, table_name=cls.table_name, bucket_id=2)
cls.timeout = cfg['test']['timer']
cls.manager = multiprocessing.Manager()
cls.return_dic = cls.manager.dict()

def timer(self, timeout, func, args=(), kwargs={}):
""" Run func with the given timeout.
If func didn't finish running within the timeout, return -1.
"""

class UnitTestFuncThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.result = None
self._stop_event = threading.Event()

def run(self):
self.result = func(*args, **kwargs)

def stop(self):
self._stop_event.set()

def stopped(self):
return self._stop_event.is_set()

func_thread = UnitTestFuncThread()
func_thread.daemon = True
func_thread.start()
func_thread.join(timeout)
if func_thread.isAlive():
func_thread.stop()
return -1 # -1: the outtime failure with failed message.
else:
return 0

@@ -0,0 +1,82 @@
# Copyright 2020, Futurewei Technologies
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# * "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
1. Check if the table has partitions or not.
2. Check if the count, distinct count and bucket list health check takes more than timeout time or not
"""
import unittest
from test_base_hive_db import TestBase


def load_df(hive_context, table_name, bucket_id):
command = """select * from {} where bucket_id = {}""".format(table_name, bucket_id)
return hive_context.sql(command)


def distinct_count(df, column):
return df.select(column).distinct().count()


def total_count(df):
return df.count


def bucket_id_health(df, df2):
d = df.join(df2, on='uckey', how='inner')
return d.count()


def partition_check(table_name, hive_context):
try:
command = """show PARTITIONS {}""".format(table_name)
hive_context.sql(command)
return 0
except Exception:
return -1


class factdata_health(TestBase):

def test_partition_check(self):
result = partition_check(self.table_name, self.hive_context)
self.assertEqual(result, 0, "passed")

def test_total_check(self):
result = self.timer(self.timeout, total_count, args=(self.df))
self.assertEqual(result, 0, "passed")

def test_total_distinct_check(self):
column = 'day'
result = self.timer(self.timeout, distinct_count, args=(self.df, column))
self.assertEqual(result, 0, "passed")

def test_distinct_count(self):
column = 'uckey'
result = self.timer(self.timeout, distinct_count, args=(self.df, column))
self.assertEqual(result, 0, "passed")

def test_bucket_check(self):
result = self.timer(self.timeout, bucket_id_health, args=(self.df, self.df2))
self.assertEqual(result, 0, "passed")


if __name__ == '__main__':
unittest.main()

0 comments on commit c0a9552

Please sign in to comment.