In [92]:
import boto3
import botocore
import os
import pandas as pd
import numpy as np

In [84]:
AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']

In [85]:
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']

## AWS (S3, Redshift, Kinesis) + Databricks Spark = Real-time Smart Meter Analytics

**Create S3 Bucket**

In [30]:
s3 = boto3.client('s3')

In [45]:
s3.list_buckets()

{u'Buckets': [{u'CreationDate': datetime.datetime(2016, 1, 26, 1, 47, 19, tzinfo=tzutc()),
   u'Name': 'doug62890'},
  {u'CreationDate': datetime.datetime(2015, 6, 5, 7, 3, 45, tzinfo=tzutc()),
   u'Name': 'elasticbeanstalk-us-west-2-473548050994'},
  {u'CreationDate': datetime.datetime(2016, 3, 7, 2, 45, 37, tzinfo=tzutc()),
   u'Name': 'pecanstreetresearch-2016'}],
 u'Owner': {u'DisplayName': 'dkelly628',
  u'ID': 'bda9f00a638e7c5c17498b033a6ce34b124be90c6ae542abc73a64f4d56f1913'},
 'ResponseMetadata': {'HTTPStatusCode': 200,
  'HostId': 'tvKe4ptiqACT0S0ZzK6+LpC36DZt2oLwSB/v8NHI7bs3smVkTQ6MkR+OQXspINn1',
  'RequestId': 'DFAFD1582C9475BF'}}

In [76]:
def create_s3_bucket(bucketname):
    """Quick method to create bucket with exception handling"""
    s3 = boto3.resource('s3')
    exists = True
    bucket = s3.Bucket(bucketname)
    try:
        s3.meta.client.head_bucket(Bucket=bucketname)
    except botocore.exceptions.ClientError as e:
        error_code = int(e.response['Error']['Code'])
        if error_code == 404:
            exists = False
    if exists:
        print 'Bucket {} already exists'.format(bucketname)
    else:
        s3.create_bucket(Bucket=bucketname, GrantFullControl='dkelly628')

In [77]:
create_s3_bucket('pecanstreetresearch-2016')

Bucket pecanstreetresearch-2016 already exists


**Create Amazon Firehose for writing to S3**

In [80]:
firehose = boto3.client('firehose')

**Copy Postgres to S3 via Postgres dump to CSV and s3cmd upload**

In [None]:
# Note: Used s3cmd tools because awscli tools not working in conda env

In [89]:
# 14m rows or ~ 1.2 GB local unzipped; 10min write to CSV and another 10min to upload to S3
# !s3cmd put ~/Users/Doug/PecanStreet/electricity-03-06-2016.csv s3://pecanstreetresearch-2016/electricity-03-06-2016.csv

In [91]:
# 200k rows ~ 15 MB local unzipped; 30 sec write to CSV and 15 sec upload to S3
# !s3cmd put ~/Users/Doug/PecanStreet/weather-03-06-2016.csv s3://pecanstreetresearch-2016/weather-03-06-2016.csv

**Amazon Redshift: NoSQL Columnar Data Warehouse**

In [93]:
# Quick geohashing before uploading to Redshift
weather_df = pd.read_csv('/Users/Doug/PecanStreet/weather_03-06-2016.csv')

In [99]:
weather_df.groupby(['latitude', 'longitude']).count()

Unnamed: 0_level_0,Unnamed: 1_level_0,localhour,temperature
latitude,longitude,Unnamed: 2_level_1,Unnamed: 3_level_1
30.292432,-97.699662,45336,45336
32.778033,-117.151885,45384,45384
40.027278,-105.256111,45384,45384


In [None]:
weather_df['city'] = weather_df['Austin' if weather_df.latitude=30.292432 elif '']

In [105]:
weather_df['city'] = 'city'

In [116]:
weather_df.city.unique()

array(['Austin', 'Boulder', 'San Diego'], dtype=object)

In [117]:
# weather_df['city'][weather_df.latitude==40.027278] = 'Boulder'

In [119]:
weather_df.to_csv('/Users/Doug/PecanStreet/weather_03-07-2016.csv')

In [None]:
redshift = boto3.client('redshift')

create table electricity (
localhour timestamp not null distkey sortkey,
dataid smallint not null,
use
air1
furnace1
car1
);

create table weather (
localhour timestamp not null distkey sortkey,
latitude,
longitude,
temperature,
);

create table households (
dataid smallint not null distkey,
city varchar(20) not null
);

**Databricks Spark Analysis: Batch analytics on S3, Streaming using Amazon Kinesis Stream**