In [1]:
from pymongo import MongoClient
import json
from config import client, db, collection


In [3]:

# Report the size of the raw collection: number of documents and number of
# attributes found on one sample document.
row_count = collection.count_documents({})
print(f"Number of Rows (Records): {row_count}")

# find_one() yields None when the collection holds no documents, so guard the
# attribute count instead of failing on None.keys().
sample_record = collection.find_one()
column_count = len(sample_record) if sample_record is not None else 0
print(f"Number of Columns (Attributes): {column_count}")


Number of Rows (Records): 735502
Number of Columns (Attributes): 18


# Clean the data (Silver Layer) (using DB Queries or Python).

In [5]:
# Define a function to count all the missing values
import pandas as pd
def count_missing_values(data):
    """Print and return the number of null entries in each column of `data`.

    Args:
        data: pandas DataFrame to inspect.

    Returns:
        pandas Series indexed by column name holding the null count per column.
    """
    nulls_per_column = data.isna().sum()
    print("Missing values in each column:")
    print(nulls_per_column)
    return nulls_per_column

# Pull every document from `collection` into a DataFrame and report how many
# null values each column contains.
if __name__ == "__main__":
    # Materialise the full cursor as a list so pandas can build the frame.
    raw_data = pd.DataFrame(list(collection.find()))
    missing_counts = count_missing_values(raw_data)


Missing values in each column:
_id                        0
Bike ID                    0
Birth Year                 0
End Station ID             0
End Station Latitude       0
End Station Longitude      0
End Station Name           0
Gender                     0
Start Station ID           0
Start Station Latitude     0
Start Station Longitude    0
Start Station Name         0
Start Time                 0
Stop Time                  0
Trip Duration              0
Trip_Duration_in_min       0
Unnamed: 0                 0
User Type                  0
dtype: int64


## Work to do
1. Drop the `_id` column

2. Fill the missing values (e.g. with the mean) and drop the duplicate rows



In [6]:
def load_data_from_mongodb():
    """Read every document from `collection` and return them as a DataFrame."""
    documents = list(collection.find())
    return pd.DataFrame(documents)

def clean_data(data):
    """Return a cleaned copy of the raw trip data (Silver layer).

    Steps, in order:
      1. Drop the `_id` column when present.
      2. Fill nulls: station names and user type with 'Unknown', birth year
         and gender with 0.
      3. Drop fully duplicated rows.
      4. Cast the duration, birth-year and gender columns to int.
      5. Keep only rows whose 'Trip Duration' is positive.

    The input DataFrame is not modified; every step produces a new frame.

    Args:
        data: pandas DataFrame holding the raw trip records.

    Returns:
        A new pandas DataFrame with the cleaned records.
    """
    fill_values = {
        'Start Station Name': 'Unknown',
        'End Station Name': 'Unknown',
        'User Type': 'Unknown',
        'Birth Year': 0,
        'Gender': 0,
    }
    int_columns = ['Trip Duration', 'Trip_Duration_in_min', 'Birth Year', 'Gender']

    cleaned = (
        # errors='ignore' keeps this a no-op when '_id' is absent.
        data.drop(columns=['_id'], errors='ignore')
        .fillna(fill_values)
        # Duplicates are detected after '_id' is gone, so rows that differ only
        # by '_id' collapse into one.
        .drop_duplicates()
        .astype({column: int for column in int_columns})
    )

    return cleaned[cleaned['Trip Duration'] > 0]

def save_cleaned_data_to_mongodb(cleaned_data):
    """Replace the contents of the 'Trips_Silver' collection with `cleaned_data`.

    Args:
        cleaned_data: pandas DataFrame of cleaned trip records; each row is
            stored as one document.
    """
    silver_collection = db['Trips_Silver']

    # Build the documents before touching the collection, so a conversion
    # failure cannot leave the Silver layer emptied.
    records = cleaned_data.to_dict('records')

    silver_collection.delete_many({})  # to Clear existing data
    # NOTE(review): assuming `db` is a PyMongo database, insert_many rejects an
    # empty document list, so skip the call when there is nothing to insert.
    if records:
        silver_collection.insert_many(records)
    print(f"Cleaned data saved to MongoDB (Silver Layer). Records: {len(cleaned_data)}")

# Silver-layer pipeline: load the raw documents, clean them, and store the
# result in the 'Trips_Silver' collection. The record counts are printed before
# and after cleaning so the effect of the cleaning step is visible.
if __name__ == "__main__":

    raw_data = load_data_from_mongodb()
    print(f"Loaded {len(raw_data)} records from MongoDB.")

    cleaned_data = clean_data(raw_data)
    print(f"Cleaned data has {len(cleaned_data)} records.")
    
    save_cleaned_data_to_mongodb(cleaned_data)


Loaded 735502 records from MongoDB.
Cleaned data has 339620 records.
Cleaned data saved to MongoDB (Silver Layer). Records: 339620


### I have saved the cleaned Silver data in a new MongoDB collection. I could have updated the existing collection in place, but for ease of debugging and understanding, and since there are no space constraints, I kept the data in two separate collections.

# Create Aggregated dataset(s) (Gold Layer)

### For the Gold layer, I am computing these aggregations:

Total trips per user type.

Average trip duration by station.

Popular start and end stations.

Total trips per day.

Trips by Gender and Average trip Duration

Busiest Days

Bike usage by count

Longest trips

Busiest Hours

Trips by day of week



In [9]:
from pymongo import MongoClient
import pandas as pd
from datetime import datetime, date 

# Collection handles looked up by name on the `db` object imported from config.
silver_collection = db['Trips_Silver']  # cleaned trips are read from here
gold_collection = db['Trips_Gold']  # aggregated results are written here

def load_cleaned_data():
    """Read every document from `silver_collection` and return them as a DataFrame."""
    documents = list(silver_collection.find())
    return pd.DataFrame(documents)

def _count_by_station(station_names):
    """Count trips per station, most frequent first, as ['Station Name', 'Trip Count']."""
    # Naming the index before reset_index yields the same column label on every
    # pandas version; renaming a column called 'index' afterwards only works on
    # versions where value_counts leaves the index unnamed.
    return (
        station_names.value_counts()
        .rename_axis('Station Name')
        .reset_index(name='Trip Count')
    )


def create_aggregations(data):
    """Build the Gold-layer summary tables from the cleaned trip data.

    The input DataFrame is only read: the parsed start times and weekday names
    are kept in local Series instead of being written back onto `data`.

    Args:
        data: pandas DataFrame with the columns 'User Type',
            'Start Station Name', 'End Station Name', 'Trip_Duration_in_min',
            'Trip Duration', 'Start Time', 'Gender' and 'Bike ID'.

    Returns:
        dict mapping an aggregation name to a pandas DataFrame:
        Trips_Per_User_Type, Avg_Duration_By_Station, Popular_Start_Stations,
        Popular_End_Stations, Trips_Per_Day, Busiest_Days, Trips_By_Gender,
        Avg_Trip_Duration_By_Gender, Most_Used_Bikes, Longest_Trips,
        Popular_Routes, Trips_By_Day_Of_Week.
    """
    aggregations = {}

    # Parse the start times once and derive the grouping keys from them.
    start_times = pd.to_datetime(data['Start Time'])
    trip_dates = start_times.dt.date  # Series is still named 'Start Time'
    weekday_names = start_times.dt.day_name().rename('Day of Week')

    # 1. Total trips per user type
    aggregations['Trips_Per_User_Type'] = (
        data.groupby('User Type').size().reset_index(name='Total Trips')
    )

    # 2. Average trip duration by start station
    aggregations['Avg_Duration_By_Station'] = (
        data.groupby('Start Station Name')['Trip_Duration_in_min']
        .mean()
        .reset_index(name='Avg Duration (min)')
    )

    # 3. Popular start and end stations
    aggregations['Popular_Start_Stations'] = _count_by_station(data['Start Station Name'])
    aggregations['Popular_End_Stations'] = _count_by_station(data['End Station Name'])

    # 4. Total trips per day, plus the ten busiest days derived from the same counts
    daily_counts = data.groupby(trip_dates).size().reset_index(name='Total Trips')
    aggregations['Trips_Per_Day'] = daily_counts.rename(columns={'Start Time': 'Date'})
    # Busiest_Days keeps the 'Start Time' label for its date column, as before.
    aggregations['Busiest_Days'] = (
        daily_counts.sort_values(by='Total Trips', ascending=False).head(10)
    )

    # 5. Trips and average trip duration by gender
    aggregations['Trips_By_Gender'] = (
        data.groupby('Gender').size().reset_index(name='Total Trips')
    )
    aggregations['Avg_Trip_Duration_By_Gender'] = (
        data.groupby('Gender')['Trip_Duration_in_min']
        .mean()
        .reset_index(name='Avg Duration (min)')
    )

    # 6. Ten most used bikes
    aggregations['Most_Used_Bikes'] = (
        data['Bike ID'].value_counts()
        .rename_axis('Bike ID')
        .reset_index(name='Trip Count')
        .head(10)
    )

    # 7. Ten longest trips
    aggregations['Longest_Trips'] = (
        data.sort_values(by='Trip Duration', ascending=False)
        .head(10)[['Trip Duration', 'Start Station Name', 'End Station Name']]
    )

    # 8. Ten most popular start/end station pairs
    aggregations['Popular_Routes'] = (
        data.groupby(['Start Station Name', 'End Station Name']).size()
        .reset_index(name='Trip Count')
        .sort_values(by='Trip Count', ascending=False)
        .head(10)
    )

    # 9. Trips by day of week, busiest first
    aggregations['Trips_By_Day_Of_Week'] = (
        data.groupby(weekday_names).size()
        .reset_index(name='Total Trips')
        .sort_values(by='Total Trips', ascending=False)
    )

    return aggregations

def save_aggregations_to_mongodb(aggregations):
    """Replace the contents of `gold_collection` with the given aggregations.

    Each aggregation is stored as one document of the form
    {'aggregation_name': <key>, 'data': [<row dict>, ...]}.

    Args:
        aggregations: dict mapping an aggregation name to a pandas DataFrame.
    """
    gold_collection.delete_many({})  # Clear existing data

    for key, df in aggregations.items():
        records = df.to_dict('records')
        for record in records:
            for field, value in record.items():
                # Promote plain datetime.date values to midnight datetimes.
                # datetime is a subclass of date, so it is excluded explicitly:
                # datetime.combine would otherwise discard the time of day of
                # values that already are full timestamps.
                if isinstance(value, date) and not isinstance(value, datetime):
                    record[field] = datetime.combine(value, datetime.min.time())

        gold_collection.insert_one({
            'aggregation_name': key,
            'data': records
        })

    print("Aggregated datasets saved to MongoDB (Gold Layer).")
# Gold-layer pipeline: load the cleaned records from the Silver collection,
# build the aggregated tables, and store them in the Gold collection.
if __name__ == "__main__":

    cleaned_data = load_cleaned_data()
    print(f"Loaded {len(cleaned_data)} records from Silver Layer.")

    aggregated_data = create_aggregations(cleaned_data)

    save_aggregations_to_mongodb(aggregated_data)


Loaded 339620 records from Silver Layer.
Aggregated datasets saved to MongoDB (Gold Layer).
