## Load from CSV
The code in this notebook copies the CSV files for beaked whales from S3 to HDFS (where they are kept under the directory name `CVS`) and then loads the data into
a Spark DataFrame.


In [None]:
#sc.stop()

In [None]:
# Create the SparkContext; the helper modules in lib/ are passed as pyFiles so they are available to the workers.
# NOTE(review): `master_url` is not defined anywhere in this notebook -- presumably injected by the
# cluster's notebook environment; confirm before running on a fresh kernel.
from pyspark import SparkContext
sc = SparkContext(master=master_url, pyFiles=['lib/numpy_pack.py','lib/row_parser.py','lib/spark_PCA.py'])

from pyspark.sql import Row, SQLContext,DataFrame
from pyspark.sql.types import *

# SQLContext is needed later to turn the RDD of Rows into a DataFrame.
sqlContext = SQLContext(sc)

# Later cells call size(), plot() and title() unqualified; in the visible code only %pylab can provide them.
%pylab inline

In [None]:
#!pip install --upgrade pip

#!sudo /usr/local/bin/pip install pandas

#!sudo /usr/local/bin/pip install scipy
!sudo /usr/local/bin/pip install --upgrade numpy

%load_ext autoreload
%autoreload # use this magic to reload modules

#import pandas as pd
import datetime as dt

#import scipy
#from scipy.io import loadmat,savemat,whosmat

from string import split
from collections import Counter
import re
import numpy as np
from numpy import shape

from glob import glob

# Report the installed numpy version; the "should be > 1.10" requirement is only printed, not enforced.
print 'numpy version=',np.__version__, 'should be > 1.10'
#print 'scipy version=',scipy.__version__
#print 'pandas version=',pd.__version__

### Format of CSV files
|field name     | Description               | Data Type
|---------------|---------------------------|--------------
|0: time        | time of click             | string in datetime format `%Y-%m-%d %H:%M:%S.%f`
|1: species		| Initial species classification	        | 'str'
|2: site		| name of site		        | 'str'
|3: rec_no		| recording number		    | 'str'
|4: bout_i		| bout number		        | numpy.int64
|5: peak2peak	| peak to peak magnitude    | 			numpy.float64
|6: MSN	        |	wave form |		 an array of length 202
|208: MSP		|	spectra |	 an array of length 101  
|309: TPWS1		| 1 if click appears in TPWS1	| 	bool
|310: MD1		|	--- " ---	in MD1|	bool
|311: FD1	    |	--- " ---	in FD1|	bool
|312: TPWS2		| 1 if click appears in TPWS2	| 	bool
|313: MD2		|	--- " ---	in MD2|	bool
|314: FD2	    |	--- " ---	in FD2|	bool
|315: TPWS3		| 1 if click appears in TPWS3	| 	bool
|316: MD3		|	--- " ---	in MD3|	bool
|317: FD3	    |	--- " ---	in FD3|	bool

Total number of fields = 318


In [None]:
%autoreload # use this magic to reload modules

### Check S3 contents

In [None]:
#%cd /root/ipython/BeakedWhaleClassification/

In [None]:
#remember to set credentials under "setup S3" in the cluster setup page
# NOTE(review): `s3helper` is neither defined nor imported in this notebook -- presumably provided by
# the cluster's notebook environment; confirm.
# NOTE(review): the bucket name reads 'while-classification' (possibly meant 'whale'); verify against S3.
s3helper.open_bucket('while-classification')
s3helper.ls_s3()

In [None]:
# Presumably lists the bucket entries under 'CVS' (semantics of s3helper.ls_s3 are not visible here); show the first ten.
dirs=s3helper.ls_s3('CVS')
dirs[:10]

In [None]:
from time import time

### Copy from S3 to HDFS

In [None]:
# Time the S3 -> HDFS copy of the 'CVS' directory; the last expression displays the elapsed seconds.
t1=time()
s3helper.s3_to_hdfs('CVS', 'CVS')
time()-t1


### Read data into dataframe

In [None]:
# %load lib/row_parser.py
from pyspark.sql import Row, SQLContext,DataFrame
from pyspark.sql.types import *
import datetime as dt

def packArray(a):
    """Serialize a numpy array into a bytearray of its raw data bytes.

    Only the raw buffer is stored: neither dtype nor shape is recorded, so the
    reader has to supply the matching dtype when decoding.

    Args:
        a: numpy.ndarray (or a subclass of it) to serialize.

    Returns:
        bytearray holding a copy of the array's bytes.

    Raises:
        TypeError: if `a` is not a numpy.ndarray.
    """
    # isinstance (rather than an exact type comparison) also admits ndarray subclasses.
    if not isinstance(a, np.ndarray):
        raise TypeError("input to packArray should be numpy.ndarray. It is instead "+str(type(a)))
    return bytearray(a.tobytes())
def unpackArray(x,data_type=np.int16):
    """Decode a packed byte buffer back into a 1-D numpy array.

    Args:
        x: buffer object, e.g. the bytearray produced by packArray.
        data_type: numpy dtype used to interpret the bytes. The default is
            np.int16, so float64 payloads must pass data_type=np.float64.

    Returns:
        numpy.ndarray built by numpy.frombuffer over the buffer's bytes.
    """
    decoded = np.frombuffer(x, dtype=data_type)
    return decoded

def init_parser_parameters(fields=None):
    """Build the parsing rules that turn one CSV line into a Row.

    Every field occupies one comma-separated column, except 'array' fields,
    which span as many consecutive columns as their declared length.

    Args:
        fields: optional list of (name, type) or (name, 'array', length)
            tuples describing the columns in order. Recognized types are
            'datetime', 'str', 'int', 'float', 'bool' and 'array'.
            Defaults to the 318-column click schema listed below.

    Returns:
        (Parse_rules, field_names, RowObject): Parse_rules is a list of dicts
        with keys 'name', 'start', 'end' (exclusive column index) and
        'parser'; field_names lists the field names in order; RowObject is a
        pyspark Row factory over those names.

    Raises:
        ValueError: if a field declares an unrecognized type.
    """
    def parse_date(s):
        return dt.datetime.strptime(s,'%Y-%m-%d %H:%M:%S.%f')
    def parse_array(a):
        # arrays are stored packed as raw float64 bytes
        np_array=np.array([np.float64(x) for x in a])
        return packArray(np_array)
    def parse_int(s):
        return int(s)
    def parse_float(s):
        return float(s)
    def parse_string(s):
        return(s)

    if fields is None:
        fields=[('time', 'datetime'),
            ('species', 'str'),
            ('site', 'str'),
            ('rec_no', 'str'),
            ('bout_i', 'int'),
            ('peak2peak', 'float'),
            ('MSN', 'array',202),
            ('MSP', 'array',101),
            ('TPWS1', 'bool'),
            ('MD1', 'bool'),
            ('FD1', 'bool'),
            ('TPWS2', 'bool'),
            ('MD2', 'bool'),
            ('FD2', 'bool'),
            ('TPWS3', 'bool'),
            ('MD3', 'bool'),
            ('FD3', 'bool')]

    # type name -> converter; 'bool' columns are parsed with int(), so they come out as 0/1 integers
    parsers={'array':parse_array,
             'datetime':parse_date,
             'int':parse_int,
             'float':parse_float,
             'bool':parse_int,
             'str':parse_string}

    #prepare data structure for parsing
    Parse_rules=[]
    index=0
    for field in fields:
        _type=field[1]
        if _type not in parsers:
            # fail loudly instead of silently reusing the previous field's parser
            raise ValueError('unrecognized type %r for field %r' % (_type, field[0]))
        # default length in terms of csv fields is 1; arrays declare their own length
        _len=int(field[2]) if _type=='array' else 1
        rule={'name':field[0],
              'start':index,
              'end':index+_len,
              'parser':parsers[_type]}
        # progress/debug output, one line per field
        print('%s %s' % (field, rule))
        Parse_rules.append(rule)
        index+=_len

    field_names=[a['name'] for a in Parse_rules]
    RowObject= Row(*field_names)
    return Parse_rules,field_names,RowObject


In [None]:
import sys
sys.path.append('lib')
# NOTE: the wildcard import rebinds every public name row_parser defines, replacing any
# same-named functions defined earlier in this notebook.
from row_parser import *

# Build the parsing rules, the ordered column names and the Row factory that parse() reads as globals.
Parse_rules,field_names,RowObject = init_parser_parameters()


In [None]:
def parse(row):
    """Convert one comma-separated text line into a RowObject.

    Reads the module-level Parse_rules and RowObject. A rule covering a
    single column hands its parser the bare token; a rule covering several
    columns hands it the list of tokens.
    """
    #Parse_rules,field_names,RowObject = parser_data.value
    tokens=row.split(',')
    values=[]
    for rule in Parse_rules:
        lo,hi,convert=rule['start'],rule['end'],rule['parser']
        raw=tokens[lo] if hi-lo==1 else tokens[lo:hi]
        values.append(convert(raw))
    return RowObject(*values)

In [None]:
from pyspark.sql import DataFrame
# Read every file under /CVS/ as lines of text; cache().count() forces the read so the
# printed time covers loading the data, not just defining the RDD.
t1=time()
CVS_Data=sc.textFile("/CVS/")
CVS_Data.cache().count()
print time()-t1

In [None]:
# Grab one raw line to try the parser on.
row=CVS_Data.first()

In [None]:
print 'a row:\n',row

# Sanity check: parse the sample line in the driver before mapping over the whole RDD.
parse(row)

In [None]:
# Apply parse() to every line of the text RDD.
RDD=CVS_Data.map(parse)

In [None]:
# Run the parser on the cluster for one element.
RDD.take(1)

In [None]:
# Build the DataFrame from the RDD of Rows (no explicit schema is passed) and display its first rows.
df=sqlContext.createDataFrame(RDD)
df.show()

In [None]:
# First count: marks the DataFrame for caching and times the pass that runs with it.
t0=time()
print df.cache().count()
print time()-t0

# Second count: times the same action again after cache() has been requested.
t0=time()
print df.count()
time()-t0

In [None]:
# NOTE(review): back-of-envelope figure; the meaning of 5 and 350 is not recorded here --
# presumably seconds x number of units converted to minutes; confirm or replace with named constants.
5.*350/60.

In [None]:
# Pull a single row to inspect its structure.
A=df.first()

In [None]:
len(A)

In [None]:
type(A)

In [None]:
# Dict form of the row, so its fields can be looked up by name.
D=A.asDict()

In [None]:
t=0
for k in D.keys():
    t+=size(D[k])
    print k, size(D[k]),t

In [None]:
# NOTE(review): rough data-set size in GB. 2439 matches 202*8 + 101*8 bytes for the two packed
# float64 arrays plus 15 single-element fields; 6353182 is presumably the row count printed by
# df.count() -- confirm, and prefer computing it from the variables instead of hard-coding.
2439.*6353182./1000000000

In [None]:
import datetime as dt

from string import split
from collections import Counter
import re
import numpy as np
from numpy import shape

from glob import glob
from time import time

In [None]:
from row_parser import unpackArray
import numpy
def g(row):
    """Decode the packed float64 array stored in column `field` of `row`.

    `field` is a notebook global looked up at call time, so the column that
    is decoded follows whatever `field` is currently bound to.
    """
    packed=row[field]
    return unpackArray(packed,data_type=numpy.float64)
def unpackArray(x,data_type=numpy.int16):
    """Interpret the bytes of buffer `x` as a numpy array of `data_type`.

    Defined in the notebook after `from row_parser import unpackArray`, so
    this definition is the one bound to the name from here on.
    """
    values=numpy.frombuffer(x,dtype=data_type)
    return values
# Overlay the 'MSP' arrays of the first 20 rows; g() reads the global `field` assigned on the next line.
L=df.take(20)
field='MSP'
for a in L:
    plot(g(a))
title(field);

In [None]:
# Same overlay for the 'MSN' column. Rebinding the global `field` also changes what g() decodes from here on.
field='MSN'
for a in L:
    plot(g(a))
title(field);

In [None]:
from time import time
from spark_PCA import *


In [None]:
def unpack(bytearray):
    return unpackArray(bytearray,data_type=numpy.float64)
feature='MSP'
rdd=df.rdd.map(g)
print type(rdd)
rdd.first()[:4]

In [None]:
# Cache the decoded feature vectors and time the pass that computes them.
t0=time()
rdd.cache().count()
print time()-t0

In [None]:
# Time computeCov over the feature RDD. computeCov presumably comes from the wildcard
# import of spark_PCA; its return structure is not visible here -- confirm before relying on COV.
t0=time()
COV=computeCov(rdd)
print time()-t0