## MongoDB 5

In [None]:
# import statements
import os
from pymongo import MongoClient
import bson
from datetime import datetime, timedelta

import geopandas as gpd
import matplotlib.pyplot as plt
from shapely.geometry import Point

### Connection establishment

In [None]:
# Connect to the MongoDB server running on this machine (default port 27017).
client = MongoClient(host='mongodb://localhost:27017/')

### Dropping a database: `client.drop_database(name)` (PyMongo; `db.dropDatabase()` in the mongo shell)

In [None]:
# Start from a clean slate: remove any previously loaded copy of the dataset.
client.drop_database(client.get_database('sample_airbnb'))

In [None]:
# Handle to the (lazily created) sample_airbnb database.
db = client.get_database('sample_airbnb')

In [None]:
# Directory holding one JSON-lines export per collection.
json_dir = 'sample_airbnb'
# os.listdir returns entries in arbitrary order; sort so loading (and the
# printed log) is deterministic across machines.
json_files = sorted(f for f in os.listdir(json_dir) if f.endswith(".json"))
# Strip only the trailing extension; str.replace(".json", "") would also
# remove ".json" occurring elsewhere in a file name.
collections = [os.path.splitext(f)[0] for f in json_files]
collections

In [None]:
# `import bson` alone does not load the `json_util` submodule, so import it
# explicitly instead of relying on another module having imported it first.
from bson import json_util

# Insert documents in batches rather than one round-trip per document.
batch_size = 1000

for json_file, collection_name in zip(json_files, collections):
    target = db[collection_name]
    batch = []
    with open(os.path.join(json_dir, json_file), 'r', encoding='utf-8') as f:
        # Each non-empty line holds one Extended-JSON document.
        for line in f:
            line = line.strip()
            if not line:
                continue  # blank lines would make json_util.loads fail
            batch.append(json_util.loads(line))
            if len(batch) >= batch_size:
                target.insert_many(batch)
                batch = []
    if batch:
        target.insert_many(batch)

    print(f"Loaded {json_file} into the '{collection_name}' collection.")

In [None]:
# Confirm which collections now exist in the database.
collection_names = db.list_collection_names()
collection_names

### Let's review the `$elemMatch` operator

- `$elemMatch`: Selects documents if at least one element in the array field matches all the specified $elemMatch conditions.

### Q: Find all listing names and amenities where at least one amenity contains "wifi" (case-insensitive) and another contains "Parking" (case-insensitive).

In [None]:
# A Python dict cannot hold the same key twice: writing "$elemMatch" twice in
# one dict silently keeps only the last entry, so the "wifi" condition was
# dropped. `$and` lets each `$elemMatch` clause be checked independently:
# some amenity must match "wifi" AND some amenity must match "Parking".
cursor = db.listingsAndReviews.find(
    {
        "$and": [
            {"amenities": {"$elemMatch": {"$regex": "wifi", "$options": "i"}}},
            {"amenities": {"$elemMatch": {"$regex": "Parking", "$options": "i"}}},
        ]
    },
    {"name": 1, "amenities": 1, "_id": 0},
)

listings = list(cursor)
listings[:1]

### Aggregation Pipelines

- An aggregation pipeline consists of one or more stages that process documents:
    - Each stage performs an operation on the input documents. For example, a stage can filter documents, group documents, and calculate values.
    - The documents that are output from a stage are passed to the next stage.
    - An aggregation pipeline can return results for groups of documents. For example, return the total, average, maximum, and minimum values.

### `db.collection.aggregate(pipeline, options)`

- Calculates aggregate values for the data in a collection or a view.
- Returns:	
    - A cursor for the documents produced by the final stage of the aggregation pipeline.
    - If the pipeline includes the `explain` option, the query returns a document that provides details on the processing of the aggregation operation.

### Building a pipeline

### 1. `$match`
- Filters documents based on a specified query predicate. Matched documents are passed to the next pipeline stage.
- Syntax: `{ $match: { <query predicate> } }`

### 2. `$group`
- The $group stage separates documents into groups according to a "group key". The output is one document for each unique group key.
- A group key is often a field, or group of fields. The group key can also be the result of an expression. Use the `_id` field in the `$group` pipeline stage to set the group key. 
- In the `$group` stage output, the `_id` field is set to the group key for that document.
- Syntax:
```
{
 $group:
   {
     _id: <expression>, // Group key
     <field1>: { <accumulator1> : <expression1> },
     ...
   }
 }
```

### 3. `$project`
- Passes along the documents with the requested fields to the next stage in the pipeline. The specified fields can be existing fields from the input documents or newly computed fields.

#### Q: Find the average price (rounded to two decimal places) of all "Entire home/apt" (`room_type`) listings. 

What kind of grouping do we want here? After `$match` keeps only the "Entire home/apt" listings, all remaining documents should fall into a single group. How do we express this?

- `_id: None` - meaning that all documents are treated as belonging to a single group, effectively removing the grouping by any field and aggregating over the entire dataset.

**Recommendation**: test individual stages of the pipeline to make sure you are making progress. You can easily test the `$match` stage by using an equivalent `find` method call.

In [None]:
# Stage 1: keep only whole-home listings.
match_stage = {"$match": {"room_type": "Entire home/apt"}}
# Stage 2: a null group key puts every matched document in one group.
group_stage = {"$group": {"_id": None, "average_price": {"$avg": "$price"}}}
# Stage 3: hide the group key and round the mean to two decimal places.
project_stage = {
    "$project": {"_id": 0, "average_price": {"$round": ["$average_price", 2]}}
}

pipeline = [match_stage, group_stage, project_stage]
avg_price = list(db["listingsAndReviews"].aggregate(pipeline))
avg_price

In [None]:
# Sanity-check the $match stage on its own with the equivalent find() call.
entire_homes = list(
    db.listingsAndReviews.find(
        filter={"room_type": "Entire home/apt"},
        projection={"name": 1, "room_type": 1, "_id": 0},
    )
)
entire_homes[:10]

In [None]:
# Same question without the $project stage: the raw (unrounded) average,
# with the null group key still present in the output.
whole_home_filter = {"room_type": "Entire home/apt"}
single_group = {"_id": None, "average_price": {"$avg": "$price"}}

pipeline = [{"$match": whole_home_filter}, {"$group": single_group}]
avg_price = list(db["listingsAndReviews"].aggregate(pipeline))
avg_price

In [None]:
# Full pipeline: filter -> collapse into one group -> round the average.
pipeline = [
    {"$match": {"room_type": "Entire home/apt"}},
    {"$group": {"_id": None, "average_price": {"$avg": "$price"}}},
    {"$project": {"_id": 0, "average_price": {"$round": ["$average_price", 2]}}},
]
avg_price = list(db.listingsAndReviews.aggregate(pipeline))
avg_price

#### Q: Find the average price (rounded to two decimal places) of all `room_type` listings. 

In [None]:
# One output document per distinct room_type, carrying that type's mean price.
by_room_type = {"_id": "$room_type", "average_price": {"$avg": "$price"}}
pipeline = [{"$group": by_room_type}]
avg_price = list(db["listingsAndReviews"].aggregate(pipeline))
avg_price

In [None]:
# Group by room_type, then keep the key (_id) and round each mean to 2 places.
group_stage = {"$group": {"_id": "$room_type", "average_price": {"$avg": "$price"}}}
round_stage = {
    "$project": {
        "_id": 1,
        "average_price": {"$round": ["$average_price", 2]},
    }
}
pipeline = [group_stage, round_stage]
avg_price = list(db["listingsAndReviews"].aggregate(pipeline))
avg_price

#### Q: Find the average price per bedroom for each property type.

In [None]:
# Bucket listings by the (property_type, bedrooms) pair and average the price
# inside each bucket.
# NOTE(review): this is the mean price for each bedroom count, not price
# divided by the number of bedrooms.
group_key = {"property_type": "$property_type", "bedrooms": "$bedrooms"}
pipeline = [
    {"$group": {"_id": group_key, "avg_price_per_bedroom": {"$avg": "$price"}}},
    {
        "$project": {
            "_id": 1,
            "avg_price_per_bedroom": {"$round": ["$avg_price_per_bedroom", 2]},
        }
    },
]

avg_price_per_bedroom = list(db["listingsAndReviews"].aggregate(pipeline))
avg_price_per_bedroom[:5]

#### Q: Find the average price per bedroom for each property type, and then calculate the total average price across all property types.

In [None]:
# Stage 1: mean price for every (property_type, bedrooms) bucket.
per_bucket = {
    "$group": {
        "_id": {"property_type": "$property_type", "bedrooms": "$bedrooms"},
        "avg_price_per_bedroom": {"$avg": "$price"},
    }
}
# Stage 2: regroup on property_type alone and average the bucket means.
# Each bucket counts once here, however many listings it contained.
per_property_type = {
    "$group": {
        "_id": "$_id.property_type",
        "avg_price_per_property_type": {"$avg": "$avg_price_per_bedroom"},
    }
}
# Stage 3: round to two decimal places.
rounded = {
    "$project": {
        "_id": 1,
        "avg_price_per_property_type": {"$round": ["$avg_price_per_property_type", 2]},
    }
}

pipeline = [per_bucket, per_property_type, rounded]
avg_price_per_bedroom = list(db["listingsAndReviews"].aggregate(pipeline))
avg_price_per_bedroom

#### Q: Find the top 2 hosts who have the most listings.

How can you explore a complex document?

In [None]:
# Take the first listing returned and print every top-level field whose
# name contains "host", to see how host information is laid out.
first_batch = list(db.listingsAndReviews.find().limit(1))
single_listing = first_batch[0]

for key in (name for name in single_listing if "host" in name):
    print(single_listing[key])

### More pipeline stages

4. `$sort`: Reorders the document stream by a specified sort key. Only the order changes; the documents remain unmodified. For each input document, outputs one document.
5. `$limit`: Passes the first n documents unmodified to the pipeline where n is the specified limit. For each input document, outputs either one document (for the first n documents) or zero documents (after the first n documents).
- documentation: https://www.mongodb.com/docs/manual/reference/operator/aggregation-pipeline/#std-label-aggregation-pipeline-operator-reference

In [None]:
# Count listings per host name: one output document per distinct name.
count_by_host = {
    "_id": "$host.host_name",
    "total_listings": {"$sum": 1},  # +1 for each document in the group
}
pipeline = [{"$group": count_by_host}]
top_hosts = list(db["listingsAndReviews"].aggregate(pipeline))
top_hosts[:5]

In [None]:
# Inspect every listing whose host is named "Natalie" (name + host sub-document).
natalie_listings = list(
    db.listingsAndReviews.find(
        filter={'host.host_name': 'Natalie'},
        projection={'name': 1, 'host': 1, '_id': 0},
    )
)
# natalie_listings

In [None]:
pipeline = [
    {
        "$group": {
            # Group on the unique host id, not the display name: different
            # hosts can share a first name, and grouping on it merges their
            # listings into one bogus "host".
            "_id": "$host.host_id",
            "host_name": {"$first": "$host.host_name"},
            "total_listings": {"$sum": 1},
        }
    },
    # Most listings first; ties broken deterministically by host id.
    {"$sort": {"total_listings": -1, "_id": 1}},
    {"$limit": 2},
]
top_hosts = list(db.listingsAndReviews.aggregate(pipeline))
top_hosts

### More pipeline stages

6. `$lookup`:
   - Performs a left outer join to another collection in the same database to filter in documents from the "joined" collection for processing.
   - The $lookup stage adds a new array field to each input document.
   - The new array field contains the matching documents from the "joined" collection. 
   - Syntax:
```
{
   $lookup:
     {
       from: <collection to join>,
       localField: <field from the input documents>,
       foreignField: <field from the documents of the "from" collection>,
       as: <output array field>
     }
}
```

7. `$unwind`: Deconstructs an array field from the input documents to output a document for each element. Each output document replaces the array with an element value. For each input document, outputs n documents where n is the number of array elements and can be zero for an empty array.
    - Syntax: `{ $unwind: <field path> }`

#### Q: List all accounts with customer names and corresponding limits.

In [None]:
# Switch to the sample_analytics database for the $lookup examples.
db = client.get_database('sample_analytics')

In [None]:
# Peek at one document from the accounts collection.
db.get_collection('accounts').find_one()

In [None]:
# Peek at one document from the customers collection.
db.get_collection('customers').find_one()

In [None]:
# Left outer join: each account gains an `account_details` array with the
# customers whose `accounts` field matches the account's `account_id`.
join_customers = {
    '$lookup': {
        'from': 'customers',
        'localField': 'account_id',
        'foreignField': 'accounts',
        'as': 'account_details',
    }
}
pipeline = [join_customers]

merged_results = list(db['accounts'].aggregate(pipeline))
first = merged_results[0]
first

In [None]:
# Join customers onto accounts, then flatten the joined array so each output
# document carries a single customer in `account_details`.
join_customers = {
    '$lookup': {
        'from': 'customers',
        'localField': 'account_id',
        'foreignField': 'accounts',
        'as': 'account_details',
    }
}
flatten = {'$unwind': '$account_details'}
pipeline = [join_customers, flatten]

merged_results = list(db['accounts'].aggregate(pipeline))
first = merged_results[0]
first

In [None]:
# Join -> flatten -> keep only the customer name, account id and limit.
join_customers = {
    '$lookup': {
        'from': 'customers',
        'localField': 'account_id',
        'foreignField': 'accounts',
        'as': 'account_details',
    }
}
flatten = {'$unwind': '$account_details'}
keep_fields = {
    '$project': {
        '_id': 0,
        'account_details.name': 1,
        'account_id': 1,
        'limit': 1,
    }
}
pipeline = [join_customers, flatten, keep_fields]

merged_results = list(db['accounts'].aggregate(pipeline))
first = merged_results[0]
print(
    f"Customer Name: {first['account_details']['name']}, "
    f"Account ID: {first['account_id']}, "
    f"Limit: {first['limit']}"
)

Let's try this the other way around with respect to the outer join.

In [None]:
# Reverse direction: each customer gains an `account_details` array with the
# account documents whose `account_id` matches the customer's `accounts` field.
join_accounts = {
    '$lookup': {
        'from': 'accounts',
        'localField': 'accounts',
        'foreignField': 'account_id',
        'as': 'account_details',
    }
}
pipeline = [join_accounts]

merged_results = list(db['customers'].aggregate(pipeline))
first = merged_results[0]
first

In [None]:
# Customers -> joined accounts -> one document per (customer, account) pair,
# keeping only the customer name and that account's id and limit.
join_accounts = {
    '$lookup': {
        'from': 'accounts',
        'localField': 'accounts',
        'foreignField': 'account_id',
        'as': 'account_details',
    }
}
flatten = {'$unwind': '$account_details'}
keep_fields = {
    '$project': {
        '_id': 0,
        'name': 1,
        'account_details.account_id': 1,
        'account_details.limit': 1,
    }
}
pipeline = [join_accounts, flatten, keep_fields]

merged_results = list(db['customers'].aggregate(pipeline))

first = merged_results[0]
details = first['account_details']
print(
    f"Customer Name: {first['name']}, "
    f"Account ID: {details['account_id']}, "
    f"Limit: {details['limit']}"
)

### Intuition behind usage of `$` sign with field name

- `$match`: No, when using existing field values. Yes, inside expressions.
- `$group`: Yes, when using existing field values. No, when defining new fields.
- `$project`: No, when including an existing field as-is (e.g., `"name": 1`). Yes, when computing a new field from an existing field's value (e.g., `{"$round": ["$average_price", 2]}`).
- `$sort`: No, when using existing field values.
- `$lookup`: No, when using existing fields (for both collections)
- `$unwind`: Yes, when using existing field values. 

### More pipeline stages

8. `$addFields`: Adds new fields to documents. Similar to `$project`, `$addFields` reshapes each document in the stream; specifically, by adding new fields to output documents that contain both the existing fields from the input documents and the newly added fields.

### `$cond (aggregation)`

- Evaluates a boolean expression to return one of the two specified return expressions.
- Syntax:
```{ $cond: { if: <boolean-expression>, then: <true-case>, else: <false-case> } }```

or

```{ $cond: [ <boolean-expression>, <true-case>, <false-case> ] }```

#### Q: Add a field called "account_status" with value "High" if account limit is greater than 9000 and "Low" otherwise.

In [None]:
# Tag every account "High" when its limit exceeds 9000, otherwise "Low".
# Uses the array form of $cond: [condition, value-if-true, value-if-false].
status_expr = {'$cond': [{'$gt': ['$limit', 9000]}, 'High', 'Low']}
pipeline = [{'$addFields': {'account_status': status_expr}}]

accounts = list(db['accounts'].aggregate(pipeline))
accounts[:2]

### More about array methods

- `$addToSet`: The `$addToSet` operator adds a value to an array unless the value is already present, in which case $addToSet does nothing to that array.

### Geospatial Query Operators

- documentation: https://www.mongodb.com/docs/manual/reference/operator/query-geospatial/

### Query selectors

- `$geoIntersects`: Selects geometries that intersect with a GeoJSON geometry. The 2dsphere index supports `$geoIntersects`.
- `$geoWithin`: Selects geometries within a bounding GeoJSON geometry. The 2dsphere and 2d indexes support `$geoWithin`.
- `$near`: Returns geospatial objects in proximity to a point. Requires a geospatial index. The 2dsphere and 2d indexes support `$near`.
- `$nearSphere`: Returns geospatial objects in proximity to a point on a sphere. Requires a geospatial index. The 2dsphere and 2d indexes support `$nearSphere`.

### Geometry Specifiers

- `$geometry`: The `$geometry` operator specifies a GeoJSON geometry for use with the following geospatial query operators: `$geoWithin`, `$geoIntersects`, `$near`, and `$nearSphere`. `$geometry` uses EPSG:4326 as the default coordinate reference system (CRS).
- `$maxDistance`: Specifies a maximum distance to limit the results of `$near` and `$nearSphere` queries. The 2dsphere and 2d indexes support `$maxDistance`.

Let's go back to analyzing the Airbnb dataset.

In [None]:
# Back to the Airbnb data for the geospatial examples.
db = client.get_database('sample_airbnb')

In [None]:
def plot_listings(listings, map_type, filename):
    """Plot listing locations over a region boundary map and save the figure.

    Parameters
    ----------
    listings : list of dict
        Listing documents; each must contain ``address.location.coordinates``
        as ``[longitude, latitude]`` (GeoJSON order).
    map_type : str
        ``'hi'`` for the Hawaii boundary map or ``'nyc'`` for New York City.
    filename : str
        Output path without extension; the figure is written to
        ``<filename>.png`` and then shown.

    Raises
    ------
    ValueError
        If ``map_type`` is not ``'hi'`` or ``'nyc'``.
    """
    # Boundary file for each supported region.
    boundary_files = {
        'hi': "hawaii_limits.geojson",
        'nyc': "nyc_limits.geojson",
    }
    # Explicit check instead of `assert`: asserts are stripped under
    # `python -O`, which would leave the boundary map undefined below.
    if map_type not in boundary_files:
        raise ValueError(
            f"map_type must be one of {sorted(boundary_files)}, got {map_type!r}"
        )

    coords = [listing['address']['location']['coordinates'] for listing in listings]
    # MongoDB GeoJSON points are longitude/latitude in EPSG:4326, so label them
    # as such rather than borrowing whatever CRS the boundary file declares.
    gdf = gpd.GeoDataFrame(
        listings,
        geometry=gpd.points_from_xy([c[0] for c in coords], [c[1] for c in coords]),
        crs="EPSG:4326",
    )

    geo_map = gpd.read_file(boundary_files[map_type])
    # Reproject the points into the map's CRS. This is a no-op when the map is
    # already EPSG:4326, and it keeps both layers aligned if the map is projected.
    if geo_map.crs is not None:
        gdf = gdf.to_crs(geo_map.crs)

    ax = geo_map.plot(figsize=(10, 6), color='lightgray', edgecolor='black')
    gdf.plot(ax=ax, marker='o', color='red', markersize=5)
    ax.set_title("Airbnb Listing Locations")
    ax.grid(True)

    plt.savefig(f'{filename}.png')
    plt.show()

#### Q: Find all listings located in Hawaii and plot them.

In [None]:
# Peek at the address sub-document of one US listing. The projection is a
# dict, the documented form; the original `{"address"}` was a set literal,
# which only works if the installed PyMongo happens to accept sets.
db.listingsAndReviews.find_one({"address.country": "United States"}, {"address": 1})

In [None]:
# Hawaii listings are those whose street address ends in "HI, United States".
hawaii_filter = {"address.street": {"$regex": "HI, United States$"}}
hi_listings = list(db["listingsAndReviews"].find(hawaii_filter))

plot_listings(hi_listings, map_type='hi', filename='airbnb_hi')

### `db.collection.createIndex( { <location field> : "2dsphere" } )`

- `2dsphere` indexes support queries that calculate geometries on an earth-like sphere.
- documentation: https://www.mongodb.com/docs/manual/geospatial-queries/#std-label-index-feature-geospatial

#### Q: Find listings within 5 km of Central Park (NYC) and plot them.

In [None]:
# Central Park, written [longitude, latitude] as GeoJSON requires.
coordinates = [-73.9654, 40.7851]

# $near needs a geospatial index on the queried field; create a 2dsphere one.
db.listingsAndReviews.create_index([("address.location", "2dsphere")])

central_park = {"type": "Point", "coordinates": coordinates}
near_query = {
    "address.location": {
        "$near": {
            "$geometry": central_park,
            "$maxDistance": 5000,  # metres, i.e. 5 km
        }
    }
}
nearest_listings = list(db.listingsAndReviews.find(near_query))

plot_listings(nearest_listings, map_type='nyc', filename='airbnb_nyc_nearest')