In [0]:
# Request 32g for both the driver and the executors on the pre-existing `spark` session.
# NOTE(review): JVM memory settings are normally fixed when the application is
# launched; confirm these runtime calls actually take effect on this cluster.
for memory_key in ("spark.driver.memory", "spark.executor.memory"):
    spark.conf.set(memory_key, "32g")


In [1]:
from os.path import abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.sql import functions as F
import datetime

In [2]:
# Explicit schema for the posts CSV; every column is declared nullable.
post_schema = StructType([
    StructField(col_name, col_type, True)
    for col_name, col_type in (
        ("PostId", StringType()),
        ("PostTypeId", StringType()),
        ("AcceptedAnswerId", StringType()),
        ("CreationDate", TimestampType()),
        ("Score", DoubleType()),
        ("OwnerUserId", StringType()),
        ("AnswerCount", DoubleType()),
        ("CommentCount", DoubleType()),
        ("ParentId", StringType()),
        ("CreationDateOfOwner", TimestampType()),
        ("BodyWordNum", DoubleType()),
    )
])

# Explicit schema for the user CSVs; only isChurn is declared non-nullable.
user_schema = StructType([
    StructField("Id", StringType(), True),
    StructField("CreationDate", TimestampType(), True),
    StructField("isChurn", BooleanType(), False),
])

In [3]:
PATH = "file:///input/project/"


def _read_csv(path, schema):
    """Read a CSV with a header row from ``path`` using the explicit ``schema``."""
    return spark.read.format("csv").option("header","true").schema(schema).load(path)


# Files expected under PATH: posts_dist.csv  users_train_dist.csv  users_val_dist.csv
post_df = _read_csv(PATH + "posts_dist.csv", post_schema)
user_train = _read_csv(PATH + "users_train_dist.csv", user_schema)
user_val = _read_csv(PATH + "users_val_dist.csv", user_schema)
# The test split is loaded from a different directory than the other inputs.
user_test = _read_csv("file:///home/ss00/users_test_dist.csv", user_schema)

In [4]:
# Print the schema of each loaded frame, in load order.
for loaded_frame in (post_df, user_train, user_val, user_test):
    loaded_frame.printSchema()

In [5]:
# Give every user split an integer `label` (from IsChurn) and a `data_type` tag.
user_train_df = (
    user_train
    .withColumn('label', F.col('IsChurn').cast(IntegerType()))
    .withColumn('data_type', F.lit('train'))
)
user_val_df = (
    user_val
    .withColumn('label', F.col('IsChurn').cast(IntegerType()))
    .withColumn('data_type', F.lit('val'))
)
user_test_df = (
    user_test
    .withColumn('label', F.col('IsChurn').cast(IntegerType()))
    .withColumn('data_type', F.lit('test'))
)

# Stack the three splits and rename the user's creation date.
user_train_val_test = (
    user_train_df
    .union(user_val_df)
    .union(user_test_df)
    .withColumnRenamed("CreationDate", "Id_creation_date")
)

# Normalise post_df: owner column becomes `Id`, the two id columns are
# round-tripped through IntegerType back to StringType, the date columns are
# renamed, and `diff_post_id_creation` holds the day difference between the
# post date and the owner's creation date.
post_df = (
    post_df
    .withColumnRenamed("OwnerUserId", "Id")
    .withColumn('AcceptedAnswerId', F.col('AcceptedAnswerId').cast(IntegerType()).cast(StringType()))
    .withColumn('ParentId', F.col('ParentId').cast(IntegerType()).cast(StringType()))
    .withColumnRenamed("CreationDate", "Post_creation_date")
    .withColumnRenamed("CreationDateOfOwner", "Id_creation_date")
    .withColumn('diff_post_id_creation', F.datediff(F.col('Post_creation_date'), F.col('Id_creation_date')))
)
post_df.show()

user_train_val_test.show()

In [6]:
# Distinct user-ID counts for each split and for the combined frame.
print("Train user distinct cnt: ", user_train_df.select("ID").distinct().count())
print("Val user distinct cnt: ", user_val_df.select("ID").distinct().count())
print("Test user distinct cnt: ", user_test_df.select("ID").distinct().count())
# user_train_val_test is the union of train, val AND test, so the message says so
# (it previously read "Train & Val", which understated what is being counted).
print("Train & Val & Test user distinct cnt: ", user_train_val_test.select("ID").distinct().count())

In [7]:
# Row count per label value across the combined user frame.
label_counts = user_train_val_test.groupBy('label').count()
label_counts.show()

In [8]:
# Number of distinct `Id` values (post owners) in post_df.
post_df.select('Id').dropDuplicates().count()

### Build the full list of distinct user IDs from post_df


In [10]:
# One row per distinct post owner; this is the base frame for the features.
result_1 = post_df.select('Id').dropDuplicates()
print("row_num: " , result_1.count())
result_1.show(5)

### Add label from train_val_test_df and drop the null values



In [12]:
# Attach label and split tag to each owner, keeping only owners that have a label.
result_1 = (
    result_1
    .join(user_train_val_test, on='Id', how='left')
    .select('Id', 'label', 'data_type')
    .where(F.col('label').isNotNull())
)
result_1.show()

In [13]:
# Number of retained users per split tag.
split_counts = result_1.groupBy('data_Type').count()
split_counts.show()

### Question - total number of questions by user


In [15]:
# Distinct questions (PostTypeId == 1) per user with diff_post_id_creation <= 30.
query = (
    post_df
    .where((F.col('PostTypeId') == 1) & (F.col('diff_post_id_creation') <= 30))
    .groupBy('Id')
    .agg(F.countDistinct("PostId").alias('post_cnt'))
)
# Sort only for the preview: `query` is joined afterwards, where row order is
# irrelevant, so the global sort is kept out of the reused frame.
query.orderBy('post_cnt', ascending=False).show()

In [16]:
# Attach post_cnt; report how many users had no match, then fill those with 0.
result_2 = result_1.join(query, on='Id', how='left')
print("null val cnt: ", result_2.where(F.col('post_cnt').isNull()).count())
result_2 = result_2.na.fill(0, subset=['post_cnt'])
print("row_num: " , result_2.count())
print("null val cnt: ", result_2.where(F.col('post_cnt').isNull()).count())
result_2.show()

### Question - total or avg number of answers received for the user's questions


In [18]:
# Distinct answers (PostTypeId == 2) with their parent question and creation
# date; columns get an `A_` prefix so they stay distinguishable after joining.
answ_created_dt = (
    post_df
    .where(F.col('PostTypeId') == 2)
    .select(
        F.col('ParentId').alias('A_ParentId'),
        F.col('PostId').alias('A_PostId'),
        F.col('Post_creation_date').alias('A_Post_creation_date'),
    )
    .dropDuplicates()
)
answ_created_dt.orderBy('A_ParentId').show()

In [19]:
# Questions only, then one row per (question, answer) pair; the inner join
# drops questions that have no matching answer row.
question_created_dt = post_df.where(F.col('PostTypeId') == 1)
question_created_dt_merged = question_created_dt.join(
    answ_created_dt,
    on=question_created_dt['PostId'] == answ_created_dt['A_ParentId'],
    how='inner',
)
question_created_dt_merged.show()

In [20]:
# Day difference between the answer's creation date and Id_creation_date of the question row.
answer_day_offset = F.datediff(F.col('A_Post_creation_date'), F.col('Id_creation_date'))
question_created_dt_merged = question_created_dt_merged.withColumn('Aswr_id_created_date_diff', answer_day_offset)
question_created_dt_merged.orderBy('PostId').show()

In [21]:
# Distinct answers per user where both the question and the answer have a
# day offset of at most 30.
query = (
    question_created_dt_merged
    .where(
        (F.col('Aswr_id_created_date_diff') <= 30)
        & (F.col('diff_post_id_creation') <= 30)
    )
    .groupBy('Id')
    .agg(F.countDistinct('A_PostId').alias('ttl_answ_cnt_30days'))
)
query.show()

In [22]:
# Attach ttl_answ_cnt_30days; users absent from `query` get 0.
result_3 = result_2.join(query, on='Id', how='left').na.fill(0, subset=['ttl_answ_cnt_30days'])
print("row_num: " , result_3.count())
print("null val cnt: ", result_3.where(F.col('ttl_answ_cnt_30days').isNull()).count())
result_3.show()

### Question - avg / min / max number of answers received per question ******* number of posts received within 30 days

In [24]:
# Minimum number of distinct answers over a user's questions.
# The feature is named *_30days, so apply the same 30-day filters that
# ttl_answ_cnt_30days uses; without them answers and questions from any time
# would be counted.
# NOTE(review): question_created_dt_merged comes from an inner join, so
# questions without any answer never contribute a 0 here.
query = (
    question_created_dt_merged
    .where(
        (F.col('Aswr_id_created_date_diff') <= 30)
        & (F.col('diff_post_id_creation') <= 30)
    )
    .groupBy('Id', 'PostId')
    .agg(F.countDistinct('A_PostId').alias('A_post_cnt'))
    .groupBy('Id')
    .agg(F.min('A_post_cnt').alias('min_answ_cnt_30days'))
)
query.show()

In [25]:
# Attach min_answ_cnt_30days; users absent from `query` get 0.
result_4 = result_3.join(query, on='Id', how='left').na.fill(0, subset=['min_answ_cnt_30days'])
print("row_num: " , result_4.count())
print("null val cnt: ", result_4.where(F.col('min_answ_cnt_30days').isNull()).count())
result_4.show()

In [26]:
# Maximum number of distinct answers over a user's questions.
# The feature is named *_30days, so apply the same 30-day filters that
# ttl_answ_cnt_30days uses; without them answers and questions from any time
# would be counted.
query = (
    question_created_dt_merged
    .where(
        (F.col('Aswr_id_created_date_diff') <= 30)
        & (F.col('diff_post_id_creation') <= 30)
    )
    .groupBy('Id', 'PostId')
    .agg(F.countDistinct('A_PostId').alias('A_post_cnt'))
    .groupBy('Id')
    .agg(F.max('A_post_cnt').alias('max_answ_cnt_30days'))
)
query.show()

In [27]:
# Attach max_answ_cnt_30days; users absent from `query` get 0.
result_5 = result_4.join(query, on='Id', how='left').na.fill(0, subset=['max_answ_cnt_30days'])
print("row_num: " , result_5.count())
print("null val cnt: ", result_5.where(F.col('max_answ_cnt_30days').isNull()).count())
result_5.show()

In [28]:
# Average answers per question = ttl_answ_cnt_30days / post_cnt.
# post_cnt is 0 for users without qualifying questions; the when() guard
# yields NULL for those rows without performing the division, so this no
# longer depends on non-ANSI divide-by-zero behaviour (ANSI mode raises).
query = result_5.select(
    'Id',
    F.when(
        F.col('post_cnt') != 0,
        F.col('ttl_answ_cnt_30days') / F.col('post_cnt'),
    ).alias('avg_answ_cnt_30days'),
)
query.show()

In [29]:
# Add avg_answ_cnt_30days = ttl_answ_cnt_30days / post_cnt directly on
# result_5 instead of self-joining result_5 with a projection of itself:
# this avoids an extra join and cannot multiply rows if an Id ever repeats.
# The when() guard leaves the ratio NULL when post_cnt is 0 (no division is
# performed, so it also works with ANSI mode enabled); fillna then maps it to 0.
result_6 = result_5.withColumn(
    'avg_answ_cnt_30days',
    F.when(
        F.col('post_cnt') != 0,
        F.col('ttl_answ_cnt_30days') / F.col('post_cnt'),
    ),
)
result_6 = result_6.fillna(0, subset=['avg_answ_cnt_30days'])
print("row_num: " , result_6.count())
print("null val cnt: ", result_6.filter(result_6['avg_answ_cnt_30days'].isNull()).count())
result_6.show()

### Question - total number of the user's questions that have an accepted answer



In [31]:
# Keep only (question, answer) rows where the answer is the question's accepted answer.
is_accepted_pair = F.col('AcceptedAnswerId') == F.col('A_PostId')
question_created_dt_merged_filtered = question_created_dt_merged.where(is_accepted_pair)
question_created_dt_merged_filtered.orderBy('PostId').show()
# Total rows vs. distinct questions among the accepted pairs.
accepted_post_ids = question_created_dt_merged_filtered.select('PostID')
print(accepted_post_ids.count())
print(accepted_post_ids.distinct().count())

In [32]:
# Accepted-answer rows per user, restricted to questions with a non-null
# AcceptedAnswerId where both question and answer day offsets are at most 30.
query = (
    question_created_dt_merged_filtered
    .where(
        (F.col('PostTypeId') == 1)
        & F.col('AcceptedAnswerId').isNotNull()
        & (F.col('diff_post_id_creation') <= 30)
        & (F.col('Aswr_id_created_date_diff') <= 30)
    )
    .groupBy('Id')
    .count()
    .withColumnRenamed('count', 'accepted_answ_cnt')
)
query.show()

In [33]:
# Attach accepted_answ_cnt; users absent from `query` get 0.
result_7 = result_6.join(query, on='Id', how='left').na.fill(0, subset=['accepted_answ_cnt'])
print("row_num: " , result_7.count())
print("null val cnt: ", result_7.where(F.col('accepted_answ_cnt').isNull()).count())
result_7.show()

### Question - avg/min/max/ttl comment cnt


In [35]:
# Average CommentCount per user over questions (PostTypeId == 1) with diff_post_id_creation <= 30.
query = (
    post_df
    .where((F.col('PostTypeId') == 1) & (F.col('diff_post_id_creation') <= 30))
    .groupBy('Id')
    .agg(F.avg('CommentCount').alias('avg_comment_cnt'))
)
query.show()

In [36]:
# Attach avg_comment_cnt; users absent from `query` get 0.
result_8 = result_7.join(query, on='Id', how='left').na.fill(0, subset=['avg_comment_cnt'])
print("row_num: " , result_8.count())
print("null val cnt: ", result_8.where(F.col('avg_comment_cnt').isNull()).count())
result_8.show()

In [37]:
# Total CommentCount per user over questions (PostTypeId == 1) with diff_post_id_creation <= 30.
query = (
    post_df
    .where((F.col('PostTypeId') == 1) & (F.col('diff_post_id_creation') <= 30))
    .groupBy('Id')
    .agg(F.sum('CommentCount').alias('ttl_comment_cnt'))
)
query.show()

In [38]:
# Attach ttl_comment_cnt; users absent from `query` get 0.
result_9 = result_8.join(query, on='Id', how='left').na.fill(0, subset=['ttl_comment_cnt'])
print("row_num: " , result_9.count())
print("null val cnt: ", result_9.where(F.col('ttl_comment_cnt').isNull()).count())
result_9.show()

In [39]:
# Minimum CommentCount per user over questions (PostTypeId == 1) with diff_post_id_creation <= 30.
query = (
    post_df
    .where((F.col('PostTypeId') == 1) & (F.col('diff_post_id_creation') <= 30))
    .groupBy('Id')
    .agg(F.min('CommentCount').alias('min_comment_cnt'))
)
query.show()

In [40]:
# Attach min_comment_cnt; users absent from `query` get 0.
result_10 = result_9.join(query, on='Id', how='left').na.fill(0, subset=['min_comment_cnt'])
print("row_num: " , result_10.count())
print("null val cnt: ", result_10.where(F.col('min_comment_cnt').isNull()).count())
result_10.show()

In [41]:
# Maximum CommentCount per user over questions (PostTypeId == 1) with diff_post_id_creation <= 30.
query = (
    post_df
    .where((F.col('PostTypeId') == 1) & (F.col('diff_post_id_creation') <= 30))
    .groupBy('Id')
    .agg(F.max('CommentCount').alias('max_comment_cnt'))
)
query.show()

In [42]:
# Attach max_comment_cnt; users absent from `query` get 0.
result_11 = result_10.join(query, on='Id', how='left').na.fill(0, subset=['max_comment_cnt'])
print("row_num: " , result_11.count())
print("null val cnt: ", result_11.where(F.col('max_comment_cnt').isNull()).count())
result_11.show()

### Question - avg / max / min / ttl score


In [44]:
# Average Score per user over questions (PostTypeId == 1) with diff_post_id_creation <= 30.
query = (
    post_df
    .where((F.col('PostTypeId') == 1) & (F.col('diff_post_id_creation') <= 30))
    .groupBy('Id')
    .agg(F.avg('Score').alias('q_avg_score'))
)
query.show()

In [45]:
# Attach q_avg_score; users absent from `query` get 0.
result_12 = result_11.join(query, on='Id', how='left').na.fill(0, subset=['q_avg_score'])
print("row_num: " , result_12.count())
print("null val cnt: ", result_12.where(F.col('q_avg_score').isNull()).count())
result_12.show()

In [46]:
# Maximum Score per user over questions (PostTypeId == 1) with diff_post_id_creation <= 30.
query = (
    post_df
    .where((F.col('PostTypeId') == 1) & (F.col('diff_post_id_creation') <= 30))
    .groupBy('Id')
    .agg(F.max('Score').alias('q_max_score'))
)
query.show()

In [47]:
# Attach q_max_score; users absent from `query` get 0.
result_13 = result_12.join(query, on='Id', how='left').na.fill(0, subset=['q_max_score'])
print("row_num: " , result_13.count())
print("null val cnt: ", result_13.where(F.col('q_max_score').isNull()).count())
result_13.show()

In [48]:
# Minimum Score per user over questions (PostTypeId == 1) with diff_post_id_creation <= 30.
query = (
    post_df
    .where((F.col('PostTypeId') == 1) & (F.col('diff_post_id_creation') <= 30))
    .groupBy('Id')
    .agg(F.min('Score').alias('q_min_score'))
)
query.show()

In [49]:
# Attach q_min_score; users absent from `query` get 0.
result_14 = result_13.join(query, on='Id', how='left').na.fill(0, subset=['q_min_score'])
print("row_num: " , result_14.count())
print("null val cnt: ", result_14.where(F.col('q_min_score').isNull()).count())
result_14.show()

In [50]:
# Total Score per user over questions (PostTypeId == 1) with diff_post_id_creation <= 30.
query = (
    post_df
    .where((F.col('PostTypeId') == 1) & (F.col('diff_post_id_creation') <= 30))
    .groupBy('Id')
    .agg(F.sum('Score').alias('q_ttl_score'))
)
query.show()

In [51]:
# Attach q_ttl_score; users absent from `query` get 0.
result_15 = result_14.join(query, on='Id', how='left').na.fill(0, subset=['q_ttl_score'])
print("row_num: " , result_15.count())
print("null val cnt: ", result_15.where(F.col('q_ttl_score').isNull()).count())
result_15.show()

### Question - avg / max / min / ttl body words in the questions


In [53]:
# Average BodyWordNum per user over questions (PostTypeId == 1) with diff_post_id_creation <= 30.
query = (
    post_df
    .where((F.col('PostTypeId') == 1) & (F.col('diff_post_id_creation') <= 30))
    .groupBy('Id')
    .agg(F.avg('BodyWordNum').alias('q_avg_bdword'))
)
query.show()

In [54]:
# Attach q_avg_bdword; users absent from `query` get 0.
result_16 = result_15.join(query, on='Id', how='left').na.fill(0, subset=['q_avg_bdword'])
print("row_num: " , result_16.count())
print("null val cnt: ", result_16.where(F.col('q_avg_bdword').isNull()).count())
result_16.show()

In [55]:
# Maximum BodyWordNum per user over questions (PostTypeId == 1) with diff_post_id_creation <= 30.
query = (
    post_df
    .where((F.col('PostTypeId') == 1) & (F.col('diff_post_id_creation') <= 30))
    .groupBy('Id')
    .agg(F.max('BodyWordNum').alias('q_max_bdword'))
)
query.show()

In [56]:
# Attach q_max_bdword; users absent from `query` get 0.
result_17 = result_16.join(query, on='Id', how='left').na.fill(0, subset=['q_max_bdword'])
print("row_num: " , result_17.count())
print("null val cnt: ", result_17.where(F.col('q_max_bdword').isNull()).count())
result_17.show()

In [57]:
# Minimum BodyWordNum per user over questions (PostTypeId == 1) with diff_post_id_creation <= 30.
query = (
    post_df
    .where((F.col('PostTypeId') == 1) & (F.col('diff_post_id_creation') <= 30))
    .groupBy('Id')
    .agg(F.min('BodyWordNum').alias('q_min_bdword'))
)
query.show()

In [58]:
# Attach q_min_bdword; users absent from `query` get 0.
result_18 = result_17.join(query, on='Id', how='left').na.fill(0, subset=['q_min_bdword'])
print("row_num: " , result_18.count())
print("null val cnt: ", result_18.where(F.col('q_min_bdword').isNull()).count())
result_18.show()

In [59]:
# Total BodyWordNum per user over questions (PostTypeId == 1) with diff_post_id_creation <= 30.
query = (
    post_df
    .where((F.col('PostTypeId') == 1) & (F.col('diff_post_id_creation') <= 30))
    .groupBy('Id')
    .agg(F.sum('BodyWordNum').alias('q_ttl_bdword'))
)
query.show()

In [60]:
# Attach q_ttl_bdword; users absent from `query` get 0.
result_19 = result_18.join(query, on='Id', how='left').na.fill(0, subset=['q_ttl_bdword'])
print("row_num: " , result_19.count())
print("null val cnt: ", result_19.where(F.col('q_ttl_bdword').isNull()).count())
result_19.show()

### Answer - the number of answers given by the user


In [62]:
# Distinct answers (PostTypeId == 2) per user with diff_post_id_creation <= 30.
query = (
    post_df
    .where((F.col('PostTypeId') == 2) & (F.col('diff_post_id_creation') <= 30))
    .groupBy('Id')
    .agg(F.countDistinct('PostID').alias('answ_cnt'))
)
query.show()

In [63]:
# Attach answ_cnt; users absent from `query` get 0.
result_20 = result_19.join(query, on='Id', how='left').na.fill(0, subset=['answ_cnt'])
print("row_num: " , result_20.count())
print("null val cnt: ", result_20.where(F.col('answ_cnt').isNull()).count())
result_20.show()

## Feature 9
### Answer - the number of accepted answers among the answers given by the user


In [65]:
# Distinct AcceptedAnswerId values taken from question rows (PostTypeId == 1).
question_accepted_id = (
    post_df
    .where(F.col('PostTypeId') == 1)
    .select("AcceptedAnswerId")
    .distinct()
)
# Distinct (answer PostId, author Id) pairs for answers with diff_post_id_creation <= 30.
answer_for_post_id = (
    post_df
    .where((F.col('PostTypeId') == 2) & (F.col('diff_post_id_creation') <= 30))
    .select("PostId", "Id")
    .distinct()
)

# Answers whose PostId appears as some question's AcceptedAnswerId, counted per author.
accepted_pairs = question_accepted_id.join(
    answer_for_post_id,
    on=question_accepted_id['AcceptedAnswerId'] == answer_for_post_id["PostId"],
    how='inner',
)
accepted_answer_for_id = accepted_pairs.groupBy('Id').agg(
    F.countDistinct("PostId").alias('accepted_answer_cnt_by_qner')
)
accepted_answer_for_id.show()

In [66]:
# Attach accepted_answer_cnt_by_qner to the feature frame.
result_21 = result_20.join(accepted_answer_for_id, ['Id'], how = 'left')
# Report unmatched users BEFORE filling. The first check used to run after
# fillna, so it always duplicated the post-fill check below.
print("null val cnt: ", result_21.filter(result_21['accepted_answer_cnt_by_qner'].isNull()).count())
result_21 = result_21.fillna(0, subset=['accepted_answer_cnt_by_qner'])
print("row_num: " , result_21.count())
# Sanity check: no NULLs should remain after the fill.
print("null val cnt: ", result_21.filter(result_21['accepted_answer_cnt_by_qner'].isNull()).count())
result_21.show()

In [67]:
# Persist the first feature set as a single-partition CSV with a header row,
# overwriting any previous output at this location.
data1_writer = result_21.repartition(1).write.format('csv').mode("overwrite")
data1_writer.save("file:///home/ss00/churn_prj/data1", header='true')

## Feature 10
### Answer - avg/ttl/min/max bodyword count in answers 


In [69]:
# Average BodyWordNum per user over answers (PostTypeId == 2) with diff_post_id_creation <= 30.
query = (
    post_df
    .where((F.col('PostTypeId') == 2) & (F.col('diff_post_id_creation') <= 30))
    .groupBy('Id')
    .agg(F.avg('BodyWordNum').alias('avg_answ_bdword_cnt'))
)
query.show()

In [70]:
# Start the second feature set from result_1 (Id, label, data_type) and attach
# avg_answ_bdword_cnt; users absent from `query` get 0.
result_22 = result_1.join(query, on='Id', how='left').na.fill(0, subset=['avg_answ_bdword_cnt'])
print("row_num: " , result_22.count())
print("null val cnt: ", result_22.where(F.col('avg_answ_bdword_cnt').isNull()).count())
result_22.show()

In [71]:
# Maximum BodyWordNum per user over answers (PostTypeId == 2) with diff_post_id_creation <= 30.
query = (
    post_df
    .where((F.col('PostTypeId') == 2) & (F.col('diff_post_id_creation') <= 30))
    .groupBy('Id')
    .agg(F.max('BodyWordNum').alias('max_answ_bdword_cnt'))
)
query.show()

In [72]:
# Attach max_answ_bdword_cnt; users absent from `query` get 0.
result_23 = result_22.join(query, on='Id', how='left').na.fill(0, subset=['max_answ_bdword_cnt'])
print("row_num: " , result_23.count())
print("null val cnt: ", result_23.where(F.col('max_answ_bdword_cnt').isNull()).count())
result_23.show()

In [73]:
# Minimum BodyWordNum per user over answers (PostTypeId == 2) with diff_post_id_creation <= 30.
query = (
    post_df
    .where((F.col('PostTypeId') == 2) & (F.col('diff_post_id_creation') <= 30))
    .groupBy('Id')
    .agg(F.min('BodyWordNum').alias('min_answ_bdword_cnt'))
)
query.show()

In [74]:
# Attach min_answ_bdword_cnt; users absent from `query` get 0.
# (The null-count check is not run in this cell.)
result_24 = result_23.join(query, on='Id', how='left').na.fill(0, subset=['min_answ_bdword_cnt'])
print("row_num: " , result_24.count())
result_24.show()

In [75]:
# Total BodyWordNum per user over answers (PostTypeId == 2) with diff_post_id_creation <= 30.
query = (
    post_df
    .where((F.col('PostTypeId') == 2) & (F.col('diff_post_id_creation') <= 30))
    .groupBy('Id')
    .agg(F.sum('BodyWordNum').alias('ttl_answ_bdword_cnt'))
)
query.show()

In [76]:
# Attach ttl_answ_bdword_cnt; users absent from `query` get 0.
# (The null-count check is not run in this cell.)
result_25 = result_24.join(query, on='Id', how='left').na.fill(0, subset=['ttl_answ_bdword_cnt'])
print("row_num: " , result_25.count())
result_25.show()

## post and answer creation interval


In [78]:
from pyspark.sql.window import Window

# Per user, order posts newest-first. lag() over that ordering returns the
# creation date of the row just before the current one, i.e. the user's next
# (later) post; it is NULL for the user's newest post.
newest_first_per_user = Window.partitionBy("Id").orderBy(F.col("Post_creation_date").desc())

post_df_lag = (
    post_df
    .where(F.col('diff_post_id_creation') <= 30)
    .withColumn('lag_creation_date', F.lag(F.col('Post_creation_date')).over(newest_first_per_user))
    # Where there is no later post, fall back to Id_creation_date + 30 days.
    .withColumn(
        'lag_creation_date_final',
        F.when(F.col('lag_creation_date').isNull(), F.date_add(F.col('Id_creation_date'), 30))
         .otherwise(F.col('lag_creation_date')),
    )
    # Days from this post to the next one (or to the fallback date).
    .withColumn('post_answer_interval', F.datediff(F.col('lag_creation_date_final'), F.col('Post_creation_date')))
)

post_df_lag.show()

 

In [79]:
# Average / minimum / maximum post_answer_interval per user, each in its own frame.
interval_rows_by_user = post_df_lag.where(F.col('diff_post_id_creation') <= 30).groupBy('Id')
result_26 = interval_rows_by_user.agg(F.avg('post_answer_interval').alias('avg_post_answer_interval'))
result_27 = interval_rows_by_user.agg(F.min('post_answer_interval').alias('min_post_answer_interval'))
result_28 = interval_rows_by_user.agg(F.max('post_answer_interval').alias('max_post_answer_interval'))
result_28.show()

In [80]:
# Attach avg_post_answer_interval; users absent from result_26 get 30.
merged = result_25.join(result_26, on='Id', how='left').na.fill(30, subset=['avg_post_answer_interval'])
print("row_num: " , merged.count())
print("null val cnt: ", merged.where(F.col('avg_post_answer_interval').isNull()).count())
merged.show()

In [81]:
# Attach min_post_answer_interval; users absent from result_27 get 30.
merged = merged.join(result_27, on='Id', how='left').na.fill(30, subset=['min_post_answer_interval'])
print("row_num: " , merged.count())
print("null val cnt: ", merged.where(F.col('min_post_answer_interval').isNull()).count())
merged.show()

In [82]:
# Attach max_post_answer_interval; users absent from result_28 get 30.
merged = merged.join(result_28, on='Id', how='left').na.fill(30, subset=['max_post_answer_interval'])
print("row_num: " , merged.count())
print("null val cnt: ", merged.where(F.col('max_post_answer_interval').isNull()).count())
merged.show()

## last_anw_post_dt_time_passed: days between the user's last post in the 30-day window and the 30-day mark



In [84]:
# Latest Post_creation_date per user among posts with diff_post_id_creation <= 30.
last_post_answ_df = (
    post_df
    .where(F.col('diff_post_id_creation') <= 30)
    .groupBy('Id')
    .agg(F.max('Post_creation_date').alias('last_post_answer_dt'))
)
# Id_creation_date + 30 days per user (max taken over all of the user's posts).
id_thirtyday_passed_dt = (
    post_df
    .withColumn('id_thirtyday_passed_dt', F.date_add(F.col('Id_creation_date'), 30))
    .groupBy('Id')
    .agg(F.max('id_thirtyday_passed_dt').alias('id_thirtyday_passed_dt'))
)
# Days between the user's latest in-window post and the 30-day mark.
time_passed_df = (
    id_thirtyday_passed_dt
    .join(last_post_answ_df, on='Id', how='inner')
    .withColumn(
        'last_anw_post_dt_time_passed',
        F.datediff(F.col('id_thirtyday_passed_dt'), F.col('last_post_answer_dt')),
    )
    .select('Id', 'last_anw_post_dt_time_passed')
)
time_passed_df.show()

In [85]:
# Attach last_anw_post_dt_time_passed; report unmatched users, then fill them with 30.
merged = merged.join(time_passed_df, on='Id', how='left')
print("null val cnt: ", merged.where(F.col('last_anw_post_dt_time_passed').isNull()).count())
merged = merged.na.fill(30, subset=['last_anw_post_dt_time_passed'])
print("row_num: " , merged.count())
print("null val cnt: ", merged.where(F.col('last_anw_post_dt_time_passed').isNull()).count())
merged.show()

In [86]:
# Persist the second feature set as a single-partition CSV with a header row,
# overwriting any previous output at this location.
data2_writer = merged.repartition(1).write.format('csv').mode("overwrite")
data2_writer.save("file:///home/ss00/churn_prj/data2", header='true')

## data1 and data2 will be merged in the jupyter notebook!