# Initialize SparkContext

In [1]:
from pyspark import SparkContext

sc = SparkContext()

# Local files

### Download and Upload

Download and upload files via the Spark Notebook interface.

### Access Local Files

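Paths to local files require the `file://` prefix. Spark reads the file on the executors, so a local file must exist at the same path on every worker node.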
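The prefix matters because Hadoop's filesystem layer picks the filesystem from the URI scheme, and a path with no scheme is resolved against the cluster's default filesystem (`fs.defaultFS`, which on this cluster is evidently HDFS). The stdlib sketch below only illustrates that resolution rule; `resolve_filesystem` and its `default_scheme` parameter are hypothetical names, not part of Spark or Hadoop.

```python
try:
    from urllib.parse import urlparse  # Python 3
except ImportError:
    from urlparse import urlparse  # Python 2, as used in this notebook


def resolve_filesystem(path, default_scheme="hdfs"):
    """Return the scheme of the filesystem a Hadoop-style path would use.

    Hypothetical helper for illustration: a path without a scheme falls back
    to `default_scheme`, mimicking how Hadoop resolves unqualified paths
    against fs.defaultFS.
    """
    scheme = urlparse(path).scheme
    return scheme if scheme else default_scheme


# "file://" forces the local filesystem; a bare path goes to the default FS.
for p in ["file:///etc/passwd", "/etc/passwd", "s3n://dse-team/fromLocal/data.dat"]:
    print("%s -> %s" % (p, resolve_filesystem(p)))
```

This is why `sc.textFile("/etc/passwd")` would look for the file on HDFS rather than on local disk.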

In [2]:
ls /etc/passwd

/etc/passwd


In [3]:
local_files = sc.textFile("file:///etc/passwd")
local_files.collect()

[u'root:x:0:0:root:/root:/bin/bash',
 u'bin:x:1:1:bin:/bin:/sbin/nologin',
 u'daemon:x:2:2:daemon:/sbin:/sbin/nologin',
 u'adm:x:3:4:adm:/var/adm:/sbin/nologin',
 u'lp:x:4:7:lp:/var/spool/lpd:/sbin/nologin',
 u'sync:x:5:0:sync:/sbin:/bin/sync',
 u'shutdown:x:6:0:shutdown:/sbin:/sbin/shutdown',
 u'halt:x:7:0:halt:/sbin:/sbin/halt',
 u'mail:x:8:12:mail:/var/spool/mail:/sbin/nologin',
 u'uucp:x:10:14:uucp:/var/spool/uucp:/sbin/nologin',
 u'operator:x:11:0:operator:/root:/sbin/nologin',
 u'games:x:12:100:games:/usr/games:/sbin/nologin',
 u'gopher:x:13:30:gopher:/var/gopher:/sbin/nologin',
 u'ftp:x:14:50:FTP User:/var/ftp:/sbin/nologin',
 u'nobody:x:99:99:Nobody:/:/sbin/nologin',
 u'rpc:x:32:32:Rpcbind Daemon:/var/cache/rpcbind:/sbin/nologin',
 u'ntp:x:38:38::/etc/ntp:/sbin/nologin',
 u'saslauth:x:499:76:"Saslauthd user":/var/empty/saslauth:/sbin/nologin',
 u'mailnull:x:47:47::/var/spool/mqueue:/sbin/nologin',
 u'smmsp:x:51:51::/var/spool/mqueue:/sbin/nologin',
 u'rpcuser:x:29:29:RPC Servic
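`sc.textFile` yields one string per line, so turning `/etc/passwd` into records is a matter of splitting each line on `:`. The sketch below is plain Python with a hypothetical helper, `parse_passwd_line`; the same function could be passed to `local_files.map(...)` to run it on the RDD instead of on collected lines.

```python
from collections import namedtuple

# The seven colon-separated fields of an /etc/passwd entry.
PasswdEntry = namedtuple(
    "PasswdEntry", ["user", "password", "uid", "gid", "gecos", "home", "shell"]
)


def parse_passwd_line(line):
    """Split one /etc/passwd line into a PasswdEntry (uid and gid as ints)."""
    user, password, uid, gid, gecos, home, shell = line.split(":")
    return PasswdEntry(user, password, int(uid), int(gid), gecos, home, shell)


# Sample lines copied from the collect() output above.
lines = [
    "root:x:0:0:root:/root:/bin/bash",
    "ftp:x:14:50:FTP User:/var/ftp:/sbin/nologin",
    "ntp:x:38:38::/etc/ntp:/sbin/nologin",
]
entries = [parse_passwd_line(line) for line in lines]

# Accounts whose shell allows interactive login.
login_users = [e.user for e in entries if e.shell != "/sbin/nologin"]
print(login_users)  # ['root']
```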

# s3helper

The object `s3helper` is a tool for transferring files between the local filesystem, HDFS, and S3.

## (1) List the available methods

Run `s3helper.help()` to see all of its methods.

In [5]:
s3helper.help()


        s3helper is a helper object to move files and directory between
        local filesystem, AWS S3 and local HDFS.

        Usage:

        1. Open a S3 bucket under your account
            s3helper.open_bucket(<bucket_name>)
        2. List all files under the opened S3 bucket
            s3helper.ls() or s3helper.ls_s3()
        Or optionally,
            s3helper.ls(<file_path>) or s3helper.ls_s3(<file_path>)
        3. List all files on HDFS
            s3helper.ls_hdfs()
        Or optionally,
            s3helper.ls_hdfs(<file_path>)
        where <file_path> is an absolute path in the opened S3 bucket.

        Now you can access your S3 files.

        1. Transfer files between S3 and HDFS
          a. To download all S3 files under a directory to HDFS, please call
                s3helper.s3_to_hdfs(<s3_directory_path>, <HDFS_directory_path>)
          b. To upload a directory on HDFS to S3, please call
                s3helper.hdfs_to_s3(<HDFS_directory_path>, <s3_dir

## (2) Open the bucket that has your files.

In [6]:
s3helper.open_bucket('dse-team')

## (3) List files in the S3 bucket and HDFS.

In [7]:
print(s3helper.ls_s3())  # By default, list all files in the root directory of the bucket
print(s3helper.ls_s3('fromHDFS'))

print(s3helper.ls_hdfs())

[u'fromHDFS']
[u'fromHDFS/', u'fromHDFS/README']
Found 4 items
drwxr-xr-x   - hdfs hadoop          0 2017-05-25 16:02 /apps
drwxrwxrwt   - hdfs hadoop          0 2017-05-25 16:07 /tmp
drwxr-xr-x   - hdfs hadoop          0 2017-05-25 16:02 /user
drwxr-xr-x   - hdfs hadoop          0 2017-05-25 16:02 /var

None


## (4) Move files among the local filesystem, HDFS, and S3

As described in `s3helper.help()`, there are five methods for file transfers:

1. `s3helper.s3_to_hdfs(<s3_directory_path>, <HDFS_directory_path>)`
2. `s3helper.hdfs_to_s3(<HDFS_directory_path>, <s3_directory_path>)`
3. `s3helper.s3_to_local(<s3_file_path>, <local_file_path>)`
4. `s3helper.local_to_s3(<local_file_path>, <s3_directory_path>)`
5. `s3helper.local_to_hdfs(<local_dir_path>, <HDFS_dir_path>)`
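The five methods follow a `<source>_to_<destination>` naming pattern. The lookup table below is a hypothetical convenience, not part of `s3helper`: it makes the supported directions explicit and fails loudly for directions that have no method, such as HDFS to local.

```python
# Hypothetical lookup table: (source, destination) -> s3helper method name.
# Only the five directions listed above are covered.
TRANSFER_METHODS = {
    ("s3", "hdfs"): "s3_to_hdfs",
    ("hdfs", "s3"): "hdfs_to_s3",
    ("s3", "local"): "s3_to_local",
    ("local", "s3"): "local_to_s3",
    ("local", "hdfs"): "local_to_hdfs",
}


def transfer_method(source, destination):
    """Return the name of the s3helper method for a transfer direction."""
    key = (source, destination)
    if key not in TRANSFER_METHODS:
        raise ValueError("no s3helper method for %s -> %s" % key)
    return TRANSFER_METHODS[key]


print(transfer_method("local", "hdfs"))  # local_to_hdfs
```

In the notebook you could then call the method by name, for example `getattr(s3helper, transfer_method("s3", "hdfs"))("fromLocal/data.dat", "/test/fromS3")`.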

In [8]:
s3helper.local_to_s3("/home/hadoop/data/data.dat", "fromLocal/data.dat")

In [9]:
print(s3helper.ls_s3("fromLocal"))

[u'fromLocal/data.dat']


In [10]:
s3helper.s3_to_hdfs("fromLocal/data.dat", "/test/fromS3")

17/05/25 18:03:58 INFO s3n.S3NativeFileSystem: Opening 's3n://dse-team/fromLocal/data.dat' for reading

Found 1 items
-rw-r--r--   1 hadoop hadoop  484472684 2017-05-25 18:04 /test/fromS3/data.dat



In [11]:
print(s3helper.ls_hdfs("/test/fromS3"))

Found 1 items
-rw-r--r--   1 hadoop hadoop  484472684 2017-05-25 18:04 /test/fromS3/data.dat

None


In [12]:
%%bash
# -p creates parent directories as needed and does not fail if the directories already exist
mkdir -p /home/hadoop/fromS3 /home/hadoop/fromHDFS

In [13]:
s3helper.s3_to_local("fromLocal/data.dat", "/home/hadoop/fromS3/data.dat")

In [14]:
ls ~/fromS3

data.dat


In [15]:
s3helper.local_to_hdfs("/home/hadoop/fromS3/", "/test/fromLocal/")

In [16]:
print(s3helper.ls_hdfs("/test/fromLocal/"))

Found 1 items
-rw-r--r--   1 hadoop hadoop  484472684 2017-05-25 18:08 /test/fromLocal/data.dat

None


In [17]:
s3helper.hdfs_to_s3("/test/fromLocal", "fromHDFS/")

*NOTE*
This method will create a MapReduce job to upload the content in HDFS to S3. The process may take a while.


17/05/25 18:09:02 INFO tools.DistCp: Input Options: DistCpOptions{atomicCommit=false, syncFolder=false, deleteMissing=false, ignoreFailures=false, maxMaps=20, sslConfigurationFile='null', copyStrategy='uniformsize', sourceFileListing=null, sourcePaths=[/test/fromLocal], targetPath=s3n://dse-team/fromHDFS, targetPathExists=true, preserveRawXattrs=false}
17/05/25 18:09:03 INFO impl.TimelineClientImpl: Timeline service address: http://ip-172-31-70-194.ec2.internal:8188/ws/v1/timeline/
17/05/25 18:09:03 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-70-194.ec2.internal/172.31.70.194:8032
17/05/25 18:09:04 INFO Configuration.deprecation: io.sort.mb is deprecated. Instead, use mapreduce.task.io.sort.mb
17/05/25 18:09:04 INFO Configuration.deprecation: io.sort.factor is deprecated. Instead, use mapreduce.task.io.sort.factor
17/05/25 18:09:04 INFO impl.TimelineCl
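The log shows the upload running as Hadoop's DistCp tool (`tools.DistCp`) with `sourcePaths=[/test/fromLocal]` and `targetPath=s3n://dse-team/fromHDFS`. If you want to run a comparable copy yourself from a terminal, the standard `hadoop distcp <source> <target>` command is the starting point. The sketch below only assembles that argument list; `distcp_command` is a hypothetical helper, and it does not reproduce any extra options `s3helper` may pass.

```python
import shlex


def distcp_command(hdfs_source, bucket, s3_target):
    """Build the argv for a `hadoop distcp` copy from HDFS to an s3n:// URI.

    Hypothetical helper: it mirrors the sourcePaths/targetPath values in the
    DistCp log above and adds no other DistCp options.
    """
    target = "s3n://%s/%s" % (bucket, s3_target.strip("/"))
    return ["hadoop", "distcp", hdfs_source, target]


argv = distcp_command("/test/fromLocal", "dse-team", "fromHDFS/")
print(" ".join(shlex.quote(arg) for arg in argv))
# To actually run it on the cluster: subprocess.run(argv, check=True)
```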

In [18]:
print(s3helper.ls_s3("fromHDFS/"))

[u'fromHDFS/fromLocal/data.dat', u'fromHDFS/fromLocal_$folder$']
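S3 has no real directories, only object keys. The `fromHDFS/fromLocal_$folder$` entry is a placeholder that Hadoop's s3n connector creates to stand in for a directory. When you only want the data files, a filter like the hypothetical `data_keys` below (plain Python over the list that `ls_s3` returns) drops those markers as well as directory-style keys ending in `/`.

```python
FOLDER_MARKER = "_$folder$"


def data_keys(keys, prefix=""):
    """Keep keys under `prefix` that name objects, not directory markers.

    Drops s3n-style "<dir>_$folder$" placeholders and keys ending in "/".
    """
    return [
        k for k in keys
        if k.startswith(prefix)
        and not k.endswith(FOLDER_MARKER)
        and not k.endswith("/")
    ]


listing = [u"fromHDFS/fromLocal/data.dat", u"fromHDFS/fromLocal_$folder$"]
print(data_keys(listing, prefix="fromHDFS/"))
```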


# Parquet Files

For reasonable read performance, always copy Parquet files from S3 to HDFS before reading them.

In [None]:
s3helper.open_bucket("your-bucket-name")

files = s3helper.s3_to_hdfs('/sub-directory-for-parquets', '/hdfs-directory.parquet')
files[:10]

In [None]:
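from pyspark.sql import SQLContext

# Reuse the SparkContext `sc` created at the top of the notebook.
sqlContext = SQLContext(sc)
# Query the Parquet files directly by path; replace `key, value` with the columns in your data.
df = sqlContext.sql("SELECT key, value FROM parquet.`/hdfs-directory.parquet`")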

### Example

In [None]:
from pyspark import SparkContext
from pyspark.sql import SQLContext

# Reuse the running SparkContext; only one can be active per process.
sc = SparkContext.getOrCreate()
sqlc = SQLContext(sc)
rdd = sc.parallelize([
        ["John", 20],
        ["Mike", 25],
        ["Mary", 21]
    ])
# Pass the column names separately instead of as a first data row.
df = sqlc.createDataFrame(rdd, ["name", "age"])
df

DataFrame[name: string, age: bigint]
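When rows arrive with a header as their first element (for instance, lines read from a CSV file), split the header off before building the DataFrame; otherwise it becomes an ordinary data row and the columns get default names such as `_1` and `_2`. A plain-Python sketch follows; `split_header` is a hypothetical helper, and its result would be passed on as `sqlc.createDataFrame(data, columns)`.

```python
def split_header(rows):
    """Separate a header row from data rows.

    Returns (column_names, data_rows), where each data row is a tuple.
    """
    if not rows:
        raise ValueError("no rows to split")
    header, body = rows[0], rows[1:]
    columns = [str(name) for name in header]
    data = [tuple(row) for row in body]
    # Every data row must have one value per column.
    for row in data:
        if len(row) != len(columns):
            raise ValueError("row %r does not match header %r" % (row, columns))
    return columns, data


rows = [["name", "age"], ["John", 20], ["Mike", 25], ["Mary", 21]]
columns, data = split_header(rows)
print(columns)  # ['name', 'age']
print(data)     # [('John', 20), ('Mike', 25), ('Mary', 21)]
```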

In [None]:
# Write Parquet explicitly; this fails if /mydata already exists (default save mode).
df.write.parquet("hdfs:///mydata")
s3helper.ls_hdfs("/mydata")

It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
2016-08-13 07:38:38,699 WARN  [main] util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 5 items
-rw-r--r--   3 root supergroup          0 2016-08-13 07:38 /mydata/_SUCCESS
-rw-r--r--   3 root supergroup        283 2016-08-13 07:38 /mydata/_common_metadata
-rw-r--r--   3 root supergroup        719 2016-08-13 07:38 /mydata/_metadata
-rw-r--r--   3 root supergroup        515 2016-08-13 07:38 /mydata/part-r-00000-b05d9e6d-39b2-4bf3-9496-40f74e43671b.gz.parquet
-rw-r--r--   3 root supergroup        508 2016-08-13 07:38 /mydata/part-r-00001-b05d9e6d-39b2-4bf3-9496-40f74e43671b.gz.parquet
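The listing above uses the usual `hadoop fs -ls` layout: permissions, replication factor, owner, group, size in bytes, modification date and time, and path. Only the `part-*` files hold rows; `_SUCCESS` is an empty completion marker, and `_metadata`/`_common_metadata` are Parquet summary files. The hypothetical helpers below parse such lines in plain Python, for example to total the size of the data files before copying them to S3.

```python
from collections import namedtuple

LsEntry = namedtuple(
    "LsEntry",
    ["permissions", "replication", "owner", "group", "size", "modified", "path"],
)


def parse_ls_line(line):
    """Parse one file or directory line of `hadoop fs -ls` output."""
    perms, repl, owner, group, size, date, time, path = line.split(None, 7)
    replication = None if repl == "-" else int(repl)  # directories show "-"
    return LsEntry(perms, replication, owner, group, int(size), date + " " + time, path)


def part_files(lines):
    """Return entries for Spark part-* files, skipping the header and markers."""
    entries = [
        parse_ls_line(line) for line in lines
        if line.strip() and not line.startswith("Found ")
    ]
    return [e for e in entries if e.path.rsplit("/", 1)[-1].startswith("part-")]


# A trimmed copy of the listing above.
listing = """Found 3 items
-rw-r--r--   3 root supergroup          0 2016-08-13 07:38 /mydata/_SUCCESS
-rw-r--r--   3 root supergroup        515 2016-08-13 07:38 /mydata/part-r-00000-b05d9e6d-39b2-4bf3-9496-40f74e43671b.gz.parquet
-rw-r--r--   3 root supergroup        508 2016-08-13 07:38 /mydata/part-r-00001-b05d9e6d-39b2-4bf3-9496-40f74e43671b.gz.parquet""".splitlines()

parts = part_files(listing)
print("%d part files, %d bytes" % (len(parts), sum(e.size for e in parts)))
```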



In [None]:
s3helper.open_bucket("<your_bucket_name>")
s3helper.hdfs_to_s3("/mydata", "mydata")

*NOTE*
This method will create a MapReduce job to upload the content in HDFS to S3. The process may take a while.


2016-08-13 07:38:39,551 INFO  [main] tools.DistCp (DistCp.java:run(109)) - Input Options: DistCpOptions{atomicCommit=false, syncFolder=false, deleteMissing=false, ignoreFailures=false, maxMaps=20, sslConfigurationFile='null', copyStrategy='uniformsize', sourceFileListing=null, sourcePaths=[/mydata], targetPath=s3n://ucsd-twitter/mydata}
2016-08-13 07:38:40,579 INFO  [main] client.RMProxy (RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager at ec2-54-86-169-42.compute-1.amazonaws.com/172.31.56.20:8032
It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
2016-08-13 07:38:41,538 WARN  [main] util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2016-08-13 07:38:43,069 INFO  [main] Configuration.d

In [None]:
s3helper.ls_s3("mydata")

[u'mydata/_SUCCESS',
 u'mydata/_common_metadata',
 u'mydata/_metadata',
 u'mydata/mydata',
 u'mydata/mydata_$folder$',
 u'mydata/part-r-00000-8e374f2c-b5a7-45ed-ae62-4279db81a48c.gz.parquet',
 u'mydata/part-r-00001-8e374f2c-b5a7-45ed-ae62-4279db81a48c.gz.parquet']

In [None]:
# To copy the data from S3 back to HDFS in a later session, run:
s3helper.s3_to_hdfs("mydata", "/mydata")