### 01 Develop Analysis Workflow

This notebook develops and tests the Spark workflow before it is moved into dedicated Python files for running on the cluster.

In [1]:
from pathlib import Path
import pandas as pd

Although the intention here is to practice Spark and distributed computing, let's first look at the data in Pandas and use it as a sanity check for the Spark DataFrames.

In [2]:
# Limit how many rows to import for speed
nrows = 100_000

In [3]:
questions = pd.read_csv(Path('./assets/Questions.csv'), nrows=nrows,
                encoding="ISO-8859-1").dropna(subset=["Id", "Body", "CreationDate"])
questions.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 0 to 99999
Data columns (total 7 columns):
 #   Column        Non-Null Count   Dtype  
---  ------        --------------   -----  
 0   Id            100000 non-null  int64  
 1   OwnerUserId   95342 non-null   float64
 2   CreationDate  100000 non-null  object 
 3   ClosedDate    3331 non-null    object 
 4   Score         100000 non-null  int64  
 5   Title         100000 non-null  object 
 6   Body          100000 non-null  object 
dtypes: float64(1), int64(2), object(4)
memory usage: 5.3+ MB


In [4]:
questions.sort_values(by="Id").head(5)

Unnamed: 0,Id,OwnerUserId,CreationDate,ClosedDate,Score,Title,Body
0,80,26.0,2008-08-01T13:57:07Z,,26,SQLStatement.execute() - multiple queries in o...,<p>I've written a database generation script i...
1,90,58.0,2008-08-01T14:41:24Z,2012-12-26T03:45:49Z,144,Good branching and merging tutorials for Torto...,<p>Are there any really good tutorials explain...
2,120,83.0,2008-08-01T15:50:08Z,,21,ASP.NET Site Maps,<p>Has anyone got experience creating <strong>...
3,180,2089740.0,2008-08-01T18:42:19Z,,53,Function for creating color wheels,<p>This is something I've pseudo-solved many t...
4,260,91.0,2008-08-01T23:22:08Z,,49,Adding scripting functionality to .NET applica...,<p>I have a little game written in C#. It uses...


In [5]:
answers = pd.read_csv(Path('./assets/Answers.csv'), nrows=nrows,
                encoding="ISO-8859-1").dropna()
answers.info()

<class 'pandas.core.frame.DataFrame'>
Index: 96675 entries, 0 to 99999
Data columns (total 6 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   Id            96675 non-null  int64  
 1   OwnerUserId   96675 non-null  float64
 2   CreationDate  96675 non-null  object 
 3   ParentId      96675 non-null  int64  
 4   Score         96675 non-null  int64  
 5   Body          96675 non-null  object 
dtypes: float64(1), int64(3), object(2)
memory usage: 5.2+ MB


In [6]:
answers.head(5)

Unnamed: 0,Id,OwnerUserId,CreationDate,ParentId,Score,Body
0,92,61.0,2008-08-01T14:45:37Z,90,13,"<p><a href=""http://svnbook.red-bean.com/"">Vers..."
1,124,26.0,2008-08-01T16:09:47Z,80,12,<p>I wound up using this. It is a kind of a ha...
2,199,50.0,2008-08-01T19:36:46Z,180,1,<p>I've read somewhere the human eye can't dis...
3,269,91.0,2008-08-01T23:49:57Z,260,4,"<p>Yes, I thought about that, but I soon figur..."
4,307,49.0,2008-08-02T01:49:46Z,260,28,"<p><a href=""http://www.codeproject.com/Article..."


In [7]:
tags = pd.read_csv(Path('./assets/Tags.csv'),
                encoding="ISO-8859-1").dropna()
tags.info()

<class 'pandas.core.frame.DataFrame'>
Index: 3749881 entries, 0 to 3750993
Data columns (total 2 columns):
 #   Column  Dtype 
---  ------  ----- 
 0   Id      int64 
 1   Tag     object
dtypes: int64(1), object(1)
memory usage: 85.8+ MB


In [8]:
tags.head()

Unnamed: 0,Id,Tag
0,80,flex
1,80,actionscript-3
2,80,air
3,90,svn
4,90,tortoisesvn


### PySpark: Preprocess

Now let's jump into Spark. We'll start by reading in the CSV files. Note that we set a row limit here to minimize computation. When we're ready to run everything on the cluster, the limit will be removed.

In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, LongType, StringType
from pyspark.sql import functions as F

spark = (SparkSession.builder
        .master("local[*]")
        .appName("StackOverflow")
        .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/24 11:40:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [10]:
df_questions = (spark.read.options(encoding="ISO-8859-1",
                header=True, multiLine=False, mode="DROPMALFORMED")
                .csv("./assets/Questions.csv")
                .limit(10_000)
                )

df_answers = (spark.read.options(encoding="ISO-8859-1", 
                header=True, mode="DROPMALFORMED", multiLine=False)
                .csv('./assets/Answers.csv')
                .limit(10_000)
                )

df_tags = (spark.read.options(encoding="ISO-8859-1", 
            header=True, mode="DROPMALFORMED", multiLine=False)
            .csv('./assets/Tags.csv')
            .limit(10_000)
            )

In [11]:
# Confirm row limits
print(df_questions.count(), df_answers.count(), df_tags.count())

10000 10000 10000


In [12]:
# Inspect Schemas
df_questions.printSchema()
df_answers.printSchema()
df_tags.printSchema()

root
 |-- Id: string (nullable = true)
 |-- OwnerUserId: string (nullable = true)
 |-- CreationDate: string (nullable = true)
 |-- ClosedDate: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Body: string (nullable = true)

root
 |-- Id: string (nullable = true)
 |-- OwnerUserId: string (nullable = true)
 |-- CreationDate: string (nullable = true)
 |-- ParentId: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- Body: string (nullable = true)

root
 |-- Id: string (nullable = true)
 |-- Tag: string (nullable = true)



In [13]:
# Convert datatypes and add column for finding time differences

df_quest_filt = (df_questions
                .withColumn('Id', df_questions['Id'].cast(IntegerType()))
                .withColumn('OwnerUserId', df_questions['OwnerUserId'].cast(IntegerType()))
                .withColumn('CreationDate', F.regexp_replace('CreationDate', 'T', ' '))
                .withColumn('CreationDate', F.regexp_replace('CreationDate', 'Z', ''))
                .withColumn('CreationTime', F.unix_timestamp('CreationDate', 'y-M-d HH:mm:ss').cast(LongType()))
                .withColumn('ClosedDate', F.regexp_replace('ClosedDate', 'T', ' '))
                .withColumn('ClosedDate', F.regexp_replace('ClosedDate', 'Z', ''))
                .withColumn('ClosedTime', F.unix_timestamp('ClosedDate', 'y-M-d HH:mm:ss').cast(LongType()))
                .withColumn('ElapsedTime', (F.col('ClosedTime') - F.col('CreationTime')))
                .withColumn('Score', df_questions['Score'].cast(IntegerType()))
                ).na.drop()

df_answers_filt = (df_answers
                    .withColumn('Id', df_answers['Id'].cast(IntegerType()))
                    .withColumn('OwnerUserId', df_answers['OwnerUserId'].cast(IntegerType()))
                    .withColumn('ParentId', df_answers['ParentId'].cast(IntegerType()))
                    .withColumn('Score', df_answers['Score'].cast(IntegerType()))
                    .withColumn('CreationDate', F.regexp_replace('CreationDate', 'T', ' '))
                    .withColumn('CreationDate', F.regexp_replace('CreationDate', 'Z', ''))
                    .withColumn('CreationTime', F.unix_timestamp('CreationDate', 'y-M-d HH:mm:ss').cast(LongType()))
                    ).na.drop()

df_tags_filt = (df_tags
                .withColumn('Id', df_tags['Id'].cast(IntegerType()))
                ).na.drop()        

In [14]:
# Inspect counts after dropping nulls
print(df_quest_filt.count(), df_answers_filt.count(), df_tags_filt.count())

562 9397 10000


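Around 94% of the questions (10,000 down to 562) are eliminated after dropping nulls! This is most likely due to missing closing dates: unclosed questions have `NA` in `ClosedDate`, so `ClosedTime` (and therefore `ElapsedTime`) is null, and `na.drop()` removes any row that contains a null. The Pandas summary above shows how rare closed questions are: only 3,331 of the first 100,000 have a `ClosedDate`. This also indicates that the time difference between a question's CreationDate and ClosedDate will not be a useful measure for most questions. Let's try a different approach to removing nulls.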
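Since most questions are never closed, one alternative timing measure (our suggestion, not part of the original plan) is the time until a question's first answer: `Answers.csv` links each answer to its question through `ParentId`. The plain-Python sketch below shows the logic on the first few rows of the Pandas outputs above. In Spark, the same idea would roughly be a `groupBy('ParentId')` with `F.min('CreationTime')`, joined back to the questions on `Id`; we have not run that version here.

```python
from datetime import datetime

def parse_iso(ts):
    """Parse timestamps such as '2008-08-01T13:57:07Z' (the trailing Z means UTC)."""
    return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ")

def time_to_first_answer(questions, answers):
    """Seconds from question creation to its earliest answer.

    questions: {question_id: creation_timestamp}
    answers:   [(answer_id, parent_id, creation_timestamp), ...]
    Questions without any answer are left out of the result.
    """
    first_answer = {}
    for _, parent_id, created in answers:
        t = parse_iso(created)
        if parent_id not in first_answer or t < first_answer[parent_id]:
            first_answer[parent_id] = t
    return {
        qid: int((first_answer[qid] - parse_iso(created)).total_seconds())
        for qid, created in questions.items()
        if qid in first_answer
    }

# Rows copied from the Pandas head() outputs above
questions = {
    80: "2008-08-01T13:57:07Z",
    90: "2008-08-01T14:41:24Z",
    120: "2008-08-01T15:50:08Z",
    180: "2008-08-01T18:42:19Z",
    260: "2008-08-01T23:22:08Z",
}
answers = [
    (92, 90, "2008-08-01T14:45:37Z"),
    (124, 80, "2008-08-01T16:09:47Z"),
    (199, 180, "2008-08-01T19:36:46Z"),
    (269, 260, "2008-08-01T23:49:57Z"),
    (307, 260, "2008-08-02T01:49:46Z"),
]

delays = time_to_first_answer(questions, answers)
print(delays)  # {80: 7960, 90: 253, 180: 3267, 260: 1669}
```

Question 120 has no answer among these five answer rows, so it drops out; on the full data, unanswered questions would need their own handling (for example, treating them as censored rather than discarding them).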

In [15]:
df_quest_filt = (df_questions
                .withColumn('Id', df_questions['Id'].cast(IntegerType()))
                .withColumn('OwnerUserId', df_questions['OwnerUserId'].cast(IntegerType()))
                .withColumn('CreationDate', F.regexp_replace('CreationDate', 'T', ' '))
                .withColumn('CreationDate', F.regexp_replace('CreationDate', 'Z', ''))
                .withColumn('CreationTime', F.unix_timestamp('CreationDate', 'y-M-d HH:mm:ss').cast(LongType()))
                .withColumn('ClosedDate', F.regexp_replace('ClosedDate', 'T', ' '))
                .withColumn('ClosedDate', F.regexp_replace('ClosedDate', 'Z', ''))
                .withColumn('ClosedTime', F.unix_timestamp('ClosedDate', 'y-M-d HH:mm:ss').cast(LongType()))
                .withColumn('Score', df_questions['Score'].cast(IntegerType()))
                ).na.drop(subset=["Id", "Body", "CreationTime"])

In [16]:
df_quest_filt.count()

1155
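Only 1,155 of the 10,000 rows survive even this narrower filter, and some `Body` values in the output below start with a stray `"`. One plausible cause, which we have not verified on this dataset, is the CSV dialect: question bodies contain newlines and RFC 4180-style doubled quotes (`""`), while the reader above uses `multiLine=False` (one physical line per record) and Spark's default `escape` character is a backslash. The plain-Python sketch below uses the standard-library `csv` module on a made-up row to show how a quote-aware parser keeps such a record intact, whereas splitting on lines breaks it into fragments. If this is the cause, passing `multiLine=True, escape='"'` to `spark.read.options(...)` is a commonly used fix.

```python
import csv
import io

# A made-up record in the same shape as Questions.csv: the Body field spans two
# physical lines and contains a doubled quote ("") as an escaped quotation mark.
raw = 'Id,Title,Body\n80,Example,"<p>First line\nsays ""hi""</p>"\n'

# Naive line splitting, roughly what one-record-per-line parsing assumes
lines = raw.strip().split("\n")
print(len(lines) - 1)  # 2 data "rows" after the header, both fragments

# A quote-aware parser that treats "" as an escaped quote
rows = list(csv.reader(io.StringIO(raw)))
header, records = rows[0], rows[1:]
print(len(records))     # 1 record
print(records[0][2])    # the full Body, with its newline and quotes intact
```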

In [17]:
df_quest_filt.sort("Id", ascending=True).show(10)



+---+-----------+-------------------+-------------------+-----+--------------------+--------------------+------------+----------+
| Id|OwnerUserId|       CreationDate|         ClosedDate|Score|               Title|                Body|CreationTime|ClosedTime|
+---+-----------+-------------------+-------------------+-----+--------------------+--------------------+------------+----------+
| 80|         26|2008-08-01 13:57:07|                 NA|   26|SQLStatement.exec...|"<p>I've written ...|  1217620627|      null|
| 90|         58|2008-08-01 14:41:24|2012-12-26 03:45:49|  144|Good branching an...|"<p>Are there any...|  1217623284|1356518749|
|120|         83|2008-08-01 15:50:08|                 NA|   21|   ASP.NET Site Maps|<p>Has anyone got...|  1217627408|      null|
|180|    2089740|2008-08-01 18:42:19|                 NA|   53|Function for crea...|<p>This is someth...|  1217637739|      null|
|260|         91|2008-08-01 23:22:08|                 NA|   49|Adding scripting ...|<p>I h

This approach looks much better. Let's start exploring the data. Some potentially interesting questions:
- What are the most common tags?
- Can we predict if a question contains a top tag from the body text?
- Can we predict a question's score from its text?
- Can we predict how long a question will take to answer?
- Is there a relationship between score and how long a question takes to answer?
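Before computing any time differences, note that the epoch values in the Spark output above do not match UTC. Stripping the trailing `Z` makes `unix_timestamp` interpret each string in the Spark session's local time zone. The plain-Python check below recomputes the UTC epochs for question 90 and compares them with the `CreationTime` and `ClosedTime` that Spark reported: the offset is 6 hours in August and 7 hours in December. That pattern is consistent with (we are inferring this) a machine on US Mountain time observing daylight saving time, so Spark's elapsed time for this question comes out one hour too long. One option we have not tested here is calling `spark.conf.set("spark.sql.session.timeZone", "UTC")` before parsing.

```python
from datetime import datetime, timezone

def iso_to_utc_epoch(ts):
    """Seconds since 1970-01-01 UTC for a timestamp like '2008-08-01T14:41:24Z'."""
    dt = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
    return int(dt.timestamp())

# Question 90: raw CSV values, and the epochs Spark printed for CreationTime/ClosedTime
created_utc = iso_to_utc_epoch("2008-08-01T14:41:24Z")
closed_utc = iso_to_utc_epoch("2012-12-26T03:45:49Z")
spark_created, spark_closed = 1217623284, 1356518749

print(created_utc, closed_utc)               # 1217601684 1356493549
print((spark_created - created_utc) / 3600)  # 6.0 (hours, August)
print((spark_closed - closed_utc) / 3600)    # 7.0 (hours, December)

# Elapsed time: Spark's value minus the true UTC difference
print((spark_closed - spark_created) - (closed_utc - created_utc))  # 3600
```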

### 1. Most Common Tags
What are the most common tags?

In [18]:
tags_count = (df_tags_filt
            .groupBy('Tag')
            .count()
            .sort('count', ascending=False)
            .limit(10)
            )

In [19]:
tags_count.show()

+----------+-----+
|       Tag|count|
+----------+-----+
|        c#|  399|
|      .net|  362|
|      java|  254|
|   asp.net|  225|
|       c++|  178|
|javascript|  158|
|sql-server|  141|
|       sql|  130|
|    python|  127|
|       php|  124|
+----------+-----+



Let's save these top 10 as a list.

In [20]:
df = tags_count.select('Tag').toPandas()
most_common_tags = df['Tag'].to_list()
print(most_common_tags)

['c#', '.net', 'java', 'asp.net', 'c++', 'javascript', 'sql-server', 'sql', 'python', 'php']
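As a sanity check in the spirit of the Pandas comparison at the start of the notebook, the `groupBy('Tag').count().sort(...).limit(10)` chain amounts to counting tag occurrences and keeping the N largest. Below is a plain-Python sketch with `collections.Counter`, run on a few toy rows rather than the real data. One design choice worth copying back into Spark: when several tags tie at the cut-off count, which ones `limit(10)` keeps is not guaranteed, so the sketch breaks ties alphabetically (in Spark, a secondary sort key such as `'Tag'` would do the same).

```python
from collections import Counter

def top_tags(tag_rows, n=10):
    """Return the n most frequent tags, breaking ties alphabetically.

    tag_rows: iterable of (question_id, tag) pairs, like the rows of Tags.csv.
    """
    counts = Counter(tag for _, tag in tag_rows)
    ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return [tag for tag, _ in ranked[:n]]

# Toy rows for illustration only
rows = [(1, "python"), (1, "pandas"), (2, "python"), (3, "c#"),
        (3, ".net"), (4, "c#"), (5, "java")]
print(top_tags(rows, n=3))  # ['c#', 'python', '.net']
```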


Now let's build a model to predict whether or not a question contains a top 10 tag. We can accomplish this by joining the questions and tags dataframes and adding a yes/no label for whether a question contains such a tag.
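The yes/no label boils down to a set intersection: a question is positive if any of its tags is in `most_common_tags`. Here is a plain-Python sketch of that rule, applied only to the tags visible in the outputs above (the Spark tag lists are truncated, so these are illustrative inputs, not full tag lists). In Spark, `F.arrays_overlap` on the collected `Tags` array is one way to express the same test; we have not run that version here.

```python
most_common_tags = ['c#', '.net', 'java', 'asp.net', 'c++', 'javascript',
                    'sql-server', 'sql', 'python', 'php']

def has_top_tag(tags, top_tags):
    """Return 1 if any of the question's tags is among the top tags, else 0."""
    return int(not set(tags).isdisjoint(top_tags))

# Tags as far as they are visible in the outputs above (full lists may be longer)
id_tags = {
    80: ["flex", "actionscript-3", "air"],
    120: ["sql", "asp.net"],
    260: ["c#", ".net"],
}
labels = {qid: has_top_tag(tags, most_common_tags) for qid, tags in id_tags.items()}
print(labels)  # {80: 0, 120: 1, 260: 1}
```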

In [21]:
df_quest_filt.show(3)



+---+-----------+-------------------+-------------------+-----+--------------------+--------------------+------------+----------+
| Id|OwnerUserId|       CreationDate|         ClosedDate|Score|               Title|                Body|CreationTime|ClosedTime|
+---+-----------+-------------------+-------------------+-----+--------------------+--------------------+------------+----------+
| 80|         26|2008-08-01 13:57:07|                 NA|   26|SQLStatement.exec...|"<p>I've written ...|  1217620627|      null|
| 90|         58|2008-08-01 14:41:24|2012-12-26 03:45:49|  144|Good branching an...|"<p>Are there any...|  1217623284|1356518749|
|120|         83|2008-08-01 15:50:08|                 NA|   21|   ASP.NET Site Maps|<p>Has anyone got...|  1217627408|      null|
+---+-----------+-------------------+-------------------+-----+--------------------+--------------------+------------+----------+
only showing top 3 rows

In [22]:
# Groupby id and collect all tags into list
df_id_tags = (df_tags_filt
                .groupBy('Id')
                .agg(F.collect_list('Tag')
                .alias('Tags'))
                .sort('Id', ascending=True)
)

df_id_tags.show(5)

+---+--------------------+
| Id|                Tags|
+---+--------------------+
| 80|[flex, actionscri...|
| 90|[svn, tortoisesvn...|
|120|[sql, asp.net, si...|
|180|[algorithm, langu...|
|260|[c#, .net, script...|
+---+--------------------+
only showing top 5 rows



In [23]:
df_quest_filt.show(4)



+---+-----------+-------------------+-------------------+-----+--------------------+--------------------+------------+----------+
| Id|OwnerUserId|       CreationDate|         ClosedDate|Score|               Title|                Body|CreationTime|ClosedTime|
+---+-----------+-------------------+-------------------+-----+--------------------+--------------------+------------+----------+
| 80|         26|2008-08-01 13:57:07|                 NA|   26|SQLStatement.exec...|"<p>I've written ...|  1217620627|      null|
| 90|         58|2008-08-01 14:41:24|2012-12-26 03:45:49|  144|Good branching an...|"<p>Are there any...|  1217623284|1356518749|
|120|         83|2008-08-01 15:50:08|                 NA|   21|   ASP.NET Site Maps|<p>Has anyone got...|  1217627408|      null|
|180|    2089740|2008-08-01 18:42:19|                 NA|   53|Function for crea...|<p>This is someth...|  1217637739|      null|
+---+-----------+-------------------+-------------------+-----+--------------------+------

In [24]:
# Join dataframes
df_body_tags = (df_quest_filt
                  .join(df_id_tags,
                        df_quest_filt['Id'] == df_id_tags['Id'],
                        'inner')
                  .select(df_quest_filt['Id'], 'Tags', 'Score', 'Body')
                  )

df_body_tags.show(5)

+---+--------------------+-----+--------------------+
| Id|                Tags|Score|                Body|
+---+--------------------+-----+--------------------+
| 80|[flex, actionscri...|   26|"<p>I've written ...|
| 90|[svn, tortoisesvn...|  144|"<p>Are there any...|
|120|[sql, asp.net, si...|   21|<p>Has anyone got...|
|180|[algorithm, langu...|   53|<p>This is someth...|
|260|[c#, .net, script...|   49|<p>I have a littl...|
+---+--------------------+-----+--------------------+
only showing top 5 rows



In [25]:
# Test writing the results to csv
(df_body_tags
    .withColumn('Tags', F.col('Tags').cast('string'))
    .write.option("header", "true")
    .mode("overwrite")
    .csv("output")
)

With the proper dataframes joined, the next step is to parse the body text. We'll use the `udf` function to accomplish this. However, the following steps initially failed with obscure errors associated with `udf`, including a failure to "open socket to Python daemon". What's more confusing is that I've previously used the same `udf` workflow successfully in a different project on a different machine.

In [26]:
from pyspark.sql.functions import udf

# barebones udf function for debugging
def parse_body(body):
    return body

parse = udf(parse_body, StringType())

df_test = (df_quest_filt
           .withColumn('body_parsed', parse('Body'))
        )

# Alternatives also attempted while debugging
# parse = udf(lambda x: parse_body(x), StringType())
# 
# @udf(returnType=StringType())
# def parse_body(body):
#     # html = BeautifulSoup(body)
#     return body
# 
# df_test = (df_body_tags
#                   .withColumn('ParsedBody', parse_body(F.col('Body')))
#                   )

In [27]:
df_test.show(5)



+---+-----------+-------------------+-------------------+-----+--------------------+--------------------+------------+----------+--------------------+
| Id|OwnerUserId|       CreationDate|         ClosedDate|Score|               Title|                Body|CreationTime|ClosedTime|         body_parsed|
+---+-----------+-------------------+-------------------+-----+--------------------+--------------------+------------+----------+--------------------+
| 80|         26|2008-08-01 13:57:07|                 NA|   26|SQLStatement.exec...|"<p>I've written ...|  1217620627|      null|"<p>I've written ...|
| 90|         58|2008-08-01 14:41:24|2012-12-26 03:45:49|  144|Good branching an...|"<p>Are there any...|  1217623284|1356518749|"<p>Are there any...|
|120|         83|2008-08-01 15:50:08|                 NA|   21|   ASP.NET Site Maps|<p>Has anyone got...|  1217627408|      null|<p>Has anyone got...|
|180|    2089740|2008-08-01 18:42:19|                 NA|   53|Function for crea...|<p>This is

The `udf` function is now working! The root cause of the previous error, "Py4JNetworkError: An error occurred while trying to connect to the Java server", was being connected to a VPN. Even though Spark runs locally, the VPN appears to affect the hostname or IP address that the listening sockets bind to.

I also experimented with setting the `JAVA_HOME` environment variable, since the PySpark documentation says it should be set properly. However, the code appears to run without error even without explicitly setting this variable.

For more info:
- https://towardsdatascience.com/spark-fix-cant-assign-driver-32406580375
- https://mkyong.com/java/how-to-set-java_home-environment-variable-on-mac-os-x/
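With the `udf` working, the barebones `parse_body` can do real work. The commented-out code hints at BeautifulSoup; the sketch below instead uses the standard library's `html.parser` (a swap on our part, chosen so it needs no extra dependency) to strip tags from a `Body` string and return plain text. It returns `None` for null bodies, since Spark passes nulls to a Python `udf` as `None`. It could be wrapped the same way as in the cell above, with `udf(parse_body, StringType())`.

```python
from html.parser import HTMLParser

class _TextExtractor(HTMLParser):
    """Collects the text content of an HTML fragment, ignoring the tags."""
    def __init__(self):
        super().__init__()
        self.parts = []

    def handle_data(self, data):
        self.parts.append(data)

def parse_body(body):
    """Strip HTML tags from a question/answer Body and collapse whitespace."""
    if body is None:  # Spark passes SQL nulls to Python UDFs as None
        return None
    parser = _TextExtractor()
    parser.feed(body)
    parser.close()  # flush any buffered text
    return " ".join("".join(parser.parts).split())

# Toy input in the style of the Body column (not taken from the dataset)
print(parse_body("<p>Has anyone used <strong>site maps</strong> in\nASP.NET?</p>"))
# Has anyone used site maps in ASP.NET?
```

Entities such as `&amp;` are decoded along the way. One limitation: adjacent block elements such as consecutive `<p>` tags are joined without a space, so a fuller version might insert whitespace at tag boundaries.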


**Aside:** Syntax for submitting a Spark job locally is:
```bash
spark-submit main.py --questions=./assets/Questions.csv --answers=./assets/Answers.csv --tags=./assets/Tags.csv
```
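The `main.py` script itself is not shown in this notebook. Below is a minimal sketch of how it might read those flags with `argparse`. The three path arguments match the command above; the `--limit` option, which would keep the notebook's row cap available but off by default, is a hypothetical addition.

```python
import argparse

def parse_args(argv=None):
    """Parse the arguments passed after main.py on the spark-submit command line."""
    parser = argparse.ArgumentParser(description="StackOverflow analysis job")
    parser.add_argument("--questions", required=True, help="path to Questions.csv")
    parser.add_argument("--answers", required=True, help="path to Answers.csv")
    parser.add_argument("--tags", required=True, help="path to Tags.csv")
    # Hypothetical: optional row cap mirroring the notebook's .limit(10_000); None = no limit
    parser.add_argument("--limit", type=int, default=None, help="optional row limit")
    return parser.parse_args(argv)

args = parse_args(["--questions=./assets/Questions.csv",
                   "--answers=./assets/Answers.csv",
                   "--tags=./assets/Tags.csv"])
print(args.questions, args.limit)  # ./assets/Questions.csv None
```

Inside the job, each reader could then apply `.limit(args.limit)` only when `args.limit is not None`, so the same script serves both quick local tests and full cluster runs.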

In [28]:
spark.stop()