In [None]:
# Basic Spark exploration

In [2]:
sc?

In [1]:
# version
sc.version

u'1.6.1'

In [None]:
# check the UI
# change configurations
# memory

In [5]:
# Word Count example

# load text
text_file = sc.textFile("all-shakespeare.txt")

In [6]:
# process
counts = text_file.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)

In [8]:
counts.cache()

PythonRDD[13] at RDD at PythonRDD.scala:43

In [10]:
# write data
counts.saveAsTextFile("wsj_cnt.txt")

In [None]:
# or chain them together, either works

In [None]:
# RDD Operation (basic)

In [1]:
import numpy as np

In [2]:
# either read from file or parallelize local to spark RDD
data = sc.parallelize(np.random.rand(1000))

In [4]:
data.getNumPartitions()

8

In [5]:
data.map(lambda x: x + 1).reduce(lambda x, y: x + y)

1497.1939993466569

In [9]:
# RDD operation (key, pair)
np.random.seed(1)
raw_data = [(np.random.randint(1, 10), np.random.rand()) for i in range(1000)]
data = sc.parallelize(raw_data)

In [11]:
data?

In [13]:
a

PythonRDD[7] at RDD at PythonRDD.scala:43

In [12]:
a = data.reduceByKey(lambda x, y: x + y)

In [17]:
b

PythonRDD[19] at RDD at PythonRDD.scala:43

In [16]:
data2 = sc.parallelize([(np.random.randint(1, 10), np.random.rand()) for i in range(1000)])
b = data2.reduceByKey(lambda x, y: x + y)

In [18]:
c = a.join(b)

In [22]:
c.take(10)

[(8, (67.65729389898596, 49.11970411999167)),
 (1, (47.365147476830444, 46.43084201962389)),
 (9, (59.20103026442225, 54.14031513354443)),
 (2, (45.26461830302796, 47.674440014787834)),
 (3, (52.21962668987429, 56.13830822537983)),
 (4, (59.79628153165689, 56.030449353306686)),
 (5, (61.39343201096025, 50.1197365118821)),
 (6, (65.99062317377076, 67.32839026096616)),
 (7, (51.16393927045274, 65.68375377040968))]

In [23]:
c.map(lambda x: (x[0] + 1, x[1][0] + 0.4, x[1][1] + 100)).take(10)

[(9, 68.05729389898596, 149.11970411999167),
 (2, 47.76514747683044, 146.4308420196239),
 (10, 59.60103026442225, 154.14031513354445),
 (3, 45.66461830302796, 147.67444001478782),
 (4, 52.61962668987429, 156.13830822537983),
 (5, 60.196281531656886, 156.0304493533067),
 (6, 61.793432010960245, 150.1197365118821),
 (7, 66.39062317377076, 167.32839026096616),
 (8, 51.563939270452735, 165.68375377040968)]

In [None]:
# Spark SQL/DataFrame

In [24]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [25]:
# create a spark SQL 
df = sqlContext.createDataFrame([
        ('Danny', 235, 'M'),
        ('Wendy', 432, 'F'),
        ('Lisa', 2938, 'F'),
        ('Lucas', 9183, 'M')
    ], ['name', 'number', 'sex'])

In [29]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- number: long (nullable = true)
 |-- sex: string (nullable = true)



In [None]:
# from pyspark.sql.types import *
#fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
#schema = StructType(fields)

In [32]:
df.filter(" sex='M'  ").collect()

[Row(name=u'Danny', number=235, sex=u'M'),
 Row(name=u'Lucas', number=9183, sex=u'M')]

In [34]:
df.select(['name', 'sex']).collect()

[Row(name=u'Danny', sex=u'M'),
 Row(name=u'Wendy', sex=u'F'),
 Row(name=u'Lisa', sex=u'F'),
 Row(name=u'Lucas', sex=u'M')]

In [36]:
df.filter(" sex='M'  ").select(['name', 'sex']).collect()

[Row(name=u'Danny', sex=u'M'), Row(name=u'Lucas', sex=u'M')]

In [39]:
df.groupBy('sex').agg({'number': 'sum'}).collect()

[Row(sex=u'F', sum(number)=3370), Row(sex=u'M', sum(number)=9418)]

In [44]:
df.collect()

[Row(name=u'Danny', number=235, sex=u'M'),
 Row(name=u'Wendy', number=432, sex=u'F'),
 Row(name=u'Lisa', number=2938, sex=u'F'),
 Row(name=u'Lucas', number=9183, sex=u'M')]

In [48]:
df.registerTempTable('people')

In [50]:
sqlContext.sql("select name from people where sex = 'M' ").collect()

[Row(name=u'Danny'), Row(name=u'Lucas')]

In [46]:
# basic operations
# filter
# select
# groupby
# join

In [None]:
# registerTempTable

In [53]:
StreamingContext?

In [None]:
# Spark Streaming Example

In [54]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working thread and batch interval of 1 second
ssc = StreamingContext(sc, 2)

In [55]:
# Discretized Streams
lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))

In [56]:
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

In [57]:
ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

-------------------------------------------
Time: 2016-05-14 15:02:34
-------------------------------------------

-------------------------------------------
Time: 2016-05-14 15:02:36
-------------------------------------------

-------------------------------------------
Time: 2016-05-14 15:02:38
-------------------------------------------

-------------------------------------------
Time: 2016-05-14 15:02:40
-------------------------------------------

-------------------------------------------
Time: 2016-05-14 15:02:42
-------------------------------------------



KeyboardInterrupt: 

In [None]:
# Spark GraphX Example

In [None]:
# create a spark SQL 
df = sqlContext.createDataFrame([
        ('Danny', 'Andy', 'follow'),
        ('Wendy', 432),
        ('Lisa', 2938),
        ('Lucas', 9183)
    ], ['name', 'number'])

In [1]:
from graphframes import GraphFrame

In [2]:
localVertices = [(1,"A"), (2,"B"), (3, "C")]
localEdges = [(1,2,"follow"), (2,1,"follow"), (2,3,"follow")]
v = sqlContext.createDataFrame(localVertices, ["id", "name"])
e = sqlContext.createDataFrame(localEdges, ["src", "dst", "action"])

In [3]:
v.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



In [4]:
e.printSchema()

root
 |-- src: long (nullable = true)
 |-- dst: long (nullable = true)
 |-- action: string (nullable = true)



In [15]:
g.shortestPaths?

In [5]:
g = GraphFrame(v, e)

In [16]:
gpg = g.pageRank(resetProbability=0.15, maxIter=10)

In [17]:
gpg.edges.show()

+---+---+------+------+
|src|dst|action|weight|
+---+---+------+------+
|  1|  2|follow|   1.0|
|  2|  1|follow|   0.5|
|  2|  3|follow|   0.5|
+---+---+------+------+



In [18]:
gpg.vertices.show()

+---+----+-------------------+
| id|name|           pagerank|
+---+----+-------------------+
|  1|   A|0.33350201198327134|
|  2|   B|0.43269228873098553|
|  3|   C|0.33350201198327134|
+---+----+-------------------+

