In [123]:
sc

<pyspark.context.SparkContext at 0x100775590>

In [5]:
# We're creating two RDD, one is from the README file of Spark
# and the other is directly from a list within the notebook.
#
# If you downloaded Spark, the README file is in the same folder
# as the one you extracted. If you use other package management
# methods like 'brew', 'dnf', 'apt', etc. you will need to figure
# out the path of Spark by printing sys.path.


In [2]:
#'/Users/hvo/sw/spark/README.md'
rdd = sc.textFile('/Users/sugar/Downloads/spark-2.1.0-bin-hadoop2.7/README.md')

In [3]:
testRdd = sc.parallelize([1,2,3,4,5])

In [11]:
# Here, we're showing the difference between map() and flatMap()
# doing the line split and get back the first 3 elements, take(3).
# - map() is a one-to-one mapping, just like Python, so the first
# line prints out 3 lists, each consists words per each.
# - flatMap() is a one-to-many mapping, like MapReduce's map(). So
# the second line prints out only 3 words.

wordsPerLine = rdd.map(lambda line: line.split()).take(3)
words = rdd.flatMap(lambda line: line.split()).take(3)

print ( '%s\n\n%s' %(wordsPerLine, words))

[[u'#', u'Apache', u'Spark'], [], [u'Spark', u'is', u'a', u'fast', u'and', u'general', u'cluster', u'computing', u'system', u'for', u'Big', u'Data.', u'It', u'provides']]

[u'#', u'Apache', u'Spark']


In [16]:
# This is the word count example with Spark using the approach
# shown in the slides, i.e. staying true to the MapReduce paradigm.
# Note that groupByKey() will sort and group everything together by
# keys first. Then the function in mapValues() will each get applied
# per each (key, list of values) pair. This could be an issue if we
# have a pair with lots of values since all of the values have to be
# stored in memory.

wc = rdd.flatMap(lambda line: line.split()) \
        .map(lambda x: (x.lower(), 1)) \
        .groupByKey() \
        .mapValues(lambda values: sum(values))
wc.take(3)

[(u'when', 1), (u'alternatively,', 1), (u'"local"', 1)]

In [5]:
# This is another approach with reduceByKey() instead of groupByKey().
# The reduce function provided for reduceByKey() only takes 2 params
# at a time, thus, doesn't suffer the scalability issue. It also has
# better benefits in term of parallelism.

wc = rdd.flatMap(lambda line: line.split()) \
        .map(lambda x: (x.lower(), 1)) \
        .reduceByKey(lambda x,y: x+y)
wc.take(2)

[(u'when', 1), (u'alternatively,', 1)]

In [6]:
# If we'd like to compute the top 3 most popular words in Spark. We
# can use the RDD's top() function directly. This is much easier
# than the two-step MapReduce job, where we had to first compute the
# top 3 words per partition, then another top 3 on top of that. In
# fact, this is exactly how Spark RDD's top() function is implemented.
# More info can be found here:
# https://github.com/apache/spark/blob/master/python/pyspark/rdd.py#L1249
wc.top(3, key=lambda x: x[1])

[(u'the', 25), (u'to', 19), (u'spark', 16)]

## LAB 6 - Task 1

In [10]:
SAT_FN = 'SAT_Results.csv'
HSD_FN = 'DOE_High_School_Directory_2014-2015.csv'

In [41]:
# Below is a way to read CSV file from within Spark directly into a 
# Spark's DataFrame, which we will not be covering yet. Just putting
# it here so that we have a reference for now. Note that, the 
# 'parserLib' option is important for reading multi-line fields of CSV.
df = spark.read \
            .format("com.databricks.spark.csv") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .option("parserLib", "UNIVOCITY") \
            .load(HSD_FN)

In [11]:
# We read the SAT score to our RDD. Note that the use_unicode can be
# changed accordingly to your data file to handle Unicode. If you cannot
# parse your data due to an 'utf8' or 'ascii' decoding issue, it might
# be a good thing to try flipping the use_unicode parameter here.

sat = sc.textFile(SAT_FN, use_unicode=False).cache()

# This line for us to list the column index and column names to see
# which column we need to use for our task. In this case, we're
# interested in the number of test takers (#2) and the math score (#4).
list(enumerate(sat.first().split(',')))


[(0, 'DBN'),
 (1, 'SCHOOL NAME'),
 (2, 'Num of SAT Test Takers'),
 (3, 'SAT Critical Reading Avg. Score'),
 (4, 'SAT Math Avg. Score'),
 (5, 'SAT Writing Avg. Score')]

In [10]:
# Note that, our data input includes a header line that we don't want to
# use in analysis. We can remove the header line from our RDD by doing
# a 'filter' to remove all rows that matches the header like below. Though
# this works, it means that we have to apply the filter function on *all*
# row, which could be a lot of computation.

noHeaderRDD = sat.filter(lambda x: not x.startswith('DBN,SCHOOL'))
print (sat.first())
print (noHeaderRDD.first())

DBN,SCHOOL NAME,Num of SAT Test Takers,SAT Critical Reading Avg. Score,SAT Math Avg. Score,SAT Writing Avg. Score
02M047,47 THE AMERICAN SIGN LANGUAGE AND ENGLISH SECONDARY SCHOOL,16,395,400,387


In [13]:
# Alternatively, we can perform the header checking per-partition, instead
# of per-row like below. mapPartitions() is another type of map operators
# in Spark that is similar to Hadoop Streaming's map(). It is many-to-many.
# RDD in Spark are divided into partitions (as we read or as provided by
# HDFS), each partition can be processed in parallel using a function
# supplied to the mapPartitions() call.
# 
# In addition to mapPartitions(), Spark also provides a variation called
# mapPartitionsWithIndex() that provides information on which partition
# we are currently processing. Indeed, mapPartitionsWithIndex() is the
# the operator with the lowest overhead (since mapPartitions() get mapped
# to mapPartitionsWithIndex) and also the most efficient one among all the
# map operators.
#
# So our logic below is to use the partition index to check if we're hitting
# the header (aka the first partition). If so, we just skip the first row.

def extractScores(partId, records):
    if partId==0:
        records.next()
    import csv
    reader = csv.reader(records)
    for row in reader:
        if row[2]!='s': # to filter our bad-quality data
            (dbn,takers,score) = (row[0], int(row[2]), int(row[4]))
            yield (dbn, (score*takers, takers))

satScores = sat.mapPartitionsWithIndex(extractScores)
satScores.take(5)

[('02M047', (6400, 16)),
 ('21K410', (207575, 475)),
 ('30Q301', (43120, 98)),
 ('17K382', (22066, 59)),
 ('18K637', (13335, 35))]

In [36]:
# Here we do the same thing with the school directory data
schools = sc.textFile(HSD_FN, use_unicode=False).cache()
list(enumerate(schools.first().split(',')))[:18]

[(0, 'dbn'),
 (1, 'school_name'),
 (2, 'boro'),
 (3, 'building_code'),
 (4, 'phone_number'),
 (5, 'fax_number'),
 (6, 'grade_span_min'),
 (7, 'grade_span_max'),
 (8, 'expgrade_span_min'),
 (9, 'expgrade_span_max'),
 (10, 'bus'),
 (11, 'subway'),
 (12, 'primary_address_line_1'),
 (13, 'city'),
 (14, 'state_code'),
 (15, 'zip'),
 (16, 'website'),
 (17, 'total_students')]

In [None]:
def extractSchools(partId, list_of_records):
    if partId==0: 
        list_of_records.next() # skipping the first line
    import csv
    reader = csv.reader(list_of_records)
    for row in reader:
        if len(row)==58 and row[17].isdigit():
            (dbn, boro, total_students) = (row[0], row[2], int(row[17]))
            if total_students>500: # filter to keep the large schools
                yield (dbn, boro)

largeSchools = schools.mapPartitionsWithIndex(extractSchools)

In [38]:
largeSchools.take(10)

[('01M450', 'Manhattan'),
 ('01M539', 'Manhattan'),
 ('01M696', 'Manhattan'),
 ('02M374', 'Manhattan'),
 ('02M400', 'Manhattan'),
 ('02M408', 'Manhattan'),
 ('02M412', 'Manhattan'),
 ('02M413', 'Manhattan'),
 ('02M416', 'Manhattan'),
 ('02M418', 'Manhattan')]

In [116]:
scores = largeSchools.join(satScores).values() \
    .reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1]))\
    .mapValues(lambda x: x[0]/x[1]) \
    .collect()

In [117]:
scores[:5]

[('Bronx', 470),
 ('Manhattan', 514),
 ('Brooklyn', 487),
 ('Staten Island', 477),
 ('Queens', 474)]

In [115]:

scores

[('Bronx', (1619364, 3444)),
 ('Manhattan', (3206992, 6228)),
 ('Brooklyn', (4544126, 9322)),
 ('Staten Island', (1406967, 2944)),
 ('Queens', (5190534, 10942))]

## LAB 6 - Task 2

In [122]:
def extractBus(partId, list_of_records):
    if partId==0: 
        list_of_records.next() # skipping the first line
    import csv
    reader = csv.reader(list_of_records)
    for row in reader:
        if len(row)==58:
            (dbn,bus) = (row[0], row[10])
            for i in bus.split(','):
                yield (dbn,i)

buslines = schools.mapPartitionsWithIndex(extractBus)



busScores = buslines.join(satScores).values()\
    .reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1]))\
    .mapValues(lambda x: x[0]/x[1]) \
    .collect()

busScores[:10]

[(' Q56', 434),
 (' B42', 384),
 (' Bx29', 392),
 (' B11', 519),
 ('Bx1', 464),
 ('Bx33', 402),
 (' Bx21', 389),
 ('S57', 526),
 (' Q30', 507),
 (' M4', 442)]

In [121]:
def extractSub(partId, list_of_records):
    if partId==0: 
        list_of_records.next() # skipping the first line
    import csv
    reader = csv.reader(list_of_records)
    for row in reader:
        if len(row)==58:
            (dbn,bus) = (row[0], row[11])
            for i in bus.split(','):
                yield (dbn,i)

subway = schools.mapPartitionsWithIndex(extractSub)
subScores = subway.join(satScores).values()\
    .reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1]))\
    .mapValues(lambda x: x[0]/x[1]) \
    .collect()
subScores[:10]

[(' Z to Elderts Lane-75th St', 371),
 ('A', 419),
 (' 3 to Eastern Parkway-Brooklyn Museum ; C to Franklin Ave ; F', 364),
 (' 3 to Hoyt St ; A', 402),
 (' 3 to Central Park North-110th St ; 6 to 103rd St', 446),
 (' C to Euclid Ave', 400),
 ('1 to 66th St - Lincoln Center ; 2', 514),
 (' 3 to 34th St - Penn Station ; A', 423),
 (' S to Franklin Ave ; G to Bedford-Nostrand', 443),
 (' 5 to Jackson Ave', 385)]

### Extra

In [127]:
sat = LOAD 'SAT_Results' AS (dbn, school name, )


SyntaxError: invalid syntax (<ipython-input-127-03b8803aea7e>, line 1)