In [1]:
import requests
import json
import time
import elasticsearch

In [198]:
CONST_RIPPLE_URL = 'https://data.ripple.com/v2/'
CONST_RIPPLE_HEALTH_CHECK = 'https://data.ripple.com/v2/health/importer?verbose=true'
CONST_ELASTIC_URL = 'http://localhost:9200'
CONST_ELASTIC_MAXSIZE = 25
CONST_ELASTIC_TIMEOUT = 30
CONST_ELASTIC_MAXRETRIES = 10
CONST_ELASTIC_LEDGER_NAME = "ledger"
CONST_ELASTIC_TRANSACTION_NAME = "transaction"
CONST_NULL_TRANSACTIONS = [{}]
CONST_NULL_LEDGER = {}

In [199]:
class RippleLedger:
    
    def __init__(self, LedgerIndex):
        #self.makeGetLedgerRequest(LedgerIndex)
        requestResult = self.makeGetLedgerRequest(LedgerIndex)
        if requestResult['result'] == 'success':
            self.result = 'success'
            self.extract(requestResult['ledger'])
        else:
            self.result = 'error'
        
    @staticmethod
    def makeGetLedgerRequest(LedgerIndex):
        requestResult = json.loads(requests.get(CONST_RIPPLE_URL + 'ledgers/' + str(LedgerIndex) 
                                   + '?transactions=true&binary=false&expand=true').text)
        return requestResult

    def extract(self, LedgerObject):
        self.Transactions = LedgerObject.pop('transactions')
        self.LedgerHeader = LedgerObject

In [206]:
class ElasticRipple:
    
    def __init__(self):
        self.elasticDatabase = elasticsearch.Elasticsearch([CONST_ELASTIC_URL], maxsize = CONST_ELASTIC_MAXSIZE,
                                        timeout = CONST_ELASTIC_TIMEOUT, max_retries = CONST_ELASTIC_MAXRETRIES, 
                                        retry_on_timeout=True)
        self.indexRipple()
    
    def addLedger(self, LedgerObject, curID):
        self.elasticDatabase.index(index = CONST_ELASTIC_LEDGER_NAME, doc_type = 'b', body = LedgerObject, id = curID)
    
    def addTransactions(self, Transactions):
        if Transactions != CONST_NULL_TRANSACTIONS:
            for tx in Transactions:
                del tx['meta']['AffectedNodes']
                if 'TakerPays' in tx['tx'] and type(tx['tx']['TakerPays']) != 'dict':
                    del tx['tx']['TakerPays']
                if 'TakerGets' in tx['tx'] and type(tx['tx']['TakerGets']) != 'dict':
                    del tx['tx']['TakerGets']
                self.elasticDatabase.index(index = CONST_ELASTIC_TRANSACTION_NAME, doc_type = 'b', body = tx)
    
    def indexRipple(self):
        self.getLastValidatedLedgerIndex()
        self.getCurrentLedgerIndex()
        while self.CurrentLedgerIndex <= self.LastValidatedLedgerIndex:
            self.CurrentLedgerIndex += 1
            self.RL = RippleLedger(self.CurrentLedgerIndex)
            print (self.CurrentLedgerIndex)
            if self.RL.result == 'success':
                self.addTransactions(self.RL.Transactions)
                self.addLedger(self.RL.LedgerHeader, self.CurrentLedgerIndex)
            else:
                self.addTransactions(CONST_NULL_TRANSACTIONS)
                self.addLedger(CONST_NULL_LEDGER, self.CurrentLedgerIndex)
    
    def getCurrentLedgerIndex(self):
        try:
            self.CurrentLedgerIndex = self.elasticDatabase.count(CONST_ELASTIC_LEDGER_NAME)['count']
        except:
            self.CurrentLedgerIndex = 0
    
    def getLastValidatedLedgerIndex(self):
        try:
            self.LastValidatedLedgerIndex = json.loads(requests.get(CONST_RIPPLE_HEALTH_CHECK).text)['last_validated_ledger']
        except:
            self.LastValidatedLedgerIndex = 1

In [145]:
a = RippleLedger(8924147)

In [174]:
ER.getCurrentLedgerIndex()
ER.CurrentLedgerIndex
ER.elasticDatabase.count(CONST_ELASTIC_LEDGER_NAME)['count']
elasticDatabase.get(index = CONST_ELASTIC_LEDGER_NAME, 
                                                      doc_type = 'b', id = c)['_source']['seqNum']

GET http://localhost:9200/ledgerrr/b/4 [status:404 request:0.005s]


4

In [209]:
json.loads(requests.get(CONST_RIPPLE_HEALTH_CHECK).text)['last_validated_ledger']

38024242

In [208]:
ER = ElasticRipple()
#ER.elasticDatabase.count(CONST_ELASTIC_LEDGER_NAME)['count']
#ER.elasticDatabase.get(index = CONST_ELASTIC_LEDGER_NAME,doc_type = CONST_ELASTIC_LEDGER_NAME, id = 8924146)['_source']['seqNum']
#ER.elasticDatabase.get(index = CONST_ELASTIC_LEDGER_NAME, doc_type = CONST_ELASTIC_LEDGER_NAME, id = 8924149)

159
160
161
162


KeyboardInterrupt: 

In [203]:
ER.elasticDatabase.count(CONST_ELASTIC_LEDGER_NAME)['count']

10

In [8]:
elasticDatabase.index(index = 'asd', doc_type='b', id = 23, body={'a':2, 'b':3})

{'_id': '23',
 '_index': 'asd',
 '_primary_term': 3,
 '_seq_no': 4,
 '_shards': {'failed': 0, 'successful': 1, 'total': 2},
 '_type': 'b',
 '_version': 2,
 'result': 'updated'}

In [96]:
ER.elasticDatabase.count(CONST_ELASTIC_TRANSACTION_NAME)

{'_shards': {'failed': 0, 'skipped': 0, 'successful': 5, 'total': 5},
 'count': 108}

In [122]:
ER.elasticDatabase.count(CONST_ELASTIC_LEDGER_NAME)

{'_shards': {'failed': 0, 'skipped': 0, 'successful': 5, 'total': 5},
 'count': 1}

In [167]:
ER.elasticDatabase.get(index = CONST_ELASTIC_LEDGER_NAME,doc_type = 'b', id = 8924148)

{'_id': '8924148',
 '_index': 'ledgerr',
 '_source': {'accepted': 'true',
  'account_hash': '65BFA7A98A81A7A795D4ABD351E34459D8E0E75C8C7ADAB309B86194DA3FE953',
  'close_time': 1410990430,
  'close_time_human': '2014-Sep-17 21:47:10',
  'close_time_resolution': '10',
  'closed': 'true',
  'hash': 'F271AF21016E468F22242BB5F51A740D7D635820DBFF62F3B2422F8B6CD3F22B',
  'ledger_hash': 'F271AF21016E468F22242BB5F51A740D7D635820DBFF62F3B2422F8B6CD3F22B',
  'ledger_index': 8924148,
  'parent_hash': 'E66A6BFA2B33274C2497F9296BEE34C3786A4737A095C43829C34181CC7955B5',
  'seqNum': '8924148',
  'totalCoins': '99999976630355735',
  'total_coins': '99999976630355735',
  'transaction_hash': '3F7B263B06CF778EE68D9F18BEA23DF0331063B7994619AF534E96B5FF453A6B'},
 '_type': 'b',
 '_version': 1,
 'found': True}

In [184]:
#ER.elasticDatabase.search(CONST_ELASTIC_LEDGER_NAME)['hits']['hits']
#ER.elasticDatabase.search(CONST_ELASTIC_LEDGER_NAME)
ER.elasticDatabase.Search(using=es, index=CONST_ELASTIC_LEDGER_NAME, doc_type='b').query("query_string", 
                                                                      query="*").sort("timestamp", {'order': "desc"})

AttributeError: 'Elasticsearch' object has no attribute 'Search'