In [7]:
#!/usr/bin/python
# -*- coding: utf-8 -*-

"""
Prepare consolidated data

"""

# Import libraries

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import json
import os
import sys

import aerospike
import pandas as pd

# Echo the module docstring as a banner whenever the script (or notebook cell) runs.
print(__doc__)


class Consolidate(object):
    """Read, clean and push customer spreadsheets into (and out of) Aerospike.

    Records are addressed as ``(namespace, set, cust_id)``:

    * customer rows   -> namespace ``'dev'``,  set ``'s1bd'``
    * address groups  -> namespace ``'test'``, set ``'s1ad'``
    """

    # Cluster seed hosts used when none are passed to the constructor.
    DEFAULT_HOSTS = [('127.0.0.1', 3000)]

    def __init__(self, hosts=None):
        """Create a loader.

        :param hosts: optional iterable of ``(address, port)`` seed hosts;
            defaults to ``DEFAULT_HOSTS``.
        """
        self.config = {
            'hosts': list(hosts) if hosts is not None else list(self.DEFAULT_HOSTS)
        }

    def read(self, file):
        """Return the workbook *file* read with ``pd.read_excel`` defaults."""
        return pd.read_excel(file)

    def clean_data(self, data):
        """Return a copy of *data* with string cells stripped and column names lower-cased.

        Only ``str``/``unicode`` cells in object-dtype columns are stripped;
        other values (numbers, NaN, ...) in those columns are kept as-is
        instead of being turned into NaN. Column order is preserved.
        """
        # (str, unicode) on Python 2, (str, str) on Python 3.
        text_types = (str, type(u''))

        def _strip(value):
            return value.strip() if isinstance(value, text_types) else value

        cleaned = data.copy()
        for column in cleaned.select_dtypes(include=['object']).columns:
            cleaned[column] = cleaned[column].map(_strip)

        cleaned.columns = [name.lower() for name in cleaned.columns]
        return cleaned

    # ----- Aerospike helpers -------------------------------------------------

    def _connect(self):
        """Return a connected Aerospike client; exit with status 1 if that fails."""
        try:
            return aerospike.client(self.config).connect()
        except Exception:
            print("failed to connect to the cluster with", self.config['hosts'])
            sys.exit(1)

    @staticmethod
    def _native(value):
        """Return *value* as a built-in Python scalar (unwraps numpy scalars)."""
        # numpy scalars expose .item(); the Aerospike key tuple expects a plain
        # str/int/bytearray primary key.
        to_python = getattr(value, 'item', None)
        return to_python() if callable(to_python) else value

    @classmethod
    def _record_key(cls, namespace, set_name, pk):
        """Build the ``(namespace, set, key)`` tuple that addresses one record."""
        return (namespace, set_name, cls._native(pk))

    def _put_records(self, namespace, set_name, keys, bins):
        """Write each bins dict under its key, reading it back for verification.

        Uses a single client for the whole batch and always closes it.
        Per-record failures are reported on stderr and the record is skipped.
        """
        client = self._connect()
        try:
            for pk, record_bins in zip(keys, bins):
                record_key = self._record_key(namespace, set_name, pk)
                try:
                    client.put(record_key, record_bins)
                    # Read the record back after insert for verification
                    (stored_key, _metadata, record) = client.get(record_key)
                except Exception as e:
                    print("error: {0}".format(e), file=sys.stderr)
                    continue
                print(stored_key, record)
        finally:
            client.close()

    def _remove_records(self, namespace, set_name, keys):
        """Remove the record for each key, reporting per-record failures on stderr."""
        client = self._connect()
        try:
            for pk in keys:
                try:
                    client.remove(self._record_key(namespace, set_name, pk))
                except Exception as e:
                    print("error: {0}".format(e), file=sys.stderr)
        finally:
            client.close()

    # ----- public load / remove operations ----------------------------------

    def load_data(self, data):
        """Write every row of *data* to ``('dev', 's1bd', cust_id)``.

        Each row becomes one record whose bins are that row's columns.
        NOTE(review): rows sharing a cust_id overwrite one another (the last
        row wins) -- confirm this is intended after the vehicle merges.
        """
        self._put_records('dev', 's1bd', data['cust_id'],
                          data.to_dict(orient='records'))

    def remove_data(self, data):
        """Remove the ``('dev', 's1bd', cust_id)`` record for each cust_id in *data*."""
        # Each distinct key only needs removing once; repeats would just
        # report AEROSPIKE_ERR_RECORD_NOT_FOUND.
        self._remove_records('dev', 's1bd', data['cust_id'].drop_duplicates())

    def load_address(self, data):
        """Write one address record per customer to ``('test', 's1ad', cust_id)``.

        Rows of *data* are grouped by ``cust_id``. Each record has the bins
        ``{'cust_id': <id>, 'address': [<one dict per address row>]}``, where
        each address dict holds the address_type, strt_address, city, state,
        country and pincode columns.
        """
        address_cols = ['address_type', 'strt_address', 'city', 'state',
                        'country', 'pincode']
        records = []
        for cust_id, group in data.groupby('cust_id'):
            records.append({
                'cust_id': self._native(cust_id),
                # to_json -> json.loads turns numpy scalars/NaN into plain
                # JSON types (int/float/str/None) that Aerospike can store.
                'address': json.loads(group[address_cols].to_json(orient='records')),
            })
        keys = [record['cust_id'] for record in records]
        self._put_records('test', 's1ad', keys, records)

    def remove_address(self, data):
        """Remove the ``('test', 's1ad', cust_id)`` address record for each cust_id in *data*."""
        # Same namespace/set that load_address writes to.
        self._remove_records('test', 's1ad', data['cust_id'].drop_duplicates())
    
def drive(mode, data_dir="/home/azureuser/Aerospike/data/customer"):
    """Build the consolidated customer frame and load it into / remove it from Aerospike.

    The customer details, customer-vehicle and motor-vehicle-report workbooks
    are cleaned, inner-joined on ``cust_id`` and then ``vehicle_no``, and the
    result is passed to ``Consolidate.load_data`` or ``Consolidate.remove_data``.

    :param mode: ``'load'`` to write the records; ``'delete'`` (or
        ``'remove'``) to delete them.
    :param data_dir: directory containing the three ``.xls`` workbooks.
    :raises ValueError: for any other *mode*.
    """
    # Validate before touching any data: previously every value other than
    # 'load' (including a typo) silently fell through to deleting records.
    if mode not in ('load', 'delete', 'remove'):
        raise ValueError("mode must be 'load' or 'delete', got {0!r}".format(mode))

    cons = Consolidate()

    # Read and clean customer details
    customers = cons.clean_data(
        cons.read(os.path.join(data_dir, "customer details.xls")))

    # Read and clean customer vehicles, then join them to customers by cust_id
    vehicles = cons.clean_data(
        cons.read(os.path.join(data_dir, "Customer_vehicle.xls")))
    customer_vehicles = pd.merge(customers, vehicles, on='cust_id', how='inner')

    # Read and clean motor vehicle reports, then join them by vehicle_no
    reports = cons.clean_data(
        cons.read(os.path.join(data_dir, "Motor_vehicle_report.xls")))
    consolidated = pd.merge(customer_vehicles, reports, on='vehicle_no', how='inner')

    if mode == 'load':
        # Load data in to aerospike
        cons.load_data(consolidated)
    else:
        # Remove data from aerospike
        cons.remove_data(consolidated)
    
if __name__ == '__main__':
    # Entry point: run the pipeline in removal mode.
    drive(mode='delete')


Prepare consolidated data




error: (2L, 'AEROSPIKE_ERR_RECORD_NOT_FOUND', 'src/main/client/remove.c', 110)
error: (2L, 'AEROSPIKE_ERR_RECORD_NOT_FOUND', 'src/main/client/remove.c', 110)
error: (2L, 'AEROSPIKE_ERR_RECORD_NOT_FOUND', 'src/main/client/remove.c', 110)
error: (2L, 'AEROSPIKE_ERR_RECORD_NOT_FOUND', 'src/main/client/remove.c', 110)
error: (2L, 'AEROSPIKE_ERR_RECORD_NOT_FOUND', 'src/main/client/remove.c', 110)
error: (2L, 'AEROSPIKE_ERR_RECORD_NOT_FOUND', 'src/main/client/remove.c', 110)
error: (2L, 'AEROSPIKE_ERR_RECORD_NOT_FOUND', 'src/main/client/remove.c', 110)
error: (2L, 'AEROSPIKE_ERR_RECORD_NOT_FOUND', 'src/main/client/remove.c', 110)
error: (2L, 'AEROSPIKE_ERR_RECORD_NOT_FOUND', 'src/main/client/remove.c', 110)
error: (2L, 'AEROSPIKE_ERR_RECORD_NOT_FOUND', 'src/main/client/remove.c', 110)
error: (2L, 'AEROSPIKE_ERR_RECORD_NOT_FOUND', 'src/main/client/remove.c', 110)
error: (2L, 'AEROSPIKE_ERR_RECORD_NOT_FOUND', 'src/main/client/remove.c', 110)
error: (2L, 'AEROSPIKE_ERR_RECORD_NOT_FOUND', 'src/m