# TASK 1
## SAT and HS data

In [56]:
SAT_FN = 'SAT_Results.csv'
HSD_FN = 'DOE_High_School_Directory_2014-2015.csv'

In [57]:
# Below is a way to read CSV file from within Spark directly into a 
# Spark's DataFrame, which we will not be covering yet. Just putting
# it here so that we have a reference for now. Note that, the 
# 'parserLib' option is important for reading multi-line fields of CSV.
df = spark.read \
            .format("com.databricks.spark.csv") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .option("parserLib", "UNIVOCITY") \
            .load(HSD_FN)

In [58]:
# We read the SAT score to our RDD. Note that the use_unicode can be
# changed accordingly to your data file to handle Unicode. If you cannot
# parse your data due to an 'utf8' or 'ascii' decoding issue, it might
# be a good thing to try flipping the use_unicode parameter here.

sat = sc.textFile(SAT_FN, use_unicode=False).cache()

# This line for us to list the column index and column names to see
# which column we need to use for our task. In this case, we're
# interested in the number of test takers (#2) and the math score (#4).
list(enumerate(sat.first().split(',')))

[(0, 'DBN'),
 (1, 'SCHOOL NAME'),
 (2, 'Num of SAT Test Takers'),
 (3, 'SAT Critical Reading Avg. Score'),
 (4, 'SAT Math Avg. Score'),
 (5, 'SAT Writing Avg. Score')]

In [59]:
# Note that, our data input includes a header line that we don't want to
# use in analysis. We can remove the header line from our RDD by doing
# a 'filter' to remove all rows that matches the header like below. Though
# this works, it means that we have to apply the filter function on *all*
# row, which could be a lot of computation.

noHeaderRDD = sat.filter(lambda x: not x.startswith('DBN,SCHOOL'))
print (sat.first())
print (noHeaderRDD.first())

DBN,SCHOOL NAME,Num of SAT Test Takers,SAT Critical Reading Avg. Score,SAT Math Avg. Score,SAT Writing Avg. Score
02M047,47 THE AMERICAN SIGN LANGUAGE AND ENGLISH SECONDARY SCHOOL,16,395,400,387


In [60]:
# Alternatively, we can perform the header checking per-partition, instead
# of per-row like below. mapPartitions() is another type of map operators
# in Spark that is similar to Hadoop Streaming's map(). It is many-to-many.
# RDD in Spark are divided into partitions (as we read or as provided by
# HDFS), each partition can be processed in parallel using a function
# supplied to the mapPartitions() call.
# 
# In addition to mapPartitions(), Spark also provides a variation called
# mapPartitionsWithIndex() that provides information on which partition
# we are currently processing. Indeed, mapPartitionsWithIndex() is the
# the operator with the lowest overhead (since mapPartitions() get mapped
# to mapPartitionsWithIndex) and also the most efficient one among all the
# map operators.
#
# So our logic below is to use the partition index to check if we're hitting
# the header (aka the first partition). If so, we just skip the first row.

def extractScores(partId, records):
    if partId==0:
        records.next()
    import csv
    reader = csv.reader(records)
    for row in reader:
        if row[2]!='s': # to filter our bad-quality data
            (dbn,takers,score) = (row[0], int(row[2]), int(row[4]))
            yield (dbn, (score*takers, takers))

satScores = sat.mapPartitionsWithIndex(extractScores)
satScores.take(5)

[('02M047', (6400, 16)),
 ('21K410', (207575, 475)),
 ('30Q301', (43120, 98)),
 ('17K382', (22066, 59)),
 ('18K637', (13335, 35))]

In [61]:
# Here we do the same thing with the school directory data
schools = sc.textFile(HSD_FN, use_unicode=False).cache()
list(enumerate(schools.first().split(',')))

[(0, 'dbn'),
 (1, 'school_name'),
 (2, 'boro'),
 (3, 'building_code'),
 (4, 'phone_number'),
 (5, 'fax_number'),
 (6, 'grade_span_min'),
 (7, 'grade_span_max'),
 (8, 'expgrade_span_min'),
 (9, 'expgrade_span_max'),
 (10, 'bus'),
 (11, 'subway'),
 (12, 'primary_address_line_1'),
 (13, 'city'),
 (14, 'state_code'),
 (15, 'zip'),
 (16, 'website'),
 (17, 'total_students'),
 (18, 'campus_name'),
 (19, 'school_type'),
 (20, 'overview_paragraph'),
 (21, 'program_highlights'),
 (22, 'language_classes'),
 (23, 'advancedplacement_courses'),
 (24, 'online_ap_courses'),
 (25, 'online_language_courses'),
 (26, 'extracurricular_activities'),
 (27, 'psal_sports_boys'),
 (28, 'psal_sports_girls'),
 (29, 'psal_sports_coed'),
 (30, 'school_sports'),
 (31, 'partner_cbo'),
 (32, 'partner_hospital'),
 (33, 'partner_highered'),
 (34, 'partner_cultural'),
 (35, 'partner_nonprofit'),
 (36, 'partner_corporate'),
 (37, 'partner_financial'),
 (38, 'partner_other'),
 (39, 'addtl_info1'),
 (40, 'addtl_info2'),
 (4

In [62]:
def extractSchools(partId, list_of_records):
    if partId==0: 
        list_of_records.next() # skipping the first line
    import csv
    reader = csv.reader(list_of_records)
    for row in reader:
        if len(row)==58 and row[17].isdigit():
            (dbn, boro, total_students) = (row[0], row[2], int(row[17]))
            if total_students>500: # filter to keep the large schools
                yield (dbn, boro)

largeSchools = schools.mapPartitionsWithIndex(extractSchools)

In [63]:
scores = largeSchools.join(satScores).values() \
    .reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1])) \
    .mapValues(lambda x: x[0]/x[1]) \
    .collect()

In [64]:
scores

[('Bronx', 470),
 ('Manhattan', 514),
 ('Brooklyn', 487),
 ('Staten Island', 477),
 ('Queens', 474)]

# ====================================================
# Task 2
### Quiz

In [65]:
import csv

with open (HSD_FN, 'rb') as fi:
    reader = csv.reader(fi)
    print reader.next()
    print reader.next()

['dbn', 'school_name', 'boro', 'building_code', 'phone_number', 'fax_number', 'grade_span_min', 'grade_span_max', 'expgrade_span_min', 'expgrade_span_max', 'bus', 'subway', 'primary_address_line_1', 'city', 'state_code', 'zip', 'website', 'total_students', 'campus_name', 'school_type', 'overview_paragraph', 'program_highlights', 'language_classes', 'advancedplacement_courses', 'online_ap_courses', 'online_language_courses', 'extracurricular_activities', 'psal_sports_boys', 'psal_sports_girls', 'psal_sports_coed', 'school_sports', 'partner_cbo', 'partner_hospital', 'partner_highered', 'partner_cultural', 'partner_nonprofit', 'partner_corporate', 'partner_financial', 'partner_other', 'addtl_info1', 'addtl_info2', 'start_time', 'end_time', 'se_services', 'ell_programs', 'school_accessibility_description', 'number_programs', 'priority01', 'priority02', 'priority03', 'priority04', 'priority05', 'priority06', 'priority07', 'priority08', 'priority09', 'priority10', 'Location 1']
['01M292', 'H

In [66]:
hs = sc.textFile(HSD_FN, use_unicode=False).cache()

In [67]:
list(enumerate(hs.first().split(',')))

[(0, 'dbn'),
 (1, 'school_name'),
 (2, 'boro'),
 (3, 'building_code'),
 (4, 'phone_number'),
 (5, 'fax_number'),
 (6, 'grade_span_min'),
 (7, 'grade_span_max'),
 (8, 'expgrade_span_min'),
 (9, 'expgrade_span_max'),
 (10, 'bus'),
 (11, 'subway'),
 (12, 'primary_address_line_1'),
 (13, 'city'),
 (14, 'state_code'),
 (15, 'zip'),
 (16, 'website'),
 (17, 'total_students'),
 (18, 'campus_name'),
 (19, 'school_type'),
 (20, 'overview_paragraph'),
 (21, 'program_highlights'),
 (22, 'language_classes'),
 (23, 'advancedplacement_courses'),
 (24, 'online_ap_courses'),
 (25, 'online_language_courses'),
 (26, 'extracurricular_activities'),
 (27, 'psal_sports_boys'),
 (28, 'psal_sports_girls'),
 (29, 'psal_sports_coed'),
 (30, 'school_sports'),
 (31, 'partner_cbo'),
 (32, 'partner_hospital'),
 (33, 'partner_highered'),
 (34, 'partner_cultural'),
 (35, 'partner_nonprofit'),
 (36, 'partner_corporate'),
 (37, 'partner_financial'),
 (38, 'partner_other'),
 (39, 'addtl_info1'),
 (40, 'addtl_info2'),
 (4

In [68]:
noHeaderHs = hs.filter(lambda x: not x.startswith('dbn,'))
noHeaderHs.first()

'01M292,Henry Street School for International Studies,Manhattan,M056,212-406-9411,212-406-9417,6,12,,,"B39, M14A, M14D, M15, M15-SBS, M21, M22, M9","B, D to Grand St ; F to East Broadway ; J, M, Z to Delancey St-Essex St",220 Henry Street,New York,NY,10002,http://schools.nyc.gov/schoolportals/01/M292,323,N/A,,"Henry Street School for International Studies is a unique small school founded by the Asia Society. While in pursuit of knowledge about other world regions, including their histories, economies and world languages, students acquire the knowledge and skills needed to prepare for college and/or careers. Teachers and other adults who make up the learning community forge supportive relationships with students and parents while providing challenging and engaging learning experiences. Our school partners with various community, arts and business organizations to help students achieve success. Our theme of international studies extends beyond the classroom, where students participate in

In [69]:
hs.getNumPartitions()

2

In [80]:
def extractBus(partitionId, partition):
    if partitionId==0:
        partition.next()
    import csv
    reader = csv.reader(partition)

    for row in reader:
        if len(row)==58 and row[17].isdigit():
            for b in row[10].split(','):
                yield row[0], b


def extractSubway(partitionId, partition):
    if partitionId==0:
        partition.next()
        
    import csv
    reader = csv.reader(partition)
    
    for row in reader:
        if len(row)==58 and row[17].isdigit():
            for subway in row[11].split(';'):
                for s in subway.split(','):
    #                 yield s, row[0]
                    if len(s) == 1:
                        yield (row[0], s)                   
                    if s[0] == ' ':
                        yield (row[0], s[1])

hsBus = hs.mapPartitionsWithIndex(extractBus)
hsSubway = hs.mapPartitionsWithIndex(extractSubway)
hsBus.take(5)
#hsSubway.take(5)


[('01M292', 'B39'),
 ('01M292', ' M14A'),
 ('01M292', ' M14D'),
 ('01M292', ' M15'),
 ('01M292', ' M15-SBS')]

In [82]:
hsBus.join(satScores).values() \
    .reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1])) \
    .mapValues(lambda x: x[0]/x[1]) \
    .take(5)

[(' Q56', 434), (' B42', 384), (' Bx29', 392), (' B11', 519), ('Bx1', 464)]

In [83]:
hsSubway.join(satScores).values() \
    .reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1])) \
    .mapValues(lambda x: x[0]/x[1]) \
    .take(5)

[('A', 519), ('Q', 482), ('E', 501), ('M', 454), ('1', 525)]