In [41]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, month, year, date_format, length, size, concat_ws,lit

In [2]:
import os

# Start (or reuse) the Spark session shared by every cell below.
spark = (
    SparkSession.builder
    .appName('Optimize I')
    .getOrCreate()
)

# Disabled alternative: resolve the data directories from a project root
# three levels above the current working directory.
#base_path = os.getcwd()
#project_path = ('/').join(base_path.split('/')[0:-3])
#answers_input_path = os.path.join(project_path, 'data/answers')
#questions_input_path = os.path.join(project_path, 'data/questions')

In [3]:
# Both input datasets are expected under ./data of the current working directory.
answers_input_path = f"{os.getcwd()}/data/answers"
questions_input_path = f"{os.getcwd()}/data/questions"

In [4]:
#job 0
# Load the answers dataset; no format is given, so Spark's default data source is used.
answersDF = spark.read.load(answers_input_path)

In [5]:
#job 1
# Load the questions dataset; no format is given, so Spark's default data source is used.
questionsDF = spark.read.load(questions_input_path)

In [6]:
#job 2
# Number of answers per question and month. month() yields only the month
# number, so the same month of different years lands in one bucket.
answers_month = (
    answersDF
    .withColumn('month', month(col('creation_date')))
    .groupBy('question_id', 'month')
    .agg(count('*').alias('cnt'))
)

In [7]:
#job 3
# Attach the per-month answer counts to each question. The join is an inner
# join, so questions without any answers do not appear in the result.
resultDF = (
    questionsDF
    .join(answers_month, on='question_id', how='inner')
    .select('question_id', 'creation_date', 'title', 'month', 'cnt')
)

In [8]:
#job 4
# Display the first rows of the result, ordered by question and month, without truncating titles.
resultDF.sort('question_id', 'month').show(truncate=False)

                                                                                

+-----------+-----------------------+-------------------------------------------------------------+-----+---+
|question_id|creation_date          |title                                                        |month|cnt|
+-----------+-----------------------+-------------------------------------------------------------+-----+---+
|155989     |2014-12-31 17:59:44.5  |Frost bubble formation                                       |2    |1  |
|155989     |2014-12-31 17:59:44.5  |Frost bubble formation                                       |12   |1  |
|155990     |2014-12-31 18:51:15.123|The abstract space of metrics in GR                          |1    |1  |
|155990     |2014-12-31 18:51:15.123|The abstract space of metrics in GR                          |12   |1  |
|155992     |2014-12-31 19:44:10.697|centrifugal force and radius of its circle                   |12   |1  |
|155993     |2014-12-31 19:56:29.603|How can I estimate the cooking time of a roast?              |1    |1  |
|155995   

In [9]:
# Overview of both datasets: sample rows, schemas and headline counts.
# All percentages and the ratio are derived from the counts computed in this
# cell, so they stay correct when the input data changes.


def _pct(part, whole):
    """Return `part` as a percentage of `whole`, rounded to 2 decimals (0.0 when `whole` is 0)."""
    return round(100 * part / whole, 2) if whole else 0.0


def _id_set(df, column):
    """Collect the distinct values of `column` in `df` into a Python set on the driver."""
    return {row[0] for row in df.select(column).distinct().collect()}


answersDF.createOrReplaceTempView("adfTab")
print('**************************************************ANSWERS**********************************\
*******************\n')
answersDF. \
    withColumn('creation_date',date_format('creation_date','MM/dd/yyyy')). \
    withColumnRenamed('creation_date','date'). \
    withColumnRenamed('comments','cmts'). \
    show(5,truncate=False)

print('schema: ')
answersDF.printSchema()
print('\n\n**********************************************QUESTIONS****************************\
*************************\n')
questionsDF.withColumnRenamed('accepted_answer_id','acc_answer_id').createOrReplaceTempView("qdfTab")
# Only short title/tag rows are shown so the table fits on screen.
spark.sql('SELECT * from qdfTab where (length(title) + length(string(tags))) < 41'). \
    withColumn('creation_date',date_format('creation_date','MM/dd/yyyy')). \
    withColumnRenamed('creation_date','date'). \
    withColumnRenamed('comments','cmts'). \
    show(5,truncate=False)

print('schema: ')
questionsDF.withColumnRenamed('accepted_answer_id','acc_answer_id').printSchema()

print('\n\n**********************************************THE NUMBERS****************************\
*************************\n')
# Each id column is collected to the driver exactly once.
qids = _id_set(questionsDF, 'question_id')
qids_a = _id_set(answersDF, 'question_id')

aid = _id_set(answersDF, 'answer_id')
aaid_q = _id_set(questionsDF, 'accepted_answer_id')
# Questions without an accepted answer carry NULL; drop it to keep real ids only.
aaid_q_nnull = aaid_q - {None}

total_questions = len(qids)
total_answers = len(aid)
answered_questions = len(qids_a)
unanswered_questions = len(qids - qids_a)
no_accepted_answer = questionsDF.filter('accepted_answer_id IS NULL').count()
accepted_answers = len(aaid_q_nnull)
unaccepted_answers = len(aid - aaid_q_nnull)

print('total questions asked: ' + str(total_questions))
print('questions with atleast 1 answer:      ' + str(answered_questions)
      + ' (' + str(_pct(answered_questions, total_questions)) + '%)')
print('questions without any answers:        ' + str(unanswered_questions)
      + ' (' + str(_pct(unanswered_questions, total_questions)) + '%)')
print('questions without an accepted answer: ' + str(no_accepted_answer)
      + ' (' + str(_pct(no_accepted_answer, total_questions)) + '%)' + '\n')

print('total answers provided: ' + str(total_answers))
print('answers that were accepted:     ' + str(accepted_answers)
      + ' (' + str(_pct(accepted_answers, total_answers)) + '%)')
print('answers that were not accepted: ' + str(unaccepted_answers)
      + ' (' + str(_pct(unaccepted_answers, total_answers)) + '%)')

# The ratio is undefined when there are no questions; report NaN instead of failing.
answers_per_question = round(total_answers / total_questions, 2) if total_questions else float('nan')
print(str(answers_per_question) + ' ratio of answers to questions')

**************************************************ANSWERS*****************************************************

+-----------+---------+----------+----+-------+-----+
|question_id|answer_id|date      |cmts|user_id|score|
+-----------+---------+----------+----+-------+-----+
|226592     |226595   |12/29/2015|3   |82798  |2    |
|388057     |388062   |02/22/2018|8   |520    |21   |
|293286     |293305   |11/17/2016|0   |47472  |2    |
|442499     |442503   |11/21/2018|0   |137289 |0    |
|293009     |293031   |11/16/2016|0   |83721  |0    |
+-----------+---------+----------+----+-------+-----+
only showing top 5 rows

schema: 
root
 |-- question_id: long (nullable = true)
 |-- answer_id: long (nullable = true)
 |-- creation_date: timestamp (nullable = true)
 |-- comments: long (nullable = true)
 |-- user_id: long (nullable = true)
 |-- score: long (nullable = true)



**********************************************QUESTIONS*****************************************************

+-----------

                                                                                

total questions asked: 86936
questions with atleast 1 answer:      68173 (78.42%)
questions without any answers:        18763 (21.58%)
questions without an accepted answer: 53654 (61.72%)

total answers provided: 110714
answers that were accepted:     33282 (30.06%)
answers that were not accepted: 77432 (69.94%)
1.27 ratio of answers to questions


In [10]:
# Rename the columns of both frames so that question-side and answer-side
# fields remain distinguishable after joining on the shared key 'idQ'.
qDF = (
    questionsDF
    .withColumnRenamed('comments', 'cmtQ')
    .withColumnRenamed('user_id', 'userQ')
    .withColumnRenamed('creation_date', 'dateQ')
    .withColumnRenamed('question_id', 'idQ')
    .withColumnRenamed('accepted_answer_id', 'idAA')
)

aDF = (
    answersDF
    .withColumnRenamed('creation_date', 'dateA')
    .withColumnRenamed('comments', 'cmtA')
    .withColumnRenamed('user_id', 'userA')
    .withColumnRenamed('question_id', 'idQ')
    .withColumnRenamed('score', 'S')
    .withColumnRenamed('answer_id', 'idA')
)

# Inner join of answers with their questions (join() defaults to 'inner').
jDF = (
    aDF.join(qDF, 'idQ')
    .withColumn('dateQ', date_format('dateQ', 'MMddyyyy'))
    .withColumn('dateA', date_format('dateA', 'MMddyyyy'))
    # One direction flag per sort column: flags are paired with columns
    # positionally, so a shorter list leaves trailing columns out of the sort.
    # NOTE(review): descending is assumed for 'views', mirroring idA and S - confirm.
    .orderBy(['idQ', 'idA', 'S', 'views'], ascending=[True, False, False, False])
)

jDF.createOrReplaceTempView("jTab")
print('Answers left-joined to the right of questions:\n')
# Restrict the preview to short titles with more than one tag so rows fit on screen.
spark.sql('SELECT * from jTab where (length(title) + length(string(tags))) < 36 and size(tags)>1'). \
    select('idQ','dateQ','title','tags','userQ','cmtQ','views','idAA','idA','dateA','userA','cmtA','S'). \
    withColumn('tags',concat_ws(',',col('tags'))). \
    show(100,truncate=False)

print('\ncount of answers left-joined to the right of questions: ' + str(jDF.count()) +'\n')

Answers left-joined to the right of questions:

+------+--------+----------------------+----------------+------+----+-----+------+------+--------+------+----+---+
|idQ   |dateQ   |title                 |tags            |userQ |cmtQ|views|idAA  |idA   |dateA   |userA |cmtA|S  |
+------+--------+----------------------+----------------+------+----+-----+------+------+--------+------+----+---+
|155989|12312014|Frost bubble formation|ice,bubble      |68794 |1   |2032 |387977|387977|02222018|165299|2   |23 |
|155989|12312014|Frost bubble formation|ice,bubble      |68794 |1   |2032 |387977|155994|12312014|68800 |1   |2  |
|180684|05022015|Solar Power Fusion    |sun,fusion      |78234 |4   |121  |180691|180691|05022015|8007  |5   |2  |
|208335|09212015|Designing a lens      |optics,lenses   |93423 |4   |99   |null  |208339|09212015|43164 |0   |2  |
|218634|11152015|Transferable force    |forces,statics  |98564 |0   |31   |null  |218645|11152015|36562 |0   |0  |
|219260|11182015|Weighing a balo

In [11]:
# Same join as the previous cell with the operands swapped (questions first),
# ordered by answer id; join() defaults to an inner join.
jDF2 = (
    qDF.join(aDF, on='idQ')
    .withColumn('dateQ', date_format(col('dateQ'), 'MMddyyyy'))
    .withColumn('dateA', date_format(col('dateA'), 'MMddyyyy'))
    .sort('idA')
)

jDF2.createOrReplaceTempView("jTab2")
print('Questions left-joined to the right of answers:\n')
# Preview only short titles with more than one tag, answer columns first.
(
    spark.sql('SELECT * from jTab2 where (length(title) + length(string(tags))) < 36 and size(tags)>1')
    .select(
        'idA', 'dateA', 'userA', 'cmtA', 'S',
        'idQ', 'idAA', 'dateQ', 'title', 'tags', 'userQ', 'cmtQ', 'views',
    )
    .withColumn('tags', concat_ws(',', col('tags')))
    .show(100, truncate=False)
)

print(f'\ncount of questions left-joined to the right of answers: {jDF2.count()}\n')

Questions left-joined to the right of answers:

+------+--------+------+----+---+------+------+--------+----------------------+----------------+------+----+-----+
|idA   |dateA   |userA |cmtA|S  |idQ   |idAA  |dateQ   |title                 |tags            |userQ |cmtQ|views|
+------+--------+------+----+---+------+------+--------+----------------------+----------------+------+----+-----+
|155994|12312014|68800 |1   |2  |155989|387977|12312014|Frost bubble formation|ice,bubble      |68794 |1   |2032 |
|180691|05022015|8007  |5   |2  |180684|180691|05022015|Solar Power Fusion    |sun,fusion      |78234 |4   |121  |
|208339|09212015|43164 |0   |2  |208335|null  |09212015|Designing a lens      |optics,lenses   |93423 |4   |99   |
|218645|11152015|36562 |0   |0  |218634|null  |11152015|Transferable force    |forces,statics  |98564 |0   |31   |
|219291|11182015|86822 |3   |2  |219260|null  |11182015|Weighing a baloon     |mass,buoyancy   |90248 |5   |502  |
|220093|11222015|3257  |0   |0  

In [12]:
# Show the per-month answer counts and the joined result, each with its row count.
print('number of answers per question per month: \n')
answers_month.sort('question_id', 'month', 'cnt').show()
print(f'answers_month total rows: {answers_month.count()}')
print('\nresultDF: \n')
resultDF.sort('question_id', 'month').show(truncate=False)
print(f'resultDF #rows: {resultDF.count()}')

number of answers per question per month: 

+-----------+-----+---+
|question_id|month|cnt|
+-----------+-----+---+
|     155989|    2|  1|
|     155989|   12|  1|
|     155990|    1|  1|
|     155990|   12|  1|
|     155992|   12|  1|
|     155993|    1|  1|
|     155995|    1|  3|
|     155996|    1|  2|
|     155996|    2|  1|
|     155996|   11|  1|
|     155997|    1|  3|
|     155999|    1|  1|
|     156008|    1|  2|
|     156008|   11|  1|
|     156016|    1|  1|
|     156020|    1|  1|
|     156021|    2|  1|
|     156022|    1|  1|
|     156025|    1|  1|
|     156026|    1|  3|
+-----------+-----+---+
only showing top 20 rows

answers_month total rows: 73030

resultDF: 

+-----------+-----------------------+-------------------------------------------------------------+-----+---+
|question_id|creation_date          |title                                                        |month|cnt|
+-----------+-----------------------+----------------------------------------------------

In [13]:
# Row counts versus distinct-key counts for both datasets.
# Every count is a separate Spark job, so each is run once and reused in the
# messages instead of hard-coding figures from an earlier run.
q_rows = questionsDF.select('question_id').count()
q_ids = questionsDF.select('question_id').distinct().count()
# distinct() keeps NULL as one value, so this is one more than the number of
# accepted answers whenever some question has no accepted answer.
accepted_ids = questionsDF.select('accepted_answer_id').distinct().count()
a_rows = answersDF.select('question_id').count()
a_ids = answersDF.select('answer_id').distinct().count()
answered_q_ids = answersDF.select('question_id').distinct().count()

print('questionsDF total # rows:                  '+str(q_rows))
print('questionsDF question_id # distinct:        '+str(q_ids)+' (total # of unique questions)')
print('questionsDF accepted_answer_id # distinct: '+str(accepted_ids)+' (< '+str(q_ids)+' because some questions have no accepted answer; NULL counts as one distinct value)\n')
print('answersDF total # rows:           '+str(a_rows) + ' (total # of unique answers)')
print('answersDF  answer_id  # distinct: '+str(a_ids)+ ' (> '+str(q_ids)+' because some questions have multiple proposed answers)')
print('answersDF question_id # distinct:  '+str(answered_q_ids)+' (> '+str(accepted_ids)+' because not all answered questions have an accepted answer)')
print('                                         (< '+str(q_ids)+' because some questions have no answers)')

questionsDF total # rows:                  86936
questionsDF question_id # distinct:        86936 (total # of unique questions)
questionsDF accepted_answer_id # distinct: 33283 (< 86936 because some questions have no answers)

answersDF total # rows:           110714 (total # of unique answers)
answersDF  answer_id  # distinct: 110714 (> 86936 because some questions have multiple proposed answers)
answersDF question_id # distinct:  68173 (> 33283 because not some answers arent accepted)
                                         (< 86936 because some questions arent answers)
