In [1]:
//for stacktable class
import java.io.File
import scala.io.{ BufferedSource, Source }

/**
 * Base class for the parsers of the XML dump tables used in this notebook.
 *
 * A concrete table provides `file` and `parseXml`; the helpers defined here turn
 * attribute nodes into primitive values, substituting a default when the attribute
 * is absent (i.e. when the node's text is empty).
 *
 * @tparam T record type produced for each `<row ... />` element
 */
abstract class StackTable[T] {

  /** Location of the table's dump (file or directory), supplied by the subclass. */
  val file: File

  /**
   * Converts a timestamp attribute to epoch milliseconds.
   *
   * @param n attribute node whose text follows the pattern used by `dateFormat`
   * @return milliseconds since the epoch, or 0 when the attribute is absent
   * @throws java.text.ParseException if the text is non-empty but does not match the pattern
   */
  def getDate(n: scala.xml.NodeSeq): Long = n.text match {
    case "" => 0L
    case s  => dateFormat.parse(s).getTime
  }

  /**
   * Extracts characters 11-12 of the attribute text as a number. For text in the
   * `yyyy-MM-dd'T'HH:mm:ss.SSS` layout those characters are the `HH` (hour) field.
   *
   * @param n timestamp attribute node
   * @return the parsed two-character field, or 0 when the attribute is absent
   * @throws NumberFormatException if those characters are not an integer
   */
  def getDateString(n: scala.xml.NodeSeq): Long = n.text match {
    case "" => 0L
    case s  => s.slice(11, 13).toInt
  }

  /**
   * Builds the timestamp formatter (GMT, `yyyy-MM-dd'T'HH:mm:ss.SSS`).
   *
   * This is a `def`, so every call creates a fresh formatter and no
   * `SimpleDateFormat` instance (which is not thread-safe) is shared between callers.
   */
  def dateFormat: java.text.SimpleDateFormat = {
    import java.text.SimpleDateFormat
    import java.util.TimeZone
    val f = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
    f.setTimeZone(TimeZone.getTimeZone("GMT"))
    f
  }

  /**
   * Reads an integer attribute.
   *
   * @return the parsed value, or 0 when the attribute is absent
   * @throws NumberFormatException if the text is non-empty but not an integer
   */
  def getInt(n: scala.xml.NodeSeq): Int = n.text match {
    case "" => 0
    case x  => x.toInt
  }

  /**
   * Same as `getInt`, but an absent attribute yields the sentinel -5 instead of 0.
   *
   * @throws NumberFormatException if the text is non-empty but not an integer
   */
  def getAnswerId(n: scala.xml.NodeSeq): Int = n.text match {
    case "" => -5
    case x  => x.toInt
  }

  /** Converts one parsed `<row>` element into a record. */
  def parseXml(x: scala.xml.Elem): T

  /**
   * Parses one line of a dump file.
   *
   * Only lines that start with two spaces followed by `<row ` and end with ` />` are
   * treated as records; every other line yields `None`.
   *
   * @param s one line of text
   * @return the parsed record, or `None` when the line is not a row element
   */
  def parse(s: String): Option[T] =
    // && short-circuits: the suffix test is skipped when the prefix already fails.
    if (s.startsWith("  <row ") && s.endsWith(" />"))
      Some(parseXml(scala.xml.XML.loadString(s)))
    else None
}





In [2]:
// for user 
import scala.xml.{ NodeSeq, MetaData }
import java.io.File
import scala.io.{ BufferedSource, Source }

/** Row parser for the Users table; builds a `User` from one `<row>` element. */
object User extends StackTable[User] {

  // Placeholder path; earlier candidate locations are kept below for reference.
  val file = new File("")
  //val file = new File("data/allUsers/")
  //val file = new File("data/Users.xml")
//  assert(file.exists)

  /**
   * Maps the attributes of a Users row onto the fields of `User`.
   * Absent attributes become 0 (numbers and dates) or "" (strings).
   */
  override def parseXml(x: scala.xml.Elem): User = {
    // Looks up the attribute called `name` on the row element.
    def attr(name: String): scala.xml.NodeSeq = x \ s"@$name"

    User(
      id = getInt(attr("Id")),
      reputation = getInt(attr("Reputation")),
      creationDate = getDate(attr("CreationDate")),
      displayName = attr("DisplayName").text,
      lastAccessDate = getDate(attr("LastAccessDate")),
      websiteUrl = attr("WebsiteUrl").text,
      location = attr("Location").text,
      aboutMe = attr("AboutMe").text,
      views = getInt(attr("Views")),
      upVotes = getInt(attr("UpVotes")),
      downVotes = getInt(attr("DownVotes")),
      emailHash = attr("EmailHash").text,
      age = getInt(attr("Age")))
  }
}

/**
 * One row of the Users table, as produced by `User.parseXml`.
 *
 * Each field is read from the row attribute of the same name in UpperCamelCase
 * (`id` from `Id`, `displayName` from `DisplayName`, ...).
 * Int fields are 0 and String fields are "" when the attribute is absent.
 *
 * @param creationDate   epoch milliseconds (timestamp parsed as GMT), 0 when absent
 * @param lastAccessDate epoch milliseconds (timestamp parsed as GMT), 0 when absent
 */
case class User(
  id: Int,
  reputation: Int,
  creationDate: Long,
  displayName: String,
  lastAccessDate: Long,
  websiteUrl: String,
  location: String,
  aboutMe: String,
  views: Int,
  upVotes: Int,
  downVotes: Int,
  emailHash: String,
  age: Int) 






In [3]:
// for Posts

/** Row parser for the Posts table; builds a `Post` from one `<row>` element. */
object Post extends StackTable[Post] {

  // Placeholder path; earlier candidate locations are kept below for reference.
  val file = new File("")
//  val file = new File(Dir+"/allPosts/")
 // val file = new File("data/Posts.xml")
//  assert(file.exists)

  /**
   * Maps the attributes of a Posts row onto the fields of `Post`.
   * `CreationDate` is read twice: once as epoch milliseconds and once through
   * `getDateString` for the `creationhour` field.
   */
  override def parseXml(x: scala.xml.Elem): Post = {
    // Looks up the attribute called `name` on the row element.
    def attr(name: String): scala.xml.NodeSeq = x \ s"@$name"

    Post(
      id = getInt(attr("Id")),
      postTypeId = getInt(attr("PostTypeId")),
      parentId = getInt(attr("ParentId")),
      acceptedAnswerId = getAnswerId(attr("AcceptedAnswerId")),
      creationDate = getDate(attr("CreationDate")),
      creationhour = getDateString(attr("CreationDate")),
      score = getInt(attr("Score")),
      viewCount = getInt(attr("ViewCount")),
      body = attr("Body").text,
      ownerUserId = getInt(attr("OwnerUserId")),
      lastActivityDate = getDate(attr("LastActivityDate")),
      title = attr("Title").text,
      tags = getTags(attr("Tags")),
      answerCount = getInt(attr("AnswerCount")),
      commentCount = getInt(attr("CommentCount")),
      favoriteCount = getInt(attr("FavoriteCount")),
      communityOwnedDate = getDate(attr("CommunityOwnedDate")))
  }

  /**
   * Splits a tag attribute into individual tag names: the first and last characters
   * are removed and the remainder is split on `><`.
   *
   * @return the tag names, or an empty array when the attribute is absent
   */
  def getTags(x: scala.xml.NodeSeq): Array[String] = {
    val raw = x.text
    if (raw.isEmpty) Array.empty[String]
    else raw.slice(1, raw.length - 1).split("><")
  }
}
/**
 * One row of the Posts table, as produced by `Post.parseXml`.
 *
 * Int fields are 0 and String fields are "" when the attribute is absent, except
 * `acceptedAnswerId`, which is -5 when absent. Date fields are epoch milliseconds
 * (timestamps parsed as GMT), 0 when absent.
 *
 * @param acceptedAnswerId value of `AcceptedAnswerId`, or the sentinel -5 when absent
 * @param creationhour     characters 11-12 of the `CreationDate` text parsed as a number
 *                         (the `HH` field of the timestamp), 0 when absent
 * @param tags             tag names extracted from the `Tags` attribute; empty when absent
 */
case class Post(
  id: Int,
  postTypeId: Int,
  parentId: Int,
  acceptedAnswerId: Int,
  creationDate: Long,
  creationhour:Long,
  score: Int,
  viewCount: Int,
  body: String,
  ownerUserId: Int,
  lastActivityDate: Long,
  title: String,
  tags: Array[String],
  answerCount: Int,
  commentCount: Int,
  favoriteCount: Int,
  communityOwnedDate: Long) 






In [4]:
// for vote

import scala.xml.{ NodeSeq, MetaData }
import java.io.File
import scala.io.{ BufferedSource, Source }

/** Row parser for the Votes table; builds a `Vote` from one `<row>` element. */
object Vote extends StackTable[Vote] {

  // Placeholder path; earlier candidate locations are kept below for reference.
  val file = new File("")
 // val file = new File("data/allVotes/")
  //val file = new File("data/Votes.xml")
  //assert(file.exists)

  /** Maps the attributes of a Votes row onto the fields of `Vote`. */
  override def parseXml(x: scala.xml.Elem): Vote = {
    // Looks up the attribute called `name` on the row element.
    def attr(name: String): scala.xml.NodeSeq = x \ s"@$name"

    Vote(
      id = getInt(attr("Id")),
      postId = getInt(attr("PostId")),
      voteTypeId = getInt(attr("VoteTypeId")),
      userId = getInt(attr("UserId")),
      creationDate = getDate(attr("CreationDate")))
  }
}

// <row Id="1264793" PostId="486481" VoteTypeId="5" UserId="175880" CreationDate="2013-05-30T00:00:00.000" />
/**
 * One row of the Votes table, as produced by `Vote.parseXml`.
 *
 * Int fields are 0 when the corresponding attribute is absent.
 *
 * @param voteTypeId   value of `VoteTypeId`; the queries in this notebook treat 2 as an
 *                     upvote, 3 as a downvote and 5 as a favorite
 * @param creationDate epoch milliseconds (timestamp parsed as GMT), 0 when absent
 */
case class Vote(
  id: Int,
  postId: Int,
  voteTypeId: Int,
  userId: Int,
  creationDate: Long) 





In [5]:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
// Root directory of the input data; the loading cells append "/allPosts/", "/allVotes/"
// and "/allUsers/" to it. The commented line is an alternative input directory.
//val inputDir = "./test"
val inputDir = "./data"





./data

In [6]:
/*
val conf = new SparkConf();
conf.setAppName("test");
conf.set("spark.driver.allowMultipleContexts", "true");
conf.setMaster("local[*]");
val sc = new SparkContext(conf)
*/





: 

In [6]:
import org.apache.spark.rdd.RDD
// Read the Posts dump as lines of text and keep only the lines that parse as rows.
val testdata = spark.textFile(s"$inputDir/allPosts/", minPartitions = 2)
val objData = testdata.flatMap(line => Post.parse(line))
// Mark the parsed posts for caching so later queries can reuse them.
objData.cache()
var query: RDD[Post] = objData





MapPartitionsRDD[2] at flatMap at <console>:33

In [7]:
// Read the Votes dump as lines of text (the rows are parsed as XML by Vote.parse,
// despite the "json" in the variable name) and keep only the lines that parse as rows.
val jsonVoteData = spark.textFile(s"$inputDir/allVotes/", minPartitions = 2)
val voteData = jsonVoteData.flatMap(line => Vote.parse(line))
// Mark the parsed votes for caching so later queries can reuse them.
voteData.cache()
var queryVote: RDD[Vote] = voteData



MapPartitionsRDD[5] at flatMap at <console>:33



In [7]:
// Read the Users dump as lines of text (the rows are parsed as XML by User.parse,
// despite the "json" in the variable name) and keep only the lines that parse as rows.
val jsonUserData = spark.textFile(s"$inputDir/allUsers/", minPartitions = 2)
val userData = jsonUserData.flatMap(line => User.parse(line))
// Mark the parsed users for caching so later queries can reuse them.
userData.cache()
var queryUser: RDD[User] = userData





MapPartitionsRDD[5] at flatMap at <console>:33

In [None]:
//q1
// Overall totals: votes with voteTypeId 2 are counted as upvotes, 3 as downvotes.
val numUpvote = voteData.filter(_.voteTypeId == 2).count()
val numDownvote = voteData.filter(_.voteTypeId == 3).count()
println("Total upvotes:" + numUpvote)
println("Total downvotes:" + numDownvote)

//***** count upVotes, downVotes and favVotes for each post
// One (postId, count) pair per post for each of the three vote types (2, 3, 5).
val upVotes = voteData.filter(_.voteTypeId == 2).map(v => (v.postId, 1)).reduceByKey(_ + _)
val downVotes = voteData.filter(_.voteTypeId == 3).map(v => (v.postId, 1)).reduceByKey(_ + _)
val favVotes = voteData.filter(_.voteTypeId == 5).map(v => (v.postId, 1)).reduceByKey(_ + _)

// Full outer joins keep posts that have only some of the vote types; a missing count is 0.
// Result of the first join: (postId, (upCount, downCount)).
val updownVotes = upVotes.fullOuterJoin(downVotes)
  .map { case (postId, (up, down)) => (postId, (up.getOrElse(0), down.getOrElse(0))) }
// Re-key by the post's favorite count: (favCount, (upCount, downCount)).
val updownfavVotes = updownVotes.fullOuterJoin(favVotes)
  .map { case (_, (upDown, fav)) => (fav.getOrElse(0), upDown.getOrElse((0, 0))) }

// For every favorite count, sum the up/down counts of its posts and compute up / (up + down).
// `.toDouble` replaces the old `1.` literal, which is not a floating-point literal in
// Scala 2.11+ (there `x*1./(y)` would be evaluated with integer arithmetic).
// A group whose posts have no up or down votes at all yields NaN (0.0 / 0).
val favUpDown = updownfavVotes.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
  .mapValues { case (up, down) => up.toDouble / (up + down) }
favUpDown.sortByKey().take(51).foreach(println)

//***** check point: number of posts for each of the 50 smallest favorite counts.
// (Author's reference value: "Mean of top 50 favorite counts: 24.76" - not computed here.)
// The label is concatenated; passing it as a second println argument would print a tuple.
println("Check point:" + updownfavVotes.countByKey.toSeq.sortBy(_._1).take(50).mkString(","))

In [8]:
//q6
/*
println("getting first ans post")
val firstAnsPosts = query.filter(_.postTypeId==1).keyBy(_.ownerUserId).groupByKey().mapValues(x => x.toSeq.sortBy(_.creationDate).take(1))
println(firstAnsPosts.count())
println("getting user create date")
val userCreaDate = queryUser.keyBy(_.id).mapValues(x=>x.creationDate)
println(userCreaDate.count())
println("getting veteran user")
val vetUser = query.keyBy(_.ownerUserId).join(userCreaDate).mapValues(x=>(x._1.creationDate - x._2)).filter(x => (x._2 >= 1000*3600*24*100L && x._2 <= 1000*3600*24*150L)).mapValues(x=>1).reduceByKey((x,y)=>x)
println(vetUser.count())
println("getting all result")
val statVetNew = firstAnsPosts.leftOuterJoin(vetUser).map(x=>(x._2._2.getOrElse(0),x._2._1(0))).mapValues(x =>(x.score, x.viewCount, x.answerCount, x.favoriteCount, 1)).reduceByKey((x,y)=>(x._1+y._1, x._2+y._2, x._3+y._3, x._4+y._4, x._5+y._5))
println(statVetNew.count())
println("print out")

statVetNew.foreach(println)
*/

//val keybyidpost =  query.filter(_.postTypeId==1).keyBy(_.ownerUserId)
//keybyidpost.count()
// (userId, registration time in epoch ms)
val userCreaDate = queryUser.map(u => (u.id, u.creationDate))
//val vetUser = query.keyBy(_.ownerUserId).join(userCreaDate).mapValues(x=>(x._1.creationDate - x._2)).filter(x => (x._2 >= 1000*3600*24*100L && x._2 <= 1000*3600*24*150L)).mapValues(x=>1).reduceByKey((x,y)=>x)
//userCreaDate.count()
// (userId, 1) for every user with at least one post made between 100 and 150 days
// (inclusive, in milliseconds) after registration.
val vetUser = query
  .map(p => (p.ownerUserId, p.creationDate))
  .join(userCreaDate)
  .mapValues { case (postedAt, registeredAt) => postedAt - registeredAt }
  .filter { case (_, elapsed) => elapsed >= 1000*3600*24*100L && elapsed <= 1000*3600*24*150L }
  .reduceByKey((_, _) => 1) // collapse to one entry per user; the value itself is discarded next
  .map { case (userId, _) => (userId, 1) }
// (userId, (creationDate, score, viewCount, answerCount, favoriteCount)) for posts with postTypeId 1
val fisrtquestionpostsstruct = query
  .filter(p => p.postTypeId == 1)
  .map(p => (p.ownerUserId, (p.creationDate, p.score, p.viewCount, p.answerCount, p.favoriteCount)))
// Keep, per user, the entry with the smallest creationDate.
val firstquestionpost = fisrtquestionpostsstruct.reduceByKey((a, b) => if (a._1 < b._1) a else b)
// Re-key by the flag from vetUser: 1 when the user is in vetUser, 0 otherwise.
val firstPostsVetNew = firstquestionpost
  .leftOuterJoin(vetUser)
  .map { case (_, (stats, flag)) => (flag.getOrElse(0), stats) }
// Per flag: sums of score, viewCount, answerCount, favoriteCount and the number of users.
val statVetNew = firstPostsVetNew
  .mapValues { case (_, score, views, answers, favorites) => (score, views, answers, favorites, 1) }
  .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2, x._3 + y._3, x._4 + y._4, x._5 + y._5))
println("start calculation")
statVetNew.take(2)

start calculation




Array((0,(1604313,1550017094,2116239,550353,1404508)), (1,(586556,478630216,478263,225117,259556)))

In [10]:
statVetNew.take(2)

Array((0,(44946,11887316,20742,12349,21316)), (1,(6442,1684192,2360,2365,1818)))





In [91]:
// (userId, 1) for every user with at least one post made between 100 and 150 days
// (inclusive, in milliseconds) after the user's creationDate.
val vetUser = query
  .map(p => (p.ownerUserId, p.creationDate))
  .join(userCreaDate)
  .mapValues { case (postedAt, registeredAt) => postedAt - registeredAt }
  .filter { case (_, elapsed) => elapsed >= 1000*3600*24*100L && elapsed <= 1000*3600*24*150L }
  .reduceByKey((_, _) => 1) // collapse to one entry per user; the value itself is discarded next
  .map { case (userId, _) => (userId, 1) }





MapPartitionsRDD[114] at map at <console>:49

In [92]:
println(vetUser.count())
vetUser.take(30)

2027




Array((27115,1), (3454,1), (3443,1), (3586,1), (12518,1), (4499,1), (40073,1), (8657,1), (16137,1), (319,1), (28732,1), (6149,1), (49643,1), (48455,1), (32065,1), (58729,1), (25839,1), (17072,1), (6369,1), (1496,1), (32472,1), (58542,1), (52151,1), (18128,1), (10098,1), (40656,1), (35926,1), (48741,1), (7788,1), (27412,1))

In [93]:
// Find each user's earliest post with postTypeId 1.
// Element layout: (userId, (creationDate, score, viewCount, answerCount, favoriteCount))
val fisrtquestionpostsstruct = query
  .filter(p => p.postTypeId == 1)
  .map(p => (p.ownerUserId, (p.creationDate, p.score, p.viewCount, p.answerCount, p.favoriteCount)))
// Of two entries for the same user, keep the one with the smaller creationDate.
val firstquestionpost = fisrtquestionpostsstruct.reduceByKey((a, b) => if (a._1 < b._1) a else b)

ShuffledRDD[117] at reduceByKey at <console>:43





In [94]:
firstquestionpost.count()



23134



In [95]:
// Replace the user id key by the flag from vetUser (0 when the user is not in vetUser),
// keeping the first-post statistics as the value.
val firstPostsVetNew = firstquestionpost
  .leftOuterJoin(vetUser)
  .map { case (_, (stats, flag)) => (flag.getOrElse(0), stats) }

MapPartitionsRDD[121] at map at <console>:54





In [96]:
println(firstPostsVetNew.first())
println(firstPostsVetNew.count())

(0,(1323656453027,0,468,0,0))
23134




In [97]:
// Per key: drop the creation date, append a counter of 1, and sum all five components.
val statVetNew = firstPostsVetNew
  .mapValues { case (_, score, views, answers, favorites) => (score, views, answers, favorites, 1) }
  .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2, x._3 + y._3, x._4 + y._4, x._5 + y._5))



ShuffledRDD[123] at reduceByKey at <console>:56



In [100]:
println(statVetNew.count())
statVetNew.take(2)

2




Array((0,(44946,11887316,20742,12349,21316)), (1,(6442,1684192,2360,2365,1818)))

In [63]:
// Component-wise sum of the five-element values stored under key 0.
val statpos = statVetNew
  .filter { case (flag, _) => flag == 0 }
  .map { case (_, (c1, c2, c3, c4, c5)) => (c1, c2, c3, c4, c5) }
  .reduce((x, y) => (x._1 + y._1, x._2 + y._2, x._3 + y._3, x._4 + y._4, x._5 + y._5))



(44945,11887297,20742,12349,21316)



In [64]:
// Component-wise sum of the five-element values stored under key 1.
val statvet = statVetNew
  .filter { case (flag, _) => flag == 1 }
  .map { case (_, (c1, c2, c3, c4, c5)) => (c1, c2, c3, c4, c5) }
  .reduce((x, y) => (x._1 + y._1, x._2 + y._2, x._3 + y._3, x._4 + y._4, x._5 + y._5))



(3338,777656,1095,1376,762)



In [65]:
// Entries whose key is neither 0 nor 1.
val a = statVetNew.filter { case (flag, _) => !(flag == 1 || flag == 0) }



MapPartitionsRDD[66] at filter at <console>:58



In [67]:
a.first()





(10267970170,(1,10,1,0,1))

In [10]:
//val firstAnsPosts = keybyidpost.reduceByKey((x,y) => {if(x.creationDate <= y.creationDate) x else y})//.mapValues()
// Collapse to one value per key. The reducer always returns its first argument, so the
// surviving value is not chosen by creationDate (the commented-out variant above is).
// NOTE(review): `keybyidpost` must already exist in the session; its only definition
// visible in this notebook is commented out - confirm.
val firstAnsPosts = keybyidpost.reduceByKey((x,y) =>x)
// Forces evaluation. The recorded run of this cell aborted with a ClassNotFoundException
// for the REPL-defined Post class.
firstAnsPosts.count()





org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 2.0 failed 1 times, most recent failure: Lost task 2.0 in stage 2.0 (TID 24, localhost): java.lang.ClassNotFoundException: $iwC$$iwC$$iwC$Post
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:65)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
	at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:91)
	at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
	at org.apache.spark.scheduler.Task.run(Task.scala:64)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:

In [10]:
//q8
import org.apache.spark.mllib.linalg._
//import org.apache.spark.mllib.feature.ElementwiseProduct
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}

       // One sequence of tags per post; posts without tags contribute an empty sequence.
       val q6_input = query.map(post => post.tags.toSeq)

       val word2vec = new Word2Vec()

       // Fixed seed so the fitted model is reproducible between runs.
       val word2vec_model = word2vec.setSeed(42L).fit(q6_input)

       // The 25 tags closest to "ggplot2" in the fitted model.
       val synonyms = word2vec_model.findSynonyms("ggplot2", 25)

       synonyms.foreach { case (synonym, cosineSimilarity) =>
         println(s"('$synonym', $cosineSimilarity),")
       }

('lattice', 0.9487745761871338),
('shiny', 0.8836259245872498),
('data.frame', 0.881213366985321),
('r-grid', 0.8789350986480713),
('lm', 0.8762001395225525),
('plyr', 0.8754066228866577),
('boxplot', 0.8743278980255127),
('data.table', 0.8688276410102844),
('na', 0.8672543168067932),
('levelplot', 0.8644376993179321),
('geom-bar', 0.8603816628456116),
('quantmod', 0.8532227873802185),
('reshape', 0.8528526425361633),
('ggvis', 0.8518608212471008),
('r-factor', 0.8515896201133728),
('facet-wrap', 0.8468636274337769),
('vegan', 0.8437350392341614),
('dplyr', 0.8425593376159668),
('lme4', 0.8422262668609619),
('do.call', 0.8420583009719849),
('plotmath', 0.840865969657898),
('zoo', 0.8397353887557983),
('rgl', 0.839381217956543),
('gtable', 0.8386776447296143),
('read.table', 0.8386289477348328),




In [9]:
//q9
// Plan:
//   1. Select the question posts.
//   2. Find the 100 tags with the largest counts.
//   3. Keep the records that carry one of those tags.
//   4. Tokenize the body text of every record.
//   5. Build a feature vector from the tokenized body text.
//


import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.SQLContext







In [7]:
// Posts with postTypeId 1 (the posts treated as questions in this notebook).
val question = query.filter(post => post.postTypeId == 1)



MapPartitionsRDD[3] at filter at <console>:38



In [8]:
question.count()





52060

In [14]:
// (tag, number of question posts carrying that tag)
val tagcount = question
  .flatMap(_.tags)
  .map(tag => (tag, 1))
  .reduceByKey((a, b) => a + b)



ShuffledRDD[12] at reduceByKey at <console>:40



In [15]:
// Names of the 100 tags with the largest counts, most frequent first.
val toptag = tagcount
  .sortBy(_._2, ascending = false)
  .take(100)
  .map { case (tag, _) => tag }





Array(r, regression, time-series, machine-learning, probability, hypothesis-testing, distributions, self-study, logistic, correlation, classification, statistical-significance, bayesian, anova, normal-distribution, clustering, mathematical-statistics, confidence-interval, data-visualization, multiple-regression, estimation, categorical-data, mixed-model, generalized-linear-model, spss, variance, repeated-measures, sampling, t-test, pca, svm, forecasting, multivariate-analysis, chi-squared, cross-validation, maximum-likelihood, data-mining, modeling, neural-networks, data-transformation, predictive-models, matlab, nonparametric, interaction, survival, model-selection, p-value, linear-model, dataset, binomial, poisson, econometrics, standard-deviation, stata, mean, bootstrap, feature-selection, references, sample-size, interpretation, multiple-comparisons, optimization, least-squares, python, conditional-probability, random-forest, experiment-design, arima, prediction, panel-data, standa

In [23]:
//import org.apache.spark.SparkContext
//import org.apache.spark.ml.classification.LogisticRegression
//import org.apache.spark.ml.param.ParamMap
//import org.apache.spark.mllib.linalg.{Vector, Vectors}
//import org.apache.spark.mllib.regression.LabeledPoint
//import org.apache.spark.sql.SQLContext

//val sqlContext = new SQLContext(spark)






java.lang.NoClassDefFoundError: Could not initialize class 

In [26]:
import sqlContext.implicits._

// DataFrame with one (tags, body) row per question; toDF() relies on the implicits
// imported from sqlContext.
val df = question.map(post => (post.tags, post.body)).toDF()





java.lang.NoClassDefFoundError: Could not initialize class 

In [23]:
// Output directory passed to saveAsTextFile when the question records are exported.
val des = "./result4/"





./result4/

In [24]:
// Export every question as one text record: comma-joined tags, id and body, separated by
// the marker ",cyx0723," and terminated by ",cyx0723recordend".
question
  .map(post => s"${post.tags.mkString(",")},cyx0723,${post.id},cyx0723,${post.body},cyx0723recordend")
  .saveAsTextFile(des)





In [17]:
/**
 * Opens a `PrintWriter` on `f`, hands it to `op`, and closes it afterwards.
 *
 * The writer is closed in a `finally` clause, so it is released even when `op` throws.
 * It is created with `new PrintWriter(f)`, i.e. with the platform default charset.
 *
 * @param f  file to write
 * @param op callback that writes through the supplied writer
 */
def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit): Unit = {
  val writer = new java.io.PrintWriter(f)
  try op(writer) finally writer.close()
}
import java.io._
//val data = Array("Five","strings","in","a","file!")
// Write the top tags to toptag.txt, one tag per line.
printToFile(new File("toptag.txt")) { out =>
  for (tag <- toptag) out.println(tag)
}





In [25]:
// Output directories for the two parts of the split.
val destr = "./result5/train/"
val deste = "./result5/test/"

// Serialize every question as in the single-directory export (tags, id and body joined
// by ",cyx0723," and closed by ",cyx0723recordend"), then split the records with
// weights 0.9 / 0.1 and seed 42.
val splited = question
  .map(post => s"${post.tags.mkString(",")},cyx0723,${post.id},cyx0723,${post.body},cyx0723recordend")
  .randomSplit(Array(0.9, 0.1), seed = 42)
splited(0).saveAsTextFile(destr)
splited(1).saveAsTextFile(deste)





In [27]:
splited(1).count()





5212

In [38]:
// Split the question posts themselves with weights 0.9 / 0.1 and seed 42.
val testsp = question.randomSplit(weights = Array(0.9, 0.1), seed = 42)



Array(PartitionwiseSampledRDD[45] at randomSplit at <console>:43, PartitionwiseSampledRDD[46] at randomSplit at <console>:43)



In [39]:
testsp(1).count()



5212



In [44]:
import org.apache.spark.sql.SQLContext
// Row type for the DataFrame: a question id together with its tags.
case class Record(id: Int, tags: Array[String])
// The toDF() conversions are members of an SQLContext *instance*; the companion object has
// no `implicits` member, so the context is created first and the import is taken from it.
val sqlContext = new SQLContext(spark)
import sqlContext.implicits._
// One Record per question post.
val df = question.map(post => Record(post.id, post.tags)).toDF()





: 

In [45]:
import org.apache.spark.sql.SQLContext





In [53]:
// Row type pairing a question id with its tags.
case class Record(id: Int, tags: Array[String])
// SQLContext built on the notebook's `spark` context; `sqlContext.implicits._` is imported
// from this instance before calling toDF().
val sqlContext = new SQLContext(spark)





org.apache.spark.sql.SQLContext@40d27d90

In [61]:
// Bring the RDD-to-DataFrame conversions of this SQLContext instance into scope.
import sqlContext.implicits._
// Single-column DataFrame holding the id of every question post.
val df = question.map(_.id).toDF()



java.lang.NoSuchMethodError: org.apache.spark.sql.SQLContext$implicits$.intRddToDataFrameHolder(Lorg/apache/spark/rdd/RDD;)Lorg/apache/spark/sql/DataFrameHolder;

