## Loading many pkl files into an RDD

What I am trying to do in this notebook seems easy: load a bunch of files into an RDD and then save the RDD as a parquet directory.

I have ~188 compressed tar files (`.tgz`) on S3, each holding all of the face frames for a single YouTube video. Each tar file contains ~10 pkl files, and each pkl file contains all of the frames for one sequence of faces (200-2000 frames).

Each frame is stored as a list whose last element is a square 300x300 array holding the grey levels of the face, or 0 where no skin was detected, at 16-bit integer resolution.




## S3 files

The object `s3helper` is created to help you access S3 files.

In [None]:
help(s3helper)

To access S3 files, the first step is setting the AWS credentials.

In [1]:
%cd /root/ipython/AWS-Spark-Cluster/
%run Credentials.ipynb

/root/ipython/AWS-Spark-Cluster


In [2]:
%pwd

u'/root/ipython/AWS-Spark-Cluster'

In [17]:
sc.stop()

## Configuration directions
Taken from [here](http://spark.apache.org/docs/latest/configuration.html)
and [here](http://spark.apache.org/docs/1.6.1/configuration.html#memory-management)


In [18]:
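from pyspark import SparkConf
sparkConf=SparkConf()
# SparkConf.set() sets Spark properties; setExecutorEnv() would only export
# environment variables with these names to the executors.
# spark.driver.memory has no effect here if the driver JVM is already running
# (it was started by the first SparkContext); set it in spark-defaults.conf
# or pass --driver-memory instead.
sparkConf.set("spark.driver.memory","40g")
sparkConf.set("spark.driver.maxResultSize","10g")
sparkConf.set("spark.executor.memory","40g")
sparkConf.set("spark.python.worker.memory","10g")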

<pyspark.conf.SparkConf at 0x7fce292d2f10>

### from stackoverflow
This is an answer to a question regarding the error:

```
 Py4JJavaError: An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.
 : java.lang.OutOfMemoryError: Java heap space
```

I finally solved it by creating a `spark-defaults.conf` file in `apache-spark/1.5.1/libexec/conf/` and adding the following line to it: `spark.driver.memory 14g`

That solved my issue. But then I ran into another issue of "exceeding max result size of 1024MB". The solution was to add another line in the file above: `spark.driver.maxResultSize 2g`
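The fix quoted above amounts to adding whitespace-separated `key value` lines to `spark-defaults.conf`. A minimal sketch that appends those two settings; `write_spark_defaults` is a hypothetical helper, and the target directory is an assumption (usually `$SPARK_HOME/conf/`; the quoted path is specific to that user's 1.5.1 install). The driver has to be restarted afterwards for `spark.driver.memory` to take effect.

```python
import os

def write_spark_defaults(conf_dir, settings):
    """Append `key value` lines to conf_dir/spark-defaults.conf (hypothetical helper)."""
    path = os.path.join(conf_dir, "spark-defaults.conf")
    with open(path, "a") as fh:
        for key, value in settings:
            # spark-defaults.conf uses one whitespace-separated key/value pair per line
            fh.write("%s %s\n" % (key, value))
    return path

# values taken from the answer quoted above
SETTINGS = [
    ("spark.driver.memory", "14g"),
    ("spark.driver.maxResultSize", "2g"),
]
```

For example, `write_spark_defaults(os.path.join(os.environ["SPARK_HOME"], "conf"), SETTINGS)` would append both lines to the default properties file.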

In [19]:
from pyspark import SparkContext
sc=SparkContext(conf=sparkConf)

In [20]:
RDD=sc.parallelize(range(100))

In [21]:
RDD.count()

100

In [22]:
s3helper.set_credential(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)

Then open the bucket that has your files.

In [23]:
s3helper.open_bucket('yoav-faces')

Now you can list your files in the bucket.

In [10]:
print s3helper.ls()
filenames=s3helper.ls('output/')
filenames[:10]

[u'faces-avi.tgz', u'otherpkl.tgz', u'output', u'pklfiles2', u'test.txt', u'videos.tgz']


[u"output/'Homeland' Suffered A Major Intelligence Failure-9IE1mUL1erk_windows.tgz",
 u'output/19 Days And Counting-JxQKYgw9G2k_windows.tgz',
 u'output/A Modest PromPosal For Bernie Sanders-kRsRUIPoW4E_windows.tgz',
 u'output/A Moment For South Carolina-GDFrVwgicsc_windows.tgz',
 u'output/A Tahini Bit Of Perspective On Politics-q-cirwvF-SI_windows.tgz',
 u'output/A Tribute To Glenn Frey-0DaJGhJz7S_windows.tgz',
 u'output/All You Can Trump Buffet-F5zjVUZA7rY_windows.tgz',
 u'output/And Now, Some Totally Organic Product Placement, Part 2-ua1jbS-je8I_windows.tgz',
 u'output/Baby Hitler Is No Match For Jeb!-76MlVLbv1zE_windows.tgz']

In [11]:
!mkdir /mnt/output

mkdir: cannot create directory `/mnt/output': File exists


In [12]:
%cd /mnt/output
s3helper.get_file(filenames[0])
!ls -l

/mnt/output
total 251604
drwxr-xr-x 3 root root      4096 Jun 18 15:12 data1
-rw-r--r-- 1 root root 257636191 Jun 10 20:35 'Homeland' Suffered A Major Intelligence Failure-9IE1mUL1erk_windows.tgz


In [20]:
# strip the leading 'output/' prefix (7 characters) to get the local file name
filename=filenames[0][len('output/'):]
print filename
!tar -xzvf "$filename"
!ls -l

'Homeland' Suffered A Major Intelligence Failure-9IE1mUL1erk_windows.tgz
data1/output/'Homeland' Suffered A Major Intelligence Failure-9IE1mUL1erk_windows7.pkl
data1/output/'Homeland' Suffered A Major Intelligence Failure-9IE1mUL1erk_windows4.pkl
data1/output/'Homeland' Suffered A Major Intelligence Failure-9IE1mUL1erk_windows9.pkl
data1/output/'Homeland' Suffered A Major Intelligence Failure-9IE1mUL1erk_windows5.pkl
data1/output/'Homeland' Suffered A Major Intelligence Failure-9IE1mUL1erk_windows0.pkl
data1/output/'Homeland' Suffered A Major Intelligence Failure-9IE1mUL1erk_windows3.pkl
data1/output/'Homeland' Suffered A Major Intelligence Failure-9IE1mUL1erk_windows1.pkl
data1/output/'Homeland' Suffered A Major Intelligence Failure-9IE1mUL1erk_windows6.pkl
data1/output/'Homeland' Suffered A Major Intelligence Failure-9IE1mUL1erk_windows8.pkl
data1/output/'Homeland' Suffered A Major Intelligence Failure-9IE1mUL1erk_windows2.pkl
total 251604
drwxr-xr-x 3 root root      4096 Jun 18 15:1
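The shell `tar` call works, but the video titles contain apostrophes and spaces, which makes shell quoting fragile. A minimal sketch of the same extraction with Python's standard `tarfile` module, keeping only the `.pkl` members; `extract_pkl_members` is a hypothetical helper name.

```python
import os
import tarfile

def extract_pkl_members(tgz_path, dest_dir):
    """Extract only the .pkl members of a .tgz archive and return their local paths."""
    with tarfile.open(tgz_path, "r:gz") as tar:
        members = [m for m in tar.getmembers()
                   if m.isfile() and m.name.endswith(".pkl")]
        tar.extractall(dest_dir, members=members)
    # member names keep their directory prefix (e.g. data1/output/...)
    return sorted(os.path.join(dest_dir, m.name) for m in members)
```

Called as `extract_pkl_members(filename, '/mnt/output')`, it would recreate the `data1/output/...` layout listed above without going through the shell.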

In [12]:
DATA=sc.parallelize([])
DATA.count()

0

In [13]:
%cd /mnt/output
video_names={}
video_index=0

from glob import glob
import pickle
import re
import numpy as np

from pyspark import StorageLevel
StLevel=StorageLevel.MEMORY_AND_DISK_SER

pattern=re.compile(r'.*/([^/]+)_windows(\d+)\.pkl')

/mnt/output
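The regular expression captures the video title (everything between the last `/` and `_windows`) and the window number. A small sketch of how it splits one of the extracted paths; `parse_pkl_name` is a hypothetical wrapper around the notebook's own `pattern`. Note that the pattern needs at least one `/` before the file name, so a bare file name does not match.

```python
import re

pattern = re.compile(r'.*/([^/]+)_windows(\d+)\.pkl')

def parse_pkl_name(path):
    """Return (video_name, window_num) for a .../<video>_windows<N>.pkl path, else None."""
    match = pattern.search(path)
    if match is None:
        return None
    return match.group(1), int(match.group(2))

example = ("data1/output/'Homeland' Suffered A Major Intelligence Failure"
           "-9IE1mUL1erk_windows7.pkl")
print(parse_pkl_name(example))
```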


In [14]:
list=glob('/mnt/output/data1/output/*')  # note: this name shadows the built-in list()
print len(list)
data=[]

10


In [15]:
for file in list:
    match=re.search(pattern,file)
    if not match:
        print 'COULD NOT FIND NUMBER IN',file
        continue
    video_name=match.group(1)
    # give each newly seen video the next integer index
    if video_name not in video_names:
        video_names[video_name]=video_index
        video_index+=1
    video_num=video_names[video_name]
    window_num=int(match.group(2))

    # read pickles in binary mode; the with-block closes the file
    with open(file,'rb') as fh:
        In = pickle.load(fh)
    print window_num,len(In),
    Full=[]
    for f in In:
        # keep the first four frame fields in the descriptor; f[-1] is the face image
        descriptor=(video_num, window_num,f[0],f[1],f[2],f[3])
        Full.append((descriptor,np.array(f[-1],dtype=np.uint16)))
    In=[]
    data.extend(Full)   # append in place instead of copying the whole list each time
    #New=sc.parallelize(Full)
    #DATA=sc.union([DATA,New]).persist(StLevel)
    #New.unpersist()

    print window_num,len(list),len(data)

5 174 5 10 174
2 243 2 10 417
7 1373 7 10 1790
3 120 3 10 1910
4 276 4 10 2186
6 633 6 10 2819
0 2032 0 10 4851
9 1316 9 10 6167
1 1224 1 10 7391
8 1213 8 10 8604


In [26]:
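# rough raw size in MB: frames x 2 bytes (uint16) x 30 x 30 pixels
# (note: this uses 30x30, not the 300x300 frame size described at the top)
len(data)*2*30*30/1000000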

15
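The estimate above uses 30x30 frames, while the description at the top says 300x300. Because the size grows with the square of the side, that choice changes the answer by a factor of 100. A small sketch that makes the side a parameter (`payload_mb` is a hypothetical name). With the 8604 frames counted above it gives roughly 15 MB at 30x30 and roughly 1.5 GB at 300x300. Both figures are raw pixel bytes only; Python object and pickling overhead come on top.

```python
def payload_mb(n_frames, side, bytes_per_pixel=2):
    """Raw pixel payload in MB (10**6 bytes) for n_frames square side x side frames."""
    return n_frames * bytes_per_pixel * side * side / 1e6

n_frames = 8604  # len(data) after the loading loop above
for side in (30, 300):
    print("%3d x %3d frames: %8.1f MB" % (side, side, payload_mb(n_frames, side)))
```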

In [27]:
DATA=sc.parallelize(data)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.
: java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:416)
	at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)
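The trace fails inside `PythonRDD.readRDDFromFile`, i.e. while the driver JVM reads in the list that `sc.parallelize(data)` serialized from the Python driver. Every frame therefore has to pass through driver memory at once. One common way around this is to let the executors do the unpickling: `sc.binaryFiles(path)` yields `(file path, file contents)` pairs, and each pair can be expanded into frame records with `flatMap`. The sketch below assumes the `.pkl` files sit somewhere every worker can read (HDFS or S3, since `/mnt/output` is local to the master). It reuses the record layout of the loop above; `video_index_map`, `frames_from_pickle`, `pkl_paths` and the HDFS path are hypothetical names.

```python
import pickle
import re

import numpy as np

PATTERN = re.compile(r'.*/([^/]+)_windows(\d+)\.pkl')

def video_index_map(paths):
    """Map each video name to an integer; cheap, runs on the driver over path strings only."""
    names = set()
    for p in paths:
        m = PATTERN.search(p)
        if m:
            names.add(m.group(1))
    # sorted so the numbering is the same on every run
    return dict((name, i) for i, name in enumerate(sorted(names)))

def frames_from_pickle(path, raw_bytes, index_map):
    """Yield (descriptor, uint16 array) records for the contents of one .pkl file."""
    m = PATTERN.search(path)
    if m is None:
        return
    video_num = index_map[m.group(1)]
    window_num = int(m.group(2))
    for f in pickle.loads(raw_bytes):
        descriptor = (video_num, window_num, f[0], f[1], f[2], f[3])
        yield descriptor, np.array(f[-1], dtype=np.uint16)

# Usage on the cluster (hypothetical paths):
# bmap = sc.broadcast(video_index_map(pkl_paths))   # pkl_paths: list of path strings
# DATA = (sc.binaryFiles('hdfs:///faces/pkl')
#           .flatMap(lambda kv: frames_from_pickle(kv[0], kv[1], bmap.value))
#           .persist(StorageLevel.MEMORY_AND_DISK_SER))
```

Unlike the loop above, which numbers videos in the order `glob` returns them, this assigns indices alphabetically so that the numbering does not depend on listing order.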


In [29]:
sc._conf.get('spark.driver.memory')

In [12]:
print window_num,len(list),DATA.count()

 0 10 2819


In [30]:
from pyspark import StorageLevel
StLevel=StorageLevel.MEMORY_AND_DISK_SER

StorageLevel(True, True, False, False, 1)

In [13]:
X=sc.parallelize(range(1000))

In [15]:
Y=sc.union([X,DATA])
Y.count()

3819

In [None]:
A=sc.parallelize([1])
B=sc.parallelize([2])
A.union(B).collect()

In [None]:
len(Full)

In [None]:
Full[:2]

In [None]:
frame=Full[0][-1]
%pylab inline
max(ravel(frame))

In [None]:
hist(ravel(frame),bins=100);

In [None]:
array(frame,dtype=uint16)

In [None]:
match=re.search(pattern,file)
if match:
    video_name=match.group(1)
    window_num=int(match.group(2))
else:
    print 'COULD NOT FIND NUMBER IN',file
video_name,window_num

In [None]:
!df

To read the files, you have two options. 

(1) Get a list of S3 file paths and pass it to Spark.

In [None]:
files = s3helper.get_path('/model-feb')
print files
rdd = sc.textFile(','.join(files))

(2) Load S3 files to HDFS and read them from HDFS.

In [None]:
files = s3helper.load_path('/model-feb', '/feb')
print files
rdd = sc.textFile(','.join(files))

In [None]:
rdd.count()

## Parquet Files

In [None]:
s3helper.open_bucket("mas-dse-public")

files = s3helper.load_path('/Weather/US_Weather.parquet', '/US_Weather.parquet')
files[:10]

In [None]:
from pyspark.sql import SQLContext

# reuse the SparkContext created above; only one SparkContext can be active at a time
sqlContext = SQLContext(sc)

In [None]:
df = sqlContext.sql("SELECT station, measurement FROM parquet.`/US_Weather.parquet`")
df.head()
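The goal stated at the top is to save the frames as a parquet directory. Spark SQL has no unsigned 16-bit type, so one option is to store each frame as raw bytes plus its height and width. That layout is an assumption, not the only choice. The sketch flattens one `(descriptor, array)` record into a row tuple. The column names `f0`..`f3`, the helpers `to_parquet_row` and `from_pixels`, and the output path are hypothetical. It also assumes the four descriptor fields are plain Python numbers, since numpy scalars would break schema inference. `bytearray` is used because it maps to `BinaryType` under both Python 2 and 3. Pixels are packed in the machine's native byte order.

```python
import numpy as np

COLUMNS = ["video_num", "window_num", "f0", "f1", "f2", "f3",
           "height", "width", "pixels"]

def to_parquet_row(record):
    """Flatten a (descriptor, uint16 array) record into one parquet-friendly tuple."""
    descriptor, frame = record
    frame = np.ascontiguousarray(frame, dtype=np.uint16)
    height, width = frame.shape
    return tuple(descriptor) + (int(height), int(width), bytearray(frame.tobytes()))

def from_pixels(pixels, height, width):
    """Inverse of the packing above, for turning a stored row back into a frame."""
    return np.frombuffer(bytes(pixels), dtype=np.uint16).reshape(height, width)

# Usage on the cluster (hypothetical output directory):
# df = sqlContext.createDataFrame(DATA.map(to_parquet_row), COLUMNS)
# df.write.parquet('/faces.parquet')
```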