In [11]:
import pymongo
import pandas as pd
import os
from dotenv import load_dotenv

import pymongo.errors as mongo_errors

import certifi

In [12]:
def build_collection(df, collection, column_mapping, indices, ind_comp=None):
    """
    Inserts data from a pandas DataFrame into a MongoDB collection based on specified column mapping.

    :param df: pandas DataFrame containing the data to be inserted.
    :param collection: MongoDB collection object where the data will be inserted.
    :param column_mapping: Dictionary mapping the DataFrame column names to MongoDB document field names.
    :param indices: List of field names to create index for.
    :param ind_comp: List of list containing multiple fields to create compound indices.
    """

    # Load documents
    for index, row in df.iterrows():
        # Create a document for each row using the column mapping
        document = {mongo_attr: row[csv_col] for csv_col, mongo_attr in column_mapping.items()}

        # Insert the document into MongoDB
        collection.insert_one(document)

    
    # Create indices
    for field in indices:
        collection.create_index(field)
    
    # Create compound indices
    if ind_comp is not None:
        for combo in ind_comp:
            f_list = [];
            for field in combo:
                f_list.append((field))
            collection.create_index(f_list)


In [14]:
load_dotenv()

# Create the connection URI
#connection_uri = f"mongodb://{username}:{password}@{host}:{port}/{database_name}"
ca = certifi.where()
connection_uri = os.environ['MONGO_URL'] + '&tlsCAFile=' + ca

print(connection_uri)

# Connect to MongoDB
client = pymongo.MongoClient(connection_uri)

# Access the specific database
db = client['sparkplug']
db = client['testing'] # For testing small sample, comment out for production

mongodb+srv://sparkplug:5jkS0ew4zVnACstU@sparkplug.i7nlrbn.mongodb.net/?retryWrites=true&w=majority&tlsCAFile=/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/certifi/cacert.pem


In [15]:
# Test connection

try:
    # Attempt to retrieve the server status
    server_status = client.server_info()
    print("Connected to MongoDB server:", server_status)
except mongo_errors.ConnectionFailure as e:
    print("Connection Failure: ", e)
except mongo_errors.ConfigurationError as e:
    print("Configuration Error: ", e)
except mongo_errors.PyMongoError as e:
    print("PyMongo Error: ", e)
except Exception as e:
    print("An error occurred: ", e)


Connected to MongoDB server: {'version': '6.0.12', 'gitVersion': '21e6e8e11a45dfbdb7ca6cf95fa8c5f859e2b118', 'modules': ['enterprise'], 'allocator': 'tcmalloc', 'javascriptEngine': 'mozjs', 'sysInfo': 'deprecated', 'versionArray': [6, 0, 12, 0], 'bits': 64, 'debug': False, 'maxBsonObjectSize': 16777216, 'storageEngines': ['devnull', 'ephemeralForTest', 'inMemory', 'queryable_wt', 'wiredTiger'], 'ok': 1.0, '$clusterTime': {'clusterTime': Timestamp(1701570896, 12), 'signature': {'hash': b'We\x1f\xa4\x0f1\x1a\xefF\x93N?\xf9\xd3r\xeb\xcaJ6C', 'keyId': 7247536804084056066}}, 'operationTime': Timestamp(1701570896, 12)}


In [7]:
# Build transactions collection
df_transactions = pd.read_csv('data/transactions.csv')
df_transactions = df_transactions.head(100) # For testing small sample, comment out for production

collection_transactions = db['transactions']

column_mapping_transactions = {
    'Station Name': 'station_name',
    'station_id': 'station_id',
    'simulated_start_date': 'start_date',
    'simulated_end_date': 'end_date',
    'simulated_transaction_date': 'transaction_date',
    'Total Duration (hh:mm:ss)': 'total_duration',
    'Charging Time (hh:mm:ss)': 'charging_time',
    'Energy (kWh)': 'energy_kwh',
    'GHG Savings (kg)': 'ghg_savings_kg',
    'Gasoline Savings (gallons)': 'gas_savings_gal',
    'Port Type': 'charge_level',
    'Port Number': 'port_number',
    'Plug Type': 'plug_type',
    'City': 'city',
    'State/Province': 'state',
    'Postal Code': 'postal_code',
    'Country': 'country',
    'Currency': 'currency',
    'simulated_fee': 'fee',
    'Ended By': 'ended_by',
    'Plug In Event Id': 'plug_in_event_id',
    'User ID': 'user_id',
}

indices_transactions = ['station_id', 'charge_level', 'plug_type', 'postal_code', 'country', 'user_id']
ind_comp_transactions = [
    ['country', 'state', 'city'],
    ['country', 'state']
]

## Run builder
build_collection(df_transactions, collection_transactions, 
                 column_mapping_transactions, indices_transactions, ind_comp_transactions)

In [8]:
# Initiate station logs collection
collection_stations = db['station_logs']

In [17]:
# Establish a connection to MongoDB
client = pymongo.MongoClient("mongodb://localhost:27017/")

# Select the database
db = client["sparkplug"]

# Select the collection
collection = db["transactions"]

# Fetch one document
document = collection.find_one()

print(document)

{'_id': ObjectId('656bc711a67f2051797cc8ea'), 'station_name': 'PALO ALTO CA / HAMILTON #1', 'station_id': '1392', 'start_date': '08/31/2022 11:43', 'end_date': '08/31/2022 12:05', 'transaction_date': '08/31/2022 12:05', 'total_duration': '0:21:49', 'charging_time': '0:21:38', 'energy_dWh': 1.251424, 'ghg_savings_kg': 0.526, 'gas_savings_gal': 0.157, 'charge_level': 'Level 2', 'port_number': 2, 'plug_type': 'J1772', 'city': 'Palo Alto', 'state': 'California', 'postal_code': 94301, 'country': 'United States', 'currency': 'USD', 'fee': 0.4472469593, 'ended_by': 'CPS Server', 'plug_in_event_id': 1324583013, 'user_id': 107.0}


In [10]:
# Cleanup
collection_transactions = db['transactions']

collection_transactions.delete_many({})

DeleteResult({'n': 50, 'electionId': ObjectId('7fffffff00000000000000c5'), 'opTime': {'ts': Timestamp(1701502628, 57), 't': 197}, 'ok': 1.0, '$clusterTime': {'clusterTime': Timestamp(1701502628, 57), 'signature': {'hash': b'\xc6\xf5c\xf5"n\t\xa1O\xb3\x10W{\xab\xd8*\x00\xc26\\', 'keyId': 7247536804084056066}}, 'operationTime': Timestamp(1701502628, 57)}, acknowledged=True)