In [None]:
import org.apache.spark.sql.functions._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer}
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.DataFrame

## data loading and preparation

In [2]:
// Candidate input files. Only `newFilePath` is read below; `filePath` is kept for reference.
val filePath = "buysell.csv"
val newFilePath = "new_data.csv"

// Load the CSV with a header row and inferred column types, normalise the label
// column name to `buySell`, and cast it to double so the ML stages can consume it.
val originalDF = spark.read
  .options(Map("header" -> "true", "inferSchema" -> "true"))
  .csv(newFilePath)
  .withColumnRenamed("BuySell", "buySell")
  .withColumn("buySell", col("buySell").cast("double"))

// Render the loaded table in the notebook.
z.show(originalDF)
// val TechDF = rawDF.filter(col("Category")==="Tech")

In [3]:
// Inspect the inferred column types of the freshly loaded data.
// `rawDF` is only defined in a later cell, so the schema of `originalDF` is printed here
// to keep the notebook runnable from top to bottom.
originalDF.printSchema()

In [4]:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lead, lag}

// FIXME: this statement was left unfinished and does not compile: it assigns to an
// undeclared name (no `val`) and `withColumn` is missing its column expression.
// It also references `rawDF`, which is only defined in a later cell.
// It is kept as a comment until the intended "difference" expression is decided.
// Possible shape (sketch only; the column to difference is an assumption to confirm):
//   val byCategoryOverTime = Window.partitionBy("Category").orderBy("Date")
//   val problemDF = rawDF.withColumn(
//     "difference", col("buySell") - lag(col("buySell"), 1).over(byCategoryOverTime))
// problemDF = rawDF.withColumn("difference",)

In [5]:
// List the column names of the loaded data.
originalDF.schema.fieldNames

In [6]:
// Keep only the columns used for modelling, then render the result in the notebook.
val rawDF = originalDF.select(
  col("Date"),
  col("Category"),
  col("avg_sentiment"),
  col("avg_subjectivity"),
  col("avg_articles_count"),
  col("buySell")
)
z.show(rawDF)

## modeling

In [8]:
// Chronological 80/20 train/test split.
//
// The previous `orderBy` + `limit` + `except` approach was unsafe for two reasons:
//   * `except` is a DISTINCT set difference: it collapses duplicate rows and drops any
//     test row that happens to equal a train row.
//   * `Date` is not a unique sort key here (`Category` can repeat per date), so `limit`
//     may pick different rows each time the plan is re-evaluated, letting rows leak
//     between train and test.
// Instead, every row gets one deterministic position and both sides are cut from the
// same ranked DataFrame.
import org.apache.spark.sql.expressions.Window

val sortedDF = rawDF.orderBy(asc("Date")) // replace the data with the proper name

val splitPoint = (sortedDF.count * 0.8).toInt

// Order by every column, `Date` first (it is the first column of `rawDF`), so ties on
// `Date` are broken deterministically; fully identical rows are interchangeable.
// NOTE: a window without partitioning ranks all rows in a single partition, which is
// presumably acceptable for a dataset of this size (confirm if the data grows).
val chronologicalOrder = Window.orderBy(rawDF.columns.map(col): _*)
val rankedDF = rawDF
  .withColumn("_rowNum", row_number().over(chronologicalOrder))
  .cache()

// The first `splitPoint` rows (oldest) train the models; the remainder is held out.
val trainDF = rankedDF.filter(col("_rowNum") <= splitPoint).drop("_rowNum")
val testDF = rankedDF.filter(col("_rowNum") > splitPoint).drop("_rowNum")

### linear regression

In [10]:
import org.apache.spark.ml.feature.RFormula
// val rFormula = new RFormula()
//   .setFormula("sectorIndex ~ . - Date - buySell")
//   .setFeaturesCol("features")
//   .setLabelCol("sectorIndex")
//   .setHandleInvalid("skip")
   
// import org.apache.spark.ml.regression.LinearRegression

// val linearRegression = new LinearRegression() 
//   .setFeaturesCol("features")
//   .setLabelCol("sectorIndex")
  
// val pipeline = new Pipeline().setStages(Array(rFormula, linearRegression))


// buySell as the label
// The label must be spelled exactly like the DataFrame column (`buySell`). With the
// previous lowercase `buysell`, the real label column was not excluded from the `.`
// feature expansion and the label lookup did not match the schema.
import org.apache.spark.ml.feature.RFormula
val rFormula = new RFormula()
   // Every column except `Date` is a feature; `- sectorIndex` is a harmless guard
   // in case that column is added back to the input.
   .setFormula("buySell ~ . - Date - sectorIndex")
   .setFeaturesCol("features")
   .setLabelCol("buySell")
   .setHandleInvalid("skip")
   
import org.apache.spark.ml.regression.LinearRegression

// Linear regression on the assembled feature vector, predicting `buySell`.
val linearRegression = new LinearRegression() 
  .setFeaturesCol("features")
  .setLabelCol("buySell")
val pipeline = new Pipeline().setStages(Array(rFormula, linearRegression))

In [11]:
import org.apache.spark.ml.evaluation.RegressionEvaluator

// Fit the linear pipeline on the training rows and score the held-out rows.
val linearPipelineModel = pipeline.fit(trainDF)
val predDF = linearPipelineModel.transform(testDF)

// R2 must be computed against the column the model was trained to predict.
// The previous label column `sectorIndex` is not part of `rawDF`, so evaluation
// could not compare predictions with the actual `buySell` target.
val r2Evaluator = new RegressionEvaluator()
   .setPredictionCol("prediction")
   .setLabelCol("buySell")
   .setMetricName("r2")
val r2 = r2Evaluator.evaluate(predDF)
println(f"r2 is $r2%.4f")

In [12]:
// Distinct category values present in the test set, collected to the driver as strings.
val categories = testDF
  .select("Category")
  .distinct()
  .collect()
  .map(row => row.get(0).toString)

// Score the fitted linear pipeline separately on each category's test rows
// and report the R2 per category.
for (category <- categories) {
  val categoryRows = testDF.where(col("Category") === category)
  val r2 = r2Evaluator.evaluate(linearPipelineModel.transform(categoryRows))
  println(s"Category: $category, R2 score: $r2")
}

### logistic regression

In [14]:
// List the column names available to the models.
rawDF.schema.fieldNames

In [15]:
%spark
import org.apache.spark.ml.feature.RFormula

// Formula stage: `buySell` is the label and every other column except `Date`
// is a feature. Rows with invalid values are skipped.
val rFormula = new RFormula()
  .setHandleInvalid("skip")
  .setLabelCol("buySell")
  .setFeaturesCol("features")
  .setFormula("buySell ~ . - Date ")

// // Assemble your features into a single vector column
// val assembler = new VectorAssembler()
//   .setInputCols(Array("Category", "avg_sentiment", "avg_subjectivity", "avg_articles_count"))
//   .setOutputCol("features")

// Logistic regression on the assembled features: 10 iterations at most,
// regularisation strength 0.3 with an elastic-net mixing parameter of 0.8.
val logisticRegression = new LogisticRegression()
  .setFeaturesCol("features")
  .setLabelCol("buySell")
  .setElasticNetParam(0.8)
  .setRegParam(0.3)
  .setMaxIter(10)

// Formula stage followed by the classifier.
val logisticPipeline = new Pipeline().setStages(Array(rFormula, logisticRegression))

// Train on the training rows, then score the held-out rows.
val logisticPipelineModel = logisticPipeline.fit(trainDF)
val logisticPredDF = logisticPipelineModel.transform(testDF)


In [16]:
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

// Keep only scored rows that carry both a label and a raw prediction.
val cleanedDF = logisticPredDF
  .where(col("buySell").isNotNull.and(col("rawPrediction").isNotNull))

// Evaluator computing the area under the ROC curve from the raw prediction column.
val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setRawPredictionCol("rawPrediction")
  .setLabelCol("buySell")

// NOTE: despite its name, `accuracy` holds the area under the ROC curve.
val accuracy = evaluator.evaluate(cleanedDF)
// val accuracy = evaluator.evaluate(logisticPredDF.withColumn("prediction",$"prediction".cast("boolean")))
println(s"Area Under ROC for logstics regression = $accuracy")


In [17]:
// Render the logistic-regression predictions on the test rows in the notebook.
z.show(logisticPredDF)

 

### random forest

In [19]:
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.sql.SparkSession

In [20]:
// Random forest classifier predicting `buySell` from the assembled `features` vector.
val rf = new RandomForestClassifier()
  .setFeaturesCol("features")
  .setLabelCol("buySell")

// Reuse the existing formula stage and append the forest.
val pipeline = new Pipeline().setStages(Array(rFormula, rf))

// Train on the training rows, then score the held-out rows.
val model = pipeline.fit(trainDF)
val rfPredDF = model.transform(testDF)

// Keep only scored rows that carry both a label and a raw prediction.
val cleanedDF = rfPredDF
  .where(col("buySell").isNotNull.and(col("rawPrediction").isNotNull))


In [21]:
// Evaluator computing the area under the ROC curve from the raw prediction column.
val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setRawPredictionCol("rawPrediction")
  .setLabelCol("buySell")

// NOTE: despite its name, `accuracy` holds the area under the ROC curve.
val accuracy = evaluator.evaluate(cleanedDF)
println(s"Area Under ROC = $accuracy")

 
### polynomial regression

In [23]:
import org.apache.spark.ml.feature.PolynomialExpansion

// Expand the assembled feature vector with degree-2 (quadratic and interaction) terms.
val polyExpansion = new PolynomialExpansion()
  .setInputCol("features") // original features
  .setOutputCol("polyFeatures") // polynomial expansion of features
  .setDegree(2) // for quadratic terms

// Logistic regression trained on the expanded `polyFeatures` vector.
val logisticRegression = new LogisticRegression()
  .setLabelCol("buySell")
  .setFeaturesCol("polyFeatures")
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

// Set up the pipeline with the stages: formula -> polynomial expansion -> classifier
val polyPipeline = new Pipeline()
  .setStages(Array(rFormula, polyExpansion, logisticRegression))

// Fit the model.
// This must fit `polyPipeline`; fitting `logisticPipeline` here silently re-trained the
// plain logistic model and never used the polynomial expansion.
val polyPipelineModel = polyPipeline.fit(trainDF)

// Make predictions
val polyPredDF = polyPipelineModel.transform(testDF)
  

In [24]:
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

// Keep only scored rows that carry both a label and a raw prediction.
val cleanedDF = polyPredDF
  .where(col("buySell").isNotNull.and(col("rawPrediction").isNotNull))

// Evaluator computing the area under the ROC curve from the raw prediction column.
val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setRawPredictionCol("rawPrediction")
  .setLabelCol("buySell")

// NOTE: despite its name, `accuracy` holds the area under the ROC curve.
val accuracy = evaluator.evaluate(cleanedDF)
// val accuracy = evaluator.evaluate(logisticPredDF.withColumn("prediction",$"prediction".cast("boolean")))
println(s"Area Under ROC for polynomial regression = $accuracy")
