# Project - Scala Code

## Creating the DataFrame
The `SparkSession` class is the entry point to the DataFrame API. It exposes a `DataFrameReader` named `read` that creates a DataFrame from existing data in any supported format. In our application, we create a `SparkSession` and then build DataFrames from multiple JSON files. The dataset used in this lab contains the results of the votes held in the Swedish parliament during the 2016/2017 session. The files are located in `data/`, and each line of a file has the following structure:
```
{"rm": "2016/17", "beteckning": "AU1", "hangar_id": "4500350", "votering_id": "B6C77723-93FD-415A-B3DB-A35D086748D3", "punkt": "1", "namn": "Lars Tysklind", "intressent_id": "0535526562910", "parti": "L", "valkrets": "Västra Götalands läns västra", "valkretsnummer": "17", "iort": null, "rost": "0", "avser": "sakfrågan", "votering": "huvud", "banknummer": "40", "fornamn": "Lars", "efternamn": "Tysklind", "kon": "man", "fodd": "1953", "datum": "2016-12-14"}
```

In [1]:
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").appName("ProjectScala").getOrCreate()
import spark.implicits._  

spark = org.apache.spark.sql.SparkSession@2aa83e81


## Static Information

Here we store the information that stays the same across voting rounds, such as each member's name and political party. We also map the Swedish column names to English ones and replace the year of birth with the member's age, computed as 2018 minus the birth year.
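The renaming and the age calculation described above can be sketched in plain Scala, without Spark. This is only an illustration: the object name `StaticInfo` and its members are hypothetical and not part of the notebook; the field names and the reference year 2018 are taken from the cell below.

```scala
// Sketch only: plain Scala, no Spark required.
object StaticInfo {
  // Reference year used in the notebook's age calculation.
  val ReferenceYear = 2018

  // Swedish source field -> English column name, as used in df_info.
  val columnNames: Map[String, String] = Map(
    "namn"     -> "name",
    "parti"    -> "party",
    "kon"      -> "sex",
    "valkrets" -> "constituency",
    "fodd"     -> "age" // the birth year is replaced by the derived age
  )

  // The JSON stores the birth year as a string, e.g. "1953".
  def age(birthYear: String): Int = ReferenceYear - birthYear.trim.toInt
}
```

For the sample record shown earlier, `StaticInfo.age("1953")` gives the age that would be stored for Lars Tysklind.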

In [2]:
// Collect all JSON files in the data directory
import java.io.File
val files = new File("../data").listFiles.filter(_.getName.endsWith(".json"))

// Create a DataFrame with the static information from the first file
val df_first = spark.read.json(files(0).toString)

val df_info = df_first.select(df_first("namn").alias("name"),
                     df_first("parti").alias("party"),
                     df_first("kon").alias("sex"),
                     df_first("valkrets").alias("constituency"),
                     // Age in 2018; cast first, then alias, so the alias names the final expression
                     (lit(2018) - df_first("fodd")).cast("int").alias("age"))

df_info.show()

+--------------------+-----+------+--------------------+---+
|                name|party|   sex|        constituency|age|
+--------------------+-----+------+--------------------+---+
|      Andreas Norlén|    M|   man|   Östergötlands län| 45|
|     Ulrika Carlsson|    C|kvinna|Västra Götalands ...| 53|
| Margareta Cederfelt|    M|kvinna|   Stockholms kommun| 59|
|   Christina Östberg|   SD|kvinna|          Kalmar län| 50|
|   Cecilia Magnusson|    M|kvinna|    Göteborgs kommun| 56|
|     Penilla Gunther|   KD|kvinna|Västra Götalands ...| 54|
|      Jonas Eriksson|   MP|   man|          Örebro län| 51|
|          Per Åsling|    C|   man|       Jämtlands län| 61|
|      Peter Jeppsson|    S|   man|        Blekinge län| 50|
|         Lawen Redar|    S|kvinna|   Stockholms kommun| 29|
|     Jan R Andersson|    M|   man|          Kalmar län| 48|
|    Robert Stenkvist|   SD|   man|Västra Götalands ...| 60|
|      Johan Forssell|    M|   man|   Stockholms kommun| 39|
|Annika Hirvonen Falk|  

files = Array(../data/H401MJU23-6-3E645C5C-7AD1-4765-A9EC-400DA9157A75.json, ../data/H401JuU31-2-3E2B04BD-0B9A-45DE-8F09-F076046C05B0.json, ../data/H401FiU7-7-7B36A546-4F6A-4B29-9EC0-7405D95B3EDD.json, ../data/H401KrU6-4-80E6C9D1-71B1-4638-8A60-C8DF2FD7126C.json, ../data/H401JuU1-18-E2F17D1F-8C7F-45A4-BE80-28E2BA678104.json, ../data/H401TU13-8-C6B9B38D-FC57-4762-8918-8373A7E1A402.json, ../data/H401MJU24-27-67CA0FD6-90CF-4BDB-8340-E998F6D2B339.json, ../data/H401SkU30-2-18864E2C-34A5-4799-8AD0-EC96139E0124.json, ../data/H401MJU7-2-E3BCEC54-02CE-4F7A-9F9E-433F0DF93077.json, ../data/H401UFöU4-1-9F957364-C627-4003-924E-31AF5C0D8CEB.json, ../data/H401NU3-1-16B4A1C4-6160-432A-946B-8CEBF917C709.json, ../data/H401FöU3-7-B767E0DC-32D2-4F59-A78E-E1C523FF514...



In [3]:
// print the dataframe schema
df_info.printSchema()

// print the first five rows
df_info.head(5)

root
 |-- name: string (nullable = true)
 |-- party: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- constituency: string (nullable = true)
 |-- age: integer (nullable = true)



0,1,2,3,4
Andreas Norlén,M,man,Östergötlands län,45
Ulrika Carlsson,C,kvinna,Västra Götalands läns östra,53
Margareta Cederfelt,M,kvinna,Stockholms kommun,59
Christina Östberg,SD,kvinna,Kalmar län,50
Cecilia Magnusson,M,kvinna,Göteborgs kommun,56


## Creating Features

Below, we merge the votes from the different rounds into our final DataFrame, which then holds both the "static" information for each parliament member and their votes.
We have chosen to encode the votes as follows:
- "Yes" as 1,
- "No" as -1,
- "Refrain" as 0,
- "Absent" as 0.
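The encoding listed above can be written as a small pure function. This is a sketch under assumptions: the object `VoteEncoding` is hypothetical, and it uses the English labels from the list rather than whatever raw values the data files contain.

```scala
// Sketch only: maps the four vote labels from the list above to numbers.
object VoteEncoding {
  def encode(vote: String): Double = vote.trim match {
    case "Yes"     => 1.0
    case "No"      => -1.0
    case "Refrain" => 0.0
    case "Absent"  => 0.0
    case other =>
      throw new IllegalArgumentException(s"Unknown vote label: $other")
  }
}
```

One consequence of this choice is that "Refrain" and "Absent" become indistinguishable after encoding, since both map to 0.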

In [4]:
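// Index of the last voting round to load. `0 to num_questions` is inclusive,
// so rounds 0 through 9 (ten in total) are joined.
val num_questions = 9
var df = df_info

for (i <- 0 to num_questions) {
    val df_i = spark.read.json(files(i).toString)
    // Cast first, then alias, so the alias q<i> names the final expression
    val df_vote = df_i.select(df_i("namn").alias("name"), df_i("rost").cast("double").alias("q" + i))
    // Inner join on name: members missing from any round are dropped
    df = df.join(df_vote, "name")
}

df.show()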

+--------------------+-----+------+--------------------+---+----+----+----+----+----+----+----+----+----+----+
|                name|party|   sex|        constituency|age|  q0|  q1|  q2|  q3|  q4|  q5|  q6|  q7|  q8|  q9|
+--------------------+-----+------+--------------------+---+----+----+----+----+----+----+----+----+----+----+
|      Andreas Norlén|    M|   man|   Östergötlands län| 45| 0.0|-1.0| 1.0| 1.0|-1.0| 1.0|-1.0| 1.0| 0.0| 1.0|
|     Ulrika Carlsson|    C|kvinna|Västra Götalands ...| 53| 0.0|-1.0| 0.0| 1.0| 1.0| 1.0| 0.0|-1.0| 1.0| 1.0|
| Margareta Cederfelt|    M|kvinna|   Stockholms kommun| 59| 0.0|-1.0| 0.0| 1.0| 0.0| 1.0|-1.0| 1.0| 1.0| 0.0|
|   Christina Östberg|   SD|kvinna|          Kalmar län| 50|-1.0| 1.0|-1.0|-1.0| 1.0| 1.0| 1.0| 1.0| 1.0|-1.0|
|   Cecilia Magnusson|    M|kvinna|    Göteborgs kommun| 56| 0.0|-1.0| 1.0| 1.0|-1.0| 1.0|-1.0| 1.0| 1.0| 1.0|
|     Penilla Gunther|   KD|kvinna|Västra Götalands ...| 54| 0.0|-1.0| 0.0| 0.0| 1.0|-1.0|-1.0| 1.0| 0.0| 1.0|
|

num_questions = 9
df = [name: string, party: string ... 13 more fields]



## Dimensionality Reduction - SVD

In [5]:
import org.apache.spark.mllib.linalg.{Matrix, SingularValueDecomposition, Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.sql.Dataset
import spark.implicits._

// All ten votes: one dense vector per parliament member
val data = df.select("name", "q0", "q1", "q2", "q3", "q4", "q5", "q6", "q7", "q8", "q9")
case class MyCase(name : String, q0 : Double, q1 : Double, q2 : Double, q3 : Double, q4 : Double, q5 : Double, q6 : Double, q7 : Double, q8 : Double, q9 : Double)
val ds: Dataset[MyCase] = data.as[MyCase]
val rdd = ds.rdd.map(x => Vectors.dense(x.q0, x.q1, x.q2, x.q3, x.q4, x.q5, x.q6, x.q7, x.q8, x.q9))

// First five votes only (built from data_small, not from the full dataset)
val data_small = df.select("name", "q0", "q1", "q2", "q3", "q4")
case class MyCase_small(name : String, q0 : Double, q1 : Double, q2 : Double, q3 : Double, q4 : Double)
val ds_small: Dataset[MyCase_small] = data_small.as[MyCase_small]
val rdd_small = ds_small.rdd.map(x => Vectors.dense(x.q0, x.q1, x.q2, x.q3, x.q4))

data = [name: string, q0: double ... 9 more fields]
defined class MyCase
ds = [name: string, q0: double ... 9 more fields]
rdd = MapPartitionsRDD[120] at map at <console>:34
data_small = [name: string, q0: double ... 4 more fields]...



### Five Features
Using only the first five votes, we reduce the dimensionality to two: $\mathbb{R}^5 \rightarrow \mathbb{R}^2$
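For reference, the rank-$k$ truncated SVD that `computeSVD(k, computeU = true)` computes can be written as follows. The symbols $A$, $n$, $d$ and $k$ are notation introduced here, not names from the notebook: $A$ is the $n \times d$ vote matrix with one row per member and one column per vote.

$$
A \approx U_k \Sigma_k V_k^{\top}, \qquad U_k \in \mathbb{R}^{n \times k}, \quad \Sigma_k = \operatorname{diag}(\sigma_1, \dots, \sigma_k), \quad V_k \in \mathbb{R}^{d \times k}
$$

Row $i$ of $U_k$ is the reduced representation of member $i$'s vote vector $a_i \in \mathbb{R}^d$:

$$
u_i = \Sigma_k^{-1} V_k^{\top} a_i \in \mathbb{R}^k
$$

In this section $d = 5$ and $k = 2$. Note that the matrix is not mean-centered before the decomposition, so this is a plain SVD rather than PCA.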

In [10]:
val mat_5f: RowMatrix = new RowMatrix(rdd_small)

// Reduce dimensionality to 2
val svd_5f: SingularValueDecomposition[RowMatrix, Matrix] = mat_5f.computeSVD(2, computeU = true)
val U_5f: RowMatrix = svd_5f.U

mat_5f = org.apache.spark.mllib.linalg.distributed.RowMatrix@2a5cb544
svd_5f = 
U_5f = org.apache.spark.mllib.linalg.distributed.RowMatrix@1524092c



### Ten Features
Using all ten votes, we reduce the dimensionality to two and to three dimensions: $\mathbb{R}^{10} \rightarrow \mathbb{R}^2$ and $\mathbb{R}^{10} \rightarrow \mathbb{R}^3$

In [11]:
val mat: RowMatrix = new RowMatrix(rdd)

// Reduce dimensionality to 2
val svd_2d: SingularValueDecomposition[RowMatrix, Matrix] = mat.computeSVD(2, computeU = true)
val U_2d: RowMatrix = svd_2d.U  // The U factor is a RowMatrix.
/*val s: Vector = svd_2d.s     // The singular values are stored in a local dense vector.
val V: Matrix = svd_2d.V     // The V factor is a local dense matrix.  */

// Reduce dimensionality to 3
val svd_3d: SingularValueDecomposition[RowMatrix, Matrix] = mat.computeSVD(3, computeU = true)
val U_3d: RowMatrix = svd_3d.U

mat = org.apache.spark.mllib.linalg.distributed.RowMatrix@5f8f6867
svd_2d = SingularValueDecomposition(org.apache.spark.mllib.linalg.distributed.RowMatrix@2b3bee5d,[36.42756404786852,25.998485575765503],
0.27510203941300526  -0.007466759014940039
0.18672231017658025  -0.5751708377710483
0.3572578314729371   0.24585945191650155
0.3596920952557538   0.3003474648467533
0.23785941482083142  -0.36846002521713517
0.39260327049072286  0.019971853969771858
0.17868434080519918  -0.5360210319443278
0.3394254483721828   -0.07793139374342961
0.39209073524986504  0.032014543346194135
0.34764174478270116  0.29646052049790117)
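To make the printed factors concrete, the sketch below projects a single ten-vote vector into the two-dimensional space using the singular values and the $V$ matrix from the `svd_2d` output above, via $u_j = (a \cdot v_j) / \sigma_j$. It is plain Scala and only an illustration: the object `SvdProjection` and its `project` method are hypothetical helpers, not part of the notebook or of Spark.

```scala
// Sketch only: reproduces one row of U from a vote vector, V and the singular values.
object SvdProjection {
  // Singular values, copied from the svd_2d output.
  val s: Array[Double] = Array(36.42756404786852, 25.998485575765503)

  // V factor (10 x 2), copied from the svd_2d output; row i belongs to vote q<i>.
  val V: Array[Array[Double]] = Array(
    Array(0.27510203941300526, -0.007466759014940039),
    Array(0.18672231017658025, -0.5751708377710483),
    Array(0.3572578314729371, 0.24585945191650155),
    Array(0.3596920952557538, 0.3003474648467533),
    Array(0.23785941482083142, -0.36846002521713517),
    Array(0.39260327049072286, 0.019971853969771858),
    Array(0.17868434080519918, -0.5360210319443278),
    Array(0.3394254483721828, -0.07793139374342961),
    Array(0.39209073524986504, 0.032014543346194135),
    Array(0.34764174478270116, 0.29646052049790117)
  )

  // Coordinate j of the projection is the dot product of the votes with
  // column j of V, divided by the j-th singular value.
  def project(votes: Array[Double]): Array[Double] = {
    require(votes.length == V.length, s"expected ${V.length} votes")
    s.indices.map { j =>
      votes.indices.map(i => votes(i) * V(i)(j)).sum / s(j)
    }.toArray
  }
}
```

For example, `SvdProjection.project(Array(0.0, -1.0, 1.0, 1.0, -1.0, 1.0, -1.0, 1.0, 0.0, 1.0))` projects the votes shown for Andreas Norlén in the feature table; the result should agree with the matching row of `U_2d` up to floating-point rounding.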

### Saving the RDD - A 2 dimensional representation of the voting patterns
The `DataFrame` class exposes a `DataFrameWriter` named `write` that can be used to save a DataFrame. Four write modes can be specified, with `error` being the default:
* `append`: add this data to the end of any data already at the target location.
* `overwrite`: erase any existing data at the target location and replace with this data.
* `ignore`: silently skip this command if any data already exists at the target location.
* `error`: throw an exception if any data already exists at the target location.

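Here, however, we save RDDs rather than DataFrames: the rows of each $U$ matrix are written as text with `saveAsTextFile` into subfolders of `results/`. Because `coalesce(1)` merges each RDD into a single partition, every output folder contains one `part-00000` file, in which each line is one row vector, along with a `_SUCCESS` indicator file. Unlike `DataFrameWriter`, `saveAsTextFile` has no write modes: it throws a `FileAlreadyExistsException` if the output directory already exists.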
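The save cells in this notebook call `saveAsTextFile`, which refuses to write into an existing directory. One way to make those cells re-runnable is to delete the output directory first. The sketch below uses only `java.io.File` and assumes the results live on the local filesystem; the object `OutputDirs` and its `deleteRecursively` method are hypothetical helpers, not part of Spark.

```scala
import java.io.{File, IOException}

// Sketch only: removes a local output directory so that a later save can recreate it.
object OutputDirs {
  def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) {
      // listFiles returns null on an I/O error, so fall back to an empty array
      Option(f.listFiles).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    }
    // Deleting a path that does not exist is treated as a no-op
    if (f.exists && !f.delete()) {
      throw new IOException(s"Could not delete $f")
    }
  }
}
```

Calling `OutputDirs.deleteRecursively(new File("results/info"))` before the corresponding `saveAsTextFile` call would clear the previous output. On a distributed filesystem such as HDFS a different mechanism would be needed.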

In [12]:
val result_5f = U_5f.rows
val result_2d = U_2d.rows
val result_3d = U_3d.rows

result_5f.coalesce(1).saveAsTextFile("results/u_5")
result_2d.coalesce(1).saveAsTextFile("results/u_10_2d")
result_3d.coalesce(1).saveAsTextFile("results/u_10_3d")

result_5f = MapPartitionsRDD[163] at mapPartitions at RowMatrix.scala:443
result_2d = MapPartitionsRDD[165] at mapPartitions at RowMatrix.scala:443
result_3d = MapPartitionsRDD[167] at mapPartitions at RowMatrix.scala:443



In [13]:
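val info = df.select("name", "party", "sex", "constituency", "age").rdd
info.take(10)  // preview the first ten rows
// saveAsTextFile fails if results/info already exists (see the exception below);
// delete that directory before re-running this cell
info.coalesce(1).saveAsTextFile("results/info")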

Name: org.apache.hadoop.mapred.FileAlreadyExistsException
Message: Output directory file:/Users/alexhermansson/KTH/DataIntensive/project/src/results/info already exists
StackTrace:   at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
  at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:283)
  at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1096)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withSco