# Big Data: Tools and Methods

# Assignment 3 - Xin Zhou

<font color=blue>In this assignment you are to use Scala and Spark. Turn in a Jupyter notebook that uses the Apache-Toree kernel. In problems 2-7 your code needs to use Spark and be able to run on a cluster using multiple worker nodes to do the computations.

<font color=green>#1. Write a function that will put N doubles into a file. The doubles need to be normally distributed with mean 0 and standard deviation 1. The function should have two arguments: N and the full name of the file (i.e., including the path to the file location).

In [1]:
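import java.io._
import scala.io.Source
import org.apache.spark.sql._

// Writes N standard-normal doubles to fileDir, separated by parsingString.
// The separator defaults to a space, so the function can be called with just N and the file name.
def NDFileGenerate(N: Int, fileDir: String, parsingString: String = " "): Unit = {
    val R = new scala.util.Random
    val values = Array.fill(N){R.nextGaussian()}
    val pw = new PrintWriter(new File(fileDir))
    try pw.write(values.mkString(parsingString))
    finally pw.close()
}
// Test for the NDFileGenerate function: write 10 values and print them back
NDFileGenerate(10, "doubles.txt", " ")
val source = Source.fromFile("doubles.txt")
try {
    for (line <- source.getLines()) print(line)
} finally source.close()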


-1.5713469585202726 0.64984617103597 1.054864833984803 0.3685464527890387 -0.8646884832901189 -0.5215683912930904 -0.20124766664715338 -0.21946351319069785 -0.11485764505829102 -0.3091966918688628
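
Because `NDFileGenerate` uses an unseeded generator, every run produces a different file. The sketch below shows one way to make the output reproducible by passing a seed. The names `writeGaussians` and `readDoubles` are hypothetical helpers introduced only for this illustration, and the temporary file stands in for `doubles.txt`.

```scala
import java.io.{File, PrintWriter}
import scala.io.Source
import scala.util.Random

// Hypothetical helper (not part of the assignment code): like NDFileGenerate,
// but takes a seed so the file contents are reproducible.
def writeGaussians(n: Int, path: String, seed: Long, sep: String = " "): Unit = {
  val rng = new Random(seed)
  val pw = new PrintWriter(new File(path))
  try {
    // Write values one at a time instead of building one large string first
    var i = 0
    while (i < n) {
      if (i > 0) pw.write(sep)
      pw.write(rng.nextGaussian().toString)
      i += 1
    }
  } finally pw.close()
}

// Hypothetical helper: read the values back and close the file handle afterwards
def readDoubles(path: String, sep: String = " "): Vector[Double] = {
  val src = Source.fromFile(path)
  try src.mkString.split(sep).filter(_.nonEmpty).map(_.toDouble).toVector
  finally src.close()
}

val tmp = File.createTempFile("gaussians", ".txt")
tmp.deleteOnExit()
writeGaussians(1000, tmp.getPath, seed = 42L)
val firstRun = readDoubles(tmp.getPath)
writeGaussians(1000, tmp.getPath, seed = 42L)
val secondRun = readDoubles(tmp.getPath)
println(s"read ${firstRun.size} values; identical across runs: ${firstRun == secondRun}")
```

With a fixed seed, the mean and standard deviation computed in later problems would be the same on every run, which makes results easier to compare.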

<font color=blue>Create a file with 50,000 doubles using the function from problem 1. This file will be used for the next several problems. It is best if you put the file in the current directory to avoid paths that do not exist on other machines.

In [2]:
NDFileGenerate(50000,"doubles.txt", " ")

<font color=green>#2. Read the file created in #1 into an RDD and compute the mean and standard deviation of the doubles in the file. Work on the RDD, that is, do not convert the RDD to a DataFrame or Dataset. You are to use Spark code to compute the values, as we want this to run on a cluster using multiple machines. So the pure Scala code you used in assignment will not work.


In [3]:
val myRDD = sc.textFile("doubles.txt").flatMap(line => line.split(" ").map(_.toDouble))
println("The mean value calculated in RDD is " + myRDD.mean())
println("The standard deviation calculated in RDD is " + myRDD.stdev())

The mean value calculated in RDD is 1.7269809673229486E-4
The standard deviation calculated in RDD is 1.001931736052103
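
The reason `mean()` and `stdev()` can run across several worker nodes is that both can be computed from small per-partition summaries that are merged afterwards. The following pure-Scala sketch illustrates that idea with a mergeable (count, mean, sum of squared deviations) triple. It is an illustration of the general technique, not Spark's actual code, and the `Stats` class and the two sample "partitions" are hypothetical.

```scala
// Running statistics that can be merged across partitions.
// n = count, mean = running mean, m2 = sum of squared deviations from the mean.
final case class Stats(n: Long, mean: Double, m2: Double) {
  // Fold one value into the statistics (Welford's update)
  def add(x: Double): Stats = {
    val n1 = n + 1
    val delta = x - mean
    val mean1 = mean + delta / n1
    Stats(n1, mean1, m2 + delta * (x - mean1))
  }
  // Combine the statistics of two disjoint partitions
  def merge(o: Stats): Stats =
    if (n == 0) o
    else if (o.n == 0) this
    else {
      val total = n + o.n
      val delta = o.mean - mean
      Stats(total, mean + delta * o.n / total, m2 + o.m2 + delta * delta * n * o.n / total)
    }
  // Population standard deviation (divides by n)
  def populationStdev: Double = math.sqrt(m2 / n)
}
val emptyStats = Stats(0L, 0.0, 0.0)

// Hypothetical data split into two "partitions", as a cluster would see it
val partA = Seq(2.0, 4.0, 4.0, 4.0)
val partB = Seq(5.0, 5.0, 7.0, 9.0)
val merged = partA.foldLeft(emptyStats)(_.add(_)).merge(partB.foldLeft(emptyStats)(_.add(_)))
println(s"mean = ${merged.mean}, stdev = ${merged.populationStdev}")
```

Each partition only has to ship three numbers to be combined, regardless of how many values it holds.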


<font color=green>#3. Repeat #2 but using a DataFrame instead of RDD. Here work on the DataFrame not an RDD.

In [4]:
// Reading the space-separated file with spark.read.text() yields a single column
// holding one long string, which would still have to be split into individual values.
// So the DataFrame is built from the RDD from #2 instead of directly from the text file.
import org.apache.spark.sql.functions._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val df = myRDD.toDF()
df.explain() // prints the physical plan, confirming the DataFrame was created from the RDD
df.show(5)
val mean_stddev = df.agg(
    mean("value").alias("Mean"),
    stddev("value").alias("Standard Deviation")
)
mean_stddev.show()

== Physical Plan ==
*SerializeFromObject [input[0, double, false] AS value#1288]
+- Scan ExternalRDDScan[obj#1287]
+-------------------+
|              value|
+-------------------+
| 0.7588979840782909|
|-1.5014978846757265|
|  1.281998201953683|
|-0.4095865027394471|
|-0.5229824261995106|
+-------------------+
only showing top 5 rows

+-------------------+------------------+
|               Mean|Standard Deviation|
+-------------------+------------------+
|1.72698096732263E-4|1.0019417555197556|
+-------------------+------------------+
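
The means in #2 and #3 agree, but the standard deviations differ slightly (1.001931736052103 versus 1.0019417555197556). This is expected: the RDD `stdev()` is the population standard deviation (dividing by n), while the DataFrame `stddev` is the sample standard deviation (dividing by n - 1). The sketch below defines both in plain Scala and rescales the RDD result by sqrt(n / (n - 1)) for n = 50000. The function names are chosen for this illustration only.

```scala
// Population standard deviation: divide the squared deviations by n
def populationStdev(xs: Seq[Double]): Double = {
  val m = xs.sum / xs.size
  math.sqrt(xs.map(x => (x - m) * (x - m)).sum / xs.size)
}

// Sample standard deviation: divide by n - 1 (Bessel's correction)
def sampleStdev(xs: Seq[Double]): Double = {
  val m = xs.sum / xs.size
  math.sqrt(xs.map(x => (x - m) * (x - m)).sum / (xs.size - 1))
}

// Rescale the RDD result from #2 by sqrt(n / (n - 1)) with n = 50000
val n = 50000.0
val rddStdev = 1.001931736052103
val rescaled = rddStdev * math.sqrt(n / (n - 1))
println(f"rescaled RDD stdev = $rescaled%.10f")
```

The rescaled value agrees with the DataFrame result in #3 to roughly ten decimal places, so the two computations are consistent.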



<font color=green>#4. Using a DataFrame, create a random sample of about 100 elements of the file created in #2 and compute the mean of the sample.

In [5]:
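val withReplacement = false
// A fraction of 0.005 of 50,000 rows gives about 250 rows on average; limit(100) keeps the first 100 of them
val sample = df.sample(withReplacement, 0.005).limit(100)
println("sample size is "+ sample.count())
sample.show(5)
// Mean of the sample, as the problem asks
sample.agg(mean("value").alias("Sample Mean")).show()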

sample size is 100
+--------------------+
|               value|
+--------------------+
|  -1.328019774772998|
|  0.5504744327125294|
|-0.34573308691858545|
|-0.27494027036026436|
| -1.1173925220264196|
+--------------------+
only showing top 5 rows
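
Fraction-based sampling keeps each row independently with the given probability, so the sample size is only approximately fraction × N. To get about 100 rows out of 50,000 without a `limit`, the fraction would be 100 / 50000 = 0.002. The pure-Scala sketch below imitates that behaviour on a hypothetical in-memory population; the seed and variable names are assumptions made for this illustration.

```scala
import scala.util.Random

// Hypothetical stand-in for the 50,000 values in the DataFrame
val rng = new Random(7L)
val population = Vector.fill(50000)(rng.nextGaussian())

// Keep each element independently with probability target / N,
// so the sample size is about 100 but not exactly 100
val target = 100
val fraction = target.toDouble / population.size
val picked = population.filter(_ => rng.nextDouble() < fraction)
val pickedMean = picked.sum / picked.size

println(s"fraction = $fraction, sample size = ${picked.size}, sample mean = $pickedMean")
```

Since the standard error of a mean over roughly 100 standard-normal values is about 0.1, a sample mean within a few tenths of 0 is what one would expect.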



<font color=green>#5. Create a file of 100 normally distributed doubles. Read the doubles from the file into an RDD. Using the RDD create a sliding window of size 20 and compute the mean of each window.

In [6]:
import org.apache.spark.mllib.rdd.RDDFunctions
NDFileGenerate(100,"doubles100.txt", " ")
val rdd100 = sc.textFile("doubles100.txt").flatMap(line => line.split(" ").map(_.toDouble))
// sliding() comes from MLlib's RDDFunctions wrapper, not from the core RDD API
val rddFS = RDDFunctions.fromRDD(rdd100)
// Each window is an Array of 20 consecutive values
val means = rddFS.sliding(20).map(values20 => values20.sum / values20.length)
println("The size of means is " + means.count())
println("Five samples from means: ")
means.take(5).foreach(mean => println(mean))

The size of means is 81
Five samples from means: 
-0.050513469699788226
-0.013737053566370139
0.016241509018171025
0.0684814896626845
0.005043026330456923
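
The count of 81 follows from the window arithmetic: 100 values with a window of 20 and a step of 1 give 100 - 20 + 1 = 81 windows. The same logic can be checked locally with Scala's collection `sliding`, as in this minimal sketch. The input 1.0 to 100.0 is a hypothetical series chosen so the window means are easy to verify by hand.

```scala
// Hypothetical input: 1.0, 2.0, ..., 100.0 so the window means are easy to check
val series = (1 to 100).map(_.toDouble)
val windowSize = 20

// sliding(20) yields every run of 20 consecutive elements: 100 - 20 + 1 = 81 windows
val windowMeans = series.sliding(windowSize).map(w => w.sum / w.size).toVector

println(s"windows = ${windowMeans.size}, first = ${windowMeans.head}, last = ${windowMeans.last}")
// prints: windows = 81, first = 10.5, last = 90.5
```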


<font color=green>#6. The file “multiple-sites.tsv” contains two columns: site and dwell-time. Using Spark compute the average dwell time for each site.

In [7]:
import org.apache.spark.sql.functions._
val dwellDir = "Assignment3Data/multiple-sites.tsv"
val dwellDf = spark.read.
    option("header", true).
    option("inferSchema", true).
    option("sep", "\t").
    csv(dwellDir)
// dwellDf.printSchema
// dwellDf.show(3)
val average = dwellDf.groupBy("site").agg(mean("dwell-time").alias("Average"))
println("The size of average of sites is: " + average.count())
average.sort("site").show(5)

The size of average of sites is: 20
+----+-----------------+
|site|          Average|
+----+-----------------+
|   0|79.85106382978724|
|   1|            106.0|
|   2|88.22916666666667|
|   3|97.47916666666667|
|   4|94.33333333333333|
+----+-----------------+
only showing top 5 rows
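
A per-group average can be computed on a cluster because each group only needs a running (sum, count) pair, which can be combined in any order before the final division. The sketch below shows that pattern in plain Scala. The `visits` records are hypothetical and are not taken from `multiple-sites.tsv`.

```scala
// Hypothetical (site, dwell-time) records standing in for the TSV rows
val visits = Seq((0, 80.0), (1, 100.0), (0, 90.0), (1, 112.0), (2, 60.0))

// Carry (sum, count) per site so partial results can be combined in any order,
// then divide once at the end
val sumAndCount = visits.foldLeft(Map.empty[Int, (Double, Long)]) {
  case (acc, (site, dwell)) =>
    val (s, c) = acc.getOrElse(site, (0.0, 0L))
    acc.updated(site, (s + dwell, c + 1))
}
val avgBySite = sumAndCount.map { case (site, (s, c)) => site -> s / c }

// Print one "site<TAB>average" line per site, sorted by site
avgBySite.toSeq.sortBy(_._1).foreach { case (site, avg) => println(s"$site\t$avg") }
```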



<font color=green>#7. The file “dwell-times.tsv” contains two columns: date and dwell-time. Using Spark compute the following:<br>
1. The average dwell time each hour<br>
2. The average dwell time per day of week<br>
3. The average dwell time on week-days (Monday-Friday)<br>
4. The average dwell time on the weekend.<br>

In [8]:
import org.apache.spark.sql.functions._
// Read the dwell-time data, then derive date, hour, and day-of-week columns from the timestamp
val dwellDf = spark.read.
    option("header", true).
    option("inferSchema", true).
    option("sep", "\t").
    csv("Assignment3Data/dwell-times.tsv")

val df = dwellDf.withColumnRenamed("date", "TimeStamp").
    withColumn("Date", to_date(col("TimeStamp"))).
    withColumn("Hour", hour(col("TimeStamp"))).
    withColumn("DayOfWeek", date_format(col("Date"),"EEEE"))
df.show(2)


+-------------------+----------+----------+----+---------+
|          TimeStamp|dwell-time|      Date|Hour|DayOfWeek|
+-------------------+----------+----------+----+---------+
|2014-12-31 16:03:43|        74|2014-12-31|  16|Wednesday|
|2014-12-31 16:32:12|       109|2014-12-31|  16|Wednesday|
+-------------------+----------+----------+----+---------+
only showing top 2 rows
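
The derived columns can be sanity-checked outside Spark with `java.time`. This small sketch parses the first timestamp shown above and extracts the hour and the English day name. The variable names are chosen for this illustration.

```scala
import java.time.LocalDateTime
import java.time.format.{DateTimeFormatter, TextStyle}
import java.util.Locale

// Same layout as the TimeStamp column shown above
val tsFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
val ts = LocalDateTime.parse("2014-12-31 16:03:43", tsFormat)

val hourOfDay = ts.getHour
val dayName = ts.getDayOfWeek.getDisplayName(TextStyle.FULL, Locale.ENGLISH)
println(s"hour = $hourOfDay, day = $dayName")
// prints: hour = 16, day = Wednesday
```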



In [9]:

// 7.1 The average dwell time each hour
val hourAvg = df.groupBy("Hour").agg(mean("dwell-time").alias("AvgDwellTimePerHour"))
println("There are " + hourAvg.count + " groups of average dwell time per hour")
hourAvg.sort("Hour").show(5)

//7.2 The average dwell time per day of week
val dayOfWeek = df.groupBy("DayOfWeek").agg(mean("dwell-time").alias("AvgDwellTimeDayPerWeek"))
println("There are " + dayOfWeek.count + " groups of average dwell time for per day of week")
dayOfWeek.sort("AvgDwellTimeDayPerWeek").show

//7.3 The average dwell time on week-days (Monday-Friday)
// Note: this averages the five per-day averages from 7.2 with equal weight; it equals the
// average over all weekday records only if every day has the same number of records
println("The average dwell time on week-days(Monday-Friday) shown below:")
val weekdays = dayOfWeek.where(!col("DayOfWeek").isin("Saturday", "Sunday"))
val weekdayAvg = weekdays.agg(mean("AvgDwellTimeDayPerWeek").alias("AvgTimeWeekdays"))
weekdayAvg.show

//7.4 Average dwell time on the weekend (same equal-weight averaging, over Saturday and Sunday)
println("Average dwell time on the weekend shown below:")
val weekends = dayOfWeek.where(col("DayOfWeek").isin("Saturday", "Sunday"))
val weekendAvg = weekends.agg(mean("AvgDwellTimeDayPerWeek").alias("AvgTimeWeekends"))
weekendAvg.show

There are 24 groups of average dwell time per hour
+----+-------------------+
|Hour|AvgDwellTimePerHour|
+----+-------------------+
|   0|    94.708670095518|
|   1|  92.20954287620954|
|   2|  96.85937970490816|
|   3|  92.57110924839341|
|   4|  91.33086825527253|
+----+-------------------+
only showing top 5 rows

There are 7 groups of average dwell time for per day of week
+---------+----------------------+                                              
|DayOfWeek|AvgDwellTimeDayPerWeek|
+---------+----------------------+
|   Monday|     89.11023872679046|
|  Tuesday|     89.73922367574522|
|   Friday|     90.44266729389628|
| Thursday|      91.4947995556902|
|Wednesday|     91.57283771155643|
|   Sunday|    106.49005681818181|
| Saturday|    116.88253012048193|
+---------+----------------------+

The average dwell time on week-days(Monday-Friday) shown below:
+----------------+
| AvgTimeWeekdays|
+----------------+
|90.4719533927357|
+----------------+

Average dwell time on the we
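
The weekday and weekend figures in 7.3 and 7.4 are averages of the per-day averages, which weight every day equally. That matches the average over all individual records only when each day has the same number of records. The sketch below contrasts the two with hypothetical per-day counts; the numbers are invented for illustration and are not taken from `dwell-times.tsv`.

```scala
// Hypothetical per-day summaries: (day, average dwell time, number of records).
// The averages and counts are invented for illustration only.
val daily = Seq(("Saturday", 120.0, 100L), ("Sunday", 100.0, 300L))

// Unweighted: every day counts equally, regardless of how many records it has
val meanOfMeans = daily.map(_._2).sum / daily.size

// Weighted by record count: equivalent to averaging over all individual records
val pooledMean = daily.map { case (_, avg, cnt) => avg * cnt }.sum / daily.map(_._3).sum

println(s"mean of daily means = $meanOfMeans, pooled mean = $pooledMean")
// prints: mean of daily means = 110.0, pooled mean = 105.0
```

If the record-level average is what is wanted, one option would be to filter the row-level DataFrame by day of week and aggregate `dwell-time` directly rather than averaging the daily averages.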

<font color=green>#8. Do the average dwell times computed in #7 indicate any difference in users' behavior?<br>
<font color=black>Yes. Users spend more time on weekends than on weekdays: the Saturday and Sunday averages (about 116.9 and 106.5) are well above the weekday averages, which all fall between about 89.1 and 91.6. Ranked from lowest to highest, Monday has the lowest average dwell time and Saturday the highest. Friday ranks third lowest, which makes sense: some people spend a little more time online on Friday night, but most prefer to go out and spend less time on the internet.