This notebook contains the code for **random data generation** and its storage in **Elasticsearch**.

In [1]:
import numpy as np
import random
import datetime 
import uuid
import time

In [2]:
# Connect to the Elasticsearch node listening on localhost:9200.
from elasticsearch import Elasticsearch

es = Elasticsearch(hosts=[dict(host='localhost', port=9200)])


## Random Generator

In [3]:
# Random data size: 1 million transfers.
dataset_size = 1000000

# Seed both random number generators used in this notebook (numpy and the
# stdlib `random` module) so "Restart & Run All" regenerates the same dataset.
# Note: uuid.uuid4() is not driven by these generators, so UUIDs still differ
# between runs.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)

In [4]:
# Gaussian distribution of #Bytes
# The number of bytes transferred is drawn from three Gaussian distributions
# centred on 2 GB, 5 GB and 8 GB (33% / 33% / 34% of the transfers).

mu1, sigma1 = 2000000000, 350000000
mu2, sigma2 = 5000000000, 300000000
mu3, sigma3 = 8000000000, 350000000

# Derive the component sizes from dataset_size so that len(size) always equals
# dataset_size (the generator loop reads size[i] for every i < dataset_size).
n1 = dataset_size * 33 // 100
n2 = dataset_size * 33 // 100
n3 = dataset_size - n1 - n2

samples = np.concatenate([np.random.normal(mu1, sigma1, n1),
                          np.random.normal(mu2, sigma2, n2),
                          np.random.normal(mu3, sigma3, n3)])
# A byte count is a non-negative integer: round, clip the (very unlikely)
# negative tail at 0 and convert to plain Python ints.
size = np.clip(np.rint(samples), 0, None).astype(np.int64).tolist()
random.shuffle(size)

In [5]:
# SOURCE SITE GENERATION
# Site A is assumed to be "better" and originates 40% of the transfers,
# followed by B (30%), C (10%), D (10%) and E (10%).

SOURCE_SITE_SHARES = [("A", 40), ("B", 30), ("C", 10), ("D", 10), ("E", 10)]

source_site = []
for share_site, share_pct in SOURCE_SITE_SHARES:
    # Integer arithmetic avoids float rounding in e.g. int(0.1 * n).
    source_site.extend([share_site] * (dataset_size * share_pct // 100))
# Integer division can leave a few slots unassigned when dataset_size is not a
# multiple of 10; give them to A so that len(source_site) == dataset_size and
# src_site(i) is valid for every transfer index.
source_site.extend(["A"] * (dataset_size - len(source_site)))

random.shuffle(source_site)


def src_site(i):
    """Return the (pre-shuffled) source site of the i-th transfer."""
    return source_site[i]

In [6]:
# DESTINATION SITE GENERATION

def dst_site(site):
    """Pick a random destination site different from `site`.

    The four remaining sites, kept in alphabetical order, are drawn with
    probabilities 0.4, 0.3, 0.2 and 0.1 respectively, so the weight a site
    receives depends on its position once `site` has been removed.

    Raises ValueError if `site` is not one of "A".."E".
    """
    candidates = list("ABCDE")
    candidates.remove(site)  # ValueError for an unknown site
    weights = [0.4, 0.3, 0.2, 0.1]
    return np.random.choice(candidates, p=weights)

In [7]:
# Generate Timestamp start -- every transfer starts at the same fixed moment:
# 8 March 2017 at 10:00 (a naive datetime, no timezone attached).
startDate = datetime.datetime(year=2017, month=3, day=8, hour=10, minute=0)


def timestamp_start():
    """Return the fixed start timestamp shared by all generated transfers."""
    return startDate

In [8]:
# Generate Timestamp end
# Assumes a transfer rate of 10 MiB (10 * 1024 * 1024 bytes) per second, i.e.
# the transfer duration grows linearly with the number of bytes.


def timestamp_end(bytes, start=None):
    """Return the end timestamp of a transfer of `bytes` bytes.

    Parameters
    ----------
    bytes : int or float
        Number of bytes transferred (name kept for backward compatibility even
        though it shadows the builtin).
    start : datetime.datetime, optional
        When the transfer started. Defaults to the global ``startDate``, so
        existing calls ``timestamp_end(n)`` behave exactly as before.

    Returns
    -------
    datetime.datetime
        ``start`` plus ``bytes / (10 * 1024 * 1024)`` seconds.
    """
    if start is None:
        start = startDate
    return start + datetime.timedelta(seconds=(bytes / (10240.0 * 1024.0)))

In [9]:
# Generate UUID

def generate_uuid():
    """Return a fresh random (version 4) UUID in its canonical string form."""
    new_id = uuid.uuid4()
    return "{}".format(new_id)

In [10]:
# Get final_event_type: 95% of transfers succeed and 5% fail.
# The outcome is assigned by call order, not at random: the first 95% of calls
# return "transfer-success" and every later call returns "transfer-failed".

count = {"transfer-success": 0,
         "transfer-failed": 0}


def get_final_event_type(total=None, success_ratio=0.95):
    """Return the final event type for the next transfer and update `count`.

    Parameters
    ----------
    total : int, optional
        Total number of transfers being generated; defaults to the global
        ``dataset_size``.
    success_ratio : float, optional
        Fraction of ``total`` reported as successful (default 0.95).

    Returns
    -------
    str
        ``"transfer-success"`` until ``success_ratio * total`` successes have
        been handed out, ``"transfer-failed"`` afterwards. The returned value
        is always the same key that was incremented in ``count``.
    """
    if total is None:
        total = dataset_size
    if count["transfer-success"] >= success_ratio * total:
        event = "transfer-failed"
    else:
        # Previously returned "transfer_success" (underscore), which did not
        # match the counter key or the "transfer-failed" / "transfer-queued"
        # naming used for the other event types.
        event = "transfer-success"
    count[event] += 1
    return event

In [None]:
# Random Dataset Generator
# Builds `dataset_size` synthetic transfers and bulk-indexes two documents per
# transfer: a "start" document (event_type 'transfer-queued', timestamp
# startDate) and an "end" document (final event type, timestamp derived from
# the byte count). Both documents of a transfer share the same uuid.
# This process takes around 8-9 minutes (will vary depending on the machine).

INDEX_NAME = 'atlas'
# Bulk action headers. NOTE(review): several `_type`s in one index only work on
# Elasticsearch < 6.0 -- confirm the target cluster version.
op_dict_1 = {
    "index": {
        "_index": INDEX_NAME,
        "_type": 'start'
    }
}
op_dict_2 = {
    "index": {
        "_index": INDEX_NAME,
        "_type": 'end'
    }
}
# Single shard, no replicas and a slow refresh interval -- presumably chosen to
# speed up the bulk load.
request_body = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0,
        "refresh_interval": "30s"
    }
}

# A bulk body holds (action, document) pairs, so 2500 lines = 1250 documents.
BULK_LINES = 2500


def _flush_bulk(lines):
    """Send one bulk request; return how many items Elasticsearch rejected.

    Returns 0 without contacting Elasticsearch when `lines` is empty.
    """
    if not lines:
        return 0
    response = es.bulk(index=INDEX_NAME, body=lines)
    if not response["errors"]:
        return 0
    # A failed action is reported as {"index": {..., "error": {...}}}.
    return sum(1 for item in response["items"]
               if "error" in item.get("index", {}))


bulk_start = []
bulk_end = []
failed_items = 0

# Recreate the index from scratch so repeated runs do not accumulate documents.
if es.indices.exists(index=INDEX_NAME):
    res = es.indices.delete(index=INDEX_NAME)
    print(" response: '%s'" % (res))

print("creating '%s' index..." % (INDEX_NAME))
res = es.indices.create(index=INDEX_NAME, body=request_body)

start_time = time.time()
print("Transfer to elasticsearch started .....")
for i in range(dataset_size):
    transfer_src = src_site(i)
    transfer_dst = dst_site(transfer_src)
    transfer_starttime = startDate
    event = {"event_type": 'transfer-queued',
             "uuid": generate_uuid(),
             "bytes": size[i],
             "src_site": transfer_src,
             "dst_site": transfer_dst,
             "timestamp": str(transfer_starttime)}
    bulk_start.append(op_dict_1)
    bulk_start.append(event)

    # The end event shares uuid/bytes/sites with the start event; only the
    # event type and the timestamp differ.
    event_new = dict(event)
    event_new["event_type"] = get_final_event_type()
    event_new["timestamp"] = str(timestamp_end(event["bytes"]))
    bulk_end.append(op_dict_2)
    bulk_end.append(event_new)

    if len(bulk_start) >= BULK_LINES:
        failed_items += _flush_bulk(bulk_start)
        bulk_start = []
    if len(bulk_end) >= BULK_LINES:
        failed_items += _flush_bulk(bulk_end)
        bulk_end = []

# Send the last, partially filled batches; without this, the final
# dataset_size % (BULK_LINES // 2) transfers would never be indexed.
failed_items += _flush_bulk(bulk_start)
failed_items += _flush_bulk(bulk_end)
bulk_start = []
bulk_end = []
# One explicit refresh at the end instead of refresh=True on every request.
es.indices.refresh(index=INDEX_NAME)

print("Transfer completed....")
if failed_items:
    print("WARNING: %d documents were rejected by Elasticsearch" % failed_items)
print("--- %s seconds ---" % (time.time() - start_time))

 response: '{u'acknowledged': True}'
creating 'atlas' index...
Transfer to elasticsearch started .....
