<a href="https://colab.research.google.com/github/atm1504/mongodb-details/blob/master/advanced_pipelines.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Learning Advanced Pipelines

In [3]:
!pip3 install pymongo[srv]

Collecting dnspython<2.0.0,>=1.16.0; extra == "srv"
[?25l  Downloading https://files.pythonhosted.org/packages/ec/d3/3aa0e7213ef72b8585747aa0e271a9523e713813b9a20177ebe1e939deb0/dnspython-1.16.0-py2.py3-none-any.whl (188kB)
[K     |████████████████████████████████| 194kB 5.7MB/s 
[?25hInstalling collected packages: dnspython
Successfully installed dnspython-1.16.0


In [1]:
import pymongo
from pymongo import MongoClient
import pprint
from IPython.display import clear_output

In [2]:
# Replace XXXX with your connection URI from the Atlas UI
client = MongoClient("XXX")

In this pipeline we take the first 100 rows and clean them and store them in ```movies_scratch``` collection. 
Data is cleaned, often variables are renamed, also in some places data has been formated.

In [5]:
pipeline = [
    {
        '$limit': 100
    },
    {
        '$project': {
            'title': 1,
            'year': 1,
            'directors': {'$split': ["$director", ", "]},
            'actors': {'$split': ["$cast", ", "]},
            'writers': {'$split': ["$writer", ", "]},
            'genres': {'$split': ["$genre", ", "]},
            'languages': {'$split': ["$language", ", "]},
            'countries': {'$split': ["$country", ", "]},
            'plot': 1,
            'fullPlot': "$fullplot",
            'rated': "$rating",
            'released': 1,
            'runtime': 1,
            'poster': 1,
            'imdb': {
                'id': "$imdbID",
                'rating': "$imdbRating",
                'votes': "$imdbVotes"
                },
            'metacritic': 1,
            'awards': 1,
            'type': 1,
            'lastUpdated': "$lastupdated"
        }
    },
    {
        '$out': "movies_scratch"
    }
]

clear_output()
pprint.pprint(list(client.mongo.milf.aggregate(pipeline)))

[]


In [11]:
# Output pipeline to see the db
clear_output()
t_p=[    {
        '$limit': 10
    }]
pprint.pprint(list(client.mongo.movies_scratch.find().limit(5)))

[{'_id': ObjectId('5ef8953205d42488e36c6b75'),
  'actors': ['Carmencita'],
  'awards': '',
  'countries': ['USA'],
  'directors': ['William K.L. Dickson'],
  'fullPlot': 'Performing on what looks like a small wooden stage, wearing a '
              'dress with a hoop skirt and white high-heeled pumps, Carmencita '
              'does a dance with kicks and twirls, a smile always on her face.',
  'genres': ['Documentary', 'Short'],
  'imdb': {'id': '1', 'rating': '5.9', 'votes': '1032'},
  'languages': [''],
  'lastUpdated': '2015-08-26 00:03:45.040000000',
  'metacritic': '',
  'plot': 'Performing on what looks like a small wooden stage, wearing a dress '
          'with a hoop skirt and white high-heeled pumps, Carmencita does a '
          'dance with kicks and twirls, a smile always on her face.',
  'poster': 'https://m.media-amazon.com/images/M/MV5BMjAzNDEwMzk3OV5BMl5BanBnXkFtZTcwOTk4OTM5Ng@@._V1_SX300.jpg',
  'rated': 'NOT RATED',
  'released': '',
  'runtime': '1 min',
  'title':

In this pipeline, I have worked with the ```released``` field. The date released field has been converted from string to timestamp format. To do so, first a condition has been checked if the string is empty or not, if its empty, then leave it empty, else convert it to time stamp. To do so, I have used ```cond, if, then, else```. ```$dateFromString``` operator makes it easy to convert a string to timestamp format.

In [14]:
pipeline = [
    {
        '$limit': 100
    },
    {
        '$project': {
            'title': 1,
            'year': 1,
            'directors': {'$split': ["$director", ", "]},
            'actors': {'$split': ["$cast", ", "]},
            'writers': {'$split': ["$writer", ", "]},
            'genres': {'$split': ["$genre", ", "]},
            'languages': {'$split': ["$language", ", "]},
            'countries': {'$split': ["$country", ", "]},
            'plot': 1,
            'fullPlot': "$fullplot",
            'rated': "$rating",
            'released': {
                '$cond':{
                    'if':{'$ne':["$released",""]},
                    'then':{
                        '$dateFromString':{
                            'dateString':"$released"
                        }
                    },
                    'else':""
                }
            },
            'runtime': 1,
            'poster': 1,
            'imdb': {
                'id': "$imdbID",
                'rating': "$imdbRating",
                'votes': "$imdbVotes"
                },
            'metacritic': 1,
            'awards': 1,
            'type': 1,
            'lastUpdated': "$lastupdated"
        }
    },
    {
        '$out': "movies_scratch"
    }
]

clear_output()
pprint.pprint(list(client.mongo.milf.aggregate(pipeline)))

[]


Read the modiefies data using Aggregation!

In [18]:
# Output pipeline to see the db
clear_output()
t_p=[    {
        '$limit': 10
    },{
        '$match': { 'released': {'$ne':""} }
    }]
# pprint.pprint(list(client.mongo.movies_scratch.find().limit(5)))
pprint.pprint(list(client.mongo.movies_scratch.aggregate(t_p)))

[{'_id': ObjectId('5ef8953205d42488e36c6b78'),
  'actors': ['Fred Ott'],
  'awards': '',
  'countries': ['USA'],
  'directors': ['William K.L. Dickson'],
  'fullPlot': "A man (Edison's assistant) takes a pinch of snuff and sneezes. "
              'This is one of the earliest Edison films and was the first '
              'motion picture to be copyrighted in the United States.',
  'genres': ['Documentary', 'Short'],
  'imdb': {'id': '8', 'rating': '5.9', 'votes': '988'},
  'languages': [''],
  'lastUpdated': '2015-08-10 00:21:07.127000000',
  'metacritic': '',
  'plot': "A man (Thomas Edison's assistant) takes a pinch of snuff and "
          'sneezes. This is one of the earliest Thomas Edison films and was '
          'the first motion picture to be copyrighted in the United States.',
  'poster': '',
  'rated': '',
  'released': datetime.datetime(1894, 1, 9, 0, 0),
  'runtime': '1 min',
  'title': 'Edison Kinetoscopic Record of a Sneeze',
  'type': 'movie',
  'writers': [''],
  'year'

In this pipeline we are going to parse the ```lastUpdated``` field. The problem is that in this field, the string often contains many extended zeros. SO it arises a problem, hence we need to preprocess the data. While pre processing the input is like ```2015-08-26 00:03:45.040000000```, so we are splitting it on the basis of ```.``` then using the item at index 0. In pre processing, the ```lastupdated``` field has been updated with the new value.

In [19]:
pipeline = [
    {
        '$limit': 100
    },
    {'$addFields':{
        'lastupdated':{
            '$arrayElemAt':[
                            {'$split':["$lastupdated","."]},0
            ]
        }
    }},
    {
        '$project': {
            'title': 1,
            'year': 1,
            'directors': {'$split': ["$director", ", "]},
            'actors': {'$split': ["$cast", ", "]},
            'writers': {'$split': ["$writer", ", "]},
            'genres': {'$split': ["$genre", ", "]},
            'languages': {'$split': ["$language", ", "]},
            'countries': {'$split': ["$country", ", "]},
            'plot': 1,
            'fullPlot': "$fullplot",
            'rated': "$rating",
            'released': {
                '$cond':{
                    'if':{'$ne':["$released",""]},
                    'then':{
                        '$dateFromString':{
                            'dateString':"$released"
                        }
                    },
                    'else':""
                }
            },
            'runtime': 1,
            'poster': 1,
            'imdb': {
                'id': "$imdbID",
                'rating': "$imdbRating",
                'votes': "$imdbVotes"
                },
            'metacritic': 1,
            'awards': 1,
            'type': 1,
            'lastUpdated': {
                '$cond':{
                    'if':{'$ne':["$lastupdated",""]},
                    'then':{
                        '$dateFromString':{
                            'dateString':"$lastupdated",
                            'timezone':"America/New_York"
                        }
                    },
                    'else':""
                }
            },
        }
    },
    {
        '$out': "movies_scratch"
    }
]

clear_output()
pprint.pprint(list(client.mongo.milf.aggregate(pipeline)))

[]


In [20]:
# Output pipeline to see the db
clear_output()
t_p=[    {
        '$limit': 10
    },{
        '$match': { 'lastUpdated': {'$ne':""} }
    },{
        '$project':{
            'lastUpdated':1
        }
    }]
# pprint.pprint(list(client.mongo.movies_scratch.find().limit(5)))
pprint.pprint(list(client.mongo.movies_scratch.aggregate(t_p)))

[{'_id': ObjectId('5ef8953205d42488e36c6b75'),
  'lastUpdated': datetime.datetime(2015, 8, 26, 4, 3, 45)},
 {'_id': ObjectId('5ef8953205d42488e36c6b78'),
  'lastUpdated': datetime.datetime(2015, 8, 10, 4, 21, 7)},
 {'_id': ObjectId('5ef8953205d42488e36c6b95'),
  'lastUpdated': datetime.datetime(2015, 8, 29, 4, 26, 22)},
 {'_id': ObjectId('5ef8953205d42488e36c6ba0'),
  'lastUpdated': datetime.datetime(2015, 8, 14, 4, 7, 10)},
 {'_id': ObjectId('5ef8953205d42488e36c6b8f'),
  'lastUpdated': datetime.datetime(2015, 8, 22, 4, 28, 40)},
 {'_id': ObjectId('5ef8953205d42488e36c6b79'),
  'lastUpdated': datetime.datetime(2015, 8, 26, 4, 3, 56)},
 {'_id': ObjectId('5ef8953205d42488e36c6b84'),
  'lastUpdated': datetime.datetime(2015, 8, 29, 4, 19, 44)},
 {'_id': ObjectId('5ef8953205d42488e36c6b89'),
  'lastUpdated': datetime.datetime(2015, 8, 29, 4, 23, 5)},
 {'_id': ObjectId('5ef8953205d42488e36c6b9b'),
  'lastUpdated': datetime.datetime(2015, 8, 29, 4, 28, 54)},
 {'_id': ObjectId('5ef8953205d424