## Connecting to MongoDB with the PyMongo driver

The connector can also be used to access a remote database (e.g. MongoDB Atlas).

**N.B.** You should install the Python package (*pip install pymongo*) to use the API.

In [1]:
import pymongo

# Connection
client = pymongo.MongoClient('localhost', 27017)

## CRUD Operations (not relevant for us)

* Databases and collections are created only when data is loaded into them.
* Documents can have different structures, since MongoDB is schemaless!

In [3]:
# Select the db by its name
db_test = client['yelpBusiness']

# Create or switch to the collection by name
collection = db_test['computer_engineers'] # also db_test.computer_engineers

# print out the available database and collection names
print('Available database -> ', client.list_database_names())
print('Available collections -> ', db_test.list_collection_names())

Available database ->  ['Yelp', 'admin', 'config', 'local']
Available collections ->  []


In [4]:
from datetime import datetime

document={"Id":"M63000768",
          "Birth": datetime(1996,1,15),
          "Name": "Valerio La Gatta",
          "Branch": "IT",
          "Exams": ["Impianti di Elaboratione","Information Systems","Machine Learning"],
          "GPA": 27.4}
          
result = collection.insert_one(document)
result.inserted_id

ObjectId('626587a051d11a414a387d88')

In [5]:
# print out available databases names
print('Available database -> ', client.list_database_names())
print('Available collections -> ', db_test.list_collection_names())

Available database ->  ['Yelp', 'admin', 'config', 'local', 'students']
Available collections ->  ['computer_engineers']


In [6]:
to_insert = [
    {"Id":"M63000123",
     "Birth": datetime(1990,2,4),
     "Name": "Giancarlo Sperlì",
     "Exams": ["Sistemi Informativi", "Architettura dei Sistemi Di Elaborazione", "Trasmissione Numerica"],
     "GPA": 30},
    {"Id":"M63000890",
     "Birth": datetime(2000,2,4),
     "Name": "Younger Student",
     "Branch": "IT"}
]

result = collection.insert_many(to_insert)
result.inserted_ids

[ObjectId('626587a551d11a414a387d89'), ObjectId('626587a551d11a414a387d8a')]

Nested documents are allowed and can be used to store more complex information or relationships between documents. 

In [7]:
document={"Id":"M63000987",
          "Birth": datetime(1998,12,25),
          "Name": "Another Student",
          "Branch": "IT",
          "Exams": {"Impianti di Elaboratione":25,"Information Systems":28,"Machine Learning":29},
          "GPA": 27.4}
          
result = collection.insert_one(document)
result.inserted_id

ObjectId('626587a951d11a414a387d8b')

In MongoDB, a write operation is **atomic on the level of a single document**, even if the operation modifies multiple embedded documents within a single document. When performing multi-document write operations, whether through a single write operation or multiple write operations, other operations may interleave.

In [64]:
result = collection.update_one({"Name": "Valerio La Gatta"},{"$push": {"Exams": "Big Data Engineering"}})
result.acknowledged

True

In [8]:
result = collection.update_many({"Branch": "IT"},{"$set": {"Notes": "Way to go!"}})
result.matched_count,result.modified_count

(3, 3)

Control the trade-off between performance and consistency using different *write concern* strategies. 

In [9]:
# DO NOT RUN THIS CODE: it is a template, replace the example values with your own
from pymongo.write_concern import WriteConcern

# w = "majority" | 0 (fire & forget) | 1 (acknowledged) | >1 (acknowledged by that many replica set members)
# j = False (in memory) | True (on-disk journal)
db_test.users.with_options(
    write_concern=WriteConcern(w="majority", j=True)  # example values
).insert_one(
    {"Name": "Example User"}  # example document
)
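The same trade-off can also be chosen once for the whole client through connection-string options, instead of per collection. The sketch below only builds such a URI with the standard library and does not contact any server. `build_mongo_uri` is a hypothetical helper, and the host, port and option values are example assumptions; `w`, `journal` and `wtimeoutMS` are the connection-string names of the write concern settings.

```python
from urllib.parse import urlencode


def build_mongo_uri(host="localhost", port=27017, **options):
    """Return a MongoDB connection URI with the given options in the query string.

    Hypothetical helper for illustration; option values are not validated.
    """
    base = f"mongodb://{host}:{port}/"
    if not options:
        return base
    return base + "?" + urlencode(options)


# Safer but slower: wait for a majority of replica set members and for the on-disk journal.
safe_uri = build_mongo_uri(w="majority", journal="true", wtimeoutMS=1000)

# Faster but weaker: fire and forget, no acknowledgement requested.
fast_uri = build_mongo_uri(w=0)

print(safe_uri)
print(fast_uri)
```

Either string could then be passed to `pymongo.MongoClient(...)` in place of the host and port.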

Since version 4.0, MongoDB supports multi-document transactions on replica sets; version 4.2 extended them to sharded clusters (distributed transactions).

In [10]:
# DO NOT RUN THIS CODE: it is a template and needs a replica set (transactions are not available on a standalone server)
from pymongo.write_concern import WriteConcern
from pymongo.read_concern import ReadConcern
from pymongo import ReadPreference

wc_majority = WriteConcern(w="majority", wtimeout=1000)

# Step 1: Define the callback that specifies the sequence of operations to perform inside the transaction.
def callback(session):
    # Here db_test is the name of a database, not the db_test variable defined earlier.
    # collection_one and collection_two are example names: replace them with your own collections.
    collection_one = session.client.db_test.collection_one
    collection_two = session.client.db_test.collection_two
    # Important: you must pass the session to the operations.
    collection_one.insert_one({"abc": 1}, session=session)
    collection_two.insert_one({"xyz": 999}, session=session)

# Step 2: Start a client session.
with client.start_session() as session:
    # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
    session.with_transaction(
        callback,
        read_concern=ReadConcern("local"),
        write_concern=wc_majority,
        read_preference=ReadPreference.PRIMARY,
    )

## Real Use Case

### Load Yelp Dataset

In [7]:
import json 

path = "C://Users//annal//Documents//GitHub//BigDataHomework//Homework1//yelp dataset//"
business = []

with open(path + 'yelp_academic_dataset_business.json', 'rb') as f:
    for jsonObj in f:
        business_dict = json.loads(jsonObj)
        business.append(business_dict)


In [8]:
checkin = []

with open(path + 'yelp_academic_dataset_checkin.json', 'rb') as f:
    for jsonObj in f:
        checkin_dict = json.loads(jsonObj)
        checkin.append(checkin_dict)


In [9]:
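review = []
count = 0

# Load only the first 10,000 reviews: the limit must be checked for every line,
# otherwise the whole file is read
with open(path + 'yelp_academic_dataset_review.json', 'rb') as f:
    for jsonObj in f:
        if count >= 10000:
            break
        review_dict = json.loads(jsonObj)
        review.append(review_dict)
        count += 1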
        

In [10]:
tip = []

with open(path + 'yelp_academic_dataset_tip.json', 'rb') as f:
    for jsonObj in f:
        tip_dict = json.loads(jsonObj)
        tip.append(tip_dict)

In [18]:
user = []

count = 0

# Load only the first 10,000 users (the limit is checked for every line)
with open(path + 'yelp_academic_dataset_user.json', 'rb') as f:
    for jsonObj in f:
        if count >= 10000:
            break
        user_dict = json.loads(jsonObj)
        user.append(user_dict)
        count += 1
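The loading cells above all follow the same pattern: read a file with one JSON object per line, optionally stopping after a fixed number of lines. A minimal sketch of that pattern as a reusable function follows; `load_json_lines` is a hypothetical helper name, and the demo uses a temporary file with made-up records rather than the Yelp files.

```python
import json
import os
import tempfile
from itertools import islice


def load_json_lines(file_path, limit=None):
    """Parse a JSON Lines file (one JSON object per line) into a list of dicts.

    If `limit` is given, stop after that many lines instead of reading the whole file.
    """
    with open(file_path, "rb") as f:
        # islice(f, None) iterates over every line; islice(f, n) stops after n lines
        return [json.loads(line) for line in islice(f, limit)]


# Tiny demo on a temporary file with made-up records (not Yelp data)
with tempfile.TemporaryDirectory() as tmp_dir:
    demo_path = os.path.join(tmp_dir, "demo.json")
    with open(demo_path, "w", encoding="utf-8") as f:
        for i in range(5):
            f.write(json.dumps({"n": i}) + "\n")

    all_rows = load_json_lines(demo_path)
    first_two = load_json_lines(demo_path, limit=2)

print(len(all_rows), first_two)
```

With the notebook's variables, a call such as `load_json_lines(path + 'yelp_academic_dataset_review.json', limit=10000)` would play the role of the review cell.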

In [12]:
business[0]

{'business_id': 'Pns2l4eNsfO8kk83dixA6A',
 'name': 'Abby Rappoport, LAC, CMQ',
 'address': '1616 Chapala St, Ste 2',
 'city': 'Santa Barbara',
 'state': 'CA',
 'postal_code': '93101',
 'latitude': 34.4266787,
 'longitude': -119.7111968,
 'stars': 5.0,
 'review_count': 7,
 'is_open': 0,
 'attributes': {'ByAppointmentOnly': 'True'},
 'categories': 'Doctors, Traditional Chinese Medicine, Naturopathic/Holistic, Acupuncture, Health & Medical, Nutritionists',
 'hours': None}

In [16]:
import pymongo

# Connection
client = pymongo.MongoClient('localhost', 27017)

# Select the db by its name
db = client['yelpPython']

In [17]:
# Drop all collections at the start, so that re-running the notebook does not insert duplicates

collection_names = list(db.list_collection_names())

for name in collection_names:
    db[name].drop()


ServerSelectionTimeoutError: localhost:27017: [WinError 10061] No connection could be made because the target machine actively refused it, Timeout: 30s, Topology Description: <TopologyDescription id: 6266562830cf22fd81f40a8b, topology_type: Unknown, servers: [<ServerDescription ('localhost', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('localhost:27017: [WinError 10061] No connection could be made because the target machine actively refused it')>]>

In [None]:
# print out available databases names
print('Available database -> ', client.list_database_names())
print('Available collections -> ', db.list_collection_names())

Available database ->  ['Yelp', 'admin', 'config', 'local', 'students', 'twitter']
Available collections ->  []


In [32]:
# Create or switch to the collection: yelpBusiness
collection_business = db['yelpBusiness']
res = collection_business.insert_many(business)

In [33]:
collection_checkin = db['yelpCheckin']
res = collection_checkin.insert_many(checkin)

AutoReconnect: localhost:27017: [WinError 10054] An existing connection was forcibly closed by the remote host

In [None]:
collection_tip= db['yelpTip']
res = collection_tip.insert_many(tip)


In [None]:
collection_user= db['yelpUser']
res = collection_user.insert_many(user)

In [None]:
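# Insert the reviews in two halves instead of a single call
# (alternative to the next cell: run only one of the two)
half = len(review) // 2
review1 = review[:half]
review2 = review[half:]
collection_review = db['yelpReview']
res1 = collection_review.insert_many(review1)
res2 = collection_review.insert_many(review2)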

In [34]:
collection_review = db['yelpReview']
res = collection_review.insert_many(review)
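The cell that splits `review` into two parts suggests inserting the data in smaller pieces. A minimal sketch of that idea for an arbitrary batch size follows; `chunked` is a hypothetical helper and the documents are made up. Whether smaller batches avoid the `AutoReconnect` error shown earlier is not established by this notebook.

```python
from itertools import islice


def chunked(items, size):
    """Yield consecutive lists of at most `size` elements from `items`."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# Demo with made-up documents; with a live server each batch would be passed to
# collection.insert_many(batch) instead of being measured here.
docs = [{"n": i} for i in range(7)]
batch_sizes = [len(batch) for batch in chunked(docs, 3)]
print(batch_sizes)
```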

In [None]:
# print out available databases names
print('Available database -> ', client.list_database_names())
print('Available collections -> ', db.list_collection_names())

### Query Application

In [None]:
# Given a user's latitude and longitude (position), I want to find the business 

### Data Selection

In [61]:
collection=collection_business

cursor = collection.find()

In [62]:
res = list(cursor)
len(res)

1201191

In [None]:
#next(cursor)

### Comparison filters

In [None]:
collection=collection_business

In [None]:
# filter by equality and by comparison
filter={
    'city': 'Santa Barbara', 
    'stars': {
        '$gt': 4
    }
}

result = collection.find(
    filter=filter
)


result_list = list(result)
len(result_list)

1898

In [65]:
result_list[0]

{'_id': ObjectId('626648fe0130b738ac8c6019'),
 'business_id': 'Pns2l4eNsfO8kk83dixA6A',
 'name': 'Abby Rappoport, LAC, CMQ',
 'address': '1616 Chapala St, Ste 2',
 'city': 'Santa Barbara',
 'state': 'CA',
 'postal_code': '93101',
 'latitude': 34.4266787,
 'longitude': -119.7111968,
 'stars': 5.0,
 'review_count': 7,
 'is_open': 0,
 'attributes': {'ByAppointmentOnly': 'True'},
 'categories': 'Doctors, Traditional Chinese Medicine, Naturopathic/Holistic, Acupuncture, Health & Medical, Nutritionists',
 'hours': None}

When you only need the number of matching documents, use the *count_documents* method: it returns the count directly, so the documents themselves are not transferred to the client.

In [66]:
# count_documents returns the result directly instead of a cursor
print("Number of business -> ", collection.count_documents(filter))

Number of business ->  1898


### Data projection

In [67]:
# filtering
filter={
    'city': 'Santa Barbara', 
    'stars': {
        '$gt': 4
    }
}

# data projection (select only certain fields)
project={
    'review_count': 1,
    'id': 1,  # not a field of yelpBusiness (the key is 'business_id'), so it has no effect
#    'user': 1
}

result = collection.find(
    filter=filter,
    projection=project,
)

result_list = list(result)
result_list[0]

{'_id': ObjectId('626648fe0130b738ac8c6019'), 'review_count': 7}

In [68]:
result_list

[{'_id': ObjectId('626648fe0130b738ac8c6019'), 'review_count': 7},
 {'_id': ObjectId('626648fe0130b738ac8c6074'), 'review_count': 32},
 {'_id': ObjectId('626648fe0130b738ac8c6091'), 'review_count': 17},
 {'_id': ObjectId('626648fe0130b738ac8c60c3'), 'review_count': 68},
 {'_id': ObjectId('626648fe0130b738ac8c60f4'), 'review_count': 48},
 {'_id': ObjectId('626648fe0130b738ac8c60fc'), 'review_count': 13},
 {'_id': ObjectId('626648fe0130b738ac8c623c'), 'review_count': 54},
 {'_id': ObjectId('626648fe0130b738ac8c6271'), 'review_count': 34},
 {'_id': ObjectId('626648fe0130b738ac8c6280'), 'review_count': 30},
 {'_id': ObjectId('626648fe0130b738ac8c6368'), 'review_count': 26},
 {'_id': ObjectId('626648fe0130b738ac8c6381'), 'review_count': 5},
 {'_id': ObjectId('626648fe0130b738ac8c6387'), 'review_count': 19},
 {'_id': ObjectId('626648fe0130b738ac8c63aa'), 'review_count': 13},
 {'_id': ObjectId('626648fe0130b738ac8c6407'), 'review_count': 7},
 {'_id': ObjectId('626648fe0130b738ac8c6455'), 'rev

### Retrieve top-k result

In [69]:
# filtering
filter={
    'city': 'Santa Barbara', 
    'stars': {
        '$gt': 4
    }
}

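# data projection
project={
    'review_count': 1,
    'id': 1,  # not a field of yelpBusiness (the key is 'business_id'), so it has no effect
}

# sort by review_count in descending order
sort=[('review_count', -1)]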

limit = 10

result = collection.find(
    filter=filter,
    projection=project,
    sort = sort,
    limit = limit
)

result_list = list(result)
result_list

[{'_id': ObjectId('626648ff0130b738ac8e595e'), 'review_count': 3834},
 {'_id': ObjectId('626649000130b738ac8e9710'), 'review_count': 1796},
 {'_id': ObjectId('626648ff0130b738ac8dedbe'), 'review_count': 1537},
 {'_id': ObjectId('626648ff0130b738ac8de831'), 'review_count': 1500},
 {'_id': ObjectId('626648ff0130b738ac8dadcc'), 'review_count': 1453},
 {'_id': ObjectId('626648ff0130b738ac8e1a50'), 'review_count': 1452},
 {'_id': ObjectId('626648ff0130b738ac8d0a13'), 'review_count': 1219},
 {'_id': ObjectId('626649000130b738ac8ea433'), 'review_count': 1213},
 {'_id': ObjectId('626648ff0130b738ac8e0d83'), 'review_count': 1108},
 {'_id': ObjectId('626648ff0130b738ac8df6f3'), 'review_count': 1084}]
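Sorting in descending order and then limiting to `k` documents is a top-k selection. For intuition only, the sketch below computes the same kind of result on the client side with `heapq.nlargest`, using made-up documents; in the query above the server does this work.

```python
import heapq

# Made-up documents shaped like the projected results above
docs = [{"_id": i, "review_count": c} for i, c in enumerate([7, 32, 17, 68, 48])]

# Equivalent of sort=[('review_count', -1)] followed by limit=3
top3 = heapq.nlargest(3, docs, key=lambda d: d["review_count"])
top3_counts = [d["review_count"] for d in top3]
print(top3_counts)
```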

### Filter on data schema

In [70]:
# find documents whose 'is_open' field is not 0 (businesses that are still open)
filter={
    'is_open': {
        '$ne': 0
    }
}

result = collection.find(
  filter=filter
)

result_list = list(result)
len(result_list)

1170543

In [71]:
result_list[0]

{'_id': ObjectId('626648fe0130b738ac8c601a'),
 'business_id': 'mpf3x-BjTdTEA3yCZrAYPw',
 'name': 'The UPS Store',
 'address': '87 Grasso Plaza Shopping Center',
 'city': 'Affton',
 'state': 'MO',
 'postal_code': '63123',
 'latitude': 38.551126,
 'longitude': -90.335695,
 'stars': 3.0,
 'review_count': 15,
 'is_open': 1,
 'attributes': {'BusinessAcceptsCreditCards': 'True'},
 'categories': 'Shipping Centers, Local Services, Notaries, Mailbox Centers, Printing Services',
 'hours': {'Monday': '0:0-0:0',
  'Tuesday': '8:0-18:30',
  'Wednesday': '8:0-18:30',
  'Thursday': '8:0-18:30',
  'Friday': '8:0-18:30',
  'Saturday': '8:0-14:0'}}

### Combining different conditions

In [72]:
# combining logical conditions
filter={
    '$or': [
        {
            'city': 'Santa Barbara'
        }, 
        {
            'stars': {
                '$gt': 4
            }
        }
    ]
}
project={
    'business_id': 1, 
    'review_count': 1, 
}

result = collection.find(
  filter=filter,
  projection=project
)

result_list = list(result)
len(result_list),result_list[0]

(45419,
 {'_id': ObjectId('626648fe0130b738ac8c6019'),
  'business_id': 'Pns2l4eNsfO8kk83dixA6A',
  'review_count': 7})

In [73]:
result_list[0]

{'_id': ObjectId('626648fe0130b738ac8c6019'),
 'business_id': 'Pns2l4eNsfO8kk83dixA6A',
 'review_count': 7}
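The filters used so far combine conditions in two ways: several keys in one dictionary must all hold (implicit AND), while `$or` needs at least one of its sub-filters to hold. The sketch below mimics that behavior in plain Python for a small subset of operators (equality, `$gt`, `$ne`, `$or`). `matches` is a hypothetical toy function, not how MongoDB evaluates queries, and the sample documents are made up.

```python
def matches(document, query):
    """Return True if `document` satisfies `query`.

    Toy evaluator for a small subset of the query language:
    equality, $gt, $ne and $or. Not MongoDB's implementation.
    """
    for key, condition in query.items():
        if key == "$or":
            if not any(matches(document, sub_query) for sub_query in condition):
                return False
        elif isinstance(condition, dict):
            value = document.get(key)
            for operator, operand in condition.items():
                if operator == "$gt":
                    if value is None or not value > operand:
                        return False
                elif operator == "$ne":
                    if value == operand:
                        return False
                else:
                    raise ValueError(f"unsupported operator: {operator}")
        elif document.get(key) != condition:
            return False
    return True


# Made-up sample documents
businesses = [
    {"city": "Santa Barbara", "stars": 5.0, "is_open": 0},
    {"city": "Affton", "stars": 3.0, "is_open": 1},
    {"city": "Santa Barbara", "stars": 3.5, "is_open": 1},
    {"city": "Tucson", "stars": 4.5, "is_open": 1},
]

and_filter = {"city": "Santa Barbara", "stars": {"$gt": 4}}
or_filter = {"$or": [{"city": "Santa Barbara"}, {"stars": {"$gt": 4}}]}
open_filter = {"is_open": {"$ne": 0}}

and_count = sum(matches(b, and_filter) for b in businesses)
or_count = sum(matches(b, or_filter) for b in businesses)
open_count = sum(matches(b, open_filter) for b in businesses)
print(and_count, or_count, open_count)
```

The `$or` filter selects more documents than the implicit AND, which is in line with the counts reported by the cells above (45419 versus 1898).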

In [74]:
review[0]

{'review_id': 'KU_O5udG6zpxOg-VcAEodg',
 'user_id': 'mh_-eMZ6K5RLWhZyISBhwA',
 'business_id': 'XQfwVwDr-v0ZS3_CbbE5Xw',
 'stars': 3.0,
 'useful': 0,
 'funny': 0,
 'cool': 0,
 'text': "If you decide to eat here, just be aware it is going to take about 2 hours from beginning to end. We have tried it multiple times, because I want to like it! I have been to it's other locations in NJ and never had a bad experience. \n\nThe food is good, but it takes a very long time to come out. The waitstaff is very young, but usually pleasant. We have just had too many experiences where we spent way too long waiting. We usually opt for another diner or restaurant on the weekends, in order to be done quicker.",
 'date': '2018-07-07 22:09:11'}

### Text-based filter

In [75]:

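import re 

# The 'text' field belongs to the review and tip documents, not to yelpBusiness;
# the output below has no 'stars' field, which suggests the tip collection was queried.
collection = collection_tip

filter={
    'text': {
        '$regex': re.compile(r"good")
    }
}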

project={
    'stars': 1, 
    'business_id': 1,
    'text':1
}


result = collection.find(
  filter=filter,
  projection=project
)

result_list = list(result)
len(result_list),result_list[0]

(73275,
 {'_id': ObjectId('626649080130b738ac90aebe'),
  'business_id': 'QoezRbYQncpRqyrLH6Iqjg',
  'text': 'They have lots of good deserts and tasty cuban sandwiches'})
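The pattern `good` is unanchored and case-sensitive, so it matches any text containing that exact substring, including longer words. The sketch below shows the difference on made-up strings using Python's `re` module alone; the stricter whole-word variant is a suggestion, not something the notebook uses.

```python
import re

# Made-up sample texts (the first one is the text returned above)
texts = [
    "They have lots of good deserts and tasty cuban sandwiches",
    "Good coffee, friendly staff",
    "We said goodbye to the waiter",
]

substring = re.compile(r"good")                      # same pattern as the filter above
whole_word = re.compile(r"\bgood\b", re.IGNORECASE)  # stricter, case-insensitive variant

substring_hits = [bool(substring.search(t)) for t in texts]
whole_word_hits = [bool(whole_word.search(t)) for t in texts]
print(substring_hits)
print(whole_word_hits)
```

The plain pattern misses the capitalized "Good" and accepts "goodbye", while the whole-word variant does the opposite.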

### Aggregation framework

1. For each business with a 3.0-star rating, count its reviews.
2. Find all hashtags used in the tweets.

In [88]:
review[0]

{'review_id': 'KU_O5udG6zpxOg-VcAEodg',
 'user_id': 'mh_-eMZ6K5RLWhZyISBhwA',
 'business_id': 'XQfwVwDr-v0ZS3_CbbE5Xw',
 'stars': 3.0,
 'useful': 0,
 'funny': 0,
 'cool': 0,
 'text': "If you decide to eat here, just be aware it is going to take about 2 hours from beginning to end. We have tried it multiple times, because I want to like it! I have been to it's other locations in NJ and never had a bad experience. \n\nThe food is good, but it takes a very long time to come out. The waitstaff is very young, but usually pleasant. We have just had too many experiences where we spent way too long waiting. We usually opt for another diner or restaurant on the weekends, in order to be done quicker.",
 'date': '2018-07-07 22:09:11'}

In [90]:
pipe = [
    {
        '$project': {
            'business_id': 1,
            'name': 1,
            'city': 1,
            'stars': 1,
            'review_count': 1
        }
    }, 
    {   '$lookup':{
            'from':"yelpReview",
            'localField':'business_id',
            'foreignField':'business_id',
            'as':'yelpReview'
        }

    },
    {
        '$unwind':'$yelpReview'
    },
      
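    {
        '$match': {
            # 'stars' is stored as a number: comparing it with the string "3.0"
            # matches nothing (the empty result below came from that comparison)
            'stars':{
                '$eq': 3.0
            }
        }
    }, 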
    {
        '$group': {
            '_id': '$business_id', 
            'count': {
                '$sum': 1
            }
        }
    }
]

result = collection_business.aggregate(pipe)
list(result)

[]
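To make the stages of the business/review pipeline concrete, the sketch below reproduces `$lookup`, `$unwind`, `$match` and `$group` in plain Python on made-up data. It is an illustration of what each stage computes, not a replacement for the aggregation framework. It also shows that comparing a numeric `stars` value with the string `"3.0"` keeps no rows, which is consistent with the empty result printed above.

```python
from collections import Counter

# Made-up sample data (not taken from the Yelp dataset)
businesses = [
    {"business_id": "b1", "stars": 3.0},
    {"business_id": "b2", "stars": 4.5},
    {"business_id": "b3", "stars": 3.0},
]
reviews = [
    {"review_id": "r1", "business_id": "b1"},
    {"review_id": "r2", "business_id": "b1"},
    {"review_id": "r3", "business_id": "b2"},
]

# $lookup + $unwind: one joined row per (business, matching review) pair;
# businesses without reviews (b3) produce no row.
joined = [
    {**business, "yelpReview": review}
    for business in businesses
    for review in reviews
    if review["business_id"] == business["business_id"]
]

# $match: keep rows whose business rating is the number 3.0
matched = [row for row in joined if row["stars"] == 3.0]
# Comparing with the string "3.0" keeps nothing, because 3.0 != "3.0"
matched_as_string = [row for row in joined if row["stars"] == "3.0"]

# $group with $sum: 1 -> number of rows per business_id
counts = Counter(row["business_id"] for row in matched)
print(dict(counts), len(matched_as_string))
```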

In [31]:
pipe = [
    {
        '$project': {
            'id': 1, 
            'full_text': 1, 
            'lang': 1, 
            'entities.hashtags': 1
        }
    }, {
        '$unwind': {
            'path': '$entities.hashtags', 
            'preserveNullAndEmptyArrays': False
        }
    }, {
        '$replaceRoot': {
            'newRoot': '$entities.hashtags'
        }
    }, {
        '$project': {
            'text': 1
        }
    }, {
        '$group': {
            '_id': '$text', 
            'count': {
                '$sum': 1
            }
        }
    }, {
        '$sort': {
            'count': -1
        }
    }
]

result = collection.aggregate(pipe)
result_list = list(result)
result_list

[{'_id': 'COVID19', 'count': 3},
 {'_id': 'coronavirus', 'count': 2},
 {'_id': 'zonerosse', 'count': 1},
 {'_id': 'Cuba', 'count': 1},
 {'_id': 'iorestoacasa', 'count': 1},
 {'_id': 'irresponsabili', 'count': 1},
 {'_id': 'cumartesi', 'count': 1},
 {'_id': 'COVID2019', 'count': 1},
 {'_id': 'benvarım', 'count': 1},
 {'_id': 'Koronovirus', 'count': 1},
 {'_id': 'sxsw', 'count': 1}]

## Indices

### Compound index

In [32]:
from pymongo import ASCENDING, DESCENDING

# compound index
db.tweets.create_index([("created_at", ASCENDING),("retweet_count", DESCENDING)], 
                       name="twoLevel_index",
                       background=True,
                       unique=False,
                       sparse=False)

# print out available indices
print(list(db.tweets.list_indexes()))

[SON([('v', 2), ('key', SON([('_id', 1)])), ('name', '_id_')]), SON([('v', 2), ('key', SON([('created_at', 1), ('retweet_count', -1)])), ('name', 'twoLevel_index'), ('background', True), ('sparse', False)])]


### Text index

In [33]:
# text index
db.tweets.create_index([('source', pymongo.TEXT)], default_language='english',name='text_index')
print(list(db.tweets.list_indexes()))

[SON([('v', 2), ('key', SON([('_id', 1)])), ('name', '_id_')]), SON([('v', 2), ('key', SON([('created_at', 1), ('retweet_count', -1)])), ('name', 'twoLevel_index'), ('background', True), ('sparse', False)]), SON([('v', 2), ('key', SON([('_fts', 'text'), ('_ftsx', 1)])), ('name', 'text_index'), ('weights', SON([('source', 1)])), ('default_language', 'english'), ('language_override', 'language'), ('textIndexVersion', 3)])]


In [34]:
filter = {
    "$text": {"$search": "iphone"}
}

# data projection
project={
    'source': 1
}

result = db.tweets.find(
  filter=filter,
  projection=project
)

result_list = list(result)
len(result_list),result_list[0]

(16,
 {'_id': ObjectId('6265887a51d11a414a387dae'),
  'source': '<a href="http://twitter.com/download/iphone" rel="nofollow">Twitter for iPhone</a>'})