In [None]:
import pandas as pd
import numpy as np
import threading
from prophet import Prophet
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from datetime import datetime

# --- SQLAlchemy Setup --- #
Base = declarative_base()
class Product(Base):
    __tablename__ = 'products'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    quantity = Column(Integer)
    barcode = Column(String)
    status = Column(String, default="Normal")
    # Updated backref name to avoid conflict
    product_purchases = relationship("Purchase", backref="purchased_product")

class Customer(Base):
    __tablename__ = 'customers'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String)
    total_spent = Column(Integer, default=0)
    total_purchases = Column(Integer, default=0)

    # Relationship to track purchases made by the customer
    customer_purchases = relationship("Purchase", backref="purchasing_customer")

class Purchase(Base):
    __tablename__ = 'purchases'
    id = Column(Integer, primary_key=True)
    customer_id = Column(Integer, ForeignKey('customers.id'))
    product_id = Column(Integer, ForeignKey('products.id'))
    quantity = Column(Integer)
    timestamp = Column(String, default=datetime.now)

    customer = relationship("Customer", backref="purchases")
    product = relationship("Product", backref="purchases")

# --- Inventory Management --- #
class InventoryManager:
    def __init__(self, db_path='sqlite:///inventory.db'):
        self.engine = create_engine(db_path)
        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)

    def add_sample_data(self):
        session = self.Session()
        try:
            if not session.query(Product).first():
                sample_products = [
                    Product(id=1, name="Soap", quantity=100, barcode="123456789"),
                    Product(id=2, name="Shampoo", quantity=50, barcode="987654321"),
                ]
                session.add_all(sample_products)
                session.commit()

            if not session.query(Customer).first():
                sample_customers = [
                    Customer(id=1, name="John Doe", email="john@example.com"),
                    Customer(id=2, name="Jane Smith", email="jane@example.com"),
                ]
                session.add_all(sample_customers)
                session.commit()

        finally:
            session.close()

    def update_quantity_and_status(self, product_name, quantity, status):
        session = self.Session()
        try:
            product = session.query(Product).filter_by(name=product_name).first()
            if product:
                product.quantity = quantity
                product.status = status
                session.commit()
                print(f"✅ Updated '{product.name}': Quantity={quantity}, Status={status}")
            else:
                new_product = Product(name=product_name, quantity=quantity, status=status)
                session.add(new_product)
                session.commit()
                print(f"🆕 Added new product '{product_name}' with Quantity={quantity}, Status={status}")
        finally:
            session.close()

    def add_purchase(self, customer_id, product_name, quantity):
        session = self.Session()
        try:
            customer = session.query(Customer).get(customer_id)
            product = session.query(Product).filter_by(name=product_name).first()

            if customer and product:
                purchase = Purchase(customer_id=customer.id, product_id=product.id, quantity=quantity)
                session.add(purchase)

                # Update customer and product statistics
                customer.total_spent += product.quantity * quantity  # Adjust calculation
                customer.total_purchases += quantity
                product.quantity -= quantity  # Decrease product stock after purchase

                session.commit()
                print(f"🛒 Purchase recorded for '{product_name}' by {customer.name}.")

        finally:
            session.close()

# --- Stock Prediction --- #
def stock_prediction():
    try:
        data = pd.read_csv("sales_data.csv", parse_dates=["ds"])
        m = Prophet()
        m.fit(data)
        future = m.make_future_dataframe(periods=30)
        forecast = m.predict(future)
        latest = forecast.tail(1)['yhat'].iloc[0]
        status = "Overstock" if latest > 80 else "Low Stock" if latest < 20 else "Normal"

        # Update DB
        InventoryManager().update_quantity_and_status("Soap", int(latest), status)

    except Exception as e:
        print("❌ Stock Prediction Error:", e)

# --- Demand Forecasting --- #
def demand_forecasting():
    try:
        data = pd.read_csv("sales_data.csv", parse_dates=["ds"])
        data['seasonality'] = np.sin(np.linspace(0, 2 * np.pi * len(data) / 30, len(data)))
        data['promotions'] = np.random.choice([0, 1], len(data))

        m = Prophet()
        m.add_regressor('seasonality')
        m.add_regressor('promotions')
        m.fit(data)

        future = m.make_future_dataframe(periods=30)
        future['seasonality'] = np.sin(np.linspace(0, 2 * np.pi * (len(data) + 30) / 30, len(data) + 30))
        future['promotions'] = np.random.choice([0, 1], len(data) + 30)

        forecast = m.predict(future)
        avg = forecast['yhat'].tail(30).mean()
        print(f"[Demand Forecasting] 30‑day avg demand: {avg:.1f} units")

        # Update DB
        InventoryManager().update_quantity_and_status("Shampoo", int(avg), "Forecasted")

    except Exception as e:
        print("❌ Demand Forecasting Error:", e)

# --- Customer Interaction Analysis --- #
def customer_behavior_analysis():
    session = InventoryManager().Session()
    customers = session.query(Customer).all()

    for customer in customers:
        print(f"📊 Customer {customer.name} - Total Spent: {customer.total_spent}, Total Purchases: {customer.total_purchases}")

    session.close()

# --- Start Threads --- #
def start_ml_threads():
    threading.Thread(target=stock_prediction, daemon=True).start()
    threading.Thread(target=demand_forecasting, daemon=True).start()

# --- Test Mode --- #
if __name__ == "__main__":
    # Add sample data and run ML threads
    InventoryManager().add_sample_data()
    start_ml_threads()
    import time
    time.sleep(6)

    # Run customer behavior analysis
    customer_behavior_analysis()


In [None]:
!pip install qrcode

Collecting qrcode
  Downloading qrcode-8.1-py3-none-any.whl.metadata (17 kB)
Downloading qrcode-8.1-py3-none-any.whl (45 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/45.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.7/45.7 kB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: qrcode
Successfully installed qrcode-8.1


In [None]:
import pandas as pd
import numpy as np
import threading
from prophet import Prophet
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from datetime import datetime
import qrcode
import sqlite3

# --- SQLAlchemy Setup --- #
Base = declarative_base()

class Product(Base):
    __tablename__ = 'products'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    quantity = Column(Integer)
    barcode = Column(String)
    status = Column(String, default="Normal")
    product_purchases = relationship("Purchase", backref="purchased_product")

class Customer(Base):
    __tablename__ = 'customers'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String)
    total_spent = Column(Integer, default=0)
    total_purchases = Column(Integer, default=0)
    customer_purchases = relationship("Purchase", backref="purchasing_customer")

class Purchase(Base):
    __tablename__ = 'purchases'
    id = Column(Integer, primary_key=True)
    customer_id = Column(Integer, ForeignKey('customers.id'))
    product_id = Column(Integer, ForeignKey('products.id'))
    quantity = Column(Integer)
    timestamp = Column(String, default=datetime.now)
    customer = relationship("Customer", backref="purchases")
    product = relationship("Product", backref="purchases")

# --- Inventory Management --- #
class InventoryManager:
    def __init__(self, db_path='sqlite:///inventory.db'):
        self.engine = create_engine(db_path)
        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)

    def add_sample_data(self):
        session = self.Session()
        try:
            if not session.query(Product).first():
                sample_products = [
                    Product(id=1, name="Soap", quantity=100, barcode="123456789"),
                    Product(id=2, name="Shampoo", quantity=50, barcode="987654321"),
                ]
                session.add_all(sample_products)
                session.commit()

            if not session.query(Customer).first():
                sample_customers = [
                    Customer(id=1, name="John Doe", email="john@example.com"),
                    Customer(id=2, name="Jane Smith", email="jane@example.com"),
                ]
                session.add_all(sample_customers)
                session.commit()

        finally:
            session.close()

    def update_quantity_and_status(self, product_name, quantity, status):
        session = self.Session()
        try:
            product = session.query(Product).filter_by(name=product_name).first()
            if product:
                product.quantity = quantity
                product.status = status
                session.commit()
                print(f"✅ Updated '{product.name}': Quantity={quantity}, Status={status}")
            else:
                new_product = Product(name=product_name, quantity=quantity, status=status)
                session.add(new_product)
                session.commit()
                print(f"🆕 Added new product '{product_name}' with Quantity={quantity}, Status={status}")
        finally:
            session.close()

    def add_purchase(self, customer_id, product_name, quantity):
        session = self.Session()
        try:
            customer = session.query(Customer).get(customer_id)
            product = session.query(Product).filter_by(name=product_name).first()

            if customer and product:
                purchase = Purchase(customer_id=customer.id, product_id=product.id, quantity=quantity)
                session.add(purchase)

                # Update customer and product statistics
                customer.total_spent += product.quantity * quantity  # Adjust calculation
                customer.total_purchases += quantity
                product.quantity -= quantity  # Decrease product stock after purchase

                session.commit()
                print(f"🛒 Purchase recorded for '{product_name}' by {customer.name}.")

        finally:
            session.close()

# --- QR Code Generation --- #
def generate_qr_code(product_id):
    # Create connection to the database
    conn = sqlite3.connect('inventory.db')
    cursor = conn.cursor()

    # Fetch product details based on the product_id
    cursor.execute("SELECT name, quantity, barcode FROM products WHERE id=?", (product_id,))
    product = cursor.fetchone()

    if product:
        product_name, quantity, barcode = product
        product_details = f"Product: {product_name}\nQuantity: {quantity}\nBarcode: {barcode}"

        # Generate QR Code with higher error correction level and larger size
        qr = qrcode.QRCode(
            version=4,  # version 4 allows more data (larger size)
            error_correction=qrcode.constants.ERROR_CORRECT_Q,  # Medium error correction level
            box_size=10,  # Larger box size
            border=6,  # Larger border
        )
        qr.add_data(product_details)
        qr.make(fit=True)

        # Create an image from the QR code
        img = qr.make_image(fill='black', back_color='white')

        # Save the QR code image
        img.save(f"{product_name}_qr_code.png")
        print(f"✅ QR Code generated for {product_name}!")
    else:
        print("❌ Product not found in the database.")

    # Close the database connection
    conn.close()

# --- Start Example --- #
if __name__ == "__main__":
    # Initialize inventory manager and add sample data
    inventory_manager = InventoryManager(db_path='sqlite:///inventory.db')
    inventory_manager.add_sample_data()  # Add sample data if necessary

    # Now generate QR code for a product
    generate_qr_code(1)  # Generate QR code for the product with id = 1


✅ QR Code generated for Soap!


  Base = declarative_base()
  if not session.query(Product).first():
  if not session.query(Product).first():
  if not session.query(Product).first():
  if not session.query(Product).first():
