# LN - Data Pre-Processing - Normalized rankings

## Import Libraries

In [76]:
import numpy as np
import pandas as pd
import networkx as nx
import itertools
#import matplotlib.pyplot as plt
import time
import pickle

import os
import re
import sys
import io
import random
import itertools
import math

from tqdm.notebook import trange, tqdm
#from tqdm.notebook import trange
#from tqdm import tqdm_notebook as tqdm
from time import sleep

from dask_cloudprovider import FargateCluster
from dask.distributed import Client
import dask.array as da
import dask
dask.config.set({'distributed.scheduler.allowed-failures': 50}) 


import boto3

In [62]:
# Define parameters

bucket='ln-strategy-data'
extraction_id=1587447789


In [2]:
# Initiate s3 resource

session = boto3.session.Session()
s3 = session.resource('s3')

## Load Data

In [63]:
# Load objects form S3
# Dataframe

decisions_load = s3.Object(bucket_name=bucket, key='decisions_df.csv').get()
decisions_df=pd.read_csv(io.BytesIO(decisions_load['Body'].read()))

# Channel closures
closure_file = s3.Object(bucket_name=bucket, key='channel_closures.p').get()
channel_closures = pickle.loads(closure_file['Body'].read())
    
    
# Channel openings 
opens_file = s3.Object(bucket_name=bucket, key='channel_opens.p').get()
channel_opens = pickle.loads(opens_file['Body'].read())

In [195]:
# Transform data: Create list of .items with nodes involved in opens/closures per block
open_nodes_list=[(opens[0],list(set([i for t in opens[1] for i in t[:2]]))) for opens in sorted(list(channel_opens.items()))]
closure_nodes_list=[(closes[0],list(set([i for t in closes[1] for i in t[:2]]))) for closes in sorted(list(channel_closures.items()))]
print('--OPENS---')
print(open_list_sets[:10])
print('--CLOSURES---')
print(closure_list_sets[:10])


--OPENS---
[(505149, [5314, 6038]), (506402, [934, 3023]), (506847, [576, 3452]), (508075, [3436, 3310]), (508090, [2378, 4223]), (508320, [1912, 422]), (508400, [1912, 5154]), (508447, [6656, 6595, 2120, 4688, 4119]), (508503, [422, 2953, 5426, 7059, 3957, 7478, 2518, 5725]), (508666, [422, 5294])]
--CLOSURES---
[(505149, []), (506402, []), (506847, []), (508075, []), (508090, []), (508320, []), (508400, []), (508447, []), (508503, []), (508666, [])]


In [None]:
graph_keys = [obj.key 
        for obj in s3.Bucket(name=bucket).objects.all()
        if re.match(".*"+str(extraction_id)+"_connected/.*\.gpickle",obj.key)]

In [196]:
# Base lists to be populated
blocks=[]
base_ix=6 # From this index onward the connected component has more than 3 items. 
final_ix=1000
#final_ix=len(graph_keys)
extract_keys=graph_keys[base_ix:final_ix] # Blocks below 6th index are <3 and affect some graph metrics

for key in extract_keys: 
    
    # Create block list from file_names
    block_i=int(key.split(".")[0].split("/")[-1]) 
    blocks.append(block_i)
    
# Update node lists
open_nodes=open_nodes_list[base_ix:final_ix]
closure_nodes=closure_nodes_list[base_ix:final_ix]

In [198]:
cluster = FargateCluster(n_workers=10,scheduler_timeout='20 minutes',image='dsrincon/dask-graph:nx-scipy-v1',scheduler_cpu=4096,scheduler_mem=16384)

In [199]:
cluster

VBox(children=(HTML(value='<h2>FargateCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n  …

In [200]:
client=Client(cluster)


python
+---------------------------+---------------+
|                           | version       |
+---------------------------+---------------+
| client                    | 3.7.3.final.0 |
| scheduler                 | 3.7.4.final.0 |
| tcp://172.31.22.145:42769 | 3.7.4.final.0 |
| tcp://172.31.38.7:37551   | 3.7.4.final.0 |
| tcp://172.31.44.211:40181 | 3.7.4.final.0 |
| tcp://172.31.52.161:46465 | 3.7.4.final.0 |
| tcp://172.31.56.17:42725  | 3.7.4.final.0 |
| tcp://172.31.76.24:34171  | 3.7.4.final.0 |
| tcp://172.31.83.221:43709 | 3.7.4.final.0 |
| tcp://172.31.87.105:44503 | 3.7.4.final.0 |
| tcp://172.31.89.169:34801 | 3.7.4.final.0 |
| tcp://172.31.92.34:40195  | 3.7.4.final.0 |
+---------------------------+---------------+

tornado
+---------------------------+---------+
|                           | version |
+---------------------------+---------+
| client                    | 6.0.3   |
| scheduler                 | 6.0.4   |
| tcp://172.31.22.145:42769 | 6.0.4   |
| tcp:/

In [24]:


def graph_ranking(input_tuple):
    
    # Unpacking input
    block_num=input_tuple[0]
    measurement=input_tuple[1]
    extraction_id=input_tuple[2]
    key_rawscore=input_tuple[3]
    bucket=input_tuple[4]
    
    
    # Retrieve snapshot from S3
    session = boto3.session.Session()
    s3 = session.resource('s3')
    response = s3.Object(bucket_name=bucket, key=key_rawscore).get()
    snapshot=pickle.loads(response['Body'].read())
    
    
    # Calculate ranking for snapshot
    max_value = max(snapshot.values())
    norm_rank = {k: v / max_value for k, v in snapshot.items()}
    
    
    # Write output into S3
    key_out='graph_snapshots/'+str(extraction_id)+'_connected/.data_transformations/'+measurement+'/norm_rank/'+str(block_num)+'.pkl'
    pickle_byte_obj = pickle.dumps(norm_rank)
    response=s3.Object(bucket,key_out).put(Body=pickle_byte_obj)['ResponseMetadata']['HTTPStatusCode']
    
    
    return response 

In [None]:
# TEST calculate_ranking 

test_block=516790
measurement='channels'
input_tuple=(test_block,measurement,extraction_id)
response_test=calculate_ranking(input_tuple)

# Test if function saved result correctly and download result
if response_test==200:
    key_test='graph_snapshots/'+str(extraction_id)+'_connected/.data_transformations/'+measurement+'/norm_rank/'+str(test_block)+'.pkl'
    g_rank_test_load = s3.Object(bucket_name=bucket, key=key_test).get()
    g_rank_test = pickle.loads(g_rank_test_load['Body'].read())
    g_rank_values=sorted([v for k,v in g_rank_test.items()])
    #print(g_rank_values)
    #print('The dic saved has these first items: {}'.format(list(g_rank_test.items())))



In [50]:
def collection_ranking(extraction_id,blocks,measurement,bucket):

    session = boto3.session.Session()
    s3 = session.resource('s3')
    
  
    

    delayed_responses=[]
    with tqdm(total=len(blocks)) as pbar:
        for block_num in blocks:

            
            # Create key
            key='graph_snapshots/'+str(extraction_id)+'_connected/.data_transformations/'+measurement+'/raw_score/'+str(block_num)+'.pkl'
            
            # Create input tuple
            input_tuple=(block_num,measurement,extraction_id,key,bucket)
            
            # Run delayed function using dask
            response=dask.delayed(graph_ranking)(input_tuple)
            delayed_responses.append(response)
            
            # Update progress bar
            pbar.update(1)

    # Collect futures
    futures = dask.persist(*delayed_responses)

    # Run parallel computations
    start=time.time()
    final_responses = dask.compute(*futures)
    end=time.time()
    print('Compute in seconds: {}'.format(end-start))

    return final_responses






In [56]:
# Test collection_ranking
test_responses=collection_ranking(extraction_id,blocks,'channels',bucket)

HBox(children=(FloatProgress(value=0.0, max=36536.0), HTML(value='')))


Compute in seconds: 591.6712484359741


In [54]:
print(test_responses[:20])

(200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200)


## Calculate normalized rankings for measures

**Age ranking**

In [57]:
age_responses=collection_ranking(extraction_id,blocks,'age',bucket)

HBox(children=(FloatProgress(value=0.0, max=36536.0), HTML(value='')))


Compute in seconds: 384.95358204841614


**Capacity ranking**

In [58]:
capacity_responses=collection_ranking(extraction_id,blocks,'capacity',bucket)

HBox(children=(FloatProgress(value=0.0, max=36536.0), HTML(value='')))


Compute in seconds: 360.1695795059204


**Betweeness ranking**

In [59]:
betweeness_responses=collection_ranking(extraction_id,blocks,'betweeness_curr_aprox',bucket)

HBox(children=(FloatProgress(value=0.0, max=36536.0), HTML(value='')))


Compute in seconds: 363.8422577381134


**Growth ranking**

In [60]:
growth_responses=collection_ranking(extraction_id,blocks,'capacity_growth',bucket)

HBox(children=(FloatProgress(value=0.0, max=36536.0), HTML(value='')))


Compute in seconds: 408.5428283214569


distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError


**Channels ranking**

In [None]:
channels_responses=collection_ranking(extraction_id,blocks,'channels',bucket)

**TETS: Test correct norm_ranking creation**

In [144]:
test_block=blocks[-100]
measurement='betweeness_curr_aprox'
key_raw='graph_snapshots/'+str(extraction_id)+'_connected/.data_transformations/'+measurement+'/raw_score/'+str(test_block)+'.pkl'
key_norm='graph_snapshots/'+str(extraction_id)+'_connected/.data_transformations/'+measurement+'/norm_rank/'+str(test_block)+'.pkl'


raw_load = s3.Object(bucket_name=bucket, key=key_raw).get()
dic_raw = pickle.loads(raw_load['Body'].read())
items_raw=list(dic_raw.items())

norm_load = s3.Object(bucket_name=bucket, key=key_norm).get()
dic_norm = pickle.loads(norm_load['Body'].read())
items_norm=list(dic_norm.items())


print(items_raw[:10])
print(items_norm[:10])
print(len(items_raw)==len(items_norm))

[(5314, 2.7010283123980915e-05), (934, 4.241744824033501e-05), (3023, 7.752735612136513e-15), (3452, 1.0746104212680437e-05), (576, 3.408679201523418e-16), (3436, 0.02993702469278996), (3310, 1.2395197096448792e-15), (4223, 0.0009586206254005652), (422, 0.029096542491107023), (1912, 3.51576918762022e-05)]
[(5314, 0.000191980840699471), (934, 0.00030149026339808336), (3023, 5.510407624040229e-14), (3452, 7.638002576740317e-05), (576, 2.422785039977094e-15), (3436, 0.21278322563971583), (3310, 8.810127418098523e-15), (4223, 0.006813582543044772), (422, 0.20680933492071135), (1912, 0.00024989013304542766)]
True


## Add normalized rankings to DataFrame

In [164]:
def extract_values(input_tuple):
    
    # Unpack input tuple
    
    bucket=input_tuple[0]
    extraction_id=input_tuple[1]
    measurement=input_tuple[2]
    score_type=input_tuple[3]
    prev_block=input_tuple[4]
    act_block=input_tuple[5]
    nodes_lists=input_tuple[6]
    decision_type=input_tuple[7]
    
    
    # Initialize list of lists to return
    values_lists=[]
    
    # Start S3 session
    session = boto3.session.Session()
    s3 = session.resource('s3')
    
    # Create keys
    dic_key='graph_snapshots/'+str(extraction_id)+'_connected/.data_transformations/'+measurement+'/'+score_type+'/'+str(prev_block)+'.pkl'
    g_key='graph_snapshots/'+str(extraction_id)+'_connected/'+str(prev_block)+'.gpickle'
    
    
    # Load dic
    dic_load = s3.Object(bucket_name=bucket, key=dic_key).get()
    dic = pickle.loads(dic_load['Body'].read())
                       
    # Load graph
    g_load = s3.Object(bucket_name=bucket, key=g_key).get()
    g = pickle.loads(g_load['Body'].read())
                     
                       
    # Extract relevant values
    for nodes in nodes_lists:
        # Define list to return
        values_i=[]
        
        # loop over nodes in lists
        for node in nodes:
            # Check if node is in graph and retrieve metric from dic, else set to 0
            if g.has_node(node):         
                values_i.append(dic[node])
            else:
                values_i.append(0)   
        
        # Update value list in global return list
        values_lists.append(values_i)
        
    # Save value to S3
    key_out='graph_snapshots/'+str(extraction_id)+'_connected/.data_transformations/'+measurement+'/'+decision_type+'_'+score_type+'/'+str(act_block)+'.pkl'
    pickle_byte_obj = pickle.dumps(values_lists)
    response=s3.Object(bucket,key_out).put(Body=pickle_byte_obj)['ResponseMetadata']['HTTPStatusCode']
                     
    return response
                

In [158]:
# TEST extract_values function

# set parameters
test_ix=-1005
test_act_block=blocks[test_ix]
test_prev_block=blocks[test_ix-1]
test_nodes=[open_nodes[test_ix][1],closure_nodes[test_ix][1]]
measurement='channels'
score_type='norm_rank'
decision_type='open'

print('Test Block:{}'.format(test_block))
print('Test Nodes:{}'.format(test_nodes))

# run function and print results
start=time.time()
test_input_tuple=(bucket,extraction_id,measurement,score_type,test_prev_block,test_act_block,test_nodes,decision_type)
response=extract_values(test_input_tuple)

end=time.time()


if response==200:
    
    # Check value recorded to s3
    values_key='graph_snapshots/'+str(extraction_id)+'_connected/.data_transformations/'+measurement+'/'+decision_type+'_'+score_type+'/'+str(test_act_block)+'.pkl'
    test_load = s3.Object(bucket_name=bucket, key=values_key).get()
    values = pickle.loads(test_load['Body'].read())
    
    # Print results
    print('Elapsed time seconds:{}'.format(end-start))
    print('Values:{}'.format(values))
    
else:
    print('Did not save to S3 correctly')
    


Test Block:615886
Test Nodes:[[2940, 668, 847, 6748], [5321, 4517]]
Elapsed time seconds:0.2798001766204834
Values:[[0, 0.15827338129496402, 0.17176258992805754, 0.4433453237410072], [0.7437050359712231, 0.00539568345323741]]


In [188]:
def collection_extract_values(decisions_df,blocks,decision_type,measurement,score_type):
    
    # Define dictonary to return
    delayed_responses=[]
    
    # Set list of blocks and column names
    if decision_type=='open':
        column_name='open_block'
        dec_blocks=[b for b,o in open_nodes if len(o)>0]
        
    if decision_type=='close':
        column_name='close_block'
        dec_blocks=[b for b,o in closure_nodes if len(o)>0]
    
    # Loop over blocks in decision type, starting from the 2nd one
    print(len(dec_blocks))
    with tqdm(total=len(range(1,len(dec_blocks)))) as pbar:
        for i in range(1,len(dec_blocks)):

            # Define blocks to look at decisions and prev block to query measurments
            dec_block_i=dec_blocks[i]
            prev_block=blocks[blocks.index(dec_block_i)-1]

            # Select nodes in node0,node1 and create list of lists
            node0_nodes=decisions_df[decisions_df[column_name]==dec_block_i]['node0_id'].tolist()
            node1_nodes=decisions_df[decisions_df[column_name]==dec_block_i]['node1_id'].tolist()
            nodes_lists=[node0_nodes,node1_nodes]

            # Run delayed function to extract values for node0 and node1 in prev_block
            input_tuple=(bucket,extraction_id,measurement,score_type,prev_block,dec_block_i,nodes_lists,decision_type)
            response=dask.delayed(extract_values)(input_tuple)
            delayed_responses.append(response)
            
            # Update progress bar
            pbar.update(1)
        

        # Collect futures
        futures = dask.persist(*delayed_responses)

        # Run parallel computations
        start=time.time()
        final_responses = dask.compute(*futures)
        end=time.time()
        print('Compute in seconds: {}'.format(end-start))
    
    
    return final_responses
    

**Channels: DF update extraction**

In [201]:
open_channelsrank_responses=collection_extract_values(decisions_df,blocks,'open','age','norm_rank')

994


HBox(children=(FloatProgress(value=0.0, max=993.0), HTML(value='')))

Compute in seconds: 15.531179904937744



distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError


In [191]:
# Test open_channelsrank_responses

994

In [None]:
close_channelsrank_responses=collection_extract_values(decisions_df,blocks,'close','age','norm_rank')

## Add normalized rankings to DataFrame

In [None]:
def add_to_dataframe(s3,decisions_df,decison_blocks,node_values,decision_type,measurement,score_type):
    
    for block in decision_blocks:
        
        # Extract measurement values for block from S3:
        values_key='graph_snapshots/'+str(extraction_id)+'_connected/.data_transformations/'+measurement+'/'+decision_type+'_'+score_type+'/'+str(block)+'.pkl'
        values_load = s3.Object(bucket_name=bucket, key=values_key).get()
        values = pickle.loads(values_load['Body'].read())
        
        for 
        

