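The following code aims to **characterize long-term vs. short-term users on Stack Exchange**. A user is classified as long-term if they made at least one post (question or answer) strictly between 100 and 150 days after creating their account; every other user who has posted is classified as short-term. The final output shows the *average score, view count, number of answers, and number of favorites* of each user's very first question, averaged separately for the long-term and short-term groups. Two folders of gzipped XML dumps (only the relevant attributes are listed) were read line by line, parsed, and converted to DataFrames: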

- allUsers
    - Id
    - CreationDate (account creation date)
- allPosts
    - OwnerUserID
    - CreationDate (post creation date)
    - PostTypeID (1 = question, 2 = answer)
    - Score
    - ViewCount
    - AnswerCount
    - FavoriteCount

In [1]:
from pyspark.sql import SparkSession

# Local Spark session on all available cores; sc is its SparkContext, which the
# cells below use to read the raw dump files as text.
session = (SparkSession.builder
           .master('local[*]')
           .appName('sparkproj')
           .getOrCreate())
sc = session.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/04 17:17:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/04/04 17:17:56 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
import xml.etree.ElementTree as ET

def parse_line(line):
    """Return the attribute dict of a single ``<row .../>`` XML line.

    Lines that do not contain ``<row`` yield ``None``. So do lines that contain
    it but are not well-formed XML on their own. Callers can therefore discard
    both kinds with ``filter(lambda x: x is not None)``.
    """
    if '<row' not in line:
        return None
    try:
        element = ET.fromstring(line)
    except ET.ParseError:
        # e.g. a row split across lines or truncated; skip it rather than fail the job
        return None
    return element.attrib

In [3]:
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F

In [4]:
# Read every gzipped users dump as text, parse each line with parse_line, and
# keep only the lines that produced an attribute dict.
user_lines = sc.textFile('spark-stats-data/allUsers/*.xml.gz')
users = user_lines.map(parse_line).filter(lambda attrs: attrs is not None)

In [5]:
# users_df: one row per user holding the window in which a post counts as "long-term"
#   Id -- user id cast to int
#   ll -- account CreationDate + 100 days (lower bound of the window)
#   ul -- account CreationDate + 150 days (upper bound of the window)
# The raw CreationDate is only used to derive ll/ul, so it is not kept.
# Timestamp + interval arithmetic follows:
# https://stackoverflow.com/questions/47897238/add-extra-hours-to-timestamp-columns-in-pyspark-data-frame
_account_created = F.to_timestamp('CreationDate', "yyyy-MM-dd'T'HH:mm:ss.SSS")
users_df = users.toDF().select(
    F.col('Id').cast(IntegerType()).alias('Id'),
    (_account_created + F.expr('INTERVAL 100 DAYS')).alias('ll'),
    (_account_created + F.expr('INTERVAL 150 DAYS')).alias('ul'),
)

In [6]:
# Read every gzipped posts dump as text and turn each parseable <row> line into
# its attribute dict; lines parse_line rejects (None) are dropped.
post_lines = sc.textFile('spark-stats-data/allPosts/*.xml.gz')
parsed_posts = post_lines.map(parse_line)
posts = parsed_posts.filter(lambda attrs: attrs is not None)

In [7]:
# posts_df: (OwnerUserID, CreationDate) for every post, questions and answers alike.
# The owner id is cast to int and the post time is parsed into a timestamp.
posts_df = posts.toDF().select(
    F.col('OwnerUserID').cast(IntegerType()).alias('OwnerUserID'),
    F.to_timestamp('CreationDate', "yyyy-MM-dd'T'HH:mm:ss.SSS").alias('CreationDate'),
)

In [8]:
# in_range: a Column expression (not a function). It evaluates to 1 when a
# post's CreationDate is strictly after ll and strictly before ul, and to 0
# otherwise, including when any of the three values is null. It only resolves
# on a frame that has all three columns, i.e. posts joined with users_df.
in_range = F.when(
    (F.col('ll') < F.col('CreationDate')) & (F.col('CreationDate') < F.col('ul')),
    1,
).otherwise(0)

In [9]:
# status: a Column expression labelling a user 'long' when at least one of their
# posts fell inside the window (sum_in_range > 0), and 'short' otherwise.
status = F.when(F.col('sum_in_range') > 0, 'long').otherwise('short')

In [10]:
# status_df: one row per user who has at least one post, with columns (Id, status).
# Steps:
#   1. inner-join posts_df with users_df on the post owner (users without posts drop out)
#   2. flag each post with in_range (1 if inside that user's [ll, ul] window, else 0)
#   3. sum the flags per user
#   4. map the sum to 'long'/'short' via the status expression, then drop the helper sum
# Column names are spelled exactly as declared (OwnerUserID, Id). The previous
# 'OwnerUserId' / 'ID' spellings only resolved because Spark matches names
# case-insensitively by default, and would fail with spark.sql.caseSensitive=true.
status_df = (posts_df.join(users_df, on=(posts_df['OwnerUserID'] == users_df['Id']))
                     .withColumn('in_range', in_range)
                     .groupby('Id')
                     .agg(F.sum(F.col('in_range')).alias('sum_in_range'))
                     .withColumn('status', status)
                     .drop('sum_in_range')
            )

In [11]:
# Number of users in each status group (count of non-null Id per status)
status_df.groupBy('status').agg(F.count('Id')).show()

# note that this is a smaller dataset for proof of concept

+------+---------+
|status|count(Id)|
+------+---------+
|  long|     2027|
| short|    24820|
+------+---------+



In [12]:
# post_mask_df: one row per question author.
#   maskID          -- the owner's user id (int)
#   minCreationDate -- timestamp of that owner's earliest question
# Only questions (PostTypeID == 1) are considered.
post_mask_df = (
    posts.toDF()
    .select(
        F.col('PostTypeID').cast(IntegerType()).alias('PostTypeID'),
        F.col('OwnerUserID').cast(IntegerType()).alias('maskID'),
        F.to_timestamp('CreationDate', "yyyy-MM-dd'T'HH:mm:ss.SSS").alias('CreationDate'),
    )
    .where(F.col('PostTypeID') == 1)
    .groupBy('maskID')
    .agg(F.min('CreationDate').alias('minCreationDate'))
)

In [13]:
# post_deets_df: outcome columns for every *question* post, typed for averaging.
# Answers (PostTypeID != 1) are removed up front. The later join on
# (owner, minCreationDate) is meant to select each user's first question, and
# without this filter an answer with the same owner and timestamp would match too.
# PostTypeID is dropped after filtering, so the resulting columns are exactly
# OwnerUserID, CreationDate, Score, ViewCount, AnswerCount, FavoriteCount.
# NOTE(review): posts.toDF() infers its schema from the parsed dicts. If the
# inspected rows lack an attribute selected here, the select fails; an explicit
# schema would be more robust.
post_deets_df = (posts.toDF()
                      .select('PostTypeID', 'OwnerUserID', 'CreationDate',
                              'Score', 'ViewCount', 'AnswerCount', 'FavoriteCount')
                      .withColumn('PostTypeID', F.col('PostTypeID').cast(IntegerType()))
                      .filter(F.col('PostTypeID') == 1)
                      .drop('PostTypeID')
                      .withColumn('OwnerUserID', F.col('OwnerUserID').cast(IntegerType()))
                      .withColumn('CreationDate', F.to_timestamp('CreationDate', "yyyy-MM-dd'T'HH:mm:ss.SSS"))
                      .withColumn('Score', F.col('Score').cast(IntegerType()))
                      .withColumn('ViewCount', F.col('ViewCount').cast(IntegerType()))
                      .withColumn('AnswerCount', F.col('AnswerCount').cast(IntegerType()))
                      .withColumn('FavoriteCount', F.col('FavoriteCount').cast(IntegerType()))
                )

In [14]:
# join post_mask_df
# Keep only each user's earliest question: a row survives when its owner matches
# maskID AND its CreationDate equals that owner's minCreationDate (inner join).
# Column names are spelled exactly as declared (OwnerUserID, maskID), so the
# join also resolves when spark.sql.caseSensitive is enabled.
post_deets_df = post_deets_df.join(
    post_mask_df,
    on=((post_deets_df['OwnerUserID'] == post_mask_df['maskID'])
        & (post_deets_df['CreationDate'] == post_mask_df['minCreationDate'])),
)

In [15]:
# join status_df
# Attach each owner's 'long'/'short' label. This is an inner join, so rows whose
# owner has no entry in status_df are dropped. 'OwnerUserID' is spelled as
# declared, so the join also resolves with spark.sql.caseSensitive enabled.
post_deets_df = post_deets_df.join(
    status_df,
    on=(post_deets_df['OwnerUserID'] == status_df['Id']),
)

In [16]:
# outcome_df: per status group, the mean Score / ViewCount / AnswerCount /
# FavoriteCount of the users' first questions.
# A null outcome (e.g. a row whose XML lacked that attribute) is counted as 0
# before averaging. The fill is limited to the averaged columns so that no other
# column (join keys, ids) is silently rewritten. Output column names remain
# avg(Score), avg(ViewCount), avg(AnswerCount), avg(FavoriteCount).
_outcome_cols = ['Score', 'ViewCount', 'AnswerCount', 'FavoriteCount']
outcome_df = (post_deets_df.na.fill(0, subset=_outcome_cols)
                           .groupby('status')
                           .agg(*[F.avg(c) for c in _outcome_cols])
             )

In [17]:
%%time

# show() is the action that executes the joins and aggregations defined above
# and prints the per-status averages; %%time reports how long that run took.
outcome_df.show()

+------+-----------------+-----------------+------------------+------------------+
|status|       avg(Score)|   avg(ViewCount)|  avg(AnswerCount)|avg(FavoriteCount)|
+------+-----------------+-----------------+------------------+------------------+
|  long| 3.54370533260033|925.8889499725124|1.2974161627267728|1.3001649257833974|
| short|2.100892857142857|553.4432330827068|0.9705827067669173|0.5757988721804511|
+------+-----------------+-----------------+------------------+------------------+

CPU times: user 9.99 ms, sys: 1.55 ms, total: 11.5 ms
Wall time: 13.8 s
