In [19]:
!pip install google-generativeai
!pip install pymongo



# ***Python AI agent***

an AI agent to automate the process of data Transformation. Tasks:

*   Database Schema Identification
*   gen AI for getting schema map b/w Database.
*   Data transfer from one format to another.  




In [20]:
import pymongo

def identify_database_schema(atlas_connection_string, database_name):
    """
    Identifies the database schema of a MongoDB database using an Atlas connection string.

    Args:
        atlas_connection_string (str): The connection string for your MongoDB Atlas cluster.
        database_name (str): The name of the database.

    Returns:
        dict: A dictionary representing the database schema, containing collection names and their corresponding schemas.
    """

    try:
        client = pymongo.MongoClient(atlas_connection_string)
        db = client[database_name]

        schema = {}
        for collection_name in db.list_collection_names():
            collection = db[collection_name]
            schema[collection_name] = {
                "count": collection.count_documents({}),
                "schema": list(collection.find_one().keys()) if collection.find_one() else []
            }

        return schema

    except pymongo.errors.ServerSelectionTimeoutError:
        print("Failed to connect to MongoDB server.")
    except pymongo.errors.ConnectionFailure:  # Catch authentication errors
        print("Authentication failed.")
    except Exception as e:
        print("An error occurred:", str(e))

if __name__ == "__main__":
    # Replace with MongoDB Atlas connection string
    atlas_connection_string = "mongodb+srv://boddepallibhargav363:122333@cluster0.o9mka.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"
    database_name = "hospital"

    schema = identify_database_schema(atlas_connection_string, database_name)
    if schema:
        print("Target Database Schema:")
        target_schema = schema['users']['schema'] # Schema of that collection "USERS"
        print(target_schema)


Target Database Schema:
['_id', 'name', 'age', 'pho', 'mail']


In [21]:
source_schema = identify_database_schema("mongodb+srv://LuckyLoki03:Admin143@cluster0.lhpei.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0", database_name = "FHIR")
target_schema = identify_database_schema("mongodb+srv://boddepallibhargav363:122333@cluster0.o9mka.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0", database_name = "hospital")

In [22]:
import google.generativeai as genai
import os

API_KEY="AIzaSyCx2XdfqmkfDN9M9NCSHWOqbfWOuCWhsGw"  # get api key from google ai studios
genai.configure(api_key= API_KEY)



In [23]:
model = genai.GenerativeModel("gemini-1.5-pro-latest")
prompt = f"""
Give Quick responses
I have a schema for a source database and a user-provided target schema. I need you to generate a schema mapping that transforms the source schema to the target schema.

**Here's the information you need:**

- **Source Schema:** [Provide the source schema here as a list of column names]
- **Target Schema:** [Provide the target schema here as a list of column names]

**Your task:**

- **Understand the columns:** Analyze the source and target schemas to identify the corresponding columns and any potential differences in names or data types.
- **Generate Schema Mapping:** Create a schema mapping dictionary in JSON format, mapping each source column to its corresponding target column. Handle any necessary transformations or data conversions as needed.

**Output Format:**

Use this JSON format for the schema mapping:

here is the source_schema is {source_schema}"""+""" and target_schema is """+f""" {target_schema}"""+"""
```json
{
  "schema_mapping": {
    "source_column1": "target_column1",
    "source_column2": "target_column2",
    "source_column3": "target_column3",
    "pho":"pno",
    # ... other mappings
  }
} """
result = model.generate_content(prompt)
print(result.text)
print("Json string: ")
t = result.text.replace("```json", "").replace("```", "").replace("\n","")
print(t)

```json
{
  "schema_mapping": {
    "_id": "_id",
    "Patient_name": "name",
    "Patient_age": "age",
    "Patient_no": "pho",
    "Patient_mail": "mail"
  }
}
```

Json string: 
{  "schema_mapping": {    "_id": "_id",    "Patient_name": "name",    "Patient_age": "age",    "Patient_no": "pho",    "Patient_mail": "mail"  }}


# Converting string response to Dictionary

In [24]:
import json

schema_maps = json.loads(t)

# Print the dictionary to verify the result
print(schema_maps["schema_mapping"])

{'_id': '_id', 'Patient_name': 'name', 'Patient_age': 'age', 'Patient_no': 'pho', 'Patient_mail': 'mail'}


# Monitoring...

In [25]:
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure

# Connect to source MongoDB instance
source_client = MongoClient('mongodb+srv://LuckyLoki03:Admin143@cluster0.lhpei.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0')  # Replace with source host and port
source_db = source_client['FHIR']  # Replace with the source database name
source_collection = source_db['patientCare']  # Replace with the source collection name

# Connect to target MongoDB instance
target_client = MongoClient('mongodb+srv://boddepallibhargav363:122333@cluster0.o9mka.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0')  # Replace with target host and port
target_db = target_client['hospital']  # Replace with the target database name
target_collection = target_db['users']  # Replace with the target collection name

# Function to handle change events and apply them to the target database
def handle_change(event):
    operation_type = event['operationType']

    # Get the document _id
    document_id = event['documentKey']['_id']

    # Insert operation
    if operation_type == 'insert':
        '''
        {
          "operationType": "insert",
          "documentKey": { "_id": "123" },
          "fullDocument": { "_id": "123", "name": "John", "age": 30 }
        }
        '''

        # Get the full document from event and insert it into target
        new_document = event['fullDocument']
        try:
            target_collection.insert_one(new_document)
            print(f"Inserted document with _id: {document_id} into target database.")
        except Exception as e:
            print(f"Failed to insert document: {e}")

    # Update operation
    elif operation_type == 'update':
        '''
        {
          "operationType": "update",
          "ns": { "db": "exampleDB", "coll": "exampleCollection" },
          "documentKey": { "_id": "123" },
          "updateDescription": {
            "updatedFields": { "age": 31 },
            "removedFields": []
          }
        }
        '''

        # Get updated fields and apply them to the target document
        updated_fields = event['updateDescription']['updatedFields']
        try:
            # Ensure new fields and values are added or updated in the target document
            target_collection.update_one({'_id': document_id}, {'$set': updated_fields})
            print(f"Updated document with _id: {document_id} in target database with new fields.")
        except Exception as e:
            print(f"Failed to update document: {e}")

    # Delete operation
    elif operation_type == 'delete':
        '''
        {
        "operationType": "delete",
        "ns": { "db": "exampleDB", "coll": "exampleCollection" },
        "documentKey": { "_id": "123" }
        }'''
        try:
            target_collection.delete_one({'_id': document_id})
            print(f"Deleted document with _id: {document_id} from target database.")
        except Exception as e:
            print(f"Failed to delete document: {e}")

# Start monitoring changes in the source collection using a change stream
try:
    with target_collection.watch() as stream:
        print("Listening for changes...")
        for change in stream:
            handle_change(change)

except ConnectionFailure:
    print("Lost connection to MongoDB. Reconnecting...")
except KeyboardInterrupt:
    print("Database Monitoring Interrupted...!")
except Exception as e:
    print(f"An error occurred: {e}")

Listening for changes...
Database Monitoring Interrupted...!


# Target data

In [26]:
import pymongo
import pandas as pd

# Connection strings
source_uri = 'mongodb+srv://LuckyLoki03:Admin143@cluster0.lhpei.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0'
target_uri = 'mongodb+srv://boddepallibhargav363:122333@cluster0.o9mka.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0'

# Connect to databases
source_client = pymongo.MongoClient(source_uri)
target_client = pymongo.MongoClient(target_uri)

# Retrieve data from source
source_db = source_client["FHIR"]
source_collection = source_db["patientCare"]
source_data = list(source_collection.find())
source_df = pd.DataFrame(source_data)

target_db = target_client["hospital"]
target_collection = target_db["users"]
target_data = list(target_collection.find())
target_df = pd.DataFrame(target_data)

print("Source Data:")
source_df

print("Target Data:")
target_df

Source Data:
Target Data:


Unnamed: 0,_id,name,age,pho,mail,pno
0,671c6845ba2fda669ea07f7e,Mahesh,30,1234561000.0,mahesh@gmail.com,
1,67191fdd87b41ffd1960760e,Mukesh,35,1212121000.0,mukesh@gmail.com,
2,671b26ee68d6a3ab34529ead,Sandeep,24,2525253000.0,sandY@yahoo.com,
3,671c675b5ea18ecb12c04695,Raju,30,10000000000.0,allurisetaramaraju@yahoo.com,
4,671c6fb65ea18ecb12c04697,Salaar,32,,salaar@gmail.com,5555556000.0
5,671c68c0ec527f713031dfc0,Bahubali,35,8989899000.0,bahu@gmail.com,
6,671c7aff2fc50e5852f68c4c,Bhargav,22,2121212000.0,bhargav@gmail.com,
7,671c79752fc50e5852f68c4b,Surya,22,3232323000.0,suryabhai@gmail.com,
8,671c71705ea18ecb12c04698,Tom,35,1212121000.0,tombhayya@mail.com,
9,671f54cb24ec192cead1d717,Sandeep,24,2525253000.0,sandY@yahoo.com,


In [27]:
target_df[target_df['age']>30]

Unnamed: 0,_id,name,age,pho,mail,pno
1,67191fdd87b41ffd1960760e,Mukesh,35,1212121000.0,mukesh@gmail.com,
4,671c6fb65ea18ecb12c04697,Salaar,32,,salaar@gmail.com,5555556000.0
5,671c68c0ec527f713031dfc0,Bahubali,35,8989899000.0,bahu@gmail.com,
8,671c71705ea18ecb12c04698,Tom,35,1212121000.0,tombhayya@mail.com,
11,671f54cb24ec192cead1d716,Mukesh,35,1212121000.0,mukesh@gmail.com,
12,671f54cb24ec192cead1d718,Bahubali,35,8989899000.0,bahu@gmail.com,
