# In [5]:
import datetime
import argparse
import logging
import pandas as pd
from datetime import date
from dataclasses import dataclass
from pyspark import SparkContext
from pyspark.sql import SparkSession

#custom event frame definition           
@dataclass
class historic_event:
    """One completed operation together with how long it took.

    Produced by pairing an operation's start event with the next 'End' event of
    the same resource; its fields match the columns of the
    eventDurationHistoryData Hive tables (operation, details, date_time, duration).
    """

    # Label of the form "<resource> - <operation>".
    operation: str
    # Extra context: the " (origin > destination)" route of a shuttle move, '' otherwise.
    details: str
    # Timestamp at which the operation started (a pandas Timestamp is a datetime subclass).
    date_time: datetime.datetime
    # Elapsed time. It is constructed with a timedelta (end - start) and the column is
    # converted to seconds before being written out, hence the float annotation.
    duration: float
                
@dataclass
class cumulated_event:
    """An operation boundary kept from the parsed input, before durations are computed."""

    # Identifier of the resource the event belongs to.
    resource: str
    # Operation name, or 'End' for events that close the resource's current operation.
    description: str
    # " (origin > destination)" for shuttle moves, '' otherwise.
    way: str
    # When the event happened (a pandas Timestamp is a datetime subclass).
    timestamp: datetime.datetime
                        
# Initial environment setup.
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)
logging.info("------------------------------------------------------------------------")
logging.info("------------------Starting the log parsing process----------------")
logging.info("----------Seting up the environment...")
# Wider console output so printed DataFrames are not wrapped.
pd.set_option('display.width', 2000)
parser = argparse.ArgumentParser(description='Job Arguments')
parser.add_argument('--batchID')
# parse_known_args tolerates extra arguments injected by the notebook/Spark launcher.
args, unknown = parser.parse_known_args()
if args.batchID is None:
    # NOTE(review): date.today() returns a date, so "%H" always renders as "00" and the
    # default batch ID looks like "00MMDDYYYY". The format is left unchanged because it
    # must match the HDFS folder name written upstream; confirm the intent (e.g. "%m%d%Y")
    # with the producing job before changing it.
    workingBatch = date.today().strftime("%H%m%d%Y")
else:
    workingBatch = args.batchID
workingPathHDFS = f'/user/akorobeinykov/korobeinykov-{workingBatch}'
spark = SparkSession.builder.appName("OperationDurationAnalyze").master('yarn').getOrCreate()
sc = SparkContext.getOrCreate()
# Allow saveAsTable to create a managed table whose location directory is not empty
# (e.g. left behind by a cancelled overwrite):
# https://kb.databricks.com/jobs/spark-overwrite-cancel.html
spark.conf.set("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation", "true")

logging.info("----------Retrieving data from HDFS...")
# No schema inference is requested, so every CSV column arrives as a string
# (hence the comparison against the string 'false' below).
df_main = spark.read.load(f'{workingPathHDFS}/parsed-*',
                          format="csv", sep=",", header="true", encoding="utf-8").toPandas()

logging.info("----------Processing data...")
df_main['timestamp'] = pd.to_datetime(df_main['timestamp'], format="%Y-%m-%dT%H:%M:%S.%fZ")
# Agreed that operations are sequential; sort by time in case the input is not already ordered.
df_main = df_main.sort_values(by=['timestamp']).reset_index(drop=True)

# Keep only the rows that mark an operation boundary; a 'tagReceived' operation is
# recorded as the 'End' marker of the resource's current operation.
final_events = []
for index, row in df_main.iterrows():
    if row['start'] != 'false' and 'operationStarted' not in row['operationDetail']:
        if 'shuttle_move' in row['operation']:
            way = ' (' + row['stationOrigin'] + ' > ' + row['stationDestination'] + ')'
        else:
            way = ''
        description = 'End' if 'tagReceived' in row['operation'] else row['operation']
        final_events.append(cumulated_event(row['resource'], description, way, row['timestamp']))

# Kept as a module-level frame for inspection in later notebook cells.
df_all = pd.DataFrame(final_events)

# Agreed: events of one resource never overlap, so an operation's duration is the time
# from its start event to the next 'End' event of the same resource. Scanning backwards
# while remembering, per resource, the timestamp of the nearest later 'End' yields the
# same pairs as a forward search from every start, but in O(n) instead of O(n^2).
# Starts with no later 'End' for their resource are dropped.
final_events_deltas = []
next_end_by_resource = {}
for event in reversed(final_events):
    if 'End' in event.description:
        next_end_by_resource[event.resource] = event.timestamp
    elif event.resource in next_end_by_resource:
        final_events_deltas.append(historic_event(
            event.resource + ' - ' + event.description,
            event.way,
            event.timestamp,
            next_end_by_resource[event.resource] - event.timestamp,
        ))
# The backward scan emitted the pairs last-first; restore input order.
final_events_deltas.reverse()

# Explicit columns so an empty result still has the expected schema.
df_deltas = pd.DataFrame(final_events_deltas,
                         columns=['operation', 'details', 'date_time', 'duration'])

if df_deltas.empty:
    # spark.createDataFrame cannot infer a schema from an empty frame, so nothing is saved.
    logging.warning("----------No completed operations found for batch %s; nothing saved to Hive.",
                    workingBatch)
else:
    # Seconds truncated to whole milliseconds. Floor-dividing by a 1 ms Timedelta gives
    # integer milliseconds on every pandas version, whereas astype('timedelta64[ms]')
    # only returns numbers on pandas < 2.0 (pandas 2.x keeps a timedelta dtype).
    df_deltas['duration'] = (df_deltas['duration'] // pd.Timedelta(milliseconds=1)) / 1000
    df_deltas = df_deltas.sort_values(by=['operation', 'details', 'date_time']).reset_index(drop=True)

    # TODO(review): original note says "get the averages before this" (before saving).
    logging.info("----------Saving results in Hive...")
    # Cumulative history across all batches: append.
    spark.sql("CREATE TABLE IF NOT EXISTS eventDurationHistoryData (operation string, details string, date_time timestamp, duration float) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
    spark.createDataFrame(df_deltas).write.mode("append").format("hive").saveAsTable("default.eventDurationHistoryData")

    # Per-batch snapshot: overwrite so re-running a batch replaces its previous result.
    spark.sql(f"CREATE TABLE IF NOT EXISTS eventDurationHistoryData{workingBatch} (operation string, details string, date_time timestamp, duration float) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
    spark.createDataFrame(df_deltas).write.mode("overwrite").format("hive").saveAsTable(f"default.eventDurationHistoryData{workingBatch}")

logging.info("---------------------------Process Finished!!!--------------------------")
logging.info("------------------------------------------------------------------------")

# Output:
# 2021-03-01 19:36:02,990 - INFO - ------------------------------------------------------------------------
# 2021-03-01 19:36:02,991 - INFO - ------------------Starting the log parsing process----------------
# 2021-03-01 19:36:02,992 - INFO - ----------Seting up the environment...
# 2021-03-01 19:36:02,997 - INFO - ----------Retrieving data from HDFS...
# 2021-03-01 19:36:06,617 - INFO - ----------Processing data...
# 2021-03-01 19:36:06,658 - INFO - ----------Saving results in Hive...
# 2021-03-01 19:36:09,091 - INFO - ---------------------------Process Finished!!!--------------------------
# 2021-03-01 19:36:09,092 - INFO - ------------------------------------------------------------------------
