In [31]:
import pymongo
from pymongo import MongoClient
from pprint import pprint

In [32]:
# Open a client against the local MongoDB server (driver-default host,
# port 27017) and take a handle to the "divvy_db" database used by every
# cell below.
mongo = MongoClient(port=27017)
divvy_db = mongo["divvy_db"]

In [33]:
divvy_ride_data = divvy_db["divvy_ride_data"]

# Aggregation pipeline: count rides per calendar month of `started_at`.
pipeline = [
    # Bucket every ride by the (year, month) of its start timestamp.
    # `$toDate` converts `started_at` to a date before the parts are extracted.
    {
        "$group": {
            "_id": {
                "year": {"$year": {"$toDate": "$started_at"}},
                "month": {"$month": {"$toDate": "$started_at"}}
            },
            "total_rides": {"$sum": 1}
        }
    },
    # Lift the grouping key into top-level `year` / `month` fields.
    {
        "$project": {
            "year": "$_id.year",
            "month": "$_id.month",
            "total_rides": 1
        }
    },
    # Replace the compound `_id` with a string key built from year + month.
    # `$concat` already yields a string, so no outer `$toString` is needed.
    # NOTE(review): the month is not zero-padded, so January 2023 becomes
    # "20231" and November 2023 becomes "202311"; the keys do not sort
    # chronologically as strings. Left unchanged in case other code looks
    # documents up by this `_id`.
    {
        "$project": {
            "_id": {
                "$concat": [
                    {"$toString": "$year"},
                    {"$toString": "$month"}
                ]
            },
            "year": 1,
            "month": 1,
            "total_rides": 1
        }
    },
    {
        "$sort": {"year": 1, "month": 1}
    }
]

# Materialise the result BEFORE touching the target collection, so that a
# failing aggregation does not leave us with the old results already dropped.
aggregated_result = list(divvy_ride_data.aggregate(pipeline, allowDiskUse=True))

# Replace the target collection in a single bulk write instead of one
# round trip per document.
divvy_rides_by_month = divvy_db["divvy_rides_by_month"]
divvy_rides_by_month.drop()
if aggregated_result:  # insert_many rejects an empty list
    divvy_rides_by_month.insert_many(aggregated_result)

print("Aggregation result has been written to the new collection.")

Aggregation result has been written to the new collection.


In [34]:
divvy_rides_by_month = divvy_db["divvy_rides_by_month"]

# Aggregation pipeline: roll the monthly totals up into (year, season) totals.
pipeline = [
    {
        "$group": {
            "_id": {
                "year": "$year",
                # Map the month number onto a meteorological season name.
                # NOTE(review): the year component is the calendar year of
                # each month, so "Winter" for a given year combines that
                # year's January/February with that year's December rather
                # than one contiguous winter. Confirm this is intended.
                "season": {
                    "$switch": {
                        "branches": [
                            {"case": {"$in": ["$month", [3, 4, 5]]}, "then": "Spring"},
                            {"case": {"$in": ["$month", [6, 7, 8]]}, "then": "Summer"},
                            {"case": {"$in": ["$month", [9, 10, 11]]}, "then": "Autumn"},
                            {"case": {"$in": ["$month", [12, 1, 2]]}, "then": "Winter"}
                        ],
                        # Reached only when `month` is not an integer 1-12.
                        "default": "Unknown"
                    }
                }
            },
            "total_rides": {"$sum": "$total_rides"}
        }
    },
    # Season names sort as strings here (Autumn, Spring, Summer, Winter),
    # not in calendar order.
    {
        "$sort": {"_id.year": 1, "_id.season": 1}
    }
]

# Run the aggregation BEFORE dropping the target, so that a failing
# aggregation does not destroy the previously stored results.
aggregated_result = list(divvy_rides_by_month.aggregate(pipeline, allowDiskUse=True))

# Replace the target collection in a single bulk write instead of one
# round trip per document.
divvy_rides_by_season = divvy_db["divvy_rides_by_season"]
divvy_rides_by_season.drop()
if aggregated_result:  # insert_many rejects an empty list
    divvy_rides_by_season.insert_many(aggregated_result)

print("Aggregation by season result has been written to the new collection.")

Aggregation by season result has been written to the new collection.


In [35]:
divvy_ride_data = divvy_db["divvy_ride_data"]

# Aggregation pipeline: count rides per value of `started_at_date`.
pipeline = [
    {
        "$group": {
            "_id": {
                "date": "$started_at_date",  # Group by the date
            },
            "total_rides": {"$sum": 1}  # One per ride document in the group
        }
    },
    # Lift the grouping key into a top-level `date` field.
    {
        "$project": {
            "date": "$_id.date",
            "total_rides": 1
        }
    },
    # Replace the compound `_id` with the string form of the date while
    # keeping `date` itself in its original type.
    {
        "$project": {
            "_id": {
                "$toString": "$date"
            },
            "date": 1,
            "total_rides": 1
        }
    },
    {
        "$sort": {
            "date": 1  # Ascending by date
        }
    }
]

# Materialise the result BEFORE touching the target collection, so that a
# failing aggregation does not leave us with the old results already dropped.
aggregated_result = list(divvy_ride_data.aggregate(pipeline, allowDiskUse=True))

# Replace the target collection in a single bulk write instead of one
# round trip per document.
divvy_rides_by_day = divvy_db["divvy_rides_by_day"]
divvy_rides_by_day.drop()
if aggregated_result:  # insert_many rejects an empty list
    divvy_rides_by_day.insert_many(aggregated_result)

print("Aggregation result has been written to the new collection.")


Aggregation result has been written to the new collection.


In [36]:
divvy_rides_by_day = divvy_db["divvy_rides_by_day"]
weather_daily = divvy_db["weather_daily"]

# Aggregation pipeline: attach the matching weather record to each daily
# ride total and flatten it into a single document per day.
pipeline = [
    # Join on equal `date` values.
    # NOTE(review): this only matches when both collections store `date`
    # with the same type and value; confirm against the loaders.
    {
        "$lookup": {
            "from": "weather_daily",
            "localField": "date",
            "foreignField": "date",
            "as": "weather"
        }
    },
    # Turn the one-element "weather" array into an embedded document.
    # Days with no matching weather record have an empty array and are
    # dropped by this stage (inner-join behaviour); a date that matches
    # several weather records yields several documents.
    {
        "$unwind": "$weather"
    },
    # Keep the ride total, stringify the date and pull the weather fields
    # up to the top level. `_id` is carried over from the source document.
    {
        "$project": {
            "date": {"$toString": "$date"},
            "total_rides": 1,
            "cloud_cover": "$weather.cloud_cover",
            "precipitation": "$weather.precipitation",
            "min_temp": "$weather.min_temp",
            "max_temp": "$weather.max_temp",
            "morning_temp": "$weather.morning_temp",
            "afternoon_temp": "$weather.afternoon_temp",
            "evening_temp": "$weather.evening_temp",
            "night_temp": "$weather.night_temp",
            "max_windspeed": "$weather.max_windspeed",
            "significant_precipitation": "$weather.significant_precipitation"
        }
    }
]

# Run the aggregation BEFORE dropping the target, so that a failing
# aggregation does not destroy the previously stored merged data.
results = list(divvy_rides_by_day.aggregate(pipeline))

# Replace the target collection in a single bulk write instead of one
# round trip per document.
divvy_rides_and_weather = divvy_db["divvy_rides_and_weather"]
divvy_rides_and_weather.drop()
if results:  # insert_many rejects an empty list
    divvy_rides_and_weather.insert_many(results)

print("Merged data has been created in the 'divvy_rides_and_weather' collection.")

Merged data has been created in the 'divvy_rides_and_weather' collection.
