In [None]:
%python
# Install exact, pinned versions of protobuf, onnx and onnxmltools.
# onnxmltools is the package the export cell at the end of the notebook imports;
# protobuf and onnx are presumably pinned to versions compatible with it — TODO confirm.
# NOTE(review): this cell shows no execution count ("In [None]"), so it may not
# have been run in the saved session.
!pip install protobuf==3.20.2
!pip install onnx==1.10.1
!pip install onnxmltools==1.9.0


In [1]:
%python
# Install the remaining Python dependencies. None of these are version-pinned here.
# NOTE(review): onnx and onnxmltools are also installed with exact pins in the
# first cell of this notebook; verify that running this cell does not replace
# those pinned versions, and consider pinning the other packages for reproducibility.
!pip install pyspark onnx scikit-learn skl2onnx onnxmltools pandas

In [2]:
%python
# Notebook-wide imports: Spark session and column helpers, Spark ML feature
# transformers, classifiers, pipeline and evaluator, plus the onnxmltools package.
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col,when
from pyspark.sql.types import StringType, DoubleType
from pyspark.ml.feature import Tokenizer, HashingTF, IDF ,StringIndexer
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier, LinearSVC
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import split
import onnxmltools
# NOTE: this import rebinds the name StringType. From here on it refers to the
# onnxmltools data type, not the pyspark.sql.types.StringType imported above.
from onnxmltools.convert.common.data_types import StringType


In [3]:
%python
# Create (or reuse) the Spark session with master "local[*]"; the two
# memory-related settings are handed to the builder as plain config entries.
spark = (
    SparkSession.builder
    .appName("SentimentAnalysis")
    .master("local[*]")
    .config("spark.driver.maxResultSize", "8g")
    .config("spark.executor.memory", "6g")
    .getOrCreate()
)
print("SparkSession created successfully")

# HDFS locations of the two input text files.
test_file = "hdfs://namenode:9000/user/input/test.ft.txt"
train_file = "hdfs://namenode:9000/user/input/train.ft.txt"

# Load both files as text DataFrames (test first, then train). The raw line is
# later read from the "value" column by the preprocessing cell.
test_df, train_df = spark.read.text(test_file), spark.read.text(train_file)

# The row previews are disabled, so only the headings below are printed.
# Re-enable with test_df.show(5, truncate=False) / train_df.show(5, truncate=False).
print("\nFirst few rows of test.ft.txt:")
print("\nFirst few rows of train.ft.txt:")
print("---------------------------__--------------:")

In [4]:
%python
def _parse_labelled_lines(raw_df):
    """Add ``label`` and ``Comment`` columns derived from the raw ``value`` column.

    Each line is split on the first space into at most two parts. The first
    part is compared against ``__label__1``: a match becomes label 1.0, anything
    else becomes 2.0. The second part (the rest of the line) becomes ``Comment``.

    Args:
        raw_df: DataFrame with a string column named ``value``.

    Returns:
        ``raw_df`` with the two extra columns ``label`` and ``Comment``.
    """
    parts = split(raw_df["value"], " ", 2)
    return (
        raw_df
        .withColumn("label", when(parts.getItem(0) == "__label__1", 1.0).otherwise(2.0))
        .withColumn("Comment", parts.getItem(1))
    )


# Parse the train and test lines with the same logic.
train_df = _parse_labelled_lines(train_df)
test_df = _parse_labelled_lines(test_df)

# Use only the "Comment" column as the feature.
# Nulls are dropped AFTER parsing: a line without a space has no second part,
# which can leave "Comment" null, and such rows must not reach the tokenizer.
# Dropping before the split (as was done previously) could not catch them.
train_processed_df = train_df.select("Comment", "label").na.drop()
test_processed_df = test_df.select("Comment", "label").na.drop()

# Report the row counts instead of computing them and discarding the result.
print(f"Train rows after preprocessing: {train_processed_df.count()}")
print(f"Test rows after preprocessing: {test_processed_df.count()}")

print("After preprocessing:")
train_processed_df.show(4, truncate=False)
test_processed_df.show(4, truncate=False)

# Hold out 10% of the training data for validation; fixed seed for repeatability.
train_processed_set, val_processed_set = train_processed_df.randomSplit([0.90, 0.10], seed=2000)

In [5]:
%python
# Train each candidate classifier inside the same text-featurisation pipeline
# and keep the fitted pipeline with the best accuracy on the held-out
# validation split.

models = [
    ('NaiveBayes', NaiveBayes()),
    ('RandomForestClassifier', RandomForestClassifier())
]

best_model = None
best_accuracy = 0.0

# The featurisation stages and the evaluator do not depend on the classifier,
# so they are built once and shared by every pipeline.
tokenizer = Tokenizer(inputCol="Comment", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=5000)
idf = IDF(inputCol="rawFeatures", outputCol="features")
indexer = StringIndexer(inputCol="label", outputCol="label_indexed")
evaluator = MulticlassClassificationEvaluator(labelCol="label_indexed", predictionCol="prediction", metricName="accuracy")

for name, model in models:
    classifier = model.setLabelCol("label_indexed").setFeaturesCol("features")

    pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, indexer, classifier])

    print(f"Training {name} model...")
    model_fit = pipeline.fit(train_processed_set)
    print(f"{name} model trained successfully.")

    predictions = model_fit.transform(train_processed_set)
    predictions_val = model_fit.transform(val_processed_set)

    print("predictions-..........")
    #predictions.show(7, truncate=False)
    print("predictions_val-..........")
    #predictions_val.show(7, truncate=False)

    # Score on the validation split, not on the data the model was fitted on:
    # training-set accuracy would favour whichever model overfits the most.
    accuracy = evaluator.evaluate(predictions_val)
    print(f'{name} Accuracy: {accuracy}')

    if accuracy > best_accuracy:
        best_accuracy = accuracy
        best_model = model_fit

In [6]:
%python
import os

import onnxmltools
from onnxmltools.convert import convert_sparkml
from onnxmltools.utils import save_model
from onnxmltools.convert.common.data_types import StringType, FloatTensorType

# best_model is the fitted PipelineModel chosen by the model-selection cell.
if best_model is None:
    raise RuntimeError("No fitted pipeline available: run the model-selection cell first.")

# Only the last pipeline stage (the fitted classifier, whichever candidate won)
# is exported; the text featurisation stages are not part of the ONNX graph.
final_model = best_model.stages[-1]
print(final_model)

# Take the input width from the fitted classifier so it cannot drift from the
# HashingTF setting; fall back to the pipeline's configured 5000 if unknown.
num_features = final_model.numFeatures
if num_features is None or num_features <= 0:
    num_features = 5000

# Convert the model to ONNX format; the input is a single row of `num_features` floats.
onnx_model = convert_sparkml(final_model, 'Test',[('features', FloatTensorType([1,num_features]))], spark_session=spark)

# Save the ONNX model inside onnx_path (previously defined but never used, so
# the file landed in whatever the current working directory happened to be).
onnx_path = "/opt/zeppelin/"
onnx_file = os.path.join(onnx_path, 'best_model.onnx')
save_model(onnx_model, onnx_file)
print(f"ONNX model saved to {onnx_file}")

