In [1]:
# Import necessary packages
import pandas as pd
from pymongo import MongoClient
import time
import psutil
import cudf
from extract_load_functions_mongodb import extraction, loading

#set export location
exportLocation = r'/home/jeff/'

#set number of test runs to prefrom
iterations = 31

# Connect to MongoDb
myclient = MongoClient("mongodb://localhost:27017/")
db = myclient.ds7330 
item = db.item
order_line = db.order_line
orderHeader = db.orderHeader


etlTimerStart = time.perf_counter()    
# HEAVY EXTRACTION #
###################################################################################################################
### Pull item table and capture CPU, RAM and elapsed time to perform operations

extractionTimerStart = time.perf_counter()
### Run 30 iterations to collect a df of performance metrics
### light extraction performance metrics
column_names = ["CPU_utilization", "RAM_utilization", "elapsed_time"]
heavyExtractionPrfm = pd.DataFrame(columns = column_names)

### base metrics
print("STARTING EXTRACTION...")
print("Base CPU utilization: ", psutil.cpu_percent())
print("Base RAM utilization: ", psutil.virtual_memory().percent)
print("RUNNING...")

### Run n iterations to collect a df of performance metrics
for sampleNo in range(iterations):

    #Initiate timer for query
    start = time.perf_counter()

    #insert data into dataFrame
    itemdf = pd.DataFrame(list(item.find()))
    order_linedf = pd.DataFrame(list(order_line.find()))
    orderHeaderdf = pd.DataFrame(list(orderHeader.find()))
    
    #Stop timer  
    stop = time.perf_counter()

    #load df with performance metrics    
    heavyExtractionPrfm = heavyExtractionPrfm.append(pd.DataFrame({'CPU_utilization': psutil.cpu_percent(),
                                                               'RAM_utilization':  psutil.virtual_memory().percent,
                                                               'elapsed_time': stop - start},
                                                              index=[1]), ignore_index=True)
    time.sleep(2)
extractionTimerEnd= time.perf_counter()   

print("Data frame loading complete.\n")
print("Total EXTRACTION time: ", extractionTimerEnd - extractionTimerStart, "s")
print("Iterations performed: ", iterations)
print("Average EXTRACTION iteration time: ", heavyExtractionPrfm.elapsed_time.mean(), "s")
print("Average CPU utilization: ", heavyExtractionPrfm.CPU_utilization.mean())
print("Average RAM utilization: ", heavyExtractionPrfm.RAM_utilization.mean(),"\n\n")

#Export DF to csv
heavyExtractionPrfm.to_csv (r'/home/jeff/heavyExtractionPrfm_MongoDB.csv', index = False, header=True)

#### Heavy TRANSFORMATION ####
###################################################################################################################
# Preform medium transformation workload.  In this case, join orderline and item tables, identify all item descriptions with "Blue" in them 
# and change them to "Navy"

#Cast datatypes to objects
order_linedf[['Line', 'eaches_qty']] = order_linedf[['Line', 'eaches_qty']].apply(pd.to_numeric) 
order_linedf[['Order', 'orderedItem','Ponum']] = order_linedf[['Order', 'orderedItem','Ponum']].astype(str) 

itemdf[['selling_price']] = itemdf[['selling_price']].apply(pd.to_numeric)
itemdf[['orderedItem']] = itemdf[['orderedItem']].astype(str)

orderHeaderdf[['site_num']] = orderHeaderdf[['site_num']].apply(pd.to_numeric)
orderHeaderdf[['Ponum']] = orderHeaderdf[['Ponum']].astype(str)

column_names = ["CPU_utilization", "RAM_utilization", "elapsed_time"]
heavyTransPrfm = pd.DataFrame(columns = column_names)
#lightTrans.item_desc.str.contains("^Blue")

print("STARTING TRANSFORMATION...")
print("Base CPU utilization: ", psutil.cpu_percent())
print("Base RAM utilization: ", psutil.virtual_memory().percent)
print("RUNNING...")

transTimerStart = time.perf_counter()
#Run 30 iterations to collect transformation df of performance metrics

for sampleNoTransform in range(iterations):

    # Start Timer and progress tracker
    start = time.perf_counter()
    
    #Working Code
    itemtrans = itemdf.drop(index=itemdf.index[[-1]])
    order_linetrans = order_linedf.drop(index=order_linedf.index[[-1]])
    orderHeadertrans = order_linedf.drop(index=orderHeaderdf.index[[-1]])
    
    itemtrans =itemdf.drop(itemdf.columns[[0]], axis = 1) 
    order_linetrans = order_linedf.drop(order_linedf.columns[[0]], axis = 1) 
    orderHeadertrans = orderHeaderdf.drop(orderHeaderdf.columns[[0]], axis = 1) 
    
    medium = order_linetrans.merge(right=itemtrans, on="orderedItem")
    heavy = orderHeadertrans.merge(right=medium, on="Ponum")
    
    heavy.item_desc = heavy.item_desc.str.replace('Blue', 'Navy', regex=True)
    heavy.item_desc = heavy.item_desc.str.replace('Tee', 'CottonTee', regex=True)
    heavy['extended_price'] = heavy.selling_price * medium.eaches_qty
    
    #Stop timer  
    stop = time.perf_counter()
    
    
    heavyTransPrfm = heavyTransPrfm.append(pd.DataFrame({'CPU_utilization': psutil.cpu_percent(),
                                                         'RAM_utilization':  psutil.virtual_memory().percent,
                                                         'elapsed_time':stop - start},
                                                          index=[1]), ignore_index=True)
    #lightTrans = orderItemJoin
    time.sleep(2)
transTimerEnd= time.perf_counter() 

print("Pandas transformation metrics captured.\n")
print("Total extraction time: ", transTimerEnd - transTimerStart, "s")
print("Iterations performed: ", iterations)
print("Average extraction iteration time: ", heavyTransPrfm.elapsed_time.mean(), "s")
print("Average CPU utilization: ", heavyTransPrfm.CPU_utilization.mean())
print("Average RAM utilization: ", heavyTransPrfm.RAM_utilization.mean(), "\n\n")

# Export Light Transformation Performance to local
heavyTransPrfm.to_csv (r'/home/jeff/heavyTransPrfm_mongoDB.csv', index = False, header=True)


#### cuDF TRANSFORMATION ####
###################################################################################################################

column_names = ["CPU_utilization", "RAM_utilization", "elapsed_time"]
heavyTransPrfmCU = pd.DataFrame(columns = column_names)

print("Starting cuDF TRANSFORMATION...")
print("Base CPU utilization: ", psutil.cpu_percent())
print("Base RAM utilization: ", psutil.virtual_memory().percent)
print("RUNNING...")

#Run 30 iterations to collect transformation df of performance metrics
transCuTimerStart = time.perf_counter()
for sampleNoTransform in range(iterations):
    ## Transform pandas df to cuDF
    itemCU = cudf.DataFrame.from_pandas(itemtrans)
    order_lineCU = cudf.DataFrame.from_pandas(order_linetrans)
    orderheaderCU = cudf.DataFrame.from_pandas(orderHeadertrans)
    
    # Start Timer and progress tracker
    start = time.perf_counter()
    
    #Working Code
    joined_dataCU = order_lineCU.merge(right=itemCU, on="orderedItem")
    heavyCU = orderheaderCU.merge(right=joined_dataCU, on="Ponum")
    
    heavyCU.item_desc = heavyCU.item_desc.str.replace('Blue', 'Navy', regex=True)
    heavyCU.item_desc = heavyCU.item_desc.str.replace('Tee', 'CottonTee', regex=True)
    heavyCU['extended_price'] = heavyCU.selling_price * heavyCU.eaches_qty
    
    #Stop timer  
    stop = time.perf_counter()
    
    heavyTransPrfmCU = heavyTransPrfmCU.append(pd.DataFrame({'CPU_utilization': psutil.cpu_percent(),
                                                         'RAM_utilization':  psutil.virtual_memory().percent,
                                                         'elapsed_time':stop - start},
                                                          index=[1]), ignore_index=True)
    #lightTrans = orderItemJoin
    time.sleep(2)
transCuTimerEnd = time.perf_counter() 

print("cuDF performance metrics captured loading complete.\n")
print("Total cuDF TRANSFORMATION time: ", transCuTimerEnd - transCuTimerStart, "s")
print("Iterations performed: ", iterations)
print("Average cuDF TRANSFORMATION iteration time: ", heavyTransPrfmCU.elapsed_time.mean(), "s")
print("Average cuDF TRANSFORMATION CPU utilization: ", heavyTransPrfmCU.CPU_utilization.mean())
print("Average cuDF TRANSFORMATION RAM utilization: ", heavyTransPrfmCU.RAM_utilization.mean(),"\n\n")

# Export medium cuDF Transformation Performance to local
heavyTransPrfmCU.to_csv (r'/home/jeff/heavyTransPrfmCU_mongoDB.csv', index = False, header=True)

#### Load data ####
###################################################################################################################
loading(myclient,'heavy', heavy, iterations, exportLocation, "heavyMongoDBLoad.csv")

etlTimerEnd = time.perf_counter() 
print('ETL is complete')
print('Elapsed ETL time is: ', (etlTimerEnd-etlTimerStart)/60, ' minutes')

STARTING EXTRACTION...
Base CPU utilization:  5.8
Base RAM utilization:  20.2
RUNNING...
Data frame loading complete.

Total EXTRACTION time:  109.21002660199883 s
Iterations performed:  31
Average EXTRACTION iteration time:  1.520272817484276 s
Average CPU utilization:  3.370967741935484
Average RAM utilization:  21.196774193548393 


STARTING TRANSFORMATION...
Base CPU utilization:  2.2
Base RAM utilization:  21.1
RUNNING...
Pandas transformation metrics captured.

Total extraction time:  77.30225178199908 s
Iterations performed:  31
Average extraction iteration time:  0.49091035883869943 s
Average CPU utilization:  2.6354838709677413
Average RAM utilization:  21.580645161290327 


Starting cuDF TRANSFORMATION...
Base CPU utilization:  1.2
Base RAM utilization:  21.6
RUNNING...
cuDF performance metrics captured loading complete.

Total cuDF TRANSFORMATION time:  66.96959943999536 s
Iterations performed:  31
Average cuDF TRANSFORMATION iteration time:  0.05821477983878257 s
Average cu