In [1]:
import os

# Pin the JVM used by pyspark (imported in the next cell) to the local JDK 8 install.
JAVA_HOME = '/usr/lib/jvm/jdk1.8.0_221'
os.environ["JAVA_HOME"] = JAVA_HOME
JAVA_BIN = os.path.join(JAVA_HOME, 'bin')
# Append only once: a bare `PATH += ...` grows PATH every time this cell is re-run.
if JAVA_BIN not in os.environ.get("PATH", "").split(os.pathsep):
    os.environ["PATH"] = os.environ.get("PATH", "") + os.pathsep + JAVA_BIN

In [2]:
from job_api import *
import importlib
from datetime import datetime
import sys
import pyspark

In [7]:
%%py_script  --task --name augmentation.py --input_data_path augmentation_in --output_data_path augmentation_out --start_date 2019-05-28T08:00:00Z --end_date 2019-05-31T16:00:00Z

# The body of this cell is the `augmentation.py` task (name from --name above).
# The captured output below shows it also runs locally with these flags.
# NOTE(review): exact %%py_script semantics live in job_api — confirm.
# NOTE(review): --end_date here is 16:00 while the Dataproc submission uses 16:10.
from pyspark.sql import SparkSession
from ai4ops_db import *
from pyspark.sql.functions import spark_partition_id, pandas_udf
from pyspark.sql.functions import PandasUDFType
from datetime import datetime, timedelta
from apigee_ingest_utils import ApigeeIngest, ISO_TIME_FORMAT
from pytz import timezone
import pandas as pd
import argparse
import time
from job_api import Task
import sys

spark = SparkSession.builder.getOrCreate()
# Register yarn_logging.py with the SparkContext before importing it; presumably
# so executors running the pandas UDF (which logs via `logger`) can import it too.
spark.sparkContext.addPyFile('yarn_logging.py')
import yarn_logging
import gc  # gc.collect() is called inside prepare_augmented_pd_light

# Module-level logger used by prepare_augmented_pd_light.
logger = yarn_logging.YarnLogger()


def prepare_augmented_pd_light(pdf, start, end, time_unit='m', chunk_id=''):
    """Densify one Spark group of metric rows onto a regular time grid.

    The grid is the cross product of every distinct non-null ``metric`` in
    ``pdf`` with the time points ``start + ApigeeIngest.delta(i, 1, time_unit)``
    for ``i = 0..n``, where ``n = int((end - start) / ApigeeIngest.delta(1, 1, time_unit))``.
    Each point is formatted with ``ISO_TIME_FORMAT``. ``pdf`` is right-joined
    onto the grid on (``time``, ``metric``):
      * rows of ``pdf`` not on the grid are dropped;
      * grid points without a matching row get a missing ``value``;
      * any missing ``source`` becomes ``'<source>-empty'``, where ``<source>``
        is taken from the first row of ``pdf``.

    NOTE(review): assumes ``pdf['time']`` holds strings in ``ISO_TIME_FORMAT``
    so that the join matches — confirm against ``DB.metrics_schema()``.

    Args:
        pdf: pandas DataFrame for one group; needs ``time``, ``metric``,
            ``source`` and ``value`` columns.
        start: first grid timestamp (datetime).
        end: last grid timestamp, inclusive (datetime).
        time_unit: unit code forwarded to ``ApigeeIngest.delta``.
        chunk_id: label used only in the log message.

    Returns:
        DataFrame with the same columns as ``pdf`` (an empty ``pdf`` is
        returned unchanged).
    """
    if pdf.empty:
        # No rows: nothing to densify and no source to derive placeholders from.
        return pdf

    # Positional lookup: does not assume a 0-based RangeIndex.
    source = pdf['source'].iloc[0]
    # Distinct, sorted, non-null metric names (the set/order groupby keys give),
    # computed once and reused for both the log line and the cross join.
    metrics = (pdf['metric'].dropna().drop_duplicates()
               .sort_values().reset_index(drop=True).to_frame())
    names = metrics['metric'].tolist()
    logger.info('Chunk ID: {}\nmetrics: {},\nsource: {}'.format(chunk_id, names, source))

    time_range_size = int((end - start) / ApigeeIngest.delta(1, 1, time_unit)) + 1
    time_range = pd.DataFrame(
        [(start + ApigeeIngest.delta(i, 1, time_unit)).strftime(ISO_TIME_FORMAT)
         for i in range(time_range_size)],
        columns=['time'])
    time_range['tmp'] = 1
    time_range['source_new'] = '{}-empty'.format(source)
    # Explicit collections, presumably to bound executor memory between merges.
    gc.collect()

    # Cross join via a constant key (pandas versions used with Spark 2.4 lack how='cross').
    metrics['tmp'] = 1
    grid = pd.merge(time_range, metrics, on=['tmp'], how='inner')
    gc.collect()

    augmented = pd.merge(pdf, grid, on=['time', 'metric'], how='right')
    # Assign the result: chained `fillna(..., inplace=True)` on a column is a
    # silent no-op under pandas Copy-on-Write.
    augmented['source'] = augmented['source'].fillna(augmented['source_new'])
    # Unmatched `value`s are already missing; the former fill with None changed nothing.
    gc.collect()
    return augmented.drop(['tmp', 'source_new'], axis=1)


def prepare_augmented_udf(start, end, time_unit='m', chunk_id=''):
    """Build a GROUPED_MAP pandas UDF that densifies every group it receives.

    Prints the window, then returns a UDF that passes each group to
    ``prepare_augmented_pd_light`` with the fixed ``start``, ``end``,
    ``time_unit`` and ``chunk_id``. Its output schema is ``DB.metrics_schema()``.

    Args:
        start: first grid timestamp.
        end: last grid timestamp (inclusive).
        time_unit: unit code forwarded to the densifier.
        chunk_id: label forwarded for logging.

    Returns:
        The pandas UDF, suitable for ``GroupedData.apply``.
    """
    print(f'from {start} to {end}')

    def augment_group(group_pdf):
        # Closes over the window parameters; Spark invokes this once per group.
        return prepare_augmented_pd_light(group_pdf, start, end, time_unit, chunk_id)

    schema = DB.metrics_schema()
    return pandas_udf(augment_group,
                      returnType=schema,
                      functionType=PandasUDFType.GROUPED_MAP)


class MyTask(Task):
    """Augmentation task for one [start_date, end_date) window.

    Reads headerless, comma-delimited CSV input with ``DB.metrics_schema()``.
    It hash-partitions the rows on ``metric`` and densifies each partition
    with ``prepare_augmented_udf`` (time_unit 'm'). The result is written as
    CSV to ``<output_data_path>/chunk-<start>_<end>``.
    """

    @staticmethod
    def run():
        """Parse command-line arguments and run the job.

        ``run`` never used ``self``, so it is a staticmethod: ``MyTask.run()``
        keeps working and ``MyTask().run()`` no longer raises TypeError.

        Exits via ``argparse`` (SystemExit) when a path or date is missing,
        or when ``--end_date`` is not later than ``--start_date``.
        """
        parser = argparse.ArgumentParser()
        parser.add_argument('--input_data_path', type=str, help='Input Data files path including wildcards', default='')
        parser.add_argument('--output_data_path', type=str, help='Output data files path', default='')
        # argparse %-formats help strings, so a literal '%' must be written '%%';
        # a bare '%Y' makes `--help` raise ValueError.
        parser.add_argument('--start_date', type=str, help='Epoch start date in ISO format %%Y-%%m-%%dT%%H:%%M:%%SZ', default='')
        parser.add_argument('--end_date', type=str, help='Epoch end date (exclusive) in ISO format %%Y-%%m-%%dT%%H:%%M:%%SZ', default='')
        parser.add_argument('--num_partitions', type=int, default=2000,
                            help='Number of partitions (hashed on metric) handed to the pandas UDF')
        args, _ = parser.parse_known_args()

        missing = [name for name in ('input_data_path', 'output_data_path', 'start_date', 'end_date')
                   if not getattr(args, name)]
        if missing:
            # Fail fast; e.g. an empty output path would otherwise write to '/chunk-...'.
            parser.error('missing required argument(s): {}'.format(
                ', '.join('--' + name for name in missing)))

        utc = timezone('UTC')
        # replace(tzinfo=...) is only safe with pytz because the zone is UTC.
        start_time = datetime.strptime(args.start_date, ISO_TIME_FORMAT).replace(tzinfo=utc)
        end_time = datetime.strptime(args.end_date, ISO_TIME_FORMAT).replace(tzinfo=utc)
        if end_time <= start_time:
            parser.error('--end_date must be later than --start_date')

        df = (spark.read.format("csv")
              .option("header", "false")
              .schema(DB.metrics_schema())
              .option('delimiter', ',')
              .load(args.input_data_path.split(',')))

        chunk_id = '{}_{}'.format(start_time.strftime('%Y-%m-%d-%H-%M'), end_time.strftime('%Y-%m-%d-%H-%M'))
        # end_date is exclusive, so the grid's inclusive end is one minute earlier.
        chunk = (df.repartition(args.num_partitions, "metric")
                 .groupby(spark_partition_id())
                 .apply(prepare_augmented_udf(start_time, end_time + timedelta(minutes=-1), time_unit='m', chunk_id=chunk_id)))
        # rstrip('/') avoids a '//' segment when the output path ends with '/'.
        chunk.write.format('csv').save('{}/chunk-{}'.format(args.output_data_path.rstrip('/'), chunk_id))


MyTask.run()

Missing LOG_DIRS environment variable, pyspark logging disabled

out: from 2019-05-28 08:00:00+00:00 to 2019-05-31 15:59:00+00:00



<job_api.PyScript at 0x7fe1a60bd320>

In [8]:
builder = DataprocJobBuilder()
bucket = 'ai4ops-main-storage-bucket'
project = 'kohls-kos-cicd'
cluster = 'ai4ops'
region = 'global'

# NOTE(review): end_date is 16:10 here but 16:00 in the local %%py_script run —
# confirm which window is intended.
arguments = {
    "--input_data_path": f"gs://{bucket}/jobs-root/augmentation_in",
    "--output_data_path": f"gs://{bucket}/jobs-root/augmentation_out",
    "--start_date": "2019-05-28T08:00:00Z",
    "--end_date": "2019-05-31T16:10:00Z",
}

# Already ends with '/', so file names are appended directly.
s_path = 'poc/spark/ingest/'

AI4OPS_HISTORY_PATH = f"gs://{bucket}/apigee_history/apigee/metrics/history"

job_name = "augmentation_{}".format(int(datetime.now().timestamp()))

job = (
    builder.task_script('augmentation.py')
    .job_id(job_name)
    .py_file(f'{s_path}apigee_ingest_utils.py')
    .py_file(f'{s_path}ai4ops_db.py')
    # Previously f'{s_path}/yarn_logging.py' -> 'poc/spark/ingest//yarn_logging.py'.
    .py_file(f'{s_path}yarn_logging.py')
    .arguments(**arguments)
    .build_job()
)

session = Session(bucket, region, cluster, project)

executor = DataprocExecutor(job, session)

In [9]:
# run_async=False: the cell waits and streams job/YARN status until completion.
# Bound to a new name so `job` (the built job definition) is not overwritten.
submitted_job = executor.submit_job(run_async=False)

Job with id augmentation_1566207207 was submitted to the cluster ai4ops
Job STATUS was set to PENDING at 2019-08-19 09:33:29
Job STATUS was set to SETUP_DONE at 2019-08-19 09:33:29
      Yarn APP augmentation.py with STATUS RUNNING has PROGRESS 10
Job STATUS was set to RUNNING at 2019-08-19 09:33:30
      Yarn APP augmentation.py with STATUS FINISHED has PROGRESS 100
Job STATUS was set to DONE at 2019-08-19 09:41:35


In [None]:
# Fetch the finished job's output. NOTE(review): what is downloaded and where
# it lands is defined by DataprocExecutor.download_output — confirm.
executor.download_output()