In [1]:
import findspark
findspark.init()

In [2]:
import pyspark

In [3]:
sc = pyspark.SparkContext(appName = "Test")

In [4]:
# Spark interface http://localhost:4043

## Pair RDDs

**1. Number of user requests**

In [47]:
logs = sc.textFile("file:/home/bego/Documents/curso_spark/datasets/weblogs/*")
logs.take(5)

['116.180.70.237 - 128 [15/Sep/2013:23:59:53 +0100] "GET /KBDOC-00031.html HTTP/1.0" 200 1388 "http://www.loudacre.com"  "Loudacre CSR Browser"',
 '116.180.70.237 - 128 [15/Sep/2013:23:59:53 +0100] "GET /theme.css HTTP/1.0" 200 5531 "http://www.loudacre.com"  "Loudacre CSR Browser"',
 '218.193.16.244 - 94 [15/Sep/2013:23:58:45 +0100] "GET /KBDOC-00273.html HTTP/1.0" 200 5325 "http://www.loudacre.com"  "Loudacre CSR Browser"',
 '218.193.16.244 - 94 [15/Sep/2013:23:58:45 +0100] "GET /theme.css HTTP/1.0" 200 9463 "http://www.loudacre.com"  "Loudacre CSR Browser"',
 '198.122.118.164 - 131 [15/Sep/2013:23:58:02 +0100] "GET /KBDOC-00117.html HTTP/1.0" 200 15818 "http://www.loudacre.com"  "Loudacre CSR Browser"']

In [6]:
userreqs = logs.map(lambda x: (x.split(" ")[2], 1))
userreqs.take(5)

[('128', 1), ('128', 1), ('94', 1), ('94', 1), ('131', 1)]

In [7]:
reqs_per_user = userreqs.reduceByKey(lambda accum, n: accum + n).sortByKey()
reqs_per_user.take(20)

[('1', 528),
 ('10', 472),
 ('100', 540),
 ('10000', 2),
 ('100000', 2),
 ('100003', 14),
 ('100004', 16),
 ('100008', 6),
 ('100009', 4),
 ('100012', 20),
 ('100019', 20),
 ('100022', 10),
 ('100035', 2),
 ('100038', 6),
 ('100048', 14),
 ('100053', 12),
 ('100058', 2),
 ('100062', 8),
 ('100079', 8),
 ('100081', 14)]

**2. Top 10 users by number of hits**

In [8]:
top_10_hits = reqs_per_user.map(lambda x: (x[1], x[0])).sortByKey(False)
top_10_hits.take(5)

[(667, '193'), (636, '13'), (620, '24'), (610, '147'), (606, '110')]

**3. UserID-Address RDD**

In [9]:
addresses_user = logs.map(lambda x: (x.split(" ")[2], x.split(" ")[0])).groupByKey().map(lambda x: (x[0], list(x[1])))
addresses_user.take(1)

[('54126',
  ['176.80.85.20',
   '176.80.85.20',
   '220.61.164.139',
   '220.61.164.139',
   '224.190.37.8',
   '224.190.37.8',
   '17.171.169.4',
   '17.171.169.4',
   '17.171.169.4',
   '17.171.169.4',
   '191.88.233.113',
   '191.88.233.113',
   '191.88.233.113',
   '191.88.233.113',
   '211.209.98.155',
   '211.209.98.155',
   '75.13.247.212',
   '75.13.247.212',
   '161.94.252.10',
   '161.94.252.10',
   '69.103.100.120',
   '69.103.100.120',
   '216.249.102.215',
   '216.249.102.215',
   '244.131.187.64',
   '244.131.187.64',
   '104.14.31.172',
   '104.14.31.172',
   '189.97.51.33',
   '189.97.51.33',
   '31.252.214.103',
   '31.252.214.103'])]

**4.1 Map account data to (userid,[values....])**

In [10]:
accounts = sc.textFile("file:/home/bego/Documents/curso_spark/datasets/accounts.csv")
accounts.take(1)

['1,2008-12-31 15:05:45,2013-12-29 09:53:35,Donald,Becton,2275 Washburn Street,Oakland,CA,94656,5104529635,2013-12-27 15:01:36,2013-12-27 15:01:36']

In [11]:
#accounts_id = accounts.keyBy(lambda x: x[0])
accounts_id = accounts.map(lambda x: (x.split(",")[0], list(x.split(",")))).sortByKey() #groupByKey().map(lambda x: (x[0], list(x[1])))
accounts_id.take(1)

[('1',
  ['1',
   '2008-12-31 15:05:45',
   '2013-12-29 09:53:35',
   'Donald',
   'Becton',
   '2275 Washburn Street',
   'Oakland',
   'CA',
   '94656',
   '5104529635',
   '2013-12-27 15:01:36',
   '2013-12-27 15:01:36'])]

**4.2 Join account data with userreqs then merge hit count into valuelist**

In [12]:
all_data = accounts_id.join(reqs_per_user).sortByKey()
all_data.take(2)

[('1',
  (['1',
    '2008-12-31 15:05:45',
    '2013-12-29 09:53:35',
    'Donald',
    'Becton',
    '2275 Washburn Street',
    'Oakland',
    'CA',
    '94656',
    '5104529635',
    '2013-12-27 15:01:36',
    '2013-12-27 15:01:36'],
   528)),
 ('10',
  (['10',
    '2008-11-07 21:04:07',
    '2013-06-24 06:16:28',
    'Diane',
    'Nelson',
    '921 Sardis Sta',
    'Santa Rosa',
    'CA',
    '94966',
    '7072839759',
    '2013-12-27 15:01:36',
    '2013-12-27 15:01:36'],
   472))]

**4.3 Show fields**

In [13]:
for userid, (values, count) in all_data.take(5):
    print(userid, count, values[3], values[4])

1 528 Donald Becton
10 472 Diane Nelson
100 540 Manuel Cerrato
10000 2 Teresa French
100000 2 Hope Smith


**Bonus.1 People per postal code**

In [14]:
accounts_pc = accounts.keyBy(lambda x: x.split(",")[8])
accounts_pc.take(1)

[('94656',
  '1,2008-12-31 15:05:45,2013-12-29 09:53:35,Donald,Becton,2275 Washburn Street,Oakland,CA,94656,5104529635,2013-12-27 15:01:36,2013-12-27 15:01:36')]

In [15]:
pc_names = accounts_pc.map(lambda x: (x[0], (x[1].split(",")[4],x[1].split(",")[3]))).groupByKey().map(lambda x: (x[0], list(x[1]))).sortByKey()
# can also use mapValues
pc_names.take(1)

[('85000',
  [('Willson', 'Leon'),
   ('Clark', 'Ronald'),
   ('Rush', 'Juanita'),
   ('Woodhouse', 'Roger'),
   ('Baptist', 'Colin'),
   ('King', 'Percy'),
   ('Carmack', 'David'),
   ('Milan', 'Ana'),
   ('McCurdy', 'Kendra'),
   ('Pitts', 'Robert'),
   ('Hopkins', 'Leslie'),
   ('Butler', 'Paul'),
   ('Barth', 'Phyllis')])]

In [16]:
for pc,people in pc_names.take(5):
    print("---{}".format(pc))
    for surname,name in people:
        print("{},{}".format(surname,name))

---85000
Willson,Leon
Clark,Ronald
Rush,Juanita
Woodhouse,Roger
Baptist,Colin
King,Percy
Carmack,David
Milan,Ana
McCurdy,Kendra
Pitts,Robert
Hopkins,Leslie
Butler,Paul
Barth,Phyllis
---85001
Cross,David
Pritchett,Danny
Sistrunk,Lennie
Sweet,Jeffery
Buckles,Nancy
James,Katie
Tutor,Anthony
Battle,Mark
Hiller,Carol
Landa,Nancy
Marks,Sergio
Sprague,Barbara
Greenwell,Angela
Helms,Sabrina
Allen,Sharon
Waller,Gregory
---85002
Whitmore,Alan
Chandler,Tara
Robinson,Diane
Brown,Henry
Sisson,Lacey
Root,Elfriede
Lynch,Barbara
Dixon,Jeremy
Hampton,David
Norman,Elizabeth
Granados,Lynnette
Sullivan,Martha
Novak,Grant
Johnson,Katrina
McConville,Jennifer
Sherer,Rose
---85003
Jenkins,Thad
Rick,Edward
Lindsay,Ivy
Oneil,Beth
Post,Elizabeth
Taylor,Maude
Smith,Wesley
Wolfe,Gilbert
Jenkins,Richard
Carmichael,David
Jaimes,Earl
Jennings,Guillermo
---85004
Morris,Eric
Reiser,Hazel
Gregg,Alicia
Preston,Elizabeth
Hass,Julie
Gunn,Debra
Frye,Terry
Reyes,Jack
Thomas,Brenda
Stowe,Lloyd
Vaughn,Tommy
Edmond,Barry


## HDFS files

In [48]:
logs = sc.textFile("/tmp/curso/weblogs/2014-02-04.log")
logs.take(5)

['52.186.163.97 - 93 [04/Feb/2014:23:59:55 +0100] "GET /KBDOC-00141.html HTTP/1.0" 200 13580 "http://www.loudacre.com"  "Loudacre CSR Browser"',
 '52.186.163.97 - 93 [04/Feb/2014:23:59:55 +0100] "GET /theme.css HTTP/1.0" 200 18642 "http://www.loudacre.com"  "Loudacre CSR Browser"',
 '165.73.165.250 - 130 [04/Feb/2014:23:59:22 +0100] "GET /KBDOC-00189.html HTTP/1.0" 200 5182 "http://www.loudacre.com"  "Loudacre CSR Browser"',
 '165.73.165.250 - 130 [04/Feb/2014:23:59:22 +0100] "GET /theme.css HTTP/1.0" 200 16441 "http://www.loudacre.com"  "Loudacre CSR Browser"',
 '230.236.75.86 - 4921 [04/Feb/2014:23:59:17 +0100] "GET /KBDOC-00002.html HTTP/1.0" 200 10216 "http://www.loudacre.com"  "Loudacre Mobile Browser Sorrento F00L"']

In [50]:
jpgs = logs.filter(lambda x: ".jpg" in x)
jpgs.saveAsTextFile("/tmp/curso/jpgs4")

In [55]:
jpgs.take(5)

['65.103.192.86 - 88816 [04/Feb/2014:23:58:34 +0100] "GET /titanic_2400.jpg HTTP/1.0" 200 2935 "http://www.loudacre.com"  "Loudacre Mobile Browser iFruit 3A"',
 '102.100.4.122 - 40439 [04/Feb/2014:23:58:20 +0100] "GET /ronin_s1.jpg HTTP/1.0" 200 13346 "http://www.loudacre.com"  "Loudacre Mobile Browser Sorrento F41L"',
 '215.166.217.166 - 40984 [04/Feb/2014:23:54:39 +0100] "GET /sorrento_f21l.jpg HTTP/1.0" 200 11038 "http://www.loudacre.com"  "Loudacre Mobile Browser Sorrento F11L"',
 '79.242.61.21 - 46925 [04/Feb/2014:23:50:52 +0100] "GET /ifruit_4.jpg HTTP/1.0" 200 5951 "http://www.loudacre.com"  "Loudacre Mobile Browser MeeToo 2.0"',
 '73.233.61.30 - 75640 [04/Feb/2014:23:42:48 +0100] "GET /ifruit_3.jpg HTTP/1.0" 200 14506 "http://www.loudacre.com"  "Loudacre Mobile Browser Titanic 2000"']

In [79]:
jpgs_name = jpgs.map(lambda x: x.split(" ")[6][1:].split(".")[0])
jpgs_name.count()

408

In [77]:
tm = sc.textFile("/home/bego/Documents/curso_spark/datasets/targetmodels.txt").map(lambda x: x.replace(" ", "_").lower())
tm_proc = [e[0] for e in tm.zipWithUniqueId().collect()]
tm_proc

['ifruit_5a',
 'ifruit_5',
 'ifruit_4a',
 'ifruit_4',
 'titanic_4000',
 'titanic_3000',
 'titanic_2500',
 'sorrento_f41l',
 'sorrento_f40l',
 'ronin_s4',
 'ronin_s3',
 'ronin_s2',
 'meetoo_5.1',
 'meetoo_5.0']

In [80]:
jpgs_tm = jpgs_name.filter(lambda x: x in tm_proc)
jpgs_tm.count()

104

## Working with partitions

In [21]:
# $ hadoop fs -put activations /tmp/curso/activations
activations = sc.textFile("/tmp/curso/activations/*")
print(activations.take(20))

# show the partitioning
print(activations.toDebugString())

['<activations>', '\t  <activation timestamp="1288545628" type="phone">', '\t    <account-number>8306</account-number>', '\t    <device-id>1d2d4f90-0244-4329-a9d2-615db7747930</device-id>', '\t    <phone-number>4085245772</phone-number>', '\t    <model>Titanic 1000</model>', '\t  </activation>', '\t  \t\t  <activation timestamp="1288542060" type="phone">', '\t    <account-number>63465</account-number>', '\t    <device-id>7ed8b07f-81dc-4a33-9944-c46acf888c60</device-id>', '\t    <phone-number>9281057035</phone-number>', '\t    <model>MeeToo 2.0</model>', '\t  </activation>', '\t  \t\t  <activation timestamp="1288539280" type="phone">', '\t    <account-number>8008</account-number>', '\t    <device-id>00615b91-c997-4038-b70c-e456d7e90e4b</device-id>', '\t    <phone-number>6260201208</phone-number>', '\t    <model>MeeToo 1.0</model>', '\t  </activation>', '\t  \t\t  <activation timestamp="1288539231" type="phone">']
b'(61) /tmp/curso/activations/* MapPartitionsRDD[66] at textFile at Native

In [22]:
import xml.etree.ElementTree as ElementTree

# return an iterator of activation Elements contained in the partition
def getactivations(fileiterator):
    s = ''
    for i in fileiterator: s = s + str(i)
    filetree = ElementTree.fromstring(s)
    return filetree.getiterator('activation')

# get the model name from a device activation record
def getmodel(activation):
    return activation.find('model').text 

In [23]:
# parse each partition as a file into an activation XML record
activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
activationTrees.take(2)

[<Element 'activation' at 0x7faf74398f98>,
 <Element 'activation' at 0x7faf7436def8>]

In [24]:
# map each activation record to a device model name
models = activationTrees.map(lambda activation: getmodel(activation))
print(models.take(5))

# show the partitioning
print(models.toDebugString())

['Titanic 1000', 'MeeToo 2.0', 'MeeToo 1.0', 'Sorrento F01L', 'Titanic 1100']
b'(61) PythonRDD[70] at RDD at PythonRDD.scala:53 []\n |   /tmp/curso/activations/* MapPartitionsRDD[66] at textFile at NativeMethodAccessorImpl.java:0 []\n |   /tmp/curso/activations/* HadoopRDD[65] at textFile at NativeMethodAccessorImpl.java:0 []'


In [25]:
# count activations by model
modelcounts = models.map(lambda model: (model,1)).reduceByKey(lambda v1,v2:v1+v2)
print(modelcounts.take(5))

# show the partitioning
print(modelcounts.toDebugString())

[('iFruit 2', 4061), ('Ronin S1', 4027), ('Titanic 4000', 1378), ('Sorrento F10L', 4633), ('Sorrento F23L', 2676)]
b'(61) PythonRDD[77] at RDD at PythonRDD.scala:53 []\n |   MapPartitionsRDD[74] at mapPartitions at PythonRDD.scala:133 []\n |   ShuffledRDD[73] at partitionBy at NativeMethodAccessorImpl.java:0 []\n +-(61) PairwiseRDD[72] at reduceByKey at <ipython-input-25-88e9ab53c366>:2 []\n    |   PythonRDD[71] at reduceByKey at <ipython-input-25-88e9ab53c366>:2 []\n    |   /tmp/curso/activations/* MapPartitionsRDD[66] at textFile at NativeMethodAccessorImpl.java:0 []\n    |   /tmp/curso/activations/* HadoopRDD[65] at textFile at NativeMethodAccessorImpl.java:0 []'


In [26]:
# dislay the top 10 models
for count, model in modelcounts.map(lambda x: (x[1], x[0])).top(10):
    print(count,model)

5301 Titanic 1100
5238 Sorrento F00L
5232 Titanic 1000
5144 iFruit 1
5019 MeeToo 1.0
4863 Sorrento F01L
4633 Sorrento F10L
4504 Titanic 2000
4159 Titanic 2100
4132 Titanic 2200


## Using RDD cache

In [27]:
# count number of models
models.count()

124799

In [28]:
models.cache()

PythonRDD[70] at RDD at PythonRDD.scala:53

In [29]:
# substantial reduction in time
models.count()

124799

In [30]:
# spark interface: Storage, Executors

In [31]:
models.unpersist()

PythonRDD[70] at RDD at PythonRDD.scala:53

In [32]:
from pyspark import StorageLevel
# stores RDD in disk
models.persist(StorageLevel.DISK_ONLY)

PythonRDD[70] at RDD at PythonRDD.scala:53

In [33]:
models.count()

124799

## Using checkpoints

In [34]:
sc.setCheckpointDir("checkpoints") # to store data in HDFS

In [39]:
mydata = sc.parallelize(list(range(1,6)))

In [41]:
for i in range(400):
    mydata = mydata.map(lambda x: x+1) # without checkpoint this produces a StackOverflow
    mydata.checkpoint()
    mydata.count() # checkpoint does not materialize until an action

<bound method RDD.count of PythonRDD[1589] at RDD at PythonRDD.scala:53>


In [43]:
mydata.collect()

[709, 710, 711, 712, 713]

In [44]:
mydata.toDebugString()

b'(1) PythonRDD[1589] at RDD at PythonRDD.scala:53 []\n |  ReliableCheckpointRDD[1591] at count at <ipython-input-41-da7224145dd0>:4 []'

## Destroy SC

In [None]:
# destroy the spark context
#  sc.stop()

## Accumulators

In [101]:
logs = sc.textFile("/tmp/curso/weblogs/*")

In [103]:
logs.take(5)

['116.180.70.237 - 128 [15/Sep/2013:23:59:53 +0100] "GET /KBDOC-00031.html HTTP/1.0" 200 1388 "http://www.loudacre.com"  "Loudacre CSR Browser"',
 '116.180.70.237 - 128 [15/Sep/2013:23:59:53 +0100] "GET /theme.css HTTP/1.0" 200 5531 "http://www.loudacre.com"  "Loudacre CSR Browser"',
 '218.193.16.244 - 94 [15/Sep/2013:23:58:45 +0100] "GET /KBDOC-00273.html HTTP/1.0" 200 5325 "http://www.loudacre.com"  "Loudacre CSR Browser"',
 '218.193.16.244 - 94 [15/Sep/2013:23:58:45 +0100] "GET /theme.css HTTP/1.0" 200 9463 "http://www.loudacre.com"  "Loudacre CSR Browser"',
 '198.122.118.164 - 131 [15/Sep/2013:23:58:02 +0100] "GET /KBDOC-00117.html HTTP/1.0" 200 15818 "http://www.loudacre.com"  "Loudacre CSR Browser"']

In [113]:
def count(line):
    files = {"html":html, "css":css, "jpg": jpg}
    for e in files:
        if e in line:
            files[e] +=1

html = sc.accumulator(0)
css = sc.accumulator(0)
jpg = sc.accumulator(0)

logs.foreach(lambda line: count(line))

print("""
Total requests:\n
.css --> {}\n
.html --> {}\n
.jpg --> {}""".format(css.value, html.value, jpg.value))


Total requests:

.css --> 182067

.html --> 182067

.jpg --> 24743



## SparkStreaming

In [None]:
"""# terminal 1
nc -lkv 4444"""

In [5]:
"""# terminal 2
pyspark --master local[2]
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc,5)
ssc.checkpoint("/tmp/curso")
mystream = ssc.socketTextStream("localhost", 4444)
words = mystream.flatMap(lambda x: x.split(" "))
wordsCount = words.map(lambda x: (x, 1)).reduceByKey(lambda x,y: x+y)
def updateFunc(newValues, lastSum):
    if lastSum is None:
        lastSum = 0
    return sum(newValues, lastSum)
wordsCountAccum = wordsCount.updateStateByKey(updateFunc)
wordsCountAccum.pprint()
ssc.start()
ssc.awaitTermination()
ssc.stop()"""

## Spark SQL

In [32]:
from pyspark.sql import SQLContext, Row, DataFrame

In [21]:
ssc = SQLContext(sc)

In [30]:
zips = ssc.read.load("/tmp/curso/zips.json", format="json")

In [31]:
zips.show()

+-----+---------------+--------------------+-----+-----+
|  _id|           city|                 loc|  pop|state|
+-----+---------------+--------------------+-----+-----+
|01001|         AGAWAM|[-72.622739, 42.0...|15338|   MA|
|01002|        CUSHMAN|[-72.51565, 42.37...|36963|   MA|
|01005|          BARRE|[-72.108354, 42.4...| 4546|   MA|
|01007|    BELCHERTOWN|[-72.410953, 42.2...|10579|   MA|
|01008|      BLANDFORD|[-72.936114, 42.1...| 1240|   MA|
|01010|      BRIMFIELD|[-72.188455, 42.1...| 3706|   MA|
|01011|        CHESTER|[-72.988761, 42.2...| 1688|   MA|
|01012|   CHESTERFIELD|[-72.833309, 42.3...|  177|   MA|
|01013|       CHICOPEE|[-72.607962, 42.1...|23396|   MA|
|01020|       CHICOPEE|[-72.576142, 42.1...|31495|   MA|
|01022|   WESTOVER AFB|[-72.558657, 42.1...| 1764|   MA|
|01026|     CUMMINGTON|[-72.905767, 42.4...| 1484|   MA|
|01027|      MOUNT TOM|[-72.679921, 42.2...|16864|   MA|
|01028|EAST LONGMEADOW|[-72.505565, 42.0...|13367|   MA|
|01030|  FEEDING HILLS|[-72.675

In [35]:
big_zips = zips.filter(zips.pop > 10000).collect()
print(len(big_zips))

7645


In [33]:
zips.createOrReplaceTempView("zips") # new -registerTempTable deprecated

In [37]:
big_zips_sql = ssc.sql("SELECT * FROM zips where pop > 10000").collect()
print(len(big_zips_sql))

7645


In [52]:
# city with more than 100 pc
pc_city = ssc.sql("SELECT city, COUNT(_id) as pc FROM zips GROUP BY city HAVING pc > 100")
pc_city.show()

+-------+---+
|   city| pc|
+-------+---+
|HOUSTON|101|
+-------+---+



In [None]:
# population Wisconsin
wisconsin = ssc.sql("SELECT state, SUM(pop) as pop FROM zips WHERE state = 'WI' GROUP BY state")
wisconsin.show()

In [72]:
# five more populated states
pop_states = ssc.sql("SELECT state, SUM(pop) as pop FROM zips GROUP BY state ORDER BY pop DESC LIMIT 5")
pop_states.show()

+-----+--------+
|state|     pop|
+-----+--------+
|   CA|29760021|
|   NY|17990455|
|   TX|16986510|
|   FL|12937926|
|   PA|11881643|
+-----+--------+

