# Data Analysis and Map-Reduce with MongoDB and pymongo

Alexander Hendorf (@opotoc), MongoDB Days Germany 2015 Munich, MongoDB Days 2015 Silicon Valley San José, PySS15 Ibaeta University San Sebastián, EuroPython 2015 Bilbao

The dataset only contains the data for the two artists (207 playlists)

In [None]:
from IPython.display import HTML, Audio
from IPython.display import Image
import pymongo
import datetime
from pymongo import ASCENDING, DESCENDING
from bson.son import SON
from pprint import pprint
import json
from config import *

import pymongo driver, bson, json, some config parameters (e.g. database access) & 
make database connection

In [None]:
# Name of the collection that every query in this notebook runs against.
playlists = 'playlists'
# Requires a running MongoDB instance (see start.sh).
db = pymongo.MongoClient("localhost:27017")['db']

our dataset

In [None]:
# Total number of documents in the playlists collection.
count = db[playlists].count()
# Show the figure as a large headline framed by two spacer paragraphs.
headline = (
    '<p style="height:30px"></p>'
    '<p style="font-size: 36px">{:,} playlists found.</p>'
    '<p style="height:10px"></p>'
)
HTML(headline.format(count))

# Map Reduce

In [None]:
from bson.code import Code
# bson.code is just a tool for representing JavaScript code in BSON

#### Map Function

In [None]:
# Map step, written in JavaScript and wrapped in bson Code.
# For each document it emits (artistName, 1), so the reduce step can add the
# ones up per artist. Artist names of 100 characters or more are skipped.
mapper = Code("""
               function () {
                   var artist = this.info.artistName; 
                   if (artist.length < 100){
                       // avoid indexing error in output collection
                       emit(artist, 1);
                   }
               }
               """)

#### Reduce Function

In [None]:
# Reduce step, written in JavaScript and wrapped in bson Code.
# Receives a key and the list of values emitted for it and returns their sum.
reducer = Code("""
                function (key, values) {
                  var total = 0;
                  for (var i = 0; i < values.length; i++) {
                    total += values[i];
                  }
                  return total;
                }
                """)

#### send output to a collection: "results_collection"
measuring the execution time and playing a scientific sound when finished

In [None]:
# Time the map-reduce job; its output is written to "results_collection".
start = datetime.datetime.now()
#
result = db[playlists].map_reduce(mapper, reducer, "results_collection")
# fallback: limit query to a company
#result = db[playlists].map_reduce(mapper, reducer, "results_collection", query={'info.copyrightIndex': 'MOTOWN'})
#
# The status message previously read "mape reduce"; the typo is fixed here.
print("map reduce done, took: {}".format(datetime.datetime.now() - start))
print("found {0:,} artists".format(db['results_collection'].count()))
# audio-signal when done
Audio("http://www.trekcore.com/audio/communicator/ent_communicator1.mp3", autoplay=True)

In [None]:
# <<<<








#

# Aggregation Framework

In [None]:
# Count the playlist documents whose artist name equals `artist`.
query = {"info.artistName": artist}
count = db[playlists].count(query)
message = "found {:,} releases for {}".format(count, artist)
print(message)

### \$match \$project

In [None]:
# Stage 1 keeps the artist's documents; stage 2 reduces each one to its release name.
match_stage = {"$match": {"info.artistName": artist}}
project_stage = {"$project": {"release": "$info.name", "_id": 0}}
pipeline = [match_stage, project_stage]

In [None]:
res = db[playlists].aggregate(pipeline)
# Materialising the cursor to count its documents consumes it completely.
documents = list(res)
print("\n found {} document\n".format(len(documents)))

In [None]:
# let's have a look at our dataset
# (`res` is the cursor that was already turned into a list in the previous cell)
leftover = list(res)
print(leftover)
#
Image(filename='pic/boom.jpg')

In [None]:
# Run the aggregation again to obtain a fresh cursor, then print every result.
cursor = db[playlists].aggregate(pipeline)
releases = list(cursor)
print(releases)

In [None]:
# output as HTML table: one numbered row per release name
table = ListTable()
table.title = "releases of {}".format(artist)
table.append(['position', 'release'])
cursor = db[playlists].aggregate(pipeline)
for position, doc in enumerate(cursor, start=1):
    table.append([position, doc['release']])
table

In [None]:
# <<<<








#

###  $group

In [None]:
# Count how many documents share each release name, sorted by release name.
only_artist = {"$match": {"info.artistName": artist}}
count_per_release = {"$group": {"_id": "$info.name", "count": {"$sum": 1}}}
# Expose the group key as "release" and drop the "_id" field.
rename_fields = {"$project": {"release": "$_id", "_id": 0, "count": 1}}
by_release_name = {"$sort": {"release": ASCENDING}}
pipeline = [only_artist, count_per_release, rename_fields, by_release_name]

In [None]:
# Tabulate how often each release name occurs for the artist.
table2 = ListTable()
table2.title = "duplicate releases of {}".format(artist)
table2.append(['count', 'release'])
cursor = db[playlists].aggregate(pipeline)
for row in cursor:
    table2.append([row.get('count'), row.get('release')])
table2

###  \$group - \$sort -  \$limit

In [None]:
# Count the documents per release name, sort them and keep the first ten rows.
pipeline = []
pipeline.append({"$match": {"info.artistName": artist}})
pipeline.append({"$group": {"_id": "$info.name", "count": {"$sum": 1}}})
pipeline.append({"$project": {"release": "$_id", "count": 1, "_id": 0}})
# The sort specification is a plain dict here; the alternative key order is kept for comparison.
#pipeline.append({"$sort": {"count": DESCENDING, "release": ASCENDING}})
pipeline.append({"$sort": {"release": ASCENDING, "count": DESCENDING}})
pipeline.append({"$limit": 10})

In [None]:
# Show the limited, sorted release counts as a table.
table3 = ListTable()
table3.title = "top duplicate releases of {}".format(artist)
table3.append(['count', 'release'])
cursor = db[playlists].aggregate(pipeline)
for row in cursor:
    table3.append([row.get('count'), row.get('release')])
table3

#### mind the sort datatype

In [None]:
# Same counting pipeline, this time with an explicitly ordered sort specification.
group_by_release = {"$group": {"_id": "$info.name", "count": {"$sum": 1}}}
# caveat sort order: use collections.OrderedDict or bson.SON!
sort_spec = SON([("release", ASCENDING), ("count", DESCENDING)])
# sort_spec = SON([("count", DESCENDING), ("release", ASCENDING)])
pipeline = [
    {"$match": {"info.artistName": artist}},
    group_by_release,
    {"$project": {"release": "$_id", "count": "$count", "_id": 0}},
    {"$sort": sort_spec},
    {"$limit": 10},
]

In [None]:
# Collect the (count, release) pairs first, then put them into a table.
cursor = db[playlists].aggregate(pipeline)
rows = [[doc.get('count'), doc.get('release')] for doc in cursor]
table3 = ListTable()
table3.title = "top duplicate releases of {}".format(artist)
table3.append(['count', 'release'])
for row in rows:
    table3.append(row)
table3

In [None]:
# <<<<








#

### $unwind

In [None]:
# aggregate information in subdocuments
# get all the songs
pipeline = [
    {"$match": {"info.artistName": artist}},
    
    
    {"$unwind": "$info.children"},
    
    
    {"$group": {
        "_id": "$info.children.name"
    }},
    {"$project": {"song": "$_id", "_id": 0}},
    {"$sort": SON([("song", ASCENDING)])},
]

In [None]:
# Numbered table of the artist's distinct song names.
cursor = db[playlists].aggregate(pipeline)
table3 = ListTable()
table3.title = "songs of {}".format(artist)
# The second column holds the "song" field of each result, so it is labelled
# 'song' (it was previously mislabelled 'release').
table3.append(['position', 'song'])
for position, doc in enumerate(cursor, 1):
    table3.append([position, doc.get('song')])
table3

#### inner mechanics of $unwind

In [None]:
# Unwind a single document of the artist to show what $unwind produces.
shown_fields = {"adamId": 1, "info.name": 1, "info.children.name": 1, "_id": 0}
pipeline = [
    {"$match": {"info.artistName": artist}},
    {"$limit": 1},
    {"$unwind": "$info.children"},
    {"$project": shown_fields},
]

In [None]:
# Pretty-print the unwound documents and report how many there are.
cursor = db[playlists].aggregate(pipeline)
r = list(cursor)
pprint(r)
summary = "found {:,} documents".format(len(r))
print(summary)

In [None]:
# <<<<








#

## Accumulators

### \$min & \$max

In [None]:
# All matched documents fall into one group (constant _id); $min/$max pick the
# smallest and largest "info.releaseDateEpoch" value.
date_range = {
    "_id": '',
    "minDate": {"$min": "$info.releaseDateEpoch"},
    "maxDate": {"$max": "$info.releaseDateEpoch"},
}
pipeline = [
    {"$match": {"info.artistName": artist}},
    {"$group": date_range},
    {"$project": {"_id": 0, "minDate": 1, "maxDate": 1}},
]

In [None]:
# Run the $min/$max aggregation and pretty-print its result.
cursor = db[playlists].aggregate(pipeline)
print("release period of {}".format(artist))
period = list(cursor)
pprint(period)

### \$first & \$last

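`$first` returns the value that results from applying an **expression** to the first document in a group of documents that share the same group-by key; `$last` does the same for the last document of the group. Both are only meaningful when the documents arrive in a defined order, which is why the pipeline sorts before grouping.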

In [None]:
# Sort by release date first so that $first/$last return the values of the
# earliest and latest document of the single (constant _id) group.
by_release_date = {"$sort": SON([("info.releaseDate", ASCENDING)])}
first_and_last = {
    "_id": "",
    "minDate": {"$first": "$info.releaseDate"},
    "maxDate": {"$last": "$info.releaseDate"},
}
pipeline = [
    {"$match": {"info.artistName": artist}},
    by_release_date,
    {"$group": first_and_last},
    {"$project": {"_id": 0, "minDate": 1, "maxDate": 1}},
]

In [None]:
# Run the $first/$last aggregation and pretty-print its result.
cursor = db[playlists].aggregate(pipeline)
print("\n\nrelease period of {}".format(artist))
period = list(cursor)
pprint(period)

In [None]:
# $first/$last applied to an expression: the $year of "info.releaseDateEpoch"
# of the first and last document after sorting by "info.releaseDate".
by_release_date = {"$sort": SON([("info.releaseDate", ASCENDING)])}
first_and_last_year = {
    "_id": "",
    "minYear": {"$first": {"$year": "$info.releaseDateEpoch"}},
    "maxYear": {"$last":  {"$year": "$info.releaseDateEpoch"}},
}
pipeline = [
    {"$match": {"info.artistName": artist}},
    by_release_date,
    {"$group": first_and_last_year},
    {"$project": {"_id": 0, "minYear": 1, "maxYear": 1}},
]

In [None]:
# Run the year-based $first/$last aggregation and pretty-print its result.
cursor = db[playlists].aggregate(pipeline)
print("release period of {}".format(artist))
years = list(cursor)
pprint(years)

##  Operators

### date operators

In [None]:
# Group the artist's documents by the $year of "info.releaseDateEpoch" and
# count each group; newest year first.
releases_per_year = {"$group": {
    "_id": {"$year": "$info.releaseDateEpoch"},
    "count": {"$sum": 1},
}}
pipeline = [
    {"$match": {"info.artistName": artist}},
    releases_per_year,
    {"$project": {"year": "$_id", "_id": 0, "count": 1}},
    {"$sort": SON([("year", DESCENDING)])},
]

In [None]:
# Table with one row per year and the number of releases in that year.
table4 = ListTable()
table4.title = "count of releases by year of {}".format(artist)
table4.append(['year', 'count'])
cursor = db[playlists].aggregate(pipeline)
for row in cursor:
    table4.append([row.get('year'), row.get('count')])
table4

In [None]:
# Group by a compound key (year and month of "info.releaseDateEpoch") and
# count each group; newest period first.
year_and_month = {
    "year": {"$year": "$info.releaseDateEpoch"},
    "month": {"$month": "$info.releaseDateEpoch"},
}
pipeline = [
    {"$match": {"info.artistName": artist}},
    {"$group": {"_id": year_and_month, "count": {"$sum": 1}}},
    # Flatten the compound key into top-level "year" and "month" fields.
    {"$project": {"year": "$_id.year", "month": "$_id.month", "_id": 0, "count": 1}},
    {"$sort": SON([("year", DESCENDING), ("month", DESCENDING)])},
]

In [None]:
# Table with one row per (year, month) and the number of releases in it.
table5 = ListTable()
table5.title = "count of releases by month/year of {}".format(artist)
table5.append(['year', 'month', 'count'])
cursor = db[playlists].aggregate(pipeline)
for row in cursor:
    table5.append([row.get('year'), row.get('month'), row.get('count')])
table5

In [None]:
# <<<<








#

# -

### $in

In [None]:
# Second artist, matched together with `artist` via $in in the following cells.
nemesis = "Katy Perry"  # arch enemy

In [None]:
# First group per (artist, song) to collapse repeated songs, then group per
# artist to get the overall date range and the number of distinct songs.
per_song = {"$group": {
    "_id": {"artist": '$info.artistName', "song": "$info.children.name"},
    "minDate": {"$min": "$info.releaseDateEpoch"},
    "maxDate": {"$max": "$info.releaseDateEpoch"},
}}
per_artist = {"$group": {
    "_id": "$_id.artist",
    "minDate": {"$min": "$minDate"},
    "maxDate": {"$max": "$maxDate"},
    "songCount": {"$sum": 1},
}}
# just renaming attributes
rename = {"$project": {"maxDate": 1, "minDate": 1, "releaseCount": 1, "songCount": 1,
                       "artist": "$_id", "_id": 0}}
pipeline = [
    {"$match": {"info.artistName": {"$in": [artist, nemesis]}}},
    {"$unwind": "$info.children"},
    per_song,
    per_artist,
    rename,
]

In [None]:
# Run the two-artist aggregation and pretty-print one document per artist group.
cursor = db[playlists].aggregate(pipeline)
print("\n\n active period")
periods = list(cursor)
pprint(periods)

### $avg

In [None]:
# Average asset duration per artist, truncated to a whole number.
duration = "$info.children.offers.assets.duration"
# Flatten the three nested arrays so that every asset becomes its own document.
unwind_stages = [
    {"$unwind": "$info.children"},
    {"$unwind": "$info.children.offers"},
    {"$unwind": "$info.children.offers.assets"},
]
pipeline = (
    [{"$match": {"info.artistName": {"$in": [artist, nemesis]}}}]
    + unwind_stages
    + [
        # some cleanup of outliers, don't worry about it
        {"$match": {"info.children.offers.assets.duration": {"$gt": 30}}},
        {"$group": {"_id": "$info.artistName",
                    "playtime": {"$avg": duration}}},
        # just renaming attributes; subtracting ($playtime mod 1) removes the fraction
        {"$project": {"artist": "$_id", "_id": 0,
                      "playtime": {"$subtract": ["$playtime", {"$mod": ["$playtime", 1]}]}}},
    ]
)

In [None]:
# Run the $avg aggregation and pretty-print the per-artist averages.
cursor = db[playlists].aggregate(pipeline)
print("\n\n average playtimes")
playtimes = list(cursor)
pprint(playtimes)

In [None]:
# <<<<








#

### string operations

In [None]:
# Keep only offers whose formatted price starts with "usd" (case-insensitive)
# and collect price and product name per artist.
# isUSD: compare the lower-cased first three characters with "usd".
# $cmp returns -1 / 0 / 1 for less / equal / greater, $eq also possible
is_usd = {"$cmp": [
    {"$toLower": {"$substr": ["$info.offers.priceFormatted", 0, 3]}},
    "usd",
]}
offer_fields = {
    "info.offers.price": 1,
    "info.offers.priceFormatted": 1,
    "artist": "$info.artistName",
    "product": "$info.name",
    "isUSD": is_usd,
}
pipeline = [
    {"$match": {"info.artistName": {"$in": [artist, nemesis]}}},
    {"$unwind": "$info.offers"},
    {"$project": offer_fields},
    {"$match": {"isUSD": 0}},
    {"$group": {
        "_id": {"artist": "$artist"},
        "releases": {"$push": {"price": "$info.offers.price", "product": "$product"}},
    }},
    {"$project": {"artist": "$_id.artist", "_id": 0, "releases": 1}},
]

In [None]:
# Price table: one row per release, with the artist shown only on the first
# row of each artist's group.
cursor = db[playlists].aggregate(pipeline)
res = list(cursor)
table6 = ListTable()
table6.title = "pricing of releases"
table6.append(['artist', 'product', 'currency', 'price'])
for group in res:
    artist_label = group.get('artist')
    for index, release in enumerate(group.get('releases')):
        label = artist_label if index == 0 else ""
        # Product names are cut to 80 characters.
        table6.append([label, release.get('product')[:80], "$", release.get('price')])
table6

In [None]:
# <<<<








#

### $map

In [None]:
# first: look on the data array we use $map on
pipeline = [
    {"$match": {"info.artistName": {"$in": [artist, nemesis]}}},
    {"$unwind": "$info.offers"},  
    {"$project": {"info.offers.price": 1, "info.offers.priceFormatted": 1, 
                  "artist": "$info.artistName",
                  "product": "$info.name",
                  # isUSD compare if substring is usd
                  "isUSD": {"$cmp": [
                    {"$toLower": {"$substr": ["$info.offers.priceFormatted", 0, 3]}},
                    "usd"]}}},
    {"$match": {"isUSD": 0}},

    {"$group": {
        "_id": "$artist",
        "pricing": {"$push": "$info.offers.price"}
    }} 
]

In [None]:
# Print each artist followed by that artist's price array.
cursor = db[playlists].aggregate(pipeline)
res = list(cursor)
for entry in res:
    print(entry['_id'], entry['pricing'], sep="\n")

In [None]:
# exchange rate
eur_dollar_exchange_rate = 0.88

pipeline = [
    {"$match": {"info.artistName": {"$in": [artist, nemesis]}}},
    {"$unwind": "$info.offers"},  
    {"$project": {"info.offers.price": 1, "info.offers.priceFormatted": 1, 
                  "artist": "$info.artistName",
                  "product": "$info.name",
                  # isUSD compare if substring is usd
                  "isUSD": {"$cmp": [
                    {"$toLower": {"$substr": ["$info.offers.priceFormatted", 0, 3]}},
                    "usd"]}}},
    {"$match": {"isUSD": 0}},

    {"$group": {
        "_id": "$artist",
        "pricing": {"$push": "$info.offers.price"}
    }},
    {"$project": {
        "pricing": {"$map": {"input": "$pricing",
                                         "as": "value",
                                         "in": {"$multiply": ["$$value",
                                                         eur_dollar_exchange_rate
                                                         ]}}}}},
]

In [None]:
# Table of the converted prices, with the artist shown only on the first row
# of each artist's group.
cursor = db[playlists].aggregate(pipeline)
res = list(cursor)
table7 = ListTable()
table7.title = "pricing of releases"
table7.append(['artist', 'currency', 'price'])
for group in res:
    artist_label = group.get('_id')
    for index, price in enumerate(group.get('pricing')):
        label = artist_label if index == 0 else ""
        table7.append([label, "€", round(price, 3)])
table7

In [None]:
# Fetch the server information; the following cells read its 'versionArray'
# entry to decide whether the $lookup example can run.
server_version = db.client.server_info()
server_version

In [None]:
# The $lookup example is gated on MongoDB 3.2+. Compare (major, minor) as a
# pair so that later major versions (4.x, 5.x, ...) pass the check too; the
# previous test `major == 3 and minor >= 2` rejected them.
if server_version.get('versionArray')[:2] >= [3, 2]:
    pipeline = [
        {"$match": {"info.artistName": {"$in": [artist, nemesis]}}},
        {"$unwind": "$info.offers"},
        {"$project": {"info.offers.price": 1, "info.offers.priceFormatted": 1,
                      "artist": "$info.artistName",
                      "product": "$info.name",
                      # currency code: upper-cased first three characters of the formatted price
                      "currency": {"$toUpper": {"$substr": ["$info.offers.priceFormatted", 0, 3]}}}},
        # get most current exchange rate by $lookup
        {"$lookup": {
            "from": "exchangerates",  # collection to join
            "localField": "currency",  # field from the input documents
            "foreignField": "_id",  # field from the documents of the "from" collection
            "as": "exchangeRate"
        }},

        # keep only documents for which exactly one exchange rate was found
        {"$match": {"exchangeRate": {"$size": 1}}},

        {"$group": {
            "_id": {"artist": "$artist", "currency": "$currency"},
            "pricing": {"$push": "$info.offers.price"},
            "rate": {"$first": "$exchangeRate.rate"}}},

        {"$project": {
            "_id": "$_id.artist",
            "currency": "$_id.currency",
            "pricing": {"$map": {"input": "$pricing",
                                 "as": "value",
                                 "in": {"$multiply": [
                                     "$$value",
                                     # "rate" is a one-element array, so take element 0
                                     {"$arrayElemAt": ["$rate", 0]},
                                 ]}}}}}
    ]
else:
    print("This feature requires mongoDB 3.2+")

In [None]:
# Same version gate as for building the $lookup pipeline: (major, minor) >= (3, 2),
# so MongoDB 4.x and later are accepted as well.
if server_version.get('versionArray')[:2] >= [3, 2]:
    from IPython.display import display

    cursor = db[playlists].aggregate(pipeline)
    res = list(cursor)
    table8 = ListTable()
    table8.title = "pricing of releases"
    table8.append(['artist', 'currency', 'price'])
    for l in res:
        a, c = l.get('_id'), l.get('currency')
        for r in l.get('pricing'):
            table8.append([a, c, "{} EUR".format(round(r, 2))])
            # show the artist only on the first row of each group
            a = ""
    # A bare `table8` expression inside an `if` block is not rendered by the
    # notebook, so the table is displayed explicitly.
    display(table8)
else:
    print("This feature requires mongoDB 3.2+")

In [None]:
# <<<<








#

## Map Reduce Most Popular Words

#### get the most common word in release titles…

In [None]:
from bson.code import Code
# Map step (JavaScript): split the release name on spaces, strip every
# character that is not a letter or digit, and emit (lower-cased word, 1) for
# each non-empty word.
# The loop index is declared with `var` and runs over the array indices; the
# previous `for (i in words)` created an implicit global `i` and used
# for...in enumeration on an array.
mapper = Code("""
               function () {
                   var words = this.info.name.split(' ');
                   for (var i = 0; i < words.length; i++) {
                       var word = words[i].replace(/[^a-z0-9]/gi, "");
                       if (word.length > 0) {
                           emit(word.toLowerCase(), 1);
                       }
                   }
               }
               """)

In [None]:
# Reduce step (JavaScript): returns the sum of all values emitted for a key.
reducer = Code("""
                function (key, values) {
                  var total = 0;
                  for (var i = 0; i < values.length; i++) {
                    total += values[i];
                  }
                  return total;
                }
                """)

In [None]:
# Time the word-count map-reduce job; its output is written to the "results" collection.
start = datetime.datetime.now()
result = db[playlists].map_reduce(mapper, reducer, "results")
# alternative: restrict the job to the two artists
#result = db.playlists.map_reduce(mapper, reducer, "results", query={"info.artistName": {"$in": [artist, nemesis]}})
# The status message previously read "mape reduce"; the typo is fixed here.
print("map reduce done, took: {}".format(datetime.datetime.now() - start))

In [None]:
# print(result)

In [None]:
from operator import itemgetter

# Build the stop-word list from nltk (English + Spanish + the digits 0-9).
try:
    from nltk.corpus import stopwords
    cachedStopWords = (stopwords.words("english") + stopwords.words("spanish")
                       + [str(x) for x in range(10)])
except (ImportError, LookupError):
    # nltk is not installed (ImportError) or its stopwords corpus has not been
    # downloaded (LookupError): fall back to the predefined list.
    # NOTE(review): `_stopwords` is not defined in the visible cells; a star
    # import skips underscore-prefixed names unless `__all__` lists them -
    # confirm that config exports it.
    cachedStopWords = _stopwords

# Number of rows to show (named `limit` so the builtin `max` is not shadowed).
limit = 50
# Stop words plus a few title-specific filler words, as a set for fast lookups.
excluded = set(cachedStopWords) | {"single", "ep", "vol", "feat"}

table8 = ListTable()
table8.title = "most popular words in album titles"
table8.append(['count', 'word'])

# `result` is the map-reduce output: "_id" holds the word, "value" its count.
candidates = [doc for doc in result.find() if doc.get('_id') not in excluded]
for doc in sorted(candidates, key=itemgetter('value'), reverse=True)[:limit]:
    table8.append([doc.get('value'), doc.get('_id')])
table8

In [None]:
# <<<< END








#

#### alternative: db.collection.group()

Uses JavaScript and is subject to a number of performance limitations, see docs for details.

In [None]:
# Count the artist's documents with the JavaScript-based group() helper:
# documents matching `query` are grouped by `key`, every group starts from
# `initial`, and `reduce_f` increments the group's counter for each document.
key = ["info.artistName"]
query = {"info.artistName": artist}
initial = {'count': 0}
reduce_f = "function (doc, out) {out.count++;}"
res = db[playlists].group(key=key, condition=query, initial=initial, reduce=reduce_f)
print(res)

#### bonus: first map reduce example with group() operator

In [None]:
# Count the distinct artists with the aggregation framework and time the run.
start = datetime.datetime.now()
#
# First $group: one document per artist name; second $group: count those documents.
one_per_artist = {"$group": {"_id": "$info.artistName"}}
count_all = {"$group": {"_id": None, 'count': {'$sum': 1}}}
pipeline = [one_per_artist, count_all]
res = db[playlists].aggregate(pipeline)
#
elapsed = datetime.datetime.now() - start
print("aggregation done, took: {}".format(elapsed))
summary = list(res)[0]
print("found {0:,} artists".format(summary.get('count')))
# audio-signal when done
Audio("http://www.trekcore.com/audio/communicator/ent_communicator1.mp3", autoplay=True)