![](image/futur.png)

# PART 5 - Serverless Architecture with AWS Lambda
🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶λ🐶
### The Vision
**As data scientists, why don't we just let Amazon (or any other cloud service) deal with the data pipes, so we can spend more time focusing on what we're good at?**

## What Is AWS Lambda?
λ🐶
**AWS Lambda** is a compute service that runs your code on your behalf using AWS infrastructure. After you upload your code and create a Lambda function, **AWS Lambda** takes care of provisioning and managing the servers that run it.

**AWS Lambda** runs your code on a high-availability compute infrastructure and performs all of the administration of the compute resources, including server and operating system maintenance, capacity provisioning and automatic scaling, code monitoring and logging. **All you need to do is supply your code!**
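To make "all you need to do is supply your code" concrete, here is a minimal sketch of what that code looks like in Python. Lambda's Python runtime calls the function configured as the handler with two arguments: the triggering event (a dict parsed from JSON) and a runtime context object. The handler name `lambda_handler` matches the one used later in this notebook. The returned dictionary is only an illustrative choice.

```python
import json


def lambda_handler(event, context):
    """Minimal Lambda entry point: receives the triggering event and a context object."""
    # Anything printed here ends up in the function's CloudWatch Logs
    print(json.dumps(event))
    # The return value must be JSON-serializable
    return {"status": "ok", "records": len(event.get("Records", []))}


# Local smoke test: a handler is just a function, so it can be called directly
if __name__ == "__main__":
    print(lambda_handler({"Records": [{}]}, None))
```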

![](image/lambdaSNS.png)

## What is Amazon Simple Notification Service?
**Amazon Simple Notification Service (Amazon SNS)** is a web service that coordinates and manages the delivery of messages to subscribing endpoints or clients. In Amazon SNS, there are two types of clients, publishers and subscribers, also referred to as producers and consumers. Publishers communicate asynchronously with subscribers by sending a message to a topic, which is a logical access point and communication channel. Subscribers (e.g., web servers, email addresses, Amazon SQS queues, AWS Lambda functions) receive the message or notification over one of the supported protocols (e.g., Amazon SQS, HTTP/S, email, SMS, Lambda) when they are subscribed to the topic.
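When a Lambda function is subscribed to an SNS topic, each published message reaches the function as a string in `event["Records"][i]["Sns"]["Message"]`. The sketch below assumes the publisher sends an S3 object-created notification serialized as JSON in that message body. The helper name `handle_sns_event`, the topic ARN and the object key are hypothetical.

```python
import json


def handle_sns_event(event):
    """Yield (bucket, key) pairs from an SNS-delivered Lambda event whose message
    body is assumed to be a JSON-serialized S3 event notification."""
    for record in event["Records"]:
        # SNS wraps the publisher's payload as a string in record["Sns"]["Message"]
        message = json.loads(record["Sns"]["Message"])
        for s3_record in message.get("Records", []):
            yield s3_record["s3"]["bucket"]["name"], s3_record["s3"]["object"]["key"]


# Hypothetical example event (topic ARN and key are made up)
sample_event = {
    "Records": [{
        "EventSource": "aws:sns",
        "Sns": {
            "TopicArn": "arn:aws:sns:us-east-1:123456789012:example-topic",
            "Message": json.dumps({"Records": [{"s3": {
                "bucket": {"name": "noaa-nexrad-level2"},
                "object": {"key": "2014/07/03/KMHX/example.gz"},
            }}]}),
        },
    }]
}

print(list(handle_sns_event(sample_event)))
```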

![](image/sns.png)

### Application Flow Summary
1. One of the 160 NEXRAD radars uploads an object to the source bucket in Amazon S3 (object-created event).
2. Amazon S3 detects the object-created event.
3. Amazon S3 publishes the s3:ObjectCreated:* event to AWS Lambda by invoking the Lambda function and passing event data as a function parameter.
4. AWS Lambda executes the Lambda function by assuming the execution role that you specified at the time you created the Lambda function.
5. From the event data it receives, the Lambda function knows the source bucket name and object key name. The Lambda function runs and saves the output to the target bucket.
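Steps 3 and 5 come down to reading the bucket name and object key out of the event dictionary that S3 passes to the function. S3 URL-encodes object keys in event notifications, so the key should be decoded before it is used with `get_object`. The following is a minimal sketch. The helper name `source_object` and the bucket and key values are hypothetical, and the event is trimmed to the fields used.

```python
from urllib.parse import unquote_plus


def source_object(event):
    """Return (bucket, key) for the first record of an S3 ObjectCreated event."""
    s3 = event["Records"][0]["s3"]
    bucket = s3["bucket"]["name"]
    # S3 URL-encodes keys in event notifications (e.g. a space arrives as '+')
    key = unquote_plus(s3["object"]["key"])
    return bucket, key


# Trimmed, hypothetical event; a real notification carries many more fields
event = {
    "Records": [{
        "eventSource": "aws:s3",
        "eventName": "ObjectCreated:Put",
        "s3": {
            "bucket": {"name": "my-source-bucket"},
            "object": {"key": "2014/07/03/KMHX/radar+scan.gz"},
        },
    }]
}

print(source_object(event))  # ('my-source-bucket', '2014/07/03/KMHX/radar scan.gz')
```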

![](image/s3.png)

### Resources Needed for Lambda Implementation

- AWS account with admin privileges
- Source and target S3 buckets
- Lambda function (your code)
- Zip file of library dependencies
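The zip file of dependencies is the deployment package. Lambda's Python runtime expects the handler module and any vendored libraries at the root of the archive. Below is a minimal sketch of building that archive with the standard library. It assumes the dependencies were first installed into a local folder next to the handler (for example with `pip install --target <folder> ...`). The function name `build_deployment_zip` and the demo file names are hypothetical.

```python
import os
import tempfile
import zipfile


def build_deployment_zip(source_dir, zip_path):
    """Zip the contents of source_dir so they sit at the root of the archive.
    zip_path should live outside source_dir, or the zip would include itself."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(source_dir):
            for name in files:
                full_path = os.path.join(root, name)
                # Store paths relative to source_dir, not absolute paths
                zf.write(full_path, os.path.relpath(full_path, source_dir))
    return zip_path


# Demo: a throwaway folder with a handler module and one "vendored" package
demo_dir = tempfile.mkdtemp()
os.makedirs(os.path.join(demo_dir, "mylib"))
for rel in ("lambda_function.py", os.path.join("mylib", "__init__.py")):
    with open(os.path.join(demo_dir, rel), "w") as f:
        f.write("# placeholder\n")

demo_zip = build_deployment_zip(demo_dir, os.path.join(tempfile.mkdtemp(), "function.zip"))
with zipfile.ZipFile(demo_zip) as zf:
    names = sorted(zf.namelist())
print(names)  # ['lambda_function.py', 'mylib/__init__.py']
```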


![](image/resources.png)

## Lambda Function: The only code you have to write

```python
import urllib.parse

import boto3
from utils import processData  # the project's own processing module; must be in the deployment package

print('Loading function')

# Create the S3 clients once, outside the handler, so warm invocations reuse them
s3c = boto3.client('s3')
s3r = boto3.resource('s3')

def lambda_handler(event, context):

    # Get the bucket and key of the new NEXRAD file from the S3 event
    bucket = event['Records'][0]['s3']['bucket']['name']
    # Keys arrive URL-encoded in S3 event notifications
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'])

    # Flatten the key into a file name: "/" -> "_" and drop the trailing ".gz"
    keyFile = key.replace("/", "_")[:-3]

    try:
        response = s3c.get_object(Bucket=bucket, Key=key)
        print("CONTENT TYPE: " + response['ContentType'])

        ## Add Function of your choice
        ##############################
        data = processData(bucket, key)
        ##############################
        # Save the processed output to the target bucket
        s3r.Object('data-eng-project', 'stream/' + keyFile).put(Body=data)

        return bucket, key, response['ContentType']

    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}.'.format(key, bucket))
        raise e
```

## Amazon CloudWatch Dashboard!

![](image/dash2.png)

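```python
from IPython.display import IFrame
%matplotlib inline

import matplotlib.pyplot as plt
import numpy.ma as ma
import numpy as np
import pyart.graph
import pyart.io
import pyart.map  # provides pyart.map.grid_from_radars, used in save_as_csv
import tempfile
import boto
import geopy
import math
import re
import csv
# VincentyDistance was removed in geopy 2.0, so this notebook needs geopy < 2.0
from geopy.distance import VincentyDistance

from subprocess import call
```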

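```python
import os
from boto.s3.connection import S3Connection

# Never hard-code AWS keys in a notebook; read them from environment variables instead
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
s3conn = S3Connection(aws_access_key_id, aws_secret_access_key)
```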

```python
# Generates a regular expression with which to grab the files we want from AWS
def generate_regex(year=False, month=False, day=False, station=False):
    y = str(year) if year else r"\d*"
    m = str(month) if month else r"\d*"
    d = str(day) if day else r"\d*"
    s = station if station else ".*"
    return y + r"\/" + m + r"\/" + d + r"\/" + s + r".*\.gz"

# Creates a shortcut (key prefix) to limit the number of keys we need to search to get the files we want
def generate_short_cut_path(year, month=False, day=False, station=False):
    shortcutTemplate = str(year)
    if month:
        shortcutTemplate += "/" + str(month)
        if day:
            shortcutTemplate += "/" + str(day)
            if station:
                shortcutTemplate += "/" + station
    return shortcutTemplate

# Grab a list of files we want from the NEXRAD data set
def grab_list_of_files(year, month=False, day=False, station=False):
    s3conn = S3Connection(aws_access_key_id, aws_secret_access_key)
    bucket   = s3conn.get_bucket('noaa-nexrad-level2')
    regex    = generate_regex(year, month, day, station)
    shortcut = generate_short_cut_path(year, month, day, station)
    print(shortcut)
    print(regex)
    # bucket.list(prefix) only returns keys under the shortcut; the regex filters the rest
    keys = [key.key for key in bucket.list(shortcut) if re.match(regex, key.key)]

    return keys
```
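The two helpers split the filtering between server and client. An S3 listing can only be narrowed server-side by key prefix, so `generate_short_cut_path` cuts down what `bucket.list()` returns, and the regex from `generate_regex` filters what is left on the client. Here is a pure-Python illustration of that split, using a hypothetical list of keys in place of a real bucket listing.

```python
import re

# Hypothetical key listing standing in for the contents of the bucket
keys = [
    "2014/07/03/KMHX/KMHX20140703_000000_V06.gz",
    "2014/07/03/KMHX/KMHX20140703_000500_V06.gz",
    "2014/07/03/KMHX/KMHX20140703_000500_V06_MDM",  # no .gz extension
    "2014/07/03/KLIX/KLIX20140703_000000_V06.gz",
]

prefix = "2014/07/03/KMHX"              # what S3 can filter on (prefix listing)
pattern = r"2014\/07\/03\/KMHX.*\.gz"   # same shape as generate_regex's output

listed = [k for k in keys if k.startswith(prefix)]   # server-side step
matched = [k for k in listed if re.match(pattern, k)]  # client-side step
print(len(listed), len(matched))  # 3 2
```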
    

```python
# Use Boto to grab the file from S3 and load it into pyart
def grab_and_process_radar(key):
    s3conn = S3Connection(aws_access_key_id, aws_secret_access_key)
    bucket = s3conn.get_bucket('noaa-nexrad-level2')
    s3key = bucket.get_key(key)
    # Download to data.gz in the working directory, then decompress it to "data"
    s3key.get_contents_to_filename("data.gz")
    call(["gunzip", "-f", "data.gz"])  # -f overwrites a leftover "data" file
    radar = pyart.io.read_nexrad_archive("data")
    return radar
```

```python
def filter_data(radar):
    refl_grid = radar.get_field(0, 'reflectivity')
    rhohv_grid = radar.get_field(0, 'cross_correlation_ratio')
    zdr_grid = radar.get_field(0, 'differential_reflectivity')

    # apply rudimentary quality control
    reflow = np.less(refl_grid, 20)
    zdrhigh = np.greater(np.abs(zdr_grid), 2.3)
    rhohvlow = np.less(rhohv_grid, 0.95)
    notweather = np.logical_or(reflow, np.logical_or(zdrhigh, rhohvlow))

    qcrefl_grid = ma.masked_where(notweather, refl_grid)
    qced = radar.extract_sweeps([0])
    qced.add_field_like('reflectivity', 'reflectivityqc', qcrefl_grid)
    return qced
```
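The quality-control rule in `filter_data` masks a gate if any one of three tests fails: reflectivity below 20, |ZDR| above 2.3, or cross-correlation ratio below 0.95. The toy example below applies the same rule to five made-up gates. It uses the `<`, `>` and `|` operators, which are equivalent to the `np.less`, `np.greater` and `np.logical_or` calls above.

```python
import numpy as np
import numpy.ma as ma

# Five made-up gates standing in for the radar fields
refl = np.array([35.0, 10.0, 40.0, 45.0, 50.0])    # reflectivity
zdr = np.array([1.0, 0.5, 3.0, 0.2, -0.4])         # differential reflectivity
rhohv = np.array([0.99, 0.98, 0.97, 0.80, 0.96])   # cross-correlation ratio

# Same rule as filter_data: weak echo OR extreme |ZDR| OR low correlation
notweather = (refl < 20) | (np.abs(zdr) > 2.3) | (rhohv < 0.95)
qc = ma.masked_where(notweather, refl)

print(qc.mask.tolist())        # [False, True, True, True, False]
print(qc.compressed().tolist())  # [35.0, 50.0]
```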

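```python
def offset_by_meters(x, y, lat, lon):
    """Return the (lat, lon) reached by moving from (lat, lon) by the planar offset (x, y) in meters."""
    if x == y == 0:
        return lat, lon
    dist = math.sqrt(x*x + y*y)
    bearing = math.atan2(y, x)

    origin = geopy.Point(lat, lon)
    # geopy's destination() takes the bearing in degrees
    destination = VincentyDistance(meters=dist).destination(origin, math.degrees(bearing))

    lat2, lon2 = destination.latitude, destination.longitude
    return lat2, lon2
```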
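As a sanity check on the geopy result, a small offset can be converted to latitude and longitude with a locally flat Earth approximation. This is a swapped-in technique, not the ellipsoidal Vincenty method the notebook uses. The function name `offset_flat_earth` and its explicit `east_m`/`north_m` arguments are hypothetical. The error grows with distance, so expect visible differences toward the 300 km edges of the grid.

```python
import math

EARTH_RADIUS_M = 6371000.0  # mean Earth radius (spherical approximation)


def offset_flat_earth(east_m, north_m, lat, lon):
    """Approximate (lat, lon) after moving east_m / north_m meters from (lat, lon),
    treating the Earth as locally flat (small offsets, away from the poles)."""
    dlat = math.degrees(north_m / EARTH_RADIUS_M)
    dlon = math.degrees(east_m / (EARTH_RADIUS_M * math.cos(math.radians(lat))))
    return lat + dlat, lon + dlon


# About 111.2 km north is roughly one degree of latitude
print(offset_flat_earth(0.0, 111195.0, 0.0, 0.0))
# At 60 degrees N, the same distance east spans roughly two degrees of longitude
print(offset_flat_earth(111195.0, 0.0, 60.0, 0.0))
```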

```python
def save_as_csv(filename, data, level, append, extent=300, points=100):

    # Interpolate the QC'd sweep onto a Cartesian grid: 11 levels from 0 to 11 km,
    # points x points horizontally over +/- extent km around the radar
    grids = pyart.map.grid_from_radars(
        (data,),
        grid_shape=(11, points, points),
        grid_limits=((0, 11000), (-extent*1000.0, extent*1000.0), (-extent*1000.0, extent*1000.0)),
        fields=['reflectivityqc'],
        refl_field='reflectivityqc',
        max_refl=100.)
    center = [grids.axes["lat"]["data"][0], grids.axes["lon"]["data"][0]]
    date = grids.axes['time']["units"].replace("seconds since ", "")
    print(date)

    x_dists = grids.axes["x_disp"]["data"]
    y_dists = grids.axes["y_disp"]["data"]

    # Horizontal slice of the grid at the requested vertical level
    data = np.array(grids.fields["reflectivityqc"]["data"][level])

    # Python 3's csv module expects a text-mode file opened with newline=''
    with open(filename, 'a' if append else 'w', newline='') as csvfile:
        writer = csv.writer(csvfile, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)

        if not append:
            writer.writerow(["lat", "lon", "value", "date"])
        for (ix, iy), value in np.ndenumerate(data):
            # Skip cells holding the -9999.0 missing-data value
            if value != -9999.0:
                x = x_dists[ix]
                y = y_dists[iy]
                lat, lon = offset_by_meters(x, y, center[0], center[1])
                writer.writerow([lat, lon, value, date])
    return data
```
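`save_as_csv` writes the header only when it starts a new file, and appends rows for later scans. Here is a self-contained sketch of that header-once pattern with made-up points and a temporary file. The helper name `write_points` is hypothetical. Deriving `append` from the loop position (`i > 0`) guarantees that the first scan truncates the file and writes the header, whichever slice of keys is being processed.

```python
import csv
import os
import tempfile

HEADER = ["lat", "lon", "value", "date"]


def write_points(filename, rows, append):
    """Write rows to filename, starting a fresh file with a header unless append is True."""
    # Text mode with newline='' is what Python 3's csv module expects
    with open(filename, "a" if append else "w", newline="") as f:
        writer = csv.writer(f)
        if not append:
            writer.writerow(HEADER)
        writer.writerows(rows)


path = os.path.join(tempfile.mkdtemp(), "result.csv")
scans = [  # made-up points for two consecutive scans
    [[34.7, -76.9, 42.0, "2014-07-03T00:00:00Z"]],
    [[34.8, -76.8, 38.5, "2014-07-03T00:05:00Z"]],
]
for i, rows in enumerate(scans):
    write_points(path, rows, append=(i > 0))  # only the first scan writes the header

with open(path, newline="") as f:
    result = list(csv.reader(f))
print(result)
```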
    

```python
#keys = grab_list_of_files("2015", "05", "15", station="KVWX")
keys = grab_list_of_files("2014", "07", "03", station="KMHX")
#keys = grab_list_of_files("2005", "08", "29", station="KLIX") # Hurricane Katrina 2005/08/29/
print("got ", len(keys))
print(len(keys[:60]))
```

```
2014/07/03/KMHX
2014\/07\/03\/KMHX.*\.gz
got  254
60
```


```python
for i, key in enumerate(keys[100:200]):
    print("doing file ", key)

    radar = grab_and_process_radar(key)
    print(radar.get_field(0, 'cross_correlation_ratio'))
    filtered_data = filter_data(radar)
    print(filtered_data)
    # Start a fresh result.csv (with header) on the first file, append afterwards
    append = (i > 0)
    data = save_as_csv("result.csv", filtered_data, 5, append)
    plt.imshow(data)
    # Clean up the downloaded files (gunzip normally removes data.gz itself)
    call(["rm", "-f", "data.gz", "data"])
```

```python
IFrame('https://marvinbertin.cartodb.com/viz/bff84faa-e3fa-11e5-aa36-0e5db1731f59/embed_map', width=800, height=600)
```

## Version 2.0

- Include forecasting analytics
- Port the rest of the project to an AWS Lambda framework
- Move the Hadoop computations to Spark or Flink
- Create a web interface