# In [13]:
import psycopg2
from pymongo import MongoClient
from neo4j import GraphDatabase
import configparser

# Load the connection settings shared by every backend below. The class in
# this file reads the 'postgresql', 'mongodb' and 'neo4j' sections from it.
# The path is relative to the current working directory, and
# ConfigParser.read() silently skips files it cannot open: it returns the
# list of files it actually parsed, so an empty list means nothing was loaded.
config = configparser.ConfigParser()
config.read('scripts/analysis/config.ini')

# Output: ['scripts/analysis/config.ini']

# In [19]:


class HybridAnalysis:
    """Run equivalent analyses against PostgreSQL, MongoDB and Neo4j.

    The constructor opens one connection per backend using the module-level
    ``config`` object (sections ``postgresql``, ``mongodb`` and ``neo4j``).
    Call :meth:`close` when finished, or use the instance as a context
    manager::

        with HybridAnalysis() as analysis:
            results = analysis.analyze_campaigns()
    """

    def __init__(self):
        """Open the PostgreSQL, MongoDB and Neo4j connections.

        If any step fails, connections that were already opened are closed
        again before the exception propagates.
        """
        # Pre-set every handle so close() is safe after a partial failure.
        self.pg_conn = None
        self.mongo_client = None
        self.mongo_db = None
        self.neo4j_driver = None
        try:
            self.pg_conn = psycopg2.connect(**config['postgresql'])

            self.mongo_client = MongoClient(config['mongodb']['uri'])
            self.mongo_db = self.mongo_client[config['mongodb']['dbname']]

            self.neo4j_driver = GraphDatabase.driver(
                config['neo4j']['uri'],
                auth=(config['neo4j']['user'], config['neo4j']['password'])
            )
        except Exception:
            self.close()
            raise

    def __enter__(self):
        """Return the instance itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close all connections; exceptions from the block are not suppressed."""
        self.close()
        return False

    def close(self):
        """Close the Neo4j driver, the MongoDB client and the PostgreSQL connection.

        Safe to call more than once: handles that are already released (or
        were never opened) are skipped.
        """
        for name in ("neo4j_driver", "mongo_client", "pg_conn"):
            handle = getattr(self, name, None)
            if handle is not None:
                handle.close()
                setattr(self, name, None)

    @staticmethod
    def _read_query(path):
        """Return the text of the query file at ``path``, closing the file."""
        with open(path, encoding="utf-8") as query_file:
            return query_file.read()

    def _pg_fetchall(self, query, params=None):
        """Execute ``query`` on PostgreSQL and return all rows.

        The statement runs inside ``with self.pg_conn``, which commits on
        success and rolls back on error. Without this, one failed statement
        would leave the shared connection in an aborted transaction and every
        later query on it would fail.
        """
        with self.pg_conn:
            with self.pg_conn.cursor() as pg_cur:
                pg_cur.execute(query, params)
                return pg_cur.fetchall()

    def analyze_campaigns(self):
        """Run the campaign analysis on all three backends.

        Returns:
            dict with keys ``"postgres"`` (rows returned by the SQL in
            ``scripts/q1.sql``), ``"mongo"`` (count of ``purchase`` events
            per ``product_pk``) and ``"neo4j"`` (records returned by the
            Cypher in ``scripts/q1.cypher``).

        Both query files are resolved relative to the current working
        directory.
        """
        # PostgreSQL analysis
        pg_results = self._pg_fetchall(self._read_query("scripts/q1.sql"))

        # MongoDB analysis
        mongo_results = list(self.mongo_db.events.aggregate(
            [{"$match": {"event_type": "purchase"}},
             {"$group": {"_id": "$product_pk", "total": {"$sum": 1}}}]
        ))

        # Neo4j analysis
        cypher = self._read_query("scripts/q1.cypher")
        with self.neo4j_driver.session() as neo_session:
            neo_results = neo_session.run(cypher).data()

        return {
            "postgres": pg_results,
            "mongo": mongo_results,
            "neo4j": neo_results
        }

    def generate_recommendations(self, user_id):
        """Collect recommendation inputs for ``user_id`` from every backend.

        Args:
            user_id: identifier matched against ``friend1`` in PostgreSQL,
                ``user_id`` in the MongoDB ``events`` collection and the
                ``user_id`` property of ``User`` nodes in Neo4j.

        Returns:
            dict with keys ``"friends"`` (rows of ``e_commerce.friends``
            where ``friend1`` is the user), ``"mongo_views"`` (the user's
            ``view`` event count per ``product_pk``) and
            ``"neo_recommendations"`` (products the user's friends interacted
            with, highest interaction count first).
        """
        friends = self._pg_fetchall(
            "SELECT * FROM e_commerce.friends WHERE friend1 = %s", (user_id,)
        )

        pipeline = [
            {"$match": {"user_id": user_id, "event_type": "view"}},
            {"$group": {"_id": "$product_pk", "views": {"$sum": 1}}}
        ]
        mongo_views = list(self.mongo_db.events.aggregate(pipeline))

        with self.neo4j_driver.session() as neo_session:
            cypher = """
            MATCH (u:User {user_id: $user_id})-[:FRIENDSHIP]-(friend)-[:INTERACTED_WITH]->(p:Product)
            RETURN p.product_pk, COUNT(*) AS score
            ORDER BY score DESC
            """
            neo_recs = neo_session.run(cypher, user_id=user_id).data()

        return {
            "friends": friends,
            "mongo_views": mongo_views,
            "neo_recommendations": neo_recs
        }

    def full_text_search(self, keywords):
        """Search products by category text on all three backends.

        Args:
            keywords: search string handed unmodified to PostgreSQL
                ``to_tsquery``, MongoDB ``$text`` and the Neo4j full-text
                index ``categoryIndex``. Because ``to_tsquery`` is used, the
                string must be valid tsquery syntax for the PostgreSQL part
                to succeed.

        Returns:
            dict with keys ``"postgres"``, ``"mongo"`` and ``"neo4j"``
            holding each backend's matches.
        """
        pg_results = self._pg_fetchall("""
                SELECT product_id, brand, category_code 
                FROM e_commerce.products 
                WHERE to_tsvector(category_code) @@ to_tsquery(%s)
            """, (keywords,))

        # $text needs a text index on the collection; this call is repeated
        # on every search, so it relies on create_index being harmless when
        # the index is already there.
        self.mongo_db.products.create_index([("category_code", "text")])
        mongo_results = list(self.mongo_db.products.find(
            {"$text": {"$search": keywords}},
            {"score": {"$meta": "textScore"}}
        ).sort([("score", {"$meta": "textScore"})]))

        # The "categoryIndex" full-text index is not created here and must
        # already exist in the Neo4j database.
        with self.neo4j_driver.session() as neo_session:
            neo_results = neo_session.run("""
                CALL db.index.fulltext.queryNodes("categoryIndex", $keywords) 
                YIELD node, score 
                RETURN node.product_pk, node.brand, score
            """, keywords=keywords).data()

        return {
            "postgres": pg_results,
            "mongo": mongo_results,
            "neo4j": neo_results
        }