In [1]:
import os
import sys 

# $SPARK_HOME set to the directory where you
# have the source download unpacked
spark_home = os.environ.get('SPARK_HOME', None)

if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

sys.path.insert(0, os.path.join(spark_home, 'python'))

sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.3-src.zip'))                                      

# executing shell.py will create 'sc' a SparkContext
#execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))

In [2]:
from pyspark import SparkContext
import pydap.client
import requests

In [3]:
sc = SparkContext("local", "Simple App")

In [4]:
## Will return dataset details, not the data
#opendap_url = 'http://opendap.cr.usgs.gov/opendap/hyrax/MCD12Q1.051/h01v09.ncml'
## URL for grabbing actual data
opendap_url = 'http://opendap.cr.usgs.gov/opendap/hyrax/MOD09Q1.005/h11v04.ncml.ascii?sur_refl_b01[683:1:683][2000:1:2010][2000:1:2010]'

In [5]:
## Raises Exception, unable to parse token
# dataset = pydap.client.open_url(opendap_url)
response = requests.get(opendap_url)

In [6]:
for i in response.text.split("\n"):
    print i

Dataset: h11v04.ncml
sur_refl_b01[0][0], 0.5359, 0.5657, 0.5258, 0.4739, 0.4648, 0.5087, 0.5234, 0.5234, 0.6214, 0.5793, 0.5374
sur_refl_b01[0][1], 0.5759, 0.5414, 0.5338, 0.4489, 0.432, 0.5036, 0.6051, 0.6214, 0.5793, 0.455, 0.446
sur_refl_b01[0][2], 0.4594, 0.5409, 0.6224, 0.6289, 0.6289, 0.6051, 0.5005, 0.5005, 0.3325, 0.3092, 0.3826
sur_refl_b01[0][3], 0.5831, 0.5831, 0.6126, 0.6307, 0.6307, 0.5277, 0.612, 0.3325, 0.3092, 0.332, 0.4036
sur_refl_b01[0][4], 0.6329, 0.6126, 0.6061, 0.5691, 0.5691, 0.612, 0.5841, 0.5841, 0.4301, 0.4782, 0.6271
sur_refl_b01[0][5], 0.6092, 0.6092, 0.6152, 0.5691, 0.6297, 0.6035, 0.5052, 0.5052, 0.4782, 0.6518, 0.6913
sur_refl_b01[0][6], 0.5712, 0.6152, 0.6248, 0.5686, 0.5686, 0.3522, 0.5088, 0.5912, 0.6316, 0.4729, 0.5756
sur_refl_b01[0][7], 0.5737, 0.5737, 0.524, 0.5368, 0.3522, 0.5691, 0.5944, 0.4085, 0.4729, 0.5375, 0.5375
sur_refl_b01[0][8], 0.5606, 0.5211, 0.5368, 0.5874, 0.5874, 0.343, 0.26, 0.26, 0.3825, 0.5794, 0.5863
sur_refl_b01[0][9], 0.5883, 

In [7]:
# A single use/disposable function
def dictorize(dataobj):
    od, dkey = {}, ''
    _dlist = dataobj.split('\n')
    for index, val in enumerate(_dlist):
        if index == 0:
            dkey = val.split(':')[1]
            od[dkey] = {}
        else:
            _vlist = val.split(',')
            _vind = len(_vlist) - 1
            if _vlist[0]:
                od[dkey][_vlist[0]] = [float(i) for i in _vlist[-_vind:]]
    return od

In [8]:
simple_dataset = dictorize(response.text)

In [9]:
simple_dataset

{u' h11v04.ncml': {u'sur_refl_b01[0][0]': [0.5359,
   0.5657,
   0.5258,
   0.4739,
   0.4648,
   0.5087,
   0.5234,
   0.5234,
   0.6214,
   0.5793,
   0.5374],
  u'sur_refl_b01[0][10]': [0.5435,
   0.5263,
   0.4325,
   0.4887,
   0.4887,
   0.5175,
   0.626,
   0.5475,
   0.4101,
   0.4592,
   0.4203],
  u'sur_refl_b01[0][1]': [0.5759,
   0.5414,
   0.5338,
   0.4489,
   0.432,
   0.5036,
   0.6051,
   0.6214,
   0.5793,
   0.455,
   0.446],
  u'sur_refl_b01[0][2]': [0.4594,
   0.5409,
   0.6224,
   0.6289,
   0.6289,
   0.6051,
   0.5005,
   0.5005,
   0.3325,
   0.3092,
   0.3826],
  u'sur_refl_b01[0][3]': [0.5831,
   0.5831,
   0.6126,
   0.6307,
   0.6307,
   0.5277,
   0.612,
   0.3325,
   0.3092,
   0.332,
   0.4036],
  u'sur_refl_b01[0][4]': [0.6329,
   0.6126,
   0.6061,
   0.5691,
   0.5691,
   0.612,
   0.5841,
   0.5841,
   0.4301,
   0.4782,
   0.6271],
  u'sur_refl_b01[0][5]': [0.6092,
   0.6092,
   0.6152,
   0.5691,
   0.6297,
   0.6035,
   0.5052,
   0.5052,
   0.478

In [10]:
## Create an RDD
rdd_dataset = sc.parallelize(simple_dataset)

In [11]:
rdd_dataset

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475