In [3]:
import os
import sys
import json
import pickle
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, lower, udf
from pyspark.sql.types import IntegerType, FloatType, ArrayType, StringType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, CountVectorizer
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Hadoop location for this Windows machine; model artefacts are not written
# through Hadoop later on, but Spark still looks the variable up.
os.environ['HADOOP_HOME'] = 'C:\\Program Files\\hadoop'

# Build (or reuse) the Spark session: 4g for driver and executor, and a
# relative warehouse directory.
spark = (
    SparkSession.builder
    .appName("DarkPatternsDetection")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.sql.warehouse.dir", "spark-warehouse")
    .getOrCreate()
)

# Hadoop-level settings for Windows: bind file:// to LocalFileSystem and turn
# off the output committer's "mark successful jobs" flag.
hadoop_conf = spark._jsc.hadoopConfiguration()
for _hadoop_key, _hadoop_value in (
    ("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem"),
    ("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false"),
):
    hadoop_conf.set(_hadoop_key, _hadoop_value)

print("Spark session created successfully")

# Keyword vocabularies, one per dark-pattern family. Each list is handed to
# count_matches() to produce a per-text score and is also exported verbatim
# in the model metadata.
urgency_words = [
    "limited", "hurry", "soon", "now",
    "quick", "fast", "expire", "deadline",
    "flash", "urgent", "today", "last",
    "ending", "final", "exclusive", "now or never",
]

scarcity_words = [
    "only", "left", "few", "limited",
    "exclusive", "rare", "stock", "running out",
    "almost gone", "remaining", "last chance", "shortage",
    "sell out", "popular",
]

# NOTE: "trending" appears twice in this list.
social_proof_words = [
    "popular", "best seller", "trending", "others",
    "customers", "reviews", "rating", "people",
    "join", "everyone", "trending", "favorite",
    "recommended",
]

# Create functions for pattern detection
def count_matches(text, word_list):
    """Count how many distinct keywords from *word_list* occur in *text*.

    Matching is case-insensitive. A keyword counts at most once, however
    often it occurs in the text and however often it is repeated in
    *word_list*. A keyword only matches where it starts a word, so
    "expire" still hits "expires" but "now" no longer hits "know" and
    "last" no longer hits "plastic". Multi-word keywords such as
    "now or never" are matched as literal phrases.

    Args:
        text: The text to scan; ``None`` or an empty string yields 0.
        word_list: Iterable of keyword strings; empty keywords are ignored.

    Returns:
        The number of distinct keywords found, as an ``int``.
    """
    # Imported locally so the function is self-contained and the file's
    # import block does not need to change.
    import re

    if not text:
        return 0

    haystack = text.lower()
    # A set collapses duplicated list entries so they are not double counted.
    keywords = {word.lower() for word in word_list if word}
    # (?<!\w) requires that the keyword is not preceded by a word character,
    # i.e. it sits at the start of a word; re.escape keeps keywords literal.
    return sum(
        1
        for keyword in keywords
        if re.search(r"(?<!\w)" + re.escape(keyword), haystack)
    )

# Wrap the plain-Python feature helpers as Spark UDFs.
def _keyword_score_udf(keywords):
    """Build an integer UDF that scores a text against one keyword list."""
    return udf(lambda text: count_matches(text, keywords), IntegerType())


def _token_count(text):
    """Number of whitespace-separated tokens; 0 for empty or missing text."""
    return len(text.split()) if text else 0


def _uppercase_fraction(t):
    """Share of characters that are upper-case; 0.0 for empty or missing text."""
    if not t:
        return 0.0
    return sum(1 for c in t if c.isupper()) / max(len(t), 1)


def _lowercase_tokens(text):
    """Lower-cased whitespace tokens; an empty list for empty or missing text."""
    return text.lower().split() if text else []


urgency_udf = _keyword_score_udf(urgency_words)
scarcity_udf = _keyword_score_udf(scarcity_words)
social_proof_udf = _keyword_score_udf(social_proof_words)
text_length_udf = udf(_token_count, IntegerType())
caps_ratio_udf = udf(_uppercase_fraction, FloatType())
words_udf = udf(_lowercase_tokens, ArrayType(StringType()))

# Read the tab-separated dataset (first row is the header, types inferred).
print("Loading data...")
df = spark.read.csv(
    "Data/websitedata.tsv",
    header=True,
    inferSchema=True,
    sep="\t",
)

# Show what was loaded before any cleaning is applied.
print("Original schema:")
df.printSchema()
print("\nSample data:")
df.show(5, truncate=False)

# Cleaning, in order: drop the id column, drop rows missing text or label,
# rename the label to label_index, cast it to integer, then drop rows whose
# label is null after the cast.
df = (
    df.drop("page_id")
    .na.drop(subset=["text", "label"])
    .withColumnRenamed("label", "label_index")
    .withColumn("label_index", col("label_index").cast("integer"))
    .na.drop(subset=["label_index"])
)

# Summary of the cleaned data.
print(f"Loaded {df.count()} records")
print("Label distribution:")
df.groupBy("label_index").count().show()

# Derive the hand-crafted feature columns from the raw text.
print("Adding custom features...")
_raw_text = col("text")

# Lower-case the text and replace every character that is not a letter,
# digit or whitespace with a space.
df = df.withColumn("text_cleaned", regexp_replace(lower(_raw_text), "[^a-zA-Z0-9\\s]", " "))

# Every scoring UDF reads the raw (uncleaned) text column.
for _feature_name, _feature_udf in (
    ("urgency_score", urgency_udf),
    ("scarcity_score", scarcity_udf),
    ("social_proof_score", social_proof_udf),
    ("text_length", text_length_udf),
    ("caps_ratio", caps_ratio_udf),
):
    df = df.withColumn(_feature_name, _feature_udf(_raw_text))

# Tokens are precomputed here from the cleaned text rather than with the
# ML Pipeline's Tokenizer.
df = df.withColumn("words_array", words_udf(col("text_cleaned")))

# Spread the rows over 10 partitions and cache the result for reuse.
df = df.repartition(10).cache()

# Build TF-IDF features, assemble the final feature vector and produce the
# train/test split.
#
# The split is done FIRST and the CountVectorizer / IDF models are fitted on
# the training rows only. Fitting them on the full dataset would let the test
# rows influence the vocabulary and the IDF weights, which inflates the
# accuracy/AUC reported later.
from pyspark.ml.feature import VectorAssembler

# Split the raw (already feature-enriched) rows 80/20 with a fixed seed
print("Splitting data...")
train_raw, test_raw = df.randomSplit([0.8, 0.2], seed=42)

# Fit the vocabulary (at most 1000 terms, each seen in >= 2 training
# documents) and the IDF weights on the training rows only
print("Creating TF-IDF features...")
cv = CountVectorizer(inputCol="words_array", outputCol="word_counts", vocabSize=1000, minDF=2.0)
cv_model = cv.fit(train_raw)

idf = IDF(inputCol="word_counts", outputCol="tfidf_features")
idf_model = idf.fit(cv_model.transform(train_raw))

# Concatenate the TF-IDF vector with the five hand-crafted scalar features
print("Assembling features...")
assembler = VectorAssembler(
    inputCols=["tfidf_features", "urgency_score", "scarcity_score",
               "social_proof_score", "text_length", "caps_ratio"],
    outputCol="features"
)


def _featurize(frame):
    """Apply the fitted count -> tf-idf -> assemble chain to *frame*."""
    return assembler.transform(idf_model.transform(cv_model.transform(frame)))


# Full-dataset views are kept under their original names in case other cells
# use them; they are only computed if something actually reads them.
df_counts = cv_model.transform(df)
df_tfidf = idf_model.transform(df_counts)
df_assembled = assembler.transform(df_tfidf)

train = _featurize(train_raw)
test = _featurize(test_raw)
print(f"Training set: {train.count()} samples, Test set: {test.count()} samples")

# Evaluators shared by both candidate models: multiclass accuracy and the
# binary evaluator's default metric, both against label_index.
evaluator = MulticlassClassificationEvaluator(labelCol="label_index", metricName="accuracy")
binary_evaluator = BinaryClassificationEvaluator(labelCol="label_index")


def _score(predictions):
    """Return (accuracy, AUC) for a predictions DataFrame."""
    return evaluator.evaluate(predictions), binary_evaluator.evaluate(predictions)


# Candidate 1: logistic regression, 20 iterations
print("Training Logistic Regression model...")
lr = LogisticRegression(featuresCol="features", labelCol="label_index", maxIter=20)
lr_model = lr.fit(train)

print("Evaluating model...")
lr_predictions = lr_model.transform(test)
accuracy, auroc = _score(lr_predictions)

print(f"Logistic Regression - Accuracy: {accuracy:.4f}, AUC: {auroc:.4f}")

# Candidate 2: random forest with 20 trees, scored on the same test split
print("Training Random Forest model...")
rf = RandomForestClassifier(featuresCol="features", labelCol="label_index", numTrees=20)
rf_model = rf.fit(train)

rf_predictions = rf_model.transform(test)
rf_accuracy, rf_auroc = _score(rf_predictions)

print(f"Random Forest - Accuracy: {rf_accuracy:.4f}, AUC: {rf_auroc:.4f}")

# Keep whichever model has the higher test accuracy; on a tie the logistic
# regression model is kept.
if rf_accuracy > accuracy:
    print("Random Forest performs better - using it as final model")
    best_model, model_type = rf_model, "RandomForest"
    best_accuracy, best_auroc = rf_accuracy, rf_auroc
else:
    print("Logistic Regression performs better - using it as final model")
    best_model, model_type = lr_model, "LogisticRegression"
    best_accuracy, best_auroc = accuracy, auroc

# Export a JSON description of the selected model: metrics, keyword lists,
# model-specific parameters and part of the CountVectorizer vocabulary.
from datetime import date

# Create the output directory; exist_ok avoids the check-then-create race
model_dir = "models"
os.makedirs(model_dir, exist_ok=True)

# Save model metadata as JSON - this approach avoids Hadoop native library issues
print("Exporting model metadata for Chrome extension...")
model_metadata = {
    "model_type": model_type,
    "version": "1.0",
    "accuracy": float(best_accuracy),
    "auc": float(best_auroc),
    # Stamp the export with the date of this run (YYYY-MM-DD) instead of a
    # fixed literal that goes stale on every retrain.
    "trainDate": date.today().isoformat(),
    "features": {
        "urgency_words": urgency_words,
        "scarcity_words": scarcity_words,
        "social_proof_words": social_proof_words,
        # Read the configured size from the estimator so it cannot drift
        # from the value passed to CountVectorizer.
        "countVectorizerVocabSize": cv.getVocabSize()
    }
}

# Add model-specific parameters
if model_type == "LogisticRegression":
    # Coefficients and intercept for the JavaScript implementation
    model_metadata["coefficients"] = best_model.coefficients.toArray().tolist()
    model_metadata["intercept"] = float(best_model.intercept)
    model_metadata["regParam"] = best_model.getRegParam()
    model_metadata["elasticNetParam"] = best_model.getElasticNetParam()
else:  # RandomForest
    model_metadata["numTrees"] = best_model.getNumTrees()
    model_metadata["maxDepth"] = best_model.getMaxDepth()

    # For Random Forest, export the per-feature importances
    if hasattr(best_model, "featureImportances"):
        importances = best_model.featureImportances.toArray().tolist()
        model_metadata["featureImportances"] = importances
        print("\nTop feature importances:")
        # Rank by importance so the listing really shows the top ten, each
        # labelled with its index in the feature vector.
        ranked = sorted(enumerate(importances), key=lambda pair: pair[1], reverse=True)
        for feature_index, importance in ranked[:10]:
            print(f"Feature {feature_index}: {importance:.4f}")

# Save vocabulary from CountVectorizer if available
if hasattr(cv_model, "vocabulary"):
    # Only the first 100 vocabulary entries are exported, to keep the JSON
    # size reasonable.
    top_vocab = cv_model.vocabulary[:100]
    model_metadata["vocabulary"] = top_vocab

# Write the metadata file with an explicit encoding so the result does not
# depend on the platform default.
metadata_path = os.path.join(model_dir, "model_metadata.json")
with open(metadata_path, "w", encoding="utf-8") as f:
    json.dump(model_metadata, f)
print(f"Saved model metadata to {metadata_path}")


Spark session created successfully
Loading data...
Original schema:
root
 |-- page_id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- label: string (nullable = true)
 |-- Pattern Category: string (nullable = true)


Sample data:
+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+----------------+
|page_id|text                                                                                                                                                                                                               |label|Pattern Category|
+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+----------------+
|1012   |FLASH SA

In [5]:
# Function to predict if a webpage contains dark patterns
def predict_dark_patterns(webpage_text):
    """Classify one page's text with the selected model.

    Builds a single-row DataFrame, derives the same columns the training
    data received (cleaned text, keyword scores, length, caps ratio,
    tokens), then runs it through ``cv_model``, ``idf_model``,
    ``assembler`` and ``best_model`` from the training cell.

    Args:
        webpage_text: Text content of the page.

    Returns:
        ``bool`` of the predicted label, i.e. True for any non-zero prediction.
    """
    raw_text = col("text")
    frame = spark.createDataFrame([(webpage_text,)], ["text"])
    frame = frame.withColumn("text_cleaned", regexp_replace(lower(raw_text), "[^a-zA-Z0-9\\s]", " "))

    # Hand-crafted features are all computed from the raw text column.
    scalar_features = (
        ("urgency_score", urgency_udf),
        ("scarcity_score", scarcity_udf),
        ("social_proof_score", social_proof_udf),
        ("text_length", text_length_udf),
        ("caps_ratio", caps_ratio_udf),
    )
    for column_name, feature_udf in scalar_features:
        frame = frame.withColumn(column_name, feature_udf(raw_text))
    frame = frame.withColumn("words_array", words_udf(col("text_cleaned")))

    # counts -> tf-idf -> assembled vector -> prediction
    for stage in (cv_model, idf_model, assembler, best_model):
        frame = stage.transform(frame)

    predicted_label = frame.select("prediction").collect()[0][0]
    return bool(predicted_label)

# Example usage: prompt for page text on stdin, classify it with
# predict_dark_patterns(), and print the boolean result.
webpage_text = input("Enter the text content of the webpage: ")
contains_dark_patterns = predict_dark_patterns(webpage_text)
print(f"Contains dark patterns: {contains_dark_patterns}")

Enter the text content of the webpage:  What is Lorem Ipsum? Lorem Ipsum is simply dummy text of the printing and typesetting industry. Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, when an unknown printer took a galley of type and scrambled it to make a type specimen book. It has survived not only five centuries, but also the leap into electronic typesetting, remaining essentially unchanged. It was popularised in the 1960s with the release of Letraset sheets containing Lorem Ipsum passages, and more recently with desktop publishing software like Aldus PageMaker including versions of Lorem Ipsum.  Why do we use it? It is a long established fact that a reader will be distracted by the readable content of a page when looking at its layout. The point of using Lorem Ipsum is that it has a more-or-less normal distribution of letters, as opposed to using 'Content here, content here', making it look like readable English. Many desktop publishing packages and we

Contains dark patterns: False


In [7]:
# Function to predict if a webpage contains dark patterns
def predict_dark_patterns(webpage_text):
    """Classify one page's text with the selected model.

    Builds a single-row DataFrame, derives the same columns the training
    data received (cleaned text, keyword scores, length, caps ratio,
    tokens), then runs it through ``cv_model``, ``idf_model``,
    ``assembler`` and ``best_model`` from the training cell.

    Args:
        webpage_text: Text content of the page.

    Returns:
        ``bool`` of the predicted label, i.e. True for any non-zero prediction.
    """
    raw_text = col("text")
    frame = spark.createDataFrame([(webpage_text,)], ["text"])
    frame = frame.withColumn("text_cleaned", regexp_replace(lower(raw_text), "[^a-zA-Z0-9\\s]", " "))

    # Hand-crafted features are all computed from the raw text column.
    scalar_features = (
        ("urgency_score", urgency_udf),
        ("scarcity_score", scarcity_udf),
        ("social_proof_score", social_proof_udf),
        ("text_length", text_length_udf),
        ("caps_ratio", caps_ratio_udf),
    )
    for column_name, feature_udf in scalar_features:
        frame = frame.withColumn(column_name, feature_udf(raw_text))
    frame = frame.withColumn("words_array", words_udf(col("text_cleaned")))

    # counts -> tf-idf -> assembled vector -> prediction
    for stage in (cv_model, idf_model, assembler, best_model):
        frame = stage.transform(frame)

    predicted_label = frame.select("prediction").collect()[0][0]
    return bool(predicted_label)

# Example usage: prompt for page text on stdin, classify it with
# predict_dark_patterns(), and print the boolean result.
webpage_text = input("Enter the text content of the webpage: ")
contains_dark_patterns = predict_dark_patterns(webpage_text)
print(f"Contains dark patterns: {contains_dark_patterns}")

Enter the text content of the webpage:  Skip to Main content About this item About this item Buying options Compare with similar items Reviews Keyboard shortcuts Search alt + / Cart shift + alt + C Home shift + alt + H Orders shift + alt + O  Add to cart shift + alt + K  Show/Hide shortcuts shift + alt + Z To move between items, use your keyboard's up or down arrows. .in Delivering to Vellore 632106 Update location Home & Kitchen  Home & Kitchen Search Amazon.in EN Hello, sign in Account & Lists Returns & Orders 0 Cart All Fresh MX Player Sell Bestsellers Today's Deals Mobiles Prime Customer Service New Releases Electronics Fashion Amazon Pay Home & Kitchen Computers Books Car & Motorbike Toys & Games Home Improvement Beauty & Personal Care Sports, Fitness & Outdoors Gift Cards Custom Products Grocery & Gourmet Foods Health, Household & Personal Care Baby Video Games Pet Supplies Gift Ideas Audible AmazonBasics Subscribe & Save Kindle eBooks Amazon Home Kitchen & Home Appliances Large 

Contains dark patterns: True
