In [104]:
import pymongo
from pymongo import MongoClient, UpdateOne
from datetime import datetime
import pprint
import re
from IPython.display import clear_output

In [1]:
# LOADING DATA TO MONGODB 


# importing data to Atlas - MongoDb cloud.

import os
os.popen("mongoimport -d movies -c movie_collection --type csv --file C:\project_files\movies_initial.csv --headerline --host mongodb://karan:karan@analytics-shard-00-00-eidfy.mongodb.net:27017,analytics-shard-00-01-eidfy.mongodb.net:27017,analytics-shard-00-02-eidfy.mongodb.net:27017/test?ssl=true&replicaSet=Analytics-shard-0&authSource=admin")

client = MongoClient("mongodb://karan:karan@analytics-shard-00-00-eidfy.mongodb.net:27017,analytics-shard-00-01-eidfy.mongodb.net:27017,analytics-shard-00-02-eidfy.mongodb.net:27017/mflix?ssl=true&replicaSet=Analytics-shard-0&authSource=admin")

runtime_pat = re.compile(r'([0-9]+) min')


# Reshaping data into mongodb, for example genre was initially comma separated like action, drama, sci-fi so here I am reshaping 
# it into mongodb Array which will split the string with comma(,) as delimiter. Same is the case with director column, cast 
# and language columns. 

# Data had different IMDb columns like imdbRating and imdbVotes so I have clubbed this column into an imdb object in mongo.

# updating rows with batch size as 1000.

batch_size = 1000
updates = []
count = 0
for movie in client.mflix.movies.find({}):

    fields_to_set = {}
    fields_to_unset = {}

    for k,v in movie.copy().items():
        if v == "" or v == [""]:
            del movie[k]
            fields_to_unset[k] = ""

    if 'director' in movie:
        fields_to_unset['director'] = ""
        fields_to_set['directors'] = movie['director'].split(", ")
    if 'cast' in movie:
        fields_to_set['cast'] = movie['cast'].split(", ")
    if 'writer' in movie:
        fields_to_unset['writer'] = ""
        fields_to_set['writers'] = movie['writer'].split(", ")
    if 'genre' in movie:
        fields_to_unset['genre'] = ""
        fields_to_set['genres'] = movie['genre'].split(", ")
    if 'language' in movie:
        fields_to_unset['language'] = ""
        fields_to_set['languages'] = movie['language'].split(", ")
    if 'country' in movie:
        fields_to_unset['country'] = ""
        fields_to_set['countries'] = movie['country'].split(", ")
        
    if 'fullplot' in movie:
        fields_to_unset['fullplot'] = ""
        fields_to_set['fullPlot'] = movie['fullplot']
    if 'rating' in movie:
        fields_to_unset['rating'] = ""
        fields_to_set['rated'] = movie['rating']

    imdb = {}
    if 'imdbID' in movie:
        fields_to_unset['imdbID'] = ""
        imdb['id'] = movie['imdbID']
    if 'imdbRating' in movie:
        fields_to_unset['imdbRating'] = ""
        imdb['rating'] = movie['imdbRating']
    if 'imdbVotes' in movie:
        fields_to_unset['imdbVotes'] = ""
        imdb['votes'] = movie['imdbVotes']
    if imdb:
        fields_to_set['imdb'] = imdb
        
    if 'released' in movie:
        fields_to_set['released'] = datetime.strptime(movie['released'],
                                                      "%Y-%m-%d")
    if 'lastUpdated' in movie:
        fields_to_set['lastUpdated'] = datetime.strptime(movie['lastUpdated'][0:19],
                                                         "%Y-%m-%d %H:%M:%S")

    if 'runtime' in movie:
        m = runtime_pat.match(movie['runtime']) 
        if m:
            fields_to_set['runtime'] = int(m.group(1))

    update_doc = {}
    if fields_to_set:
        update_doc['$set'] = fields_to_set
    if fields_to_unset:
        update_doc['$unset'] = fields_to_unset

    updates.append(UpdateOne({'_id': movie['_id']}, update_doc))

    count += 1
    if count == batch_size:
        client.mflix.movies.bulk_write(updates)
        updates = []
        count = 0

if updates:         
    client.mflix.movies.bulk_write(updates)



In [None]:
# Data Model


#   _id: unique id field generated by mongodb for each document.
#   title: String - title of the movie.
#   year:  datetime - year movie was released in.
#   runtime: int - total duration of the movie.
#   released: datetime - release date of the movie.
#   cast: mongo db array consisting of names of all the cast members.
#   plot: string - plot of the movie.
#   awards: string - awards that movies have received.
#   lastupdated: string - when was these movie records were last updated.
#   type: string - type of movie.
#   genres: array - array of genres.
#   poster: string - link of movie poster.
#   countries: array - array of countries.
#   fullplot: string - fullplot of the movie.
#   directors: array(string) - array of name of the directors.
#   imdb - mongodb object containing (rating,votes,id).

In [101]:
# Query 1 - finding count of different language combinations and sorting by the highest. 

t1 = datetime.now()
pipeline = [
    {
        '$sortByCount': "$languages"
    }
]
clear_output()
pprint.pprint(list(client.mflix.movies.aggregate(pipeline)))

t2 = datetime.now()
delta = t2-t1
print "Execution time is: {}".format(delta)

# Execution time: 4.39 seconds

[{u'_id': [u'English'], u'count': 25325},
 {u'_id': [u'French'], u'count': 1784},
 {u'_id': [u'Italian'], u'count': 1480},
 {u'_id': [u'Japanese'], u'count': 1290},
 {u'_id': None, u'count': 1115},
 {u'_id': [u'Spanish'], u'count': 875},
 {u'_id': [u'Russian'], u'count': 777},
 {u'_id': [u'English', u'Spanish'], u'count': 728},
 {u'_id': [u'German'], u'count': 674},
 {u'_id': [u'English', u'French'], u'count': 584},
 {u'_id': [u'Hindi'], u'count': 498},
 {u'_id': [u'Korean'], u'count': 377},
 {u'_id': [u'Finnish'], u'count': 349},
 {u'_id': [u'Swedish'], u'count': 291},
 {u'_id': [u'English', u'German'], u'count': 288},
 {u'_id': [u'Mandarin'], u'count': 287},
 {u'_id': [u'English', u'Italian'], u'count': 263},
 {u'_id': [u'Portuguese'], u'count': 251},
 {u'_id': [u'French', u'English'], u'count': 246},
 {u'_id': [u'Polish'], u'count': 203},
 {u'_id': [u'Dutch'], u'count': 199},
 {u'_id': [u'Cantonese'], u'count': 170},
 {u'_id': [u'English', u'Russian'], u'count': 159},
 {u'_id': [u'E

In [118]:
# Query 2 - finds all the records where movie has both korean and english language.

t1 = datetime.now()
filter = {
    'languages': {'$all': ['Korean','English']}
}
clear_output()
pprint.pprint(list(client.mflix.movies.find(filter)))

t2 = datetime.now()
delta = t2-t1
print "Execution time is: {}".format(delta)

# Execution time: 1.37 seconds

[{u'_id': ObjectId('5a0248fb9cb9329fb91b0f24'),
  u'cast': [u'Rock Hudson', u'Anna Kashfi', u'Dan Duryea', u'Don DeFore'],
  u'countries': [u'USA'],
  u'directors': [u'Douglas Sirk'],
  u'fullPlot': u"Dean Hess, who entered the ministry to atone for bombing a German orphanage, decides he's a failure at preaching. Rejoined to train pilots early in the Korean War, he finds Korean orphans raiding the airbase garbage. With a pretty Korean teacher, he sets up an orphanage for them and others. But he finds that to protect his charges, he has to kill.",
  u'genres': [u'Biography', u'Drama', u'History'],
  u'imdb': {u'id': 50171, u'rating': 6.3, u'votes': 654},
  u'languages': [u'English', u'Korean'],
  u'lastupdated': u'2015-09-02 00:25:52.287000000',
  u'plot': u'A remorseful bomber pilot-turned-minister rejoins for the Korean War.',
  u'poster': u'http://ia.media-imdb.com/images/M/MV5BMTIwMjIzNTYzMl5BMl5BanBnXkFtZTcwOTIxMTUyMQ@@._V1_SX300.jpg',
  u'rated': u'APPROVED',
  u'released': dateti

In [16]:
# selecting all the records and calling explain method. Explain method returns the execution stats related to the query execution.

client.mflix.movies.find({}).explain()
    

{u'executionStats': {u'allPlansExecution': [],
  u'executionStages': {u'advanced': 46014,
   u'direction': u'forward',
   u'docsExamined': 46014,
   u'executionTimeMillisEstimate': 0,
   u'invalidates': 0,
   u'isEOF': 1,
   u'nReturned': 46014,
   u'needTime': 1,
   u'needYield': 0,
   u'restoreState': 359,
   u'saveState': 359,
   u'stage': u'COLLSCAN',
   u'works': 46016},
  u'executionSuccess': True,
  u'executionTimeMillis': 19,
  u'nReturned': 46014,
  u'totalDocsExamined': 46014,
  u'totalKeysExamined': 0},
 u'ok': 1.0,
 u'queryPlanner': {u'indexFilterSet': False,
  u'namespace': u'mflix.movies',
  u'parsedQuery': {},
  u'plannerVersion': 1,
  u'rejectedPlans': [],
  u'winningPlan': {u'direction': u'forward', u'stage': u'COLLSCAN'}},
 u'serverInfo': {u'gitVersion': u'078f28920cb24de0dd479b5ea6c66c644f6326e9',
  u'host': u'analytics-shard-00-00-eidfy.mongodb.net',
  u'port': 27017,
  u'version': u'3.4.10'}}

In [115]:
# Querying on an array - extracting movies details where genres equal to "fantasy"

pprint.pprint(list(client.mflix.movies.find({'genres': "Fantasy"},{'title':1,'year':1,'genres':1})))

[{u'_id': ObjectId('5a0248f99cb9329fb91afa60'),
  u'genres': [u'Short', u'Fantasy'],
  u'title': u'A Trip to the Moon',
  u'year': 1898},
 {u'_id': ObjectId('5a0248f99cb9329fb91afa62'),
  u'genres': [u'Short', u'Fantasy'],
  u'title': u'The Sign of the Cross',
  u'year': 1899},
 {u'_id': ObjectId('5a0248f99cb9329fb91afa68'),
  u'genres': [u'Short', u'Fantasy'],
  u'title': u'Jack and the Beanstalk',
  u'year': 1902},
 {u'_id': ObjectId('5a0248f99cb9329fb91afa69'),
  u'genres': [u'Short', u'Adventure', u'Fantasy'],
  u'title': u'A Trip to the Moon',
  u'year': 1902},
 {u'_id': ObjectId('5a0248f99cb9329fb91afa6a'),
  u'genres': [u'Fantasy', u'Short'],
  u'title': u'Alice in Wonderland',
  u'year': 1903},
 {u'_id': ObjectId('5a0248f99cb9329fb91afa6e'),
  u'genres': [u'Short', u'Adventure', u'Fantasy'],
  u'title': u'Fairyland: A Kingdom of Fairies',
  u'year': 1903},
 {u'_id': ObjectId('5a0248f99cb9329fb91afa72'),
  u'genres': [u'Short', u'Adventure', u'Fantasy'],
  u'title': u'The Voyage

In [116]:
client.mflix.movies.find({'genres': "Fantasy"},{'title':1,'year':1,'genres':1}).explain()

# Execution time: 55 milliSeconds

{u'executionStats': {u'allPlansExecution': [],
  u'executionStages': {u'advanced': 2210,
   u'executionTimeMillisEstimate': 61,
   u'inputStage': {u'advanced': 2210,
    u'direction': u'forward',
    u'docsExamined': 46014,
    u'executionTimeMillisEstimate': 61,
    u'filter': {u'genres': {u'$eq': u'Fantasy'}},
    u'invalidates': 0,
    u'isEOF': 1,
    u'nReturned': 2210,
    u'needTime': 43805,
    u'needYield': 0,
    u'restoreState': 359,
    u'saveState': 359,
    u'stage': u'COLLSCAN',
    u'works': 46016},
   u'invalidates': 0,
   u'isEOF': 1,
   u'nReturned': 2210,
   u'needTime': 43805,
   u'needYield': 0,
   u'restoreState': 359,
   u'saveState': 359,
   u'stage': u'PROJECTION',
   u'transformBy': {u'genres': 1, u'title': 1, u'year': 1},
   u'works': 46016},
  u'executionSuccess': True,
  u'executionTimeMillis': 55,
  u'nReturned': 2210,
  u'totalDocsExamined': 46014,
  u'totalKeysExamined': 0},
 u'ok': 1.0,
 u'queryPlanner': {u'indexFilterSet': False,
  u'namespace': u'mfl

In [40]:
# Querying on a nested field - extracting movies whose imdb rating is greater then 8.0

pprint.pprint(list(client.mflix.movies.find({"imdb.rating": {"$gt":8}})))

[{u'_id': ObjectId('5a0248f99cb9329fb91afa69'),
  u'cast': [u'Fran\ufffdois Lallement', u'Jules-Eug\ufffdne Legris'],
  u'countries': [u'France'],
  u'directors': [u'Georges M\ufffdli\ufffds'],
  u'fullPlot': u'A group of men travel to the moon by being shot in a capsule from a giant cannon. They are captured by moon-men, escape, and return to the earth.',
  u'genres': [u'Short', u'Adventure', u'Fantasy'],
  u'imdb': {u'id': 417, u'rating': 8.2, u'votes': 23904},
  u'lastupdated': u'2015-09-01 00:16:55.443000000',
  u'plot': u'A group of astronomers go on an expedition to the moon.',
  u'poster': u'http://ia.media-imdb.com/images/M/MV5BMTQzMDYxNzUxNl5BMl5BanBnXkFtZTgwMjgxNjkxMTE@._V1_SX300.jpg',
  u'rated': u'TV-G',
  u'released': datetime.datetime(1902, 10, 4, 0, 0),
  u'runtime': 13,
  u'title': u'A Trip to the Moon',
  u'type': u'movie',
  u'year': 1902},
 {u'_id': ObjectId('5a0248f99cb9329fb91afb22'),
  u'cast': [u'Werner Krauss',
            u'Conrad Veidt',
            u'Friedric

In [106]:
client.mflix.movies.find({"imdb.rating": {"$gt":8}}).explain()

# Execution time: 55 milliSeconds

{u'executionStats': {u'allPlansExecution': [],
  u'executionStages': {u'advanced': 1791,
   u'direction': u'forward',
   u'docsExamined': 46014,
   u'executionTimeMillisEstimate': 40,
   u'filter': {u'imdb.rating': {u'$gt': 8}},
   u'invalidates': 0,
   u'isEOF': 1,
   u'nReturned': 1791,
   u'needTime': 44224,
   u'needYield': 0,
   u'restoreState': 359,
   u'saveState': 359,
   u'stage': u'COLLSCAN',
   u'works': 46016},
  u'executionSuccess': True,
  u'executionTimeMillis': 55,
  u'nReturned': 1791,
  u'totalDocsExamined': 46014,
  u'totalKeysExamined': 0},
 u'ok': 1.0,
 u'queryPlanner': {u'indexFilterSet': False,
  u'namespace': u'mflix.movies',
  u'parsedQuery': {u'imdb.rating': {u'$gt': 8}},
  u'plannerVersion': 1,
  u'rejectedPlans': [],
  u'winningPlan': {u'direction': u'forward',
   u'filter': {u'imdb.rating': {u'$gt': 8}},
   u'stage': u'COLLSCAN'}},
 u'serverInfo': {u'gitVersion': u'078f28920cb24de0dd479b5ea6c66c644f6326e9',
  u'host': u'analytics-shard-00-00-eidfy.mongodb.n

In [94]:
# Query 5 - grouping each year and finding average runtime of movies on each individual years.

from datetime import datetime
t1 = datetime.now()
pprint.pprint(list(client.mflix.movies.aggregate([
    {'$group': {
        '_id':  '$year',
        'MaxRuntime': {'$avg':'$runtime'}
        
        
    }},
    {"$sort": {"MaxRuntime": -1}}
])))
t2 = datetime.now()
delta = t2-t1
print "Execution time is: {}".format(delta)

#Execution time: 271 milliSeconds



[{u'MaxRuntime': 600.0, u'_id': u'1999\ufffd'},
 {u'MaxRuntime': 537.0, u'_id': u'1996\ufffd'},
 {u'MaxRuntime': 292.0, u'_id': u'1995\ufffd'},
 {u'MaxRuntime': 270.0, u'_id': u'2012\ufffd'},
 {u'MaxRuntime': 259.6666666666667, u'_id': u'1988\ufffd'},
 {u'MaxRuntime': 225.0, u'_id': u'1987\ufffd'},
 {u'MaxRuntime': 220.0, u'_id': u'1979\ufffd'},
 {u'MaxRuntime': 215.0, u'_id': u'1980\ufffd'},
 {u'MaxRuntime': 199.0, u'_id': u'2004\ufffd'},
 {u'MaxRuntime': 178.0, u'_id': u'1971\ufffd'},
 {u'MaxRuntime': 175.0, u'_id': u'2000\ufffd'},
 {u'MaxRuntime': 174.0, u'_id': u'1997\ufffd'},
 {u'MaxRuntime': 160.0, u'_id': 2017},
 {u'MaxRuntime': 152.33333333333334, u'_id': u'2011\ufffd'},
 {u'MaxRuntime': 152.0, u'_id': u'1981\ufffd'},
 {u'MaxRuntime': 105.0, u'_id': u'2010\ufffd'},
 {u'MaxRuntime': 104.40983606557377, u'_id': 1980},
 {u'MaxRuntime': 103.74, u'_id': 1979},
 {u'MaxRuntime': 103.05446623093682, u'_id': 1992},
 {u'MaxRuntime': 102.55040322580645, u'_id': 1993},
 {u'MaxRuntime': 102

In [113]:
# Query - grouping by year and finding total number of movie details in the data.

t1 = datetime.now()

pprint.pprint(list(client.mflix.movies.aggregate([
    
    {'$group': {
        '_id':  {'year' : '$year'},
        'count': {'$sum':1}
        #'MaxRating': {'$max':'$imdb.rating'}
    }},
   # {"$sort": {"MaxRating":-1}}
    
])))

t2 = datetime.now()
delta = t2-t1
print "Execution time is: {}".format(delta)


# Execution time: 203 milliSeconds

[{u'_id': {u'year': 2019}, u'count': 1},
 {u'_id': {u'year': 2018}, u'count': 1},
 {u'_id': {u'year': 1874}, u'count': 1},
 {u'_id': {u'year': 1880}, u'count': 1},
 {u'_id': {u'year': 1887}, u'count': 1},
 {u'_id': {u'year': u'2010\ufffd'}, u'count': 4},
 {u'_id': {u'year': u'2016\ufffd'}, u'count': 4},
 {u'_id': {u'year': u'2007\ufffd'}, u'count': 3},
 {u'_id': {u'year': 2017}, u'count': 42},
 {u'_id': {u'year': u'2003\ufffd'}, u'count': 1},
 {u'_id': {u'year': 2016}, u'count': 519},
 {u'_id': {u'year': 2014}, u'count': 2078},
 {u'_id': {u'year': 1890}, u'count': 4},
 {u'_id': {u'year': 2013}, u'count': 1927},
 {u'_id': {u'year': 1888}, u'count': 2},
 {u'_id': {u'year': u'2002\ufffd'}, u'count': 2},
 {u'_id': {u'year': 2011}, u'count': 1706},
 {u'_id': {u'year': 2009}, u'count': 1650},
 {u'_id': {u'year': u'2001\ufffd'}, u'count': 1},
 {u'_id': {u'year': 2012}, u'count': 1817},
 {u'_id': {u'year': u'1984\ufffd'}, u'count': 1},
 {u'_id': {u'year': 2007}, u'count': 1359},
 {u'_id': {u'y

In [None]:
# Taking backup of .bson file and uploading it into google cloud. 

from google.cloud import storage
#storage_client = storage.Client.from_service_account_json('C:\saccount\My First Project-57f7edb3dddc.json','optimal-sylph-184116')

# creating bucket for the first time
# bucket_name = 'store1139'
# bucket = storage_client.create_bucket(bucket_name)

def upload(bucket_name, source_file_name, destination_blob_name):
    storage_client = storage.Client.from_service_account_json('C:\saccount\My First Project-57f7edb3dddc.json','optimal-sylph-184116')
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)


    blob.upload_from_filename(source_file_name)

    print('File {} uploaded to {}.'.format(source_file_name,destination_blob_name))

upload("store1139","C:\mongodump\moviesDb\movies.bson","MongoBlob")


In [None]:
# Scheduling backup every hour. 


import os
import schedule

def job_backup():
    os.popen("mongodump /o C:\mongodump")

schedule.every().hour.do(job_backup)
schedule.every().hour.do(upload)

