In [3]:
import findspark
# Call init() before the first `import pyspark` (done in the next cell).
# NOTE(review): presumably this locates the local Spark installation and makes
# pyspark importable — confirm against the findspark documentation.
findspark.init()

In [4]:
import pyspark

In [12]:
# Application-level configuration; "Guide" is the application name set on the conf.
conf = pyspark.SparkConf().setAppName("Guide")
# getOrCreate() reuses an already-running SparkContext instead of raising
# "Cannot run multiple SparkContexts at once" when this cell is executed again,
# which keeps the cell safe to re-run in a live kernel.
sc = pyspark.SparkContext.getOrCreate(conf=conf)

###### BASIC

In [8]:
# Load the text file as an RDD with one element per line.
lines = sc.textFile("in.txt")
# Map every line to its character count.
lineLengths = lines.map(len)
# Optional: lineLengths.persist() keeps the mapped RDD cached for reuse —
# the key difference between RDDs and classic Map&Reduce.
# Sum all line lengths on the cluster and bring the total back to the driver.
totalLength = lineLengths.reduce(lambda acc, length: acc + length)
print(totalLength)

141


###### Passing Function

In [None]:
class MyClass(object):
    """Example of passing a function to Spark without capturing `self`."""

    def __init__(self):
        self.field = "Hello"

    def doStuff(self, rdd):
        """Return `rdd` mapped so that every element is prefixed with `self.field`.

        The attribute is copied into a local name first so the mapped function
        closes over that value only, rather than over the whole object.
        """
        prefix = self.field

        def prepend(s):
            return prefix + s

        return rdd.map(prepend)

###### Understanding Closure

In [None]:
counter = 0
# Small sample dataset to distribute; without it the cell fails with a NameError
# before it can demonstrate the closure pitfall.
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    """Add `x` to the global `counter` (intentionally broken example).

    Each executor works on its own copy of `counter` shipped inside the
    closure, so the driver's variable is not reliably updated. Use an
    accumulator (`sc.accumulator(0)`) when a global sum is really needed.
    """
    global counter
    counter += x
rdd.foreach(increment_counter)

# The driver-side value printed here does not reflect the executors' updates.
print("Counter value: ", counter)

###### Transformation

| Transformation(params) | Meaning |
|-----|-----|
| map(func) | literal meaning |
| filter(func) | literal meaning |
| flatMap(func) | map + flatten |
| mapPartitions(func of generator) | map in partitions |
| mapPartitionsWithIndex(func of generator) | map in partitions with corresponding index | 
| sample(withReplacement, fraction, seed) | literal meaning |
| union(dataset) | literal meaning |
| intersection(dataset) | literal meaning |
| distinct | return a new dataset with unique element |
| groupByKey | literal meaning |
| reduceByKey(func) | literal meaning |
| aggregateByKey(zeroValue, seqOp, combOp) | aggregate the values of each key with user-defined functions |
| sortByKey | literal meaning |
| join(dataset) | literal meaning |
| cogroup(dataset) | group within two dataset |
| cartesian(dataset) | cartesian product |
| pipe(command) | execute bash script |
| coalesce(number) | reduce the number of partitions |
| repartition(number) | reshuffle the data |
| repartitionAndSortWithinPartitions(partitioner) | repartition and sort records by key within each partition |

###### Action

| Action | Meaning |
| ----- | ----- |
| reduce(func) | literal meaning | 
| collect | return all element |
| count | literal meaning |
| first | literal meaning |
| take(n) | return the first n elements; first() is equivalent to take(1)[0] |
| takeSample(withReplacement, number, seed) | literal meaning |
| takeOrdered | take with a custom comparator |
| saveAsTextFile(path) | literal meaning |
| saveAsSequenceFile(path) | literal meaning |
| saveAsObjectFile(path) | literal meaning |
| countByKey() | literal meaning |
| foreach() | literal meaning |

###### Shuffle

###### RDD Persistence

###### Shared Variables

* broadcast -> read-only variable
* accumulators -> add-only variables -> updated when an action runs, so each task's update is applied only once

###### Demo

In [None]:
# demo1->wordcount
text_file = sc.textFile("hdfs://...")
# Split each line on single spaces, pair every word with 1, then add up the
# ones per word to obtain (word, occurrences) pairs.
counts = (
    text_file
    .flatMap(lambda row: row.split(" "))
    .map(lambda token: (token, 1))
    .reduceByKey(lambda left, right: left + right)
)
counts.saveAsTextFile("hdfs://...")

In [None]:
# demo2->estimate Pi
import random

# Number of random points to draw; a larger value gives a better estimate.
NUM_SAMPLES = 100000

def inside(p):
    """Return True when a random point in the unit square falls inside the unit circle.

    `p` is the RDD element supplied by `filter`; it is ignored and only drives
    how many points are sampled.
    """
    x, y = random.random(), random.random()
    return x*x + y*y < 1

# Python 3: `range` replaces `xrange`, and `print` is a function.
count = sc.parallelize(range(0, NUM_SAMPLES)) \
             .filter(inside).count()
# The fraction of points inside the quarter circle approximates pi / 4.
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))

###### Get hands dirty
* Task: Write a Spark program to find the top 10 products based on the number of user reviews.
* Dataset: Use the Musical Instruments review file (5-core) and metadata from the Amazon product dataset (http://jmcauley.ucsd.edu/data/amazon/links.html).

In [184]:
# SQL entry point built on the existing SparkContext `sc`; its `.read` attribute
# is what the following cells use to load the JSON files.
sqlcontext = pyspark.SQLContext(sc)

In [185]:
# unique reviewerID for each product
# Read only the two needed fields from the review file.
review = (
    sqlcontext.read
    .schema("asin String, reviewerID String")
    .json("./Musical_Instruments_5.json")
    .select(["asin", "reviewerID"])
)
# Drop duplicate (product, reviewer) pairs so a reviewer counts once per product.
asin_ids = review.rdd.map(lambda row: (row.asin, row.reviewerID)).distinct()
# Count the distinct reviewers of each product: (asin, number_of_reviewers).
asin_ids = asin_ids.map(lambda pair: (pair[0], 1)).reduceByKey(lambda left, right: left + right)
asin_ids.collect()

[('B00005ML71', 5),
 ('B000068NSX', 7),
 ('B000068NZC', 7),
 ('B000068O4H', 10),
 ('B00009W40G', 6),
 ('B0000AQRSR', 5),
 ('B0000AQRST', 11),
 ('B0000AQRSU', 17),
 ('B000165DSM', 18),
 ('B0002BACB4', 7),
 ('B0002CZSJO', 28),
 ('B0002CZST4', 11),
 ('B0002CZT0M', 8),
 ('B0002CZTIO', 16),
 ('B0002CZUUG', 22),
 ('B0002CZV7I', 6),
 ('B0002CZVA0', 9),
 ('B0002CZVHI', 7),
 ('B0002CZVK0', 24),
 ('B0002CZVWS', 16),
 ('B0002CZVZK', 5),
 ('B0002CZW0Y', 46),
 ('B0002CZZW4', 16),
 ('B0002D01K4', 18),
 ('B0002D01KO', 9),
 ('B0002D02IU', 6),
 ('B0002D02RQ', 13),
 ('B0002D05FU', 7),
 ('B0002D0B4K', 9),
 ('B0002D0C1C', 5),
 ('B0002D0CA8', 10),
 ('B0002D0CAI', 11),
 ('B0002D0CGM', 7),
 ('B0002D0CL2', 13),
 ('B0002D0CLM', 9),
 ('B0002D0CQC', 29),
 ('B0002D0HY4', 6),
 ('B0002D0KOG', 11),
 ('B0002D0L40', 7),
 ('B0002D0MFI', 5),
 ('B0002D0Q2W', 15),
 ('B0002DUPZU', 6),
 ('B0002DUS8E', 5),
 ('B0002DV7U2', 15),
 ('B0002DVBJY', 5),
 ('B0002E1I2I', 8),
 ('B0002E1J3Q', 11),
 ('B0002E1J5E', 9),
 ('B0002E1NNC', 5)

In [186]:
# Top 10 products by reviewer count. takeOrdered keeps only the 10 best entries
# instead of sorting the entire RDD first; the negated count gives descending order.
asin_ids.takeOrdered(10, key=lambda pair: -pair[1])

[('B003VWJ2K8', 163),
 ('B0002E1G5C', 143),
 ('B0002F7K7Y', 116),
 ('B003VWKPHC', 114),
 ('B0002H0A3S', 93),
 ('B0002CZVXM', 74),
 ('B0006NDF8A', 71),
 ('B0009G1E0K', 69),
 ('B0002E2KPC', 68),
 ('B0002GLDQM', 67)]

In [187]:
# Number of distinct products. `asin_ids` is the RDD defined above (the name
# `asin_id` does not exist), and count() avoids collecting every record to the driver.
asin_ids.count()

900

In [188]:
# product-price pair
product = sqlcontext.read.json("./meta_Musical_Instruments.json")
# Build (asin, price) pairs from the metadata rows.
product_price = product.rdd.map(lambda row: (row.asin, row.price))
# Gather all prices seen for a product and keep each distinct value once.
product_price = (
    product_price
    .groupByKey()
    .map(lambda entry: (entry[0], list(set(entry[1]))))
)

In [189]:
# Join prices with reviewer counts -> (asin, (prices, count)) and return the 10
# products with the highest count. takeOrdered avoids sorting the whole joined RDD;
# the negated count gives descending order.
product_price.join(asin_ids).takeOrdered(10, key=lambda entry: -entry[1][1])

[('B0002E1G5C', ([6.92], 143)),
 ('B0002F7K7Y', ([1.96], 116)),
 ('B0002H0A3S', ([0.2], 93)),
 ('B0002CZVXM', ([10.22], 74)),
 ('B0006NDF8A', ([10.99], 71)),
 ('B0009G1E0K', ([6.46], 69)),
 ('B0002E2KPC', ([9.92], 68)),
 ('B0002GLDQM', ([1.54], 67)),
 ('B004XNK7AI', ([9.95], 65)),
 ('B005FKF1PY', ([12.56], 63))]

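The top 10 products based on the number of unique reviewers.  
However, a bug remains in reading the metadata JSON: several records are ignored (for example, the most-reviewed product, B003VWJ2K8, is missing from the joined result).  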

In [190]:
# Shut down the SparkContext now that the notebook is finished; `sc` should not
# be used after this call.
sc.stop()