In [7]:
import findspark
findspark.init()
import pandas as pd
import time
import os
import psutil
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType, col
from pyspark.sql.types import StructType, StructField, StringType
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes


# Create SparkSession
spark = SparkSession.builder.appName("DatasetEncryption").getOrCreate()

# Read the CSV file
df = spark.read.csv(r"C:\Users\91974\Documents\Bitcoin_tweets.csv", header=True, inferSchema=True)

# Choose the number of partitions
partitions = 4
df = df.repartition(partitions)

# Define a function to derive the AES key from the ECC private key
def derive_aes_key(ecc_private_key, key_size):
    shared_secret = ecc_private_key.exchange(ec.ECDH(), ecc_private_key.public_key())
    derived_key = HKDF(
        algorithm=hashes.SHA256(),
        length=key_size // 8,
        salt=None,
        info=None,
        backend=default_backend()
    ).derive(shared_secret)
    return derived_key

# Generate an ECC private key
ecc_private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())

# Derive an AES key from the ECC private key
key_size = 128  # Choose a key size (128, 192, or 256 bits)
key = derive_aes_key(ecc_private_key, key_size)

# Create an AES cipher in CTR mode with the derived key
nonce = os.urandom(16)

# Define the AES encryption function
@pandas_udf(returnType=StructType([
    StructField("key", StringType()),
    StructField("nonce", StringType())
] + [StructField("encrypted_" + c, StringType()) for c in df.columns
] + [StructField(c, StringType()) for c in df.columns]), functionType=PandasUDFType.GROUPED_MAP)
def aes_encrypt(df: pd.DataFrame) -> pd.DataFrame:
    from cryptography.hazmat.primitives.asymmetric import ec
    from cryptography.hazmat.primitives.kdf.hkdf import HKDF
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

    # Generate an ECC private key
    ecc_private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())

    # Derive an AES key from the ECC private key
    key_size = 128  # Choose a key size (128, 192, or 256 bits)
    key = derive_aes_key(ecc_private_key, key_size)

    # Create an AES cipher in CTR mode with the derived key
    nonce = os.urandom(16)
    cipher = Cipher(algorithms.AES(key), modes.CTR(nonce), backend=default_backend())
    encryptor = cipher.encryptor()

    # Encrypt each value in the DataFrame using encryptor
    start_time = time.time()
    encrypted_cols = {}
    for c in df.columns:
        encrypted_cols["encrypted_" + c] = df[c].apply(lambda v: encryptor.update(v.encode()).hex() if v is not None else None)
    end_time = time.time()
    encrypted_df = pd.concat([df, pd.DataFrame(encrypted_cols)], axis=1)
    encrypted_df["key"] = key.hex()
    encrypted_df["nonce"] = nonce.hex()
    # Calculate CPU usage and memory usage
    cpu_usage = psutil.cpu_percent()
    memory_usage = psutil.virtual_memory().percent
    # Return the encrypted DataFrame
    print(f"Encryption time: {end_time - start_time:.4f} seconds")
    print(f"CPU usage: {cpu_usage:.2f}%")
    print(f"Memory usage: {memory_usage:.2f}%")
    return encrypted_df



# Define the AES decryption function
@pandas_udf(returnType=StructType([
    StructField("key", StringType())
] + [StructField("decrypted_" + c, StringType()) for c in df.columns
]), functionType=PandasUDFType.GROUPED_MAP)
def aes_decrypt(df: pd.DataFrame) -> pd.DataFrame:
    from cryptography.hazmat.primitives.asymmetric import ec
    from cryptography.hazmat.primitives.kdf.hkdf import HKDF
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
    # Get the AES key and nonce from the first row of the DataFrame
    key = bytes.fromhex(df.iloc[0]["key"])
    nonce = bytes.fromhex(df.iloc[0]["nonce"])

    # Create an AES cipher in CTR mode with the key and nonce
    cipher = Cipher(algorithms.AES(key), modes.CTR(nonce), backend=default_backend())
    decryptor = cipher.decryptor()

    # Decrypt each value in the DataFrame using the decryptor
    start_time = time.time()
    for c in df.columns:
        if c.startswith("encrypted_"):
            df["decrypted_" + c.replace("encrypted_", "")] = df[c].apply(lambda v: decryptor.update(bytes.fromhex(v)).decode() if v is not None else None)
    end_time = time.time()
    # Calculate CPU usage and memory usage
    cpu_usage = psutil.cpu_percent()
    memory_usage = psutil.virtual_memory().percent
    print(f"Decryption time: {end_time - start_time:.4f} seconds")
    print(f"CPU usage: {cpu_usage:.2f}%")
    print(f"Memory usage: {memory_usage:.2f}%")
    # Return the decrypted DataFrame
    decrypted_df = df[[c for c in df.columns if c.startswith("decrypted_") or c == "key"]]
    return decrypted_df

# Encrypt the dataset using AES
start_time = time.time()
df_encrypted = df.groupBy().apply(aes_encrypt)
end_time = time.time()
encryption_time = end_time - start_time
print(f"\nAverage encryption time: {encryption_time/partitions:.4f} seconds")
print(f"Overall encryption time: {encryption_time:.4f} seconds\n")
df_encrypted.show()

# Decrypt the dataset using AES
start_time = time.time()
df_decrypted = df_encrypted.groupBy().apply(aes_decrypt)
end_time = time.time()
decryption_time = end_time - start_time
print(f"\nAverage decryption time: {decryption_time/partitions:.4f} seconds")
print(f"Overall decryption time: {decryption_time:.4f} seconds\n")
df_decrypted.show()



Average encryption time: 0.0079 seconds
Overall encryption time: 0.0315 seconds

+--------------------+--------------------+--------------------+-----------------------+--------------------------+--------------------+--------------------+--------------------+
|                 key|               nonce| encrypted_user_name|encrypted_user_location|encrypted_user_description|           user_name|       user_location|    user_description|
+--------------------+--------------------+--------------------+-----------------------+--------------------------+--------------------+--------------------+--------------------+
|ebc26c2f3cb97e7c8...|f5c5dc0be12c2508b...|1c7ad38ff0ab30b8e...|                   null|                      null|         Muhib Rabby|                null|                null|
|ebc26c2f3cb97e7c8...|f5c5dc0be12c2508b...|5ce26d240689d9da4...|                   null|                      null|I think im 18 yea...|                null|                null|
|ebc26c2f3cb97e7c8...|f

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 60995)
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "C:\spark\python\lib\py4j-0.10.9.5-src.zip\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "C:\Users\91974\anaconda3\lib\socket.py", line 705, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\spark\python\lib\py4j-0.10.9.5-src.zip\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\spark\python\lib\py4j-0.10.9.5-src.zip\py4j\clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it