# Get 5-minute volumes for February 2018 from the unprocessed speed data

    Created by: Apoorba Bibeka
    Date: March 20, 2019

## Import required modules

In [1]:
import getpass
import boto3
import pandas as pd
from impala.dbapi import connect
from impala.util import as_pandas
from io import BytesIO as StringIO

## Connect to the database

In [3]:
# Database credentials: the password is prompted for interactively so it is
# never stored in the notebook itself.
password1 = getpass.getpass()
username = "abibeka@securedatacommons.com"

········


In [5]:
# Open the database connection (PLAIN auth) using the credentials collected above.
conn = connect(
    host="172.18.1.20",
    port=10000,
    user=username,
    password=password1,
    auth_mechanism='PLAIN',
)

## Look at the device ids from "wydot_speed_sensors_index"

In [4]:
# Pull the whole sensor index table into a DataFrame.
cursor = conn.cursor()
cursor.execute('select * from wydot_speed_sensors_index')
# Store the index data table
SSindex = as_pandas(cursor)
# The result columns come back qualified as "wydot_speed_sensors_index.<col>";
# strip that prefix so columns can be used by their bare names. Columns that do
# not carry the prefix are kept as-is instead of raising IndexError.
# (The former `index=str` argument only converted the row labels to strings and
# was not needed.)
prefix = 'wydot_speed_sensors_index.'
Rename1 = {x: (x[len(prefix):] if x.startswith(prefix) else x) for x in SSindex.columns}
SSindex = SSindex.rename(columns=Rename1)
print(SSindex['deviceid'].values, SSindex.columns)

(array([ 382,  383,  393,  394,  395,  396,  398,  400,  405,  407,  408,
        411,  384,  482, 1075, 1084, 1100, 1134, 1145, 1153, 1167, 1219,
       1231,  385, 1241, 1251, 1258, 1269, 1280, 1327, 1342, 1837, 1838,
       1839,  386, 2020, 2032, 2049, 2070, 2079, 2090, 2146, 2147, 2178,
       2191,  387, 2202, 2213, 2246, 2263, 2274, 2289, 2298, 2310, 2319,
       2334,  388, 2346, 2359, 2372, 2383, 2395, 2409, 2421, 2433, 2445,
       2578,  389, 2607, 2609, 2916, 3236, 3243, 3249, 3296, 3402, 3482,
       3654,  390, 3897, 3899, 3901, 3903, 3905, 3907, 3909, 3911,  391],
      dtype=int64), Index([u'deviceid', u'lat_decimal', u'long_decimal', u'road_code', u'sitename',
       u'devicename', u'sensortype', u'public_route', u'gis_route',
       u'direction', u'milepost', u'sensor_loc', u'nearest_rwis', u'rwis',
       u'backup_rwis', u'2015_adt', u'vsl_id', u'eb_vsl', u'wb_vsl',
       u'horiz_d', u'horiz_i', u'vert_i', u'vert_d', u'notes'],
      dtype='object'))


### Subset data for Laramie & Cheyenne (mileposts 314–360)

In [None]:
# Keep only the sensors between mileposts 314 and 360 (both ends inclusive)
# and the handful of columns needed later.
in_corridor = SSindex['milepost'].between(314, 360)
keep_cols = ["deviceid", "milepost", "direction", "2015_adt"]
SS2index = SSindex.loc[in_corridor, keep_cols].copy()
print(SS2index['deviceid'].values)

## Get the sensor index (314 ≤ milepost ≤ 360) from the S3 bucket (source: WYDOT report)

In [17]:
# Sensor list for mileposts 314-360 taken from a WYDOT report, stored in our S3 bucket
report_bucket = 'prod-sdc-tti-911061262852-us-east-1-bucket'
report_key = "abibeka/uploaded_files/SpeedSen_MP_314_360.csv"
client = boto3.client('s3')
# Read the object body straight into pandas, without a local copy
obj = client.get_object(Bucket=report_bucket, Key=report_key)
AB_SS = pd.read_csv(obj['Body'])
AB_SS.head()

Unnamed: 0,DEVICEID,SITENAME,MP,Sensor_Loc
0,396,Laramie East,317.68,EB
1,3911,Telephone Canyon,320.7,WB
2,395,Summit,322.05,WB
3,2146,Summit,322.6,WB
4,2147,Summit,323.85,EB


## Compare index from WyDOT Report and Database

In [8]:
report_ids = set(AB_SS['DEVICEID'].values)
db_ids = set(SS2index['deviceid'].values)
# Ids that appear in exactly one of the two sources (report vs. database)
print(report_ids.symmetric_difference(db_ids))
# Ids that appear in both sources
print(report_ids.intersection(db_ids))

set([])
set([384, 385, 386, 387, 388, 389, 2310, 391, 393, 394, 395, 396, 2178, 2191, 2202, 3654, 1839, 2246, 3911, 2146, 2263, 2319, 390, 2213, 2274, 2147, 2289, 2298, 383])


## Sort the corridor sensor index by milepost

In [18]:
# Order the corridor sensors along the road and renumber the rows 0..n-1
SS2index = SS2index.sort_values(by="milepost").reset_index(drop=True)
SS2index.head()

Unnamed: 0,deviceid,milepost,direction,2015_adt
0,396,317.68,B,13890
1,3654,318.5,D,13890
2,3911,320.7,B,13890
3,395,322.05,B,13890
4,2146,322.6,B,13890


## Initial approach to get volume data (superseded: aggregating directly in Hive is better)

In [11]:
# Superseded first attempt, kept for reference only (nothing here executes).
# It joined every raw detection (controller + mountain timestamp) to the sensor
# index with no aggregation, pulled the rows into pandas and wrote them to a
# local CSV. The "2nd approach" cells below do the 5-minute aggregation in Hive.
#create_query='''SELECT t1.controller, t1.mountain, t2.direction, t2.milepost, t2.2015_adt FROM 
#(SELECT * FROM wydot_speed_unprocessed WHERE mountain between '2018-02-01' AND '2018-02-28') t1
#JOIN wydot_speed_sensors_index t2 
#ON (t1.controller = t2.deviceid)'''

#create_query='''SELECT t1.controller, t1.mountain, t2.direction, t2.milepost, t2.2015_adt FROM 
#(SELECT * FROM wydot_speed_unprocessed WHERE mountain between '2018-02-01' AND '2018-02-10') t1
#JOIN wydot_speed_sensors_index t2 
#ON (t1.controller = t2.deviceid)'''
#cursor.execute(create_query)
#Vol_dat=as_pandas(cursor)
#Vol_dat.to_csv("C:/Users/abibeka/Documents/Hive-SDC/Vol_dat.csv")
#print(cursor.fetchall())

## $2^{nd}$ Approach to get Count Data

### Specify Start and End Date for Data Aggregation

In [20]:
# Aggregation window (YYYY-MM-DD strings) substituted into the Hive query below
start_date, end_date = '2018-02-01', '2018-02-28'

In [24]:
# Count raw detections per controller in 5-minute bins on the Hive side, then
# attach direction, milepost and 2015 ADT for sensors at mileposts 314-360.
# Each bin is labelled by its END time: CEILING(unix_ts / 300) * 300.
# `mountain` carries a time of day, so comparing it with a bare date compares
# against midnight: `BETWEEN start_date AND end_date` would drop almost all of
# end_date. Filter on the half-open window [start_date, end_date + 1 day) so
# end_date is fully included.
end_exclusive = (pd.Timestamp(end_date) + pd.Timedelta(days=1)).strftime('%Y-%m-%d')
create_query='''SELECT t1.controller, t1.Time5M,t1.NRec, t2.direction, t2.milepost, t2.2015_adt FROM 
    (SELECT controller, FROM_UNIXTIME(CEILING(UNIX_TIMESTAMP(mountain)/300)*300) AS Time5M, count(*) AS NRec
    FROM wydot_speed_unprocessed 
    WHERE mountain >= '{}' AND mountain < '{}'
    GROUP BY controller, FROM_UNIXTIME(CEILING(UNIX_TIMESTAMP(mountain)/300)*300)) t1
JOIN (SELECT * FROM wydot_speed_sensors_index WHERE milepost BETWEEN 314 AND 360) t2 
ON (t1.controller = t2.deviceid)
ORDER BY t1.controller, t1.Time5M'''.format(start_date, end_exclusive)
cursor.execute(create_query)
Vol_dat2=as_pandas(cursor)
Vol_dat2.head()

Unnamed: 0,t1.controller,t1.time5m,t1.nrec,t2.direction,t2.milepost,t2.2015_adt
0,384,2018-02-01 00:05:00,20,I,325.8,13890
1,384,2018-02-01 00:10:00,9,I,325.8,13890
2,384,2018-02-01 00:15:00,17,I,325.8,13890
3,384,2018-02-01 00:20:00,17,I,325.8,13890
4,384,2018-02-01 00:25:00,10,I,325.8,13890


In [None]:
# Keep a copy of the 5-minute volume table on the shared Z: drive
Vol_dat2.to_csv(path_or_buf="Z:/Apoorb/Vol_dat_V1.csv")

In [15]:
# Controllers that returned volume rows vs. all sensors in the corridor index
controllers_with_data = Vol_dat2['t1.controller'].unique()
corridor_sensors = SS2index['deviceid'].unique()
print(controllers_with_data, corridor_sensors)

(array([ 384,  385,  386,  387,  388,  389,  390,  391,  396, 1839, 2146,
       2147, 2178, 2191, 2202, 2213, 2246, 2263, 2274, 2289, 2298, 2310,
       2319, 3654, 3911], dtype=int64), array([ 396, 3654, 3911,  395, 2146, 2147,  394,  383,  384,  385, 2178,
        393, 2191, 2202, 2213,  386,  387,  388,  389, 2246,  390, 2263,
        391, 2274, 2289, 2298, 2310, 2319, 1839], dtype=int64))


## Check for corridor sensors missing from the volume data

In [38]:
# Sensor ids in the corridor index that produced no rows in the volume query
expected_ids = set(SS2index['deviceid'].values)
observed_ids = set(Vol_dat2['t1.controller'].values)
print(expected_ids.difference(observed_ids))

set([393, 394, 395, 383])


## Write file directly to my Bucket

    Note: For Python 2.7 we imported BytesIO as StringIO.
    This is not needed in Python 3.x, where DataFrame.to_csv writes text:
    use `from io import StringIO` instead.

In [36]:
# Serialise the 5-minute volume table to an in-memory CSV and upload it to the
# project bucket; the put() response is the cell's displayed output.
s3 = boto3.resource('s3')
csv_buffer = StringIO()
Vol_dat2.to_csv(csv_buffer, sep=",", index=False)
target = s3.Object('prod-sdc-tti-911061262852-us-east-1-bucket', 'abibeka/Vol_dat.csv')
target.put(Body=csv_buffer.getvalue())

{u'ETag': '"baef08d4afcb8f83143115e981d43b74"',
 'ResponseMetadata': {'HTTPHeaders': {'content-length': '0',
   'date': 'Thu, 21 Mar 2019 18:34:45 GMT',
   'etag': '"baef08d4afcb8f83143115e981d43b74"',
   'server': 'AmazonS3',
   'x-amz-id-2': '6jCOIhRUbMZoxqzyl95lFjoPY4wFVrwMq1VZWshckOBh1Oz9a8/Y5nOahOrhLVxsEyNqRLajin4=',
   'x-amz-request-id': '758A61AF86B4E780'},
  'HTTPStatusCode': 200,
  'HostId': '6jCOIhRUbMZoxqzyl95lFjoPY4wFVrwMq1VZWshckOBh1Oz9a8/Y5nOahOrhLVxsEyNqRLajin4=',
  'RequestId': '758A61AF86B4E780',
  'RetryAttempts': 0}}

# Get 5-minute counts split by lane and vehicle class

In [14]:
# Peek at a few raw rows to see which columns the unprocessed table exposes
preview_sql = '''select * from wydot_speed_unprocessed LIMIT 5'''
cursor.execute(preview_sql)
as_pandas(cursor)

Unnamed: 0,wydot_speed_unprocessed.utc,wydot_speed_unprocessed.mountain,wydot_speed_unprocessed.controller,wydot_speed_unprocessed.lane,wydot_speed_unprocessed.datasource,wydot_speed_unprocessed.durationms,wydot_speed_unprocessed.speedmph,wydot_speed_unprocessed.lengthft,wydot_speed_unprocessed.vehclass
0,2018-04-18 06:17:03,2018-04-18 00:17:03,1075,1,14,435,,14.16,1
1,2018-04-18 06:52:20,2018-04-18 00:52:20,1075,1,14,461,,15.39,1
2,2018-04-18 07:41:49,2018-04-18 01:41:49,1075,1,14,11989,,6.0,0
3,2018-04-18 08:03:02,2018-04-18 02:03:02,1075,1,14,694,,26.17,2
4,2018-04-18 09:24:06,2018-04-18 03:24:06,1075,1,14,293,,7.6,1


In [25]:
# 5-minute record counts per controller, lane and vehicle class, joined to the
# sensor metadata for mileposts 314-360. Bins are labelled by their END time.
start_date='2018-02-01'
end_date='2018-02-28'
# `mountain` carries a time of day, so `BETWEEN start_date AND end_date` would
# stop at end_date 00:00:00 and lose nearly the whole last day. Filter on the
# half-open window [start_date, end_date + 1 day) so end_date is fully included.
end_exclusive = (pd.Timestamp(end_date) + pd.Timedelta(days=1)).strftime('%Y-%m-%d')
# The inner query names the column `vehclass`; the outer SELECT and ORDER BY
# must use that exact name (it was misspelled `t1.vehclas`).
create_query='''SELECT t1.controller, t1.lane, t1.vehclass, t1.Time5M, t1.NRec, t2.direction, t2.milepost, t2.2015_adt FROM 
    (SELECT controller, FROM_UNIXTIME(CEILING(UNIX_TIMESTAMP(mountain)/300)*300) AS Time5M, lane, vehclass, count(*) AS NRec
    FROM wydot_speed_unprocessed 
    WHERE mountain >= '{}' AND mountain < '{}'
    GROUP BY controller, lane, vehclass, FROM_UNIXTIME(CEILING(UNIX_TIMESTAMP(mountain)/300)*300)) t1
JOIN (SELECT * FROM wydot_speed_sensors_index WHERE milepost BETWEEN 314 AND 360) t2 
ON (t1.controller = t2.deviceid)
ORDER BY t1.controller, t1.Time5M, t1.lane, t1.vehclass'''.format(start_date, end_exclusive)
cursor.execute(create_query)
Vol_dat3=as_pandas(cursor)

HiveServer2Error: Error while compiling statement: FAILED: SemanticException [Error 10004]: Line 8:35 Invalid table alias or column reference 'lane': (possible column names are: t1.controller, t1.time5m, t1.nrec, t2.direction, t2.milepost, t2.2015_adt)

In [22]:
# Quick look at the first five rows of the 5-minute volume table
Vol_dat2.head(n=5)

Unnamed: 0,t1.controller,t1.time5m,t1.nrec,t2.direction,t2.milepost,t2.2015_adt


In [None]:
# Persist the 5-minute volume table to the shared Z: drive.
# NOTE(review): this repeats the earlier Z:-drive export of Vol_dat2 verbatim and
# overwrites the same file. If the lane/vehicle-class breakdown (Vol_dat3) was
# meant to be saved here, it needs its own variable and file name — confirm.
Vol_dat2.to_csv(path_or_buf="Z:/Apoorb/Vol_dat_V1.csv")