In [1]:
import os
import time
from datetime import datetime, timedelta

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.express as px
import seaborn as sns
from neo4j import GraphDatabase
from pymongo import MongoClient

In [4]:
# Connect to Neo4j.
# Connection settings are read from environment variables so credentials do not
# have to live in the notebook; the literals are only local-development
# fallbacks that keep the previous behaviour when nothing is configured.
# NOTE(review): the fallback password is still visible here -- remove it once
# NEO4J_PASSWORD is set in every environment that runs this notebook.
uri = os.environ.get("NEO4J_URI", "bolt://localhost:7687")
driver = GraphDatabase.driver(
    uri,
    auth=(
        os.environ.get("NEO4J_USER", "neo4j"),
        os.environ.get("NEO4J_PASSWORD", "jekialacarte"),
    ),
)
# Notebook-wide session reused by later cells; it is never closed automatically.
session = driver.session()


# Connect to MongoDB and select the transaction log collection.
client = MongoClient(os.environ.get("MONGO_URI", "mongodb://localhost:27017/"))

db = client["dbcafe"]
collection = db["transactionlog"]

In [4]:
# Analysis window: the last `months_back` months, approximated as 30-day blocks
# counted back from the current moment.
months_back = 12
end_date = datetime.now()
start_date = end_date - timedelta(days=30 * months_back)

# MongoDB aggregation pipeline, assembled one stage at a time.
pipeline = []

# Stage 1: keep transactions whose date lies inside the window. The bounds are
# formatted as YYYY-MM-DD strings before being compared with transaction_date.
pipeline.append({
    "$match": {
        "transaction_date": {
            "$gte": start_date.strftime("%Y-%m-%d"),
            "$lte": end_date.strftime("%Y-%m-%d"),
        }
    }
})

# Stage 2: derive the YYYY-MM bucket and a revenue figure computed as
# order_quantity multiplied by a flat 30000.
pipeline.append({
    "$addFields": {
        "month_year": {"$substr": ["$transaction_date", 0, 7]},
        "calculated_revenue": {"$multiply": ["$order_quantity", 30000]},
    }
})

# Stage 3: transaction count and revenue per franchise per month.
pipeline.append({
    "$group": {
        "_id": {"franchise_id": "$id_franchise", "month": "$month_year"},
        "monthly_transactions": {"$sum": 1},
        "monthly_revenue": {"$sum": "$calculated_revenue"},
    }
})

# Stage 4: fold the monthly rows into one document per franchise, keeping the
# per-month breakdown alongside the overall totals.
pipeline.append({
    "$group": {
        "_id": "$_id.franchise_id",
        "monthly_data": {
            "$push": {
                "month": "$_id.month",
                "transactions": "$monthly_transactions",
                "revenue": "$monthly_revenue",
            }
        },
        "total_transactions": {"$sum": "$monthly_transactions"},
        "total_revenue": {"$sum": "$monthly_revenue"},
    }
})

# Run the MongoDB aggregation and record how long it takes.
mongo_start_time = time.time()
mongo_results = list(collection.aggregate(pipeline))
mongo_end_time = time.time()
mongo_query_time = mongo_end_time - mongo_start_time

# Fetch franchise metadata from Neo4j, keyed by franchise id.
neo4j_query = """
MATCH (f:Franchise)
RETURN f.id as franchise_id, f.name as franchise_name, 
       f.year_established as year_established
"""

neo4j_start_time = time.time()
# Bind the short-lived session to its own name. Using `session` here would
# replace the notebook-wide `session` variable with one that is closed as soon
# as this block exits, breaking any later cell that calls session.run(...).
with driver.session() as franchise_session:
    franchise_data = {
        item['franchise_id']: item
        for item in franchise_session.run(neo4j_query).data()
    }
neo4j_end_time = time.time()
neo4j_query_time = neo4j_end_time - neo4j_start_time

# Combine the MongoDB aggregates with the Neo4j franchise metadata.
current_year = datetime.now().year
growth_analysis = []
for franchise in mongo_results:
    franchise_id = franchise['_id']
    # Franchises unknown to Neo4j fall back to an empty metadata record.
    franchise_info = franchise_data.get(franchise_id) or {}

    # Month keys are YYYY-MM strings, so a plain sort is chronological.
    monthly_data = sorted(franchise['monthly_data'], key=lambda x: x['month'])

    # Growth rate: first month in the window versus the last one, in percent.
    # Left at 0 when there is only one month or the first month had no revenue.
    growth_rate = 0
    if len(monthly_data) >= 2:
        first_revenue = monthly_data[0]['revenue']
        last_revenue = monthly_data[-1]['revenue']
        if first_revenue > 0:
            growth_rate = ((last_revenue - first_revenue) / first_revenue) * 100

    # The Neo4j row always carries the 'year_established' / 'franchise_name'
    # keys; a missing node property arrives as None, which a dict.get default
    # does not replace. Treat None like "unknown": age 0 and a generated name.
    year_established = franchise_info.get('year_established')
    franchise_age = current_year - year_established if year_established is not None else 0
    franchise_name = franchise_info.get('franchise_name') or f'Franchise {franchise_id}'

    # Build analysis record
    growth_analysis.append({
        'franchise_id': franchise_id,
        'franchise_name': franchise_name,
        'franchise_age_years': franchise_age,
        'total_transactions': franchise['total_transactions'],
        'total_revenue': franchise['total_revenue'],
        'growth_rate_percent': growth_rate,
        'monthly_trends': monthly_data,
        'avg_monthly_revenue': franchise['total_revenue'] / len(monthly_data) if monthly_data else 0
    })

# Order by growth rate (best first); the top-10 list is ranked by revenue instead.
growth_analysis.sort(key=lambda x: x['growth_rate_percent'], reverse=True)
top_performers = sorted(growth_analysis, key=lambda x: x['total_revenue'], reverse=True)[:10]

# Combined time spent waiting on both databases.
total_query_time = mongo_query_time + neo4j_query_time

# Bundle the analysis output together with the rounded timing figures.
results = dict(
    analysis_period_months=months_back,
    franchise_growth=growth_analysis,
    top_performers=top_performers,
    query_performance=dict(
        mongo_query_time_seconds=round(mongo_query_time, 3),
        neo4j_query_time_seconds=round(neo4j_query_time, 3),
        total_query_time_seconds=round(total_query_time, 3),
    ),
)

# Report the query times, one line per measurement.
print(
    f"MongoDB query took: {mongo_query_time:.3f} seconds",
    f"Neo4j query took: {neo4j_query_time:.3f} seconds",
    f"Total database query time: {total_query_time:.3f} seconds",
    sep="\n",
)

ServiceUnavailable: Couldn't connect to localhost:7687 (resolved to ()):
Failed to establish connection to ResolvedIPv6Address(('::1', 7687, 0, 0)) (reason [WinError 10061] No connection could be made because the target machine actively refused it)
Failed to establish connection to ResolvedIPv4Address(('127.0.0.1', 7687)) (reason [WinError 10061] No connection could be made because the target machine actively refused it)

##### Unused ######

In [None]:
# Load every document of the MongoDB transaction collection into a DataFrame
# and preview the first rows.
data = [document for document in collection.find()]
dbMongo = pd.DataFrame(data=data)
dbMongo.head(5)

In [None]:
# Load the properties of every Daerah node from Neo4j into a DataFrame.
query = "MATCH (d:Daerah) RETURN d"

# Use a dedicated session that is closed automatically instead of the
# notebook-wide `session`, which other cells may already have closed.
with driver.session() as daerah_session:
    # dict(node) copies the node's properties through the public mapping
    # interface rather than reaching into the private `_properties` attribute.
    dataneo4j = [dict(record["d"]) for record in daerah_session.run(query)]

dbNeo4j = pd.DataFrame(dataneo4j)
dbNeo4j.head()

In [None]:
# Cast both join keys to strings so values compare equal regardless of how each
# database stored them, then inner-join transactions with the Neo4j rows.
for frame, key_column in ((dbMongo, 'id_franchise'), (dbNeo4j, 'id_cafe')):
    frame[key_column] = frame[key_column].astype(str)

dbMerge = dbMongo.merge(dbNeo4j, left_on='id_franchise', right_on='id_cafe', how='inner')
dbMerge

In [None]:
def plot_pipeline_barchart(
    collection,
    pipeline,
    x_field="count",
    y_field="_id",
    title="Bar Chart dari Pipeline MongoDB",
    x_label="Jumlah",
    y_label="Kategori",
    color_scale="plasma"
):
    """Run an aggregation pipeline and show its result as a horizontal bar chart.

    Args:
        collection: Object exposing ``aggregate(pipeline)`` that yields documents.
        pipeline: Aggregation stages passed straight to ``collection.aggregate``.
        x_field: Result field used for bar length, bar text and bar colour.
        y_field: Result field used as the category axis.
        title: Chart title.
        x_label: Display label for ``x_field``.
        y_label: Display label for ``y_field``.
        color_scale: Continuous colour scale name handed to Plotly.

    Returns:
        None. The figure is displayed with ``fig.show()``; when the pipeline
        yields no documents a message is printed and nothing is drawn.
    """
    frame = pd.DataFrame(list(collection.aggregate(pipeline)))

    # Nothing to plot: tell the user and bail out before touching Plotly.
    if frame.empty:
        print("Hasil pipeline kosong.")
        return

    axis_labels = {x_field: x_label, y_field: y_label}
    chart = px.bar(
        frame,
        x=x_field,
        y=y_field,
        orientation="h",
        text=x_field,
        color=x_field,
        color_continuous_scale=color_scale,
        labels=axis_labels,
        title=title,
    )

    # Put the value labels outside the bars.
    chart.update_traces(textposition="outside")
    # Treat y values as categories and reverse the axis so the first result
    # row is drawn at the top; hide the colour bar.
    chart.update_layout(
        yaxis={"type": "category", "autorange": "reversed"},
        coloraxis_showscale=False,
        xaxis_title=x_label,
        yaxis_title=y_label,
    )

    chart.show()

In [None]:
# Count transactions per franchise and keep the five largest counts.
pipeline = [
    {"$group": {"_id": "$id_franchise", "count": {"$sum": 1}}},
    {"$sort": {"count": -1}},
    {"$limit": 5},
]

plot_pipeline_barchart(
    collection=collection,
    pipeline=pipeline,
    x_field="count",
    y_field="_id",
    title="Top 5 Franchise dengan Transaksi Terbanyak",
    x_label="Jumlah Transaksi",
    y_label="ID Franchise",
    color_scale="teal",
)

In [None]:
import pymongo
from neo4j import GraphDatabase
import pandas as pd
from datetime import datetime, timedelta
import json
from typing import List, Dict, Any
from dotenv import load_dotenv
load_dotenv()
import logging
import os

# Configure root logging at INFO level and create the module-level logger
# that the aggregator class below uses for its progress and warning messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class MongoNeo4jAggregator:
    def __init__(self, mongo_uri: str, neo4j_uri: str, neo4j_user: str, neo4j_password: str):
        """
        Open the MongoDB client and the Neo4j driver used by every analysis.

        Args:
            mongo_uri: Connection string passed to ``pymongo.MongoClient``.
            neo4j_uri: URI passed to ``GraphDatabase.driver``.
            neo4j_user: Neo4j user name.
            neo4j_password: Neo4j password.
        """
        # MongoDB: client -> 'dbcafe' database -> 'transactionlog' collection
        client = pymongo.MongoClient(mongo_uri)
        self.mongo_client = client
        self.mongo_db = client['dbcafe']
        self.transactions_collection = self.mongo_db['transactionlog']

        # Neo4j driver authenticated with the supplied user/password pair
        credentials = (neo4j_user, neo4j_password)
        self.neo4j_driver = GraphDatabase.driver(neo4j_uri, auth=credentials)

        # Price table keyed by product id (prices are not stored in MongoDB)
        self.product_prices = dict(
            C1=25000,   # Americano
            C2=35000,   # Cappuccino
            C3=40000,   # Latte
            C4=20000,   # Espresso
            C5=30000,   # Macchiato
            C6=38000,   # Flat White
            C7=45000,   # Mocha
            C8=35000,   # Cold Brew
            NC1=42000,  # Matcha Latte
            NC2=40000,  # Chai Latte
        )
        
    def close_connections(self):
        """Close database connections"""
        self.mongo_client.close()
        self.neo4j_driver.close()

    def get_product_price(self, product_id: str) -> int:
        """Get product price with fallback"""
        return self.product_prices.get(product_id, 30000)  # Default 30k

    # ========== FIXED IDEA 1: Employee Performance Analysis ==========
    def employee_performance_analysis(self, start_date: str, end_date: str) -> Dict[str, Any]:
        """
        Analyze employee performance by combining transaction data from MongoDB 
        with employee work schedule from Neo4j KG
        """
        logger.info(f"Starting employee performance analysis for period: {start_date} to {end_date}")
        
        # Get transaction data from MongoDB with enhanced date analysis
        mongo_pipeline = [
            {
                "$match": {
                    "transaction_date": {
                        "$gte": start_date,
                        "$lte": end_date
                    }
                }
            },
            {
                "$addFields": {
                    "calculated_revenue": {
                        "$multiply": [
                            "$order_quantity", 
                            30000  # Fixed price instead of trying to access non-existent field
                        ]
                    },
                    "transaction_year": {"$substr": ["$transaction_date", 0, 4]},
                    "transaction_month": {"$substr": ["$transaction_date", 5, 2]},
                    "transaction_day_of_week": {"$dayOfWeek": {"$dateFromString": {"dateString": "$transaction_date"}}}
                }
            },
            {
                "$group": {
                    "_id": "$id_employee",
                    "total_transactions": {"$sum": 1},
                    "total_revenue": {"$sum": "$calculated_revenue"},
                    "avg_order_quantity": {"$avg": "$order_quantity"},
                    "franchise_id": {"$first": "$id_franchise"},
                    "first_transaction": {"$min": "$transaction_date"},
                    "last_transaction": {"$max": "$transaction_date"},
                    "active_years": {"$addToSet": "$transaction_year"},
                    "active_months": {"$addToSet": "$transaction_month"},
                    "transactions_by_day": {
                        "$push": {
                            "date": "$transaction_date",
                            "revenue": "$calculated_revenue",
                            "quantity": "$order_quantity",
                            "day_of_week": "$transaction_day_of_week"
                        }
                    }
                }
            },
            {
                "$addFields": {
                    "years_active": {"$size": "$active_years"},
                    "months_active": {"$size": "$active_months"},
                    "days_between_first_last": {
                        "$divide": [
                            {"$subtract": [
                                {"$dateFromString": {"dateString": "$last_transaction"}},
                                {"$dateFromString": {"dateString": "$first_transaction"}}
                            ]},
                            1000 * 60 * 60 * 24  # Convert milliseconds to days
                        ]
                    }
                }
            }
        ]
        
        mongo_results = list(self.transactions_collection.aggregate(mongo_pipeline))
        
        # Get employee data from Neo4j
        with self.neo4j_driver.session() as session:
            neo4j_query = """
            MATCH (e:Employee)
            RETURN e.id as employee_id, e.name as employee_name, 
                   e.work_start_hour as start_hour, e.work_end_hour as end_hour,
                   e.id_cafe as cafe_id
            """
            try:
                neo4j_results = session.run(neo4j_query).data()
            except Exception as e:
                logger.warning(f"Neo4j query failed, using empty results: {e}")
                neo4j_results = []
        
        # Combine results with enhanced date-based metrics
        combined_results = []
        for mongo_emp in mongo_results:
            emp_id = mongo_emp['_id']
            neo4j_emp = next((e for e in neo4j_results if e['employee_id'] == emp_id), None)
            
            # Calculate time-based metrics
            total_active_days = max(1, mongo_emp.get('days_between_first_last', 1))
            avg_daily_revenue = mongo_emp['total_revenue'] / total_active_days
            avg_transactions_per_day = mongo_emp['total_transactions'] / total_active_days
            
            # Calculate consistency score (lower standard deviation = more consistent)
            daily_revenues = [t['revenue'] for t in mongo_emp['transactions_by_day']]
            if len(daily_revenues) > 1:
                mean_revenue = sum(daily_revenues) / len(daily_revenues)
                variance = sum((x - mean_revenue) ** 2 for x in daily_revenues) / len(daily_revenues)
                std_dev = variance ** 0.5
                consistency_score = 1 / (1 + std_dev / mean_revenue) if mean_revenue > 0 else 0
            else:
                consistency_score = 1.0
            
            if neo4j_emp:
                # Parse work hours (assuming format like "07:00")
                try:
                    start_hour = int(str(neo4j_emp['start_hour']).split(':')[0])
                    end_hour = int(str(neo4j_emp['end_hour']).split(':')[0])
                    work_hours_per_day = end_hour - start_hour
                except:
                    work_hours_per_day = 8  # Default
                
                # Calculate total work hours in period
                total_work_hours = work_hours_per_day * total_active_days
                performance_score = mongo_emp['total_revenue'] / total_work_hours if total_work_hours > 0 else 0
                revenue_per_hour = performance_score
                
                combined_results.append({
                    'employee_id': emp_id,
                    'employee_name': neo4j_emp['employee_name'],
                    'cafe_id': neo4j_emp.get('cafe_id', 'Unknown'),
                    'total_transactions': mongo_emp['total_transactions'],
                    'total_revenue': mongo_emp['total_revenue'],
                    'avg_order_quantity': mongo_emp['avg_order_quantity'],
                    'work_hours_per_day': work_hours_per_day,
                    'total_active_days': total_active_days,
                    'total_work_hours': total_work_hours,
                    'performance_score': performance_score,
                    'revenue_per_hour': revenue_per_hour,
                    'avg_daily_revenue': avg_daily_revenue,
                    'avg_transactions_per_day': avg_transactions_per_day,
                    'consistency_score': consistency_score,
                    'years_active': mongo_emp['years_active'],
                    'months_active': mongo_emp['months_active'],
                    'first_transaction': mongo_emp['first_transaction'],
                    'last_transaction': mongo_emp['last_transaction'],
                    'activity_period_days': total_active_days
                })
            else:
                # Include MongoDB data even if Neo4j data is missing
                default_work_hours_per_day = 8
                total_work_hours = default_work_hours_per_day * total_active_days
                performance_score = mongo_emp['total_revenue'] / total_work_hours
                
                combined_results.append({
                    'employee_id': emp_id,
                    'employee_name': f'Employee {emp_id}',
                    'cafe_id': mongo_emp.get('franchise_id', 'Unknown'),
                    'total_transactions': mongo_emp['total_transactions'],
                    'total_revenue': mongo_emp['total_revenue'],
                    'avg_order_quantity': mongo_emp['avg_order_quantity'],
                    'work_hours_per_day': default_work_hours_per_day,
                    'total_active_days': total_active_days,
                    'total_work_hours': total_work_hours,
                    'performance_score': performance_score,
                    'revenue_per_hour': performance_score,
                    'avg_daily_revenue': avg_daily_revenue,
                    'avg_transactions_per_day': avg_transactions_per_day,
                    'consistency_score': consistency_score,
                    'years_active': mongo_emp['years_active'],
                    'months_active': mongo_emp['months_active'],
                    'first_transaction': mongo_emp['first_transaction'],
                    'last_transaction': mongo_emp['last_transaction'],
                    'activity_period_days': total_active_days
                })
        
        # Sort by performance score (revenue per hour) - primary metric
        sorted_results = sorted(combined_results, key=lambda x: x['performance_score'], reverse=True)
        
        return {
            'analysis_period': f"{start_date} to {end_date}",
            'total_employees_analyzed': len(sorted_results),
            'employee_performance': sorted_results,
            'performance_summary': {
                'best_performer': sorted_results[0] if sorted_results else None,
                'worst_performer': sorted_results[-1] if sorted_results else None,
                'avg_performance_score': sum(emp['performance_score'] for emp in sorted_results) / len(sorted_results) if sorted_results else 0,
                'most_consistent': max(sorted_results, key=lambda x: x['consistency_score']) if sorted_results else None,
                'most_active_days': max(sorted_results, key=lambda x: x['total_active_days']) if sorted_results else None
            }
        }

    # ========== FIXED IDEA 2: Regional Product Popularity ==========
    def regional_product_analysis(self) -> Dict[str, Any]:
        """
        Analyze product popularity across different regions
        """
        logger.info("Starting regional product analysis...")
        
        # Since location data is missing from sample, we'll analyze by franchise
        mongo_pipeline = [
            {
                "$unwind": "$product"  # Unwind the product array
            },
            {
                "$group": {
                    "_id": {
                        "franchise_id": "$id_franchise",
                        "product_id": "$product.id_product",
                        "product_name": "$product.name"
                    },
                    "total_quantity": {"$sum": "$product.quantity"},
                    "total_orders": {"$sum": 1},
                    "avg_order_size": {"$avg": "$order_quantity"}
                }
            },
            {
                "$group": {
                    "_id": "$_id.franchise_id",
                    "products": {
                        "$push": {
                            "product_id": "$_id.product_id",
                            "product_name": "$_id.product_name",
                            "total_quantity": "$total_quantity",
                            "total_orders": "$total_orders",
                            "avg_order_size": "$avg_order_size"
                        }
                    },
                    "franchise_total_orders": {"$sum": "$total_orders"}
                }
            }
        ]
        
        mongo_results = list(self.transactions_collection.aggregate(mongo_pipeline))
        
        # Get product category data from Neo4j
        with self.neo4j_driver.session() as session:
            neo4j_query = """
            MATCH (p:Product)
            RETURN p.id_product as product_id, p.name as product_name, 
                   p.category as category, p.price as price
            """
            try:
                product_data = {item['product_id']: item for item in session.run(neo4j_query).data()}
            except Exception as e:
                logger.warning(f"Failed to get product data from Neo4j: {e}")
                product_data = {}
        
        # Enhance results with product categories
        enhanced_results = []
        for franchise_data in mongo_results:
            enhanced_products = []
            for product in franchise_data['products']:
                product_info = product_data.get(product['product_id'], {})
                enhanced_product = {
                    **product,
                    'category': product_info.get('category', 'Coffee'),  # Default category
                    'price': self.get_product_price(product['product_id']),
                    'popularity_score': product['total_orders'] / franchise_data['franchise_total_orders'] if franchise_data['franchise_total_orders'] > 0 else 0
                }
                enhanced_products.append(enhanced_product)
            
            enhanced_results.append({
                'franchise_id': franchise_data['_id'],
                'total_orders': franchise_data['franchise_total_orders'],
                'products': sorted(enhanced_products, key=lambda x: x['popularity_score'], reverse=True)
            })
        
        return {
            'franchise_analysis': enhanced_results,
            'top_franchises': sorted(enhanced_results, key=lambda x: x['total_orders'], reverse=True)[:5]
        }

    # ========== FIXED IDEA 3: Franchise Growth Analysis ==========
    def franchise_growth_analysis(self, months_back: int = 12) -> Dict[str, Any]:
        """
        Analyze franchise growth trends
        """
        logger.info("Starting franchise growth analysis...")
        
        # Calculate date range
        end_date = datetime.now()
        start_date = end_date - timedelta(days=months_back * 30)
        
        # Get monthly transaction trends from MongoDB
        mongo_pipeline = [
            {
                "$match": {
                    "transaction_date": {
                        "$gte": start_date.strftime("%Y-%m-%d"),
                        "$lte": end_date.strftime("%Y-%m-%d")
                    }
                }
            },
            {
                "$addFields": {
                    "month_year": {"$substr": ["$transaction_date", 0, 7]},  # Extract YYYY-MM
                    "calculated_revenue": {"$multiply": ["$order_quantity", 30000]}
                }
            },
            {
                "$group": {
                    "_id": {
                        "franchise_id": "$id_franchise",
                        "month": "$month_year"
                    },
                    "monthly_transactions": {"$sum": 1},
                    "monthly_revenue": {"$sum": "$calculated_revenue"}
                }
            },
            {
                "$group": {
                    "_id": "$_id.franchise_id",
                    "monthly_data": {
                        "$push": {
                            "month": "$_id.month",
                            "transactions": "$monthly_transactions",
                            "revenue": "$monthly_revenue"
                        }
                    },
                    "total_transactions": {"$sum": "$monthly_transactions"},
                    "total_revenue": {"$sum": "$monthly_revenue"}
                }
            }
        ]
        
        mongo_results = list(self.transactions_collection.aggregate(mongo_pipeline))
        
        # Get franchise data from Neo4j
        with self.neo4j_driver.session() as session:
            neo4j_query = """
            MATCH (f:Franchise)
            RETURN f.id as franchise_id, f.name as franchise_name, 
                   f.year_established as year_established
            """
            try:
                franchise_data = {item['franchise_id']: item for item in session.run(neo4j_query).data()}
            except Exception as e:
                logger.warning(f"Failed to get franchise data: {e}")
                franchise_data = {}
        
        # Calculate growth metrics
        growth_analysis = []
        for franchise_info in mongo_results:
            franchise_id = franchise_info['_id']
            neo4j_info = franchise_data.get(franchise_id, {})
            
            # Calculate growth trend
            monthly_data = sorted(franchise_info['monthly_data'], key=lambda x: x['month'])
            if len(monthly_data) >= 2:
                first_month_revenue = monthly_data[0]['revenue']
                last_month_revenue = monthly_data[-1]['revenue']
                growth_rate = ((last_month_revenue - first_month_revenue) / first_month_revenue * 100) if first_month_revenue > 0 else 0
            else:
                growth_rate = 0
            
            franchise_age = datetime.now().year - neo4j_info.get('year_established', datetime.now().year)
            
            growth_analysis.append({
                'franchise_id': franchise_id,
                'franchise_name': neo4j_info.get('franchise_name', f'Franchise {franchise_id}'),
                'franchise_age_years': franchise_age,
                'total_transactions': franchise_info['total_transactions'],
                'total_revenue': franchise_info['total_revenue'],
                'growth_rate_percent': growth_rate,
                'monthly_trends': monthly_data,
                'avg_monthly_revenue': franchise_info['total_revenue'] / len(monthly_data) if monthly_data else 0
            })
        
        return {
            'analysis_period_months': months_back,
            'franchise_growth': sorted(growth_analysis, key=lambda x: x['growth_rate_percent'], reverse=True),
            'top_performers': sorted(growth_analysis, key=lambda x: x['total_revenue'], reverse=True)[:10]
        }

    # ========== NEW: Time-based Performance Analysis ==========
    def time_based_performance_analysis(self) -> Dict[str, Any]:
        """
        Analyze employee performance trends over time periods
        """
        logger.info("Starting time-based performance analysis...")
        
        pipeline = [
            {
                "$addFields": {
                    "calculated_revenue": {"$multiply": ["$order_quantity", 30000]},
                    "transaction_year": {"$substr": ["$transaction_date", 0, 4]},
                    "transaction_month": {"$substr": ["$transaction_date", 5, 2]},
                    "transaction_quarter": {
                        "$switch": {
                            "branches": [
                                {"case": {"$in": [{"$substr": ["$transaction_date", 5, 2]}, ["01", "02", "03"]]}, "then": "Q1"},
                                {"case": {"$in": [{"$substr": ["$transaction_date", 5, 2]}, ["04", "05", "06"]]}, "then": "Q2"},
                                {"case": {"$in": [{"$substr": ["$transaction_date", 5, 2]}, ["07", "08", "09"]]}, "then": "Q3"},
                                {"case": {"$in": [{"$substr": ["$transaction_date", 5, 2]}, ["10", "11", "12"]]}, "then": "Q4"}
                            ],
                            "default": "Unknown"
                        }
                    }
                }
            },
            {
                "$group": {
                    "_id": {
                        "employee_id": "$id_employee",
                        "year": "$transaction_year",
                        "quarter": "$transaction_quarter"
                    },
                    "quarterly_transactions": {"$sum": 1},
                    "quarterly_revenue": {"$sum": "$calculated_revenue"},
                    "avg_order_quantity": {"$avg": "$order_quantity"}
                }
            },
            {
                "$group": {
                    "_id": "$_id.employee_id",
                    "quarterly_performance": {
                        "$push": {
                            "year": "$_id.year",
                            "quarter": "$_id.quarter",
                            "transactions": "$quarterly_transactions",
                            "revenue": "$quarterly_revenue",
                            "avg_order_quantity": "$avg_order_quantity"
                        }
                    },
                    "total_quarters_active": {"$sum": 1},
                    "total_revenue_all_time": {"$sum": "$quarterly_revenue"},
                    "total_transactions_all_time": {"$sum": "$quarterly_transactions"}
                }
            },
            {
                "$addFields": {
                    "avg_quarterly_revenue": {"$divide": ["$total_revenue_all_time", "$total_quarters_active"]},
                    "avg_quarterly_transactions": {"$divide": ["$total_transactions_all_time", "$total_quarters_active"]}
                }
            },
            {"$sort": {"total_revenue_all_time": -1}}
        ]
        
        time_results = list(self.transactions_collection.aggregate(pipeline))
        
        return {
            'time_based_analysis': time_results,
            'summary': {
                'total_employees': len(time_results),
                'most_consistent_performer': max(time_results, key=lambda x: x['total_quarters_active']) if time_results else None
            }
        }
    def cross_selling_analysis(self) -> Dict[str, Any]:
        """
        Identify cross-selling opportunities by analyzing product combinations
        """
        logger.info("Starting cross-selling analysis...")
        
        # Get multi-product transactions
        multi_product_pipeline = [
            {
                "$match": {
                    "$expr": {"$gt": [{"$size": "$product"}, 1]}  # More than 1 product
                }
            },
            {
                "$project": {
                    "id_transaction": 1,
                    "product_ids": "$product.id_product",
                    "id_franchise": 1,
                    "calculated_revenue": {"$multiply": ["$order_quantity", 30000]}
                }
            }
        ]
        
        multi_product_transactions = list(self.transactions_collection.aggregate(multi_product_pipeline))
        
        # Get product data from Neo4j
        with self.neo4j_driver.session() as session:
            neo4j_query = """
            MATCH (p:Product)
            RETURN p.id_product as product_id, p.name as product_name, 
                   p.category as category
            """
            try:
                products = {item['product_id']: item for item in session.run(neo4j_query).data()}
            except Exception as e:
                logger.warning(f"Failed to get product data: {e}")
                products = {}
        
        # Analyze product combinations
        product_combinations = {}
        
        for transaction in multi_product_transactions:
            products_in_transaction = transaction['product_ids']
            
            # Analyze product pairs
            for i, product1 in enumerate(products_in_transaction):
                for product2 in products_in_transaction[i+1:]:
                    pair = tuple(sorted([product1, product2]))
                    product_combinations[pair] = product_combinations.get(pair, 0) + 1
        
        # Sort and format results
        top_product_pairs = sorted(product_combinations.items(), key=lambda x: x[1], reverse=True)[:20]
        
        formatted_product_pairs = []
        for (prod1, prod2), count in top_product_pairs:
            prod1_info = products.get(prod1, {})
            prod2_info = products.get(prod2, {})
            formatted_product_pairs.append({
                'product1': {'id': prod1, 'name': prod1_info.get('product_name', f'Product {prod1}')},
                'product2': {'id': prod2, 'name': prod2_info.get('product_name', f'Product {prod2}')},
                'frequency': count,
                'confidence': count / len(multi_product_transactions) if multi_product_transactions else 0
            })
        
        return {
            'analysis_summary': {
                'total_multi_product_transactions': len(multi_product_transactions),
                'unique_product_combinations': len(product_combinations)
            },
            'top_product_combinations': formatted_product_pairs
        }

    # ========== FIXED MongoDB-Only Analysis ==========
    def mongodb_only_analysis(self) -> Dict[str, Any]:
        """
        Summarise the transaction log using MongoDB alone (no Neo4j lookups).

        Issues one document count and three aggregations against
        ``self.transactions_collection``. Revenue is derived inside the
        pipelines as ``order_quantity * 30000``.

        Returns:
            Dict with
              - 'summary': total document count and the analysis label,
              - 'top_employees': the ten employee groups with the highest revenue,
              - 'top_products': the ten products with the highest summed quantity,
              - 'top_franchises': the ten franchise groups with the highest revenue.
        """
        logger.info("Running MongoDB-only analysis...")

        def revenue_ranking(group_field, extra_accumulators=None):
            # Shared shape of the employee and franchise rankings: derive the
            # revenue, group on the given field, keep the ten largest totals.
            accumulators = {
                "_id": group_field,
                "transaction_count": {"$sum": 1},
                "total_revenue": {"$sum": "$calculated_revenue"},
            }
            accumulators.update(extra_accumulators or {})
            return [
                {"$addFields": {"calculated_revenue": {"$multiply": ["$order_quantity", 30000]}}},
                {"$group": accumulators},
                {"$sort": {"total_revenue": -1}},
                {"$limit": 10},
            ]

        transactions = self.transactions_collection
        document_total = transactions.count_documents({})

        # Employees are ranked by revenue (not by transaction count); the mean
        # order quantity is reported alongside.
        employee_stages = revenue_ranking(
            "$id_employee",
            {"avg_order_quantity": {"$avg": "$order_quantity"}},
        )
        employee_ranking = list(transactions.aggregate(employee_stages))

        # Products live in an array on each transaction, so unwind it first and
        # rank the individual products by the quantity sold.
        product_stages = [
            {"$unwind": "$product"},
            {"$group": {
                "_id": "$product.id_product",
                "product_name": {"$first": "$product.name"},
                "total_quantity": {"$sum": "$product.quantity"},
                "order_count": {"$sum": 1},
            }},
            {"$sort": {"total_quantity": -1}},
            {"$limit": 10},
        ]
        product_ranking = list(transactions.aggregate(product_stages))

        # Franchises use the same revenue ranking, without extra accumulators.
        franchise_ranking = list(transactions.aggregate(revenue_ranking("$id_franchise")))

        report = {
            'summary': {
                'total_transactions': document_total,
                'analysis_type': 'MongoDB Only',
            },
        }
        report['top_employees'] = employee_ranking
        report['top_products'] = product_ranking
        report['top_franchises'] = franchise_ranking
        return report

    # ========== FIXED Customer Segmentation ==========
    def customer_segmentation_analysis(self) -> Dict[str, Any]:
        """
        Bucket customers into spending segments and summarise each bucket.

        Customers are identified by the transaction's ``name`` field. For each
        one the pipeline totals revenue (``order_quantity * 30000``) and counts
        transactions, then labels the customer with the first matching rule:

          - "VIP":        total_spent >= 500000 and transaction_count >= 20
          - "Regular":    total_spent >= 200000 and transaction_count >= 10
          - "Occasional": total_spent <= 100000 and transaction_count <= 5
          - "New":        anything else (the ``$switch`` default)

        Returns:
            Dict with 'customer_segments' (one aggregated row per segment) and
            'total_customers' (sum of the per-segment customer counts).
        """
        logger.info("Starting customer segmentation analysis...")

        def tier(comparison, spent_bound, count_bound, label):
            # One $switch branch: both metrics must satisfy the same comparison.
            return {
                "case": {"$and": [
                    {comparison: ["$total_spent", spent_bound]},
                    {comparison: ["$transaction_count", count_bound]},
                ]},
                "then": label,
            }

        derive_revenue = {
            "$addFields": {
                "calculated_revenue": {"$multiply": ["$order_quantity", 30000]},
            }
        }

        # One row per customer with spending, frequency and franchise spread.
        per_customer = {
            "$group": {
                "_id": "$name",
                "total_spent": {"$sum": "$calculated_revenue"},
                "transaction_count": {"$sum": 1},
                "avg_order_value": {"$avg": "$calculated_revenue"},
                "preferred_franchises": {"$addToSet": "$id_franchise"},
                "last_transaction": {"$max": "$transaction_date"},
            }
        }

        label_customers = {
            "$addFields": {
                "customer_segment": {
                    "$switch": {
                        "branches": [
                            tier("$gte", 500000, 20, "VIP"),
                            tier("$gte", 200000, 10, "Regular"),
                            tier("$lte", 100000, 5, "Occasional"),
                        ],
                        "default": "New",
                    }
                },
                "franchise_loyalty": {"$size": "$preferred_franchises"},
            }
        }

        # Collapse the per-customer rows into one summary row per segment.
        per_segment = {
            "$group": {
                "_id": "$customer_segment",
                "customer_count": {"$sum": 1},
                "avg_total_spent": {"$avg": "$total_spent"},
                "avg_transaction_count": {"$avg": "$transaction_count"},
                "avg_order_value": {"$avg": "$avg_order_value"},
            }
        }

        stages = [derive_revenue, per_customer, label_customers, per_segment]
        segments = list(self.transactions_collection.aggregate(stages))

        customer_total = 0
        for row in segments:
            customer_total += row['customer_count']

        return {
            'customer_segments': segments,
            'total_customers': customer_total,
        }

    # ========== Main Analysis Runner ==========
    def run_comprehensive_analysis(self) -> Dict[str, Any]:
        """
        Execute every analysis in sequence and collect the outcomes.

        Each analysis runs in isolation: if one raises, the exception is logged
        and its slot in the result holds ``{'error': <message>}`` while the
        remaining analyses still run.

        Returns:
            Dict keyed by analysis name ('employee_performance',
            'regional_products', 'franchise_growth', 'cross_selling',
            'customer_segmentation', 'mongodb_analysis'). If something fails
            outside the per-analysis handling, the flat dict
            ``{'error': 'Analysis failed', 'details': <message>}`` is returned
            instead.
        """
        logger.info("Starting comprehensive analysis...")

        # (result key, label used in the log lines, deferred call). The calls
        # are wrapped in lambdas so that even the method lookup happens inside
        # the per-step try block below.
        steps = (
            (
                'employee_performance',
                "Employee performance analysis",
                lambda: self.employee_performance_analysis(
                    start_date="2020-01-01",
                    end_date="2025-12-31",
                ),
            ),
            (
                'regional_products',
                "Regional product analysis",
                lambda: self.regional_product_analysis(),
            ),
            (
                'franchise_growth',
                "Franchise growth analysis",
                lambda: self.franchise_growth_analysis(months_back=12),
            ),
            (
                'cross_selling',
                "Cross-selling analysis",
                lambda: self.cross_selling_analysis(),
            ),
            (
                'customer_segmentation',
                "Customer segmentation",
                lambda: self.customer_segmentation_analysis(),
            ),
            (
                # MongoDB-only analysis as backup
                'mongodb_analysis',
                "MongoDB analysis",
                lambda: self.mongodb_only_analysis(),
            ),
        )

        results = {}

        try:
            for key, label, invoke in steps:
                try:
                    results[key] = invoke()
                    logger.info(f"✓ {label} completed")
                except Exception as e:
                    logger.error(f"{label} failed: {e}")
                    results[key] = {'error': str(e)}

            logger.info("Comprehensive analysis completed!")

        except Exception as e:
            logger.error(f"Critical error during analysis: {str(e)}")
            results = {'error': 'Analysis failed', 'details': str(e)}

        return results

    # ========== Data Validation ==========
    def validate_data_structure(self) -> Dict[str, Any]:
        """
        Inspect both data stores and report what they contain.

        MongoDB: fetches one document from ``self.transactions_collection`` and
        records its field names, whether the product/employee/franchise fields
        are present, whether ``product`` is a list, the collection's document
        count and up to two entries of the sample's ``product`` value.

        Neo4j: counts the nodes labelled Employee, Product and Franchise. A
        failure for one label is recorded as an "Error: ..." string for that
        label only.

        Returns:
            Dict with 'mongodb', 'neo4j' and 'recommendations' entries. Either
            store's section carries an 'error' message when its check fails;
            'recommendations' is returned as an empty list.
        """
        logger.info("Validating data structure...")

        report = {
            'mongodb': {},
            'neo4j': {},
            'recommendations': []
        }

        # --- MongoDB ---
        try:
            sample = self.transactions_collection.find_one()
            if not sample:
                report['mongodb']['error'] = 'No documents found'
            else:
                product_field = sample.get('product')
                structure = {
                    'fields': list(sample.keys()),
                    'has_product_info': 'product' in sample,
                    'product_is_array': isinstance(product_field, list),
                    'has_employee_info': 'id_employee' in sample,
                    'has_franchise_info': 'id_franchise' in sample
                }
                # Count before slicing so the query is issued even when the
                # product value turns out not to be sliceable.
                document_total = self.transactions_collection.count_documents({})
                product_preview = product_field[:2] if product_field else []
                report['mongodb'] = {
                    'sample_structure': structure,
                    'total_documents': document_total,
                    'sample_product_structure': product_preview
                }
        except Exception as e:
            report['mongodb']['error'] = str(e)

        # --- Neo4j ---
        def count_nodes(session, label):
            # Per-label isolation: a failing label becomes an error string.
            try:
                result = session.run(f"MATCH (n:{label}) RETURN count(n) as count")
                return result.single()['count']
            except Exception as e:
                return f"Error: {e}"

        try:
            with self.neo4j_driver.session() as session:
                report['neo4j']['node_counts'] = {
                    label: count_nodes(session, label)
                    for label in ('Employee', 'Product', 'Franchise')
                }
        except Exception as e:
            report['neo4j']['error'] = str(e)

        return report

# ========== Usage Example ==========
def _print_analysis_summary(results: dict) -> None:
    """
    Print a one-line status per analysis plus headline metrics for some of them.

    Args:
        results: Mapping returned by ``run_comprehensive_analysis()``. Normally
            every value is a dict: the analysis result, or ``{'error': msg}``
            when that analysis failed. After a critical failure the mapping is
            instead the flat ``{'error': <str>, 'details': <str>}``.
    """
    # Critical-failure shape: the values are plain strings, so report the
    # failure once instead of treating 'error'/'details' as analysis names.
    critical = results.get('error')
    if isinstance(critical, str):
        print(f"✗ {critical}: {results.get('details', 'no details available')}")
        return

    for analysis_type, data in results.items():
        title = analysis_type.replace('_', ' ').title()

        # `'error' in data` is only a key lookup for dicts; on a str it would
        # be a substring test, so anything that is not a dict is reported as-is.
        if not isinstance(data, dict):
            print(f"✗ {title}: unexpected result of type {type(data).__name__}")
            continue

        if 'error' in data:
            print(f"✗ {title}: {data['error']}")
            continue

        print(f"✓ {title}: Success")

        # Print specific metrics for each analysis
        if analysis_type == 'employee_performance':
            employees = data.get('employee_performance', [])
            emp_count = len(employees)
            print(f"  - {emp_count} employees analyzed")
            if emp_count > 0:
                top_performer = employees[0]
                # NOTE(review): "₹" is the Indian rupee sign; confirm it matches
                # the currency of the revenue figures.
                print(f"  - Top performer: {top_performer['employee_name']} (₹{top_performer['revenue_per_hour']:,.2f}/hour)")

        elif analysis_type == 'regional_products':
            franchise_count = len(data.get('franchise_analysis', []))
            print(f"  - {franchise_count} franchises analyzed")

        elif analysis_type == 'customer_segmentation':
            total_customers = data.get('total_customers', 0)
            print(f"  - {total_customers} customers segmented")
            for segment in data.get('customer_segments', []):
                print(f"    • {segment['_id']}: {segment['customer_count']} customers")

        elif analysis_type == 'mongodb_analysis':
            # Tolerate a result without a 'summary' section.
            total_trans = data.get('summary', {}).get('total_transactions', 0)
            print(f"  - {total_trans:,} total transactions processed")


def main():
    """
    Example usage of the fixed MongoNeo4jAggregator.

    Validates the data structure, runs the comprehensive analysis, writes the
    results to a timestamped JSON file in the current directory and prints a
    summary. Database connections are closed even when a step fails.

    Neo4j connection settings are read from the NEO4J_URI, NEO4J_USERNAME and
    NEO4J_PASSWORD environment variables.
    """
    # Database connection parameters
    MONGO_URI = "mongodb://localhost:27017/"
    URI = os.getenv("NEO4J_URI")
    USERNAME = os.getenv("NEO4J_USERNAME")
    PASSWORD = os.getenv("NEO4J_PASSWORD")

    # os.getenv() returns None silently for unset variables; make that visible
    # before the values are handed to the aggregator.
    missing = [
        name
        for name, value in (
            ("NEO4J_URI", URI),
            ("NEO4J_USERNAME", USERNAME),
            ("NEO4J_PASSWORD", PASSWORD),
        )
        if not value
    ]
    if missing:
        logger.warning(f"Missing environment variables: {', '.join(missing)}")

    # Initialize aggregator
    aggregator = MongoNeo4jAggregator(
        mongo_uri=MONGO_URI,
        neo4j_uri=URI,
        neo4j_user=USERNAME,
        neo4j_password=PASSWORD
    )

    try:
        # Validate data structure first
        print("=== DATA VALIDATION ===")
        validation = aggregator.validate_data_structure()
        print(json.dumps(validation, indent=2, default=str))

        # Run comprehensive analysis
        print("\n=== RUNNING COMPREHENSIVE ANALYSIS ===")
        results = aggregator.run_comprehensive_analysis()

        # Save results to JSON file
        output_file = f'fixed_analysis_results_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json'
        with open(output_file, 'w') as f:
            json.dump(results, f, indent=2, default=str)

        print(f"\nResults saved to: {output_file}")

        # Print summary
        print("\n=== ANALYSIS SUMMARY ===")
        _print_analysis_summary(results)

        print(f"\n=== DETAILED RESULTS SAVED TO: {output_file} ===")

    except Exception as e:
        logger.error(f"Main execution failed: {str(e)}")
        print(f"Analysis failed with error: {e}")

    finally:
        # Close connections
        aggregator.close_connections()
        print("Database connections closed.")

# Entry guard: run the usage example only when this code is executed directly
# (where __name__ is "__main__"), not when it is imported as a module.
if __name__ == "__main__":
    main()

INFO:__main__:Validating data structure...


=== DATA VALIDATION ===


INFO:__main__:Starting comprehensive analysis...
INFO:__main__:Starting employee performance analysis for period: 2020-01-01 to 2025-12-31


{
  "mongodb": {
    "sample_structure": {
      "fields": [
        "_id",
        "id_transaction",
        "product",
        "name",
        "order_quantity",
        "id_franchise",
        "id_employee",
        "transaction_date"
      ],
      "has_product_info": true,
      "product_is_array": true,
      "has_employee_info": true,
      "has_franchise_info": true
    },
    "total_documents": 163500,
    "sample_product_structure": [
      {
        "id_product": "C5",
        "name": "Mocha",
        "quantity": 3
      }
    ]
  },
  "neo4j": {
    "node_counts": {
      "Employee": 50,
      "Product": 10,
      "Franchise": 10
    }
  },
  "recommendations": []
}

=== RUNNING COMPREHENSIVE ANALYSIS ===


INFO:__main__:✓ Employee performance analysis completed
INFO:__main__:Starting regional product analysis...
INFO:__main__:✓ Regional product analysis completed
INFO:__main__:Starting franchise growth analysis...
INFO:__main__:✓ Franchise growth analysis completed
INFO:__main__:Starting cross-selling analysis...
INFO:__main__:✓ Cross-selling analysis completed
INFO:__main__:Starting customer segmentation analysis...
INFO:__main__:✓ Customer segmentation completed
INFO:__main__:Running MongoDB-only analysis...
INFO:__main__:✓ MongoDB analysis completed
INFO:__main__:Comprehensive analysis completed!



Results saved to: fixed_analysis_results_20250526_153940.json

=== ANALYSIS SUMMARY ===
✓ Employee Performance: Success
  - 40 employees analyzed
  - Top performer: Employee 48 (₹234,061.64/hour)
✓ Regional Products: Success
  - 10 franchises analyzed
✓ Franchise Growth: Success
✓ Cross Selling: Success
✓ Customer Segmentation: Success
  - 85954 customers segmented
    • Regular: 775 customers
    • Occasional: 18670 customers
    • VIP: 175 customers
    • New: 66334 customers
✓ Mongodb Analysis: Success
  - 163,500 total transactions processed

=== DETAILED RESULTS SAVED TO: fixed_analysis_results_20250526_153940.json ===
Database connections closed.
