In [None]:
from pymongo import MongoClient

%matplotlib inline

mongo_client = MongoClient('this-mongo.cc', 27016)
database_reference = mongo_client.twitter

In [None]:
# Handle on the "instructor_test_group" collection of the twitter database
# (subscript access is equivalent to attribute access on a pymongo Database).
collection_reference = database_reference["instructor_test_group"]

In [None]:
# Collection.count() was deprecated in PyMongo 3.7 and removed in PyMongo 4.0.
# estimated_document_count() returns a fast, metadata-based total; use
# collection_reference.count_documents({}) when an exact (scanning) count is needed.
collection_reference.estimated_document_count()

## The Aggregation Pipeline

A call to the aggregation framework defines a pipeline (pictured below), the **aggregation pipeline**, in which the output of each step is the input to the next step. Each step performs a single operation on its input documents and emits output documents.

![Diagram of the aggregation pipeline](https://www.evernote.com/l/AAGxerRxKLZNFrjqxlYK2HPz1R11tr95FFkB/image.png)

### Useful Aggregation Pipeline Operations

- `$project` — Specify fields to be placed in the output document.
- `$match` — Select documents to be processed, similar to `find()`.
- `$limit` — Limit the number of documents passed to the next step.
- `$skip` — Skip a specified number of documents.
- `$unwind` — Expand an array, generating one output document for each array entry.
- `$group` — Group documents by a specified key.
- `$sort` — Sort documents.
- `$geoNear` — Select documents near a geospatial location (must be the first stage of the pipeline).
- `$out` — Write the results of the pipeline to a collection (new in v2.6).
- `$redact` — Control access to certain data (new in v2.6).
- `$count` — Output a single document containing the number of documents that reached this stage (new in v3.4).

In [None]:
# Aggregation stage operators, named once so the pipelines below avoid typos in
# the raw "$..." strings and read closer to prose.
PROJECT, MATCH, LIMIT = "$project", "$match", "$limit"
UNWIND, GROUP = "$unwind", "$group"
SORT, COUNT = "$sort", "$count"

In [None]:
# Same collection as `collection_reference`; the cells below use this shorter name.
test_group = database_reference["instructor_test_group"]

In [None]:
# Count tweets whose "geo" field is present and non-null. The $count stage
# emits a single document of the form {"geo": <n>}.
cur = test_group.aggregate(
    [
        {MATCH: {"geo": {"$ne": None}}},
        {COUNT: "geo"},
    ]
)

In [None]:
# $count yields no document at all when no tweet matches, so fall back to a
# zero count instead of letting next() raise StopIteration.
next(cur, {"geo": 0})

In [None]:
# Reusable stages: keep tweets with a non-null "geo" field, then emit a single
# {"geo": <n>} count document.
match_non_null_geo = {MATCH: {"geo": {"$ne": None}}}
count_geo = {COUNT: "geo"}

# A pipeline is an ordered list of stages; each stage's output feeds the next.
dag_count_non_null_geo = [match_non_null_geo, count_geo]

In [None]:
# Default to a zero count: $count produces no document when nothing matches,
# which would otherwise make next() raise StopIteration.
next(test_group.aggregate(dag_count_non_null_geo), {"geo": 0})

### Group Template

    { $group: { _id: <expression>, <field1>: { <accumulator1> : <expression1> }, ... } }
    
#### Accumulators

- `$sum`
- `$avg`
- `$first`
- `$last`
- `$max`
- `$min`
- `$stdDevPop`
- `$stdDevSamp`

In [None]:
# Query/accumulator fragments, kept as module-level names for reuse.
greater_than_10 = {"$gt": 10}
sum_1 = {"$sum": 1}


def group_and_count(key):
    """Return a $group stage counting documents per distinct value of `key`.

    `key` is an aggregation expression such as "$lang". Each output document
    has the form {"_id": <value of key>, "count": <n>}.
    """
    # Copy the accumulator so that mutating one returned stage cannot silently
    # change `sum_1` and every other stage built from it.
    return {GROUP: {"_id": key, "count": dict(sum_1)}}


def match_count_greater_than(threshold=10):
    """Return a $match stage keeping documents whose "count" exceeds `threshold`."""
    return {MATCH: {"count": {"$gt": threshold}}}


# Keep only groups seen more than 10 times.
match_count_gt = match_count_greater_than(10)

# Most frequent groups first.
sort_by_count = {SORT: {"count": -1}}


def limit(val):
    """Return a $limit stage passing at most `val` documents to the next stage."""
    return {LIMIT: val}

In [None]:
# Most common tweet languages: group by "lang", keep groups with more than 10
# tweets, sort by count descending, and show the top 10.
list(
    test_group.aggregate(
        [group_and_count("$lang"), match_count_gt, sort_by_count, limit(10)]
    )
)


In [None]:
# Stages that turn tweets into one document per hashtag string:
#   1. keep tweets whose entities.hashtags array is not empty,
#   2. project each tweet down to the array of its hashtag texts (dropping _id),
#   3. unwind that array into one {"text": <hashtag>} document per entry.
# Tweets with no entities.hashtags field also pass the $ne test, but they have
# no "text" after the projection, so $unwind drops them.
not_an_empty_array = {"$ne": []}
match_non_empty_hashtag_arrays = {MATCH: {"entities.hashtags": not_an_empty_array}}
project_to_text_only = {PROJECT: {"text": "$entities.hashtags.text", "_id": 0}}
unwind_text = {UNWIND: "$text"}

# Peek at the first 10 unwound hashtags.
list(
    test_group.aggregate(
        [match_non_empty_hashtag_arrays, project_to_text_only, unwind_text, limit(10)]
    )
)


In [None]:
# Rank hashtags: unwind them as above, count occurrences per hashtag text,
# keep those seen more than 10 times, and show the 10 most frequent.
hashtag_stages = None  # placeholder removed below; see pipeline literal
del hashtag_stages
list(
    test_group.aggregate(
        [
            match_non_empty_hashtag_arrays,
            project_to_text_only,
            unwind_text,
            group_and_count("$text"),
            match_count_gt,
            sort_by_count,
            limit(10),
        ]
    )
)

In [None]:
# Hashtags to leave out of the ranking: job-posting tags and place names.
job_hashtags = ["job", "jobs", "hiring", "careerarc"]
location_hashtags = ["california", "losangeles", "la", "santamonica", "glendale", "paloalto"]

# Lower-case each unwound hashtag so that differently cased spellings group together.
# NOTE(review): MongoDB documents $toLower as well-defined only for ASCII strings,
# so non-ASCII hashtags may not be case-folded consistently.
project_to_lower = {PROJECT: {"text": {"$toLower": "$text"}}}

# After group_and_count("$text"), _id holds the hashtag, so the exclusion is on _id.
match_not_in_bad = {MATCH: {"_id": {"$nin": job_hashtags + location_hashtags}}}

list(
    test_group.aggregate(
        [
            match_non_empty_hashtag_arrays,
            project_to_text_only,
            unwind_text,
            project_to_lower,
            group_and_count("$text"),
            match_not_in_bad,
            match_count_gt,
            sort_by_count,
            limit(50),
        ]
    )
)