## Moving and deserializing Data

* When you need to process a lot of data, a big part of the execution time of your program is devoted to moving the data between storage units.

* This notebook is **NOT** intended to be run on your personal computer. It is intended to show you the main steps needed when processing a large file on a multi-computer cluster. The main emphasis here is on the **Wall time** required for different operations. Please refer to the videos for the output of each cell.
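
Wall time is the elapsed real time of an operation, including time spent waiting for disks or the network. The following is a minimal sketch of how wall time differs from CPU time; the helper `timed` is a hypothetical name introduced here for illustration, and `time.sleep` merely stands in for waiting on I/O.

```python
import time

def timed(fn, *args):
    """Run fn(*args); return (result, wall_seconds, cpu_seconds)."""
    wall_start = time.perf_counter()   # wall-clock timer
    cpu_start = time.process_time()    # CPU time consumed by this process
    result = fn(*args)
    wall = time.perf_counter() - wall_start
    cpu = time.process_time() - cpu_start
    return result, wall, cpu

# time.sleep stands in for waiting on I/O:
# wall time passes, but almost no CPU time is used.
_, wall, cpu = timed(time.sleep, 0.2)
print(f"wall={wall:.3f}s cpu={cpu:.3f}s")
```

When moving data dominates, the wall time is typically much larger than the CPU time, which is why the cells in this notebook use `%%time`.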

## Some terminology

### Data Serialization.
* Data in memory is usually stored in **data structures** that allow for fast manipulation. This often means that the amount of memory needed is significantly larger than the amount that would be needed to store the same data on disk.
* We say that the data on disk is **serialized** and the data stored in data structures is **deserialized**.
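
As a small illustration of the two forms, the sketch below serializes a Python list with the standard `pickle` module and compares sizes. The in-memory size is only an approximation for CPython, and the exact numbers depend on the interpreter version.

```python
import pickle
import sys

values = [float(i) for i in range(1000)]

# In memory: one list object plus one float object per element.
in_memory = sys.getsizeof(values) + sum(sys.getsizeof(v) for v in values)

# Serialize: turn the data structure into a flat sequence of bytes.
serialized = pickle.dumps(values)

# Deserialize: rebuild the data structure from the bytes.
restored = pickle.loads(serialized)

print("in-memory bytes (approx.):", in_memory)
print("serialized bytes:", len(serialized))
print("round trip OK:", restored == values)
```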

### AWS-EMR
We demonstrate the movement of data on "Amazon Web Services" (AWS) "Elastic Map Reduce" (EMR).

Recall the slide about data organization in the video "a short history of affordable massive computing". In the next figure we add to that slide the way it fits within AWS-EMR.

#### Three file systems: 
* **S3:** long-term persistent storage.
* **Head Node:** standard Unix file system. 
* **HDFS:** distributed file system on the workers.

<img alt="" src="Figures/AWS-EMR-S3.png" style="height:455px;width:800px" />

## Reading a CSV file from S3

We start with a CSV file on S3, which we move through the head node to HDFS and then parse into a Spark RDD.

### Moving a file from S3 to the head node

* Serial to Serial

In [None]:
%cd ~/Data/
!ls

In [None]:
#create directory to hold data on the head node
!mkdir Weather
%cd Weather/

In [None]:
#list files on S3
!aws s3 ls s3://dse-weather/ALL.csv.gz
#compressed file is about 1.5GB

In [None]:
%%time
#copy file from S3
!aws s3 cp s3://dse-weather/ALL.csv.gz ./ALL.csv.gz

In [None]:
%%time
#uncompress file (rm -f does not complain if ALL.csv does not exist yet)
!rm -f ALL.csv
!gunzip ALL.csv.gz
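
The same serial-to-serial step can be sketched in pure Python with the standard `gzip` module. This is an illustrative alternative, not what the notebook ran; `gunzip_file` is a hypothetical helper, and the demo uses a small made-up sample file rather than the weather data.

```python
import gzip
import os
import shutil
import tempfile

def gunzip_file(src_path, dst_path, chunk_size=1024 * 1024):
    """Decompress src_path (gzip) into dst_path, one chunk at a time."""
    with gzip.open(src_path, "rb") as src, open(dst_path, "wb") as dst:
        shutil.copyfileobj(src, dst, chunk_size)

# Small self-contained demo on a temporary file (made-up rows).
workdir = tempfile.mkdtemp()
gz_path = os.path.join(workdir, "sample.csv.gz")
csv_path = os.path.join(workdir, "sample.csv")
sample = b"station,measurement,year\n" + b"ST000000001,PRCP,1950\n" * 1000
with gzip.open(gz_path, "wb") as f:
    f.write(sample)

gunzip_file(gz_path, csv_path)
print("compressed bytes:  ", os.path.getsize(gz_path))
print("uncompressed bytes:", os.path.getsize(csv_path))
```

Unlike `gunzip`, this sketch leaves the compressed file in place. Copying in chunks keeps memory use bounded regardless of file size.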

In [None]:
!ls -l ALL.csv
# About 7.7 GB

In [None]:
!head -2 ALL.csv

## Distribute file into HDFS

Copy the file from the head-node file system to HDFS.

* Serial to Serial

In [None]:
%%time
#create a data directory on hdfs
!hadoop fs -mkdir /weather

In [None]:
%%time
#copy the file from the head node to hdfs
!hadoop fs -copyFromLocal ALL.csv hdfs:///weather/weather.csv

In [None]:
!hadoop fs -ls /weather

### Read csv file into an RDD

* Serial to Serial

In [None]:
%cd /mnt/workspace/edX-Micro-Master-in-Data-Science/big-data-analytics-using-spark/notebooks/Section2-PCA/PCA/data_preparation/ 
!ls lib

In [None]:
%pwd
!ls -l lib/numpy_pack.py

In [None]:
%%time
%pwd
from pyspark import SparkContext
sc = SparkContext(pyFiles=['/mnt/workspace/edX-Micro-Master-in-Data-Science/big-data-analytics-using-spark/notebooks/Section2-PCA/PCA/data_preparation/lib/numpy_pack.py'])

In [None]:
%%time
RDD=sc.textFile('/weather/weather.csv')

In [None]:
%%time
RDD.count()

In [None]:
%%time
RDD.count()

In [None]:
%%time
#re-open HDFS file to get accurate time measurements.
RDD=sc.textFile('/weather/weather.csv')

In [None]:
fs_file="/mnt/workspace/edX-Micro-Master-in-Data-Science/big-data-analytics-using-spark/notebooks/Data/Weather/ALL.csv"
!ls -l $fs_file

In [None]:
%%time
with open(fs_file,'r') as f:
    text=f.readlines()
print(len(text))
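
Note that `readlines()` builds a list holding every line of the file in memory. If only the line count is needed, a streaming count avoids that. The sketch below uses a hypothetical helper `count_lines` and a small temporary file; it makes no claim about the timings on the cluster.

```python
import os
import tempfile

def count_lines(path):
    """Count lines by iterating over the file, holding one line at a time."""
    with open(path, "r") as f:
        return sum(1 for _ in f)

# Demo on a small temporary file with made-up rows.
fd, tmp_path = tempfile.mkstemp(suffix=".csv")
with os.fdopen(fd, "w") as f:
    for i in range(1000):
        f.write(f"row{i},1,2,3\n")

n_lines = count_lines(tmp_path)
print(n_lines)
os.remove(tmp_path)
```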

It is not always better to use multiple computers

[Scalability, but at what cost!](https://www.usenix.org/system/files/conference/hotos15/hotos15-paper-mcsherry.pdf) / Frank McSherry, Michael Isard and Derek G. Murray

* So far we just moved data around, in a serialized format. Next, we will perform some deserialization.

## Deserialization

Transforming the RDD that contains lines of text into an RDD where each element is a data structure:

* Parsing
* Error Detection
* Casting data into types.
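
The three steps can be sketched on a toy record format without Spark. Everything here is an assumption made for illustration: the function name `parse_record`, the five-field layout, and the sample lines. The `(flag, payload)` return convention mirrors the one used by the parsing code in this notebook.

```python
def parse_record(line, n_fields=5):
    """Return (0, parsed_fields) on success or (1, original_line) on failure."""
    fields = line.split(",")                       # parsing
    try:
        if len(fields) != n_fields:                # error detection
            raise ValueError("wrong number of fields")
        year = int(fields[2])                      # casting
        values = [float(v) if v != "" else float("nan") for v in fields[3:]]
    except ValueError:
        return (1, line)
    return (0, fields[:2] + [year] + values)

lines = [
    "station,measurement,year,v1,v2",   # header: the year is not an int
    "ST1,PRCP,1950,0.0,12.5",
    "ST1,TMAX,1950,,31.0",              # empty value becomes NaN
    "ST2,PRCP,1951,3.0",                # too few fields
]
parsed = [parse_record(l) for l in lines]
good = [rec for flag, rec in parsed if flag == 0]
bad = [rec for flag, rec in parsed if flag == 1]
print(len(good), "good,", len(bad), "bad")
```

Returning the original line for bad records, rather than raising, lets a single pass over the data separate good rows from bad ones afterwards.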

### Code for packing and unpacking byte arrays

In [None]:
import numpy as np
"""Code for packing and unpacking a numpy array into a byte array.
   The array is flattened if it is not 1D.

   This code is intended to be used to store numpy arrays as fields in a dataframe and then store the
   dataframes in a parquet file.
"""

def packArray(a):
    """
    pack a numpy array into a bytearray that can be stored as a single 
    field in a spark DataFrame

    :param a: a numpy ndarray 
    :returns: a bytearray
    :rtype: bytearray

    """
    if type(a)!=np.ndarray:
        raise Exception("input to packArray should be numpy.ndarray. It is instead "+str(type(a)))
    return bytearray(a.tobytes())

def unpackArray(x,data_type=np.float16):
    """
    unpack a bytearray into a numpy.ndarray

    :param x: a bytearray
    :param data_type: The dtype of the array. This is important because it determines how many bytes go into each entry in the array.
    :returns: a numpy array
    :rtype: a numpy ndarray of dtype data_type.

    """
    return np.frombuffer(x,dtype=data_type)
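
A short round-trip sketch shows why the dtype matters when unpacking. It uses the same NumPy calls as the functions above (`tobytes` and `np.frombuffer`) directly, so that it can be run on its own; the sample values are made up.

```python
import numpy as np

a = np.array([0.0, 1.5, -2.25, 100.0], dtype=np.float16)
packed = bytearray(a.tobytes())          # 4 values x 2 bytes = 8 bytes

same = np.frombuffer(packed, dtype=np.float16)    # correct dtype: 4 values
wrong = np.frombuffer(packed, dtype=np.float32)   # wrong dtype: 2 meaningless values

# The shape is not stored in the bytes: a 2x3 array comes back flat.
m = np.arange(6, dtype=np.float16).reshape(2, 3)
flat = np.frombuffer(bytearray(m.tobytes()), dtype=np.float16)

print(len(packed), same, wrong.shape, flat.shape)
```

Because neither the dtype nor the shape is stored in the byte array, both have to be known by whoever unpacks it.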

### range values
Using code that was removed we find that the range of values is 

`-1000.0, 97892.0` 

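which means that as ints we would need 32 bits, but as floats we can use just 16. Note that float16 keeps only about three significant decimal digits, and values above its largest finite value (65504) overflow to `inf`.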
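
The trade-off between the two representations can be checked directly with NumPy. This is a small sketch; the middle sample value `1234.5` is made up for illustration, while the two end points are the range quoted above.

```python
import numpy as np

print(np.iinfo(np.int16).min, np.iinfo(np.int16).max)   # range of 16-bit ints
print(np.iinfo(np.int32).min, np.iinfo(np.int32).max)   # range of 32-bit ints
f16_max = float(np.finfo(np.float16).max)               # largest finite float16
print(f16_max)

# Cast sample values to float16; silence the overflow warning for the demo.
with np.errstate(over="ignore"):
    as_f16 = np.array([-1000.0, 1234.5, 97892.0]).astype(np.float16)
print(as_f16)
```

The largest value in the range does not fit in 16-bit ints, and in float16 it lies beyond the largest finite value. Values in between are stored with reduced precision.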

In [None]:
#main parsing code

import numpy as np
def parse_weather(line):
    L=line.split(',')
    try:
        assert len(L)==368
        i=2
        L[i]=int(L[i])
        for i in range(3,368):
            if L[i]!='':
                L[i]=np.float16(L[i])
            else:
                L[i]=np.nan
    except Exception:
        #if error in parsing, return (1, input line)
        return (1,line)
    Out=L[:3]
    Out.append(packArray(np.array(L[3:],dtype=np.float16)))
    # if parsing OK, return (0, parsed data)
    return (0,Out)

In [None]:
#this cell demonstrates how to test the parse_weather function on an individual row.
Debug=False
if Debug:
    lines=RDD.take(10)
    GG=parse_weather(lines[-2])
    print(GG)

In [None]:
%%time
Parsed=RDD.map(parse_weather).cache() # each element is (0,parsed data) or (1,bad line)
DATA=Parsed.filter(lambda x:x[0]==0).map(lambda x:x[1]) # keep only the good rows
ERRORS=Parsed.filter(lambda x:x[0]==1).map(lambda x:x[1]) # keep only the bad rows

In [None]:
print(DATA.toDebugString().decode())

In [None]:
%%time
PRCP=DATA.filter(lambda row:row[1]=='PRCP')
print('PRCP records:',PRCP.count())

In [None]:
%%time
print('bad records:',ERRORS.count())
#all lines: 9358395
# only the first line (the header) is bad.
# Good lines: 9358394

In [None]:
DATA.take(1)

## Summary

We saw how to:

* copy from S3 to the head-node
* copy from the head node to HDFS
* Read from HDFS
* Parse and detect errors.
* Read into an RDD.
* Next: using DataFrames and Parquet files.

## Transform RDD into a Spark DataFrame

In [None]:
import os
import sys

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType, BinaryType, FloatType

# Just like using Spark requires having a SparkContext, using SQL requires an SQLContext
sqlContext = SQLContext(sc)
sqlContext

In [None]:
### Defining the Schema explicitly
# The advantage of creating a DataFrame using a pre-defined schema is that the content of the RDD can be simple tuples, rather than Rows.

# In this case we create the dataframe from an RDD of tuples (rather than Rows) and provide the schema explicitly
# Schema with four fields - Station, Measurement, Year and Values
schema = StructType([StructField("Station",     StringType(), True),
                     StructField("Measurement", StringType(), True),
                     StructField("Year",        IntegerType(),True),
                     StructField("Values",      BinaryType(),True)
                    ])
schema

In [None]:
%%time
# Create a DataFrame by applying the schema to the RDD and print the schema
ALL_DataFrame = sqlContext.createDataFrame(DATA, schema)
ALL_DataFrame.printSchema()

### Write out data frame into Parquet directory

In [None]:
%%time
!hadoop fs -rm -r /weather/weather.parquet

In [None]:
%%time 
outfilename="hdfs:///weather/weather.parquet"
ALL_DataFrame.write.save(outfilename)

In [None]:
!hadoop fs -du /weather/

### Copy parquet directory to head node and then to S3

In [None]:
%cd /mnt/workspace/Data/
!rm -rf weather.parquet/
!ls -lrt

In [None]:
%%time
!hadoop fs -copyToLocal /weather/weather.parquet weather.parquet

In [None]:
!du .

In [None]:
%%time
#rm parquet directory from s3
!aws s3 rm --recursive --quiet s3://dse-weather/weather.parquet

In [None]:
%%time
# Copy parquet directory from headnode to s3
!aws s3 cp --recursive --quiet ./weather.parquet s3://dse-weather/weather.parquet

## Summary

* Defining a schema and transforming an RDD into a dataframe.
* Saving the dataframe as a Parquet file (directory)
* Parquet compresses!
* Copying Parquet dir to head node and then to S3
* **Recommendation for real projects:** Transform your raw data into parquet directories on S3!!
* **Next:** working directly with parquet files.

## Loading and using a parquet file

In [None]:
!ls

In [None]:
!rm -rf weather.parquet/

In [None]:
%%time
!aws s3 cp --recursive --quiet s3://dse-weather/weather.parquet ./weather.parquet

In [None]:
%%time
!hadoop fs -copyFromLocal  weather.parquet /weather/weather.parquet

In [None]:
%%time
parquet_name='/weather/weather'
query="""SELECT station,measurement,year 
FROM parquet.`%s.parquet` 
WHERE measurement=\"PRCP\" """%parquet_name
print(query)
df2 = sqlContext.sql(query)
print('number of rows=',df2.count())
df2.show(5)