   # Regression Analysis

### The following code collects and processes data to compute the features, which are then used to train and estimate a regression model

### Two functions defined below are called by multiple processes in parallel
1. **collect_sqlm** - This function takes three inputs: the start and end positions (which delimit the range of queries to be processed) and the queue that receives the results. The queries themselves are read from the shared data frame. Passing the start and end positions allows this function to be called in parallel by different processes. In this case, we use 8 processes to cut down the processing time by more than half. After processing the data we write the computed values (operand size, index size, number of operations, number of filters) to a queue; the log of the execution time is computed later, after the metrics are merged.
    
2. **read_queue** - As the worker processes write data to the queue, the main process consumes the values from the queue and appends them to a list, which is later converted to a data frame.

In [None]:
from multiprocessing import Process, Queue
import glob
import pandas as pd
import os
import re
import matplotlib.pyplot as plt
import math as m
import numpy as np
%matplotlib inline
plt.style.use('ggplot')

def _referenced_tables(query):
    """Return the table tokens named in an upper-cased, comma-normalised query."""
    # The character class includes ',' so that "FROM A.B,C.D" is captured as a
    # single token and split into both tables below. Aliased lists such as
    # "FROM A.B X,C.D Y" still yield only the first table.
    if query.startswith(('SELECT', 'INSERT', 'DELETE')):
        found = re.findall(r'(?<=FROM|JOIN|INTO)\s+[\w.,]+', query)
    elif query.startswith('UPDATE'):
        found = re.findall(r'(?<=UPDATE)\s+[\w.,]+', query)
    elif query.startswith('MERGE'):
        found = (re.findall(r'(?<=INTO)\s+[\w.,]+', query)
                 + re.findall(r'(?<=MERGE)\s+[\w.,]+', query))
    else:
        found = []
    return [table for item in found for table in item.split(',')]


def collect_sqlm(num1,num2,queue):
    """Compute the query features for rows ``num1 <= i < num2`` and post them to ``queue``.

    The queries are read from the module-level ``initial_sqlm_df`` (column
    ``query``) and the table sizes from the module-level ``sizedf``, which is
    indexed by ``(tableschema, tablename)`` and provides ``object_size`` and
    ``index_size``.

    For every row one tuple
    ``(i, operand_size, index_size, num_operations, num_filters)`` is put on
    the queue, with both sizes divided by 1024 * 1024 (presumably bytes to
    MiB -- confirm against the size export). A final ``(-1, -1, -1, -1, -1)``
    tuple tells the reader that this worker is finished; it is posted even if
    the worker fails, so the reader never waits for a dead worker.

    Args:
        num1: first row label to process (inclusive).
        num2: row label at which to stop (exclusive).
        queue: queue object with a ``put`` method that receives the tuples.
    """
    try:
        #For each query in the given range allocated to the process calling this function.
        for i in range(num1, num2):
            query = initial_sqlm_df.loc[i, 'query'].upper()
            #Collapse ", " to "," so that a comma separated table list is one token.
            query = re.sub(r',\s+', ',', query)

            #Sum the object and index sizes of every schema-qualified table.
            operand_size = 0
            index_size = 0
            for table in _referenced_tables(query):
                parts = table.split('.')
                if len(parts) != 2:
                    continue
                try:
                    size_row = sizedf.loc[parts[0].strip(), parts[1].strip()]
                except KeyError:
                    #Unknown table: skip it, but keep what was already summed.
                    continue
                operand_size = operand_size + size_row.object_size
                index_size = index_size + size_row.index_size

            #Number of operations: base statements, set operations and final operations.
            num_operations = sum(
                len(re.findall(pattern, query))
                for pattern in ('SELECT|INSERT|DELETE|UPDATE|MERGE|VALUES',
                                'JOIN|UNION',
                                'ORDER BY|GROUP BY'))

            #Number of filters: WHERE clauses plus the AND/OR connectives.
            num_filters = (len(re.findall(r'WHERE', query))
                           + len(re.findall(r' AND | OR ', query)))

            queue.put((i, operand_size/1024/1024, index_size/1024/1024,
                       num_operations, num_filters))
    finally:
        #The process writes a series of '-1' to indicate that it has completed its set of queries.
        queue.put((-1,-1,-1,-1,-1))
    return

def read_queue(queue):
    """Drain ``queue`` into the module-level ``sqlm_list`` until every worker is done.

    Each item is a tuple
    ``(location, operand_size, index_size, num_operations, num_filters)``.
    An item whose ``operand_size`` is ``-1`` is an end-of-work marker from one
    worker; all other items are appended to ``sqlm_list``. The function
    returns once ``num_procs`` (module-level) markers have been seen, and
    returns immediately when ``num_procs`` is 0.

    Args:
        queue: queue object with a blocking ``get`` method.
    """
    global sqlm_list
    global num_procs
    finished = 0
    #Checking the counter before the blocking get() avoids waiting on an
    #empty queue when there is nothing left to wait for.
    while finished < num_procs:
        location, operand_size, index_size, num_operations, num_filters = queue.get()
        if operand_size == -1:
            finished = finished + 1
        else:
            sqlm_list.append((location, operand_size, index_size, num_operations, num_filters))
    print("done")
    return

### The following code creates two data frames:
    1. SQL data 
    2. Workload data

In [None]:
#SQL data. The data is collected in multiple files. Each file corresponds to the SQL cache in a given one hour
#window
path = r'/data/developer/python'
allFiles = glob.glob(path + "/SQL_Metrics*.csv")
list_ = []
notvalues = []
for file_ in allFiles:
    #The file name carries the capture date and hour: SQL_Metrics_<date>-<hour>
    regex = re.search('SQL_Metrics_([0-9]*-[0-9]*-[0-9]*)-([0-9]*)', file_)
    df = pd.read_csv(file_, sep='@', names=['stmt_hash', 'query', 'exec_time'], index_col=False)
    df['date'] = regex.group(1)
    df['hour'] = regex.group(2)
    #Values that cannot be parsed become NaN and are removed by the exec_time filter below
    df.exec_time = pd.to_numeric(df.exec_time, errors='coerce')
    df.hour = pd.to_numeric(df.hour, errors='coerce')
    list_.append(df)
initial_sqlm_df = pd.concat(list_, ignore_index=True)
#Keep only the statements with a positive execution time and renumber the rows from 0
initial_sqlm_df = initial_sqlm_df[initial_sqlm_df.exec_time > 0].reset_index(drop=True)

#Table/index sizes in the database, needed to compute the size of the tables and indexes used in a query
allFiles = glob.glob(path + "/size*.csv")
list_ = []
for file_ in allFiles:
    #NOTE(review): sizedf is overwritten on every iteration, so only the last
    #matching file is kept -- confirm that a single size file is expected.
    sizedf = pd.read_csv(file_, sep='@', names=['tableschema', 'tablename',
                                                'object_size', 'index_size',
                                                'lob_size'], index_col=False)
    sizedf = sizedf.set_index(['tableschema', 'tablename'])


#Workload Metrics
fields = ['time','lock_wait_time','total_section_sort_time','diaglog_write_wait_time',
          'direct_read_time','direct_write_time','log_buffer_wait_time','log_disk_wait_time',
          'pool_read_time','pool_write_time','prefetch_wait_time','total_act_time','total_act_wait_time',
          'total_cpu_time','total_extended_latch_wait_time','span']

path = r'/data/developer/python'
allFiles = glob.glob(path + "/workload_stats_*.csv")
list_ = []
for file_ in allFiles:
    wldf = pd.read_csv(file_, sep='@', names=fields, index_col=False)
    list_.append(wldf)
final_wldf = pd.concat(list_, ignore_index=True)

#Split the '<date>T<hour>...' time stamp into the date and hour keys in one pass.
#Extracting whole columns avoids writing strings row by row into columns that
#were created as integers.
stamp = final_wldf['time'].str.extract(r'([0-9-]*)T([0-9]*)')
final_wldf['date'] = stamp[0]
#NOTE(review): the +5 shift looks like a time zone offset; it can produce hours
#above 23 and does not roll the date over -- confirm against the SQL metric files.
final_wldf['hour'] = stamp[1].astype(int) + 5

final_wldf = final_wldf.set_index(['date', 'hour'])
final_wldf = final_wldf.drop(['time', 'span'], axis=1)

### We compute the log transform of all the workload metrics.

In [None]:
#Add a log_<metric> column with the base-2 logarithm of every workload metric.
#A metric value of 0 is mapped to 0 instead of taking its logarithm.
#NOTE: the columns are computed as whole arrays, so a negative metric value
#yields NaN (with a NumPy warning) instead of raising ValueError.
for metric in ['lock_wait_time', 'total_section_sort_time', 'diaglog_write_wait_time',
               'direct_read_time', 'direct_write_time', 'log_buffer_wait_time',
               'log_disk_wait_time', 'pool_read_time', 'pool_write_time',
               'prefetch_wait_time', 'total_act_time', 'total_act_wait_time',
               'total_cpu_time', 'total_extended_latch_wait_time']:
    values = final_wldf[metric]
    #log2(1) == 0, so replacing the zeros by 1 leaves them at 0 after the transform
    final_wldf['log_' + metric] = np.log2(values.where(values != 0, 1).values)

### We start 8 parallel processes and invoke collect_sqlm to compute the SQL metrics

In [None]:
import time
queue = Queue()
num_procs = 8
sqlm_list = []
pro_list = []
num_rows = len(initial_sqlm_df)
group_size = num_rows // num_procs
for group_num in range(num_procs):
    num1 = group_num * group_size
    #collect_sqlm iterates over range(num1, num2), so num2 is exclusive.
    #The last worker also takes the rows left over by the integer division.
    if group_num == num_procs - 1:
        num2 = num_rows
    else:
        num2 = num1 + group_size
    proc = Process(target=collect_sqlm, args=(num1, num2, queue))
    pro_list.append(proc)

start = time.time()
for p in pro_list:
    p.start()

#Consume the results in this process while the workers are producing them
read_queue(queue)

#read_queue returns after the end marker of every worker has been received,
#so the queue is drained and the workers can be joined
for p in pro_list:
    p.join()

intermediate_sqlm_df = pd.DataFrame(sqlm_list, columns=['location', 'operand_size', 'index_size',
                                    'num_operations', 'num_filters']).set_index('location')

#'location' is the row label of the query in initial_sqlm_df, so joining on the
#index attaches the computed metrics to the query they belong to
sqlm_df = initial_sqlm_df.merge(intermediate_sqlm_df, left_index=True,
                                right_index=True, how='inner').reset_index(drop=True)

#Drop the queries for which no table size was found
sqlm_df = sqlm_df[sqlm_df.operand_size != 0]

processtime = time.time() - start
print("Took {} seconds with {} processes.".format(processtime, num_procs))

#Average the SQL metrics per (date, hour) window. The columns are selected with
#a list because selecting several columns with a tuple is rejected by pandas 2.x.
sqlm_grpby_df = sqlm_df.groupby(['date', 'hour'])[['operand_size', 'index_size', 'num_operations',
                                                   'num_filters', 'exec_time']].mean()

final_df = final_wldf.merge(sqlm_grpby_df, left_index=True, right_index=True, how='inner')

print(final_df.shape)


### Remove the outliers identified during EDA from the merged workload and SQL metrics data frame, and compute the log of the execution time.

In [None]:
final_df = final_df.reset_index()
#Row labels (after the reset above) of the observations to exclude
outlier_rows = [6, 135, 148, 147, 139, 140, 138]
#.copy() makes the filtered frame independent, so adding a column below does
#not write into a slice of the unfiltered frame
final_df = final_df[~final_df.index.isin(outlier_rows)].copy()
#exec_time is the mean of strictly positive execution times, so the log is defined
final_df['log_exec_time'] = np.log2(final_df['exec_time'].values)


### Estimate regression model

In [None]:
import statsmodels.formula.api as smf
import statsmodels.api as sm

#Feature sets that were tried before settling on the one below:
#X = final_df[['total_cpu_time','operand_size','num_filters']]
#X = final_df[['operand_size','num_filters','log_total_cpu_time']]
#X = final_df [['log_total_cpu_time','lock_wait_time','operand_size','num_filters','num_operations']]
X = final_df[['log_total_cpu_time', 'operand_size', 'num_filters', 'num_operations']]
y = final_df['log_exec_time']

#Total CPU Time , Lock time out, operand size , number of filters , number of operations

#OLS with response/design arrays is provided by statsmodels.api; the formula
#module is meant for the formula based interface.
#NOTE(review): X has no constant column, so the model is fitted without an
#intercept -- confirm this is intended (sm.add_constant(X) would add one).
est = sm.OLS(y, X)

est = est.fit()
est.summary()