In [22]:
import scala.xml._
import org.apache.spark.SparkContext._
import java.text.SimpleDateFormat
import java.io._

// Load the raw dump files as RDDs of text lines: one for users, one for posts.
// NOTE(review): `spark` is presumably the notebook's SparkContext (it exposes textFile) — confirm.
val lines_u = spark.textFile("fulldataset/allUsers/")
val lines_p = spark.textFile("fulldataset/allPosts/")

// Shared formatter for the CreationDate attributes parsed below.
// The pattern carries no zone designator, so parsed dates are interpreted in the
// formatter's default time zone (normally the JVM default).
val DateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")

java.text.SimpleDateFormat@d2ec523f





In [6]:
/**
 * Turns a question row into a (key, date) pair for joining against answers.
 *
 * @param row a parsed `<row .../>` element of the posts file
 * @return the text of the `AcceptedAnswerId` attribute (empty string when the
 *         attribute is absent) paired with the parsed `CreationDate`
 */
def parse_questions(row: xml.Elem) = {
    val acceptedAnswerId = (row \ "@AcceptedAnswerId").text
    val createdText = (row \ "@CreationDate").text
    (acceptedAnswerId, DateFormat.parse(createdText))
}


/**
 * Turns a post row into a (key, date) pair keyed by the post's own id.
 *
 * @param row a parsed `<row .../>` element of the posts file
 * @return the text of the `Id` attribute paired with the parsed `CreationDate`
 */
def parse_answers(row: xml.Elem) = {
    val postId = (row \ "@Id").text
    val createdText = (row \ "@CreationDate").text
    (postId, DateFormat.parse(createdText))
}


/**
 * Classifies an accepted answer as "quick" or not and tags it with the hour of
 * day at which the question was asked.
 *
 * @param dQ       creation time of the question
 * @param dA       creation time of the accepted answer
 * @param maxHours an answer counts as quick when it arrives strictly less than
 *                 this many hours after the question (default: 3)
 * @return `(hour, (quick, 1))` where `hour` is the question's hour of day
 *         (0-23, in the JVM default time zone), `quick` is 1 for a quick answer
 *         and 0 otherwise, and the trailing 1 counts the answer itself so the
 *         pairs can be summed per hour
 */
def quick_answers(dQ: java.util.Date, dA: java.util.Date, maxHours: Double = 3.0): (Int, (Int, Int)) = {
    // Date.getHours is deprecated; Calendar yields the same hour of day in the default time zone.
    val calendar = java.util.Calendar.getInstance()
    calendar.setTime(dQ)
    val hour = calendar.get(java.util.Calendar.HOUR_OF_DAY)

    // Elapsed time between question and answer, in fractional hours.
    val elapsedHours = (dA.getTime - dQ.getTime) / (3600 * 1000.0)
    val quick = if (elapsedHours < maxHours) 1 else 0
    (hour, (quick, 1))
}


// Questions that mention an accepted answer, keyed by that accepted answer's id.
val question_post = lines_p.filter(line => Seq("<row", "/>", "AcceptedAnswerId").forall(line.contains(_)))
                           .map(scala.xml.XML.loadString(_))
                           .map(parse_questions(_))

// Every answer post (PostTypeId == "2"), keyed by its own id.
val accepted_answer_post = lines_p.filter(line => line.contains("<row") && line.contains("/>"))
                                  .map(scala.xml.XML.loadString(_))
                                  .filter(elem => (elem \ "@PostTypeId").text == "2")
                                  .map(parse_answers(_))

// Match each question with its accepted answer, then compute, per question hour,
// the share of accepted answers that were quick.
val quick_answer_by_hour = question_post.join(accepted_answer_post)
                              .map { case (_, (askedAt, answeredAt)) => quick_answers(askedAt, answeredAt) }
                              .reduceByKey { case ((quickA, totalA), (quickB, totalB)) =>
                                  (quickA + quickB, totalA + totalB) }
                              .map { case (hour, (quick, all)) => (hour, quick.toFloat / all.toFloat) }

// Ratios ordered by hour of day, printed one per line.
val res = quick_answer_by_hour.takeOrdered(24).map { case (_, ratio) => ratio }
println(res.mkString(",\n"))

0.6905169,
0.69608223,
0.6996254,
0.70442957,
0.710104,
0.7178955,
0.72422063,
0.7270637,
0.7261533,
0.7229066,
0.728532,
0.73727894,
0.7444671,
0.7452482,
0.741457,
0.73444295,
0.7349357,
0.73950434,
0.7463565,
0.7475968,
0.7360097,
0.7158975,
0.7017918,
0.694689




# Identify veterans by first post stats

In [6]:
/**
 * Extracts a user's id and account creation date from a user row.
 *
 * @param row a parsed `<row .../>` element of the users file
 * @return the text of the `Id` attribute paired with the parsed `CreationDate`
 */
def parse_users(row: xml.Elem) = {
    val userId = (row \ "@Id").text
    val createdText = (row \ "@CreationDate").text
    (userId, DateFormat.parse(createdText))
}

/**
 * Extracts the owner and creation date of a post row.
 *
 * @param row a parsed `<row .../>` element of the posts file
 * @return the text of the `OwnerUserId` attribute (empty string when the
 *         attribute is absent) paired with the parsed `CreationDate`
 */
def parse_posts_simple(row: xml.Elem) = {
    val ownerId = (row \ "@OwnerUserId").text
    val postedText = (row \ "@CreationDate").text
    (ownerId, DateFormat.parse(postedText))
}

/**
 * Flags whether a single post falls in the "veteran" activity window.
 *
 * @param id           the user id, passed through unchanged
 * @param creationDate when the user account was created
 * @param activeDate   when the post was created
 * @return `(id, true)` when the post was made strictly more than 100 and
 *         strictly fewer than 150 days after account creation, else `(id, false)`
 */
def find_veterans(id: String, creationDate: java.util.Date, activeDate: java.util.Date)={
    val millisPerDay = 24*3600*1000.0
    val daysSinceSignup = (activeDate.getTime() - creationDate.getTime()) / millisPerDay
    (id, daysSinceSignup > 100 && daysSinceSignup < 150)
}


// (user id, account creation date) for every user row.
val users_creation = lines_u.filter(line => line.contains("<row") && line.contains("/>"))
                           .map(scala.xml.XML.loadString(_))
                           .map(parse_users(_))

// (owner user id, post creation date) for every post row.
val posts = lines_p.filter(line => line.contains("<row") && line.contains("/>"))
                                  .map(scala.xml.XML.loadString(_))
                                  .map(parse_posts_simple(_))

// Ids of users with at least one post made 100-150 days after their account was created.
val veterans = users_creation.join(posts).map { case (userId, (signedUpAt, postedAt)) =>
                                                  find_veterans(userId, signedUpAt, postedAt) }
                                         .reduceByKey((a, b) => a || b)
                                         .filter { case (_, isVeteran) => isVeteran }
                                         .map(_._1).collect()

                                         

Array(2625152, 1436002, 3149113, 2086538, 953788, 2419281, 2510662, 1198901, 192999, 144449, 4265245, 148862, 3128823, 2638485, 556039, 389298, 3391143, 994947, 3933749, 3929684, 515380, 1820547, 941397, 106761, 1480139, 407774, 3119403, 733642, 895740, 803060, 1142380, 185608, 183579, 901325, 1865619, 4054019, 3423894, 2499570, 610674, 1239551, 1356077, 3422952, 1846192, 3401141, 3831571, 1415712, 2663340, 2514575, 1066513, 2809139, 1561006, 1285717, 1214848, 569872, 4187349, 2979715, 1971196, 909651, 3568094, 260533, 1626438, 342, 2378631, 2671318, 3789327, 3917293, 1325425, 732700, 2871095, 3022232, 82829, 684514, 2226895, 1691365, 4231622, 1184481, 1913805, 693934, 1915689, 2012698, 2287402, 1419625, 4202854, 2254721, 2516604, 4183436, 1183039, 2976744, 1192459, 75793, 1231225, 4064881, 2002336, 388356, 26301, 2506097, 3538384, 1514919, 1111228, 2792257, 2934643, 316545, 31308, 4121987, 1030861, 3253827, 1430060, 267417, 948781, 526329, 4137270, 3702154, 3117374, 682485, 2847327, 1





# Compare stats for veteran users and brief users

In [42]:
import scala.util.control.NonFatal //catch all nonfatal exceptions

/**
 * Parses one raw post line into the fields needed for the per-user statistics.
 *
 * @param input a single `<row .../>` line of the posts file
 * @return `(ownerUserId, (creationMillis, postType, score, views, answers, favs))`
 *         where `postType` is 1 when `PostTypeId` is "1" and 0 otherwise, and each
 *         numeric field is 0 when its attribute is missing or not an integer
 */
def parse_posts_full(input: String)= {
    val row = scala.xml.XML.loadString(input)

    // Reads an integer attribute; yields 0 when the name does not occur in the
    // line at all or when the attribute text cannot be converted to an Int.
    def intAttr(name: String): Int =
        if (!input.contains(name)) 0
        else {
            try { (row \ ("@" + name)).text.toInt }
            catch { case NonFatal(_) => 0 }
        }

    val id = (row \ "@OwnerUserId").text
    val creationDate = DateFormat.parse((row \ "@CreationDate").text).getTime()
    val postType = if ((row \ "@PostTypeId").text == "1") 1 else 0

    val score = intAttr("Score")
    val views = intAttr("ViewCount")
    val answers = intAttr("AnswerCount")
    val favs = intAttr("FavoriteCount")

    (id, (creationDate, postType, score, views, answers, favs))
}

/**
 * Labels a user's first-post statistics as belonging to a veteran or a brief user.
 *
 * @return `("vet", stats)` when `id` is in the collected `veterans` array,
 *         `("brief", stats)` otherwise; `stats` is
 *         `(score, views, answers, favs, 1)`, the trailing 1 being a per-user count
 */
def split_users(id:String, score:Int, views:Int, answers:Int, favs:Int)={
    val group = if (veterans.contains(id)) "vet" else "brief"
    (group, (score, views, answers, favs, 1))
}


// Compare average views, scores, number of favorites, and number of answers of the
// first question between "veteran users" and "brief users".
// Result rows are (group, avgViews, avgScore, avgFavs, avgAnswers).
val first_posts_veteran = lines_p.filter(x => x.contains("<row")&&(x.contains("/>"))&&(x.contains("OwnerUserId"))
                                              &&(x.contains("PostTypeId=\"1\""))&&(x.contains("CreationDate")))
                                 .map(x => parse_posts_full(x))
                                 // keep only each user's earliest question
                                 .reduceByKey{ (x,y) => {if (x._1< y._1) x else y}}
                                 // Label each user and widen the stats to Long: the per-group totals
                                 // (view counts in particular) can exceed Int.MaxValue on the full
                                 // dataset, and Int addition would wrap silently.
                                 .map{case (id, (creationDate, postType, score, views, answers, favs))
                                      => {
                                          val (group, (s, v, a, f, n)) = split_users(id, score, views, answers, favs)
                                          (group, (s.toLong, v.toLong, a.toLong, f.toLong, n.toLong))
                                      }}
                                 .reduceByKey{(x,y)=> (x._1+y._1, x._2+y._2, x._3+y._3, x._4+y._4, x._5+y._5)}
                                 .map{case (group, (score, views, answers, favs, counts))
                                      => (group, views.toFloat/counts.toFloat, score.toFloat/counts.toFloat, 
                                         favs.toFloat/counts.toFloat, answers.toFloat/counts.toFloat)}
                                 .collect
                                  





Array((vet,1843.8676,2.2595208,0.8671652,1.8427472), (brief,1103.6241,1.142294,0.39187327,1.506717))