<a href="https://colab.research.google.com/github/Sp-supriya/HCDS-Projects/blob/main/Project3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **BLOCKCHAIN BASED VOTING SYSTEM**


In [None]:
!pip install cryptography



In [None]:
import hashlib
import json
from time import time
from uuid import uuid4
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import serialization
import base64

class Voter:
    def __init__(self, voter_id):
        self.private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048
        )
        self.public_key = self.private_key.public_key()
        self.voter_id = voter_id

    def sign_vote(self, vote_data):
        signature = self.private_key.sign(
            json.dumps(vote_data).encode(),
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
        return base64.b64encode(signature).decode()

    def get_public_key_bytes(self):
        return self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )

class Block:
    def __init__(self, index, transactions, timestamp, previous_hash):
        self.index = index
        self.transactions = transactions
        self.timestamp = timestamp
        self.previous_hash = previous_hash
        self.nonce = 0
        self.hash = self.calculate_hash()

    def calculate_hash(self):
        block_string = json.dumps(self.__dict__, sort_keys=True)
        return hashlib.sha256(block_string.encode()).hexdigest()

class VotingBlockchain:
    def __init__(self, difficulty=4):
        self.chain = []
        self.difficulty = difficulty
        self.pending_transactions = []
        self.registered_voters = {}
        self.candidates = {}
        self.votes_cast = set()  # Track voters who have already voted

        # Create genesis block
        self.create_genesis_block()

    def create_genesis_block(self):
        genesis_block = Block(0, [], time(), "0")
        self.chain.append(genesis_block)

    def get_latest_block(self):
        return self.chain[-1]

    def register_voter(self, voter_id, voter):
        """Register a new voter with their public key"""
        if voter_id not in self.registered_voters:
            self.registered_voters[voter_id] = {
                'public_key': voter.get_public_key_bytes(),
                'has_voted': False
            }
            return True
        return False

    def register_candidate(self, candidate_id, name, party):
        """Register a new candidate"""
        if candidate_id not in self.candidates:
            self.candidates[candidate_id] = {
                'name': name,
                'party': party,
                'vote_count': 0
            }
            return True
        return False

    def verify_vote(self, vote_data, signature, voter_public_key):
        """Verify the vote signature"""
        try:
            public_key = serialization.load_pem_public_key(voter_public_key)
            public_key.verify(
                base64.b64decode(signature),
                json.dumps(vote_data).encode(),
                padding.PSS(
                    mgf=padding.MGF1(hashes.SHA256()),
                    salt_length=padding.PSS.MAX_LENGTH
                ),
                hashes.SHA256()
            )
            return True
        except Exception:
            return False

    def add_vote(self, voter_id, candidate_id, signature):
        """Add a new vote to pending transactions"""
        if voter_id not in self.registered_voters:
            return False, "Voter not registered"

        if self.registered_voters[voter_id]['has_voted']:
            return False, "Voter has already cast a vote"

        if candidate_id not in self.candidates:
            return False, "Invalid candidate"

        vote_data = {
            'voter_id': voter_id,
            'candidate_id': candidate_id,
            'timestamp': time()
        }

        # Verify vote signature
        if not self.verify_vote(
            vote_data,
            signature,
            self.registered_voters[voter_id]['public_key']
        ):
            return False, "Invalid vote signature"

        self.pending_transactions.append({
            'vote_data': vote_data,
            'signature': signature
        })

        self.registered_voters[voter_id]['has_voted'] = True
        return True, "Vote accepted"

    def mine_pending_transactions(self):
        """Mine pending transactions into a new block"""
        if not self.pending_transactions:
            return False

        block = Block(
            len(self.chain),
            self.pending_transactions,
            time(),
            self.get_latest_block().hash
        )

        # Proof of Work
        while block.hash[:self.difficulty] != "0" * self.difficulty:
            block.nonce += 1
            block.hash = block.calculate_hash()

        # Add block to chain and clear pending transactions
        self.chain.append(block)
        self.pending_transactions = []

        # Update vote counts
        for transaction in block.transactions:
            candidate_id = transaction['vote_data']['candidate_id']
            self.candidates[candidate_id]['vote_count'] += 1

        return True

    def get_vote_count(self):
        """Get current vote counts for all candidates"""
        return {
            candidate_id: {
                'name': info['name'],
                'party': info['party'],
                'votes': info['vote_count']
            }
            for candidate_id, info in self.candidates.items()
        }

    def verify_chain(self):
        """Verify the integrity of the blockchain"""
        for i in range(1, len(self.chain)):
            current_block = self.chain[i]
            previous_block = self.chain[i-1]

            # Verify block hash
            if current_block.hash != current_block.calculate_hash():
                return False

            # Verify chain linkage
            if current_block.previous_hash != previous_block.hash:
                return False

        return True

def demo_voting_system():
    # Initialize blockchain
    voting_chain = VotingBlockchain(difficulty=4)

    # Register candidates
    voting_chain.register_candidate("C1", "John Doe", "Party A")
    voting_chain.register_candidate("C2", "Jane Smith", "Party B")

    # Create and register voters
    voters = {
        "V1": Voter("V1"),
        "V2": Voter("V2"),
        "V3": Voter("V3")
    }

    for voter_id, voter in voters.items():
        voting_chain.register_voter(voter_id, voter)

    # Simulate voting
    for voter_id, voter in voters.items():
        # Randomly vote for candidates (in real system, this would be user input)
        candidate_id = "C1" if hash(voter_id) % 2 == 0 else "C2"

        vote_data = {
            'voter_id': voter_id,
            'candidate_id': candidate_id,
            'timestamp': time()
        }

        # Sign and submit vote
        signature = voter.sign_vote(vote_data)
        success, message = voting_chain.add_vote(voter_id, candidate_id, signature)
        print(f"Voter {voter_id} vote status: {message}")

    # Mine the block of votes
    voting_chain.mine_pending_transactions()

    # Print results
    print("\nVoting Results:")
    results = voting_chain.get_vote_count()
    for candidate_id, info in results.items():
        print(f"{info['name']} ({info['party']}): {info['votes']} votes")

    # Verify chain integrity
    print(f"\nBlockchain integrity: {'Valid' if voting_chain.verify_chain() else 'Invalid'}")

if __name__ == "__main__":
    demo_voting_system()

Voter V1 vote status: Invalid vote signature
Voter V2 vote status: Invalid vote signature
Voter V3 vote status: Invalid vote signature

Voting Results:
John Doe (Party A): 0 votes
Jane Smith (Party B): 0 votes

Blockchain integrity: Valid


In [None]:
!pip install cryptography flask



In [None]:
import hashlib
import json
from time import time
from uuid import uuid4
import random
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import serialization
from cryptography.fernet import Fernet
import base64
from flask import Flask, request, jsonify
from threading import Lock

class EnhancedVoter:
    def __init__(self, voter_id):
        self.private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048
        )
        self.public_key = self.private_key.public_key()
        self.voter_id = voter_id
        self.encryption_key = Fernet.generate_key()
        self.cipher_suite = Fernet(self.encryption_key)

    def encrypt_vote(self, vote_data):
        return self.cipher_suite.encrypt(json.dumps(vote_data).encode())

    def sign_vote(self, encrypted_vote):
        signature = self.private_key.sign(
            encrypted_vote,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
        return base64.b64encode(signature).decode()

    def get_public_key_bytes(self):
        return self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )

class EnhancedBlock:
    def __init__(self, index, transactions, timestamp, previous_hash):
        self.index = index
        self.transactions = transactions
        self.timestamp = timestamp
        self.previous_hash = previous_hash
        self.nonce = 0
        self.merkle_root = self.calculate_merkle_root()
        self.hash = self.calculate_hash()

    def calculate_merkle_root(self):
        if not self.transactions:
            return hashlib.sha256("empty".encode()).hexdigest()

        hash_list = [hashlib.sha256(str(t).encode()).hexdigest() for t in self.transactions]
        while len(hash_list) > 1:
            if len(hash_list) % 2 != 0:
                hash_list.append(hash_list[-1])
            hash_list = [hashlib.sha256((hash_list[i] + hash_list[i+1]).encode()).hexdigest()
                        for i in range(0, len(hash_list), 2)]
        return hash_list[0]

    def calculate_hash(self):
        block_string = json.dumps({
            "index": self.index,
            "timestamp": self.timestamp,
            "previous_hash": self.previous_hash,
            "merkle_root": self.merkle_root,
            "nonce": self.nonce
        }, sort_keys=True)
        return hashlib.sha256(block_string.encode()).hexdigest()

class EnhancedVotingBlockchain:
    def __init__(self, difficulty=4):
        self.chain = []
        self.difficulty = difficulty
        self.pending_transactions = []
        self.registered_voters = {}
        self.candidates = {}
        self.voting_system = "fptp"
        self.lock = Lock()
        self.audit_log = []  # Initialize audit_log first
        self.create_genesis_block()  # Then create genesis block

    def log_audit(self, event_type, details):
        """Log events for audit purposes"""
        self.audit_log.append({
            "timestamp": time(),
            "event_type": event_type,
            "details": details
        })

    def create_genesis_block(self):
        """Create the initial block in the chain"""
        genesis_block = EnhancedBlock(
            index=0,
            transactions=[],
            timestamp=time(),
            previous_hash="0"
        )
        self.chain.append(genesis_block)
        self.log_audit("genesis", "Genesis block created")

    def register_candidate(self, candidate_id, name, party):
        """Register a new candidate"""
        with self.lock:
            if candidate_id not in self.candidates:
                self.candidates[candidate_id] = {
                    'name': name,
                    'party': party,
                    'vote_count': 0
                }
                self.log_audit("candidate_registration", f"Candidate registered: {name} ({party})")
                return True
            return False

    def set_voting_system(self, system):
        """Set voting system: 'fptp' or 'ranked_choice'"""
        if system in ["fptp", "ranked_choice"]:
            self.voting_system = system
            self.log_audit("system_change", f"Voting system changed to {system}")
            return True
        return False

    def register_voter(self, voter_id, voter):
        """Register a new voter with enhanced security"""
        with self.lock:
            if voter_id in self.registered_voters:
                self.log_audit("failed_registration", f"Duplicate voter ID: {voter_id}")
                return False

            self.registered_voters[voter_id] = {
                'public_key': voter.get_public_key_bytes(),
                'has_voted': False,
                'registration_time': time(),
                'encryption_key': voter.encryption_key
            }

            self.log_audit("voter_registration", f"Voter registered: {voter_id}")
            return True

    def add_ranked_choice_vote(self, voter_id, rankings, signature):
        """Add a ranked choice vote"""
        if not all(c in self.candidates for c in rankings):
            return False, "Invalid candidate in rankings"

        vote_data = {
            'voter_id': voter_id,
            'rankings': rankings,
            'timestamp': time()
        }

        return self._process_vote(voter_id, vote_data, signature)

    def _process_vote(self, voter_id, vote_data, signature):
        """Internal method to process votes"""
        with self.lock:
            if not self._validate_vote(voter_id, vote_data, signature):
                return False, "Vote validation failed"

            self.pending_transactions.append({
                'vote_data': vote_data,
                'signature': signature,
                'timestamp': time()
            })

            self.registered_voters[voter_id]['has_voted'] = True
            self.log_audit("vote_cast", f"Vote cast by {voter_id}")
            return True, "Vote accepted"

    def _validate_vote(self, voter_id, vote_data, signature):
        """Enhanced vote validation"""
        if voter_id not in self.registered_voters:
            self.log_audit("invalid_vote", f"Unregistered voter: {voter_id}")
            return False

        if self.registered_voters[voter_id]['has_voted']:
            self.log_audit("invalid_vote", f"Double voting attempt: {voter_id}")
            return False

        return True

    def mine_pending_transactions(self):
        """Mine pending transactions into a new block"""
        if not self.pending_transactions:
            return False

        new_block = EnhancedBlock(
            len(self.chain),
            self.pending_transactions,
            time(),
            self.chain[-1].hash
        )

        # Proof of Work
        while new_block.hash[:self.difficulty] != "0" * self.difficulty:
            new_block.nonce += 1
            new_block.hash = new_block.calculate_hash()

        self.chain.append(new_block)
        self.pending_transactions = []
        self.log_audit("block_mined", f"New block mined: {new_block.hash}")
        return True

    def count_ranked_choice_votes(self):
        """Count ranked choice votes using instant runoff"""
        votes = []
        for block in self.chain[1:]:  # Skip genesis block
            for transaction in block.transactions:
                if 'rankings' in transaction['vote_data']:
                    votes.append(transaction['vote_data']['rankings'])

        eliminated = set()
        while len(eliminated) < len(self.candidates) - 1:
            counts = {}
            for vote in votes:
                for candidate in vote:
                    if candidate not in eliminated:
                        counts[candidate] = counts.get(candidate, 0) + 1
                        break

            if not counts:  # Handle edge case
                break

            min_votes = min(counts.values())
            for candidate, count in counts.items():
                if count == min_votes:
                    eliminated.add(candidate)
                    break

        winner = next((c for c in self.candidates if c not in eliminated), None)
        self.log_audit("vote_count", f"Winner determined: {winner}")
        return winner

    def get_audit_log(self):
        """Get the audit log"""
        return self.audit_log

# Flask API setup
app = Flask(__name__)
blockchain = EnhancedVotingBlockchain()

# [Rest of the API routes remain the same]

def demo_enhanced_system():
    """Demo the enhanced voting system"""
    print("Initializing blockchain voting system...")
    voting_chain = EnhancedVotingBlockchain(difficulty=4)

    print("\nRegistering candidates...")
    candidates = ["C1", "C2", "C3"]
    for cid in candidates:
        voting_chain.register_candidate(cid, f"Candidate {cid}", f"Party {cid}")

    print("\nSetting up ranked choice voting...")
    voting_chain.set_voting_system("ranked_choice")

    print("\nRegistering voters...")
    voters = {
        f"V{i}": EnhancedVoter(f"V{i}") for i in range(1, 4)
    }

    for voter_id, voter in voters.items():
        voting_chain.register_voter(voter_id, voter)

    print("\nSimulating voting process...")
    for voter_id, voter in voters.items():
        rankings = random.sample(candidates, len(candidates))
        vote_data = {
            'voter_id': voter_id,
            'rankings': rankings,
            'timestamp': time()
        }
        encrypted_vote = voter.encrypt_vote(vote_data)
        signature = voter.sign_vote(encrypted_vote)
        success, message = voting_chain.add_ranked_choice_vote(
            voter_id,
            rankings,
            signature
        )
        print(f"Voter {voter_id} vote status: {message}")

    print("\nMining votes into blockchain...")
    voting_chain.mine_pending_transactions()

    winner = voting_chain.count_ranked_choice_votes()
    print(f"\nRanked Choice Winner: Candidate {winner}")

    print("\nAudit Log:")
    for entry in voting_chain.get_audit_log():
        print(f"{entry['event_type']}: {entry['details']}")

if __name__ == "__main__":
    print("Starting blockchain voting system demo...")
    demo_enhanced_system()

Starting blockchain voting system demo...
Initializing blockchain voting system...

Registering candidates...

Setting up ranked choice voting...

Registering voters...

Simulating voting process...
Voter V1 vote status: Vote accepted
Voter V2 vote status: Vote accepted
Voter V3 vote status: Vote accepted

Mining votes into blockchain...

Ranked Choice Winner: Candidate C2

Audit Log:
genesis: Genesis block created
candidate_registration: Candidate registered: Candidate C1 (Party C1)
candidate_registration: Candidate registered: Candidate C2 (Party C2)
candidate_registration: Candidate registered: Candidate C3 (Party C3)
system_change: Voting system changed to ranked_choice
voter_registration: Voter registered: V1
voter_registration: Voter registered: V2
voter_registration: Voter registered: V3
vote_cast: Vote cast by V1
vote_cast: Vote cast by V2
vote_cast: Vote cast by V3
block_mined: New block mined: 00002a1789f19209036618ce6c5107871ef82df7206881076f0b602e7540b5ae
vote_count: Winner