# Weekly Community Detection using Louvain's Algorithm

#### Author: Boikanyo Radiokana
#### Version: 1.1
#### Description: Application of community detection algorithms to detect entities who engage in commission abuse activities
#### Dataset: Daily Transaction rollup
#### Market: Tanzania

In [9]:
# !pip install --upgrade pip
# !pip install pyathena
# !pip install pandas
# !pip install --upgrade botocore==1.27.12
# !pip install awswrangler
# !pip install scipy
# !pip install pyvis
# !pip install networkx
# !pip install scikit-learn
# !pip install python-louvain

In [3]:
from pyathena import connect
import pandas as pd
pd.set_option('display.max_columns', None)
import numpy as np
import awswrangler as wr
import boto3
from pyvis.network import Network
import networkx as nx

import pickle
import bz2
from time import strftime
import logging
import sys
import datetime
from datetime import datetime
from datetime import date
from datetime import timedelta
from community import community_louvain

# Data Preprocessing

In [10]:
def get_data(bucket: str, prefix: str, filetype: str):
    """Read the objects stored under an S3 prefix into a single DataFrame.

    Args:
        bucket: Name of the S3 bucket to list.
        prefix: Key prefix whose objects are listed.
        filetype: ``'CSV'`` or ``'PARQUET'``; selects the awswrangler reader.

    Returns:
        The DataFrame returned by ``wr.s3.read_csv`` / ``wr.s3.read_parquet``
        for every listed object except the first one.

    Raises:
        ValueError: If ``filetype`` is neither ``'CSV'`` nor ``'PARQUET'``.
        FileNotFoundError: If no object exists under ``prefix``.
    """
    # Validate before touching S3 so a typo fails fast with a clear message
    # instead of an UnboundLocalError after the listing has been done.
    if filetype not in ('CSV', 'PARQUET'):
        raise ValueError(
            f"Unsupported filetype {filetype!r}; expected 'CSV' or 'PARQUET'"
        )

    conn = boto3.client('s3')
    # A single list call returns at most 1000 keys; paginate to see them all.
    paginator = conn.get_paginator('list_objects_v2')
    filenames = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for content in page.get('Contents', []):
            filenames.append(f"s3://{bucket}/{content['Key']}")

    if not filenames:
        raise FileNotFoundError(f"No objects found under s3://{bucket}/{prefix}")

    # The first listed key is skipped, as in the original implementation.
    # NOTE(review): presumably this drops a folder placeholder object; if the
    # prefix holds only data files the first data file is skipped -- confirm.
    subset_filenames = filenames[1:]  # Change as per your requirement

    if filetype == 'CSV':
        return wr.s3.read_csv(path=subset_filenames)
    return wr.s3.read_parquet(path=subset_filenames)


def fill_nulls(df: pd.DataFrame):
    """Fill missing party identities with a default per transaction type.

    The frame is modified in place: ``PartyA``/``PartyB`` are renamed to
    ``identity_a``/``identity_b``, null identities are replaced by the default
    for the row's ``tt_description`` (``'Unknown'`` when the description has
    no default), and four bookkeeping columns are dropped.

    Args:
        df: Transaction rollup containing ``tt_description``, ``PartyA``,
            ``PartyB`` and the columns dropped at the end.

    Returns:
        The same DataFrame object, after modification.
    """
    df.rename(columns={'PartyA': 'identity_a', 'PartyB': 'identity_b'}, inplace=True)

    # (transaction description, default for identity_a, default for identity_b)
    defaults_by_description = (
        ('Cash In', 'Agent', 'Customer'),
        ('Cash Out', 'Customer', 'Agent'),
        ('Organisation To Organisation Transfer', 'Agent', 'Agent'),
        ('Merchant Cash Out', 'Merchant', 'Agent'),
        ('Pay Bill OTC', 'Agent', 'Business'),
    )

    for description, default_a, default_b in defaults_by_description:
        is_description = df['tt_description'] == description
        df.loc[is_description & df['identity_a'].isnull(), 'identity_a'] = default_a
        df.loc[is_description & df['identity_b'].isnull(), 'identity_b'] = default_b

    # Whatever is still null belongs to a description without a default.
    for identity_column in ('identity_a', 'identity_b'):
        df.loc[df[identity_column].isnull(), identity_column] = 'Unknown'

    df.drop(columns=['RT_ID#0', 'C_DATA_KEY', 'C_DATA_VALUE', 'rundate'], axis=1, inplace=True)

    return df


def fill_missing_values(df: pd.DataFrame):
    """Zero-fill the monetary columns and cast the identifier columns to str.

    The frame is modified in place and also returned.

    Args:
        df: Transaction rollup containing the monetary and identifier columns
            listed below.

    Returns:
        The same DataFrame object, after modification.
    """
    monetary_columns = [
        'amount', 'mpesa_fee_revenue', 'mpesa_comm_expense',
        'comm_revenue_a', 'comm_revenue_b',
        'fee_expense_a', 'fee_revenue_a', 'fee_revenue_b', 'fee_expense_b',
        'levy',
    ]
    identifier_columns = ['id_a', 'it_a', 'at_a', 'id_b', 'it_b', 'at_b', 'tt_id', 'rt_id']

    df[monetary_columns] = df[monetary_columns].fillna(0)
    # Nulls in identifier columns become the literal string produced by the cast.
    df[identifier_columns] = df[identifier_columns].astype('str')

    return df



def get_comm_transactions(df: pd.DataFrame):
    """Keep commission-earning transactions, excluding three transaction types.

    A row is kept when ``comm_revenue_a`` or ``comm_revenue_b`` is greater
    than zero and its ``tt_id`` is not 6625, 6627 or 6565.

    ``tt_id`` is compared numerically. ``fill_missing_values`` casts it to
    ``str`` earlier in the pipeline, so comparing against integer literals
    would never match and the exclusion would silently do nothing.

    Args:
        df: Transaction rollup with ``comm_revenue_a``, ``comm_revenue_b``
            and ``tt_id`` columns.

    Returns:
        The filtered rows of ``df``.
    """
    excluded_tt_ids = [6625, 6627, 6565]

    earns_commission = (df['comm_revenue_a'] > 0) | (df['comm_revenue_b'] > 0)
    # Values that are not numeric become NaN and are therefore never excluded.
    tt_id_numeric = pd.to_numeric(df['tt_id'], errors='coerce')
    is_excluded = tt_id_numeric.isin(excluded_tt_ids)

    return df[earns_commission & ~is_excluded]

def preprocessing(bucket: str, prefix: str, filetype: str):
    """Load the transaction rollup from S3 and run the cleaning steps.

    The steps run in this order: ``get_data``, ``fill_missing_values``,
    ``fill_nulls``, ``get_comm_transactions``.

    Args:
        bucket: S3 bucket holding the input files.
        prefix: Key prefix of the input files.
        filetype: File type passed through to ``get_data``.

    Returns:
        The frame returned by ``get_comm_transactions``.
    """
    raw = get_data(bucket, prefix, filetype)
    cleaned = fill_nulls(fill_missing_values(raw))
    return get_comm_transactions(cleaned)

# Graph Model and Savings

In [11]:
def create_graph(df: pd.DataFrame, graph_type: str, weight: str):
    """Build a directed graph using the transaction rollup as an edge list.

    Edges run from ``id_a`` to ``id_b``.

    Args:
        df: Frame with ``id_a`` and ``id_b`` columns (plus the ``weight``
            column when ``graph_type`` is ``'weighted'``).
        graph_type: ``'weighted'`` or ``'not_weighted'``.
        weight: Name of the column used as the edge weight; ignored for
            ``'not_weighted'`` graphs.

    Returns:
        The populated ``networkx.DiGraph``.

    Raises:
        ValueError: If ``graph_type`` is not one of the two supported values.
            The previous implementation returned an error string here, which
            the caller then failed on when it treated the result as a graph.
    """
    if graph_type not in ('weighted', 'not_weighted'):
        raise ValueError(
            f"Graph Not Specified! Expected 'weighted' or 'not_weighted', got {graph_type!r}"
        )

    G = nx.DiGraph()
    if graph_type == 'weighted':
        edge_list = list(df[['id_a', 'id_b', weight]].itertuples(index=False, name=None))
        G.add_weighted_edges_from(edge_list)
    else:
        edge_list = list(df[['id_a', 'id_b']].itertuples(index=False, name=None))
        G.add_edges_from(edge_list)
    return G


# Model Pickling 
def _compress_file(model):
    """Pickle ``model`` and compress the resulting bytes with bz2.

    Args:
        model: Any picklable object.

    Returns:
        The bz2-compressed pickle as ``bytes``.
    """
    pickle_byte_obj = pickle.dumps(model)
    compressed = bz2.compress(pickle_byte_obj)
    # Compare against the pickled size: sys.getsizeof(model) is shallow and
    # does not include the objects the model refers to.
    logging.info(
        'Compression Ratio achieved: %s',
        len(compressed) / len(pickle_byte_obj),
    )
    return compressed

def save_model(s3_resource, model, model_name, model_prefix, usecase_bucket):
    """Compress ``model`` and upload it to S3 under a timestamped key.

    Args:
        s3_resource: Object exposing ``Object(bucket, key).put(Body=...)``,
            e.g. a boto3 S3 resource.
        model: Object to pickle and compress.
        model_name: Base name of the stored file; a timestamp is appended.
        model_prefix: Key prefix, concatenated directly in front of the name.
        usecase_bucket: Destination bucket.

    Returns:
        The object key the model was written to.
    """
    timestamp = strftime("%Y%m%d_%H%M%S")
    filename = f'{model_prefix}{model_name}_{timestamp}.pkl'
    compressed = _compress_file(model)
    s3_resource.Object(usecase_bucket, filename).put(Body=compressed)
    logging.info('Model has been saved: %s', filename)
    return filename
    
def _decompress_file(loaded_obj):
    """Reverse ``_compress_file``: bz2-decompress, then unpickle.

    Args:
        loaded_obj: bz2-compressed pickle bytes.

    Returns:
        The unpickled object.
    """
    # NOTE: unpickling executes code embedded in the payload; only feed this
    # bytes that come from a trusted location.
    return pickle.loads(bz2.decompress(loaded_obj))

def load_model(s3_resource, usecase_bucket, filename):
    """Download a stored model from S3 and restore it.

    Args:
        s3_resource: Object exposing ``Object(bucket, key).get()``, e.g. a
            boto3 S3 resource.
        usecase_bucket: Bucket holding the model.
        filename: Object key of the model.

    Returns:
        The object restored by ``_decompress_file``.
    """
    response = s3_resource.Object(usecase_bucket, filename).get()
    payload = response["Body"].read()
    restored = _decompress_file(payload)
    logging.info('Model loaded.')
    return restored


def is_pickled_success(main_graph, pickled_G):
    """Report whether the reloaded graph has as many nodes as the original.

    Only the node counts are compared; edges and attributes are not checked.

    Args:
        main_graph: The graph that was saved.
        pickled_G: The graph loaded back from storage.

    Returns:
        ``True`` when both graphs report the same number of nodes.
    """
    return main_graph.number_of_nodes() == pickled_G.number_of_nodes()
    
    
def graph_model_saving(df: pd.DataFrame, foldername: str, bucket: str):
    """Build the amount-weighted graph, store it in S3 and verify the copy.

    The graph is saved under
    ``output_data/mpesa/tanz/test/tanzania_comm_abuse_output/graph_model/{foldername}/``
    and then loaded back to check it against the in-memory graph.

    Args:
        df: Transactions with ``id_a``, ``id_b`` and ``amount`` columns.
        foldername: Folder (and model-name suffix) for the stored graph.
        bucket: Destination S3 bucket.

    Returns:
        The in-memory graph.

    Raises:
        RuntimeError: If the reloaded graph does not pass
            ``is_pickled_success``. The previous implementation printed a
            message and returned ``None``, which only postponed the failure
            to the next step.
    """
    main_graph = create_graph(df, 'weighted', 'amount')

    key = f'output_data/mpesa/tanz/test/tanzania_comm_abuse_output/graph_model/{foldername}/'
    model_name = f'main_graph_{foldername}'

    s3_resource = boto3.resource('s3')
    model_filename = save_model(s3_resource, main_graph, model_name, key, bucket)
    pickled_G = load_model(s3_resource, bucket, model_filename)

    if not is_pickled_success(main_graph, pickled_G):
        # Enter fail state - stop the process.
        raise RuntimeError(f'Saving Failed: {model_filename} does not match the in-memory graph')

    return main_graph

# Louvain's Community Detection Algorithm

In [12]:
def community_detection(main_graph):
    """Partition the graph into communities with ``community_louvain``.

    The graph is converted to an undirected one before partitioning.

    Args:
        main_graph: Graph exposing ``to_undirected()``.

    Returns:
        A tuple ``(comm_dict_df, sorted_list_of_comms)``: the first frame has
        one row per node (``id``, ``comm_group``); the second has one row per
        community (``comm_group``, ``Count_of_nodes``), sorted by node count
        in descending order.
    """
    undirected = main_graph.to_undirected()
    partition = community_louvain.best_partition(undirected)

    comm_dict_df = pd.DataFrame(partition.items(), columns=['id', 'comm_group'])

    # The index is reset before sorting, so index labels stay tied to the
    # rows they had in comm_group order.
    sorted_list_of_comms = (
        comm_dict_df.groupby(['comm_group'])
        .count()
        .reset_index()
        .rename(columns={'id': 'Count_of_nodes'})
        .sort_values(by=['Count_of_nodes'], ascending=False)
    )

    return comm_dict_df, sorted_list_of_comms
      

# Feature Engineering

In [13]:
def get_community_transactions(comm_group: int, comm_dict_df: pd.DataFrame, transrollup_x: pd.DataFrame):
    """Return the transactions that involve at least one node of a community.

    A transaction is selected when its ``id_a`` or its ``id_b`` belongs to
    the community, so a transaction between two communities is returned for
    both of them.

    Args:
        comm_group: Community label to select.
        comm_dict_df: Node-to-community mapping; the node id is read from the
            first column and the label from ``comm_group``.
        transrollup_x: Transactions with ``id_a`` and ``id_b`` columns.

    Returns:
        A new DataFrame with the selected transactions and a ``comm_group``
        column set to ``comm_group``. ``transrollup_x`` is not modified.
    """
    in_community = comm_dict_df['comm_group'] == comm_group
    sub_comm_nodes = comm_dict_df[in_community].iloc[:, 0]

    touches_community = (
        transrollup_x['id_a'].isin(sub_comm_nodes)
        | transrollup_x['id_b'].isin(sub_comm_nodes)
    )
    # Explicit copy: the label is written to an independent frame rather
    # than through a chained assignment on a filtered selection.
    sub_comm_edges = transrollup_x[touches_community].copy()
    sub_comm_edges['comm_group'] = comm_group

    return sub_comm_edges
    
    
def get_unique_ids(sub_comm_edges: pd.DataFrame):
    """List the distinct (id, identity type) pairs found in the transactions.

    Both sides of every transaction are considered.

    Args:
        sub_comm_edges: Transactions with ``id_a``, ``identity_a``, ``id_b``
            and ``identity_b`` columns.

    Returns:
        A tuple ``(ids, identities)``: ``ids`` holds the distinct pairs in
        columns ``id`` and ``id_type``; ``identities`` holds the number of
        distinct pairs per identity type in columns ``identity_type`` and
        ``counts``.
    """
    sides = []
    for id_column, type_column in (('id_a', 'identity_a'), ('id_b', 'identity_b')):
        side = sub_comm_edges[[id_column, type_column]]
        sides.append(side.rename(columns={id_column: 'id', type_column: 'id_type'}))

    ids = pd.concat(sides).drop_duplicates(subset=['id', 'id_type'])

    identities = (
        ids['id_type']
        .value_counts()
        .rename_axis('identity_type')
        .reset_index(name='counts')
    )

    return ids, identities

def aggregate_types(df: pd.DataFrame, col: str, comm_group: int, list_of_comms: pd.DataFrame):
    """Write per-type counts into the row of one community.

    For every row of ``df``, the value in ``col`` names a column of
    ``list_of_comms`` and the value in ``counts`` is stored in that column at
    index label ``comm_group``. ``list_of_comms`` is modified in place.

    Args:
        df: Frame with a ``counts`` column and the column named by ``col``.
        col: Name of the column in ``df`` holding the type labels.
        comm_group: Index label of the row in ``list_of_comms`` to fill.
        list_of_comms: Per-community frame receiving the counts.

    Returns:
        ``list_of_comms`` (the same object).
    """
    # Iterate over values rather than positional labels so the function does
    # not depend on ``df`` having a default 0..n-1 index.
    for type_, count in zip(df[col], df['counts']):
        list_of_comms.loc[comm_group, type_] = count

    return list_of_comms

def get_other_features(sub_comm_edges: pd.DataFrame, list_of_comms: pd.DataFrame, comm_group: int, ids):
    """Store the totals and sizes of one community in ``list_of_comms``.

    Written at index label ``comm_group``: the sums of ``amount``,
    ``mpesa_fee_revenue`` and ``mpesa_comm_expense``, the length of ``ids``
    and the number of transactions. ``list_of_comms`` is modified in place.

    Args:
        sub_comm_edges: Transactions of the community.
        list_of_comms: Per-community frame receiving the values.
        comm_group: Index label of the row to fill.
        ids: Sized collection of the community's ids; only its length is used.

    Returns:
        ``list_of_comms`` (the same object).
    """
    community_summary = {
        'total_amount': sub_comm_edges['amount'].sum(),
        'mpesa_fee_revenue': sub_comm_edges['mpesa_fee_revenue'].sum(),
        'mpesa_comm_expense': sub_comm_edges['mpesa_comm_expense'].sum(),
        'number_of_nodes': len(ids),
        'number_of_transactions': len(sub_comm_edges),
    }

    for feature_name, feature_value in community_summary.items():
        list_of_comms.loc[comm_group, feature_name] = feature_value

    return list_of_comms
    
    
def get_margin_features(community_features: pd.DataFrame):
    """Add average transaction amount and revenue margin per community.

    ``ave_trans_amount`` is ``total_amount / number_of_transactions`` and
    ``revenue_margin`` is ``mpesa_fee_revenue - mpesa_comm_expense``. Both
    columns are added to ``community_features`` in place.

    Args:
        community_features: Per-community frame with the four input columns
            and ``Count_of_nodes``.

    Returns:
        A new frame sorted by ``revenue_margin`` in ascending order, without
        the ``Count_of_nodes`` column.
    """
    total_amount = community_features['total_amount']
    transaction_count = community_features['number_of_transactions']
    fee_revenue = community_features['mpesa_fee_revenue']
    commission_expense = community_features['mpesa_comm_expense']

    community_features['ave_trans_amount'] = total_amount / transaction_count
    community_features['revenue_margin'] = fee_revenue - commission_expense

    by_margin = community_features.sort_values(by=['revenue_margin'], ascending=True)
    return by_margin.drop(columns=['Count_of_nodes'])
    
def save_subcomm_data_s3(comm_group: int, sub_comm_edges: pd.DataFrame, bucket: str, foldername: str):
    """Write the transactions of one community to S3 as a CSV file.

    The object key is
    ``output_data/mpesa/tanz/test/tanzania_comm_abuse_output/{foldername}/sub_comm_rollup{comm_group}.csv``.

    Args:
        comm_group: Community label, used in the file name.
        sub_comm_edges: Transactions to write (the index is not written).
        bucket: Destination S3 bucket.
        foldername: Folder below the output root.
    """
    object_key = (
        f"output_data/mpesa/tanz/test/tanzania_comm_abuse_output/{foldername}"
        f"/sub_comm_rollup{str(comm_group)}.csv"
    )
    wr.s3.to_csv(
        df=sub_comm_edges,
        path=f"s3://{bucket}/{object_key}",
        dataset=False,
        index=False,
    )
    
# #get 4CIT format
def save_dataframe(features_dataframe, bucket, prefix, name):
    """Write a frame to S3 as a timestamped CSV file and log the location.

    The file is named
    ``weekly_commission_abuse_v001_{name}_final_result_{timestamp}.csv``.

    Args:
        features_dataframe: Frame to write (the index is not written).
        bucket: Destination S3 bucket.
        prefix: Key prefix (folder) of the file.
        name: Label embedded in the file name.
    """
    timestamp = str(strftime("%Y%m%d_%H%M%S"))
    filename = f'weekly_commission_abuse_v001_{name}_final_result_{timestamp}.csv'
    destination = f's3://{bucket}/{prefix}/{filename}'

    wr.s3.to_csv(df=features_dataframe, path=destination, dataset=False, index=False)
    logging.info(f'Dataframe saved: {destination}')



def sub_feature_engineering(list_of_comms, transrollup_x, foldername, comm_dict_df, sub_comm_group):
    """Compute features for the loss-making sub-communities of one community.

    A sub-community is processed when the sum of ``mpesa_fee_revenue`` minus
    the sum of ``mpesa_comm_expense`` over its transactions is negative. Its
    transactions are labelled ``{sub_comm_group}_{comm_group}`` and written
    to S3, and its features are stored in ``list_of_comms`` (in place).

    Args:
        list_of_comms: Per-sub-community frame; its length gives the number
            of sub-community labels iterated (0 .. len - 1).
        transrollup_x: Transactions of the parent community.
        foldername: Folder passed to ``save_subcomm_data_s3``.
        comm_dict_df: Node-to-sub-community mapping.
        sub_comm_group: Label of the parent community.

    Returns:
        The frame produced by ``get_margin_features`` when at least one
        sub-community is loss-making, otherwise an empty frame with the
        expected feature columns.
    """
    bucket = 'aws-glue-assets-461675780654-af-south-1'
    empty_features = pd.DataFrame(columns=['comm_group', 'Customer', 'Agent', 'Merchant', 'total_amount',
       'mpesa_fee_revenue', 'mpesa_comm_expense', 'number_of_nodes',
       'number_of_transactions', 'Business', 'Super Agent', 'Unknown', 'ave_trans_amount', 'revenue_margin'])

    # Stays None until a loss-making sub-community is found. The previous
    # per-iteration flag was reset on every pass, so the outcome depended
    # only on the last sub-community examined.
    flagged_features = None

    for comm_group in range(len(list_of_comms)):

        sub_comm_edges = get_community_transactions(comm_group, comm_dict_df, transrollup_x)
        sub_comm_edges['comm_group'] = f'{int(sub_comm_group)}_{int(comm_group)}'

        total_mpesa_fee_revenue = sub_comm_edges.mpesa_fee_revenue.sum()
        total_mpesa_comm_expense = sub_comm_edges.mpesa_comm_expense.sum()
        margin = total_mpesa_fee_revenue - total_mpesa_comm_expense

        if not (margin < 0):
            continue

        ids, identities = get_unique_ids(sub_comm_edges)

        # Feature 1 - Count of entity descriptions
        feature1 = aggregate_types(identities, 'identity_type', comm_group, list_of_comms)

        # Features 2 - Other
        flagged_features = get_other_features(sub_comm_edges, feature1, comm_group, ids)

        save_subcomm_data_s3(comm_group, sub_comm_edges, bucket, foldername)

    if flagged_features is None:
        return empty_features

    # Features 4 - Revenue Margin
    return get_margin_features(flagged_features.fillna(0))

    
def feature_engineering(list_of_comms, transrollup_x, foldername, comm_dict_df):
    """Compute features for every community with more than 10 transactions.

    For each qualifying community the transactions are written to S3 and the
    features are stored in ``list_of_comms`` (in place). Communities that do
    not qualify keep zero-valued features.

    Args:
        list_of_comms: Per-community frame; its length gives the number of
            community labels iterated (0 .. len - 1).
        transrollup_x: All transactions.
        foldername: Folder passed to ``save_subcomm_data_s3``.
        comm_dict_df: Node-to-community mapping.

    Returns:
        The frame produced by ``get_margin_features``.
    """
    bucket = 'aws-glue-assets-461675780654-af-south-1'
    features = list_of_comms

    for comm_group in range(len(list_of_comms)):

        sub_comm_edges = get_community_transactions(comm_group, comm_dict_df, transrollup_x)

        if sub_comm_edges.shape[0] <= 10:
            continue

        ids, identities = get_unique_ids(sub_comm_edges)

        # Feature 1 - Count of entity descriptions
        feature1 = aggregate_types(identities, 'identity_type', comm_group, list_of_comms)

        # Features 2 - Other
        features = get_other_features(sub_comm_edges, feature1, comm_group, ids)

        save_subcomm_data_s3(comm_group, sub_comm_edges, bucket, foldername)

    features = features.fillna(0)

    # When no community qualified, the feature columns were never created;
    # add them as zeros so the margin step works instead of failing on an
    # undefined variable.
    for column in ('total_amount', 'mpesa_fee_revenue', 'mpesa_comm_expense',
                   'number_of_nodes', 'number_of_transactions'):
        if column not in features.columns:
            features[column] = 0

    # Features 4 - Revenue Margin
    return get_margin_features(features)


In [14]:
def get_file_name():
    """Build the folder name for the seven days that end yesterday.

    Returns:
        A string ``DD_MM_YYYY-DD_MM_YYYY``: the first and last day of the
        seven-day window, today not included.
    """
    last_day = datetime.today() - timedelta(days=1)
    first_day = last_day - timedelta(days=6)

    day_format = "%d_%m_%Y"
    return "{}-{}".format(first_day.strftime(day_format), last_day.strftime(day_format))

# Graph Visualisation

In [15]:
def pyvisgraph(df, filename):
    """Render the transactions as an interactive pyvis network.

    Every transaction becomes a directed edge from ``id_a`` to ``id_b``,
    labelled with ``trans_index`` and sized by ``amount``. Nodes are coloured
    by identity type.

    Args:
        df: Transactions with ``id_a``, ``id_b``, ``amount``, ``trans_index``,
            ``identity_a`` and ``identity_b`` columns.
        filename: Not used by this function; kept for existing callers.

    Returns:
        The HTML document of the network as a string.
    """
    # Graph layout
    pg_netw = Network(height='1000px', width='100%', directed=True, bgcolor='black', font_color='white')
    colour_dict = {'Super Agent': 'blue',
                   'Customer': 'red',
                   'Money Provider': 'purple',
                   'Agent': 'green',
                   'Super Dealer': 'yellow',
                   'Bank': 'pink',
                   'Other': 'pink',
                   'Merchant': 'white',
                   'Walker': 'brown',
                   'Utility': 'violet',
                   'Corporate': 'violet',
                   # 'Unknown' is the label fill_nulls writes; the misspelt
                   # key is kept for data that may already contain it.
                   'Unknown': 'orange',
                   'Unkown': 'orange',
                   'Open API': 'violet',
                   'Business': 'violet',
                   'Vodacom Head Office': 'violet',
                   'Vodacom Store': 'violet'
                   }

    # set the physics layout of the network
    pg_netw.barnes_hut()

    sources = df['id_a'].astype(str)
    targets = df['id_b'].astype(str)
    weights = df['amount']

    name = df['trans_index']
    src_cat = df['identity_a']
    dst_cat = df['identity_b']

    # Identity types without an entry in colour_dict are passed through as-is.
    src_col = src_cat.replace(colour_dict)
    dst_col = dst_cat.replace(colour_dict)

    edge_data = zip(sources, targets, weights, src_cat, dst_cat, src_col, dst_col, name)

    for src, dst, w, src_type, dst_type, src_colour, dst_colour, label in edge_data:
        pg_netw.add_node(src, src, title=f'{src_type}_{src}', color=src_colour)
        pg_netw.add_node(dst, dst, title=f'{dst_type}_{dst}', color=dst_colour)
        pg_netw.add_edge(src, dst, label=label, value=w)

    # return pg_netw.show(filename+'.html')
    return pg_netw.generate_html(notebook=False)


def write_graphs_to_s3(neg_comm2, foldername, group_type):
    """Render each community's transactions as HTML and upload it to S3.

    For every value in ``neg_comm2['comm_group']`` the community CSV is read
    from S3, rendered with ``pyvisgraph``, written to a local temporary file
    under ``weekly_temp_graph/``, uploaded, and the temporary file removed.

    Args:
        neg_comm2: Mapping/frame whose ``comm_group`` entry lists the
            communities to render. For ``group_type='sub'`` each label must
            look like ``{parent}_{child}``.
        foldername: Week folder used in the source and destination paths.
        group_type: ``'main'`` or ``'sub'``; selects the source CSV location.

    Raises:
        ValueError: If ``group_type`` is neither ``'main'`` nor ``'sub'``.
    """
    # Local import: ``os`` is not imported at the top of the file.
    import os

    if group_type not in ('main', 'sub'):
        raise ValueError(f"group_type must be 'main' or 'sub', got {group_type!r}")

    output_root = 's3://aws-glue-assets-461675780654-af-south-1/output_data/mpesa/tanz/test/tanzania_comm_abuse_output'
    temp_dir = 'weekly_temp_graph'

    for i in neg_comm2['comm_group']:

        group = str(i)
        filename = 'sub_graph_' + group

        if group_type == 'main':
            prefix = f'{output_root}/groups/{foldername}/sub_comm_rollup{group}.csv'
        else:
            x = group.split('_')
            sub = x[0]
            grp = x[1]
            prefix = f'{output_root}/groups/subgroups/{foldername}/{foldername}_group_{sub}/sub_comm_rollup{grp}.csv'

        dfx = wr.s3.read_csv(prefix)

        # Get graph network - transaction rollup
        graph_text = pyvisgraph(dfx, filename)

        path = f'{output_root}/graph_visuals/{foldername}/sub_graph_{group}.html'

        os.makedirs(temp_dir, exist_ok=True)
        temp = os.path.join(temp_dir, filename + '.html')
        try:
            with open(temp, "w", encoding="utf-8") as file_:
                file_.write(graph_text)
            wr.s3.upload(local_file=temp, path=path)
        finally:
            # Remove the local copy even when the write or upload failed.
            if os.path.exists(temp):
                os.remove(temp)

    return

## First cycle

In [16]:
%%time
import warnings
# NOTE: this call silences every warning category for the rest of the
# session, not only the pandas SettingWithCopyWarning named below.
warnings.filterwarnings('ignore')  # To ignore settingwithcopywarning

def first_cycle(bucket, prefix, filetype, foldername):
    """Run community detection on the whole weekly transaction set.

    Steps: preprocessing, graph saving, community detection, feature
    engineering (which also stores each community's transactions), then the
    feature file is written to S3.

    Args:
        bucket: S3 bucket for inputs and outputs.
        prefix: Key prefix of the input files.
        filetype: Input file type, passed to ``preprocessing``.
        foldername: Week folder used in the output paths.

    Returns:
        A tuple ``(comm_of_comm, community_features1)``:
        ``community_features1`` holds the communities whose
        ``number_of_transactions`` is not 0; ``comm_of_comm`` is the subset
        with a positive ``revenue_margin`` and more than 10000 transactions.
    """
    # preprocessing
    transactions = preprocessing(bucket, prefix, filetype)

    # graph model & community detection
    graph = graph_model_saving(transactions, foldername, bucket)
    comm_dict, communities = community_detection(graph)

    # feature extraction (also writes the per-community transaction files)
    all_features = feature_engineering(communities, transactions, 'groups/' + foldername, comm_dict)

    # remove unsaved communities
    was_saved = all_features['number_of_transactions'] != 0
    community_features1 = all_features[was_saved]

    features_dest = f'output_data/mpesa/tanz/test/tanzania_comm_abuse_output/features/{foldername}'
    save_dataframe(community_features1, bucket, features_dest, 'comm_feat')

    # filter on large positive communities
    is_profitable = community_features1['revenue_margin'] > 0
    is_large = community_features1['number_of_transactions'] > 10000
    comm_of_comm = community_features1[is_profitable & is_large]

    return comm_of_comm, community_features1


CPU times: user 18 µs, sys: 18 µs, total: 36 µs
Wall time: 44.3 µs


In [24]:
%%time

def second_cycle(bucket, foldername, comm_of_comm):
    """Re-run community detection inside each community of ``comm_of_comm``.

    For every community, its stored transactions are read back from S3, a
    graph is built and saved, sub-communities are detected and their features
    computed. Sub-communities with a negative ``revenue_margin`` and more
    than 10 transactions are kept and labelled ``{community}_{sub}``.

    Args:
        bucket: S3 bucket the sub-graph models are saved to.
        foldername: Week folder used in the S3 paths.
        comm_of_comm: Frame whose ``comm_group`` column lists the communities
            to split.

    Returns:
        One frame with the features of all kept sub-communities (empty, with
        the expected columns, when none is kept).
    """
    feature_columns = ['comm_group', 'Customer', 'Agent', 'Merchant', 'total_amount',
           'mpesa_fee_revenue', 'mpesa_comm_expense', 'number_of_nodes',
           'number_of_transactions', 'Business', 'Super Agent', 'Unknown', 'ave_trans_amount', 'revenue_margin']
    sub_community_features1 = pd.DataFrame(columns=feature_columns)

    for i in comm_of_comm['comm_group']:

        foldername2 = f'{foldername}_group_{i}'
        group_data = wr.s3.read_csv(f's3://aws-glue-assets-461675780654-af-south-1/output_data/mpesa/tanz/test/tanzania_comm_abuse_output/groups/{foldername}/sub_comm_rollup{i}.csv')

        foldername3 = f'submodels/{foldername}/{foldername2}'
        print(foldername2)
        sub_graph = graph_model_saving(group_data, foldername3, bucket)
        comm_dict, communities = community_detection(sub_graph)

        prefix = f'groups/subgroups/{foldername}/{foldername2}'
        candidates = sub_feature_engineering(communities, group_data, prefix, comm_dict, i)

        is_loss_making = candidates['revenue_margin'] < 0
        has_enough_transactions = candidates['number_of_transactions'] > 10
        kept = candidates[is_loss_making & has_enough_transactions]

        if kept.shape[0] == 0:
            continue

        # Prefix each sub-community label with its parent community.
        kept['comm_group'] = [f'{int(i)}_{int(label)}' for label in kept['comm_group']]
        sub_community_features1 = pd.concat([sub_community_features1, kept])

    return sub_community_features1

   

CPU times: user 14 µs, sys: 0 ns, total: 14 µs
Wall time: 23.4 µs


## Second Cycle

## Combining Files (Cycle1 + Cycle 2)

### Features

In [51]:
def combine_feature_files_s3(comm_of_comm, community_features1, sub_community_features, bucket, foldername):
    """Merge the loss-making main and sub-community features and save them.

    Rows with a negative ``revenue_margin`` from both inputs are combined,
    sorted by ``revenue_margin`` (ascending), stamped with today's date in
    ``run_date`` and written to S3 twice: under ``features/{foldername}`` and
    under ``4CIT/{foldername}``.

    Args:
        comm_of_comm: Not used by this function; kept for existing callers.
        community_features1: Features of the main communities.
        sub_community_features: Features of the sub-communities.
        bucket: Destination S3 bucket.
        foldername: Week folder used in the output paths.

    Returns:
        The combined feature frame that was written.
    """
    identity_columns = ['Customer', 'Agent', 'Merchant', 'Business', 'Super Agent', 'Unknown']
    count_columns = identity_columns + ['number_of_nodes', 'number_of_transactions']
    output_columns = ['comm_group', 'Customer', 'Agent', 'Super Agent', 'Merchant', 'Business', 'Unknown',
                      'total_amount', 'mpesa_fee_revenue', 'mpesa_comm_expense',
                      'number_of_nodes', 'number_of_transactions',
                      'ave_trans_amount', 'revenue_margin']

    # Get relevant communities from the main group features and sub group features
    sub_community_features = sub_community_features[sub_community_features['revenue_margin'] < 0]
    community_features2 = community_features1[community_features1['revenue_margin'] < 0]

    # final feature file
    final_features = pd.concat([community_features2, sub_community_features])
    final_features = final_features.sort_values(by=['revenue_margin'], ascending=True)

    # An identity type that never occurred has no column yet; treat it as 0
    # instead of failing on the column selection below.
    for column in identity_columns:
        if column not in final_features.columns:
            final_features[column] = 0

    final_features[identity_columns] = final_features[identity_columns].fillna(0)
    final_features[count_columns] = final_features[count_columns].astype(int)

    # Explicit copy so run_date is added to an independent frame rather than
    # to a column selection of final_features.
    df = final_features[output_columns].copy()
    df['run_date'] = datetime.today().date()

    features_dest = f'output_data/mpesa/tanz/test/tanzania_comm_abuse_output/features/{foldername}'
    save_dataframe(df, bucket, features_dest, 'combined_comm_feat')

    # 4CIT
    path = f'output_data/mpesa/tanz/test/tanzania_comm_abuse_output/4CIT/{foldername}'
    save_dataframe(df, bucket, path, 'comm_feat')

    return df

### Groups

In [52]:
%%time
def combine_group_files_s3(foldername, bucket, final_features):
    """Merge the stored main and sub-community transaction files and save them.

    The transaction files under ``groups/{foldername}`` and
    ``groups/subgroups/{foldername}/`` are read, combined, restricted to the
    communities present in ``final_features['comm_group']``, stamped with
    today's date in ``run_date`` and written to S3 twice: under
    ``groups/{foldername}`` and under ``4CIT/{foldername}``.

    Args:
        foldername: Week folder used in the S3 paths.
        bucket: S3 bucket to read from and write to.
        final_features: Frame whose ``comm_group`` column lists the
            communities of interest.

    Returns:
        A tuple ``(main_group, sub_main_group)`` with the unfiltered main and
        sub-community transactions as read from S3.
    """
    output_root = 'output_data/mpesa/tanz/test/tanzania_comm_abuse_output'

    # main groups
    main_group = get_data(bucket, f'{output_root}/groups/{foldername}', 'CSV')

    # sub groups
    sub_main_group = get_data(bucket, f'{output_root}/groups/subgroups/{foldername}/', 'CSV')

    # combine main and sub to create final group file
    combined_groups = pd.concat([main_group, sub_main_group])

    # only filter on relevant communities of interest - only the ones in the features file
    communities_of_interest = final_features['comm_group'].unique()
    is_of_interest = combined_groups['comm_group'].isin(communities_of_interest)
    # Explicit copy so run_date is added to an independent frame rather than
    # to a filtered selection of combined_groups.
    final_groups = combined_groups[is_of_interest].copy()
    final_groups['run_date'] = datetime.today().date()

    save_dataframe(final_groups, bucket, f'{output_root}/groups/{foldername}', 'combined_comm_groups')

    # 4CIT
    save_dataframe(final_groups, bucket, f'{output_root}/4CIT/{foldername}', 'comm_groups')

    return main_group, sub_main_group

CPU times: user 17 µs, sys: 0 ns, total: 17 µs
Wall time: 26.9 µs


# MAIN

In [10]:
def main():
    """Run the weekly pipeline: first cycle, second cycle, then both merges."""
    bucket = 'aws-glue-assets-461675780654-af-south-1'
    foldername = get_file_name()
    prefix = f'output_data/mpesa/tanz/test/tanz_commission_abuse_input/{foldername}'
    filetype = 'PARQUET'

    comm_of_comm, community_features = first_cycle(bucket, prefix, filetype, foldername)

    sub_community_features = second_cycle(bucket, foldername, comm_of_comm)

    # Keyword arguments: the two feature frames were previously passed
    # positionally in the opposite order to the function's parameters.
    final_features = combine_feature_files_s3(
        comm_of_comm,
        community_features1=community_features,
        sub_community_features=sub_community_features,
        bucket=bucket,
        foldername=foldername,
    )

    main_group, sub_main_group = combine_group_files_s3(foldername, bucket, final_features)

#     visual_main = main_group[(main_group['comm_group']).isin( final_features['comm_group'].unique())]
#     write_graphs_to_s3(visual_main,foldername,'main')

#     visual_sub = sub_main_group[(sub_main_group['comm_group']).isin( final_features['comm_group'].unique())]
#     write_graphs_to_s3(visual_sub,foldername, 'sub')

    return

## Testing individual step separately - Ignore

In [23]:
%%time
# Ad-hoc run of the first cycle, using the same settings as main().
bucket = 'aws-glue-assets-461675780654-af-south-1'
foldername = get_file_name()
prefix = f'output_data/mpesa/tanz/test/tanz_commission_abuse_input/{foldername}'
filetype = 'PARQUET'


comm_of_comm,community_features = first_cycle(bucket,prefix,filetype,foldername)

In [38]:
%%time
# Ad-hoc run of the second cycle on the communities selected by the first cycle.
sub_community_features = second_cycle(bucket, foldername,comm_of_comm)

CPU times: user 7 µs, sys: 1e+03 ns, total: 8 µs
Wall time: 16.5 µs


In [49]:
%%time
# Keyword arguments: the two feature frames were previously passed
# positionally in the opposite order to the function's parameters.
final_features = combine_feature_files_s3(comm_of_comm, community_features1=community_features, sub_community_features=sub_community_features, bucket=bucket, foldername=foldername)

CPU times: user 30.5 ms, sys: 7.98 ms, total: 38.5 ms
Wall time: 170 ms


In [56]:
# %%time
# Ad-hoc run of the group-file merge for the communities in final_features.
main_group,sub_main_group = combine_group_files_s3(foldername, bucket,final_features)

In [55]:
# %%time
# import os
# visual_main = main_group[(main_group['comm_group']).isin( final_features['comm_group'].unique())]
# write_graphs_to_s3(visual_main,foldername,'main')

In [None]:
# %%time
# visual_sub = sub_main_group[(sub_main_group['comm_group']).isin( final_features['comm_group'].unique())]
# write_graphs_to_s3(visual_sub,foldername, 'sub')