## 1. Hack Preprocessing

Goal is to take an array of our downloaded raw files, extracting the description fields and saving them in corresponding single column csv file with the classifier as the header

This notebook assumes that the "rawData" directory on hdfs holds the data from our sources

In [1]:
//The root to where the rawData, processedData and dataSets directories are
val root = "hdfs://localhost:9000/project/";

//An array holding all the file names in rawData 
//(We reuse the same names in the processed data)
val fileNames = Array("bbc-news-data.csv","goodreads_data.csv","job_postings.csv","mtsamples.csv");

//Test that the raw data can be accessed
val file = root + "rawData/bbc-news-data.csv";
val rawRDD = sc.textFile(file);
val rawRDDtop = rawRDD.take(2);
rawRDDtop.foreach(println);

category	filename	title	content
business	001.txt	Ad sales boost Time Warner profit	 Quarterly profits at US media giant TimeWarner jumped 76% to $1.13bn (£600m) for the three months to December, from $639m year-earlier.  The firm, which is now one of the biggest investors in Google, benefited from sales of high-speed internet connections and higher advert sales. TimeWarner said fourth quarter sales rose 2% to $11.1bn from $10.9bn. Its profits were buoyed by one-off gains which offset a profit dip at Warner Bros, and less users for AOL.  Time Warner said on Friday that it now owns 8% of search-engine Google. But its own internet business, AOL, had has mixed fortunes. It lost 464,000 subscribers in the fourth quarter profits were lower than in the preceding three quarters. However, the company said AOL's underlying profit before exceptional items rose 8% on the back of stronger internet advertising revenues. It hopes to increase subscribers by offering the online service free to TimeWarn

Array(category	filename	title	content, "business	001.txt	Ad sales boost Time Warner profit	 Quarterly profits at US media giant TimeWarner jumped 76% to $1.13bn (£600m) for the three months to December, from $639m year-earlier.  The firm, which is now one of the biggest investors in Google, benefited from sales of high-speed internet connections and higher advert sales. TimeWarner said fourth quart...

#### Creating dataFrames for each of the raw files

In [2]:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("ID2221 Project").master("local[*]").getOrCreate()
import spark.implicits._

//Reads the first file (news data)
val newsData = spark.read.option("header", "true").option("inferSchema", "true").option("delimiter","\t").option("multiLine", "true")
    .option("ignoreLeadingWhiteSpace", "true").option("ignoreTrailingWhiteSpace", "true").csv(root + "rawData/" + fileNames(0));
//newsData.show(5);
//newsData.printSchema();

//Reads the second file (book data)
val bookData = spark.read.option("header", "true").option("inferSchema", "true").option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true").option("multiLine", "true").csv(root + "rawData/" + fileNames(1));
//bookData.show(5);
//bookData.printSchema();

//Reads the third file (job postings)  (It does not seem to parse correctly but it is sufficient to show and test my function)
val jobData = spark.read.option("header", "true").option("inferSchema", "true").option("ignoreLeadingWhiteSpace", "true")
                 .option("ignoreTrailingWhiteSpace", "true").option("multiLine", "true").csv(root + "rawData/" + fileNames(2));
//jobData.show(5);
//jobData.printSchema();

//Reads the fourth file (medical transcripts)
val medicalData = spark.read.option("header", "true").option("inferSchema", "true").option("ignoreLeadingWhiteSpace", "true")
                 .option("ignoreTrailingWhiteSpace", "true").option("multiLine", "true").csv(root + "rawData/" + fileNames(3));
//medicalData.show(5);
//medicalData.printSchema();

spark = org.apache.spark.sql.SparkSession@5a365f8e
newsData = [category: string, filename: string ... 2 more fields]
bookData = [_c0: string, Book: string ... 6 more fields]
jobData = [job_id: string, company_id: string ... 25 more fields]
medicalData = [_c0: int, description: string ... 4 more fields]


[_c0: int, description: string ... 4 more fields]

#### Extracting and pre-processing each of the dataFrames

In [12]:
import org.apache.spark.sql.DataFrame

//Creates dataframe with natural language under column "news article"
val newsNL = newsData.select(newsData("content").alias("news article"));

//Creates dataframe with natural language under column "book description"
val bookNL = bookData.select(bookData("Description").alias("book description"));

//Creates dataframe with natural language under column "job posting"
val jobNL = jobData.select(jobData("description").alias("job posting"));

//Creates dataframe with natural language under column "medical transcript"
val medicalNL = medicalData.select(medicalData("transcription").alias("medical transcript"));

val dfNLArray = Array(newsNL, bookNL, jobNL, medicalNL);


//Pre-processing to remove punctuation and whitespace, make text lower case and cut the text which is very long.

//Removes punctuations, citations and whitespace and makes text lower case
import scala.util.matching 
var index = 0;
val roughProcessedDF : Array[DataFrame] = new Array[DataFrame](4);
for (df <- dfNLArray) {
    //First makes lowercase
    //Then replaces all whitespace, - and _ with a single blank space
    //Finally removes all punctations and trailing/leading spaces with an empty string
    val tempRDD = df.rdd.map(x => (x.toString.toLowerCase.replaceAll("""\-|_|\s+"""," ")
                             .replaceAll("""\p{Punct}|^\s+|\s+$""", ""),0)); //Makes a temporary tupple to allow for easier data frame creation
    roughProcessedDF(index) = spark.createDataFrame(tempRDD.collect().toSeq).toDF(df.columns(0),"garbage").select(df.columns(0));
    index = index + 1;
}

//Finds the average length of documents
var totalLength : Long = 0;
var totalCount : Long = 0;
for (df <- roughProcessedDF) {
    totalCount = totalCount + df.count();
    var length : Long = 0;
    for (row <- df.collect())
        length = length + row.mkString.length;
    totalLength = totalLength + length;
}
val averageLength = totalLength / totalCount;
println(s"The average length of the documents is $averageLength characters")

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// This shit gives an oom error. I cannot for the life of me avoid the collect call which causes it. Kill me now //
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////

//Cut the documents which are longer than average to make all the documents have similar length
val processedDF : Array[DataFrame] = new Array[DataFrame](4);
index = 0;
for (df <- roughProcessedDF) {
    val tempRDD = df.rdd.map(x => (x.toString.slice(0,averageLength.asInstanceOf[Int]),0)); //Makes a temporary tupple to allow for easier data frame creation
    //processedDF(index) = spark.createDataFrame(tempRDD.collect().toSeq).toDF(df.columns(0),"garbage").select(df.columns(0));
    processedDF(index) = tempRDD.toDF(df.columns(0),"garbage").select(df.columns(0));
    index = index + 1;
}

for (df <- processedDF) {
    df.show(10);
    println(df.count());
}


/*
val tempRDD = medicalNL.rdd.take(2).map(x => (x.toString.toLowerCase.replaceAll("""\-|_|\s+"""," ")
                             .replaceAll("""\p{Punct}|^\s+|\s+$""", ""),0))
println(tempRDD(0))
val tempSeq = tempRDD.toSeq
val tempDF = spark.createDataFrame(tempSeq).toDF("medical","garbage").select("medical");
*/

//tempDF.show()

Exception in thread "Executor task launch worker for task 0.0 in stage 65.0 (TID 188)" java.lang.SecurityException: Not allowed to invoke System.exit!
	at org.apache.toree.security.KernelSecurityManager.checkExit(KernelSecurityManager.scala:133)
	at java.lang.Runtime.halt(Runtime.java:264)
	at org.apache.spark.util.SparkUncaughtExceptionHandler.uncaughtException(SparkUncaughtExceptionHandler.scala:70)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:772)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)


org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 65.0 failed 1 times, most recent failure: Lost task 0.0 in stage 65.0 (TID 188) (172.27.163.63 executor driver): java.lang.OutOfMemoryError: GC overhead limit exceeded

Driver stacktrace:

In [4]:
val temps : String = "test"
println(temps.slice(0,30))

test


temps = test


test

In [5]:
import org.apache.spark.sql.SaveMode.Overwrite
//Saves the dataframes as processed csv files
//It just ignores this and then reads something. Not sure what since the order is off from the regular data
//The news articles seems to read from entry 2004 dafaq
//Might have something to do with the console outputting "Notebook createDataSetsFunction.ipynb is not trusted"
//It can create new files but not remove old files?
roughProcessedDF(0).write.mode("overwrite").option("header","true").csv(root + "processedData/" + fileNames(0));
roughProcessedDF(1).write.mode("overwrite").option("header","true").csv(root + "processedData/" + fileNames(1));
roughProcessedDF(2).write.mode("overwrite").option("header","true").csv(root + "processedData/" + fileNames(2));
roughProcessedDF(3).write.mode("overwrite").option("header","true").csv(root + "processedData/" + fileNames(3));

//Checks that data saved correctly
for (file <- fileNames) {
    val temp = spark.read.option("header", "true").option("inferSchema", "true").csv(root + "processedData/" + file);
    temp.show(5);
    println(temp.count());
}

Unknown Error: <console>:37: error: not found: value roughProcessedDF
       roughProcessedDF(0).write.mode("overwrite").option("header","true").csv(root + "processedData/" + fileNames(0));
       ^
<console>:38: error: not found: value roughProcessedDF
       roughProcessedDF(1).write.mode("overwrite").option("header","true").csv(root + "processedData/" + fileNames(1));
       ^
<console>:39: error: not found: value roughProcessedDF
       roughProcessedDF(2).write.mode("overwrite").option("header","true").csv(root + "processedData/" + fileNames(2));
       ^
<console>:40: error: not found: value roughProcessedDF
       roughProcessedDF(3).write.mode("overwrite").option("header","true").csv(root + "processedData/" + fileNames(3));
       ^


## 2. Main Function TODO

Goal: Should produce three balanced sets of data. The sets will be randomly shuffled and saved as a csv files with the headers "classification" and "value"

The function should take the parameters: a list of preprocessed csv files were each file has the classifier as header and the values as entries, an array of floats detailing the relative distribution of train/test/val sets, an output directory and a desired size per class (maximum possible if none specified).

The function is made in a way that there is no need for explicitly splitting the data three ways (train/val/test) we could also split it into two sets or more if we wanted.

In [6]:
import org.apache.spark.sql.DataFrame
import scala.util.Random

//Messy codde for testing a bunch of things

/*
var testArray: Array[String] = Array("1","2","3")
testArray(2)="gretat"
testArray :+= "that"

Random.shuffle(testArray)

for (v <- testArray) {
    println(v)
}

val testInt : Int = testArray.length

val testArray2 : Array[DataFrame] = new Array[DataFrame](testInt) 

print(root)

testArray2(2) = spark.read.option("header", "true").option("inferSchema", "true").csv(root + "processedData/" + "bbc-news-data.csv")

for (v <- testArray2) {
    println(v)
}

val testInt2 = 500;
var testFloat = .7
testFloat = testFloat + 0.1

println((testInt2 * testFloat).asInstanceOf[Int])

var testArray3: Array[Float] = Array(1,2,3)

for(v <- 0 to testArray3.length-1)
    testArray3(v)=testArray3(v)/3
for (v <- testArray3) {
    println(v)
}

scala.util.Random.alphanumeric.take(20)
*/

val df = spark.read.option("header", "true").option("inferSchema", "true").csv(root + "processedData/" + "bbc-news-data.csv")
df.show(5)
for (v <- df.take(5)) println(v)

import org.apache.spark.sql.functions.rand

//Shuffling 
val shuffledDF = df.orderBy(rand())
shuffledDF.show(5)

val tempDF = shuffledDF.limit(5)
tempDF.show()

for (v <- shuffledDF.take(5)) println(v)

java.lang.OutOfMemoryError: GC overhead limit exceeded

In [7]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("ID2221 Project").master("local[*]").getOrCreate();

import spark.implicits._

//main function
def createDataSets(processedFiles : Array[String], relativeDist : Array[Double], outputDirectory : String, 
                   outputNames : Array[String], classSize : Int = 0 ) {
    //Creates an array holding dataframes of each input file
    val fileDF: Array[DataFrame] = new Array[DataFrame](processedFiles.length);
    for (i <- 0 to fileDF.length-1)
        fileDF(i) = spark.read.option("header", "true").option("inferSchema", "true").csv(processedFiles(i));
    
    //Calculates desired size
    var size : Int = classSize;
    if (size <= 0) { //If user has not specified a desired size, the fuction will create a data set which is as large as possible
        var minSize = fileDF(0).count();
        for (i <- 1 to fileDF.length-1) {
            val tempSize = fileDF(i).count();
            if (tempSize < minSize) 
                minSize=tempSize;
        }
        size=minSize.asInstanceOf[Int];
    }
    
    //Take "size" random rows from each dataframe
    val sampledDF: Array[DataFrame] = new Array[DataFrame](fileDF.length);
    for (i <- 0 to sampledDF.length-1) //Takes "size" random samples from each file
        sampledDF(i) = fileDF(i).orderBy(rand()).limit(size);
    
    //Create dataframes with columns "class" and "value"
    val pairDF: Array[DataFrame] = new Array[DataFrame](sampledDF.length);
    for (i <- 0 to pairDF.length-1) {
        pairDF(i)=sampledDF(i).withColumn("class", lit(sampledDF(i).columns(0))); //Adds "class" column with value of original header
        pairDF(i)=pairDF(i).withColumnRenamed(sampledDF(i).columns(0), "value"); //Renames original header to "value"
    }
    
    //Splits the created pairs into parts according to "relativeDist"
    val splitDF: Array[Array[DataFrame]] = new Array[Array[DataFrame]](sampledDF.length);
    for (i <- 0 to splitDF.length-1) {
        //The function sorts the results based on first column after picking random elements (we reshuffle when combining)
        splitDF(i) = pairDF(i).randomSplit(relativeDist, 0); 
    }
    
    //Combines the split pairs into the desired sets (one set for each entry in "relativeDist")
    val setDF: Array[DataFrame] = new Array[DataFrame](relativeDist.length);
    for (i <- 0 to relativeDist.length-1) {
        //Combines all the class dataframes
        setDF(i) = splitDF(0)(i);
        for (ii <- 1 to splitDF.length-1)
            setDF(i) = setDF(i).union(splitDF(ii)(i));
        
        //Shuffles the result
        setDF(i) = setDF(i).orderBy(rand());
    }
    
    //Temporary function for showing some samples
    for (v <- setDF) {
        //for (e <- v) println(e.count())
        //println("next")
        v.show(10);
        println(v.count());
    }
    
    //Saves the created sets as csv files
    for (i <- 0 to setDF.length-1)
        setDF(i).write.mode("overwrite").option("header","true").csv(outputDirectory + outputNames(i));
}

lastException = null
spark = org.apache.spark.sql.SparkSession@34e2edb4


createDataSets: (processedFiles: Array[String], relativeDist: Array[Double], outputDirectory: String, outputNames: Array[String], classSize: Int)Unit


org.apache.spark.sql.SparkSession@34e2edb4

In [8]:
val rootProcessed : String = "hdfs://localhost:9000/project/processedData/";
val processedFiles : Array[String] = Array(rootProcessed + "bbc-news-data.csv", rootProcessed + "goodreads_data.csv", 
                                           rootProcessed + "job_postings.csv", rootProcessed + "mtsamples.csv");
// Just for the sake of testing 80% train, 15% val, 5% test
val dist : Array[Double] = Array(160, 30, 10); //Any values work, function only look at comparative sizes

val outputNames : Array[String] = Array("train", "val", "test");

createDataSets(processedFiles, dist, "hdfs://localhost:9000/project/dataSets/", outputNames);

java.lang.OutOfMemoryError: GC overhead limit exceeded

## 3. Testing Distribution

# Just trying the connection with hadoop server

In [None]:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

val hadoopConf = new Configuration()
hadoopConf.set("fs.defaultFS", "hdfs://35.175.92.154:9000")
hadoopConf.set("dfs.replication", "1") // Set replication factor if needed
// Set any other HDFS configuration parameters as needed

val hdfs = FileSystem.get(hadoopConf)

val path = new Path("/rawFiles")
val status = hdfs.listStatus(path)
for (fileStatus <- status) {
  println(fileStatus.getPath)
}


In [None]:
val filePath = "hdfs://35.175.92.154:9000/rawFiles/goodreads_data.csv";

val df = spark.read.csv(filePath);
df.show(5);

In [None]:
val temp = spark.read.option("header", "true").option("inferSchema", "true").csv("hdfs://35.175.92.154:9000/rawFiles/goodreads_data.csv");
    temp.show(5);

In [None]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val tableName = "tableProva"
val hbaseNamespace = "your_hbase_namespace"  // if applicable

val hbaseOptions = Map(
  "hbase.table" -> tableName,
  "hbase.namespace" -> hbaseNamespace
)

val hbaseDF = spark.read
  .format("org.apache.hadoop.hbase.spark")
  .options(hbaseOptions)
  .load()

hbaseDF.show()

## 4. Machine Learning on created data sets

In [None]:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{IDF,HashingTF, Tokenizer}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

//Temporarily use val instead for faster testing
val train = spark.read.option("header", "true").option("inferSchema", "true").csv(root + "dataSets/train");
val test = spark.read.option("header", "true").option("inferSchema", "true").csv(root + "dataSets/test");

//Adding column for classes as integers
val mapper = spark.createDataFrame(Seq(("book description", 1.0),("news article", 2.0),("medical transcript", 3.0)
                                       ,("job posting", 4.0))).toDF("class", "label");
val trainSet = train.join(mapper, "class");
val testSet = test.join(mapper, "class");

//Number of words and documents for tf_idf
val totalSet = train.union(test);
val nDocs = totalSet.count();
val nWords = totalSet.select("value").flatMap(_.toString.split(" ")).distinct().count();

//Creating the pipeline for a simple classifier
val tokenizer = new Tokenizer().setInputCol("value").setOutputCol("words");
val hashTF = new HashingTF().setInputCol(tokenizer.getOutputCol).setOutputCol("hashTF").setNumFeatures(nWords.asInstanceOf[Int]);
val tf_idf = new IDF().setInputCol(hashTF.getOutputCol).setOutputCol("tf_idf").setMinDocFreq(5);//nDocs.asInstanceOf[Int]);
val model = new LogisticRegression().setMaxIter(20).setRegParam(0.01).setLabelCol("label").setFeaturesCol(tf_idf.getOutputCol)
    .setFamily("multinomial");
val pipeline = new Pipeline().setStages(Array(tokenizer, hashTF, tf_idf, model));

//Sets upp hyper parameter tuning using cross-validation
val evaluator = new MulticlassClassificationEvaluator().setPredictionCol("prediction")
val paramGrid = new ParamGridBuilder()//.addGrid(hashingTF.numFeatures, Array(10, 100, 1000))
    .addGrid(model.regParam, Array(0.03, 0.01, 0.1)).addGrid(model.maxIter, Array(3,10,30)).build();
val crossValidator = new CrossValidator().setEstimator(pipeline).setEvaluator(evaluator)
    .setEstimatorParamMaps(paramGrid).setNumFolds(4).setParallelism(2);
    
//Train the model
val trainedModel = pipeline.fit(trainSet);     //Set parameters (Fast with decent performance)
//val trainedModel = crossValidator.fit(trainSet); //Simple hyper parameter tuning using cross-validation (Slow)


In [None]:
val accuracy = evaluator.evaluate(trainedModel.transform(testSet));
println(s"The trained model had an accuracy of $accuracy");

In [None]:
//Save the trained model on disk
trainedModel.write.overwrite().save(root + "model/trainedLogisticRegressionModel");