# Description:
In this notebook we explore the MongoDB instance where we keep our news documents. We see some examples of documents stored, the time at which each batch of documents was inserted, provide a function to remove batch of documents given their insertion time, analyze the distribution of time at which each document is published, analyze the distribution of category of documents as well as their sources, analyze the relationship between category and source, provide a function to remove documents with no content or description and a function to check/ remove duplicate documents based on description and content combined. This function was used previoulsy the creation of the unique index of content and description. With this index there shouldn't be any duplicate documents.

# TODO:
- UPDATE THIS TO GET DATA FROM ELASTICSEARCH OVER MONGODB   

In [1]:
import os
from datetime import datetime, timedelta
from pprint import pprint
from elasticsearch import Elasticsearch
import pandas as pd
import matplotlib.pyplot as plt

In [2]:
# Connecting to Elasticsearch
es = Elasticsearch(
    hosts=['odfe-node1', '0.0.0.0'],
    http_auth=('admin', 'admin'),
    scheme="https",
    verify_certs=False
)

In [3]:
# List indices
es.indices.get_alias("*")

{'.opendistro_security': {'aliases': {}},
 'document': {'aliases': {}},
 'label': {'aliases': {}},
 'security-auditlog-2021.08.05': {'aliases': {}}}

## Get random document

In [4]:
result = es.search(
    {
        "size": 1,
        "query": {
            "function_score": {
                "functions": [
                    {
                        "random_score": {
                            "seed": "1477072619038"
                        }
                    }
                ]
            }
        }
    },
    index="document"
)
print(f"Document ID: {result['hits']['hits'][0]['_id']}", "\n", result['hits']['hits'][0]['_source'])

Document ID: dc67d9fe-8020-4bb6-8878-0b1ec8baa112 
 {'text': "Gov. Larry Hogan to update Maryland's COVID-19 vaccination plan Tuesday - WBAL TV Baltimore Gov. Larry Hogan and state health officials plan to provide an overview of Maryland's COVID-19 vaccination plan. ANNAPOLIS, Md. —TUESDAY at 3 p.m. -- Gov. Larry Hogan and state health officials plan to provide an overview of Maryland's COVID-19 vaccination plan. Watch it live on WBAL-TV 11, WBALTV.com/nowcast, ", 'embedding': [-0.17567341029644012, -0.5241833925247192, 0.4313260316848755, -0.0863744467496872, -0.18458202481269836, -0.8593564629554749, -0.7901385426521301, 0.21458008885383606, 0.42209261655807495, -0.7684335708618164, -0.11245536804199219, 0.016612349078059196, -0.0765092670917511, 0.5376741290092468, 0.5210342407226562, 0.06201391667127609, -0.14926759898662567, -0.326467365026474, 0.3537862300872803, -0.23642179369926453, 0.3825759291648865, -0.46651792526245117, 0.051621925085783005, 0.6784964799880981, -0.850557923

## Collections

A database is compose of collections. These collections in turn hold documents.

In [4]:
collection_list = db.list_collection_names()
print(f"The database contains {len(collection_list)} collections: {collection_list[:5]}")

NameError: name 'db' is not defined

In [None]:
for col in collection_list:
    print(f"Collection {col} contains {db[col].count_documents({})} documents")

## Documents

In [None]:
for col in collection_list:
    print(f"EXAMPLE OF {col.upper()}'S DOCUMENTS:")
    pprint(db[col].find_one())
    print("-------------------------------------------------------------------------------------------------------------------------------\n")

## Insertion batches

In [None]:
pipeline_batches = [
    {  # project id_timestamp
        '$project': {
            'id_timestamp': {
                '$dateToString': {
                    'format': '%d-%m-%Y T%H:%M:%S', 
                    'date': {'$toDate': '$_id'}
                }
            }
        }
    },
    {  # groups on id_timestamp and counts the number of documents for each values
        '$group': {
            '_id': '$id_timestamp',
            'document_count': {'$sum': 1}
        }
    },
    {  # sort results in descending order by _id
        '$sort': {'_id': -1}
    }
]

for col in collection_list:
    batches = db[col].aggregate(pipeline_batches)
    print(f"\n{col} collection insertion batches history:")
    pprint(list(batches))
    print("-----------------------------------------------------------\n")

In [None]:
def remove_by_batch_date(batch_date, db, collection=None): 
    """
    Function to remove documents with a given batch date from db's specified collection or all of them (default).
    batch_date should have '%d-%m-%Y T%H:%M:%S' format.
    """
    pipeline_remove = [
        {  # project id_timestamp
            '$project': {
                '_id': 1,
                'id_timestamp': {
                    '$dateToString': {
                        'format': '%d-%m-%Y T%H:%M:%S', 
                        'date': {'$toDate': '$_id'}
                    }
                }
            }
        },
        {  # match id_timestamp to batch_date we want to remove
            '$match': {'id_timestamp': batch_date}
        },
        {  # project just _id
            '$project': {'_id': 1}
        }
    ]
    
    if collection is None:
        collection_list = db.list_collection_names()
        for col in collection_list:
            idsList = list(map(lambda x: x['_id'], db[col].aggregate(pipeline_remove)))
            db[col].delete_many({'_id': {'$in': idsList}})
            print(f"{len(idsList)} documents with batch_date {batch_date} were removed from {col}\n")
    else:
        idsList = list(map(lambda x: x['_id'], db[collection].aggregate(pipeline_remove)))
        db[collection].delete_many({'_id': {'$in': idsList}})
        print(f"{len(idsList)} documents with batch_date {batch_date} were removed from {collection}\n")

# remove_by_batch_date()

## Exploratory Data Analysis

### publishedAt

In [None]:
pipeline = [
    {  # project publishedAtDay and publishedAtHour
        '$project': {
            'publishedAtDay': {
                '$dateToString': {
                    'format': '%d-%m-%YT%H', 
                    'date': {'$toDate': '$publishedAt'}
                }
            },
            'publishedAtHour': {
                '$hour': {
                    'date': {'$toDate': '$publishedAt'}
                }
            }
        }
    },
    {  # groups on publishedAtDay and gets number of documents per day and hour (document_count) and publishedAtHour
        '$group': {
            '_id': '$publishedAtDay',
            'document_count': {'$sum': 1},
            'publishedAtHour': {'$first': '$publishedAtHour'}
        }
    },
    {  # groups on publishedAtHour and gets average of documents per hour over days
        '$group': {
            '_id': '$publishedAtHour',
            'avg_document_count': {'$avg': '$document_count'}
        }
    },
    {  # sort results in descending order by _id
        '$sort': {'_id': -1}
    }
]

fig, axes = plt.subplots(2, 1, figsize=(13, 9))
for ax, col in zip(axes.flatten(), collection_list):
    x, y = [], []
    for i in list(db[col].aggregate(pipeline)):
        x.append(i['_id'])
        y.append(i['avg_document_count'])
    ax.plot(x, y, linestyle="-")
    ax.set_xticks(x)
    ax.set_title(f"Average number of documents per publishedAt Hour - {col} collection")
    
plt.show()

## category

In [None]:
pipeline = [
    {  # project category
        '$project': {
            '_id': 0,
            'category': 1
        }
    },
    {  # groups on category and gets number of documents for each category
        '$group': {
            '_id': '$category',
            'document_count': {'$sum': 1},
        }
    },
    {  # sort results in descending order by _id
        '$sort': {'_id': 1}
    }
]

fig, axes = plt.subplots(2, 1, figsize=(13, 9))
for ax, col in zip(axes.flatten(), collection_list):
    x, y = [], []
    for i in list(db[col].aggregate(pipeline)):
        x.append(i['_id'])
        y.append(i['document_count'])
    ax.bar(x, y)
    ax.set_xticks(x)
    ax.set_title(f"Number of documents per category - {col} collection")
    
plt.show()

## source

In [None]:
pipeline = [
    {  # project source
        '$project': {
            '_id': 0,
            'source': 1
        }
    },
    {  # groups on source and gets number of documents for each source
        '$group': {
            '_id': '$source',
            'document_count': {'$sum': 1},
        }
    },
    {  # sort results in descending order by _id
        '$sort': {'_id': 1}
    }
]

fig, axes = plt.subplots(2, 1, figsize=(19, 9))
for ax, col in zip(axes.flatten(), collection_list):
    x, y = [], []
    for i in list(db[col].aggregate(pipeline)):
        if i['_id'] is None:
            x.append("Null")
        else:
            x.append(i['_id'])
        y.append(i['document_count'])
    ax.bar(x, y)
    ax.set_xticks(x)
    ax.set_xticklabels(x, rotation=30, ha='right')
    ax.set_title(f"Number of documents per source - {col} collection")

plt.subplots_adjust(hspace=0.4)
plt.show()

## relationship between categories and sources

In [None]:
# use stacked bar chart
pipeline = [
    {  # project source and category
        '$project': {
            '_id': 0,
            'source': 1,
            'category': 1
        }
    },
    {  # groups on source and gets number of documents for each source
        '$group': {
            '_id': {
                'category': '$category',
                'source': '$source'
            },
            'document_count': {'$sum': 1},
        }
    },
    {
        '$project':{
            '_id': 0,
            'document_count': 1,
            'category': '$_id.category',
            'source': '$_id.source'            
        }
    },
    {  # sort results in descending order by _id
        '$sort': {'category': 1}
    }
]

fig, axes = plt.subplots(2, 1, figsize=(19, 11))
for ax, col in zip(axes.flatten(), collection_list):
    plt_data = pd.DataFrame(list(db[col].aggregate(pipeline))).pivot(index="source", columns="category", values="document_count")
    plt_data["sum"] = plt_data.sum(axis=1)
    plt_data.sort_values("sum", ascending=False).drop("sum", axis=1).plot(kind='bar', stacked=True, rot=90, ax=ax)
    ax.set_title(f"Source frequencies by category - {col} collection")

plt.subplots_adjust(hspace=0.9)
plt.show()

## missing values

In [None]:
def remove_missing_values(db, collection=None): 
    """
    Function to remove documents with missing values on both description and content from db's specified collection
    or all of them (default).
    """
    pipeline_remove = [
        {
            '$project': {
                '_id': 1,
                'description': 1,
                'content': 1
            }
        },
        {
            "$match": {
                '$or': [
                    {
                        "description" : {"$eq" : None},
                        "content" : {"$eq": None}
                    },
                    {
                        "description" : {"$eq" : ''},
                        "content" : {"$eq": ''}
                    },
                    {
                        "description" : {"$eq" : None},
                        "content" : {"$eq": ''}
                    },
                    {
                        "description" : {"$eq" : ''},
                        "content" : {"$eq": None}
                    }                    
                ]
            } 
        }, 

        {
            "$project": {
                "id" : 1
            }
        }
    ]
    
    if collection is None:
        collection_list = db.list_collection_names()
        for col in collection_list:
            idsList = list(map(lambda x: x['_id'], db[col].aggregate(pipeline_remove)))
            db[col].delete_many({'_id': {'$in': idsList}})
            print(f"{len(idsList)} documents with missing values were removed from {col}\n")
    else:
        idsList = list(map(lambda x: x['_id'], db[collection].aggregate(pipeline_remove)))
        db[collection].delete_many({'_id': {'$in': idsList}})
        print(f"{len(idsList)} documents with missing values were removed from {collection}\n")

remove_missing_values(db)

## duplicates

In [None]:
def remove_duplicates(db, collection=None): 
    """
    Function to remove documents with missing values on both description and content from db's specified collection
    or all of them (default).
    """
    pipeline_remove = [
        {
            "$group": {
                "_id": {'description': '$description', 'content': '$content'},
                "_idsNeedsToBeDeleted": {"$push": "$$ROOT._id"} # push all `_id`'s to an array
            }
        },
        # Remove first element - which is removing a doc
        {
            "$project": {
                "_id": 0,
                "_idsNeedsToBeDeleted": {  
                    "$slice": [
                        "$_idsNeedsToBeDeleted", 1, {"$size": "$_idsNeedsToBeDeleted"}
                    ]
                }
            }
        },
        {
            "$unwind": "$_idsNeedsToBeDeleted" # Unwind `_idsNeedsToBeDeleted`
        },
        # Group without a condition & push all `_idsNeedsToBeDeleted` fields to an array
        {
            "$group": { "_id": "", "_idsNeedsToBeDeleted": { "$push": "$_idsNeedsToBeDeleted" } }
        },
        { 
            "$project" : { "_id" : 0 }  # Optional stage
        }
        # At the end you'll have an [{ _idsNeedsToBeDeleted: [_ids] }] or []
    ]
    
    if collection is None:
        collection_list = db.list_collection_names()
        for col in collection_list:
            try:
                idsList = list(db[col].aggregate(pipeline_remove))[0]["_idsNeedsToBeDeleted"]
                db[col].delete_many({'_id': {'$in': idsList}})
                print(f"{len(idsList)} documents with duplicated documents were removed from {col}\n")
            except IndexError:
                print(f"0 documents with duplicated documents in {col}\n")
    else:
        try:
            idsList = list(db[collection].aggregate(pipeline_remove))[0]["_idsNeedsToBeDeleted"]
            db[collection].delete_many({'_id': {'$in': idsList}})
            print(f"{len(idsList)} documents with duplicated documents were removed from {collection}\n")
        except IndexError:
                print(f"0 documents with duplicated documents in {collection}\n")

remove_duplicates(db)

### duplicates across collections

In [None]:
pipeline = [
    {  # project fields
        '$project': {
            '_id': 0,
            'text': {
                '$concat': [
                    {'$ifNull': ['$title', '']},
                    ' - ',
                    {'$ifNull': ['$description', '']},
                    ' - ',
                    {'$ifNull': ['$content', '']},
                ]
            }
        }
    }
]

r_everything = list(map(lambda x: x['text'], db.everything.aggregate(pipeline)))
r_top_headlines = list(map(lambda x: x['text'], db.top_headlines.aggregate(pipeline)))

In [None]:
# union allows to join the elements of two sets while removing the duplicates
len(set(r_top_headlines).union(set(r_everything)))