## 1. Hack Preprocessing

The goal is to take the raw files we downloaded, extract the natural-language description field from each one, and save it as a corresponding single-column CSV file whose header is the class label.

This notebook assumes that the "rawData" directory on HDFS holds the data from our sources.
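The sketch below illustrates the target format on a file that has already been parsed into memory. It picks one text column by name and emits it as a single-column CSV, with the class label as the header. The object and method names are hypothetical. Quoting follows RFC 4180: fields containing commas, quotes or line breaks are wrapped in quotes, and inner quotes are doubled. Spark's CSV writer has its own `quote`/`escape` options, so the files it actually writes may escape quotes differently.

```scala
// Hypothetical in-memory sketch of the preprocessing step (not the notebook's Spark code):
// keep one text column and emit it as a single-column CSV with the class label as header.
object SingleColumnSketch {

  // RFC 4180-style quoting: wrap the field in quotes if it contains a comma,
  // a quote or a line break, and double any embedded quotes.
  def quote(field: String): String =
    if (field.exists(c => c == ',' || c == '"' || c == '\n' || c == '\r'))
      "\"" + field.replace("\"", "\"\"") + "\""
    else field

  // Select `column` from already-parsed rows and return the CSV lines,
  // with `label` as the header line.
  def extract(header: Seq[String], rows: Seq[Seq[String]],
              column: String, label: String): Seq[String] = {
    val idx = header.indexOf(column)
    require(idx >= 0, s"column '$column' not found in header")
    quote(label) +: rows.map(row => quote(row(idx)))
  }
}
```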

In [179]:
//The root to where the rawData, processedData and dataSets directories are
val root = "hdfs://localhost:9000/project/";

//An array holding all the file names in rawData
//(the same names are reused for the processed data; not sure that is a good idea,
//since it would be a problem if some of the files were not CSV)
val fileNames = Array("bbc-news-data.csv","goodreads_data.csv","job_postings.csv","mtsamples.csv");

//Test that the raw data can be accessed
val file = root + "rawData/bbc-news-data.csv";
val rawRDD = sc.textFile(file);
val rawRDDtop = rawRDD.take(10);
rawRDDtop.foreach(println);

category	filename	title	content
business	001.txt	Ad sales boost Time Warner profit	 Quarterly profits at US media giant TimeWarner jumped 76% to $1.13bn (£600m) for the three months to December, from $639m year-earlier.  The firm, which is now one of the biggest investors in Google, benefited from sales of high-speed internet connections and higher advert sales. TimeWarner said fourth quarter sales rose 2% to $11.1bn from $10.9bn. Its profits were buoyed by one-off gains which offset a profit dip at Warner Bros, and less users for AOL.  Time Warner said on Friday that it now owns 8% of search-engine Google. But its own internet business, AOL, had has mixed fortunes. It lost 464,000 subscribers in the fourth quarter profits were lower than in the preceding three quarters. However, the company said AOL's underlying profit before exceptional items rose 8% on the back of stronger internet advertising revenues. It hopes to increase subscribers by offering the online service free to TimeWarn

root: String = hdfs://localhost:9000/project/
fileNames: Array[String] = Array(bbc-news-data.csv, goodreads_data.csv, job_postings.csv, mtsamples.csv)
file: String = hdfs://localhost:9000/project/rawData/bbc-news-data.csv
rawRDD: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/project/rawData/bbc-news-data.csv MapPartitionsRDD[4051] at textFile at <console>:354
rawRDDtop: Array[String] = Array(category	filename	title	content, "business	001.txt	Ad sales boost Time Warner profit	 Quarterly profits at US media giant TimeWarner jumped 76% to $1.13bn (£600m) for the three months to December, from $639m year-earlier.  The firm, which is now one of the biggest investors in Google, benefited from sales of high-speed internet connections and higher advert sales. TimeWarner said fourth q...


#### Creating DataFrames for each of the raw files

In [16]:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("ID2221 Project").master("local[*]").getOrCreate()

import spark.implicits._

//Reads the first file (news data)
val newsData = spark.read.option("header", "true").option("inferSchema", "true").option("delimiter","\t").csv(root + "rawData/" + fileNames(0));
//newsData.show(5);
//newsData.printSchema();

//Reads the second file (book data)
val bookData = spark.read.option("header", "true").option("inferSchema", "true").csv(root + "rawData/" + fileNames(1));
//bookData.show(5);
//bookData.printSchema();

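//Reads the third file (job postings). It does not seem to parse correctly, most likely because some descriptions
//contain line breaks inside quoted fields; Spark's "multiLine" CSV option may fix this (untested).
//The current result is sufficient to show and test the function.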
val jobData = spark.read.option("header", "true").option("inferSchema", "true").csv(root + "rawData/" + fileNames(2));
//jobData.show(5);
//jobData.printSchema();

//Reads the fourth file (medical transcripts)
val medicalData = spark.read.option("header", "true").option("inferSchema", "true").csv(root + "rawData/" + fileNames(3));
//medicalData.show(5);
//medicalData.printSchema();

+---+--------------------+--------------------+--------------------+--------------------+--------------------+
|_c0|         description|   medical_specialty|         sample_name|       transcription|            keywords|
+---+--------------------+--------------------+--------------------+--------------------+--------------------+
|  0| A 23-year-old wh...| Allergy / Immuno...|  Allergic Rhinitis |SUBJECTIVE:,  Thi...|allergy / immunol...|
|  1| Consult for lapa...|          Bariatrics| Laparoscopic Gas...|PAST MEDICAL HIST...|bariatrics, lapar...|
|  2| Consult for lapa...|          Bariatrics| Laparoscopic Gas...|"HISTORY OF PRESE...| at his highest h...|
|  3| 2-D M-Mode. Dopp...| Cardiovascular /...| 2-D Echocardiogr...|2-D M-MODE: , ,1....|cardiovascular / ...|
|  4|  2-D Echocardiogram| Cardiovascular /...| 2-D Echocardiogr...|1.  The left vent...|cardiovascular / ...|
+---+--------------------+--------------------+--------------------+--------------------+--------------------+

spark = org.apache.spark.sql.SparkSession@2637325c
newsData = [category: string, filename: string ... 2 more fields]
bookData = [_c0: string, Book: string ... 6 more fields]
jobData = [job_id: string, company_id: string ... 25 more fields]
medicalData = [_c0: string, description: string ... 4 more fields]
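The job postings file most likely fails to parse because some quoted description fields contain line breaks. A line-oriented reader then splits one record into several, which would explain rows such as "and now we need o..." and "45760" in the check further down. The plain-Scala sketch below contrasts a naive per-line split with a quote-aware one. It uses hypothetical names and no Spark. In Spark, the corresponding reader option is `multiLine` (default `false`); whether that alone fixes this particular file has not been checked here.

```scala
// Hypothetical sketch: why splitting CSV text on newlines breaks records whose
// quoted fields contain line breaks, and how tracking quote state avoids it.
object MultiLineCsvSketch {

  // Naive approach: one record per physical line.
  def naiveRecords(text: String): Seq[String] =
    text.split("\n").toSeq.filter(_.nonEmpty)

  // Quote-aware approach: a newline only ends a record when we are not inside
  // a double-quoted field. An escaped quote ("") flips the state twice, so it
  // leaves the state unchanged.
  def quoteAwareRecords(text: String): Seq[String] = {
    val records = scala.collection.mutable.ArrayBuffer.empty[String]
    val current = new StringBuilder
    var inQuotes = false
    for (c <- text) {
      if (c == '"') inQuotes = !inQuotes
      if (c == '\n' && !inQuotes) {
        records += current.toString // end of a logical record
        current.clear()
      } else {
        current += c // newline inside quotes stays part of the field
      }
    }
    if (current.length > 0) records += current.toString
    records.toList.filter(_.nonEmpty)
  }
}
```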



#### Extracting and saving each of the DataFrames

In [34]:
//Creates dataframe with natural language under column "news article"
val newsNL = newsData.select(newsData("content").alias("news article"));

//Creates dataframe with natural language under column "book description"
val bookNL = bookData.select(bookData("Description").alias("book description"));

//Creates dataframe with natural language under column "job posting"
val jobNL = jobData.select(jobData("description").alias("job posting"));

//Creates dataframe with natural language under column "medical transcript"
val medicalNL = medicalData.select(medicalData("transcription").alias("medical transcript"));

//Saves the dataframes as processed csv files
newsNL.write.mode("overwrite").option("header","true").csv(root + "processedData/" + fileNames(0));
bookNL.write.mode("overwrite").option("header","true").csv(root + "processedData/" + fileNames(1));
jobNL.write.mode("overwrite").option("header","true").csv(root + "processedData/" + fileNames(2));
medicalNL.write.mode("overwrite").option("header","true").csv(root + "processedData/" + fileNames(3));

newsNL = [news article: string]
bookNL = [book description: string]
jobNL = [job posting: string]
medicalNL = [medical transcript: string]



In [88]:
//Checks that data saved correctly
for (file <- fileNames) {
    val temp = spark.read.option("header", "true").option("inferSchema", "true").csv(root + "processedData/" + file);
    temp.show(5);
}

+--------------------+
|        news article|
+--------------------+
|Quarterly profits...|
|The dollar has hi...|
|The owners of emb...|
|British Airways h...|
|Shares in UK drin...|
+--------------------+
only showing top 5 rows

+--------------------+
|    book description|
+--------------------+
|A few years after...|
|The old life is d...|
|Having clear boun...|
|In Mary's world t...|
|"The magic of fin...|
+--------------------+
only showing top 5 rows

+--------------------+
|         job posting|
+--------------------+
|While many indust...|
|and now we need o...|
|AND benefits incl...|
|               45760|
|Are you a dynamic...|
+--------------------+
only showing top 5 rows

+--------------------+
|  medical transcript|
+--------------------+
|SUBJECTIVE:,  Thi...|
|PAST MEDICAL HIST...|
|"HISTORY OF PRESE...|
|2-D M-MODE: , ,1....|
|1.  The left vent...|
+--------------------+
only showing top 5 rows



## 2. Main Function

Goal: produce three balanced data sets. Each set is randomly shuffled and saved as a CSV file with the headers "value" and "class".

The function takes the following parameters:
- an array of preprocessed CSV files, where each file has the class label as its header and the values as its entries;
- an array of doubles giving the relative sizes of the train/val/test sets;
- an output directory;
- an array of output names (one per set);
- a desired size per class (the maximum possible if none is specified).
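The balancing and relabelling steps can be sketched in plain Scala on in-memory collections. This is only an illustration. It assumes each class is given as a map from label to values. `BalanceSketch` and `LabeledValue` are hypothetical names, and `cls` stands in for the "class" column because `class` is a reserved word in Scala.

```scala
import scala.util.Random

// Hypothetical in-memory sketch of the balancing step: take `size` random
// values from every class and tag each value with its class label.
object BalanceSketch {

  // One output row: the text and the label it came from ("value" / "class").
  final case class LabeledValue(value: String, cls: String)

  def balance(classes: Map[String, Seq[String]], size: Int, rng: Random): Seq[LabeledValue] =
    classes.toSeq.flatMap { case (label, values) =>
      // Random sample without replacement, analogous to orderBy(rand()).limit(size)
      rng.shuffle(values).take(size).map(v => LabeledValue(v, label))
    }
}
```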

A side effect of this flexible design is that nothing forces a three-way (train/val/test) split. The function creates one set per entry in the distribution array, so it could just as well produce two sets, or more than three.
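Any number of sets works because the split is driven entirely by a weight array. The sketch below shows that idea in the spirit of Spark's `randomSplit`: normalize the weights, then send each element to the set whose cumulative-weight interval contains a uniform random draw. The names are hypothetical, and this is not Spark's actual implementation.

```scala
import scala.util.Random

// Hypothetical sketch of a weighted random split into any number of sets.
object WeightedSplitSketch {

  // Scale the weights so that they sum to 1; only their relative sizes matter.
  def normalize(weights: Seq[Double]): Seq[Double] = {
    require(weights.nonEmpty && weights.forall(_ >= 0) && weights.sum > 0,
      "weights must be non-negative and not all zero")
    val total = weights.sum
    weights.map(_ / total)
  }

  def split[T](xs: Seq[T], weights: Seq[Double], rng: Random): Seq[Seq[T]] = {
    // Cumulative upper bounds, e.g. (0.8, 0.95, 1.0) for weights 160/30/10
    val bounds = normalize(weights).scanLeft(0.0)(_ + _).tail
    val assigned: Seq[(Int, T)] = xs.map { x =>
      val u = rng.nextDouble()
      val i = bounds.indexWhere(b => u < b)
      // Fall back to the last set in case rounding leaves the final bound below 1.0
      (if (i == -1) bounds.length - 1 else i, x)
    }
    // One output set per weight
    bounds.indices.map(i => assigned.filter(_._1 == i).map(_._2))
  }
}
```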

In [125]:
import org.apache.spark.sql.DataFrame
import scala.util.Random

//Scratch code for testing a few things

/*
var testArray: Array[String] = Array("1","2","3")
testArray(2)="gretat"
testArray :+= "that"

Random.shuffle(testArray)

for (v <- testArray) {
    println(v)
}

val testInt : Int = testArray.length

val testArray2 : Array[DataFrame] = new Array[DataFrame](testInt) 

print(root)

testArray2(2) = spark.read.option("header", "true").option("inferSchema", "true").csv(root + "processedData/" + "bbc-news-data.csv")

for (v <- testArray2) {
    println(v)
}

val testInt2 = 500;
var testFloat = .7
testFloat = testFloat + 0.1

println((testInt2 * testFloat).asInstanceOf[Int])

var testArray3: Array[Float] = Array(1,2,3)

for(v <- 0 to testArray3.length-1)
    testArray3(v)=testArray3(v)/3
for (v <- testArray3) {
    println(v)
}

scala.util.Random.alphanumeric.take(20)
*/

val df = spark.read.option("header", "true").option("inferSchema", "true").csv(root + "processedData/" + "bbc-news-data.csv")
df.show(5)
for (v <- df.take(5)) println(v)

import org.apache.spark.sql.functions.rand

//Shuffling 
val shuffledDF = df.orderBy(rand())
shuffledDF.show(5)

val tempDF = shuffledDF.limit(5)
tempDF.show()

for (v <- shuffledDF.take(5)) println(v)

+--------------------+
|        news article|
+--------------------+
|Quarterly profits...|
|The dollar has hi...|
|The owners of emb...|
|British Airways h...|
|Shares in UK drin...|
+--------------------+
only showing top 5 rows

[Quarterly profits at US media giant TimeWarner jumped 76% to $1.13bn (£600m) for the three months to December, from $639m year-earlier.  The firm, which is now one of the biggest investors in Google, benefited from sales of high-speed internet connections and higher advert sales. TimeWarner said fourth quarter sales rose 2% to $11.1bn from $10.9bn. Its profits were buoyed by one-off gains which offset a profit dip at Warner Bros, and less users for AOL.  Time Warner said on Friday that it now owns 8% of search-engine Google. But its own internet business, AOL, had has mixed fortunes. It lost 464,000 subscribers in the fourth quarter profits were lower than in the preceding three quarters. However, the company said AOL's underlying profit before exceptional 

+--------------------+
|        news article|
+--------------------+
|Chart stars Duran...|
|Partners of those...|
|Shares in Google ...|
|England forward M...|
|People using wire...|
+--------------------+
only showing top 5 rows

+--------------------+
|        news article|
+--------------------+
|Chart stars Duran...|
|Partners of those...|
|Shares in Google ...|
|England forward M...|
|People using wire...|
+--------------------+

[Chart stars Duran Duran are to appear in a VH1 special in the US including interviews and concert footage.  The show airs on Tuesday and will feature a studio performance, behind the scenes footage and fan interviews. "They seemed like a perfect fit with our audience," said Rick Krim, VH1's vice president of music and talent. The band recently released a new album, Astronaut, the first from the original line-up since 1983. They will also tour Japan and the US next year.  "When we started playing together, we didn't try and make a really sort of mature a

df = [news article: string]
shuffledDF = [news article: string]
tempDF = [news article: string]
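`orderBy(rand()).limit(n)`, as tested above, amounts to attaching an independent random key to every row, sorting by that key, and keeping the first n rows. Sorting by independent random keys gives a uniformly random order, so the result is a random sample of n rows without replacement. A plain-Scala sketch of the same idea follows; the names are hypothetical, and it uses a `Long` key where Spark's `rand()` produces a `Double`.

```scala
import scala.util.Random

// Hypothetical sketch of orderBy(rand()).limit(n) on a plain Scala collection.
object RandomKeySampleSketch {
  def sample[T](xs: Seq[T], n: Int, rng: Random): Seq[T] =
    xs.map(x => (rng.nextLong(), x)) // attach an independent random key to every element
      .sortBy(_._1)                  // sorting by random keys yields a random order
      .take(n)                       // keep the first n, like limit(n)
      .map(_._2)
}
```

Asking for more elements than exist simply returns all of them, which mirrors how `limit` behaves. This matters for the class-size logic in `createDataSets`.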



In [177]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("ID2221 Project").master("local[*]").getOrCreate();

import spark.implicits._

//main function
def createDataSets(processedFiles : Array[String], relativeDist : Array[Double], outputDirectory : String,
                   outputNames : Array[String], classSize : Int = 0) : Unit = {
    //Creates an array holding dataframes of each input file
    val fileDF: Array[DataFrame] = new Array[DataFrame](processedFiles.length);
    for (i <- 0 to fileDF.length-1)
        fileDF(i) = spark.read.option("header", "true").option("inferSchema", "true").csv(processedFiles(i));
    
    //Calculates desired size
    var size : Int = classSize;
    if (size <= 0) { //If user has not specified a desired size, the fuction will create a data set which is as large as possible
        var minSize = fileDF(0).count();
        for (i <- 1 to fileDF.length-1) {
            val tempSize = fileDF(i).count();
            if (tempSize < minSize) 
                minSize=tempSize;
        }
        size=minSize.asInstanceOf[Int];
    }
    
    //Take "size" random rows from each dataframe
    val sampledDF: Array[DataFrame] = new Array[DataFrame](fileDF.length);
    for (i <- 0 to sampledDF.length-1) //Takes "size" random samples from each file
        sampledDF(i) = fileDF(i).orderBy(rand()).limit(size);
    
    //Create dataframes with columns "class" and "value"
    val pairDF: Array[DataFrame] = new Array[DataFrame](sampledDF.length);
    for (i <- 0 to pairDF.length-1) {
        pairDF(i)=sampledDF(i).withColumn("class", lit(sampledDF(i).columns(0))); //Adds "class" column with value of original header
        pairDF(i)=pairDF(i).withColumnRenamed(sampledDF(i).columns(0), "value"); //Renames original header to "value"
    }
    
    //Splits the created pairs into parts according to "relativeDist"
    val splitDF: Array[Array[DataFrame]] = new Array[Array[DataFrame]](sampledDF.length);
    for (i <- 0 to splitDF.length-1) {
        //randomSplit sorts each partition internally so the split is deterministic for the given seed (0);
        //the rows therefore come out ordered, which is why they are reshuffled when the sets are combined
        splitDF(i) = pairDF(i).randomSplit(relativeDist, 0); 
    }
    
    //Combines the split pairs into the desired sets (one set for each entry in "relativeDist")
    val setDF: Array[DataFrame] = new Array[DataFrame](relativeDist.length);
    for (i <- 0 to relativeDist.length-1) {
        //Combines all the class dataframes
        setDF(i) = splitDF(0)(i);
        for (ii <- 1 to splitDF.length-1)
            setDF(i) = setDF(i).union(splitDF(ii)(i));
        
        //Shuffles the result
        setDF(i) = setDF(i).orderBy(rand());
    }
    
    //Temporary: shows a few rows and the row count of each set
    for (v <- setDF) {
        //for (e <- v) println(e.count())
        //println("next")
        v.show(10);
        println(v.count());
    }
    
    //Saves the created sets as csv files
    for (i <- 0 to setDF.length-1)
        setDF(i).write.mode("overwrite").option("header","true").csv(outputDirectory + outputNames(i));
}

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2637325c
createDataSets: (processedFiles: Array[String], relativeDist: Array[Double], outputDirectory: String, outputNames: Array[String], classSize: Int)Unit
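`createDataSets` as written has one gap. When `classSize` is given and is larger than the smallest class, `limit(size)` returns fewer rows for that class, so the sets are no longer balanced. A possible guard, sketched outside Spark with hypothetical names, uses the smallest class count when no size is given and clamps a requested size to that count otherwise. This is a suggested change, not what the function currently does.

```scala
// Hypothetical sketch of the per-class size rule, with an added clamp.
object ClassSizeSketch {
  def effectiveClassSize(classCounts: Seq[Long], classSize: Int = 0): Int = {
    require(classCounts.nonEmpty, "need at least one class")
    val smallest = classCounts.min
    // No size given: use the largest balanced size; otherwise never exceed the smallest class
    val target = if (classSize <= 0) smallest else math.min(classSize.toLong, smallest)
    // Dataset.limit takes an Int, so convert explicitly and guard against overflow
    math.min(target, Int.MaxValue.toLong).toInt
  }
}
```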


In [178]:
val rootProcessed : String = "hdfs://localhost:9000/project/processedData/";
val processedFiles : Array[String] = Array(rootProcessed + "bbc-news-data.csv", rootProcessed + "goodreads_data.csv", 
                                           rootProcessed + "job_postings.csv", rootProcessed + "mtsamples.csv");
// For testing: 80% train, 15% val, 5% test
val dist : Array[Double] = Array(160, 30, 10); //Any values work; only their relative sizes matter (randomSplit normalizes the weights)

val outputNames : Array[String] = Array("train", "val", "test");

createDataSets(processedFiles, dist, "hdfs://localhost:9000/project/dataSets/", outputNames);

+--------------------+------------------+
|               value|             class|
+--------------------+------------------+
|Prince Harry shou...|      news article|
|including local o...|       job posting|
|"This is volume t...|  book description|
|"Alternate cover ...|  book description|
|England will prot...|      news article|
|                  52|       job posting|
|PREOPERATIVE DIAG...|medical transcript|
|The author of eig...|  book description|
|Tony Blair has re...|      news article|
|UK gamers are get...|      news article|
+--------------------+------------------+
only showing top 10 rows

7036
+--------------------+----------------+
|               value|           class|
+--------------------+----------------+
|             R370075|     job posting|
|Robert Langdon, H...|book description|
|"A collection of ...|book description|
|Tony Blair has ap...|    news article|
|The First Phone C...|book description|
|000 Monthly Busin...|     job posting|
|She spent each da...

rootProcessed: String = hdfs://localhost:9000/project/processedData/
processedFiles: Array[String] = Array(hdfs://localhost:9000/project/processedData/bbc-news-data.csv, hdfs://localhost:9000/project/processedData/goodreads_data.csv, hdfs://localhost:9000/project/processedData/job_postings.csv, hdfs://localhost:9000/project/processedData/mtsamples.csv)
dist: Array[Double] = Array(160.0, 30.0, 10.0)
outputNames: Array[String] = Array(train, val, test)
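`randomSplit` assigns each row independently, so the set sizes match the requested proportions only on average. The count printed above (7036 rows in the training set) is one such random outcome. The expected size of each set is the number of classes times the class size times the normalized weight, as in this small sketch (hypothetical names):

```scala
// Hypothetical sketch: expected number of rows per output set.
// Actual counts from randomSplit fluctuate around these values.
object ExpectedSizesSketch {
  def expectedSetSizes(numClasses: Int, classSize: Int, weights: Seq[Double]): Seq[Double] = {
    val total = weights.sum
    weights.map(w => numClasses.toDouble * classSize * w / total)
  }
}
```

For example, with four classes and weights 160/30/10, the training set should hold about 3.2 times the class size.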


## 3. Testing Distribution