# Spark Context

Spark Shell provides a preconfigured Spark Context called sc

In [1]:
sc

<pyspark.context.SparkContext at 0x7fe4555c76d0>

In [2]:
sc.appName

u'PySparkShell'

# Creating RDDs

## From Data in Memory

In [3]:
myData = ['Alex', 'Max', 'Sam', 'Farzad']
myData

['Alex', 'Max', 'Sam', 'Farzad']

In [8]:
myRDD = sc.parallelize(myData)

In [9]:
myRDD.collect()

['Alex', 'Max', 'Sam', 'Farzad']

In [10]:
myData = [{'name': 'ALex', 'age': 25, 'city': 'Dallas'},
          {'name': 'Max', 'age': 36, 'city': 'Austin'},
          {'name': 'Sam', 'age': 43, 'city': 'New York'}]

In [11]:
myRDD = sc.parallelize(myData)

In [12]:
myRDD.collect()

[{'age': 25, 'city': 'Dallas', 'name': 'ALex'},
 {'age': 36, 'city': 'Austin', 'name': 'Max'},
 {'age': 43, 'city': 'New York', 'name': 'Sam'}]

## From Files or a Set of Files

In [15]:
myRDD = sc.textFile("myfile.txt") # a single file

In [17]:
myRDD = sc.textFile("mydata/*.log") # a wildcard list of files

In [18]:
myRDD = sc.textFile("myfile1.txt, myfile2.txt, myfile3.txt") # a comma-separated list of files

In [201]:
myRDD = sc.textFile("file:/home/training/myfile.txt") # a single file from Local file system

In [19]:
myRDD = sc.textFile("hdfs://localhost/big_data/Spark/myfile.txt") # a single file from HDFS

In [20]:
myRDD.collect()

[u'lorem ipsum dolor sit amet',
 u'fusce sit lorem ipsum',
 u'donec in dolor odio ipsum',
 u'morbi ac ipsum quis enim male',
 u'donec risus nulla at pretium',
 u'nulla dolor in donec',
 u'morbi ut erat nec libero',
 u'libero nulla sit odio']

In [21]:
myRDD.count()

8

## Whole File-Based RDDs

In [205]:
import json

In [206]:
myRDD = sc.wholeTextFiles("hdfs://localhost/big_data/Spark/jsondata")

In [207]:
myRDD.collect()

[(u'hdfs://localhost/big_data/Spark/jsondata/file1.json',
  u'{\n "name": "Alex",\n "age": "23",\n "id": "874621"\n}\n'),
 (u'hdfs://localhost/big_data/Spark/jsondata/file2.json',
  u'{\n "name": "Mary",\n "age": "27",\n "id": "957325"\n}')]

In [208]:
myRDD.collect()[0][1]

u'{\n "name": "Alex",\n "age": "23",\n "id": "874621"\n}\n'

In [None]:
myRDD2 = myRDD.map(lambda (name,s): json.loads(s))

In [None]:
myRDD2.collect()

In [None]:
for record in myRDD2.collect():
    print(record['name'])

# RDD Operations

## Actions

In [24]:
myData = [{'name': 'ALex', 'age': 25, 'city': 'Dallas'},
          {'name': 'Max', 'age': 36, 'city': 'Austin'},
          {'name': 'Sam', 'age': 43, 'city': 'New York'}]

In [25]:
myRDD = sc.parallelize(myData)

In [26]:
myRDD.collect()

[{'age': 25, 'city': 'Dallas', 'name': 'ALex'},
 {'age': 36, 'city': 'Austin', 'name': 'Max'},
 {'age': 43, 'city': 'New York', 'name': 'Sam'}]

In [27]:
myRDD.take(1)

[{'age': 25, 'city': 'Dallas', 'name': 'ALex'}]

In [28]:
myRDD.count()

3

## Transformations

In [29]:
myRDD = sc.textFile("hdfs://localhost/big_data/Spark/myfile.txt")

In [30]:
myRDD.collect()

[u'lorem ipsum dolor sit amet',
 u'fusce sit lorem ipsum',
 u'donec in dolor odio ipsum',
 u'morbi ac ipsum quis enim male',
 u'donec risus nulla at pretium',
 u'nulla dolor in donec',
 u'morbi ut erat nec libero',
 u'libero nulla sit odio']

In [31]:
myRDD2 = myRDD.map(lambda line: line.upper())

In [32]:
myRDD2.collect()

[u'LOREM IPSUM DOLOR SIT AMET',
 u'FUSCE SIT LOREM IPSUM',
 u'DONEC IN DOLOR ODIO IPSUM',
 u'MORBI AC IPSUM QUIS ENIM MALE',
 u'DONEC RISUS NULLA AT PRETIUM',
 u'NULLA DOLOR IN DONEC',
 u'MORBI UT ERAT NEC LIBERO',
 u'LIBERO NULLA SIT ODIO']

In [33]:
myRDD3 = myRDD2.filter(lambda line: line.startswith('L'))

In [34]:
myRDD3.collect()

[u'LOREM IPSUM DOLOR SIT AMET', u'LIBERO NULLA SIT ODIO']

### Single-RDD Transformations

In [35]:
myRDD = sc.textFile("hdfs://localhost/big_data/Spark/myfile.txt")

In [36]:
myRDD.collect()

[u'lorem ipsum dolor sit amet',
 u'fusce sit lorem ipsum',
 u'donec in dolor odio ipsum',
 u'morbi ac ipsum quis enim male',
 u'donec risus nulla at pretium',
 u'nulla dolor in donec',
 u'morbi ut erat nec libero',
 u'libero nulla sit odio']

In [37]:
myRDD2 = myRDD.flatMap(lambda line: line.split())

In [38]:
myRDD2.collect()

[u'lorem',
 u'ipsum',
 u'dolor',
 u'sit',
 u'amet',
 u'fusce',
 u'sit',
 u'lorem',
 u'ipsum',
 u'donec',
 u'in',
 u'dolor',
 u'odio',
 u'ipsum',
 u'morbi',
 u'ac',
 u'ipsum',
 u'quis',
 u'enim',
 u'male',
 u'donec',
 u'risus',
 u'nulla',
 u'at',
 u'pretium',
 u'nulla',
 u'dolor',
 u'in',
 u'donec',
 u'morbi',
 u'ut',
 u'erat',
 u'nec',
 u'libero',
 u'libero',
 u'nulla',
 u'sit',
 u'odio']

In [39]:
myRDD2.count()

38

In [40]:
myRDD3 = myRDD2.distinct()

In [41]:
myRDD3.collect()

[u'ac',
 u'in',
 u'erat',
 u'at',
 u'enim',
 u'quis',
 u'sit',
 u'risus',
 u'nec',
 u'amet',
 u'fusce',
 u'donec',
 u'morbi',
 u'lorem',
 u'odio',
 u'ipsum',
 u'ut',
 u'pretium',
 u'libero',
 u'dolor',
 u'nulla',
 u'male']

In [42]:
myRDD3.count()

22

### Multi-RDD Transformation

In [43]:
myData1 = ['New York', 'Dallas', 'Austin', 'Los Angeles']
myData2 = ['Dallas', 'San Francisco', 'Houston', 'Austin']

In [44]:
myRDD1 = sc.parallelize(myData1)
myRDD2 = sc.parallelize(myData2)

In [45]:
myRDD3 = myRDD1.union(myRDD2)

In [46]:
myRDD3.collect()

['New York',
 'Dallas',
 'Austin',
 'Los Angeles',
 'Dallas',
 'San Francisco',
 'Houston',
 'Austin']

In [47]:
myRDD4 = myRDD1.subtract(myRDD2)

In [48]:
myRDD4.collect()

['Los Angeles', 'New York']

In [49]:
myRDD5 = myRDD1.zip(myRDD2)

In [50]:
myRDD5.collect()

[('New York', 'Dallas'),
 ('Dallas', 'San Francisco'),
 ('Austin', 'Houston'),
 ('Los Angeles', 'Austin')]

## Chaining Transformations

In [51]:
myRDD = sc.textFile("hdfs://localhost/big_data/Spark/myfile.txt")

In [52]:
myRDD.collect()

[u'lorem ipsum dolor sit amet',
 u'fusce sit lorem ipsum',
 u'donec in dolor odio ipsum',
 u'morbi ac ipsum quis enim male',
 u'donec risus nulla at pretium',
 u'nulla dolor in donec',
 u'morbi ut erat nec libero',
 u'libero nulla sit odio']

In [53]:
myRDD2 = myRDD.map(lambda line: line.upper())

In [54]:
myRDD2.collect()

[u'LOREM IPSUM DOLOR SIT AMET',
 u'FUSCE SIT LOREM IPSUM',
 u'DONEC IN DOLOR ODIO IPSUM',
 u'MORBI AC IPSUM QUIS ENIM MALE',
 u'DONEC RISUS NULLA AT PRETIUM',
 u'NULLA DOLOR IN DONEC',
 u'MORBI UT ERAT NEC LIBERO',
 u'LIBERO NULLA SIT ODIO']

In [55]:
myRDD3 = myRDD2.flatMap(lambda line: line.split())

In [56]:
myRDD3.collect()

[u'LOREM',
 u'IPSUM',
 u'DOLOR',
 u'SIT',
 u'AMET',
 u'FUSCE',
 u'SIT',
 u'LOREM',
 u'IPSUM',
 u'DONEC',
 u'IN',
 u'DOLOR',
 u'ODIO',
 u'IPSUM',
 u'MORBI',
 u'AC',
 u'IPSUM',
 u'QUIS',
 u'ENIM',
 u'MALE',
 u'DONEC',
 u'RISUS',
 u'NULLA',
 u'AT',
 u'PRETIUM',
 u'NULLA',
 u'DOLOR',
 u'IN',
 u'DONEC',
 u'MORBI',
 u'UT',
 u'ERAT',
 u'NEC',
 u'LIBERO',
 u'LIBERO',
 u'NULLA',
 u'SIT',
 u'ODIO']

In [58]:
myRDD4 = myRDD3.distinct()

In [59]:
myRDD4.collect()

[u'IPSUM',
 u'PRETIUM',
 u'DOLOR',
 u'NEC',
 u'ERAT',
 u'ENIM',
 u'AMET',
 u'SIT',
 u'LIBERO',
 u'MORBI',
 u'MALE',
 u'AC',
 u'RISUS',
 u'AT',
 u'IN',
 u'NULLA',
 u'ODIO',
 u'QUIS',
 u'UT',
 u'FUSCE',
 u'DONEC',
 u'LOREM']

In [60]:
myRDD4.take(5)

[u'IPSUM', u'PRETIUM', u'DOLOR', u'NEC', u'ERAT']

In [61]:
sc.textFile("hdfs://localhost/big_data/Spark/myfile.txt") \
  .map(lambda line: line.upper()) \
  .flatMap(lambda line: line.split()) \
  .distinct() \
  .take(5)

[u'IPSUM', u'PRETIUM', u'DOLOR', u'NEC', u'ERAT']

# Lineage

In [62]:
myRDD = sc.textFile("hdfs://localhost/big_data/Spark/myfile.txt") \
          .map(lambda line: line.upper()) \
          .flatMap(lambda line: line.split()) \
          .distinct()

In [63]:
print(myRDD.toDebugString())

(1) PythonRDD[77] at RDD at PythonRDD.scala:42 []
 |  MapPartitionsRDD[76] at mapPartitions at PythonRDD.scala:338 []
 |  ShuffledRDD[75] at partitionBy at NativeMethodAccessorImpl.java:-2 []
 +-(1) PairwiseRDD[74] at distinct at <ipython-input-62-4f89dfdd6bd9>:1 []
    |  PythonRDD[73] at distinct at <ipython-input-62-4f89dfdd6bd9>:1 []
    |  hdfs://localhost/big_data/Spark/myfile.txt MapPartitionsRDD[71] at textFile at NativeMethodAccessorImpl.java:-2 []
    |  hdfs://localhost/big_data/Spark/myfile.txt HadoopRDD[70] at textFile at NativeMethodAccessorImpl.java:-2 []


# Persistence

In [64]:
myRDD = sc.textFile("hdfs://localhost/big_data/Spark/myfile.txt")

In [65]:
myRDD2 = myRDD.map(lambda line: line.upper())

In [66]:
myRDD2.persist()

PythonRDD[80] at RDD at PythonRDD.scala:42

In [67]:
myRDD3 = myRDD2.flatMap(lambda line: line.split())

In [68]:
myRDD4 = myRDD3.distinct()

In [69]:
myRDD4.count()

22

## Persistence Levels

In [None]:
from pyspark import StorageLevel

In [None]:
myRDD = sc.textFile("hdfs://localhost/big_data/Spark/myfile.txt")

In [None]:
myRDD.persist(StorageLevel.MEMORY_ONLY)

In [None]:
myRDD = sc.textFile("hdfs://localhost/big_data/Spark/myfile.txt")

In [None]:
myRDD.persist(StorageLevel.MEMORY_AND_DISK)

In [None]:
myRDD = sc.textFile("hdfs://localhost/big_data/Spark/myfile.txt")

In [None]:
myRDD.persist(StorageLevel.DISK_ONLY)

# Pair RDDs

Example 1

In [70]:
!hdfs dfs -cat /big_data/Spark/user_data.txt

user01	Alex
user02	Sam
user03	Max

In [71]:
myRDD = sc.textFile("hdfs://localhost/big_data/Spark/user_data.txt")

In [72]:
myRDD.collect()

[u'user01\tAlex', u'user02\tSam', u'user03\tMax']

In [73]:
myRDD2 = myRDD.map(lambda line: line.split('\t'))

In [75]:
myRDD2.collect()

[[u'user01', u'Alex'], [u'user02', u'Sam'], [u'user03', u'Max']]

In [76]:
myRDD3 = myRDD2.map(lambda fields: (fields[0], fields[1]))

In [77]:
myRDD3.collect()

[(u'user01', u'Alex'), (u'user02', u'Sam'), (u'user03', u'Max')]

Example 2

In [78]:
!hdfs dfs -cat /big_data/Spark/weblogs.txt | head -n 3

12.13.139.87 - 96828 [16/Sep/2013:23:59:44 +0100] "GET /KBDOC-00129.html HTTP/1.0" 200 17834 "http://www.loudacre.com"  "Loudacre Mobile Browser Sorrento F24L"
12.13.139.87 - 96828 [16/Sep/2013:23:59:44 +0100] "GET /theme.css HTTP/1.0" 200 17883 "http://www.loudacre.com"  "Loudacre Mobile Browser Sorrento F24L"
109.129.242.140 - 58687 [16/Sep/2013:23:57:52 +0100] "GET /KBDOC-00282.html HTTP/1.0" 200 4107 "http://www.loudacre.com"  "Loudacre Mobile Browser Titanic 2400"


In [79]:
myRDD = sc.textFile("hdfs://localhost/big_data/Spark/weblogs.txt")

In [80]:
myRDD.take(1)

[u'12.13.139.87 - 96828 [16/Sep/2013:23:59:44 +0100] "GET /KBDOC-00129.html HTTP/1.0" 200 17834 "http://www.loudacre.com"  "Loudacre Mobile Browser Sorrento F24L"']

In [82]:
myRDD2 = myRDD.map(lambda line: line.split(' '))
myRDD2.take(1)

[[u'12.13.139.87',
  u'-',
  u'96828',
  u'[16/Sep/2013:23:59:44',
  u'+0100]',
  u'"GET',
  u'/KBDOC-00129.html',
  u'HTTP/1.0"',
  u'200',
  u'17834',
  u'"http://www.loudacre.com"',
  u'',
  u'"Loudacre',
  u'Mobile',
  u'Browser',
  u'Sorrento',
  u'F24L"']]

In [83]:
myRDD.keyBy(lambda line: line.split(' ')[2]).take(3)

[(u'96828',
  u'12.13.139.87 - 96828 [16/Sep/2013:23:59:44 +0100] "GET /KBDOC-00129.html HTTP/1.0" 200 17834 "http://www.loudacre.com"  "Loudacre Mobile Browser Sorrento F24L"'),
 (u'96828',
  u'12.13.139.87 - 96828 [16/Sep/2013:23:59:44 +0100] "GET /theme.css HTTP/1.0" 200 17883 "http://www.loudacre.com"  "Loudacre Mobile Browser Sorrento F24L"'),
 (u'58687',
  u'109.129.242.140 - 58687 [16/Sep/2013:23:57:52 +0100] "GET /KBDOC-00282.html HTTP/1.0" 200 4107 "http://www.loudacre.com"  "Loudacre Mobile Browser Titanic 2400"')]

## Pair RDDs with Complex Values

Example 1

In [84]:
myRDD = sc.textFile("hdfs://localhost/big_data/Spark/zip_codes.txt")

In [85]:
myRDD.collect()

[u'75248 65.32658 72.45998',
 u'75254 63.25698 72.25468',
 u'75263 61.25648 71.45689',
 u'75349 60.23546 74.26598',
 u'75102 59.56498 70.26654']

In [86]:
myRDD2 = myRDD.map(lambda line: line.split())

In [88]:
myRDD2.collect()

[[u'75248', u'65.32658', u'72.45998'],
 [u'75254', u'63.25698', u'72.25468'],
 [u'75263', u'61.25648', u'71.45689'],
 [u'75349', u'60.23546', u'74.26598'],
 [u'75102', u'59.56498', u'70.26654']]

In [89]:
myRDD3 = myRDD2.map(lambda fields: (fields[0],(fields[1], fields[2])))

In [90]:
myRDD3.collect()

[(u'75248', (u'65.32658', u'72.45998')),
 (u'75254', (u'63.25698', u'72.25468')),
 (u'75263', (u'61.25648', u'71.45689')),
 (u'75349', (u'60.23546', u'74.26598')),
 (u'75102', (u'59.56498', u'70.26654'))]

Example 2

In [246]:
myRDD = sc.textFile("hdfs://localhost/big_data/Spark/orders.txt")

In [247]:
myRDD.collect()

[u'75248 oid6813:oid9863:oid2465',
 u'75254 oid4682:oid9242:oid9534',
 u'75263 oid1326:oid4532',
 u'75349 oid6813',
 u'75102 oid6813:oid9863']

In [248]:
myRDD2 = myRDD.map(lambda line: line.split(' '))
myRDD2.collect()

[[u'75248', u'oid6813:oid9863:oid2465'],
 [u'75254', u'oid4682:oid9242:oid9534'],
 [u'75263', u'oid1326:oid4532'],
 [u'75349', u'oid6813'],
 [u'75102', u'oid6813:oid9863']]

In [249]:
myRDD3 = myRDD2.map(lambda fields: (fields[0], fields[1]))
myRDD3.collect()

[(u'75248', u'oid6813:oid9863:oid2465'),
 (u'75254', u'oid4682:oid9242:oid9534'),
 (u'75263', u'oid1326:oid4532'),
 (u'75349', u'oid6813'),
 (u'75102', u'oid6813:oid9863')]

In [250]:
myRDD4 = myRDD3.flatMapValues(lambda oid: oid.split(':'))
myRDD4.collect()

[(u'75248', u'oid6813'),
 (u'75248', u'oid9863'),
 (u'75248', u'oid2465'),
 (u'75254', u'oid4682'),
 (u'75254', u'oid9242'),
 (u'75254', u'oid9534'),
 (u'75263', u'oid1326'),
 (u'75263', u'oid4532'),
 (u'75349', u'oid6813'),
 (u'75102', u'oid6813'),
 (u'75102', u'oid9863')]

# Spark Map-Reduce

In [None]:
myRDD = sc.textFile("hdfs://localhost/big_data/Spark/myfile.txt")
myRDD.collect()

In [None]:
myRDD2 = myRDD.flatMap(lambda line: line.split())
myRDD2.collect()

In [None]:
myRDD3 = myRDD2.map(lambda word: (word, 1))
myRDD3.collect()

In [None]:
myRDD4 = myRDD3.reduceByKey(lambda v1,v2: v1+v2)
myRDD4.collect()

# Other Operations with Pair RDDs

## Example for Join

### Preparing Weblogs containing requests

In [None]:
import re

In [None]:
def getRequestDoc(s):
    return re.search(r'KBDOC-[0-9]*', s).group()

In [None]:
logs = "file:/home/training/training_materials/data/weblogs/*"    
kbreqs = sc.textFile(logs)
kbreqs.take(3)

In [None]:
kbreqs2 = kbreqs.filter(lambda line: 'KBDOC-' in line)
kbreqs2.take(3)

In [None]:
kbreqs3 = kbreqs2.map(lambda line: (getRequestDoc(line), line.split(' ')[2])).distinct()
kbreqs3.take(3)

### Preparing Knowledge Base artiles containing titles

In [None]:
kblistfile = "file:/home/training/training_materials/data/kb/*.html"
kblist = sc.wholeTextFiles(kblistfile)
kblist.take(1)

In [None]:
print kblist.take(1)[0][0]
print kblist.take(1)[0][1]

In [None]:
kblist2 = kblist.map(lambda file: (file[0].split('/')[-1].split('.')[0], file[1].split('<title>')[1].split('</title>')[0]))
kblist2.take(3)

### Merging requests with KB files

In [None]:
titlereqs = kbreqs3.join(kblist2)
titlereqs.take(3)

In [None]:
titlereqs2 = titlereqs.map(lambda (docid, (userid, title)): (userid, title)).groupByKey()
l = titlereqs2.take(5)

In [None]:
for (userid,titles) in titlereqs2.take(10):
    print "userid:", userid
    for title in titles: print '\t', title

In [None]:
for i in l:
    for j in i[1]:
        print i[0], j

# Building Spark Applications

In [None]:
# import sys
# from pyspark import SparkContext


# if __name__ == "__main__":
#     if len(sys.argv) < 2:
#         print >> sys.stderr, "Usage: WordCount <file>"
#         exit(-1)
        
#     sc = SparkContext()
    
#     counts = sc.textFile(sys.argv[1]) \
#                .flatMap(lamda line: line.split()) \
#                .map(lambda word: (word, 1)) \
#                .reduceByKey(lambda v1, v2: v1+v2)
    
#     for pair in counts.take(5): print pair
        
#     sc.stop()

# Configuring Spark Application

In [None]:
# import sys
# from pyspark import SparkContext
# from pyspark import SparkConf

# if __name__ == "__main__":
#     if len(sys.argv) < 2:
#         print >> sys.stderr, "Usage: WordCount <file>"
#         exit(-1)
    
#     sconf = SparkConf() \
#                 .setAppName("Word Count") \
#                 .set("spark.ui.port", "4141")
        
#     sc = SparkContext(conf=sconf)
    
#     counts = sc.textFile(sys.argv[1]) \
#                .flatMap(lamda line: line.split()) \
#                .map(lambda word: (word, 1)) \
#                .reduceByKey(lambda v1, v2: v1+v2)
    
#     for pair in counts.take(5): print pair
        
#     sc.stop()

# Spark SQL

## SQL Context

In [91]:
from pyspark import SQLContext

In [92]:
sqlCtx = SQLContext(sc)

# Creating DataFrames

## From a Text/CSV File

In [93]:
from pyspark.sql import *
from pyspark.sql.types import *

In [94]:
studentsRDD = sc.textFile("hdfs://localhost/big_data/Spark/data.txt")

In [95]:
studentsRDD.take(2)

[u'1,Donald,Becton,Oakland,CA,3.5,3.7',
 u'2,Donna,Jones,San Francisco,TX,4.0,2.6']

In [96]:
studentsRDD2 = studentsRDD.map(lambda l: l.split(","))

In [97]:
studentsRDD2.take(2)

[[u'1', u'Donald', u'Becton', u'Oakland', u'CA', u'3.5', u'3.7'],
 [u'2', u'Donna', u'Jones', u'San Francisco', u'TX', u'4.0', u'2.6']]

In [98]:
studentsRDD3 = studentsRDD2.map(lambda p: (int(p[0]), p[1], p[2], p[3], \
                                              p[4], float(p[5]), float(p[6])))

In [99]:
studentsRDD3.take(2)

[(1, u'Donald', u'Becton', u'Oakland', u'CA', 3.5, 3.7),
 (2, u'Donna', u'Jones', u'San Francisco', u'TX', 4.0, 2.6)]

In [102]:
field_names = ['id', 'firstname', 'lastname', 'city', 'state', 'math', 'physics']

In [103]:
fields = [StructField('id', IntegerType(), True), 
          StructField('firstname', StringType(), True), 
          StructField('lastname', StringType(), True), 
          StructField('city', StringType(), True), 
          StructField('state', StringType(), True), 
          StructField('math', DoubleType(), True), 
          StructField('physics', DoubleType(), True)]

In [104]:
fields

[StructField(id,IntegerType,true),
 StructField(firstname,StringType,true),
 StructField(lastname,StringType,true),
 StructField(city,StringType,true),
 StructField(state,StringType,true),
 StructField(math,DoubleType,true),
 StructField(physics,DoubleType,true)]

In [105]:
studentsSchema = StructType(fields)

In [107]:
studentsDF = sqlCtx.createDataFrame(studentsRDD3, studentsSchema)

In [108]:
studentsDF

DataFrame[id: int, firstname: string, lastname: string, city: string, state: string, math: double, physics: double]

## From a JSON File

In [110]:
studentsDF = sqlCtx.jsonFile("hdfs://localhost/big_data/Spark/students.json")

In [111]:
studentsDF

DataFrame[age: string, city: string, id: string, math: string, name: string, physics: string]

## From MySQL Database

In [112]:
accountsDF = sqlCtx.load(source="jdbc", 
                         url="jdbc:mysql://localhost/loudacre?user=training&password=training", 
                         dbtable="accounts")

In [113]:
accountsDF

DataFrame[acct_num: int, acct_create_dt: timestamp, acct_close_dt: timestamp, first_name: string, last_name: string, address: string, city: string, state: string, zipcode: string, phone_number: string, created: timestamp, modified: timestamp]

# Transforming and Querying DataFrames

## Basic Operations

In [114]:
studentsDF.schema

StructType(List(StructField(age,StringType,true),StructField(city,StringType,true),StructField(id,StringType,true),StructField(math,StringType,true),StructField(name,StringType,true),StructField(physics,StringType,true)))

In [115]:
studentsDF.printSchema()

root
 |-- age: string (nullable = true)
 |-- city: string (nullable = true)
 |-- id: string (nullable = true)
 |-- math: string (nullable = true)
 |-- name: string (nullable = true)
 |-- physics: string (nullable = true)



In [116]:
studentsDF.columns

[u'age', u'city', u'id', u'math', u'name', u'physics']

In [117]:
studentsDF.dtypes

[('age', 'string'),
 ('city', 'string'),
 ('id', 'string'),
 ('math', 'string'),
 ('name', 'string'),
 ('physics', 'string')]

In [118]:
studentsDF.explain

<bound method DataFrame.explain of DataFrame[age: string, city: string, id: string, math: string, name: string, physics: string]>

## DataFrame Actions

In [119]:
studentsDF.count()

6L

In [120]:
studentsDF.show(3)

age city    id     math name  physics
23  Dallas  765342 3.5  Alice 3.2    
25  Austin  736253 3.7  Max   3.5    
31  Houston 674629 3.2  Alex  3.9    


In [121]:
studentsDF.take(3)

[Row(age=u'23', city=u'Dallas', id=u'765342', math=u'3.5', name=u'Alice', physics=u'3.2'),
 Row(age=u'25', city=u'Austin', id=u'736253', math=u'3.7', name=u'Max', physics=u'3.5'),
 Row(age=u'31', city=u'Houston', id=u'674629', math=u'3.2', name=u'Alex', physics=u'3.9')]

In [122]:
studentsDF.collect()

[Row(age=u'23', city=u'Dallas', id=u'765342', math=u'3.5', name=u'Alice', physics=u'3.2'),
 Row(age=u'25', city=u'Austin', id=u'736253', math=u'3.7', name=u'Max', physics=u'3.5'),
 Row(age=u'31', city=u'Houston', id=u'674629', math=u'3.2', name=u'Alex', physics=u'3.9'),
 Row(age=u'22', city=u'Dallas', id=u'467392', math=u'3.9', name=u'Farzad', physics=u'3.8'),
 Row(age=u'27', city=u'Los Angeles', id=u'983267', math=u'3.3', name=u'Sam', physics=u'3.0'),
 Row(age=u'21', city=u'New Yorl', id=u'354901', math=u'3.0', name=u'Daivd', physics=u'2.7')]

## DataFrame Queries

In [123]:
studentsDF.limit(2).show()

age city   id     math name  physics
23  Dallas 765342 3.5  Alice 3.2    
25  Austin 736253 3.7  Max   3.5    


In [124]:
studentsDF.select('name','age').show(2)

name  age
Alice 23 
Max   25 


In [125]:
studentsDF.where('math > 3.5').show()

age city   id     math name   physics
25  Austin 736253 3.7  Max    3.5    
22  Dallas 467392 3.9  Farzad 3.8    


In [126]:
studentsDF.filter('math < 3.5').show(1)

age city    id     math name physics
31  Houston 674629 3.2  Alex 3.9    


In [127]:
studentsDF.where(studentsDF.age > 25).show(1)

age city    id     math name physics
31  Houston 674629 3.2  Alex 3.9    


In [128]:
studentsDF.select(studentsDF.id, (studentsDF.math + studentsDF.physics)/2).show()

id     ((math + physics) / 2)
765342 3.35                  
736253 3.6                   
674629 3.55                  
467392 3.8499999999999996    
983267 3.15                  
354901 2.85                  


In [129]:
studentsDF.sort(studentsDF.age.asc()).show()

age city        id     math name   physics
21  New Yorl    354901 3.0  Daivd  2.7    
22  Dallas      467392 3.9  Farzad 3.8    
23  Dallas      765342 3.5  Alice  3.2    
25  Austin      736253 3.7  Max    3.5    
27  Los Angeles 983267 3.3  Sam    3.0    
31  Houston     674629 3.2  Alex   3.9    


You can also perform any free form SQL queries

In [130]:
studentsDF.registerAsTable("students")

In [131]:
result = sqlCtx.sql("""
            SELECT * 
            FROM students
            WHERE age > 25 AND physics > 3.5""")

In [132]:
result.show()

age city    id     math name physics
31  Houston 674629 3.2  Alex 3.9    


# DataFrames and RDDs

In [133]:
studentsDF.show()

age city        id     math name   physics
23  Dallas      765342 3.5  Alice  3.2    
25  Austin      736253 3.7  Max    3.5    
31  Houston     674629 3.2  Alex   3.9    
22  Dallas      467392 3.9  Farzad 3.8    
27  Los Angeles 983267 3.3  Sam    3.0    
21  New Yorl    354901 3.0  Daivd  2.7    


In [134]:
studentsRDD = studentsDF.rdd

In [135]:
studentsRDD.collect()

[Row(age=u'23', city=u'Dallas', id=u'765342', math=u'3.5', name=u'Alice', physics=u'3.2'),
 Row(age=u'25', city=u'Austin', id=u'736253', math=u'3.7', name=u'Max', physics=u'3.5'),
 Row(age=u'31', city=u'Houston', id=u'674629', math=u'3.2', name=u'Alex', physics=u'3.9'),
 Row(age=u'22', city=u'Dallas', id=u'467392', math=u'3.9', name=u'Farzad', physics=u'3.8'),
 Row(age=u'27', city=u'Los Angeles', id=u'983267', math=u'3.3', name=u'Sam', physics=u'3.0'),
 Row(age=u'21', city=u'New Yorl', id=u'354901', math=u'3.0', name=u'Daivd', physics=u'2.7')]

In [136]:
studentsRDD2 = studentsRDD.map(lambda row: (row.id, row.math, row.physics))
studentsRDD2.take(3)

[(u'765342', u'3.5', u'3.2'),
 (u'736253', u'3.7', u'3.5'),
 (u'674629', u'3.2', u'3.9')]