Part 2 of the hybrid join.

In [None]:
import pandas as pd
import pymysql
import threading
import time
from collections import deque, defaultdict
import sys

class HYBRIDJOIN:
    def __init__(self, db_credentials, hS=10000, vP=500):
        """
        HYBRIDJOIN Algorithm with exact parameters from project specs
        hS = 10,000 slots (hash table compartments)
        vP = 500 tuples (disk partition size)
        """
        # STEP 1: SETUP TOOLS (Exact parameters from specs)
        
        self.hS = hS  # 10,000 slots as specified
        self.vP = vP  # 500 tuples per disk partition as specified
        
        # Hash Table Multimap with exactly hS compartments
        self.hash_table = defaultdict(list)  # Allows multiple entries per key
        
        # Queue: FIFO order for fairness in processing
        self.queue = deque()  # Stores join attribute values (keys)
        
        # Disk Buffer: Memory buffer for disk partitions of size vP
        self.disk_buffer = []  # Join window - holds vP tuples from R
        
        # Stream Buffer: Temporary hold for bursty stream data
        self.stream_buffer = []  # Prevents loss of data in bursty scenarios
        
        # Stream Input (w): Available slots in hash table
        self.w = hS  # Starts with all hS compartments free
        
        # Algorithm control
        self.running = True
        self.processed_count = 0
        self.stream_finished = False
        
        # Database connection for disk-based relation R
        self.conn = pymysql.connect(**db_credentials)
        self.cursor = self.conn.cursor()
        self.cursor.execute("USE walmart")
        
        # Pre-load disk-based relation R (Master Data)
        self.load_disk_relation()
        
        print(f"STEP 1 COMPLETED: Tools initialized with hS={hS}, vP={vP}, w={self.w}")

    def load_disk_relation(self):
        """Load disk-based relation R (Master Data) into memory"""
        print("Loading disk-based relation R (Master Data)...")
        
        # Customer master data - part of disk relation R
        self.customer_master = {}
        self.cursor.execute("SELECT Customer_ID, Gender, Age, Occupation, City_Category, Stay_In_Current_City_Years, Marital_Status FROM DimCustomer")
        for row in self.cursor.fetchall():
            self.customer_master[row[0]] = {
                'Gender': row[1], 'Age': row[2], 'Occupation': row[3],
                'City_Category': row[4], 'Stay_In_Current_City_Years': row[5],
                'Marital_Status': row[6]
            }
        
        # Product master data - part of disk relation R
        self.product_master = {}
        self.cursor.execute("SELECT Product_ID, Product_Category, Price, StoreID, SupplierID, StoreName, SupplierName FROM DimProduct")
        for row in self.cursor.fetchall():
            self.product_master[row[0]] = {
                'Product_Category': row[1], 'Price': row[2], 
                'StoreID': row[3], 'SupplierID': row[4],
                'StoreName': row[5], 'SupplierName': row[6]
            }
        
        print(f"Disk relation R loaded: {len(self.customer_master)} customers, {len(self.product_master)} products")

    def stream_reader_thread(self):
        """
        Independent thread: Continuously gets data from transactional_data.csv
        into stream buffer as specified in project notes
        """
        print("Stream reader thread started - continuously feeding stream S")
        
        try:
            # Read transactional data in appropriate chunks for near-real-time simulation
            chunk_size = 200  # Balanced chunk size for performance
            stream_chunks = 0
            
            for chunk in pd.read_csv('transactional_data.csv', chunksize=chunk_size):
                if not self.running:
                    break
                
                transactions = chunk.to_dict('records')
                self.stream_buffer.extend(transactions)
                stream_chunks += 1
                
                print(f"Stream S → Stream Buffer: +{len(transactions)} tuples (Total: {len(self.stream_buffer)})")
                time.sleep(0.02)  # Near-real-time simulation delay
            
            self.stream_finished = True
            print(f"STREAM READING COMPLETED: Processed {stream_chunks} chunks")
            
        except Exception as e:
            print(f"Stream reader error: {e}")
            self.stream_finished = True

    def hash_function(self, key):
        """Hash function mapping join keys to hS compartments"""
        return hash(str(key)) % self.hS

    def hybrid_join_algorithm(self):
        """
        Main HYBRIDJOIN algorithm thread
        Implements the exact 5-step process from project specifications
        """
        print("HYBRIDJOIN algorithm thread started")
        
        iteration = 0
        max_empty_iterations = 100  # Stop after 100 empty iterations
        
        while self.running and iteration < 100000:  # Safety limit
            iteration += 1
            
            # STEP 2: STREAM BUFFER → HASH TABLE + QUEUE
            
            stream_processed = 0
            if self.stream_buffer and self.w > 0:
                # Take up to w pieces of data as specified
                tuples_to_process = min(self.w, len(self.stream_buffer))
                stream_processed = 0
                
                for i in range(tuples_to_process):
                    if not self.stream_buffer:
                        break
                    
                    stream_tuple = self.stream_buffer.pop(0)
                    join_key = stream_tuple['Customer_ID']  # Join attribute
                    
                    # Hash to pick compartment
                    slot = self.hash_function(join_key)
                    
                    # Store in hash table (multi-map structure)
                    hash_entry = {
                        'tuple': stream_tuple,
                        'key': join_key,
                        'queue_reference': len(self.queue)  # Pointer to queue node
                    }
                    self.hash_table[slot].append(hash_entry)
                    
                    # Add key to queue (FIFO order)
                    self.queue.append(join_key)
                    stream_processed += 1
                
                # w becomes smaller as specified
                self.w -= stream_processed
                
                if stream_processed > 0:
                    print(f"STEP 2: Loaded {stream_processed} tuples to hash table. w={self.w}, Queue size: {len(self.queue)}")

            # STEP 3: QUEUE → DISK BUFFER (Join Window)
            
            disk_loaded = 0
            if self.queue:
                # Use oldest key from queue (FIFO order)
                oldest_key = self.queue[0]
                
                # Load disk partition of size vP for this key
                self.disk_buffer = self.load_disk_partition(oldest_key)
                disk_loaded = len(self.disk_buffer)
                
                if disk_loaded > 0:
                    print(f"STEP 3: Loaded disk partition for key {oldest_key} (vP={disk_loaded})")

            # STEP 4: JOIN PROCESSING → DATA WAREHOUSE
            
            join_output_count = 0
            freed_slots = 0
            
            if self.disk_buffer and self.queue:
                oldest_key = self.queue[0]
                
                # Probe hash table with disk buffer tuples
                for disk_tuple in self.disk_buffer:
                    slot = self.hash_function(oldest_key)
                    matching_entries = [e for e in self.hash_table[slot] if e['key'] == oldest_key]
                    
                    # Process up to vP tuples as specified
                    for hash_entry in matching_entries[:self.vP]:
                        stream_tuple = hash_entry['tuple']
                        
                        # Generate join output (combined result)
                        enriched_data = self.generate_join_output(stream_tuple, disk_tuple)
                        if enriched_data:
                            # Load to Data Warehouse
                            if self.load_to_dw(enriched_data):
                                # Remove matched data from hash table
                                self.hash_table[slot].remove(hash_entry)
                                freed_slots += 1
                                join_output_count += 1
                                self.processed_count += 1
                
                # Update w with freed spaces
                self.w += freed_slots
                
                # Remove processed keys from queue
                self.cleanup_queue()
                
                if join_output_count > 0:
                    print(f"STEP 4: Joined {join_output_count} records. Freed {freed_slots} slots. Total processed: {self.processed_count}")

            # STEP 5: LOOP BACK TO STEP 2
            
            # Check completion conditions
            if self.stream_finished and not self.stream_buffer and not self.queue:
                print("ALL DATA PROCESSED - HYBRIDJOIN COMPLETED!")
                break
                
            # If no progress in this iteration, small delay
            if stream_processed == 0 and disk_loaded == 0 and join_output_count == 0:
                time.sleep(0.01)
                if iteration % 1000 == 0:
                    print(f"Progress: {self.processed_count} records, Buffer: {len(self.stream_buffer)}, Queue: {len(self.queue)}")
            else:
                time.sleep(0.001)  # Minimal delay when processing
        
        print(f"HYBRIDJOIN FINAL: {self.processed_count} records loaded to Data Warehouse")
        self.running = False

    def load_disk_partition(self, key):
        """
        STEP 3: Load disk partition of size vP from R
        Returns list of disk tuples related to the key
        """
        disk_tuples = []
        
        # Load customer data for the key
        if key in self.customer_master:
            customer_data = self.customer_master[key]
            disk_tuples.append(('customer', key, customer_data))
        
        # In full implementation, would load related product data etc.
        # Limited to vP=500 tuples as specified
        
        return disk_tuples

    def generate_join_output(self, stream_tuple, disk_tuple):
        """
        STEP 4: Generate combined result by joining stream + disk data
        """
        try:
            disk_type, key, disk_data = disk_tuple
            
            customer_id = stream_tuple['Customer_ID']
            product_id = stream_tuple['Product_ID']
            
            # Get product data from disk relation R
            product_info = self.product_master.get(product_id, {})
            
            # Calculate total amount
            quantity = stream_tuple['quantity']
            price = product_info.get('Price', 0)
            total_amount = quantity * price
            
            # Create enriched record with ALL columns for FactSales
            enriched = {
                # Core transactional data
                'Customer_ID': customer_id,
                'Product_ID': product_id,
                'Order_ID': stream_tuple['orderID'],
                'Date': stream_tuple['date'],
                'Quantity': quantity,
                
                # Enriched customer demographics (from DimCustomer)
                'Gender': disk_data.get('Gender'),
                'Age': disk_data.get('Age'),
                'Occupation': disk_data.get('Occupation'),
                'City_Category': disk_data.get('City_Category'),
                'Stay_In_Current_City_Years': disk_data.get('Stay_In_Current_City_Years'),
                'Marital_Status': disk_data.get('Marital_Status'),
                
                # Enriched product details (from DimProduct)
                'Product_Category': product_info.get('Product_Category'),
                'Price': price,
                'StoreID': product_info.get('StoreID'),
                'StoreName': product_info.get('StoreName'),
                'SupplierID': product_info.get('SupplierID'),
                'SupplierName': product_info.get('SupplierName'),
                
                # Calculated measures
                'Total_Amount': total_amount
            }
            return enriched
            
        except Exception as e:
            print(f"Join output error: {e}")
            return None

    def cleanup_queue(self):
        """Remove keys from queue that have no more tuples in hash table"""
        keys_to_remove = []
        
        for key in list(self.queue):
            slot = self.hash_function(key)
            if not any(entry['key'] == key for entry in self.hash_table[slot]):
                keys_to_remove.append(key)
        
        for key in keys_to_remove:
            while key in self.queue:
                self.queue.remove(key)

    def load_to_dw(self, enriched_data):
        """Load combined result to Data Warehouse fact table with ALL enriched columns"""
        try:
            self.cursor.execute("""
                INSERT INTO FactSales 
                (Customer_ID, Product_ID, Date_ID, Order_ID, Quantity,
                 Gender, Age, Occupation, City_Category, Stay_In_Current_City_Years, Marital_Status,
                 Product_Category, Price, StoreID, StoreName, SupplierID, SupplierName,
                 Total_Amount)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """, (
                # Core transactional data
                enriched_data['Customer_ID'],
                enriched_data['Product_ID'],
                enriched_data['Date'],
                enriched_data['Order_ID'],
                enriched_data['Quantity'],
                
                # Enriched customer demographics
                enriched_data['Gender'],
                enriched_data['Age'],
                enriched_data['Occupation'],
                enriched_data['City_Category'],
                enriched_data['Stay_In_Current_City_Years'],
                enriched_data['Marital_Status'],
                
                # Enriched product details
                enriched_data['Product_Category'],
                enriched_data['Price'],
                enriched_data['StoreID'],
                enriched_data['StoreName'],
                enriched_data['SupplierID'],
                enriched_data['SupplierName'],
                
                # Calculated measures
                enriched_data['Total_Amount']
            ))
            
            # Commit in batches for performance (every 50 records)
            if self.processed_count % 50 == 0:
                self.conn.commit()
                
            return True
        except Exception as e:
            print(f"DW load error: {e}")
            return False

    def run(self):
        """
        Run the HYBRIDJOIN system: start the stream reader in a daemon thread,
        wait two seconds for it to fill the stream buffer, then run the join
        loop in the calling thread.

        KeyboardInterrupt and other exceptions from the join loop are
        reported with a message and not re-raised. On every exit path the
        reader thread is told to stop, the connection is committed and then
        closed; the close happens even if the final commit raises.
        """
        print("=" * 60)
        print("STARTING HYBRIDJOIN SYSTEM FOR WALMART DATA WAREHOUSE")
        print("=" * 60)

        stream_thread = None
        try:
            # Thread 1: Continuous stream reader (independent thread)
            stream_thread = threading.Thread(target=self.stream_reader_thread)
            stream_thread.daemon = True
            stream_thread.start()

            # Allow stream to populate initially
            print("Waiting for stream data to populate...")
            time.sleep(2)

            # Thread 2: HYBRIDJOIN algorithm (main thread)
            self.hybrid_join_algorithm()

        except KeyboardInterrupt:
            print("\n HYBRIDJOIN stopped by user")
        except Exception as e:
            print(f"Error in HYBRIDJOIN execution: {e}")
        finally:
            # Tell the reader to stop on every exit path, not only on Ctrl+C;
            # otherwise it keeps reading and printing after shutdown.
            self.running = False
            if stream_thread is not None and stream_thread.is_alive():
                # The reader checks self.running once per chunk, so a short
                # bounded wait is enough; it is a daemon thread regardless.
                stream_thread.join(timeout=1.0)

            # Final cleanup: close the connection even if the commit fails.
            try:
                self.conn.commit()
            finally:
                self.conn.close()
            print("HYBRIDJOIN system shutdown complete")

def main():
    """
    Prompt for MySQL connection details, then build and run HYBRIDJOIN
    against the 'walmart' database with hS=10000 and vP=500.

    Host and username fall back to 'localhost' and 'root' when left empty.
    Any exception from construction or from the run is printed, not raised.
    """
    # Local import keeps this function self-contained.
    from getpass import getpass

    host = input("Enter database host [localhost]: ") or "localhost"
    user = input("Enter database username [root]: ") or "root"
    # Read the password without echoing it; credentials must never be
    # written into the source, not even in commented-out code.
    password = getpass("Enter database password: ")

    db_creds = {
        'host': host,
        'user': user,
        'password': password,
        'database': 'walmart',
    }

    try:
        hj = HYBRIDJOIN(db_creds, hS=10000, vP=500)
        hj.run()
    except Exception as e:
        print(f"Failed to initialize HYBRIDJOIN: {e}")

# Start the interactive entry point only when this file is executed as the
# main program, not when it is imported as a module.
if __name__ == "__main__":
    main()