### Housekeeping - Setup & wipe out any prior records on the Aerospike Server

Before you start on our single node cluster, ensure namespace **_test_**  is configured with Replication Factor =  1 and strong-consistency true, then using asadm commands in the terminal, add the node-id to the namespace roster and recluster. MRT transactions are only allowed on an SC namespace. See https://aerospike.com/docs/tools/asadm/live_cluster_mode_guide#roster-1 for details on how to set the roster.

We have a namespace **_test_** pre-defined on the server. Lets truncate it using _asadm_.

This is needed while doing code development. If you want to clear the iJava Kernel of all Java objects and run all cells from scratch, Kernel->Restart & Run All, this will ensure any records written on the underlying Aerospike cluster are purged.

First, we need required imports for using %sh in interactive Java Kernel. (This is specific to the iJava Kernel implementation by **Spencer Park** that we are using.)


# Import to run %sh magic cell

In [2]:
import io.github.spencerpark.ijava.IJava;
import io.github.spencerpark.jupyter.kernel.magic.common.Shell;
IJava.getKernelInstance().getMagics().registerMagics(Shell.class);

### Running _asadm_ in iJava
We can run _asadm_ commands inline. Below, we will use the truncation command, which normally requires an interactive confirmation, which we will skip by using the _--no-warn_ flag. No output will be displayed.


In [3]:
%sh asadm --enable -e "manage truncate ns test --no-warn" -h "127.0.0.1"

### Displaying shell command output:
**Note**: Will not display _asadm_ or _aql_ executables output.

In [4]:
String command = "pwd";
 
try {
    Process process = Runtime.getRuntime().exec(command);
 
    BufferedReader reader = new BufferedReader(
            new InputStreamReader(process.getInputStream()));
    String line;
    while ((line = reader.readLine()) != null) {
        System.out.println(line);
    }
 
    reader.close();
 
} catch (IOException e) {
    e.printStackTrace();
}

/home/training/student-workbook/v7.0/jn/ver8.0/devMRT


# Add Java Client POM Dependency

In [5]:
%%loadFromPOM
<dependencies>
  <dependency>
    <groupId>com.aerospike</groupId>
    <artifactId>aerospike-client-jdk8</artifactId>
    <version>9.0.2</version>
  </dependency>
</dependencies>

# Add required Imports

In [6]:
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.Value;
System.out.println("Client modules imported.");

Client modules imported.


# Accessing a record on the Aerospike Server
We build the Key object in Java
We need the namespace and the record digest to find the record on the server.
The record digest is computed by the client library using application provided key (integer, string or byte-array) and the set name. If not in a set, use null for set name.

<img src="../graphics/RecordKey.png"
     alt="Record Digest"
     style="float: left; margin-right: 10px;"
     width="600"
     height="400"/>

# Insert test records
* Our one node server is already running on localhost (same server as the Jupyter Notebook server).
* Unlike in iPython kernel for Jupyter Notebook, we can only execute a single line of shell command in IJava kernel.

In [7]:
%sh aql -c "set key_send true"

In [8]:
%sh aql -f "../aqlScripts/insert.aql"

In [9]:
AerospikeClient client = new AerospikeClient("localhost", 3000);
System.out.println("Initialized the client and connected to the cluster.");

Key key = new Key("test", "testset", "key1");
System.out.println("Working with record key:");
System.out.println(key);

Record record = client.get(null, key);
System.out.println("Read back the record.");

System.out.println("Record values are:");
System.out.println(record);

Initialized the client and connected to the cluster.
Working with record key:
test:testset:key1:bf6c1d13e7cd10c5bd022d27e7df170c0bccd6e1
Read back the record.
Record values are:
null


In [10]:
//Imports
import com.aerospike.client.Txn;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.ResultCode;
import com.aerospike.client.CommitStatus;

//CLASS Cell
class MRTResult {
    private Record rec;
    private int status;  //0: success, 1:MRT failed, 2:OK to retry failed transaction
    private String msg;
    
  public MRTResult(){
      rec = null;
      status=0;
      msg="";
  }
  public Record getRec() {
    return rec;
  }
  public int getStatus() {
    return status;
  }
  public String getMsg() {
    return msg;
  }
  public void setRec(Record r) {
    rec = r;
    return;
  }
  public void setStatus(int s) {
    status = s;
    return;
  }
  public void setMsg(String s) {
    msg = s;
    return;
  }
}


In [11]:
//Imports
import com.aerospike.client.AerospikeClient;

import com.aerospike.client.Txn;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.ResultCode;
import com.aerospike.client.CommitStatus;

import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.Value;


//CLASS Cell
class MRTTxn {
  MRTResult  mrtResult = new MRTResult();
  public MRTTxn() {   
   mrtResult.setRec(null);
   mrtResult.setStatus(0);
   mrtResult.setMsg("");
  }

    //readTxn() 
    public MRTResult readTxn (AerospikeClient ac, Policy rPolicy, Key key) {
  
        Record rec = null;  
        int status = 0;
        String msg = "";
      
        try {
            rec = ac.get(rPolicy, key);
        } catch (AerospikeException ae){
        int rc = ae.getResultCode();
            if(rc == ResultCode.UNSUPPORTED_FEATURE){               
                msg = "MRT Txn not supported. Check, is namespace SC?";
                status = 1;
                //Abort Txn. Application coding or server namespace configuration error
            }
            else if (rc == ResultCode.CLIENT_ERROR){                
                msg = "Record is from a different namespace than the one associated with this MRT Txn."; 
                msg = msg + "\nMRT Txn namespace is set upon first read or write transaction.";
                status = 1; //Abort Txn. Application coding error                
            }
            else if (rc == ResultCode.MRT_VERSION_MISMATCH){                
                msg = "Record version has changed since last read. Txn will not succeed commit.";
                status = 1;  //Abort Txn.
            }
            else if (rc == ResultCode.MRT_BLOCKED){
                msg = "Record is locked by another MRT";
                status = 1;  //Abort Txn.                
            }
            else if (rc == ResultCode.MRT_EXPIRED){
                msg = "This MRT transaction's Allowable Duration has expired";
                status = 1;  //Abort Txn.
            }
            else if (rc == ResultCode.MRT_COMMITTED){
                msg = "Txn was already committed. No more reads/writes allowed in this Txn.";
                status = 1;  //Abort Txn.
            }
            else if (rc == ResultCode.MRT_ABORTED){
                msg = "Txn was aborted. No more reads/writes allowed in this Txn.";
                status = 1;  //Abort Txn.
            }
        } 
        mrtResult.setRec(rec);
        mrtResult.setStatus(status);
        mrtResult.setMsg(msg);
        return mrtResult;
    }
    //End readTxn()

    //writeTxn() example with one bin updated
    public MRTResult writeTxn (AerospikeClient ac, WritePolicy wPolicy, Key key, Bin bin) {
        
        int status = 0;
        String msg = "";
        boolean inDoubt = false;
        
        wPolicy.durableDelete = true; 
        //MRTs only allow durable deletes.
        //Last bin set to null will delete the record.
        
        try {
            ac.put(wPolicy, key, bin);
        } catch (AerospikeException ae){
            int rc = ae.getResultCode();
            if(rc == ResultCode.UNSUPPORTED_FEATURE){                
                msg = "MRT Txn not supported. Check, is namespace SC?";
                status = 1;  //Abort Txn. Application coding or server namespace configuration error
            }
            else if (rc == ResultCode.CLIENT_ERROR){
                msg = "Record is from a different namespace than the one associated with this MRT Txn."; 
                msg = msg + "\nMRT Txn namespace is set upon first read or write transaction.";
                status = 1; //Abort Txn. Application coding error
            }
            else if (rc == ResultCode.MRT_VERSION_MISMATCH){                
                msg = "Record version has changed since last read. Txn will not succeed commit.";
                status = 1; //Abort Txn.
            } 
            else if (rc == ResultCode.MRT_TOO_MANY_WRITES){
                msg = "MRT writes have exceeded max limit of 4096 max writes. Write failed.";
                status = 1; //Abort Txn.
            } 
            //Server checks for version mismatch before MRT blocked by presence of a provisional.
            else if (rc == ResultCode.MRT_BLOCKED){
                msg = "Record is locked by another MRT";
                status = 2;  //Application can WAIT and RETRY IF the record was NOT READ BEFORE in this MRT
            }
            else if (rc == ResultCode.MRT_EXPIRED){
                msg = "This MRT transaction's Allowable Duration has expired";
                status = 1; //Abort Txn.
            }
            else if (rc == ResultCode.MRT_COMMITTED){
                msg = "Txn was already committed. No more reads/writes allowed in this Txn.";
                status = 1; //Abort Txn.
            }
            else if (rc == ResultCode.MRT_ABORTED){
                msg = "Txn was aborted. No more reads/writes allowed in this Txn.";
                status = 1; //Abort Txn.
            }
            else if (rc == ResultCode.TIMEOUT){
                //Check inDoubt
                inDoubt = ae.getInDoubt();
                if(!inDoubt){
                    msg = "Txn never made it to server. Retry the write.";
                    status = 3; //RETRY without waiting.
                } 
                else {
                    Policy rP = ac.copyReadPolicyDefault();
                    MRTResult mrtReadResult = readTxn(ac, rP, key);  //Re-read the record
                    int readStatus = mrtReadResult.getStatus();
                    if(readStatus == 1){  //Read failed for any number of reasons, including version verification
                        status = 1;  //Abort Txn.
                    } else if(readStatus == 0) { //Read succeeded, 
                    }
                }
                msg = "Txn was aborted. No more reads/writes allowed in this Txn.";
                status = 1; //Abort Txn.
            }
        } 
        mrtResult.setRec(null);
        mrtResult.setStatus(status);
        mrtResult.setMsg(msg);
        return mrtResult;
    }
    //End writeTxn()

    //commitTxn()
    public MRTResult commitTxn (AerospikeClient ac, Txn txn) {
        Record rec = null;  
        int status = 0;
        String msg = "";
        try {
            ac.commit(txn);
        } catch (AerospikeException.Commit cs){   //Commit Status
    
            //System.out.println(cs.getResultCode());
            if(cs.getResultCode() == ResultCode.TXN_FAILED){
                msg = "Transaction failed to commit.";
                status = 1;
                client.abort(txn);
            }
        }
        mrtResult.setRec(null);
        mrtResult.setStatus(status);
        mrtResult.setMsg(msg);
        return mrtResult;  
    }
    //End commitTxn()
}


In [12]:
//MRT 1 - Read transactions
import com.aerospike.client.Txn;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.ResultCode;
import com.aerospike.client.CommitStatus;

Txn txn1 = new Txn();  //Generates unique TxnId for this MRT
System.out.println("Begin txn1: " + txn1.getId());
Policy mrtrpolicy1 = client.copyReadPolicyDefault();  //Create a default read policy copy. (Not a reference.)
mrtrpolicy1.txn = txn1;
    
//Read key1
Key key1 = new Key("test", "testset", "key1");

//Read key2
Key key2 = new Key("test", "testset", "key2");

Record reckey1t1 = null;

System.out.println("Check class based read. \n");

MRTTxn mrtTxn = new MRTTxn();
int mrtTxnStatus = 0;

MRTResult result = new MRTResult();
    
result = mrtTxn.readTxn(client, mrtrpolicy1, key1);

if(result.getStatus() == 1){
  System.out.println(result.getMsg());
  mrtTxnStatus++;
}
else {
  System.out.println(result.getRec());   
}

result = mrtTxn.readTxn(client, mrtrpolicy1, key2);

if(result.getStatus() == 1){
  System.out.println(result.getMsg());
  mrtTxnStatus++;  
}
else {
  System.out.println(result.getRec());   
}

//Done
if(mrtTxnStatus == 0){
  result = mrtTxn.commitTxn(client, txn1);
  if(result.getStatus() == 1){
      System.out.println(result.getMsg());
      mrtTxnStatus++;  
  }
} 
else {
    System.out.println("Aborting MRT transaction.");
    client.abort(txn1);
}
txn1.clear();
mrtrpolicy1.txn = null;

Begin txn1: -1731886587176750784
Check class based read. 

MRT Txn not supported. Check, is namespace SC?
MRT Txn not supported. Check, is namespace SC?
Aborting MRT transaction.


In [13]:
//MRT 1 - Read transactions
import com.aerospike.client.Txn;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.ResultCode;
import com.aerospike.client.CommitStatus;

Txn txn1 = new Txn();  //Generates unique TxnId for this MRT
System.out.println("Begin txn1: " + txn1.getId());
Policy mrtrpolicy1 = client.copyReadPolicyDefault();  //Create a default read policy copy. (Not a reference.)
mrtrpolicy1.txn = txn1;
    
//Read key1
Key key1 = new Key("test", "testset", "key1");

//Read key2
Key key2 = new Key("bar", "testset", "key2");

Record reckey1t1 = null;

System.out.println("Check non-class based read. \n");
//Record reckey1t1 = record;
//System.out.println("Record Key1: "+ reckey1t1);

//Note: Record not found returns null record but the rest 
// of the MRT transaction can still proceed further.
try {
    reckey1t1 = client.get(mrtrpolicy1, key1);
}catch (AerospikeException ae){
    int rc = ae.getResultCode();
    if(rc == ResultCode.UNSUPPORTED_FEATURE){
        System.out.println("MRT Txn not supported. Check, is namespace SC?");
        //client.abort(txn1);   //Application coding or server namespace configuration error
    }
    else if (rc == ResultCode.CLIENT_ERROR){
        System.out.println("Record is from a different namespace than the one associated with this MRT Txn.");
        System.out.println("MRT Txn namespace is set upon first read or write transaction.");
        //client.abort(txn1);  //Application coding error
    }
    else if (rc == ResultCode.MRT_VERSION_MISMATCH){
        System.out.println("Record version has changed since last read. Txn will not succeed commit.");
        client.abort(txn1);
    }
    else if (rc == ResultCode.MRT_BLOCKED){
        System.out.println("Record is locked by another MRT");
        client.abort(txn1);
    }
    else if (rc == ResultCode.MRT_EXPIRED){
        System.out.println("This MRT transaction's Allowable Duration has expired");
        client.abort(txn1);
    }
    else if (rc == ResultCode.MRT_COMMITTED){
        System.out.println("Txn was already committed. No more reads/writes allowed in this Txn.");
        client.abort(txn1);
    }
    else if (rc == ResultCode.MRT_ABORTED){
        System.out.println("Txn was aborted. No more reads/writes allowed in this Txn.");
        client.abort(txn1);
    }
}

Record reckey2t1 = null;
try {
    reckey2t1 = client.get(mrtrpolicy1, key2);
}catch (AerospikeException ae){
    int rc = ae.getResultCode();
    if(rc == ResultCode.UNSUPPORTED_FEATURE){
        System.out.println("MRT Txn not supported. Check, is namespace SC?");
    }
}
//Done
client.commit(txn1);
txn1.clear();
mrtrpolicy1.txn = null;

System.out.println("Record Key1: "+ reckey1t1);
System.out.println("Record Key2: "+ reckey2t1);

Begin txn1: -1475023830264413800
Check non-class based read. 

MRT Txn not supported. Check, is namespace SC?
Record Key1: null
Record Key2: null


In [17]:
//MRT 2 - Read transactions
import com.aerospike.client.Txn;

Txn txn2 = new Txn();  //Generates unique TxnId for this MRT
System.out.println("Begin txn2: " + txn2.getId());
Policy mrtrpolicy2 = client.copyReadPolicyDefault();  //Create a default read policy copy. (Not a reference.)
mrtrpolicy2.txn = txn2;
    
//Read key1
Key key1 = new Key("test", "testset", "key1");

//Read key2
Key key2 = new Key("test", "testset", "key2");

Record reckey1t2 = null;
Record reckey2t2 = null;
try {
    reckey1t2 = client.get(mrtrpolicy2, key1);
    reckey2t2 = client.get(mrtrpolicy2, key2);
}catch (AerospikeException ae){
    int rc = ae.getResultCode();
    if(rc == ResultCode.UNSUPPORTED_FEATURE){
        System.out.println("MRT Txn not supported. Check, is namespace SC?");
    }
}

Begin txn2: 4992375603053601972
MRT Txn not supported. Check, is namespace SC?


In [14]:
//MRT 2 - Read transactions, continued



//Done
client.commit(txn2);
txn2.clear();
mrtrpolicy2.txn = null;


System.out.println("Record Key1: "+ reckey1t2);
System.out.println("Record Key2: "+ reckey2t2);

Record Key1: (gen:1),(exp:0),(bins:(name:Jack),(age:26))
Record Key2: (gen:1),(exp:0),(bins:(name:Jill),(age:20))


In [21]:
//MRT 2 & 3 - Mixing two Read Write MRT transactions
import com.aerospike.client.Txn;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.ResultCode;
import com.aerospike.client.CommitStatus;

Txn txn2 = new Txn();  //Generates unique TxnId for this MRT
System.out.println("Begin txn2: " + txn2.getId());
Policy mrtrpolicy2 = client.copyReadPolicyDefault();  //Create a default read policy copy. (Not a reference.)
WritePolicy mrtwpolicy2 = client.copyWritePolicyDefault();  //Create a default write policy copy. (Not a reference.)
mrtrpolicy2.txn = txn2;
mrtwpolicy2.txn = txn2;


Txn txn3 = new Txn();  //Generates unique TxnId for this MRT
System.out.println("Begin txn3: " + txn3.getId());
Policy mrtrpolicy3 = client.copyReadPolicyDefault();  //Create a default read policy copy. (Not a reference.)
WritePolicy mrtwpolicy3 = client.copyWritePolicyDefault();  //Create a default write policy copy. (Not a reference.)
mrtrpolicy3.txn = txn3;
mrtwpolicy3.txn = txn3;
    
//Read key1
Key key1 = new Key("test", "testset", "key1");

//Write key2
Key key2 = new Key("test", "testset", "key2");

//Start with all reads null  (Jupyter Notebook interactive - hold on to previous cell execution values.)
Record reckey1t2 = null;
Record reckey2t2 = null;
Record reckey1t3 = null;
Record reckey2t3 = null;

Record reckey1t2 = client.get(mrtrpolicy2, key1);
Record reckey2t2 = client.get(mrtrpolicy2, key2);

Record reckey1t3 = client.get(mrtrpolicy3, key1);
if(txn3.monitorExists()){
    System.out.println("Txn3 after get - monitor record exists");
}else{
    System.out.println("Txn3 - no monitor record");
}
client.put(mrtwpolicy3, key2, new Bin("age", 26));
Record reckey2t3 = client.get(mrtrpolicy3, key2);

//Key monitorKey = new Key("test", "<ERO~MRT", txn3.getId());
//Record monRec = client.get(null, monitorKey);
//if(client.get(null, monitorKey) == null){
//    System.out.println("Txn3 monitor rec is null.");
//} else {
//     System.out.println("Txn3 monitor rec got created.");
//}
//System.out.println("Txn3 monitor rec after put:" + monRec);
if(txn3.monitorExists()){
    System.out.println("Txn3 after put - monitor record exists");
}else{
    System.out.println("Txn3 - no monitor record");
}
client.abort(txn3);
if(txn3.monitorExists()){
    System.out.println("Txn3 is not aborted");
}else{
    System.out.println("Txn3 after abort - txn3 is aborted");
}
//if(client.get(null, monitorKey) == null){
//System.out.println("Txn3 monitor record deleted, OK to retry MRT.");
//}

try {
    Record reckey2t2 = client.get(mrtrpolicy2, key2);
} catch (AerospikeException ae){
    int rc = ae.getResultCode();
    if(rc == ResultCode.MRT_VERSION_MISMATCH){
      System.out.println("txn2: Read version mismatch.");
      client.abort(txn2);
    }
    else if(rc == ResultCode.MRT_BLOCKED){
      System.out.println("txn2: Read blocked by another MRT.");      
    }
}

client.commit(txn3);
//Done
try {
  client.commit(txn2);
} catch (AerospikeException.Commit cs){   //Commit Status
    
    System.out.println(cs.getResultCode());
    if(cs.getResultCode() == ResultCode.TXN_FAILED){
      System.out.println("txn2: Transaction failed to commit.");
    }

    client.abort(txn2);
}

txn2.clear();
mrtrpolicy2.txn = null;
mrtwpolicy2.txn = null;

//client.commit(txn3);
txn3.clear();
mrtrpolicy3.txn = null;
mrtwpolicy3.txn = null;

System.out.println("Record Key1: "+ reckey1t2);
System.out.println("Record Key2: "+ reckey2t2);
System.out.println("Record Key1: "+ reckey1t3);
System.out.println("Record Key2: "+ reckey2t3);

Begin txn2: -7905575167406445875
Begin txn3: -8985283095634627668
Txn3 - no monitor record
Txn3 after put - monitor record exists
Txn3 after abort - txn3 is aborted
Txn3 monitor record deleted, OK to retry MRT.
txn2: Read version mismatch.
Record Key1: (gen:1),(exp:0),(bins:(name:Jack),(age:26))
Record Key2: (gen:13),(exp:0),(bins:(name:Jill),(age:26))
Record Key1: (gen:1),(exp:0),(bins:(name:Jack),(age:26))
Record Key2: (gen:14),(exp:0),(bins:(name:Jill),(age:26))


In [16]:
%sh asadm --enable -e "manage truncate ns test --no-warn" -h "127.0.0.1"

In [46]:
%sh aql -f "../aqlScripts/insert.aql"

In [47]:
%sh aql -f "../aqlScripts/insert_bar.aql"

In [107]:
//MRT Txn4 - Mixing two Namespaces in same MRT object / Checking consequence
import com.aerospike.client.Txn;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.ResultCode;
import com.aerospike.client.CommitStatus;

if(txn4 != null) {
    txn4.clear();
    txn4 = null;
}

Txn txn4 = new Txn();  //Generates unique TxnId for this MRT
System.out.println("Begin txn4: " + txn4.getId());
Policy mrtrpolicy4 = client.copyReadPolicyDefault();  //Create a default read policy copy. (Not a reference.)
WritePolicy mrtwpolicy4 = client.copyWritePolicyDefault();  //Create a default write policy copy. (Not a reference.)
mrtrpolicy4.txn = txn4;
mrtwpolicy4.txn = txn4;
    
//Key1
Key key1 = new Key("test", "testset", "key1");
Key key2 = new Key("test", "testset", "key2");

//Key4
Key key4 = new Key("bar", "testset", "key4");

//Start with all reads null  (Jupyter Notebook interactive - hold on to previous cell execution values.)
Record reckey1t4 = null;
Record reckey2t4 = null;
Record reckey1t4 = null;
Record reckey2t4 = null;

Record reckey1t4 = client.get(mrtrpolicy4, key1);
Record reckey2t4 = client.get(mrtrpolicy4, key2);
client.abort(txn4);
//txn4.clear();
//txn4 = null;
//txn4 = new Txn();
//mrtrpolicy4.txn = txn4;

Record reckey4t4 = client.get(mrtrpolicy4, key4);
//client.put(mrtwpolicy4, key4, new Bin("age", 126));
//Record reckey4t4 = client.get(mrtrpolicy4, key4);
/*
try {
    Record reckey2t2 = client.get(mrtrpolicy2, key2);
} catch (AerospikeException ae){
    if(ae.getResultCode() == ResultCode.MRT_VERSION_MISMATCH){
      System.out.println("txn2: Read version mismatch.");
      client.abort(txn2);
    }
    if(ae.getResultCode() == ResultCode.MRT_BLOCKED){
      System.out.println("txn2: Read blocked by another MRT.");      
    }
}

client.commit(txn3);
//Done
try {
  client.commit(txn2);
} catch (AerospikeException.Commit cs){   //Commit Status
    
    System.out.println(cs.getResultCode());
    if(cs.getResultCode() == ResultCode.TXN_FAILED){
      System.out.println("txn2: Transaction failed to commit.");
    }

    client.abort(txn2);
}

txn2.clear();
mrtrpolicy2.txn = null;
mrtwpolicy2.txn = null;

//client.commit(txn3);
txn3.clear();
mrtrpolicy3.txn = null;
mrtwpolicy3.txn = null;
*/
System.out.println("Record Key1: "+ reckey1t4);
System.out.println("Record Key2: "+ reckey2t4);
System.out.println("Record Key4: "+ reckey4t4);


Begin txn4: -8027707595374730406


EvalException: Error -1: Command not allowed in current MRT state: ABORTED

In [39]:
//MRT 2 & 3 - Mixing two transactions
import com.aerospike.client.Txn;

Txn txn3 = new Txn();  //Generates unique TxnId for this MRT
System.out.println("Begin txn3: " + txn3.getId());
Policy mrtrpolicy3 = client.copyReadPolicyDefault();  //Create a default read policy copy. (Not a reference.)
WritePolicy mrtwpolicy3 = client.copyWritePolicyDefault();  //Create a default write policy copy. (Not a reference.)
mrtrpolicy3.txn = txn3;
mrtwpolicy3.txn = txn3;
    
//Read key1
Key key1 = new Key("test", "testset", "key1");

//Write key2
Key key2 = new Key("test", "testset", "key2");

Record reckey1t3 = client.get(mrtrpolicy3, key1);
client.put(mrtwpolicy3, key2, new Bin("age", 26));
Record reckey2t3 = client.get(mrtrpolicy3, key2);

//Done
client.commit(txn3);
txn3.clear();
mrtrpolicy3.txn = null;
mrtwpolicy3.txn = null;

System.out.println("Record Key1: "+ reckey1t3);
System.out.println("Record Key2: "+ reckey2t3);

Begin txn3: -8773178100543813631
Record Key1: (gen:1),(exp:0),(bins:(name:Jack),(age:26))
Record Key2: (gen:8),(exp:0),(bins:(name:Jill),(age:26))


In [41]:
//CLASS Cell
class myTest {
  public void foo (Record rec) {
    System.out.println("Calling myTest:foo() to print rec:"+rec);
  }
}

In [42]:
myTest myobj = new myTest();
myobj.foo(record);

Calling myTest:foo() to print rec:(gen:1),(exp:0),(bins:(name:Jack),(age:26))


# MRT ResultCode Defines

## SC Namespace check

If Namespace is AP, put() get() with policies that have Txn object with raise AerospikeException which will return:
```
try {
    Record reckey1t1 = client.get(mrtrpolicy1, key1);
}catch (AerospikeException ae){
    if(ae.getResultCode() == ResultCode.UNSUPPORTED_FEATURE){
        System.out.println("MRT Txn not supported. Check, is namespace SC?");
    }
}
```
## Any Transaction Timed out  
```
	/**
	 * Client or server has timed out.
	 */
	public static final int TIMEOUT = 9;

    // (ResultCode.TIMEOUT) ? --> (Check getInDoubt())
```
## MRT record blocked by a different transaction.
```
	public static final int MRT_BLOCKED = 120;
```

## MRT read version mismatch identified during commit.
### Some other command changed the record outside of the transaction.
```
	public static final int MRT_VERSION_MISMATCH = 121;
```

## MRT deadline reached without a successful commit or abort.
```
	public static final int MRT_EXPIRED = 122;
```

## MRT write command limit (4096) exceeded.
```
	public static final int MRT_TOO_MANY_WRITES = 123;
```

## MRT was already committed.
```
	public static final int MRT_COMMITTED = 124;
```

## MRT was already aborted.
```
	public static final int MRT_ABORTED = 125;
```    
## On Commit
### Multi-record transaction failed
```	 
	public static final int TXN_FAILED = -17;
```
## Txn Object

- Return MRT state.

	public Txn.State getState() {
		return state;
	}


- Verify that the MRT state allows future commands.
```
	public void verifyCommand() {
		if (state != Txn.State.OPEN) {
			throw new AerospikeException("Command not allowed in current MRT state: " + state);
		}
	}
```
where Txn.State can be: 
```   
    public static enum State {
		OPEN,
		VERIFIED,
		COMMITTED,
		ABORTED;
	}
```  

- Return if MRT is inDoubt.

	public boolean getInDoubt() {
		return inDoubt;
	}

- Clear MRT. Remove all tracked keys.  (Retains RxID and timeout setting, so will operate on same monitor record.)
```
    public void clear() {
      namespace = null;
      deadline = 0;
      reads.clear();
      writes.clear();
    }
```
- Txn object verifies each key handed to the transaction has the same namespace. Initially namespace is null. First transaction will set the namespace value. If you mix up namespaces in the same transaction, you will get:
```
  AerospikeException.CLIENT_ERROR = -1.  //(ResultCode.CLIENT_ERROR)
```
Also txn object can only be used if it is in OPEN state. If it is in anyother state, (VERIFIED, COMMITTED, ABORTED) a new read/write transaction in this Txn object (this TxnID) is not permitted.  You have to start a new Txn object.  You can re-use the same object as:
```
    if(txn4 != null) {
      txn4.clear();
      txn4 = null;
    }
    Txn txn4 = new Txn();  //Generates unique TxnId for this MRT
```

# Cleanup

In [43]:
%sh asadm --enable -e "manage sindex delete idx_age ns test set testset"

In [44]:
%sh asadm --enable -e "manage truncate ns test --no-warn" -h "127.0.0.1"