# 1. Notebook configuration

In [1]:
%%configure
{"conf": {
    "spark.jars.packages":"com.databricks:spark-xml_2.12:0.16.0",
    "spark.pyspark.python":  "python3",
    "spark.pyspark.virtualenv.enabled": "true",
    "spark.pyspark.virtualenv.type":"native",
    "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
    }
}

In [2]:
%pip install boto3 --upgrade
%pip install botocore --upgrade
%pip install emr-notebooks-magics --upgrade

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.
Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.
Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


# 2. Packages and modules

In [3]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.feature import StopWordsRemover
import re

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
5,application_1685638037390_0006,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
def text_preprocessing(df, col_name):
    """Function for text cleaning. """
    
    df = df.na.fill('', subset=col_name)
    
    patterns = [
        "<.*?>", #clean everything inside <>
        "<.*?>|&([a-z0-9]+|#[0-9]{1,6}|#x[0-9a-f]{1,6});", #remove other html entities
        "(\s)?@\w+", #remove usernames @sth
        "\'\w+", #remove english s
        "-|/+", #remove '-' or '/'
        "\W+", #all characters which not build the words
        "\S*\d+\S*", #words with digits
    ]

    for pattern in patterns:
        df = df.withColumn(col_name, F.regexp_replace(col_name, pattern, " "))

    tokenizer = RegexTokenizer(toLowercase=True, inputCol=col_name, outputCol='words').setMinTokenLength(2)
    df = tokenizer.transform(df)

    stop_words = [re.sub("\'\w+",'', word) for word in StopWordsRemover().getStopWords()]
    sw_remover = StopWordsRemover(inputCol='words', outputCol='words_cleaned', stopWords=stop_words)
    df = sw_remover.transform(df)
    
    new_col_name = col_name
    df = df.drop(col_name, 'words').withColumnRenamed('words_cleaned', col_name)

    return df

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
def tags_cleaner(df, col_name):
    """ Function which cleans tags data. """
    
    df = df\
        .na.fill('', subset=col_name)\
        .withColumn(col_name, F.expr(f"substring({col_name}, 2, length({col_name})-2)"))\
        .withColumn(col_name, F.split(F.col(col_name), '><'))
    
    return df

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# 3. Data cleaning

## 3.1. Posts

### Load the data

In [6]:
source_path = "s3://my-project-213242/drop/Posts.xml"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
schema_posts_df = StructType([
                        StructField("_AcceptedAnswerId",      IntegerType()), 
                        StructField("_AnswerCount",           IntegerType()), 
                        StructField("_Body",                  StringType()), 
                        StructField("_ClosedDate",            TimestampType()), 
                        StructField("_CommentCount",          IntegerType()), 
                        StructField("_CommunityOwnedDate",    TimestampType()), 
                        StructField("_ContentLicense",        StringType()), 
                        StructField("_CreationDate",          TimestampType()), 
                        StructField("_FavoriteCount",         IntegerType()), 
                        StructField("_Id",                    IntegerType()), 
                        StructField("_LastActivityDate",      TimestampType()), 
                        StructField("_LastEditDate",          TimestampType()), 
                        StructField("_LastEditorDisplayName", StringType()), 
                        StructField("_LastEditorUserId",      IntegerType()), 
                        StructField("_OwnerDisplayName",      StringType()), 
                        StructField("_OwnerUserId",           IntegerType()), 
                        StructField("_ParentId",              IntegerType()), 
                        StructField("_PostTypeId",            IntegerType()), 
                        StructField("_Score",                 IntegerType()), 
                        StructField("_Tags",                  StringType()), 
                        StructField("_Title",                 StringType()), 
                        StructField("_ViewCount",             IntegerType())
    ])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
posts_df = (
    spark.read.format("com.databricks.spark.xml")
    .option("rowTag", "row")
    .option("charset", "UTF8")
    .schema(schema_posts_df) 
    .option("treatEmptyValuesAsNulls", "true")
    .load(source_path)
    .cache()
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- Delete _ from column names

In [9]:
posts_df = posts_df.select([F.col(col_name).alias(col_name.replace('_', '')) for col_name in posts_df.columns])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Data exploration

- Number of rows

In [10]:
posts_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

63074

GroupBy and count `PostTypeId`:
- 1 = Question
- 2 = Answer
- 3 = Orphaned tag wiki
- 4 = Tag wiki excerpt
- 5 = Tag wiki
- 6 = Moderator nomination
- 7 = "Wiki placeholder" (seems to only be the election description)
- 8 = Privilege wiki

In [11]:
posts_df.groupBy('PostTypeId').count().orderBy('PostTypeId').show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-----+
|PostTypeId|count|
+----------+-----+
|         1|22215|
|         2|36479|
|         4| 2177|
|         5| 2177|
|         6|   21|
|         7|    5|
+----------+-----+

- Display

In [12]:
%%display
posts_df.limit(20)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [13]:
posts_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- AcceptedAnswerId: integer (nullable = true)
 |-- AnswerCount: integer (nullable = true)
 |-- Body: string (nullable = true)
 |-- ClosedDate: timestamp (nullable = true)
 |-- CommentCount: integer (nullable = true)
 |-- CommunityOwnedDate: timestamp (nullable = true)
 |-- ContentLicense: string (nullable = true)
 |-- CreationDate: timestamp (nullable = true)
 |-- FavoriteCount: integer (nullable = true)
 |-- Id: integer (nullable = true)
 |-- LastActivityDate: timestamp (nullable = true)
 |-- LastEditDate: timestamp (nullable = true)
 |-- LastEditorDisplayName: string (nullable = true)
 |-- LastEditorUserId: integer (nullable = true)
 |-- OwnerDisplayName: string (nullable = true)
 |-- OwnerUserId: integer (nullable = true)
 |-- ParentId: integer (nullable = true)
 |-- PostTypeId: integer (nullable = true)
 |-- Score: integer (nullable = true)
 |-- Tags: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- ViewCount: integer (nullable = true)

### Data cleaning

- Cleaning `Body` - text preprocessing

In [14]:
posts_df = text_preprocessing(posts_df, 'Body')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- Cleaning `Title` - text preprocessing

In [15]:
posts_df = text_preprocessing(posts_df, 'Title')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- Fill missing ID values (`AcceptedAnswerId`, `ParentId`, `LastEditorUserId`) with -1

In [16]:
posts_df = posts_df.na.fill(-1, subset=['AcceptedAnswerId', 'ParentId'])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- Clean `Tags`

In [17]:
posts_df = tags_cleaner(posts_df, 'Tags')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- Clean `OwnerUserId` - put -99 instead of `NaN` if user is deleted

In [18]:
posts_df = posts_df.na.fill(-99, subset=['OwnerUserId', 'LastEditorUserId'])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- Drop `LastEditorDisplayName` - too little data

In [19]:
posts_df\
    .withColumn("isnull", F.when(F.col('LastEditorDisplayName').isNull(), 1).otherwise(0))\
    .groupBy('PostTypeId', 'isnull').count().orderBy('PostTypeId', 'isnull')\
    .show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------+-----+
|PostTypeId|isnull|count|
+----------+------+-----+
|         1|     0|  370|
|         1|     1|21845|
|         2|     0|  487|
|         2|     1|35992|
|         4|     0|   38|
|         4|     1| 2139|
|         5|     0|   25|
|         5|     1| 2152|
|         6|     1|   21|
|         7|     1|    5|
+----------+------+-----+

In [20]:
posts_df = posts_df.drop('LastEditorDisplayName')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- Drop `OwnerDisplayName` - too little data

In [21]:
posts_df\
    .withColumn("isnull", F.when(F.col('OwnerDisplayName').isNull(), 1).otherwise(0))\
    .groupBy('PostTypeId', 'isnull').count().orderBy('PostTypeId', 'isnull')\
    .show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------+-----+
|PostTypeId|isnull|count|
+----------+------+-----+
|         1|     0|  894|
|         1|     1|21321|
|         2|     0| 1051|
|         2|     1|35428|
|         4|     0|   37|
|         4|     1| 2140|
|         5|     0|   25|
|         5|     1| 2152|
|         6|     1|   21|
|         7|     1|    5|
+----------+------+-----+

In [22]:
posts_df = posts_df.drop('OwnerDisplayName')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Cleaning results

In [23]:
posts_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- AcceptedAnswerId: integer (nullable = true)
 |-- AnswerCount: integer (nullable = true)
 |-- ClosedDate: timestamp (nullable = true)
 |-- CommentCount: integer (nullable = true)
 |-- CommunityOwnedDate: timestamp (nullable = true)
 |-- ContentLicense: string (nullable = true)
 |-- CreationDate: timestamp (nullable = true)
 |-- FavoriteCount: integer (nullable = true)
 |-- Id: integer (nullable = true)
 |-- LastActivityDate: timestamp (nullable = true)
 |-- LastEditDate: timestamp (nullable = true)
 |-- LastEditorUserId: integer (nullable = true)
 |-- OwnerUserId: integer (nullable = true)
 |-- ParentId: integer (nullable = true)
 |-- PostTypeId: integer (nullable = true)
 |-- Score: integer (nullable = true)
 |-- Tags: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- ViewCount: integer (nullable = true)
 |-- Body: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Title: array (nullable = true)
 |    |-- element: string 

- Show null values

In [24]:
df = posts_df
cols_nums = [col[0] for col in df.dtypes if col[1] in ('bigint', 'int', 'string')]
df = df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in cols_nums])
df.show(vertical=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0-----------------
 AcceptedAnswerId | 0     
 AnswerCount      | 40859 
 CommentCount     | 0     
 ContentLicense   | 0     
 FavoriteCount    | 58188 
 Id               | 0     
 LastEditorUserId | 0     
 OwnerUserId      | 0     
 ParentId         | 0     
 PostTypeId       | 0     
 Score            | 0     
 ViewCount        | 40859

In [25]:
%%display
posts_df.limit(20)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

- Number of rows

In [26]:
posts_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

63074

### Save the data

In [27]:
sink_path = "s3://my-project-213242/cleaned/Posts/"
posts_df.write.mode("overwrite").parquet(sink_path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 3.2. Comments

### Load the data

In [28]:
source_path = "s3://my-project-213242/drop/Comments.xml"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
schema_comments_df = StructType([
                        StructField("_ContentLicense",        StringType()), 
                        StructField("_CreationDate",          TimestampType()), 
                        StructField("_Id",                    IntegerType()),
                        StructField("_PostId",                IntegerType()),
                        StructField("_Score",                 IntegerType()),
                        StructField("_Text",                  StringType()), 
                        StructField("_UserDisplayName",       StringType()), 
                        StructField("_UserId",                IntegerType()), 
    ])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
comments_df  = (
    spark.read.format("com.databricks.spark.xml")
    .option("rowTag", "row")
    .option("charset", "UTF8")
    .schema(schema_comments_df) 
    .option("treatEmptyValuesAsNulls", "true")
    .load(source_path)
    .cache()
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- Delete _ from column names

In [31]:
comments_df = comments_df.select([F.col(col_name).alias(col_name.replace('_', '')) for col_name in comments_df.columns])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Data exploration

In [32]:
comments_df.limit(20).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+--------------------+---+------+-----+--------------------+---------------+------+
|ContentLicense|        CreationDate| Id|PostId|Score|                Text|UserDisplayName|UserId|
+--------------+--------------------+---+------+-----+--------------------+---------------+------+
|  CC BY-SA 3.0|2011-11-30 19:53:...|  7|     8|    2|Blake Edwards als...|           null|    22|
|  CC BY-SA 3.0|2011-11-30 19:55:...| 10|     4|    2|Interesting. I've...|           null|    29|
|  CC BY-SA 3.0|2011-11-30 20:10:...| 11|     3|    0|Is that the hour ...|           null|    10|
|  CC BY-SA 3.0|2011-11-30 21:53:...| 19|    17|    0|I know that Lucas...|           null|    36|
|  CC BY-SA 3.0|2011-11-30 22:00:...| 20|    17|    0|@bazmegakapa - fa...|           null|    40|
|  CC BY-SA 3.0|2011-11-30 22:16:...| 21|    18|    5|Excellent questio...|           null|    66|
|  CC BY-SA 3.0|2011-11-30 22:20:...| 22|    24|    1|Oh, we have the m...|           null|    66|
|  CC BY-S

In [33]:
comments_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

99059

In [34]:
comments_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- ContentLicense: string (nullable = true)
 |-- CreationDate: timestamp (nullable = true)
 |-- Id: integer (nullable = true)
 |-- PostId: integer (nullable = true)
 |-- Score: integer (nullable = true)
 |-- Text: string (nullable = true)
 |-- UserDisplayName: string (nullable = true)
 |-- UserId: integer (nullable = true)

### Data cleaning

- Cleaning `Text` - text preprocessing

In [35]:
comments_df = text_preprocessing(comments_df, 'Text')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- Clean `UserId` - put -99 instead of `NaN` if user is deleted

In [36]:
comments_df = comments_df.na.fill(-99, subset=['UserId'])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- Drop `UserDisplayName` - too little data

In [37]:
comments_df\
    .withColumn("isnull", F.when(F.col('UserDisplayName').isNull(), 1).otherwise(0))\
    .groupBy('isnull').count().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-----+
|isnull|count|
+------+-----+
|     1|95390|
|     0| 3669|
+------+-----+

In [38]:
comments_df = comments_df.drop('UserDisplayName')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Cleaning results

In [39]:
comments_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- ContentLicense: string (nullable = true)
 |-- CreationDate: timestamp (nullable = true)
 |-- Id: integer (nullable = true)
 |-- PostId: integer (nullable = true)
 |-- Score: integer (nullable = true)
 |-- UserId: integer (nullable = true)
 |-- Text: array (nullable = true)
 |    |-- element: string (containsNull = true)

In [40]:
df = comments_df
cols_nums = [col[0] for col in df.dtypes if col[1] in ('bigint', 'int', 'string')]
df = df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in cols_nums])
df.show(vertical=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0-------------
 ContentLicense | 0   
 Id             | 0   
 PostId         | 0   
 Score          | 0   
 UserId         | 0

In [41]:
comments_df.limit(20).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+--------------------+---+------+-----+------+--------------------+
|ContentLicense|        CreationDate| Id|PostId|Score|UserId|                Text|
+--------------+--------------------+---+------+-----+------+--------------------+
|  CC BY-SA 3.0|2011-11-30 19:53:...|  7|     8|    2|    22|[blake, edwards, ...|
|  CC BY-SA 3.0|2011-11-30 19:55:...| 10|     4|    2|    29|[interesting, alw...|
|  CC BY-SA 3.0|2011-11-30 20:10:...| 11|     3|    0|    10|[hour, plus, long...|
|  CC BY-SA 3.0|2011-11-30 21:53:...| 19|    17|    0|    36|[know, lucas, int...|
|  CC BY-SA 3.0|2011-11-30 22:00:...| 20|    17|    0|    40|[fair, point, als...|
|  CC BY-SA 3.0|2011-11-30 22:16:...| 21|    18|    5|    66|[excellent, quest...|
|  CC BY-SA 3.0|2011-11-30 22:20:...| 22|    24|    1|    66|[oh, motive, just...|
|  CC BY-SA 3.0|2011-11-30 22:25:...| 23|    26|    2|    22|[leave, open, ano...|
|  CC BY-SA 3.0|2011-11-30 22:35:...| 26|    26|    1|    26|[interesting, tho...|
|  C

In [42]:
comments_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

99059

### Save the data

In [43]:
sink_path = "s3://my-project-213242/cleaned/Comments/"
comments_df.write.mode("overwrite").parquet(sink_path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 3.3. PostHistory 

### Load the data

In [44]:
source_path = "s3://my-project-213242/drop/PostHistory.xml"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [45]:
schema_postHistory_df = StructType([
                        StructField("_Comment",               StringType()), 
                        StructField("_ContentLicense",        StringType()), 
                        StructField("_CreationDate",          TimestampType()), 
                        StructField("_Id",                    IntegerType()), 
                        StructField("_PostHistoryTypeId",     IntegerType()), 
                        StructField("_PostId",                IntegerType()), 
                        StructField("_RevisionGUID",          StringType()), 
                        StructField("_Text",                  StringType()), 
                        StructField("_UserDisplayName",       StringType()), 
                        StructField("_UserId",                IntegerType())
    ])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [46]:
postHistory_df  = (
    spark.read.format("com.databricks.spark.xml")
    .option("rowTag", "row")
    .option("charset", "UTF8")
    .schema(schema_postHistory_df) 
    .option("treatEmptyValuesAsNulls", "true")
    .load(source_path)
    .cache()
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- Delete _ from column names

In [47]:
postHistory_df = postHistory_df.select([F.col(col_name).alias(col_name.replace('_', '')) for col_name in postHistory_df.columns])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Data exploration

In [48]:
postHistory_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- Comment: string (nullable = true)
 |-- ContentLicense: string (nullable = true)
 |-- CreationDate: timestamp (nullable = true)
 |-- Id: integer (nullable = true)
 |-- PostHistoryTypeId: integer (nullable = true)
 |-- PostId: integer (nullable = true)
 |-- RevisionGUID: string (nullable = true)
 |-- Text: string (nullable = true)
 |-- UserDisplayName: string (nullable = true)
 |-- UserId: integer (nullable = true)

In [49]:
%%display
postHistory_df.limit(20)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [50]:
postHistory_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

238881

In [51]:
postHistory_df.select("PostHistoryTypeId").distinct().count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

29

### Data cleaning

- Clean `UserId` - put -99 instead of `NaN` if user is deleted

In [52]:
postHistory_df = postHistory_df.na.fill(-99, subset=['UserId'])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- `UserDisplayName` - is valueable for PostHistoryTypeId 10 (Post Closed) and 11 (Post Reopened)

In [53]:
postHistory_df\
    .withColumn("isnull", F.when(F.col('UserDisplayName').isNotNull(), 0).otherwise(1))\
    .groupBy('PostHistoryTypeId', 'isnull').count().orderBy('PostHistoryTypeId', 'isnull')\
    .show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------+------+-----+
|PostHistoryTypeId|isnull|count|
+-----------------+------+-----+
|                1|     0|  853|
|                1|     1|22978|
|                2|     0| 1882|
|                2|     1|61192|
|                3|     0|  853|
|                3|     1|21362|
|                4|     0|  258|
|                4|     1|13171|
|                5|     0| 1450|
|                5|     1|63740|
|                6|     0|  216|
|                6|     1|14229|
|                7|     0|    9|
|                7|     1|  306|
|                8|     0|   13|
|                8|     1|  654|
|                9|     0|    4|
|                9|     1|  200|
|               10|     1| 2460|
|               11|     1|  382|
+-----------------+------+-----+
only showing top 20 rows

### Cleaning results

In [54]:
postHistory_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- Comment: string (nullable = true)
 |-- ContentLicense: string (nullable = true)
 |-- CreationDate: timestamp (nullable = true)
 |-- Id: integer (nullable = true)
 |-- PostHistoryTypeId: integer (nullable = true)
 |-- PostId: integer (nullable = true)
 |-- RevisionGUID: string (nullable = true)
 |-- Text: string (nullable = true)
 |-- UserDisplayName: string (nullable = true)
 |-- UserId: integer (nullable = true)

In [55]:
df = postHistory_df
cols_nums = [col[0] for col in df.dtypes if col[1] in ('bigint', 'int', 'string')]
df = df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in cols_nums])
df.show(vertical=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0-------------------
 Comment           | 115658 
 ContentLicense    | 26173  
 Id                | 0      
 PostHistoryTypeId | 0      
 PostId            | 0      
 RevisionGUID      | 0      
 Text              | 35032  
 UserDisplayName   | 233270 
 UserId            | 0

In [56]:
%%display
postHistory_df.limit(20)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [57]:
postHistory_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

238881

### Save the data

In [58]:
sink_path = "s3://my-project-213242/cleaned/PostHistory/"
postHistory_df.write.mode("overwrite").parquet(sink_path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 3.4. PostLinks

### Load the data

In [59]:
source_path = "s3://my-project-213242/drop/PostLinks.xml"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [60]:
schema_postLinks_df = StructType([
                        StructField("_CreationDate",          TimestampType()), 
                        StructField("_Id",                    IntegerType()),
                        StructField("_LinkTypeId",            IntegerType()),
                        StructField("_PostId",                IntegerType()),
                        StructField("_RelatedPostId",         IntegerType())
    ])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [61]:
postLinks_df = (
    spark.read.format("com.databricks.spark.xml")
    .option("rowTag", "row")
    .option("charset", "UTF8")
    .schema(schema_postLinks_df) 
    .option("treatEmptyValuesAsNulls", "true")
    .load(source_path)
    .cache()
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- Delete _ from column names

In [62]:
postLinks_df = postLinks_df.select([F.col(col_name).alias(col_name.replace('_', '')) for col_name in postLinks_df.columns])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Data exploration

In [63]:
postLinks_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- CreationDate: timestamp (nullable = true)
 |-- Id: integer (nullable = true)
 |-- LinkTypeId: integer (nullable = true)
 |-- PostId: integer (nullable = true)
 |-- RelatedPostId: integer (nullable = true)

In [64]:
postLinks_df.limit(20).show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------+-----+----------+------+-------------+
|CreationDate           |Id   |LinkTypeId|PostId|RelatedPostId|
+-----------------------+-----+----------+------+-------------+
|2011-12-01 16:34:41.333|948  |1         |65    |90           |
|2011-12-04 06:07:21.05 |1722 |1         |99    |33           |
|2011-12-04 06:07:21.05 |1723 |1         |99    |53           |
|2011-12-09 16:14:42.213|3828 |1         |382   |328          |
|2011-12-10 15:26:10.283|4298 |1         |387   |334          |
|2011-12-18 12:42:34.397|6005 |1         |552   |35           |
|2011-12-18 12:42:34.4  |6006 |1         |552   |452          |
|2011-12-18 12:42:34.4  |6007 |1         |552   |61           |
|2011-12-19 06:01:43.193|6114 |1         |582   |222          |
|2011-12-22 15:59:31.68 |6709 |1         |617   |565          |
|2012-01-03 22:53:47.677|9598 |1         |770   |696          |
|2012-01-06 17:37:01.903|10329|1         |736   |604          |
|2012-01-11 17:07:42.853|11612|1        

In [65]:
postLinks_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

5338

- LinkTypeId type of link
 - 1 = Linked (PostId contains a link to RelatedPostId)
 - 3 = Duplicate (PostId is a duplicate of RelatedPostId)

In [66]:
postLinks_df.groupBy('LinkTypeId').count().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-----+
|LinkTypeId|count|
+----------+-----+
|         1| 3972|
|         3| 1366|
+----------+-----+

### Cleaning results

In [67]:
postLinks_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- CreationDate: timestamp (nullable = true)
 |-- Id: integer (nullable = true)
 |-- LinkTypeId: integer (nullable = true)
 |-- PostId: integer (nullable = true)
 |-- RelatedPostId: integer (nullable = true)

In [68]:
df = postLinks_df
cols_nums = [col[0] for col in df.dtypes if col[1] in ('bigint', 'int', 'string')]
df = df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in cols_nums])
df.show(vertical=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0------------
 Id            | 0   
 LinkTypeId    | 0   
 PostId        | 0   
 RelatedPostId | 0

In [69]:
%%display
postLinks_df.limit(20)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [70]:
postLinks_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

5338

### Save the data

In [71]:
sink_path = "s3://my-project-213242/cleaned/PostLinks/"
postLinks_df.write.mode("overwrite").parquet(sink_path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 3.5. Badges

### Load the data

In [72]:
source_path = "s3://my-project-213242/drop/Badges.xml"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [73]:
schema_badges_df = StructType([
                        StructField("_Class",                 IntegerType()),
                        StructField("_Date",                  TimestampType()), 
                        StructField("_Id",                    IntegerType()),
                        StructField("_Name",                  StringType()),
                        StructField("_TagBased",              BooleanType()),
                        StructField("_UserId",                IntegerType())
    ])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [74]:
badges_df = (
    spark.read.format("com.databricks.spark.xml")
    .option("rowTag", "row")
    .option("charset", "UTF8")
    .schema(schema_badges_df) 
    .option("treatEmptyValuesAsNulls", "true")
    .load(source_path)
    .cache()
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- Delete _ from column names

In [75]:
badges_df = badges_df.select([F.col(col_name).alias(col_name.replace('_', '')) for col_name in badges_df.columns])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Data exploration 

In [76]:
badges_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- Class: integer (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Id: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- TagBased: boolean (nullable = true)
 |-- UserId: integer (nullable = true)

In [77]:
badges_df.limit(20).show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-----------------------+---+--------------+--------+------+
|Class|Date                   |Id |Name          |TagBased|UserId|
+-----+-----------------------+---+--------------+--------+------+
|3    |2011-11-30 18:10:25.763|1  |Autobiographer|false   |1     |
|3    |2011-11-30 18:15:25.777|2  |Autobiographer|false   |2     |
|3    |2011-11-30 18:15:25.777|3  |Autobiographer|false   |3     |
|3    |2011-11-30 18:45:38.023|4  |Autobiographer|false   |5     |
|3    |2011-11-30 18:45:38.023|5  |Autobiographer|false   |7     |
|3    |2011-11-30 18:50:37.927|6  |Autobiographer|false   |8     |
|3    |2011-11-30 18:50:37.927|7  |Autobiographer|false   |13    |
|3    |2011-11-30 18:50:37.927|8  |Autobiographer|false   |17    |
|3    |2011-11-30 18:50:37.927|9  |Autobiographer|false   |18    |
|3    |2011-11-30 18:50:37.927|10 |Autobiographer|false   |19    |
|3    |2011-11-30 18:55:37.94 |11 |Autobiographer|false   |22    |
|3    |2011-11-30 18:55:37.94 |12 |Autobiographer|false   |23 

In [78]:
badges_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

153357

- Class
 - 1 = Gold
 - 2 = Silver
 - 3 = Bronze

In [79]:
badges_df.groupBy('Class').count().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+------+
|Class| count|
+-----+------+
|    1|  3789|
|    3|119841|
|    2| 29727|
+-----+------+

### Cleaning results

In [80]:
badges_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- Class: integer (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Id: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- TagBased: boolean (nullable = true)
 |-- UserId: integer (nullable = true)

In [81]:
df = badges_df
cols_nums = [col[0] for col in df.dtypes if col[1] in ('bigint', 'int', 'string')]
df = df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in cols_nums])
df.show(vertical=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0-----
 Class  | 0   
 Id     | 0   
 Name   | 0   
 UserId | 0

In [82]:
badges_df.limit(20).show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-----------------------+---+--------------+--------+------+
|Class|Date                   |Id |Name          |TagBased|UserId|
+-----+-----------------------+---+--------------+--------+------+
|3    |2011-11-30 18:10:25.763|1  |Autobiographer|false   |1     |
|3    |2011-11-30 18:15:25.777|2  |Autobiographer|false   |2     |
|3    |2011-11-30 18:15:25.777|3  |Autobiographer|false   |3     |
|3    |2011-11-30 18:45:38.023|4  |Autobiographer|false   |5     |
|3    |2011-11-30 18:45:38.023|5  |Autobiographer|false   |7     |
|3    |2011-11-30 18:50:37.927|6  |Autobiographer|false   |8     |
|3    |2011-11-30 18:50:37.927|7  |Autobiographer|false   |13    |
|3    |2011-11-30 18:50:37.927|8  |Autobiographer|false   |17    |
|3    |2011-11-30 18:50:37.927|9  |Autobiographer|false   |18    |
|3    |2011-11-30 18:50:37.927|10 |Autobiographer|false   |19    |
|3    |2011-11-30 18:55:37.94 |11 |Autobiographer|false   |22    |
|3    |2011-11-30 18:55:37.94 |12 |Autobiographer|false   |23 

In [83]:
badges_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

153357

### Save the data

In [84]:
sink_path = "s3://my-project-213242/cleaned/Badges/"
badges_df.write.mode("overwrite").parquet(sink_path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 3.6. Tags

### Load the data

In [85]:
source_path = "s3://my-project-213242/drop/Tags.xml"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [86]:
schema_tags_df = StructType([
                        StructField("_Count",                 IntegerType()),
                        StructField("_ExcerptPostId",          IntegerType()),
                        StructField("_Id",                    IntegerType()),
                        StructField("_TagName",                StringType()),
                        StructField("_WikiPostId",             IntegerType())
    ])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [87]:
tags_df = (
    spark.read.format("com.databricks.spark.xml")
    .option("rowTag", "row")
    .option("charset", "UTF8")
    .schema(schema_tags_df) 
    .option("treatEmptyValuesAsNulls", "true")
    .load(source_path)
    .cache()
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- Delete _ from column names

In [88]:
tags_df = tags_df.select([F.col(col_name).alias(col_name.replace('_', '')) for col_name in tags_df.columns])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Data exploration

In [89]:
tags_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- Count: integer (nullable = true)
 |-- ExcerptPostId: integer (nullable = true)
 |-- Id: integer (nullable = true)
 |-- TagName: string (nullable = true)
 |-- WikiPostId: integer (nullable = true)

In [90]:
tags_df.limit(20).show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-------------+---+------------------+----------+
|Count|ExcerptPostId|Id |TagName           |WikiPostId|
+-----+-------------+---+------------------+----------+
|66   |280          |1  |comedy            |279       |
|1372 |113          |2  |analysis          |112       |
|323  |233          |5  |ending            |232       |
|6    |55014        |8  |the-tree-of-life  |55013     |
|430  |74           |9  |star-wars         |73        |
|44   |4281         |10 |action            |4280      |
|21   |284          |12 |stanley-kubrick   |283       |
|246  |1719         |14 |casting           |1718      |
|54   |104          |22 |pulp-fiction      |103       |
|23   |186          |23 |quentin-tarantino |185       |
|130  |94           |28 |book-adaptation   |93        |
|8130 |430          |32 |plot-explanation  |429       |
|75   |41           |34 |the-matrix        |40        |
|29   |395          |35 |sequels           |394       |
|541  |1717         |40 |film-techniques   |1716

In [91]:
tags_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

4619

### Data cleaning

- Fill missing ID values (`ExcerptPostId`, `WikiPostId`) with -1

In [92]:
tags_df = tags_df.na.fill(-1, subset=['ExcerptPostId', 'WikiPostId'])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Cleaning results

In [93]:
tags_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- Count: integer (nullable = true)
 |-- ExcerptPostId: integer (nullable = true)
 |-- Id: integer (nullable = true)
 |-- TagName: string (nullable = true)
 |-- WikiPostId: integer (nullable = true)

In [94]:
df = tags_df
cols_nums = [col[0] for col in df.dtypes if col[1] in ('bigint', 'int', 'string')]
df = df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in cols_nums])
df.show(vertical=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0------------
 Count         | 0   
 ExcerptPostId | 0   
 Id            | 0   
 TagName       | 0   
 WikiPostId    | 0

In [95]:
%%display
tags_df.limit(20)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [96]:
tags_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

4619

### Save the data

In [97]:
sink_path = "s3://my-project-213242/cleaned/Tags/"
tags_df.write.mode("overwrite").parquet(sink_path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 3.7. Users

### Load the data

In [98]:
source_path = "s3://my-project-213242/drop/Users.xml"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [99]:
schema_users_df = StructType([
                        StructField("_AboutMe",        StringType()), 
                        StructField("_AccountId",      IntegerType()), 
                        StructField("_CreationDate",   TimestampType()), 
                        StructField("_DisplayName",    StringType()), 
                        StructField("_DownVotes",      IntegerType()), 
                        StructField("_Id",             IntegerType()), 
                        StructField("_LastAccessDate", TimestampType()), 
                        StructField("_Location",       StringType()), 
                        StructField("_Reputation",     IntegerType()), 
                        StructField("_UpVotes",        IntegerType()), 
                        StructField("_Views",          IntegerType()), 
                        StructField("_WebsiteUrl",     StringType())
    ])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [100]:
users_df = (
    spark.read.format("com.databricks.spark.xml")
    .option("rowTag", "row")
    .option("charset", "UTF8")
    .schema(schema_users_df)
    .option("treatEmptyValuesAsNulls", "true")
    .load(source_path)
    .cache()
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- Delete _ from column names

In [101]:
users_df = users_df.select([F.col(col_name).alias(col_name.replace('_', '')) for col_name in users_df.columns])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Data exploration 

- `user_id` is the Id of the user on the specific site. 
- `account_id`, is the way to link these two users on the entire Stack Exchange Network. 

In [102]:
users_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- AboutMe: string (nullable = true)
 |-- AccountId: integer (nullable = true)
 |-- CreationDate: timestamp (nullable = true)
 |-- DisplayName: string (nullable = true)
 |-- DownVotes: integer (nullable = true)
 |-- Id: integer (nullable = true)
 |-- LastAccessDate: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Reputation: integer (nullable = true)
 |-- UpVotes: integer (nullable = true)
 |-- Views: integer (nullable = true)
 |-- WebsiteUrl: string (nullable = true)

In [103]:
users_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

71841

In [104]:
%%display
users_df.limit(20)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

#### Data cleaning

- Cleaning `AboutMe` - text preprocessing

In [105]:
users_df = text_preprocessing(users_df, 'AboutMe')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- Clean `AccountId` - put -99 instead of `NaN` if user is deleted

In [106]:
users_df = users_df.na.fill(-99, subset=['AccountId'])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- Drop entries with no `DisplayName`

In [107]:
users_df = users_df.na.drop(subset=["DisplayName"])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- Clean `Location` - text preprocessing

In [108]:
users_df = users_df.na.fill('', subset='Location').withColumn('Location',  F.split(F.col('Location'),', '))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Cleaning results

In [109]:
users_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- AccountId: integer (nullable = true)
 |-- CreationDate: timestamp (nullable = true)
 |-- DisplayName: string (nullable = true)
 |-- DownVotes: integer (nullable = true)
 |-- Id: integer (nullable = true)
 |-- LastAccessDate: timestamp (nullable = true)
 |-- Location: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- Reputation: integer (nullable = true)
 |-- UpVotes: integer (nullable = true)
 |-- Views: integer (nullable = true)
 |-- WebsiteUrl: string (nullable = true)
 |-- AboutMe: array (nullable = true)
 |    |-- element: string (containsNull = true)

In [110]:
users_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

71841

In [111]:
df = users_df
cols_nums = [col[0] for col in df.dtypes if col[1] in ('bigint', 'int', 'string')]
df = df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in cols_nums])
df.show(vertical=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0------------
 AccountId   | 0     
 DisplayName | 4     
 DownVotes   | 0     
 Id          | 0     
 Reputation  | 0     
 UpVotes     | 0     
 Views       | 0     
 WebsiteUrl  | 58712

In [112]:
%%display
users_df.limit(20)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

### Save the data

In [113]:
sink_path = "s3://my-project-213242/cleaned/Users/"
users_df.write.mode("overwrite").parquet(sink_path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 3.8. Votes

### Load the data

In [114]:
source_path = "s3://my-project-213242/drop/Votes.xml"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [115]:
schema_votes_df = StructType([
                        StructField("_BountyAmount",       IntegerType()), 
                        StructField("_CreationDate",       TimestampType()), 
                        StructField("_Id",                 IntegerType()), 
                        StructField("_PostId",             IntegerType()), 
                        StructField("_UserId",             IntegerType()), 
                        StructField("_VoteTypeId",         IntegerType())
    ])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [116]:
votes_df = (
    spark.read.format("com.databricks.spark.xml")
    .option("rowTag", "row")
    .option("charset", "UTF8")
    .schema(schema_votes_df)
    .option("treatEmptyValuesAsNulls", "true")
    .load(source_path)
    .cache()
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- Delete _ from column names

In [117]:
votes_df = votes_df.select([F.col(col_name).alias(col_name.replace('_', '')) for col_name in votes_df.columns])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Data exploration

- `VoteTypeId` (listed in the VoteTypes table)
    - 1 = AcceptedByOriginator
    - 2 = UpMod (AKA upvote)
    - 3 = DownMod (AKA downvote)
    - 4 = Offensive
    - 5 = Favorite (AKA bookmark; UserId will also be populated) feature removed after October 2022 / replaced by Saves
    - 6 = Close (effective 2013-06-25: Close votes are only stored in table: PostHistory)
    - 7 = Reopen
    - 8 = BountyStart (UserId and BountyAmount will also be populated)
    - 9 = BountyClose (BountyAmount will also be populated)
    - 10 = Deletion
    - 11 = Undeletion
    - 12 = Spam
    - 15 = ModeratorReview (i.e., a moderator looking at a flagged post)
    - 16 = ApproveEditSuggestion
- `UserId` (present only if VoteTypeId in (5,8); -1 if user is deleted)
- `BountyAmount` (present only if VoteTypeId in (8,9))

In [118]:
votes_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- BountyAmount: integer (nullable = true)
 |-- CreationDate: timestamp (nullable = true)
 |-- Id: integer (nullable = true)
 |-- PostId: integer (nullable = true)
 |-- UserId: integer (nullable = true)
 |-- VoteTypeId: integer (nullable = true)

In [119]:
votes_df.limit(20).show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+-------------------+---+------+------+----------+
|BountyAmount|CreationDate       |Id |PostId|UserId|VoteTypeId|
+------------+-------------------+---+------+------+----------+
|null        |2011-11-30 00:00:00|1  |1     |null  |2         |
|null        |2011-11-30 00:00:00|2  |1     |null  |2         |
|null        |2011-11-30 00:00:00|3  |1     |null  |2         |
|null        |2011-11-30 00:00:00|4  |2     |null  |2         |
|null        |2011-11-30 00:00:00|5  |3     |null  |2         |
|null        |2011-11-30 00:00:00|6  |3     |null  |2         |
|null        |2011-11-30 00:00:00|7  |5     |null  |2         |
|null        |2011-11-30 00:00:00|8  |4     |null  |2         |
|null        |2011-11-30 00:00:00|9  |5     |null  |2         |
|null        |2011-11-30 00:00:00|10 |4     |null  |2         |
|null        |2011-11-30 00:00:00|11 |4     |null  |2         |
|null        |2011-11-30 00:00:00|12 |1     |null  |3         |
|null        |2011-11-30 00:00:00|13 |5 

In [120]:
votes_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

670161

#### Data cleaning

- Clean `UserId` - UserId (present only if VoteTypeId in (5,8); -1 if user is deleted) - replace -1 with -99 (as in previous dfs)

In [121]:
votes_df.filter(F.col('UserId')==-1).count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

31

In [122]:
votes_df = votes_df.withColumn('UserId', F.when(F.col('UserId') == -1, -99).otherwise(F.col('UserId')))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [123]:
votes_df.filter(F.col('UserId')==-99).count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

31

### Cleaning results

In [124]:
votes_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- BountyAmount: integer (nullable = true)
 |-- CreationDate: timestamp (nullable = true)
 |-- Id: integer (nullable = true)
 |-- PostId: integer (nullable = true)
 |-- UserId: integer (nullable = true)
 |-- VoteTypeId: integer (nullable = true)

In [125]:
df = votes_df
cols_nums = [col[0] for col in df.dtypes if col[1] in ('bigint', 'int', 'string')]
df = df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in cols_nums])
df.show(vertical=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0--------------
 BountyAmount | 668895 
 Id           | 0      
 PostId       | 0      
 UserId       | 669387 
 VoteTypeId   | 0

In [126]:
votes_df.limit(20).show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+-------------------+---+------+------+----------+
|BountyAmount|CreationDate       |Id |PostId|UserId|VoteTypeId|
+------------+-------------------+---+------+------+----------+
|null        |2011-11-30 00:00:00|1  |1     |null  |2         |
|null        |2011-11-30 00:00:00|2  |1     |null  |2         |
|null        |2011-11-30 00:00:00|3  |1     |null  |2         |
|null        |2011-11-30 00:00:00|4  |2     |null  |2         |
|null        |2011-11-30 00:00:00|5  |3     |null  |2         |
|null        |2011-11-30 00:00:00|6  |3     |null  |2         |
|null        |2011-11-30 00:00:00|7  |5     |null  |2         |
|null        |2011-11-30 00:00:00|8  |4     |null  |2         |
|null        |2011-11-30 00:00:00|9  |5     |null  |2         |
|null        |2011-11-30 00:00:00|10 |4     |null  |2         |
|null        |2011-11-30 00:00:00|11 |4     |null  |2         |
|null        |2011-11-30 00:00:00|12 |1     |null  |3         |
|null        |2011-11-30 00:00:00|13 |5 

In [127]:
votes_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

670161

### Save the data

In [128]:
sink_path = "s3://my-project-213242/cleaned/Votes/"
votes_df.write.mode("overwrite").parquet(sink_path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…