# Intro

The data comes from http://overpass-turbo.eu/ (Overpass query `[amenity=bar]`)

and was fetched using https://github.com/Guts/Paris-Beer-Week/blob/master/data/raw_data/getOpenBeerMap.py

![](img/dataset-cover.jpg)

![](img/beer%20map.png)

# 1. Map Reduce & DAG

In [26]:
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
import plotly.graph_objs as go
import plotly.figure_factory as ff
import unicodedata

# helper used to strip accents / non-ASCII characters from JSON strings
def fixUnicode(str):
    """Return an ASCII-only encoding of *str*.

    The text is first NFKD-normalized, which splits accented characters
    into a base character plus combining marks; encoding to ASCII with
    ``'ignore'`` then drops everything that is not plain ASCII.

    The parameter keeps its original name ``str`` (which shadows the
    builtin inside this function) so existing callers are unaffected.
    """
    decomposed = unicodedata.normalize('NFKD', str)
    ascii_only = decomposed.encode('ascii', 'ignore')
    return ascii_only

# load plotly.js into the notebook so iplot() can render inline
init_notebook_mode(connected=True)

# one shared layout so every chart uses the same background and text colors
layout = go.Layout(
    paper_bgcolor="#323a48",
    plot_bgcolor="#323a48",
    font={"color": "#cdd2e9"},
    xaxis={"color": "#cdd2e9"},
    yaxis={"color": "#cdd2e9"},
)

# get the application Id and build the tracking URL for it (the proxy host
# and port are hard-coded for this cluster)
# NOTE(review): recent PySpark versions expose the same value publicly as
# sc.applicationId -- confirm the cluster's Spark version before switching
applicationId = sc._jsc.sc().applicationId()
trackingUrl = 'http://beebox01.localdomain:8088/proxy/'+applicationId

# IPython.display is the public import path; importing display from
# IPython.core.display is deprecated in recent IPython releases
from IPython.display import display, HTML
# show the URL as a clickable link that opens in a new tab
display(HTML('<a href="'+trackingUrl+'" target="_blank">'+trackingUrl+'</a>'))

In [6]:
# location of the JSON file (one GeoJSON-like feature per line) to load
path = "/user/mathias/beermap.json"
# read it into a DataFrame; the schema is inferred from the JSON records
beerMap = sqlContext.read.json(path)

# show the inferred schema as a tree
beerMap.printSchema()

root
 |-- geometry: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- id: long (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- BEERS: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- BREWER: string (nullable = true)
 |    |-- NAME: string (nullable = true)
 |    |-- OSM_ID: long (nullable = true)
 |    |-- TYPE: string (nullable = true)
 |-- type: string (nullable = true)



In [8]:
%%sh
# print the tail end of the raw JSON file on HDFS to inspect a few records
hdfs dfs -tail /user/mathias/beermap.json

WER": "Oui", "NAME": "The Frog & Princess", "OSM_ID": 3520483492, "TYPE": "pub"}, "type": "Feature"}
{"geometry": {"coordinates": [2.3683, 48.8539], "type": "Point"}, "id": 125, "properties": {"BEERS": ["Dark de triomphe", "Ginger twist", "Inseine", "Maison blanche"], "BREWER": "Oui", "NAME": "The Frog Revolution", "OSM_ID": 3520535766, "TYPE": "pub"}, "type": "Feature"}
{"geometry": {"coordinates": [2.373, 48.8522], "type": "Point"}, "id": 126, "properties": {"BEERS": ["Guinness"], "BREWER": "Non", "NAME": "Corcoran's", "OSM_ID": 3992830929, "TYPE": "pub"}, "type": "Feature"}
{"geometry": {"coordinates": [2.3483, 48.8596], "type": "Point"}, "id": 127, "properties": {"BEERS": ["Guinness"], "BREWER": "Non", "NAME": "Guinness tavern", "OSM_ID": 4068475107, "TYPE": "pub"}, "type": "Feature"}
{"geometry": {"coordinates": [2.4135, 48.8892], "type": "Point"}, "id": 128, "properties": {"BEERS": ["Gallia"], "BREWER": "Non", "NAME": "Brasserie Gallia", "OSM_ID": 4449883375, "TYPE": "Brasser

In [16]:
# get all beers: flatten every bar's BEERS list into one RDD of beer names.
# BEERS is a nullable array in the schema, so a bar without a list is
# treated as having no beers instead of making flatMap fail on None
allBeer = beerMap.rdd.flatMap(lambda b: b.properties['BEERS'] or [])

# fix unicode characters
allBeer = allBeer.map(fixUnicode)

# MAP: emit a (beer, 1) pair for every occurrence of a beer
allBeerMap = allBeer.map(lambda b: (b, 1))

# Print table: header row followed by a 10-row sample of the pairs
table = ff.create_table([('Key', 'Value')] + allBeerMap.take(10))
iplot(table, show_link=False, filename='map_table')

In [74]:
# REDUCE: add up the values of all pairs sharing the same beer key
allBeerReduce = allBeerMap.reduceByKey(lambda total, extra: total + extra)

# Render a header row plus a 10-row sample of the reduced pairs
table = ff.create_table(
    [('Key', 'Value')] + allBeerReduce.take(10)
)
iplot(table, filename='reduce_table', show_link=False)

# 2. Spark SQL

In [18]:
# register the DataFrame as a Table so it can be queried by name in SQL
beerMap.registerTempTable("beerMap")

# SQL: explode each bar's beer list into one (bar name, beer) row per beer
sql = """
SELECT properties.name, beer 
FROM beerMap 
LATERAL VIEW explode(properties.beers) beersTable AS beer
"""
allBeer = sqlContext.sql(sql)
# fix unicode characters; map over the underlying RDD (as the Map Reduce
# cell does with beerMap.rdd) because DataFrame.map no longer exists in
# PySpark 2.0+, while DataFrame.rdd works on both old and new versions
allBeer = allBeer.rdd.map(lambda r: [fixUnicode(r.name), fixUnicode(r.beer)])

# Print table: header row followed by a 10-row sample
table = ff.create_table([('Bar', 'Beer')] + allBeer.take(10))
iplot(table, show_link=False, filename='bar_beer_table')

In [30]:
# what is the most famous beer ?
# i.e. which beer appears most often across all bars' beer lists:
# explode the lists, count rows per beer, most frequent first
sql = """
SELECT count(*) as count, beer 
FROM beerMap 
LATERAL VIEW explode(properties.beers) beersTable AS beer 
GROUP BY beer
ORDER BY count(*) DESC
"""
countBeer = sqlContext.sql(sql)

# bring the aggregated result to the driver as a pandas DataFrame
pdf = countBeer.toPandas()

# Plot a bar chart (beer on x, count on y) using the shared layout
data = [go.Bar(x=pdf['beer'],y=pdf['count'])]
figure = go.Figure(data=data, layout=layout)
iplot(figure,show_link=False)

In [31]:
# where to find the Chimay ?
# select the name and coordinates of every bar whose beer list contains
# exactly 'Chimay'
sql = """
SELECT properties.name, geometry.coordinates 
FROM beerMap 
LATERAL VIEW explode(properties.beers) beersTable AS beer 
WHERE beer = 'Chimay'
"""
chimay = sqlContext.sql(sql)
# bring the matching rows to the driver as a pandas DataFrame
pdf = chimay.toPandas()

# Print table built directly from the pandas DataFrame
table = ff.create_table(pdf)
iplot(table, show_link=False, filename='chimay_table')