In [3]:
!pip install pyspark
!pip install sparknlp
!pip install transformers torch
# 0. Mount Drive & install deps
from google.colab import drive
drive.mount('/content/drive')

Collecting sparknlp
  Downloading sparknlp-1.0.0-py3-none-any.whl.metadata (1.2 kB)
Downloading sparknlp-1.0.0-py3-none-any.whl (1.4 kB)
Installing collected packages: sparknlp
Successfully installed sparknlp-1.0.0
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# 1. Imports & Spark NLP start
import sparknlp
from pyspark.sql import SparkSession
# Session handle used by the rest of the notebook (`spark.read`, `spark.sparkContext`).
spark = sparknlp.start()

import torch
from transformers import BertTokenizer, BertModel

import pandas as pd  # added: needed for the pandas UDF's Series input/output

from pyspark.sql.functions import pandas_udf, udf
from pyspark.sql.types import ArrayType, DoubleType  # changed to DoubleType
from pyspark.ml.linalg import Vectors, VectorUDT

from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    Tokenizer, StopWordsRemover, HashingTF, IDF,
    NGram, Word2Vec, StringIndexer
)
from pyspark.ml.classification import LinearSVC, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Plotting / numeric helpers for the final accuracy comparison chart.
import matplotlib.pyplot as plt
import numpy as np

# 2. Load data
# Read the CSV and keep only the text column and its sentiment tag.
data = (spark.read
        .option("header", True)
        .option("inferSchema", True)
        .csv("/content/drive/MyDrive/Colab Notebooks/processed_data6Financial(2).csv")
        .select("Phrase", "Sentiment")
        # A row with a missing phrase or label cannot be tokenized / indexed and
        # would abort the downstream stages, so drop such rows up front.
        .na.drop(subset=["Phrase", "Sentiment"]))

# Encode each distinct Sentiment value as a numeric `label` column.
label_idx = StringIndexer(inputCol="Sentiment", outputCol="label")
data = label_idx.fit(data).transform(data)

# Seeded 80/20 split so the train/test partition is repeatable.
train, test = data.randomSplit([0.8, 0.2], seed=42)

# Both splits are scanned repeatedly (baseline fits, cross-validation, scoring);
# persist them so the CSV is not re-read and re-indexed for every action.
train = train.cache()
test = test.cache()

# 3. Pipelines for TF-IDF, Word2Vec, N-Gram
# Shared front end: split each phrase into words, then drop stop words.
tokenizer = Tokenizer(inputCol="Phrase", outputCol="words")
remover   = StopWordsRemover(inputCol="words", outputCol="filtered")

# Hashed term frequencies re-weighted by IDF, classified by one-vs-rest linear SVMs.
tfidf_pipeline = Pipeline(stages=[
    tokenizer, remover,
    HashingTF(inputCol="filtered", outputCol="rawTF", numFeatures=20000),
    IDF(inputCol="rawTF", outputCol="features"),
    OneVsRest(classifier=LinearSVC(featuresCol="features", labelCol="label"))
])

# Word2Vec document vectors. The seed is pinned (same value as the train/test
# split) so the learned embeddings -- and therefore the reported accuracy --
# are repeatable from run to run.
w2v_pipeline = Pipeline(stages=[
    tokenizer, remover,
    Word2Vec(vectorSize=100, minCount=0, seed=42,
             inputCol="filtered", outputCol="features"),
    OneVsRest(classifier=LinearSVC(featuresCol="features", labelCol="label"))
])

# Same as the TF-IDF pipeline, but over bigrams of the filtered tokens.
ngram_pipeline = Pipeline(stages=[
    tokenizer, remover,
    NGram(n=2, inputCol="filtered", outputCol="ngrams"),
    HashingTF(inputCol="ngrams", outputCol="rawTF", numFeatures=20000),
    IDF(inputCol="rawTF", outputCol="features"),
    OneVsRest(classifier=LinearSVC(featuresCol="features", labelCol="label"))
])

# 4. BERT embeddings via Pandas UDF (modified)
# Load the pretrained tokenizer and encoder once on the driver; the encoder is
# switched to eval mode right after loading.
bert_checkpoint = 'bert-base-uncased'
tok = BertTokenizer.from_pretrained(bert_checkpoint)
mdl = BertModel.from_pretrained(bert_checkpoint)
mdl = mdl.eval()

# Broadcast both objects so the UDF can read them through `.value`.
spark_ctx = spark.sparkContext
bc_tok = spark_ctx.broadcast(tok)
bc_mdl = spark_ctx.broadcast(mdl)

@pandas_udf(ArrayType(DoubleType()))  # changed to DoubleType
def bert_embed_udf(texts: pd.Series) -> pd.Series:
    """Embed each text as the BERT hidden state at sequence position 0.

    Args:
        texts: one batch of input strings. Missing values are embedded as
            the empty string instead of crashing the tokenizer.

    Returns:
        A Series with one list of float64 values per input row, in input order.
    """
    tk = bc_tok.value
    md = bc_mdl.value

    # Normalise the batch: nulls -> "", everything else -> str.
    phrases = texts.fillna("").astype(str).tolist()

    # Every row is padded/truncated to the same length (64), so rows can be
    # stacked and encoded together instead of one forward pass per row.
    chunk_size = 32
    embs = []
    with torch.no_grad():
        for start in range(0, len(phrases), chunk_size):
            inputs = tk(
                phrases[start:start + chunk_size],
                padding='max_length',
                truncation=True,
                max_length=64,
                return_tensors='pt'
            )
            out = md(**inputs)
            # Position-0 vector of each row; move to CPU and make sure it is float64.
            first_token = out.last_hidden_state[:, 0, :]
            embs.extend(first_token.cpu().numpy().astype('float64').tolist())
    # Return a pandas.Series (object dtype: each element is a list of floats).
    return pd.Series(embs, dtype=object)

# UDF to convert array<double> to VectorUDT
vector_udf = udf(lambda arr: Vectors.dense(arr), VectorUDT())

# Prepare BERT train & test
# Each frame is cached: without it, any action on these frames (baseline fit,
# cross-validation fits, evaluation) may re-run BERT inference over the rows.
train_bert = (train.withColumn("bert_array", bert_embed_udf("Phrase"))
                   .withColumn("features", vector_udf("bert_array"))
                   .cache())
test_bert = (test.withColumn("bert_array", bert_embed_udf("Phrase"))
                 .withColumn("features", vector_udf("bert_array"))
                 .cache())

# One-vs-rest linear SVM over the BERT feature vectors (unchanged)
bert_columns = dict(featuresCol="features", labelCol="label")
svm_bert = LinearSVC(maxIter=100, **bert_columns)
ovr_bert = OneVsRest(classifier=svm_bert, **bert_columns)

# 5. Evaluate baseline (unchanged)
# Accuracy scorer comparing the `prediction` column against `label`.
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="label",
    metricName="accuracy"
)

# Text pipelines are fitted on the raw train split and scored on the test split.
baseline_pipelines = {
    "TF-IDF": tfidf_pipeline,
    "Word2Vec": w2v_pipeline,
    "N-Gram": ngram_pipeline,
}
baseline_acc = {}
for name, pipe in baseline_pipelines.items():
    model = pipe.fit(train)
    preds = model.transform(test)
    baseline_acc[name] = evaluator.evaluate(preds)

# The BERT classifier uses the frames that already carry a `features` column.
model_bert = ovr_bert.fit(train_bert)
preds_bert = model_bert.transform(test_bert)
baseline_acc["BERT"] = evaluator.evaluate(preds_bert)
print("Baseline Accuracies:", baseline_acc)

# 6. Hyperparameter tuning
# For every text pipeline: grid-search the SVM's maxIter / regParam with 3-fold
# cross-validation on the train split, then score the selected model on test.
tuned_acc = {}
for name, pipe in [("TF-IDF", tfidf_pipeline),
                   ("Word2Vec", w2v_pipeline),
                   ("N-Gram", ngram_pipeline)]:
    # The last stage is the OneVsRest wrapper; the grid targets its inner LinearSVC.
    ovr_stage = pipe.getStages()[-1]
    cls = ovr_stage.getClassifier()
    grid = (ParamGridBuilder()
            .addGrid(cls.maxIter, [50, 100])
            .addGrid(cls.regParam, [0.001, 0.01])
            .build())
    cv = CrossValidator(
        estimator=pipe,
        estimatorParamMaps=grid,
        evaluator=evaluator,
        numFolds=3,
        # Fix the fold assignment (same seed as the train/test split) so the
        # tuned accuracies are repeatable.
        seed=42
    )
    cvm = cv.fit(train)
    preds = cvm.transform(test)
    tuned_acc[name] = evaluator.evaluate(preds)

# Same search for the BERT classifier, on the frames holding BERT features.
grid_bert = (ParamGridBuilder()
             .addGrid(svm_bert.maxIter, [50, 100])
             .addGrid(svm_bert.regParam, [0.001, 0.01])
             .build())
cv_bert = CrossValidator(
    estimator=ovr_bert,
    estimatorParamMaps=grid_bert,
    evaluator=evaluator,
    numFolds=3,
    seed=42  # repeatable folds, consistent with the text pipelines
)
cvm_bert = cv_bert.fit(train_bert)
preds_bert_tuned = cvm_bert.transform(test_bert)
tuned_acc["BERT"] = evaluator.evaluate(preds_bert_tuned)
print("Tuned Accuracies:", tuned_acc)

# 7. Plot comparison (unchanged)
# One group of two bars per method: baseline on the left, tuned on the right.
methods = list(baseline_acc)
base_vals = [baseline_acc[method] for method in methods]
tuned_vals = [tuned_acc[method] for method in methods]

positions = np.arange(len(methods))
bar_width = 0.35

fig, ax = plt.subplots()
ax.bar(positions, base_vals, bar_width, label="Baseline")
ax.bar(positions + bar_width, tuned_vals, bar_width, label="Tuned")
# Centre each tick between the two bars of its group.
ax.set_xticks(positions + bar_width / 2)
ax.set_xticklabels(methods)
ax.set_ylabel("Accuracy")
ax.set_title("Baseline vs Tuned Accuracy")
ax.legend()
plt.show()


