In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
import jieba
import time

def initialize_spark():
    """Build (or reuse) the SparkSession used by this script.

    The session is named "TextClassification", targets the ``local[12]``
    master, and has the settings listed below applied in order before
    ``getOrCreate()`` is called.

    NOTE(review): with a ``local[...]`` master the executor-related settings
    (memory, cores, instances) presumably have no effect, and
    ``spark.sql.auto.repartition`` does not look like a documented Spark
    option -- confirm before relying on either.

    Returns:
        The SparkSession returned by ``getOrCreate()``.
    """
    settings = (
        ("spark.executor.memory", "4g"),
        ("spark.executor.cores", "5"),
        ("spark.task.cpus", "1"),
        ("spark.executor.instances", "2"),
        ("spark.sql.auto.repartition", "true"),
        ("spark.driver.memory", "8g"),
    )

    builder = SparkSession.builder.appName("TextClassification").master("local[12]")
    for key, value in settings:
        builder = builder.config(key, value)
    return builder.getOrCreate()

def load_data(spark, data_dir):
    """Read the JSON data at *data_dir* through *spark*'s reader.

    Args:
        spark: Object exposing ``read.json`` (the SparkSession in this script).
        data_dir: Location handed unchanged to ``spark.read.json``.

    Returns:
        Whatever ``spark.read.json(data_dir)`` returns.
    """
    reader = spark.read
    return reader.json(data_dir)

def jieba_tokenizer(text):
    """Split *text* into a list of tokens using ``jieba.cut``.

    Args:
        text: The string to segment. May be ``None`` when this function is
            used as a Spark UDF over a column that contains nulls.

    Returns:
        A list with the tokens produced by ``jieba.cut``; an empty list when
        *text* is ``None`` or empty.
    """
    # Null column values reach a Python UDF as None, which jieba.cut cannot
    # segment; returning an empty token list keeps one bad row from failing
    # the whole job.
    if not text:
        return []
    return list(jieba.cut(text))

def register_jieba_udf(data):
    """Return *data* with a "words" column of Jieba tokens from "content".

    ``jieba_tokenizer`` is wrapped in a UDF declared to return an array of
    strings and applied to the "content" column of *data*.

    Args:
        data: DataFrame-like object that has a "content" column.

    Returns:
        The result of ``data.withColumn("words", ...)``.
    """
    token_type = ArrayType(StringType())
    tokenize = udf(jieba_tokenizer, token_type)
    content_col = data["content"]
    return data.withColumn("words", tokenize(content_col))



# Train and evaluate a logistic regression model
def train_and_evaluate_model(train_data, test_data):
    """Fit a logistic regression on hashed token features and score it.

    Both inputs are passed through the same ``HashingTF`` (2**18 features,
    reading the "words" column and writing "features"). A
    ``LogisticRegression`` (maxIter=10, regParam=0.02) is fitted on the
    training features and applied to the test features.

    Args:
        train_data: DataFrame with a "words" column, used for fitting.
            NOTE(review): presumably also needs a "label" column, since the
            estimator is not given an explicit label column -- confirm.
        test_data: DataFrame with "words" and "label" columns, used for
            evaluation.

    Returns:
        A ``(model, auc, accuracy)`` tuple: the fitted model, the value
        returned by ``BinaryClassificationEvaluator.evaluate`` on the test
        predictions, and the fraction of test rows whose "prediction" equals
        "label" (NaN when the test set has no rows).
    """
    # Feature extraction using HashingTF
    hashingTF = HashingTF(numFeatures=2**18, inputCol="words", outputCol="features")
    train_features = hashingTF.transform(train_data)
    test_features = hashingTF.transform(test_data)

    # Train a logistic regression model
    lr = LogisticRegression(maxIter=10, regParam=0.02, featuresCol="features")
    model = lr.fit(train_features)

    # The predictions feed three separate Spark actions below. Caching them
    # avoids recomputing the whole upstream lineage for each action.
    predictions = model.transform(test_features).cache()
    try:
        total = predictions.count()
        evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label")
        auc = evaluator.evaluate(predictions)
        correct = predictions.filter(predictions["prediction"] == predictions["label"]).count()
    finally:
        # Release the cached blocks whether or not evaluation succeeded.
        predictions.unpersist()

    # Accuracy is undefined for an empty test set; report NaN instead of
    # raising ZeroDivisionError.
    accuracy = correct / total if total else float("nan")

    return model, auc, accuracy

# Save the trained model to disk
def save_model(model, model_path):
    """Persist *model* by calling its ``save`` method with *model_path*.

    Args:
        model: Object exposing ``save(path)`` (the fitted model here).
        model_path: Destination handed unchanged to ``model.save``.
    """
    persist = model.save
    persist(model_path)

if __name__ == "__main__":
    # Script entry point: tokenize, train, evaluate, save, and report timing.
    # perf_counter is monotonic, so the elapsed time cannot be skewed by
    # wall-clock adjustments during the run.
    start_time = time.perf_counter()

    # Initialize
    jieba.initialize()
    spark = initialize_spark()

    try:
        # Load data
        data_dir = "/path/to/data"
        data = load_data(spark, data_dir)

        # Split data into train and test sets
        train_data, test_data = data.randomSplit([0.8, 0.2], seed=123)

        # Add the Jieba-tokenized "words" column to both splits
        train_data = register_jieba_udf(train_data)
        test_data = register_jieba_udf(test_data)

        # Train and evaluate the model
        model, auc, accuracy = train_and_evaluate_model(train_data, test_data)
        print("Area Under the ROC Curve (AUC):", auc)
        print("Accuracy:", accuracy)

        # Get size of weights
        num_weights = len(model.coefficients)
        print("Number of Weights (Coefficients) in the Model:", num_weights)

        # Save the model to disk
        model_path = "/path/to/model"
        save_model(model, model_path)

        end_time = time.perf_counter()
        execution_time = end_time - start_time
        print("Code execution time:", execution_time, "seconds")
    finally:
        # Always release the Spark session, even when a step above fails.
        spark.stop()