In [1]:
import pandas as pd
import numpy as np

import os

import datetime as dt
from datetime import datetime,timedelta
from pytz import all_timezones

import json, requests
import pickle
import OpenSSL

from pymongo import MongoClient

In [2]:
def connect_mongo(db, host='localhost', port=27017, ):
    """ A util for making a connection to mongo """

    conn = MongoClient(host, port)
    return conn[db]

In [3]:
def read_mongo(db, collection, query={}, host='localhost', port=27017, no_id=True):
    """ Read from Mongo and Store into DataFrame """

    # Connect to MongoDB
    db = connect_mongo(host=host, port=port, db=db)

    # Make a query to the specific DB and Collection
    cursor = db[collection].find(query)

    # Expand the cursor and construct the DataFrame
    df =  pd.DataFrame(list(cursor))
    
    return df

In [4]:
metadata_df =read_mongo('channel_db', 'metadata_coll')

In [5]:
channel_ids = metadata_df['channel_id']
channel_keys = metadata_df['read_api_key']
channel_dict = dict(zip(channel_ids, channel_keys))

In [6]:
#function that downloads all channel data form ThingSpeak
def download(apiurl,cache='use',verbose=False,apikey=None,cacheonly=None):
    """
    download(apiurl,cache='use',verbose=False,apikey=None):
    Loads thingspeak data from apiurl
    Set cache to:
        'use' - to use it
        'refresh' - to not use it
        'only' - to only use it
    cacheonly = if set, only cache this many previous training points,
       will only report this many when output. This is useful to avoid
       caches becoming arbitrarily large with historic data.
    """
    filename = 'channel%s.p'%apiurl.split('/')[-1]
    cachefile = os.path.isfile(filename)
    if (cache=='use' or cache=='only') and cachefile:
        alldata = pickle.load( open( filename, "rb" ) )
        if (cache=='only'):
            if verbose: print("Using just cache - may be out of date")
            return alldata
        if verbose: print("Using cache")
        nextid = alldata[-1]['entry_id']+1
        endtime = str_to_date(alldata[-1]['created_at'])+timedelta(seconds=1)
    else: #no cachefile or refresh -> we want to reload from the API
        if verbose: print("Ignoring/overwriting cache")
        if (cache=='only'):
            ##TODO Throw exception - can't only use cache as there is no cache
            assert False, "Can't only use cache as there is no cache"
        nextid = 1
        alldata = []
        endtime = None  
    if (cache=='only'): #we should stop now, and use the cached data we've got
        return alldata
        
    result = None
    if verbose: print("Using %d records from cache" % len(alldata))
    while result != -1:
        #thingspeak doesn't let you download ranges of ids, instead you have to
        #download ranges of dates. We can only download 8000 at a time, so we
        #need to get the date of the next one we need (then we ask for that datetime
        #until now, and repeat until we run out of new items).
        url = apiurl+'/feeds/entry/%d.json' % (nextid)
        if apikey is not None: url += '?api_key=%s' % apikey
        print("Loading from %s" % url)
        result = json.loads(requests.post(url, timeout = 100.0).content.decode('utf-8'))
        starttime = endtime
        if result==-1:
            #if verbose: print("Warning: Unable to retrieve data (does channel exist? is it public?)")
            endtime = datetime.now()
        else:
            endtime = str_to_date(result['created_at'])
        if (nextid==1):
            starttime = endtime
        else:
            start = datetime.strftime(starttime,'%Y-%m-%dT%H:%M:%SZ')
            end = datetime.strftime(endtime-timedelta(seconds=1),'%Y-%m-%dT%H:%M:%SZ')
            url = apiurl+'/feeds.json?start=%s&end=%s' % (start,end)
            if apikey is not None: url += '&api_key=%s' % apikey
            print("Loading from %s" % url)                        
            data = json.loads(requests.post(url, timeout = 100.0).content.decode('utf-8'))
            if (data!=-1):
                alldata.extend(data['feeds'])
                if verbose: print("    Adding %d records..." % len(data['feeds']))
            else:
                if verbose: print("Warning: unable to read data feed")
            
        nextid += 7999 #thought download was 8000 fields, but it's 8000 records. 8000/len(result)
    if verbose: print("New cache has %d records, saving." % len(alldata))
    
    if cacheonly is not None:
        pickle.dump( alldata[-cacheonly:], open( filename, "wb" ) )
    else:
        pickle.dump( alldata, open( filename, "wb" ) )
    return alldata
    

In [7]:
#Function that converts a string to datetime format
def str_to_date(st):
    return datetime.strptime(st,'%Y-%m-%dT%H:%M:%SZ')

In [8]:
channel_data_list = []
for channel_id, channel_key in channel_dict.items():
    channel_url = 'http://thingspeak.com/channels/'+str(channel_id)
    data = download(channel_url, verbose = True, apikey = channel_key)
    
    df = pd.DataFrame(data)#creating a dataframe of the data
    
    df['created_at'] =  pd.to_datetime(df['created_at']) #converting to DateTime format
    df['created_at'] = df['created_at'].dt.tz_convert('Africa/Kampala') #Converting from UTC to EAT
    
    df['channel_id'] = channel_id #adding an additional column
    print(channel_id, ":done!")
    channel_data_list.append(df)
    

Using cache
Using 76984 records from cache
Loading from http://thingspeak.com/channels/643676/feeds/entry/76985.json?api_key=MXMFGRF4ERL4VKI2
Loading from http://thingspeak.com/channels/643676/feeds.json?start=2019-06-23T20:43:37Z&end=2019-10-11T20:02:38Z&api_key=MXMFGRF4ERL4VKI2
    Adding 0 records...
New cache has 76984 records, saving.
643676 :done!
Using cache
Using 165977 records from cache
Loading from http://thingspeak.com/channels/667402/feeds/entry/165978.json?api_key=A7E6OGD6QRIAVVK7
Loading from http://thingspeak.com/channels/667402/feeds.json?start=2019-10-11T11:42:42Z&end=2019-10-11T11:43:20Z&api_key=A7E6OGD6QRIAVVK7
    Adding 0 records...
Loading from http://thingspeak.com/channels/667402/feeds/entry/173977.json?api_key=A7E6OGD6QRIAVVK7
Loading from http://thingspeak.com/channels/667402/feeds.json?start=2019-10-11T11:43:21Z&end=2019-10-11T20:02:48Z&api_key=A7E6OGD6QRIAVVK7
    Adding 137 records...
New cache has 166114 records, saving.
667402 :done!
Using cache
Using 11

    Adding 236 records...
New cache has 99215 records, saving.
689520 :done!
Using cache
Using 165780 records from cache
Loading from http://thingspeak.com/channels/689522/feeds/entry/165781.json?api_key=KRK2MEGVMD22YY59
Loading from http://thingspeak.com/channels/689522/feeds.json?start=2019-10-11T11:46:13Z&end=2019-10-11T11:47:25Z&api_key=KRK2MEGVMD22YY59
    Adding 0 records...
Loading from http://thingspeak.com/channels/689522/feeds/entry/173780.json?api_key=KRK2MEGVMD22YY59
Loading from http://thingspeak.com/channels/689522/feeds.json?start=2019-10-11T11:47:26Z&end=2019-10-11T20:05:40Z&api_key=KRK2MEGVMD22YY59
    Adding 228 records...
New cache has 166008 records, saving.
689522 :done!
Using cache
Using 116709 records from cache
Loading from http://thingspeak.com/channels/689525/feeds/entry/116710.json?api_key=NWFG4DF3K0KEYT15
Loading from http://thingspeak.com/channels/689525/feeds.json?start=2019-10-11T11:45:31Z&end=2019-10-11T11:47:00Z&api_key=NWFG4DF3K0KEYT15
    Adding 0 rec

718028 :done!
Using cache
Using 186312 records from cache
Loading from http://thingspeak.com/channels/718029/feeds/entry/186313.json?api_key=TRF8VHH9DWUKBT59
Loading from http://thingspeak.com/channels/718029/feeds.json?start=2019-10-11T11:49:56Z&end=2019-10-11T11:52:40Z&api_key=TRF8VHH9DWUKBT59
    Adding 0 records...
Loading from http://thingspeak.com/channels/718029/feeds/entry/194312.json?api_key=TRF8VHH9DWUKBT59
Loading from http://thingspeak.com/channels/718029/feeds.json?start=2019-10-11T11:52:41Z&end=2019-10-11T20:08:13Z&api_key=TRF8VHH9DWUKBT59
    Adding 229 records...
New cache has 186541 records, saving.
718029 :done!
Using cache
Using 199366 records from cache
Loading from http://thingspeak.com/channels/718030/feeds/entry/199367.json?api_key=2VDX6R4QQY92HH7A
Loading from http://thingspeak.com/channels/718030/feeds.json?start=2019-10-11T11:50:28Z&end=2019-10-11T11:51:51Z&api_key=2VDX6R4QQY92HH7A
    Adding 0 records...
Loading from http://thingspeak.com/channels/718030/feed

    Adding 135 records...
New cache has 85079 records, saving.
782718 :done!
Using cache
Using 120504 records from cache
Loading from http://thingspeak.com/channels/782719/feeds/entry/120505.json?api_key=QTNAHITKHEJ4C9I5
Loading from http://thingspeak.com/channels/782719/feeds.json?start=2019-10-11T11:53:34Z&end=2019-10-11T11:54:53Z&api_key=QTNAHITKHEJ4C9I5
    Adding 0 records...
Loading from http://thingspeak.com/channels/782719/feeds/entry/128504.json?api_key=QTNAHITKHEJ4C9I5
Loading from http://thingspeak.com/channels/782719/feeds.json?start=2019-10-11T11:54:54Z&end=2019-10-11T20:10:51Z&api_key=QTNAHITKHEJ4C9I5
    Adding 229 records...
New cache has 120733 records, saving.
782719 :done!
Using cache
Using 97013 records from cache
Loading from http://thingspeak.com/channels/782720/feeds/entry/97014.json?api_key=7ZN35W0OYQQ1A2ZC
Loading from http://thingspeak.com/channels/782720/feeds.json?start=2019-10-11T11:55:24Z&end=2019-10-11T11:56:36Z&api_key=7ZN35W0OYQQ1A2ZC
    Adding 0 recor

In [9]:
#Column names for the different types of sensors
PMS_heads_7 = ['time', 'entry_id', 'pm2_5', 'pm10', 's2_pm2_5', 's2_pm10', 'lat', 'long', 'voltage', 'channel_id']
PMS_heads_8 = ['time', 'entry_id', 'pm2_5', 'pm10', 's2_pm2_5', 's2_pm10', 'lat', 'long', 'voltage', 'gps_data', 'channel_id']
OPC_N2_heads_7 = ['time', 'entry_id', 'pm1', 'pm2_5', 'pm10', 'sample_period', 'lat', 'long', 'voltage', 'channel_id']
OPC_N2_heads_8 = ['time', 'entry_id', 'pm1', 'pm2_5', 'pm10', 'sample_period', 'lat', 'long', 'voltage', 'gps_data', 'channel_id']
PA_heads = ['time', 'entry_id', 'pm1', 'pm2_5', 'pm10', 'uptime', 'RSSI', 'temp', 'humidity', 'pm2_5_cf1', 'channel_id']

In [10]:
# creating a dictionary of the csv file names and their respective csv files
channel_name_dict = dict(zip(metadata_df['channel_name'], channel_data_list)) #where keys=filenames and values=dataframes

In [11]:
# Setting the column names for the different csv files based on the file name
for filename, dataframe in channel_name_dict.items():
                    
    rows,columns = dataframe.shape
    
    if (('AQ_' in filename) and (columns==11)):
        channel_name_dict[filename].columns = PMS_heads_8
        #print (filename, ': PMS_heads_8')
            
    elif(('AQ_' in filename) and (columns==10)):
        channel_name_dict[filename].columns = PMS_heads_7  
        #print (filename, ': PMS_heads_7')
            
    elif ('PA' in filename):
        channel_name_dict[filename].columns = PA_heads
        #print (filename, ': PA')            
            
    elif (('8A' in filename) or ('6F' in filename)):
        channel_name_dict[filename].columns = OPC_N2_heads_8   
        #print (filename, ': 8A/6F')
            
    else:
        channel_name_dict[filename].columns = OPC_N2_heads_7
        #print (filename, ': the rest')

In [12]:
#Adding latitude and longitude coordinates for Purple Air Sensors
for filename, dataframe in channel_name_dict.items():
    if ('PA_01' in filename): #International School Lubowa
        dataframe['lat'] = 0.2357
        dataframe['long'] = 32.5576
        #print (dataframe.head())
    elif ('PA_02' in filename): #Makerere
        dataframe['lat'] = 0.332050  #estimates
        dataframe['long'] = 32.570509
    elif ('PA_03' in filename): #Kabale
        dataframe['lat'] = -1.245 
        dataframe['long'] = 29.9892
    elif ('PA_04' in filename): #Bunamwaya
        dataframe['lat'] = 0.27
        dataframe['long'] = 32.558

In [21]:
def connect_mongo(db, host='localhost', port=27017, ):
    """ A util for making a connection to mongo """

    conn = MongoClient(host, port)
    return conn[db]

In [22]:
db =connect_mongo('channel_db')

In [23]:
count = 1

In [24]:
for filename,dataframe in channel_name_dict.items():
    
    dataframe['lat'] = dataframe['lat'].replace([0.000000,1000.000000], np.nan)
    dataframe['long'] = dataframe['long'].replace([0.000000,1000.000000], np.nan)
    
    records = json.loads(dataframe.T.to_json()).values()
    db.channeldata_collection.insert_many(records)
    print('Channel %d successful!' %count)
    
    count+=1

Channel 1 successful!
Channel 2 successful!
Channel 3 successful!
Channel 4 successful!
Channel 5 successful!
Channel 6 successful!
Channel 7 successful!
Channel 8 successful!
Channel 9 successful!
Channel 10 successful!
Channel 11 successful!
Channel 12 successful!
Channel 13 successful!
Channel 14 successful!
Channel 15 successful!
Channel 16 successful!
Channel 17 successful!
Channel 18 successful!
Channel 19 successful!
Channel 20 successful!
Channel 21 successful!
Channel 22 successful!
Channel 23 successful!
Channel 24 successful!
Channel 25 successful!
Channel 26 successful!
Channel 27 successful!
Channel 28 successful!
Channel 29 successful!
Channel 30 successful!
Channel 31 successful!
Channel 32 successful!
Channel 33 successful!
Channel 34 successful!
Channel 35 successful!
Channel 36 successful!
Channel 37 successful!
Channel 38 successful!
Channel 39 successful!
Channel 40 successful!
Channel 41 successful!
Channel 42 successful!
Channel 43 successful!
Channel 44 successfu