
Part III: Dealing With Metric Overload - Dynamic Sweeper


Dealing With Metric Overload

So now our various web servers are sending along five metrics every 60 seconds. These metrics accumulate in the form of many .rrd files, since each one is a uniquely named metric based upon the id we discussed earlier. This isn't scalable given the number of potential ids we process. We need to accomplish two things. First, we need to get the sheer number of these metrics under control by purging any that are older than 60 seconds (because we only care about the metrics received within the last 60 seconds), and second, we need to dynamically generate .json files every 60 seconds based only upon these most recent, valid metrics. The .json files build node-level graphs showing the current top 5 metrics, and they are rebuilt every 60 seconds so we always see only the most current info. In a sense, we ignore the regular Ganglia metric graphs displayed lower on each node page and focus only upon the json-built header graphs near the top of the node page.

To manage my dynamic graphs I wrote a program called DynamicGraph. It contains several classes to handle purging, graph building and some other neat features.

DynamicGraph started off being run from cron every 60 seconds but later evolved into a daemonized program started from init, with the run interval now configurable from a config file located in /etc. DynamicGraph sweeps my rrd tree for the defined clusters containing the dynamic metrics, checks the rrd timestamps, purges any older than a specific configurable age and builds new dynamic json graphs under /var/www/html/ganglia/graph.d to display in near real-time.
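
To give a sense of the daemonized flavor, the main loop amounts to something like the sketch below. The config path, section and option names, and the sweep callable are assumptions for illustration, not the actual DynamicGraph code.

    import time
    import configparser

    def run_forever(sweep):
        ''' Daemon-style main loop: read the sweep interval from a config
            file in /etc (path and option names are assumptions), then call
            the supplied sweep() function on that schedule. '''
        config = configparser.ConfigParser()
        config.read('/etc/dynamicgraph.conf')
        interval = config.getint('main', 'sweep_interval', fallback=60)

        while True:
            sweep()              # purge stale rrds and rebuild the .json graphs
            time.sleep(interval)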

It helps greatly if you use a naming system for any dynamic metrics, given you will be working with them programmatically. For example, all metrics in this example are prefixed api_top5_, classed by type (http, https etc.), followed by the id I assigned and finally the .rrd suffix. In this manner I can build dicts of clusters, nodes within clusters and metrics within nodes, and create class objects from these in my program.
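
As a rough illustration of why the naming convention matters, files like api_top5_http_12345.rrd can be gathered up and grouped by cluster and node. The helper below is a sketch under those naming assumptions (the function name and walk logic are mine, not DynamicGraph's); the rrd root and the __SummaryInfo__ directory are the standard Ganglia locations referenced later on this page.

    import os
    from collections import defaultdict

    RRD_ROOT = '/var/lib/ganglia/rrds'

    def collect_metrics(clusters):
        ''' Walk the rrd tree and build {cluster: {node: [rrd paths]}} for
            metrics that follow the api_top5_<type>_<id>.rrd convention. '''
        tree = defaultdict(lambda: defaultdict(list))
        for cluster in clusters:
            cluster_dir = os.path.join(RRD_ROOT, cluster)
            for node in os.listdir(cluster_dir):
                node_dir = os.path.join(cluster_dir, node)
                if not os.path.isdir(node_dir) or node == '__SummaryInfo__':
                    continue
                for fname in os.listdir(node_dir):
                    if fname.startswith('api_top5_') and fname.endswith('.rrd'):
                        tree[cluster][node].append(os.path.join(node_dir, fname))
        return tree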

After pulling in each type of metric by node, I run them through my class in the following workflow:

  • Check each metric using rrdtool info and, if the last update is older than 60 seconds, purge that metric by calling os.remove on the actual .rrd file as well as removing it from the working list (see the sketch after this list). This ensures that at any given time I have no more than 5-10 metrics per 60 seconds.
  • Surviving metrics are then checked for their last ds (data source) value, which is assigned to the metric.
  • Surviving metrics are then sorted in the dict from greatest to lowest value. These are returned to my create_json method, which does an re.sub on a pre-built .json template.
  • Prior to being written to the .json files, however, I extract the id and run it through a local Redis db which maps ids to customer names. If the id is not in my local db, I look it up against our primary central database across the network, then insert it into my local Redis db for later use. This keeps my lookups fast.
  • Write out the .json graph to my Ganglia web doc root. These .json graphs then appear at the top of each node page, showing the top5 requestors to each web server over the past 60 seconds.
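
The first three steps boil down to something like the following sketch. The function names are hypothetical, and for simplicity it shells out to rrdtool last and rrdtool lastupdate rather than parsing rrdtool info, which gives the same last-update information.

    import os
    import time
    import subprocess

    MAX_AGE = 60  # seconds; anything older is considered expired

    def rrd_last_update(path):
        ''' Epoch timestamp of the most recent update, via "rrdtool last". '''
        return int(subprocess.check_output(['rrdtool', 'last', path]).decode().strip())

    def rrd_last_value(path):
        ''' Most recent ds value, via "rrdtool lastupdate"; the last output
            line looks like "<timestamp>: <value>" for a single-ds rrd. '''
        out = subprocess.check_output(['rrdtool', 'lastupdate', path]).decode()
        return float(out.strip().splitlines()[-1].split(':', 1)[1])

    def top5(metric_files):
        ''' Purge expired rrds, attach last values to the survivors and
            return the five largest, greatest value first. '''
        now = time.time()
        survivors = {}
        for path in metric_files:
            if now - rrd_last_update(path) > MAX_AGE:
                os.remove(path)   # the real purge also cleans up __SummaryInfo__
            else:
                survivors[path] = rrd_last_value(path)
        return sorted(survivors.items(), key=lambda kv: kv[1], reverse=True)[:5]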

Because we deal with many ids and customers, these mappings are stored in a central database. I figured it would be neat to replace the numeric id showing in my metric graphs with the actual customer name retrieved by looking up the id. Once I have narrowed down the top5 metrics by id, I call a local Redis db to look up the mapping if I have it there. I use a db connection library, so logins are not visible here.

(Note: this section covers the sqlite3 approach, which is now outdated. I've since moved to Redis.)

    def check_local_db(self,app_id):
        ''' Here we look up the site name by app_id,
            checking our local sqlite db first. '''
        import sqlite3

        self.app_id = app_id

        db = sqlite3.connect('/usr/local/sbin/APPIDS.db')
        cursor = db.cursor()
        # Create the table if it does not already exist
        cursor.execute('''CREATE TABLE IF NOT EXISTS appids
            (APP_ID INT PRIMARY KEY UNIQUE NOT NULL,
            CUSTNAME CHAR(50),
            SHARD CHAR(10));''')
        db.commit()
        db.close()

        # Query to see if app_id exists in our local db
        custname = None
        shard = None
        #print("Trying to query for our app_id %s" % app_id)   # DEBUG
        try:
            db = sqlite3.connect('/usr/local/sbin/APPIDS.db')
            cursor = db.cursor()
            # Parameterized query rather than string interpolation
            cursor.execute("SELECT * FROM appids WHERE APP_ID = ?", (self.app_id,))
            allrows = cursor.fetchall()
            for row in allrows:
                #print("record exists: %s" % row[0])
                custname = row[1]
                shard = row[2]
                if custname is not None:
                    return(custname) # return to first caller
        except Exception as e:
            db.rollback()
            raise e
        finally:
            db.close()

        # No record, so let's get one from the central db
        print("No record found locally, going to central db")
        try:
            (custname,shard) = self.lookup_appid(self.app_id)
            print(self.app_id,custname,shard)
        except:
            print("Error querying or looking up against the central db in lookup_appid")

        # If we were successful getting the customer name, let's enter that locally
        if custname:
            print("Returned data, so inserting records")
            print("Inserting: %s %s and %s into db" % (self.app_id,custname,shard))
            try:
                # Record does not exist locally, so insert it
                db = sqlite3.connect('/usr/local/sbin/APPIDS.db')
                cursor = db.cursor()
                cursor.execute("INSERT INTO appids (APP_ID,CUSTNAME,SHARD) \
                    VALUES (?, ?, ?)",(self.app_id,custname,shard))
                db.commit()
            finally:
                db.close()
            return(custname)
        else:
            print("Error: Returned None, so skipping local db insert for %s" % (self.app_id))
            return(self.app_id)

And if the local lookup comes up empty, I call a method in this class named lookup_appid, as referenced in the method above.

    def lookup_appid(self,app_id):
        ''' Look up the customer name and shard for an app_id
            against the central database. '''
        sys.path.insert(0,'/usr/lib/ganglia/python_modules')
        from nanglia import dbconfig
        import MySQLdb

        self.app_id = app_id
        custname = None
        shard = None

        db = dbconfig('centraldb','central')
        cursor = db.cursor()
        sql = "select id,name,parent_id from central.lookups where id in (select app_id from central.app_ids where app_id = %s);"
        try:
            # Pass app_id as a parameter tuple so MySQLdb handles the quoting
            cursor.execute(sql,(self.app_id,))
            result = cursor.fetchall()
            print(result)  # Debug database returns - returns a tuple
            for row in result:
                app = row[0]
                custname = row[1]
                shard = row[2]
        except:
            print("Error running mysql query")
        finally:
            db.close()
        print("Returning %s and %s from lookup_appid" % (custname,shard))
        return(custname,shard)

The sqlite solution worked very well - I never had an issue and it was quite fast. However, as I tinkered I decided to employ Redis to achieve the same thing: very fast lookups by id. To accomplish this I added a method that checks my local Redis instance for the result and, if it is not found, calls the central database and inserts the result - an identical flow to the sqlite option, though it turned out to require far less code.

I began by adding a variable option in DynamicGraph to support either Sqlite or Redis.

# Cache engine can be 'redis' or 'sqlite'
cache_engine = 'redis'
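
The lookup can then be dispatched on that setting, roughly like this. The call site below is an assumption for illustration; the two method names are the ones shown on this page.

    # Hypothetical dispatch inside DynamicGraph based on the cache_engine setting
    if cache_engine == 'redis':
        custname = self.check_redis(app_id)
    else:
        custname = self.check_local_db(app_id)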

The Redis method looked something like:

def check_redis(self,app_id):
    '''here we check our local redis cache for
       app_id to name mappings and return to create_json '''

    import redis

    self.app_id = app_id

    r = redis.Redis(
        host='localhost',
        port=6379,
        password='############')

    if r.exists(app_id):
        custname = r.get(app_id)
        return(custname)

    else:
        # Key not found, cache miss

        # No record, so let's get one from s-central
        #print("No record found locally, going to s-central")
        custname = None
        shard = None
        try:
            (custname,shard) = self.lookup_appid(self.app_id)
            #print(self.app_id,custname,shard)
        except:
            #print("Error querying or looking up against s-central in lookup_appid")
            loglib(logfile,"ERROR: Error querying s-central for %s" % self.app_id)
            #custname = "Unknown"

        # If we were successful getting the customer name, let's enter that locally
        if custname:
            #print("Returned data, so inserting records")
            #print("Inserting: %s %s and %s into redis cache" % (self.app_id,custname,shard))
            try:
                # If record does not exist, insert
                r.set(app_id,custname)
            except Exception as e:
                raise e
            finally:
                return(custname)
        else:
            #print("Error: Returned None and so skipping db insert locally for %s" %(self.app_id))
            return(self.app_id)

Either option proved quick and efficient - just a matter of preference. About five minutes after writing the Redis method and supporting code I was up and running. Over time my Redis keystore filled up with the id-to-customer-name mappings.

Redis Ids

With the ids now converted to actual customer names, I can begin building my .json files to create graphs that will live for only 60 seconds. But first, let's look at an example of where I purge metrics.

In my purge method, metrics older than 60 seconds are removed from /var/lib/ganglia/rrds; os.path.abspath resolves the full path to the .rrd file and os.remove deletes it.

    def purge_metric(self,metric):
        ''' Here we take metrics that are considered
            expired and remove them from the Ganglia
            working metric directories. '''

        retstatus = 'success'
        fullpath = os.path.abspath(metric)
        MasDir = os.path.dirname(os.path.dirname(fullpath))     # gives us the cluster path
        SumDir = MasDir+'/__SummaryInfo__'+'/'+metric            # gives us the path to the metric under __SummaryInfo__
        loglib(logfile,'INFO: Purging metric %s under purge_metric method' % fullpath)
        try:
            os.remove(fullpath)      # remove the node-level .rrd
            os.remove(SumDir)        # remove the cluster __SummaryInfo__ copy
            loglib(logfile,'INFO: Metric %s successfully purged' % fullpath)
        except:
            loglib(logfile,'WARN: Purge of metric %s encountered an error.' % fullpath)
            retstatus = 'failed'
        return(retstatus)

We have to remove the expired metric from the list of metrics being passed between methods, remove the actual .rrd file in the node directory and remove the corresponding metric reference in the cluster __SummaryInfo__ directory. The above code handles the file removals; the working list is trimmed in the sweep workflow described earlier.
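
Once the expired metrics are gone, the surviving top five are handed to create_json, which, as described in the workflow above, does an re.sub on a pre-built .json template and writes the result under graph.d. A rough sketch of that substitution step follows; the template path, token names and customer name are hypothetical, while the output location under /var/www/html/ganglia/graph.d is the one used throughout this page.

    import re

    def create_json_sketch(template_path, out_path, replacements):
        ''' Fill a pre-built .json template by re.sub-ing placeholder tokens
            (e.g. %%METRIC1%%, %%LABEL1%%) with the current top-5 metric names
            and the customer names returned by the cache lookup, then write
            the report under the Ganglia web doc root. '''
        with open(template_path) as f:
            report = f.read()
        for token, value in replacements.items():
            report = re.sub(token, value, report)
        with open(out_path, 'w') as f:
            f.write(report)

    # Hypothetical usage for one node's top-5 report
    create_json_sketch(
        '/etc/dynamicgraph/top5_template.json',
        '/var/www/html/ganglia/graph.d/web01_top5_report.json',
        {'%%METRIC1%%': 'api_top5_http_12345', '%%LABEL1%%': 'Acme Corp'})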

Part IV: DynamicGraph Configuration
