In [0]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import org.apache.hadoop.fs.{FileSystem, Path}
import java.nio.file.{Files, Paths, StandardCopyOption}
import java.io.PrintWriter

In [1]:
// Directory scanned for input CSV files.
val directoryPath = "/user/sc10648_nyu_edu/stocks/"

// Hadoop FileSystem handle built from the active Spark context's configuration.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

// Full paths of at most 20 directory entries whose names end in ".csv".
val csvFiles = {
  val entries = fs.listStatus(new Path(directoryPath))
  entries
    .collect { case entry if entry.getPath.getName.endsWith(".csv") => entry.getPath.toString }
    .take(20)
}

In [2]:
// Accumulator for one summary row per processed file:
// (file, start date, min, max, avg, stddev, null count, zero count) of the Close column.
var allMetricsData: Seq[(String, String, Double, Double, Double, Double, Long, Long)] = Seq.empty

In [3]:
/**
 * Cleans one raw stock CSV, records summary metrics for it, and writes the cleaned
 * (Date, Close) series to `outputDir/<file name without ".csv">`.
 *
 * Steps:
 *  1. Read the file without a header and drop its first three lines.
 *  2. Name the seven columns by position, keep `Date` (parsed as `yyyy-MM-dd`) and
 *     `Close` (cast to double).
 *  3. Append (file, start date, min, max, avg, stddev, null count, zero count) of the
 *     `Close` values, as read, to the notebook-level `allMetricsData`.
 *  4. Drop duplicate dates, treat zero closes as missing, forward-fill gaps, back-fill
 *     any leading gap, and write the result as header-less CSV (overwriting).
 *
 * Statistics that Spark reports as null (for example when the file has no data rows)
 * are recorded as `NaN`, and a missing start date as "N/A", instead of failing or
 * being silently turned into 0.0.
 *
 * @param filePath  path of the source CSV file
 * @param outputDir directory under which the per-file output folder is written
 */
def processAndWriteToNewFolder(filePath: String, outputDir: String): Unit = {

    val rawDf = spark.read.option("header", false).option("inferSchema", "true").csv(filePath)

    // Skip the first three lines of the file (indices 0-2); presumably a multi-line
    // header -- verify against the input files.
    val dataRows = rawDf.rdd.zipWithIndex().filter { case (_, idx) => idx >= 3 }.map(_._1)
    val dataDf = spark.createDataFrame(dataRows, rawDf.schema)

    val columnNames = Seq("Date", "AdjClose", "Close", "Open", "High", "Low", "Volume")
    val formattedDf = dataDf
        .toDF(columnNames: _*)
        .select($"Date", $"Close")
        .withColumn("Close", $"Close".cast("double"))
        .withColumn("Date", to_date($"Date", "yyyy-MM-dd"))

    // Compute every metric in a single aggregation (one Spark job instead of seven).
    // count(when(cond, true)) counts only the rows where cond is true.
    val stats = formattedDf.agg(
        min($"Date"),
        min($"Close"),
        max($"Close"),
        avg($"Close"),
        stddev($"Close"),
        count(when($"Close".isNull, true)),
        count(when($"Close" === 0, true))
    ).head()

    // Aggregates are null when there is nothing to aggregate; surface that as NaN.
    def doubleAt(i: Int): Double = if (stats.isNullAt(i)) Double.NaN else stats.getDouble(i)

    val startDate = Option(stats.get(0)).map(_.toString).getOrElse("N/A")
    val nullCount = stats.getLong(5)
    val zeroCount = stats.getLong(6)

    val fileNameWithoutExtension = filePath.split("/").last.replace(".csv", "")

    allMetricsData :+= (
        fileNameWithoutExtension,
        startDate,
        doubleAt(1),
        doubleAt(2),
        doubleAt(3),
        doubleAt(4),
        nullCount,
        zeroCount
    )

    val deduplicatedDf = formattedDf.dropDuplicates("Date")

    // A close of exactly 0 is treated as a missing value so that it gets filled below.
    val adjustedDf = deduplicatedDf.withColumn("Close", when($"Close" === 0, lit(null)).otherwise($"Close"))

    // No partitioning: the whole file is ordered by date in a single window partition.
    val byDate = Window.orderBy("Date")
    // Forward fill looks backwards from the current row ...
    val upToCurrent = byDate.rowsBetween(Window.unboundedPreceding, Window.currentRow)
    // ... while the fallback for leading gaps must look forwards. Using the same
    // backward-looking frame for both would leave leading nulls unfilled.
    val fromCurrent = byDate.rowsBetween(Window.currentRow, Window.unboundedFollowing)

    val filledDf = adjustedDf.withColumn(
        "Close",
        coalesce(
            last($"Close", ignoreNulls = true).over(upToCurrent),
            first($"Close", ignoreNulls = true).over(fromCurrent)
        )
    )

    val outputPath = s"$outputDir/$fileNameWithoutExtension"

    filledDf.write.mode("overwrite").option("header", "false").csv(outputPath)

}



In [4]:
// Root directory that receives one output folder per processed file.
val outputDir = "/user/sc10648_nyu_edu/stocks_processed"

val outputPath = new Path(outputDir)
// Create the directory only when it is not already present.
if (fs.exists(outputPath)) () else fs.mkdirs(outputPath)


In [5]:
// Process every listed CSV in turn; each call also appends a row to allMetricsData.
for (csvFile <- csvFiles) processAndWriteToNewFolder(csvFile, outputDir)

In [6]:
// Turn the collected per-file metric tuples into a DataFrame with named columns.
val metricsDf = {
  val metricColumns = Seq(
    "File", "StartDate", "MinClose", "MaxClose", "AvgClose", "StdDevClose", "NullCount", "ZeroCount"
  )
  spark.createDataFrame(allMetricsData).toDF(metricColumns: _*)
}

// Print the table without truncating long cell values.
metricsDf.show(truncate = false)