# Pipeline

In [None]:
# Install the required libraries
!pip install pymongo
!pip install openai

Collecting pymongo
  Downloading pymongo-4.7.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (670 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 670.0/670.0 kB 3.2 MB/s eta 0:00:00
Collecting dnspython<3.0.0,>=1.16.0 (from pymongo)
  Downloading dnspython-2.6.1-py3-none-any.whl (307 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 307.7/307.7 kB 6.2 MB/s eta 0:00:00
Installing collected packages: dnspython, pymongo
Successfully installed dnspython-2.6.1 pymongo-4.7.2
Collecting openai
  Downloading openai-1.30.1-py3-none-any.whl (320 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 320.6/320.6 kB 4.0 MB/s eta 0:00:00
Collecting httpx<1,>=0.23.0 (from openai)
  Downloading httpx-0.27.0-py3-none-any.whl (75 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 75.6/75.6 kB 4.8 MB/s eta 0:00:00
Collecting 

In [None]:
import json
import os
import re
import urllib.request
from abc import ABC, abstractmethod
from datetime import datetime

import pymongo
from bson import ObjectId
from openai import OpenAI
from pymongo import MongoClient

In [None]:
# Connect to your MongoDB Atlas cluster.
# The connection string is read from the MONGODB_URI environment variable so
# that credentials are never stored in the notebook itself.
MONGODB_URI = os.environ.get("MONGODB_URI")
if not MONGODB_URI:
    raise RuntimeError(
        "Set the MONGODB_URI environment variable to your MongoDB connection string."
    )
client = MongoClient(MONGODB_URI)

# Access the sample_mflix database
db = client.sample_mflix

In [None]:
class PipelineStage(ABC):
    """Abstract base class for a single step of the pipeline.

    Subclasses must override ``execute``; until they do, the class cannot be
    instantiated.
    """

    @abstractmethod
    def execute(self, data, **kwargs):
        """Run the stage.

        Args:
            data: State object handed to the stage by the caller.
            **kwargs: Extra keyword options forwarded by the caller.

        Returns:
            ``None`` in this base implementation.
        """
        return None

In [None]:
class SchemaExtractionStage(PipelineStage):
    """Infer a schema for every collection from one sample document.

    For each collection of the module-level ``db`` a single document is
    fetched with ``find_one()`` and the Python type name of every top-level
    field is recorded (lists are recorded as ``"array"``).
    """

    def execute(self, data, **kwargs):
        """Build the schemas, store them in ``data`` and write them to disk.

        Args:
            data: Pipeline state dict; the key ``'schema'`` is set to a dict
                mapping collection name to its schema.
            **kwargs: Unused.

        Returns:
            The same ``data`` dict.

        Side effects:
            Writes one ``<collection_name>.json`` file per collection into the
            current working directory.
        """
        def get_collection_schema(collection_name):
            sample_document = db[collection_name].find_one()
            # find_one() returns None for an empty collection; report no fields
            # instead of failing on None.items().
            if not sample_document:
                return {}
            return {
                field: {"type": "array" if isinstance(value, list) else type(value).__name__}
                for field, value in sample_document.items()
            }

        collections_schema = {}
        for collection_name in db.list_collection_names():
            collections_schema[collection_name] = {
                "name": collection_name,
                "description": f"Contains detailed information about {collection_name}.",
                "fields": get_collection_schema(collection_name),
            }

        data['schema'] = collections_schema

        # Reuse the schemas computed above rather than sampling every
        # collection a second time just to write the files.
        for collection_name, collection_schema in collections_schema.items():
            with open(f"{collection_name}.json", "w") as f:
                json.dump(collection_schema, f, indent=4)

        return data

In [None]:
class ExampleDataAdditionStage(PipelineStage):
    """Add an example value to every field listed in ``movies.json``.

    The examples come from one document of ``db.movies``.
    """

    def execute(self, data, **kwargs):
        """Annotate ``movies.json`` with examples and record the result.

        Args:
            data: Pipeline state dict; the key ``'updated_movies_schema'`` is
                set to the annotated schema.
            **kwargs: Unused.

        Returns:
            The same ``data`` dict.

        Side effects:
            Reads ``movies.json`` and rewrites it in place.
        """

        class _BsonAwareEncoder(json.JSONEncoder):
            """Encoder that writes ObjectId via str() and datetime via isoformat()."""

            def default(self, obj):
                if isinstance(obj, ObjectId):
                    return str(obj)
                if isinstance(obj, datetime):
                    return obj.isoformat()
                return super().default(obj)

        sample_movie = db.movies.find_one()

        with open('movies.json', 'r') as schema_file:
            movies_schema = json.load(schema_file)

        # Fields missing from the sampled document get an explicit None example.
        for field_name, field_spec in movies_schema['fields'].items():
            field_spec['example'] = (
                sample_movie[field_name] if field_name in sample_movie else None
            )

        with open('movies.json', 'w') as schema_file:
            json.dump(movies_schema, schema_file, indent=4, cls=_BsonAwareEncoder)

        data['updated_movies_schema'] = movies_schema
        return data

In [None]:
class RelationshipsIndexesStage(PipelineStage):
    """Detect ``*_id`` references between collections and record them for ``comments``.

    A field named ``<x>_id`` in a sampled document is treated as a reference to
    the collection ``<x>s`` when a collection of that name exists.
    """

    def execute(self, data, **kwargs):
        """Add the first relationship and index found for ``comments`` to its schema.

        Args:
            data: Pipeline state dict; the key ``'comments_schema'`` is set to
                the updated schema.
            **kwargs: Unused.

        Returns:
            The same ``data`` dict.

        Side effects:
            Reads ``comments.json`` and rewrites it in place.
        """
        def identify_relationships_and_indexes(database):
            collection_names = database.list_collection_names()
            found_relationships, found_indexes = [], []

            for source_name in collection_names:
                probe = database[source_name].find_one()
                if not probe:
                    # Nothing to inspect in an empty collection.
                    continue

                for field in probe:
                    if not field.endswith('_id'):
                        continue
                    # "movie_id" -> "movies": drop the suffix and pluralise.
                    target_name = field[:-3] + 's'
                    if target_name not in collection_names:
                        continue

                    found_relationships.append({
                        "from_collection": source_name,
                        "from_field": field,
                        "to_collection": target_name,
                        "to_field": "_id",
                        "description": f"Links {source_name} to {target_name} through {field}."
                    })
                    found_indexes.append({
                        "from_collection": source_name,
                        "index": field
                    })
            return found_relationships, found_indexes

        with open('comments.json', 'r') as file:
            comments_schema = json.load(file)

        relationships, indexes = identify_relationships_and_indexes(db)

        # Only the first match for the comments collection is stored.
        first_relationship = next(
            (entry for entry in relationships if entry['from_collection'] == 'comments'),
            None,
        )
        if first_relationship is not None:
            comments_schema["relationships"] = first_relationship

        first_index = next(
            (entry for entry in indexes if entry['from_collection'] == 'comments'),
            None,
        )
        if first_index is not None:
            comments_schema["indexes"] = first_index["index"]

        with open('comments.json', 'w') as file:
            json.dump(comments_schema, file, indent=4)

        data['comments_schema'] = comments_schema
        return data

In [None]:
class OpenAIDescriptionStage(PipelineStage):
    """Ask an OpenAI chat model to describe the fields listed in ``movies.json``."""

    @staticmethod
    def _parse_field_descriptions(content):
        """Turn ``field name : description`` lines into a ``{field: description}`` dict.

        Lines without a colon, or with an empty name or description, are skipped.
        """
        descriptions = {}
        for line in content.splitlines():
            field, separator, description_value = line.partition(":")
            # Drop whitespace and list/markdown decoration the model may put
            # around the field name (bullets, bold markers, quotes, backticks).
            field = field.strip().strip("-*`\"' ")
            description_value = description_value.strip()
            if separator and field and description_value:
                descriptions[field] = description_value
        return descriptions

    def execute(self, data, **kwargs):
        """Generate field descriptions and store them in ``movies.json``.

        Args:
            data: Pipeline state dict; not modified by this stage.
            **kwargs: ``openai_api_key`` (optional) is passed to the OpenAI
                client. When it is absent the client is created with
                ``api_key=None`` and falls back to the ``OPENAI_API_KEY``
                environment variable.

        Returns:
            ``{"description_generated": <chat completion object>}``.

        Side effects:
            Calls the OpenAI chat completions API; reads ``movies.json`` and
            rewrites it with a ``description`` entry on every field the model
            described.
        """
        def generate_description(prompt):
            openai_client = OpenAI(api_key=kwargs.get("openai_api_key"))
            return openai_client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": prompt}],
                max_tokens=300,
            )

        # Load the schema under its own name so the ``data`` argument is not shadowed.
        with open('movies.json', 'r') as f:
            movies_schema = json.load(f)

        fields = movies_schema['fields']
        json_schema = json.dumps(fields)

        prompt_str = f"""{json_schema} is the schema of collections, fields in a mongodb database.
Description refers to what information the collection or field holds.
Give the description as
field name : description
"""

        description_generated = generate_description(prompt_str)

        # Read the reply from the response object itself instead of pattern
        # matching on its repr, whose quoting depends on the reply text.
        content = description_generated.choices[0].message.content or ""

        for field, description_value in self._parse_field_descriptions(content).items():
            # Ignore names the model made up or reformatted beyond recognition.
            if field in fields:
                fields[field]["description"] = description_value

        with open('movies.json', 'w') as f:
            json.dump(movies_schema, f, indent=4)

        return {"description_generated": description_generated}

In [None]:
class EmbeddingStage(PipelineStage):
    """Embed every top-level string field of every document via a Hugging Face endpoint.

    Results are written to ``<collection_name>_embed.json``; the database
    itself is not modified.
    """

    # Seconds to wait for one embedding request before giving up on that field.
    REQUEST_TIMEOUT_SECONDS = 60

    def execute(self, data, **kwargs):
        """Embed the string fields of the selected collections.

        Args:
            data: Pipeline state dict; returned unchanged.
            **kwargs:
                collection_name (optional): process only this collection of
                    the module-level ``db``; by default all collections are
                    processed.
                hf_api_url / hf_api_key (optional): endpoint URL and API key.
                    When absent they are read from the ``HF_API_URL`` and
                    ``HF_API_KEY`` environment variables.

        Returns:
            The same ``data`` dict.

        Raises:
            KeyError: if the URL or key is neither passed nor set in the
                environment.

        Side effects:
            One HTTP POST per string field per document; writes one
            ``<collection_name>_embed.json`` file per collection that produced
            at least one embedding.
        """
        hf_api_url = kwargs.get("hf_api_url") or os.environ["HF_API_URL"]
        hf_api_key = kwargs.get("hf_api_key") or os.environ["HF_API_KEY"]
        headers = {
            "Authorization": f"Bearer {hf_api_key}",
            "Content-Type": "application/json"
        }

        def get_string_fields(document):
            return [key for key, value in document.items() if isinstance(value, str)]

        def embed_text(text):
            # Standard-library HTTP client; urlopen raises HTTPError on an
            # error status, so a failed call never returns a body.
            payload = json.dumps({"inputs": text}).encode("utf-8")
            request = urllib.request.Request(
                hf_api_url, data=payload, headers=headers, method="POST"
            )
            with urllib.request.urlopen(request, timeout=self.REQUEST_TIMEOUT_SECONDS) as response:
                return json.loads(response.read().decode("utf-8"))

        def embed_and_save_collection(collection, collection_name):
            updated_documents = []
            for document in collection.find():
                updates = {}
                for field in get_string_fields(document):
                    # Best effort: a failure on one field is reported and the
                    # remaining fields/documents are still processed.
                    try:
                        embedding = embed_text(document[field])
                        # A list of lists is flattened into one list; the
                        # emptiness check avoids indexing into [].
                        if isinstance(embedding, list) and embedding and isinstance(embedding[0], list):
                            embedding = [item for sublist in embedding for item in sublist]
                        updates[f"embedded_{field}"] = embedding
                    except Exception as e:
                        print(f"Error embedding field {field} in document {document['_id']}: {e}")

                if updates:
                    updated_documents.append({**document, **updates})

            if updated_documents:
                with open(f"{collection_name}_embed.json", 'w') as f:
                    # default=str serialises values json cannot encode natively.
                    json.dump(updated_documents, f, default=str, indent=4)

        requested_collection = kwargs.get("collection_name")
        if requested_collection:
            collection_names = [requested_collection]
        else:
            collection_names = db.list_collection_names()

        for collection_name in collection_names:
            embed_and_save_collection(db[collection_name], collection_name)

        return data

In [None]:
class Pipeline:
    """Ordered list of stages that are executed one after another on a shared dict."""

    def __init__(self):
        # Stages in the order they will be executed.
        self.stages = []

    def add_stage(self, stage):
        """Append ``stage`` (any object with ``execute(data, **kwargs)``) to the pipeline."""
        self.stages.append(stage)

    def execute(self, initial_data=None, **kwargs):
        """Run all stages in order and return the accumulated state.

        Args:
            initial_data: Dict to start from. It is updated in place and
                returned. When omitted, a fresh empty dict is created for
                this run.
            **kwargs: Forwarded unchanged to every stage's ``execute``.

        Returns:
            The state dict after the last stage has run.
        """
        # A fresh dict per call: a ``{}`` default would be shared between calls
        # and carry results from one run into the next.
        data = {} if initial_data is None else initial_data
        for stage in self.stages:
            output = stage.execute(data, **kwargs)
            # Stages may mutate ``data`` and return it, return a separate dict
            # to merge in, or return nothing.
            if output is not None and output is not data:
                data.update(output)
        return data

In [None]:
if __name__ == "__main__":
    # Stages run in registration order. The later stages read the JSON files
    # that SchemaExtractionStage writes, so it must come first.
    stage_classes = (
        SchemaExtractionStage,
        ExampleDataAdditionStage,
        RelationshipsIndexesStage,
        OpenAIDescriptionStage,
        EmbeddingStage,
    )

    pipeline = Pipeline()
    for stage_class in stage_classes:
        pipeline.add_stage(stage_class())

    # Keyword arguments given to execute() are forwarded to every stage;
    # none are passed here.
    result = pipeline.execute()
    print("Pipeline result:", result)