## Spark on AWS Elastic MapReduce
The first step is to deploy an AWS EMR cluster with the Spark application selected.  

Then modify the master node's security group so that port 8192 is open, and make sure your SSH key pair is set correctly. When the cluster comes up, log in to the master node with
<pre>
$ssh -i /path-to-your-keyfile/keyfile.pem  hadoop@ipaddress-of-master
</pre>
Next, run the following:
<pre>
$sudo pip install ipython[notebook]

$ipython profile create default

$echo "c = get_config()" > /home/hadoop/.ipython/profile_default/ipython_notebook_config.py
$echo "c.NotebookApp.ip = '*'" >> /home/hadoop/.ipython/profile_default/ipython_notebook_config.py
$echo "c.NotebookApp.open_browser = False" >> /home/hadoop/.ipython/profile_default/ipython_notebook_config.py
$echo "c.NotebookApp.port = 8192" >> /home/hadoop/.ipython/profile_default/ipython_notebook_config.py
$export PYSPARK_DRIVER_PYTHON=/usr/local/bin/jupyter
$export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
$export MASTER=yarn
</pre>
Next, create a file called runjup.sh containing this one line:
<pre>
PYSPARK_DRIVER_PYTHON=/usr/local/bin/jupyter /usr/lib/spark/bin/pyspark --master yarn
</pre>
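(There may be some redundancy here, but this works.)
Finally, make the script executable and run it in the background:
<pre>
$chmod +x runjup.sh
$nohup ./runjup.sh &
</pre>
Then point your browser at port 8192 of the master node.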
The examples here are based on Wikipedia. If you want to play with the full Wikipedia dump, run
<pre>
$hadoop fs -mkdir /user/wiki

$curl -s -L http://dumps.wikimedia.org/enwiki/20161020/enwiki-20161020-pages-articles-multistream.xml.bz2 \
| bzip2 -cd | hadoop fs -put  - /user/wiki/wikidump-en.xml
</pre>

First, we look at a small sample of Wikipedia access logs from 2008 to 2010.

In [6]:
import numpy as np
import pyspark

The Spark context, sc, should already exist: pyspark creates it when the notebook starts.

In [21]:
# On the containerized spark-jupyter setup you create the context yourself:
#sc = pyspark.SparkContext('local[*]')
# Here pyspark has already created it:
sc

<pyspark.context.SparkContext at 0x7f1453442850>

In [22]:
txtfile = sc.textFile("s3://support.elasticmapreduce/bigdatademo/sample/wiki")

Let's repartition the data into 10 partitions,
then split each text line into an array of fields on blank-space separators.

In [11]:
txtfile = txtfile.repartition(10)

In [23]:
def parseline(line):
    # Split a log line on single spaces into a NumPy array of fields
    return np.array(line.split(' '))

In [24]:
data = txtfile.map(parseline)
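To see what parseline produces, here is a small pure-Python check that runs without Spark. It assumes the sample follows the space-separated Wikipedia pagecounts layout `project title count bytes`, which is consistent with how the cells below use `row[1]` as the page title and `row[2]` as the hit count; the sample lines are made up.

```python
def parse_fields(line):
    # Split one log line on single spaces (what parseline does, minus the NumPy wrapper).
    return line.split(' ')

sample = "en Albert_Einstein 3 51234"   # hypothetical line in the assumed layout
row = parse_fields(sample)
title, hits = row[1], int(row[2])       # field 1 = page title, field 2 = hit count
print(title, hits)                      # Albert_Einstein 3

# Caveat: a doubled space yields an empty field and shifts the indices.
shifted = parse_fields("en  Paris 9 300")
print(shifted)                          # ['en', '', 'Paris', '9', '300']
```

If doubled spaces can occur in the data, `line.split()` with no argument collapses runs of whitespace instead.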

Next we look for page references that mention famous people and count how many hits there are.

In [25]:
def filter_fun(row, titles):
    # True if the page title (field 1) contains any of the given names
    return any(title in row[1] for title in titles)

In [26]:
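namelist = ['Albert_Einstein', 'Lady_Gaga', 'Barack_Obama', 'Richard_Nixon', 'Steve_Jobs', 'Bill_Clinton', 'Bill_Gates', 'Michael_Jackson',
            'Justin_Bieber', 'Dante_Alighieri', 'Shakespeare', 'Byron', 'Donald_Trump', 'Hillary_Clinton', 'Werner_Heisenberg',
            'Arnold_Schwarzenegger', 'Elon_Musk', 'Nicolas_Sarkozy', 'Vladimir_Putin', 'Vladimir_Lenin', 'Karl_Marx',
            'Groucho_Marx']

(The run shown below was made without the comma after 'Dante_Alighieri', so Python joined the two adjacent strings into 'Dante_AlighieriShakespeare' and neither name appears in the results.)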

In [27]:
filterd = data.filter(lambda p: filter_fun(p, namelist))

In [28]:
def mapname(row, names):
    # Return the first name that occurs in the page title, or 'huh?' if none does
    return next((name for name in names if name in row[1]), 'huh?')

The RDD filterd contains only the rows whose page title contains one of the names in namelist.

Next we map each row to a tuple of the form (name, reference count).

We then reduce by key, summing the counts.

In [30]:
remaped = filterd.map(lambda row: (mapname(row, namelist), int(row[2]) )).reduceByKey(lambda v1, v2: v1+v2)
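The filter, map and reduceByKey steps can be checked locally on a handful of rows before running them on the cluster. This is a pure-Python stand-in (a dict plays the role of reduceByKey), not Spark code, and the rows are hypothetical; it also shows that a substring match credits a page like Lady_Gaga_discography to Lady_Gaga.

```python
from collections import defaultdict

def mapname(row, names):
    # First name (in list order) that occurs in the title, else 'huh?' as in the notebook
    return next((name for name in names if name in row[1]), 'huh?')

def count_hits(rows, names):
    """Local stand-in for filter -> map -> reduceByKey on a list of parsed rows."""
    totals = defaultdict(int)
    for row in rows:
        # filter: keep rows whose title mentions any of the names
        if any(name in row[1] for name in names):
            # map to (name, count), then combine values for the same key with +
            totals[mapname(row, names)] += int(row[2])
    return dict(totals)

# Hypothetical log lines in the assumed "project title count bytes" layout
rows = [line.split(' ') for line in [
    "en Lady_Gaga 5 100",
    "en Lady_Gaga_discography 2 80",
    "de Bill_Gates 4 60",
    "en Paris 9 300",
]]
result = count_hits(rows, ['Lady_Gaga', 'Bill_Gates'])
print(result)  # {'Lady_Gaga': 7, 'Bill_Gates': 4}
```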

In [13]:
remaped.takeOrdered(100, key  = lambda x: -x[1])

[('Lady_Gaga', 4427),
 ('Bill_Clinton', 4221),
 ('Michael_Jackson', 3310),
 ('Barack_Obama', 2518),
 ('Justin_Bieber', 2234),
 ('Albert_Einstein', 1609),
 ('Byron', 964),
 ('Karl_Marx', 892),
 ('Arnold_Schwarzenegger', 820),
 ('Bill_Gates', 799),
 ('Steve_Jobs', 613),
 ('Vladimir_Putin', 563),
 ('Richard_Nixon', 509),
 ('Vladimir_Lenin', 283),
 ('Donald_Trump', 272),
 ('Nicolas_Sarkozy', 171),
 ('Hillary_Clinton', 162),
 ('Groucho_Marx', 152),
 ('Werner_Heisenberg', 92),
 ('Elon_Musk', 21)]
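takeOrdered(n, key=...) returns the n smallest elements by key, so negating the count yields the n largest. The same ordering can be reproduced locally with the standard library's heapq; this check uses a few of the pairs above. On the RDD itself, `remaped.top(100, key=lambda x: x[1])` should give the same result without the negation.

```python
import heapq

pairs = [('Bill_Gates', 799), ('Lady_Gaga', 4427), ('Elon_Musk', 21), ('Bill_Clinton', 4221)]

# Mirrors takeOrdered(3, key=lambda x: -x[1]): smallest by negated count
top_neg = heapq.nsmallest(3, pairs, key=lambda x: -x[1])
# Same result expressed directly as the largest counts
top_big = heapq.nlargest(3, pairs, key=lambda x: x[1])
print(top_neg)  # [('Lady_Gaga', 4427), ('Bill_Clinton', 4221), ('Bill_Gates', 799)]
```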

What happens if we look for just part of the name?

In [14]:
snamelist = ['Einstein','Gaga','Obama','Nixon','Jobs', 'Clinton', 'Gates', 'Jackson', 
            'Bieber','Dante', 'Shakespeare', 'Byron', 'Trump', 'Heisenberg',
            'Schwarzenegger', 'Musk', 'Sarkozy', 'Putin', 'Lenin', 'Marx',
            'Marx']

In [102]:
sfilterd = data.filter(lambda p: filter_fun(p, snamelist))
sremaped = sfilterd.map(lambda row: (mapname(row, snamelist), int(row[2]) )).reduceByKey(lambda v1, v2: v1+v2)
sremaped.takeOrdered(100, key  = lambda x: -x[1])

[('Jackson', 17056),
 ('Dante', 7289),
 ('Clinton', 6923),
 ('Gaga', 5527),
 ('Obama', 3563),
 ('Marx', 2548),
 ('Einstein', 2291),
 ('Bieber', 2284),
 ('Gates', 2124),
 ('Shakespeare', 1867),
 ('Nixon', 1344),
 ('Lenin', 1343),
 ('Trump', 984),
 ('Byron', 964),
 ('Schwarzenegger', 906),
 ('Musk', 861),
 ('Jobs', 830),
 ('Putin', 723),
 ('Sarkozy', 297),
 ('Heisenberg', 149)]
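The short keys pick up unrelated pages: any title containing the substring counts, and mapname credits each row to the first key that matches, in list order. So 'Clinton' covers both Bill and Hillary, 'Jackson' matches any Jackson, and the second 'Marx' in snamelist never fires. A quick check with hypothetical titles:

```python
def mapname(row, names):
    # First matching key in list order, as in the notebook's mapname
    return next((name for name in names if name in row[1]), 'huh?')

snames = ['Jobs', 'Clinton', 'Jackson', 'Marx']
# Hypothetical titles; each row mimics a parsed log line (project, title, count)
titles = ['Hillary_Clinton', 'Andrew_Jackson', 'Groucho_Marx', 'Steve_Jobs', 'Jobs_report']
keys = [mapname(['en', t, '1'], snames) for t in titles]
print(keys)  # ['Clinton', 'Jackson', 'Marx', 'Jobs', 'Jobs']
```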

Let's see how many page references there are here. We drop pages whose titles contain '%': these are percent-encoded titles that are not human-readable.

In [27]:
bdata = data.map(lambda row: (row[1], int(row[2])))

In [28]:
cdata = bdata.filter(lambda p:  p[0].find('%') < 0)
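Titles containing '%' are percent-encoded, typically the UTF-8 bytes of non-ASCII characters. Instead of dropping them, one could decode them with `urllib.parse.unquote` from the Python 3 standard library; the titles below are made up. The `u'...'` strings in the output further down suggest this notebook ran Python 2, where the module layout differs, so treat this as a sketch.

```python
from urllib.parse import unquote

def readable_title(title):
    # Decode %XX escapes as UTF-8; byte sequences that are not valid UTF-8
    # become U+FFFD instead of raising an error.
    return unquote(title, encoding='utf-8', errors='replace')

encoded = ['Z%C3%BCrich', 'Tokyo', '%E6%9D%B1%E4%BA%AC']
decoded = [readable_title(t) for t in encoded]
print(decoded)  # ['Zürich', 'Tokyo', '東京']
```

In the notebook this would slot in as `bdata.map(lambda p: (readable_title(p[0]), p[1]))` in place of the filter.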

In [29]:
cdata.take(100)

[(u'Special:WhatLinksHere/MediaWiki:Group-Ombudsmen', 1),
 (u'MediaWiki:Ipbexpiry', 1),
 (u'Wikipedia:Community_Portal', 1),
 (u'Antartika', 2),
 (u'Baku', 2),
 (u'Berkas:Coat_of_Amrs_of_Bashkortostan.svg', 1),
 (u'Berkas:Destroyed_Warsaw,_capital_of_Poland,_January_1945.jpg', 1),
 (u'Berkas:Flag_of_Omsk_Oblast.svg', 1),
 (u'Berkas:Mongolia_Topography.png', 1),
 (u'Berkas:Southeast_asia.svg', 1),
 (u'Berkas:Waterfallweh.jpg', 1),
 (u'Busan', 1),
 (u'Dublin', 1),
 (u'Haeju', 2),
 (u'Hyesan', 1),
 (u'Istimewa:Penggunaan_global/Crystal_Clear_action_run.png', 1),
 (u'Istimewa:Pranala_balik/Berkas:Flag_of_Smolensk_Oblast.png', 1),
 (u'Istimewa:Pranala_balik/Berkas:User_Abigor_global1.jpg', 1),
 (u'Jeollanam-do', 1),
 (u'Kategori:Neugara', 1),
 (u'Lithuania', 1),
 (u'P', 1),
 (u'Sipak_Bhan', 1),
 (u'Tokyo', 1),
 (u'islamabad', 1),
 (u'Gebruiker:Az1568', 1),
 (u'MediaWiki:Import-logentry-upload', 1),
 (u'Spesiaal:OnlangseVeranderingsMetSkakels/MediaWiki:Editgroup', 1),
 (u'Spesiaal:Recentchan

In [48]:
cdata.count()

3240116

### Now look at the full Wikipedia dump from HDFS
The dump is one large XML file, and textFile yields one RDD element per line of it.
We can extract the title of each page from the line holding its `<title>` element.

In [33]:
wikidump = sc.textFile("hdfs:///user/wiki/wikidump-en.xml")

In [38]:
wikidump.count()

927769981

In [40]:
wikidump.getNumPartitions()

441

In [42]:
def findtitle(line):
    # True for lines that contain a page's <title> element
    return '<title>' in line

In [43]:
titles = wikidump.filter(lambda p: findtitle(p))
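findtitle keeps the whole line, indentation and tags included. A sketch of pulling out just the title text with a regular expression; it assumes each title element sits on a single line, as in the output below, and the sample lines are modeled on that output.

```python
import re

TITLE_RE = re.compile(r'<title>(.*?)</title>')

def extract_title(line):
    # Return the text between <title> and </title>, or None if the line has no title
    m = TITLE_RE.search(line)
    return m.group(1) if m else None

lines = ['    <title>AccessibleComputing</title>', '    <ns>0</ns>', '    <title>Anarchism</title>']
found = [t for t in (extract_title(l) for l in lines) if t is not None]
print(found)  # ['AccessibleComputing', 'Anarchism']
```

On the cluster this would be `titles.map(extract_title)`. Characters such as '&' may appear XML-escaped (for example `&amp;`); the standard library's `html.unescape` can undo that.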

In [45]:
titles.count()

17008269

In [46]:
titles.cache()

PythonRDD[29] at RDD at PythonRDD.scala:48

In [47]:
titles.take(200)

[u'    <title>AccessibleComputing</title>',
 u'    <title>Anarchism</title>',
 u'    <title>AfghanistanHistory</title>',
 u'    <title>AfghanistanGeography</title>',
 u'    <title>AfghanistanPeople</title>',
 u'    <title>AfghanistanCommunications</title>',
 u'    <title>AfghanistanTransportations</title>',
 u'    <title>AfghanistanMilitary</title>',
 u'    <title>AfghanistanTransnationalIssues</title>',
 u'    <title>AssistiveTechnology</title>',
 u'    <title>AmoeboidTaxa</title>',
 u'    <title>Autism</title>',
 u'    <title>AlbaniaHistory</title>',
 u'    <title>AlbaniaPeople</title>',
 u'    <title>AsWeMayThink</title>',
 u'    <title>AlbaniaGovernment</title>',
 u'    <title>AlbaniaEconomy</title>',
 u'    <title>Albedo</title>',
 u'    <title>AfroAsiaticLanguages</title>',
 u'    <title>ArtificalLanguages</title>',
 u'    <title>AbacuS</title>',
 u'    <title>AbalonE</title>',
 u'    <title>AbbadideS</title>',
 u'    <title>AbbesS</title>',
 u'    <title>AbbevilleFrance</title>'

To get more out of the Wikipedia XML, you need to group the lines belonging to each page and then parse that XML into something Spark can process.
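A minimal local sketch of that grouping step, assuming each `<page>` and `</page>` tag sits on its own line as in the MediaWiki export format. On a cluster the hard part is that textFile splits the file into partitions without regard to page boundaries, so a page can straddle two partitions; this sketch ignores that, and a dedicated XML reader (for example the spark-xml package with rowTag set to page) is one way around it. The page content below is hypothetical.

```python
import xml.etree.ElementTree as ET

def iter_pages(lines):
    """Yield one parsed <page> element per <page>...</page> block of lines."""
    buf = None
    for line in lines:
        stripped = line.strip()
        if stripped.startswith('<page>'):
            buf = []                      # start collecting a new page
        if buf is not None:
            buf.append(line)
        if stripped.endswith('</page>') and buf is not None:
            yield ET.fromstring('\n'.join(buf).strip())
            buf = None                    # page complete

dump_lines = [
    '  <page>',
    '    <title>Anarchism</title>',
    '    <ns>0</ns>',
    '    <revision>',
    '      <text xml:space="preserve">example article text</text>',
    '    </revision>',
    '  </page>',
]
pages = [(p.findtext('title'), p.findtext('revision/text')) for p in iter_pages(dump_lines)]
print(pages)  # [('Anarchism', 'example article text')]
```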