In [1]:
!pip install --quiet pymongo ijson


In [2]:
from pymongo import MongoClient, InsertOne
from pymongo.errors import ServerSelectionTimeoutError
import json, os, math, re
import ijson

# <<< EDIT THESE PATHS IF NEEDED >>>
DATA_DIR = r""
TEST_JSON = os.path.join(DATA_DIR, "MOCK_DATA.json") #variable pointing to location of json file


DB_NAME = "Test_Data" #create a variable to store datbase

def to_num(s): #character to number generator
    if s is None: return None
    s = str(s).strip().replace(",", "")
    try:
        if "." in s: return float(s)
        return int(s)
    except:
        return None

print("Devices file:", TEST_JSON) #


Devices file: MOCK_DATA.json


In [40]:
try:
    !net start MongoDB
except Exception as e:
    pass

# Connect
client = MongoClient("mongodb://localhost:27017/", serverSelectionTimeoutMS=5000)
try:
    _ = client.server_info()
    print("Connected to MongoDB ✔")
except ServerSelectionTimeoutError as e:
    raise SystemExit("⚠️ Could not connect to MongoDB. Make sure MongoDB is installed and the service is running.")

db = client[DB_NAME]
db

Connected to MongoDB ✔


System error 5 has occurred.

Access is denied.



Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True, serverselectiontimeoutms=5000), 'Test_Data')

In [7]:


for name in ["data_staging", "data_catalog"]:
    if name not in db.list_collection_names():
        db.create_collection(name)

# Helpful indexes
db.data_catalog.create_index([("patientId", 1)], unique=False)
db.data_catalog.create_index([("name.first", 1)])
db.data_catalog.create_index([("name.last", 1)])
db.data_catalog.create_index("Sex")

print("Collections:", db.list_collection_names())

Collections: ['data_staging', 'devices_staging', 'data_catalog']


In [8]:
data_staging = db.devices_staging
data_staging.delete_many({})  # clear if re-running

count = 0
with open(TEST_JSON, "rb") as f:
    # the file is a JSON array, stream it safely with ijson
    for obj in ijson.items(f, "item"):
        data_staging.insert_one(obj)
        count += 1

print(f"Imported {count} device docs into devices_staging ✔")


Imported 1000 device docs into devices_staging ✔


In [10]:
data_catalog = db.devices_catalog
data_catalog.delete_many({})

ops = []
for d in devices_staging.find({}, {"_id":0}):
    doc = {
        "patientId": d.get("id"),
        "name": {
            "first": d.get("first_name"),
            "last": d.get("last_name"),
        },
        
        "sex": d.get("gender")
    }
    ops.append(InsertOne(doc))

if ops:
    data_catalog.bulk_write(ops)

print("data_catalog count:", data_catalog.count_documents({}))
print("Sample:", data_catalog.find_one({}, {"_id":0}))


data_catalog count: 1000
Sample: {'patientId': 1, 'name': {'first': 'Currie', 'last': 'Redhouse'}, 'sex': 'Male'}


In [38]:
query = { "name.first": "Currie"  }

results = data_catalog.find(query)
for doc in results:
    print(doc)

{'_id': ObjectId('690a8b3ba6b4dc7742c75316'), 'patientId': 1, 'name': {'first': 'Currie', 'last': 'Redhouse'}, 'sex': 'Male'}
