In [1]:
import pandas as pd
from collections import deque, OrderedDict
from datetime import timedelta, datetime, date, time
import os, pytz, calendar, urllib2, json, re, requests, pprint
import plotly.plotly as py
import plotly.graph_objs as go

## Configuration

In [2]:
#chemical names and associated health threshold
healthlimits = OrderedDict([("Benzene",          1), 
                            ("Black_Carbon",     5), 
                            ("Ethylbenzene",    60), 
                            ("Hydrogen_Sulfide", 8),
                            ("Sulfur_Dioxide",  75),
                            ("Toluene",         70),
                            ("Xylene",          50)])

#investigation time frame
#April 2017
startDay = datetime(2017,4,1)
duration = 30 #days

## Helper Functions

### Time helper functions

In [3]:
#returns start and end timestamps of provided start date and duration in number of days
def getEpochTimeBounds(d, duration):
    dt = assignPacificTimeZone(datetime(d.year,d.month,d.day))
    start = calendar.timegm(dt.utctimetuple())
    end = calendar.timegm((dt + timedelta(days=duration)).utctimetuple())
    return {'start' : start, 'end': end}

#attach DST aware timezone offset
#Note: does not convert time
def assignPacificTimeZone(dt):
    pacific = pytz.timezone("US/Pacific")
    dt = pacific.localize(dt)
    return dt

#convert either a unix timestamp or a datetime with tzinfo to a datetime in Pacific time
def convertToPacific(time):
    if not isinstance(time,datetime):
        time = datetime.fromtimestamp(time,tz=pytz.utc)
    pacific = pytz.timezone("US/Pacific")
    inPacific = time.astimezone(pacific)
    return inPacific

#returns a date range generator to be used to capture specific days:
# Ex:
#for single_date in daterange(startDay, duration):
#    print single_date.strftime("%Y-%m-%d")
def daterange(start_date, duration):
    for n in range(duration):
        yield start_date + timedelta(n)
        
#generator for 24 hour times
#Ex:
#for single_hour in twentyfourhourrange():
#     print single_hour.strftime("%H:%M:%S")
def twentyfourhourrange():
    for n in range(24):
        yield (datetime.combine(date.today(), time(0)) + timedelta(hours=(1*n))).time()

### Fenceline ESDR data helper functions

In [4]:
#converts a fenceline feed name to a location
def parsefeedcommunity(feedname):
    return str(feedname.split('fenceline_org')[0].strip().split('Fence')[0].strip())

#returns if an ESDR chemical name relates to a chemical name
def parsechemicalname(esdrchemicalname, chemicalname):
    return esdrchemicalname.find(chemicalname) > -1

#returns a copy of the interested data columns using chemical_map (generated below)
def getchemicaldata(monitor, chemical, fenceline_data, chemical_map):
    col_names = chemical_map[monitor][chemical]
    return fenceline_data[monitor][col_names].copy().replace("[^0-9]+",0,regex=True)

#Ex: getchemicaldata('Point Richmond Refinery',"Hydrogen_Sulfide",fenceline_data, chemicalmap)

## ESDR Functions

In [5]:
#returns ESDR data as a json
def makeESDRrequest(urlsuffix):
    url = "https://esdr.cmucreatelab.org/api/v1/feeds/%s" % urlsuffix
    return json.loads(urllib2.urlopen(url).read())['data']

#returns the feed information for a given feedID
def loadfeed(feedID):
    return makeESDRrequest(feedID)

#returns all of the channels for a given feed data (use with loadfeed)
def getchannels(feedData):
    return [str(channel) for channel in feedData['channelBounds']['channels'].keys()]

#returns info on all of the fenceline feeds
def getfencelinefeedinfo():
    return makeESDRrequest('?fields=id,name,latitude,longitude&whereOr=productId=36&orderBy=+id')['rows']    

In [6]:
#feedID: integer
#esdrChannels: a list of channel names as strings
#timeOptions: a dictionary as {bounds: {start: epochInt, end: epochInt}}, or {day: datetime, duration: int}
def makedataframefromESDR(feedID, 
                          timeOptions = {}):
    if timeOptions.get('bounds') == None:
        bounds = getEpochTimeBounds(
            timeOptions.get('day') or datetime.now()-timedelta(1), 
            timeOptions.get('duration') or 1)
    else:
        bounds = timeOptions.get('bounds')
        
    esdrChannels = getchannels(loadfeed(feedID))
    
    try:
        r = makeESDRrequest("%s/channels/%s/export?from=%s&to=%s&format=json" % (feedID, ','.join(esdrChannels), bounds['start'], bounds['end']))
        print "loaded " + str(len(r)) + " data points for feed " + str(feedID) + ", channels: " + '|'.join(esdrChannels) + ", time " + str(bounds['start'])
    except:
        print "error loading data from ESDR: feed " + str(feedID) + ", channel " + '|'.join(esdrChannels) + ", time " + str(bounds['start'])
    cols = ['Time']
    cols.extend(esdrChannels)
    df = pd.DataFrame(r,columns=cols)
    df['Time'] = pd.to_datetime(df['Time'],unit='s').dt.tz_localize('UTC').dt.tz_convert('US/Pacific')
    return df.set_index(['Time'])

## Import Data

In [7]:
#time frame: 1 month of April, 2017
timeframe = {'day':startDay, 'duration':duration}

#retrieves fenceline feed information (location, name, feed id)
fenceline_feeds = getfencelinefeedinfo()

#loads all the data from every channel and every fenceline feed, separated by feed
fenceline_data = {str(parsefeedcommunity(feed['name'])):makedataframefromESDR(feed['id'],timeframe) for feed in fenceline_feeds}

loaded 25287 data points for feed 4901, channels: FTIR_System_Status|UV_Signal_Strength|FTIR_Ethanol|FTIR_Mercaptan|FTIR_System_Manufacturer|FTIR_Ammonia|UV_Ozone|FTIR_Methane|FTIR_Carbon_Monoxide|UV_Carbon_Disulfide|FTIR_Nitrous_Oxide|TDL_Signal_Strength|FTIR_Carbonyl_Sulfide|UV_Benzene|FTIR_Total_Hydrocarbons|FTIR_1_3_Butadiene|FTIR_Ethylene|TDL_Hydrogen_Sulfide|UV_Xylene|UV_Toluene|UV_Sulfur_Dioxide|UV_System_Status|FTIR_MTBE, time 1491030000
loaded 24448 data points for feed 4902, channels: FTIR_System_Status|UV_Signal_Strength|FTIR_Ethanol|FTIR_Mercaptan|FTIR_System_Manufacturer|FTIR_Ammonia|UV_Ozone|FTIR_Methane|FTIR_Carbon_Monoxide|UV_Carbon_Disulfide|FTIR_Nitrous_Oxide|TDL_Signal_Strength|FTIR_Carbonyl_Sulfide|UV_Benzene|FTIR_Total_Hydrocarbons|FTIR_1_3_Butadiene|FTIR_Ethylene|TDL_Hydrogen_Sulfide|UV_Xylene|UV_Toluene|UV_Sulfur_Dioxide|UV_System_Status|FTIR_MTBE, time 1491030000
loaded 16910 data points for feed 4903, channels: Wind_Speed_MPH|Wind_Direction|OGD_System_Status|Hu

### Build Chemical to Column Mapping

In [8]:
fenceline_columns = {monitor: [x for x in fenceline_data[monitor].columns]
                     for monitor in fenceline_data}

chemicalmap = {monitor: {chemical:[x for x in fenceline_data[monitor].columns if parsechemicalname(x, chemical)] 
                     for chemical in healthlimits}
               for monitor in fenceline_data}

## Data Analysis Algorithms

In [9]:
def countabovehealthlimit(df, chemical):
    countsabovehealthlimit = [];
    healthlimit = healthlimits[chemical]
    for column in df:
        countsabovehealthlimit.append(df[df[column] > healthlimit].count(numeric_only=True).tolist()[0])
    return countsabovehealthlimit

def countpresence(df, chemical):
    countspresence = [];
    for column in df:
        countspresence.append(df[df[column] > 0].count(numeric_only=True).tolist()[0])
    return countspresence

def maxHourlyAverage(df, windFeed, channel):
    ret = []
    halfHourInSecs = 30 * 60
    def avg(x, delta):
        ser = df.iloc[(df.index >= x - delta) & (df.index <= x + delta), 0]
        return ser.mean()
    
    healthLimit = 1
    df['avg'] = pd.Series(data = df.index, index = df.index).apply(lambda x: avg(x,delta=halfHourInSecs))
    maxAvg = df.nlargest(1,'avg')
    maxAvgValue = maxAvg.avg.iloc[0]
    ret.append("%.2f" % maxAvgValue + unit(channel))
    
    #don't calculate time and wind data if max average was 0 (below detection limit)
    if maxAvgValue != 0:
        ret.append(generateHealthFactor(maxAvgValue,channel))
    
        hourStart = convertToPacific(maxAvg.index[0] - halfHourInSecs).strftime('%I:%M')
        hourEnd = convertToPacific(maxAvg.index[0] + halfHourInSecs).strftime('%I:%M%p')
        ret.append(hourStart + "-" + hourEnd)
    
        #get wind data for hour with highest average
        bounds = {'start':maxAvg.index[0] - halfHourInSecs,'end':maxAvg.index[0] + halfHourInSecs}
        wind = makeDataFrameFromEsdr(windFeed,"Wind_Direction,Wind_Speed_MPH","Wind_Direction,Wind_Speed_MPH",{'bounds':bounds})
        
        if len(wind) == 0 or wind['Wind_Direction'].mean() == 0:
            ret.append("No data")
        else:
            #break into quadrants and select the prevailing one
            quads = [0,90,180,270,360]
            quad_names = ['NE','SE','SW','NW']
            wind['Compass_Dir'] = pd.cut(wind['Wind_Direction'],quads,labels=quad_names)
            direction = wind.groupby('Compass_Dir').sum().nlargest(1,'Wind_Speed_MPH').index[0]
            ret.append(direction)
    else:
        ret.extend(['0.00','0.00','0.00'])
    return ret

#total time, in hours, that a detection was present of given chemical or aggregated set of chemicals
def calcHoursDetected(df, chemical, sampleFrequency = 1):
    detected = df.loc[df[chemical] > 0, [chemical]]
    fullDecimal = sampleFrequency * len(detected) / float(60)
    return "%.2f" % fullDecimal + " hours"

#total time, in hours, that detection was greater than health threshold of given chemical
def calcHoursAboveHealthLimit(df, chemical, sampleFrequency = 1):
    limit = channels[chemical]
    detected = df.loc[df[chemical] > limit, [chemical]]
    fullDecimal = sampleFrequency * len(detected) / float(60)
    return "%.2f" % fullDecimal + " hours"
#calcHoursAboveHealthLimit(df, "Benzene")

def calcDailyMean(df, chemical, nd=0):
    if nd == 0:
        fullDecimal = df[chemical].mean()
    else:
        #substitute readings of 0 for the passed-in non-detect value
        #(which should represent that chemicals' detection limit)
        fullDecimal = df.replace(0.0,nd)[chemical].mean()
    return "%.2f" % fullDecimal + unit(chemical)
#calcDailyMean(df,"Benzene", 0.5)

## Analyze Data Hourly

In [11]:
profilerStart = datetime.now();
#Retrieves number of 
daily_data={}
numAnalyzed = 0;
for monitor in fenceline_data.keys():
    daily_data[monitor]={};
    for chemical in healthlimits:
        daily_data[monitor][chemical]={}
        df = getchemicaldata(monitor, chemical, fenceline_data, chemicalmap)
        for single_date in daterange(startDay, duration):
            numAnalyzed += 1;
            daily_data[monitor][chemical][single_date]={}
            healthdaycount = 0
            presencedaycount = 0
            one_day_data = df[single_date.strftime("%Y-%m-%d")]
            for single_hour in twentyfourhourrange():
                next_hour = datetime.combine(date.today(), single_hour) + timedelta(hours=1)                
                one_hour_data = one_day_data.between_time(single_hour.strftime("%H:%M:%S"), next_hour.strftime("%H:%M:%S"))
                healthdaycount += 1 if sum(countabovehealthlimit(one_hour_data, chemical)) else 0
                presencedaycount += 1 if sum(countpresence(one_hour_data, chemical)) else 0
            daily_data[monitor][chemical][single_date]['hoursabovehealthlimit']=healthdaycount
            daily_data[monitor][chemical][single_date]['hourspresent']=presencedaycount
elapsedTime = (datetime.now() - profilerStart).totalseconds()
timeperprocessed = elapsedTime/numAnalyzed
print "Elapsed Time:", elapsedTime, "seconds"
print "Time Per Hour Processed:", timeperprocessed, "seconds"


In [15]:
daily_data['North Richmond Community']['Hydrogen_Sulfide']

{datetime.datetime(2017, 4, 1, 0, 0): {'hoursabovehealthlimit': 0,
  'hourspresent': 3},
 datetime.datetime(2017, 4, 2, 0, 0): {'hoursabovehealthlimit': 0,
  'hourspresent': 1},
 datetime.datetime(2017, 4, 3, 0, 0): {'hoursabovehealthlimit': 0,
  'hourspresent': 4},
 datetime.datetime(2017, 4, 4, 0, 0): {'hoursabovehealthlimit': 0,
  'hourspresent': 2},
 datetime.datetime(2017, 4, 5, 0, 0): {'hoursabovehealthlimit': 0,
  'hourspresent': 0},
 datetime.datetime(2017, 4, 6, 0, 0): {'hoursabovehealthlimit': 0,
  'hourspresent': 0},
 datetime.datetime(2017, 4, 7, 0, 0): {'hoursabovehealthlimit': 0,
  'hourspresent': 0},
 datetime.datetime(2017, 4, 8, 0, 0): {'hoursabovehealthlimit': 0,
  'hourspresent': 0},
 datetime.datetime(2017, 4, 9, 0, 0): {'hoursabovehealthlimit': 0,
  'hourspresent': 0},
 datetime.datetime(2017, 4, 10, 0, 0): {'hoursabovehealthlimit': 0,
  'hourspresent': 0},
 datetime.datetime(2017, 4, 11, 0, 0): {'hoursabovehealthlimit': 0,
  'hourspresent': 2},
 datetime.datetime(

In [None]:
getchemicaldata('Point Richmond Refinery',"Hydrogen_Sulfide",fenceline_data, chemicalmap)['2017-04-01':'2017-04-02']

datetime.timedelta(-1, 86399, 999989)