# Analyzing the behavior and habits of Stack Overflow users

---

# Installing Spark on Google Colab

To use Spark in the Google Colab environment, we need to install the following components:
- Java 8
- Spark binary distribution
- findspark

In [None]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
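# Old releases are removed from downloads.apache.org, so fetch Spark 3.3.1 from the Apache archive
!wget -q https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz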
!tar xf spark-3.3.1-bin-hadoop3.tgz
!pip install -q findspark

In [None]:
HADOOP_HOME = ""
JAVA_HOME = "/usr/lib/jvm/java-8-openjdk-amd64"
SPARK_HOME = "/content/spark-3.3.1-bin-hadoop3"

# URI to connect to the local MongoDB (Docker cluster); the user is authenticated against the admin database
MONGODB_CLUSTER_URI = "mongodb://<username>:<password>@mongo:27017/stackoverflow?authSource=admin"

# URI to connect to MongoDB Cloud
MONGODB_CLOUD_URI = "mongodb+srv://<username>:<password>@cluster.mongodb.net/stackoverflow"
MONGODB_DATABASE = "stackoverflow"
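The `<username>` and `<password>` placeholders above must be percent-encoded if they contain characters such as `@`, `:` or `/`, otherwise the connection string is parsed incorrectly. A minimal sketch of building such a URI with the standard library; `build_mongodb_uri` and the sample credentials are hypothetical, and the `authSource=admin` option mirrors the Docker setup described in the HOW TO section.

```python
from urllib.parse import quote_plus, urlencode


def build_mongodb_uri(username, password, host, database, srv=False, **options):
    """Build a MongoDB connection string with percent-encoded credentials."""
    scheme = "mongodb+srv" if srv else "mongodb"
    # quote_plus escapes reserved characters such as '@', ':' and '/'
    uri = f"{scheme}://{quote_plus(username)}:{quote_plus(password)}@{host}/{database}"
    if options:
        # Extra connection options (e.g. authSource="admin") go into the query string
        uri += "?" + urlencode(options)
    return uri


# Hypothetical credentials, for illustration only
print(build_mongodb_uri("spark", "p@ss:word", "mongo:27017", "stackoverflow", authSource="admin"))
# mongodb://spark:p%40ss%3Aword@mongo:27017/stackoverflow?authSource=admin
```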

In [None]:
import os
import findspark

os.environ["JAVA_HOME"] = JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME

findspark.init()
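`findspark.init()` is what makes `import pyspark` work against the downloaded Spark distribution. Its core step is roughly the following: prepend `$SPARK_HOME/python` and the bundled Py4J zip to `sys.path`. This is a simplified sketch, not findspark's actual code; `add_spark_to_path` is a hypothetical name.

```python
import glob
import os
import sys


def add_spark_to_path(spark_home):
    """Sketch of the core of findspark.init(): make the PySpark and Py4J
    sources bundled with a Spark distribution importable."""
    spark_python = os.path.join(spark_home, "python")
    # Spark ships Py4J as a zip: $SPARK_HOME/python/lib/py4j-<version>-src.zip
    py4j_zips = glob.glob(os.path.join(spark_python, "lib", "py4j-*.zip"))
    if not py4j_zips:
        raise FileNotFoundError(f"no py4j zip found in {os.path.join(spark_python, 'lib')}")
    new_paths = [spark_python, py4j_zips[0]]
    # Prepend so these copies take precedence over any other installed pyspark
    sys.path[:0] = [p for p in new_paths if p not in sys.path]
    os.environ["SPARK_HOME"] = spark_home
    return new_paths


# Usage in this notebook (instead of findspark.init()):
# add_spark_to_path(SPARK_HOME)
# from pyspark.sql import SparkSession
```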

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

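# Connect Spark and MongoDB

Create a new Spark session that connects to MongoDB through the MongoDB Spark Connector (`mongo-spark-connector_2.12:10.1.1`, downloaded via `spark.jars.packages`).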

Ref: https://www.mongodb.com/docs/spark-connector/current/getting-started/

In [None]:
spark = SparkSession.builder \
                .master("local") \
                .appName("stackoverflow") \
                .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.1.1") \
                .config("spark.mongodb.read.connection.uri", MONGODB_CLOUD_URI) \
                .config("spark.mongodb.write.connection.uri", MONGODB_CLOUD_URI) \
                .getOrCreate()
                
sc = spark.sparkContext

Read the data from MongoDB into DataFrames, following the instructions at: https://www.mongodb.com/docs/spark-connector/current/read-from-mongodb/

In [None]:
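# Load data from the questions collection
# (the 10.x connector reads "connection.uri", "database" and "collection" options)
questions_df = spark.read \
                .format("mongodb") \
                .option("connection.uri", MONGODB_CLOUD_URI) \
                .option("database", MONGODB_DATABASE) \
                .option("collection", "questions") \
                .load()

# Load data from the answers collection
answers_df = spark.read \
                .format("mongodb") \
                .option("connection.uri", MONGODB_CLOUD_URI) \
                .option("database", MONGODB_DATABASE) \
                .option("collection", "answers") \
                .load()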

# Check data
questions_df.printSchema()
answers_df.printSchema()

root
 |-- Body: string (nullable = true)
 |-- ClosedDate: string (nullable = true)
 |-- CreationDate: string (nullable = true)
 |-- Id: integer (nullable = true)
 |-- OwnerUserId: integer (nullable = true)
 |-- Score: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- _id: string (nullable = true)

root
 |-- Body: string (nullable = true)
 |-- CreationDate: string (nullable = true)
 |-- Id: integer (nullable = true)
 |-- OwnerUserId: string (nullable = true)
 |-- ParentId: integer (nullable = true)
 |-- Score: integer (nullable = true)
 |-- _id: string (nullable = true)



# Install ngrok to access the Spark UI running on Colab

In [None]:
! pip install pyngrok

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyngrok
  Downloading pyngrok-5.2.1.tar.gz (761 kB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyngrok
  Building wheel for pyngrok (setup.py) ... done
  Created wheel for pyngrok: filename=pyngrok-5.2.1-py3-none-any.whl size=19792 sha256=bb70b98b829cf811b7713db1f76f9b68319df26921724ebb9d208079178a4086
  Stored in directory: /root/.cache/pip/wheels/5d/f2/70/526da675d32f17577ec47ac4c663084efe39d47c826b6c3bb1
Successfully built pyngrok
Installing collected packages: pyngrok
Successfully installed pyngrok-5.2.1


In [None]:
import getpass
from pyngrok import ngrok, conf

print("Enter your authtoken, which can be copied from https://dashboard.ngrok.com/auth")
conf.get_default().auth_token = getpass.getpass()

# Open an HTTP ngrok tunnel to the Spark UI (port 4040)
connection_string = ngrok.connect(4040, "http")

print(f" * ngrok tunnel available, access with {connection_string}")

Enter your authtoken, which can be copied from https://dashboard.ngrok.com/auth
··········

 * ngrok tunnel available, access with NgrokTunnel: "http://1108-34-82-189-194.ngrok.io" -> "http://localhost:4040"


# Pre-processing Data

In [None]:
questions_df = questions_df.withColumn("OwnerUserId", F.col("OwnerUserId").cast("integer"))

questions_df = questions_df.withColumn("CreationDate", F.to_date("CreationDate"))
questions_df = questions_df.withColumn("ClosedDate", F.to_date("ClosedDate"))

questions_df.printSchema()

root
 |-- Body: string (nullable = true)
 |-- ClosedDate: date (nullable = true)
 |-- CreationDate: date (nullable = true)
 |-- Id: integer (nullable = true)
 |-- OwnerUserId: integer (nullable = true)
 |-- Score: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- _id: string (nullable = true)



In [None]:
answers_df = answers_df.withColumn("OwnerUserId", F.col("OwnerUserId").cast("integer"))
answers_df = answers_df.withColumn("ParentId", F.col("ParentId").cast("integer"))

answers_df = answers_df.withColumn("CreationDate", F.to_date("CreationDate"))

answers_df.printSchema()

root
 |-- Body: string (nullable = true)
 |-- CreationDate: date (nullable = true)
 |-- Id: integer (nullable = true)
 |-- OwnerUserId: integer (nullable = true)
 |-- ParentId: integer (nullable = true)
 |-- Score: integer (nullable = true)
 |-- _id: string (nullable = true)
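`F.to_date` keeps only the calendar date of each timestamp string, and with Spark's default (non-ANSI) settings values it cannot parse become `null`. A rough pure-Python analogue, assuming the raw values look like `2008-08-01T13:57:07Z` and that a missing `ClosedDate` is stored as the string `NA` (as in the StackSample CSV files); `to_date` here is a hypothetical helper, not Spark's implementation.

```python
from datetime import datetime


def to_date(value):
    """Rough analogue of F.to_date on the raw date strings: keep the calendar
    date and return None for values that do not start with yyyy-MM-dd."""
    if value is None:
        return None
    try:
        return datetime.strptime(value[:10], "%Y-%m-%d").date()
    except ValueError:
        return None


print(to_date("2008-08-01T13:57:07Z"))  # 2008-08-01
print(to_date("NA"))                    # None
```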



# Data analysis

## Task 1: Count the occurrences of programming languages

In [None]:
# "javascript" must come before "java" in the alternation, otherwise the regex stops at "java"
lang_pattern = r"(javascript|java|python|c\+\+|c\#|(?<![\w.])go(?![.\w])|ruby|php|(?<![\w.])html(?![.\w])|css|sql)"

# regexp_extract(..., 0) returns the first language mentioned in each question body ("" if none)
prog_lang_freq = questions_df.withColumn("Programming Language",
                                         F.regexp_extract(F.lower(F.col("Body")), lang_pattern, 0)) \
                             .filter(F.col("Programming Language") != "") \
                             .groupBy("Programming Language") \
                             .count()

prog_lang_freq.show()

+--------------------+-----+
|Programming Language|count|
+--------------------+-----+
|                 sql|    7|
|                  c#|    3|
|                 c++|    1|
|                ruby|    1|
+--------------------+-----+
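Because `regexp_extract` with group index `0` returns only the first match, the table above counts questions by the first language each body mentions, not every occurrence. The pattern can be explored with Python's `re` module, assumed here to behave like Spark's Java regex engine for this pattern. The hypothetical helpers below show the first-match behavior, how it differs from collecting every match, and why `javascript` has to precede `java`. Note also that `go` directly before a period (`go.`) is skipped by the `(?![.\w])` lookahead. In Spark 3.1 and later, the SQL function `regexp_extract_all` combined with `explode` is one way to count every occurrence.

```python
import re

# lang_pattern from the cell above ("javascript" listed before "java")
LANG_PATTERN = r"(javascript|java|python|c\+\+|c\#|(?<![\w.])go(?![.\w])|ruby|php|(?<![\w.])html(?![.\w])|css|sql)"


def first_language(body):
    """Like regexp_extract(lower(Body), lang_pattern, 0): the first match, or ''."""
    match = re.search(LANG_PATTERN, body.lower())
    return match.group(0) if match else ""


def all_languages(body):
    """Every match in the body, which is what a per-occurrence count would need."""
    return [m.group(0) for m in re.finditer(LANG_PATTERN, body.lower())]


body = "I moved from Java to JavaScript, and use SQL from Go"
print(first_language(body))  # java
print(all_languages(body))   # ['java', 'javascript', 'sql', 'go']
```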



## Task 2: Find the domains used most often in questions

In [None]:
# Only "http://" links are matched; regexp_extract returns the first link in each body
domain_pattern = r"(?<=http:\/\/)(.*?\.(edu|com|net|org|info|coop|int|co\.uk|co\.us|co\.ca|org\.uk|ac\.uk|uk|us))"

domain_freq = questions_df.withColumn("Domain", F.regexp_extract(F.col("Body"), domain_pattern, 0)) \
                          .where(F.trim(F.col("Domain")) != "") \
                          .groupBy("Domain") \
                          .count() \
                          .sort(F.desc("count"))

domain_freq.show()

+--------------------+-----+
|              Domain|count|
+--------------------+-----+
|    en.wikipedia.org|    2|
|svnbook.red-bean.com|    1|
|    www.red-gate.com|    1|
|   www.microsoft.com|    1|
|   stackoverflow.com|    1|
+--------------------+-----+
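The domain regex can also be tried outside Spark with Python's `re` module; for this pattern Python and Spark's Java regex engine are assumed to behave the same. The hypothetical bodies below show four properties of the pattern: only the first link in a body is taken, `https://` links are skipped because the lookbehind requires `http://`, multi-part suffixes such as `co.uk` are recognized, and the TLD alternation has no word boundary, so `example.community.org` is reported as `example.com`. `first_domain` is a hypothetical helper.

```python
import re

# domain_pattern from the cell above
DOMAIN_PATTERN = r"(?<=http:\/\/)(.*?\.(edu|com|net|org|info|coop|int|co\.uk|co\.us|co\.ca|org\.uk|ac\.uk|uk|us))"


def first_domain(body):
    """Like regexp_extract(Body, domain_pattern, 0): the first http:// domain, or ''."""
    match = re.search(DOMAIN_PATTERN, body)
    return match.group(0) if match else ""


print(first_domain('<a href="http://en.wikipedia.org/wiki/Git">wiki</a>'))  # en.wikipedia.org
print(first_domain("See http://a.com and http://b.org"))                    # a.com
print(first_domain("See https://stackoverflow.com"))                        # prints an empty line
print(first_domain("http://example.community.org"))                         # example.com
```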



## Task 3: Compute each user's total score per day

In [None]:
# One row per (user, day), with the sum of the scores of that user's questions on that day
user_score_by_day = questions_df.select("OwnerUserId", "CreationDate", "Score") \
                                .groupBy("OwnerUserId", "CreationDate") \
                                .agg(F.sum("Score").alias("Total Score")) \
                                .sort(F.asc("OwnerUserId"), F.asc("CreationDate"))

user_score_by_day.show()

+-----------+------------+-----------+
|OwnerUserId|CreationDate|Total Score|
+-----------+------------+-----------+
|         26|  2008-08-01|         26|
|         58|  2008-08-01|        144|
|         60|  2008-08-04|         18|
|         63|  2008-08-02|         29|
|         67|  2008-08-04|         14|
|         71|  2008-08-02|         13|
|         83|  2008-08-01|         21|
|         91|  2008-08-01|         49|
|         91|  2008-08-02|         21|
|         91|  2008-08-04|         23|
|        116|  2008-08-05|         10|
|        120|  2008-08-04|         36|
|        143|  2008-08-03|         79|
|        194|  2008-08-05|         13|
|        230|  2008-08-04|         18|
|        233|  2008-08-03|          9|
|        234|  2008-08-05|         51|
|        236|  2008-08-04|         17|
|        245|  2008-08-04|         28|
|        254|  2008-08-04|         42|
+-----------+------------+-----------+
only showing top 20 rows
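The grouping above turns every (user, day) pair into one row whose scores are summed. A plain-Python sketch of the same aggregation over hypothetical rows; the function name and data are illustrative, not part of the notebook.

```python
from collections import defaultdict


def total_score_by_user_day(rows):
    """rows: (owner_user_id, creation_date, score) tuples.
    Same result as groupBy("OwnerUserId", "CreationDate").agg(F.sum("Score"))."""
    totals = defaultdict(int)
    for user_id, day, score in rows:
        totals[(user_id, day)] += score
    # Sort by user, then date, like .sort(F.asc("OwnerUserId"), F.asc("CreationDate"))
    return sorted(totals.items())


rows = [(2, "2008-08-02", 21), (2, "2008-08-01", 30), (1, "2008-08-01", 26), (2, "2008-08-01", 19)]
for (user_id, day), total in total_score_by_user_day(rows):
    print(user_id, day, total)
```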



## Task 4: Compute the total score each user earned within a time period

In [None]:
START = '2008-01-01'
END = '2009-01-01'

# Keep questions created strictly between START and END, then sum the scores per user and day
user_score_by_period = questions_df.select("OwnerUserId", "CreationDate", "Score") \
                                .filter(F.col("CreationDate") > START) \
                                .filter(F.col("CreationDate") < END) \
                                .groupBy("OwnerUserId", "CreationDate") \
                                .agg(F.sum("Score").alias("Total Score")) \
                                .sort(F.desc("Total Score"))

user_score_by_period.show()

+-----------+------------+-----------+
|OwnerUserId|CreationDate|Total Score|
+-----------+------------+-----------+
|         58|  2008-08-01|        144|
|        143|  2008-08-03|         79|
|        383|  2008-08-05|         77|
|        328|  2008-08-04|         63|
|    2089740|  2008-08-01|         53|
|        234|  2008-08-05|         51|
|         91|  2008-08-01|         49|
|        254|  2008-08-04|         42|
|        120|  2008-08-04|         36|
|         63|  2008-08-02|         29|
|        245|  2008-08-04|         28|
|         26|  2008-08-01|         26|
|         91|  2008-08-04|         23|
|         91|  2008-08-02|         21|
|         83|  2008-08-01|         21|
|         60|  2008-08-04|         18|
|        230|  2008-08-04|         18|
|        236|  2008-08-04|         17|
|        281|  2008-08-04|         17|
|         67|  2008-08-04|         14|
+-----------+------------+-----------+
only showing top 20 rows
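This cell groups by both `OwnerUserId` and `CreationDate`, so it still returns one row per user per day inside the period (user 91 appears three times above). A single total per user over the whole period needs grouping by `OwnerUserId` alone, i.e. `.groupBy("OwnerUserId")` in PySpark. The plain-Python sketch below shows that aggregation on hypothetical rows; unlike the cell, it treats the period as half-open, `START <= day < END`. `total_score_in_period` is a hypothetical name.

```python
from collections import defaultdict
from datetime import date


def total_score_in_period(rows, start, end):
    """rows: (owner_user_id, creation_date, score). One total per user for start <= day < end."""
    totals = defaultdict(int)
    for user_id, day, score in rows:
        if start <= day < end:
            totals[user_id] += score
    # Highest total first, like .sort(F.desc("Total Score"))
    return sorted(totals.items(), key=lambda kv: kv[1], reverse=True)


rows = [
    (1, date(2008, 8, 1), 49), (1, date(2008, 8, 2), 21), (1, date(2008, 8, 4), 23),
    (2, date(2008, 8, 1), 144),
    (3, date(2009, 1, 1), 100),   # on END, so excluded
    (4, date(2007, 12, 31), 5),   # before START, so excluded
]
print(total_score_in_period(rows, date(2008, 1, 1), date(2009, 1, 1)))  # [(2, 144), (1, 93)]
```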



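## Task 5: Find questions with many answers

To answer this, we first need to join the `questions` and `answers` tables. To save time, we can use a bucket join: both tables are written out bucketed on their join key (`Id` for questions, `ParentId` for answers) into the same number of buckets, so rows with the same key land in the same bucket on both sides and Spark can join the tables bucket by bucket without shuffling them at query time.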
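A plain-Python sketch of the idea behind a bucket join, under simplifying assumptions: Spark hashes bucket columns with Murmur3 and runs a sort-merge join within each pair of buckets, whereas here Python's built-in `hash()` and a dictionary lookup stand in for both. It illustrates the partitioning idea only; `bucketize`, `bucket_join` and the sample rows are hypothetical.

```python
from collections import defaultdict

NUM_BUCKETS = 10  # same bucket count as bucketBy(10, ...) in the cell below


def bucketize(rows, key, num_buckets=NUM_BUCKETS):
    """Split rows (dicts) into buckets by hashing the join key."""
    buckets = defaultdict(list)
    for row in rows:
        # hash() is consistent within one run, so equal keys share a bucket id
        buckets[hash(row[key]) % num_buckets].append(row)
    return buckets


def bucket_join(left, right, left_key, right_key, num_buckets=NUM_BUCKETS):
    """Inner join that only compares rows sharing a bucket id."""
    left_buckets = bucketize(left, left_key, num_buckets)
    right_buckets = bucketize(right, right_key, num_buckets)
    joined = []
    for bucket_id, left_rows in left_buckets.items():
        # Matching keys always hash to the same bucket, so no other bucket is read
        index = defaultdict(list)
        for r in right_buckets.get(bucket_id, []):
            index[r[right_key]].append(r)
        for q in left_rows:
            for a in index[q[left_key]]:
                joined.append({**q, **a})
    return joined


questions = [{"QuestionID": 90}, {"QuestionID": 330}, {"QuestionID": 470}]
answers = [{"Id": 92, "ParentId": 90}, {"Id": 124, "ParentId": 90},
           {"Id": 500, "ParentId": 330}, {"Id": 999, "ParentId": 12345}]
print(bucket_join(questions, answers, "QuestionID", "ParentId"))
```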

In [None]:
spark.sql("CREATE DATABASE IF NOT EXISTS MY_DB")
spark.sql("USE MY_DB")

# Bucket both tables on their join key into the same number of buckets
questions_df.write \
            .bucketBy(10, "Id") \
            .format("parquet") \
            .mode("overwrite") \
            .option("path", "/content/tmp_questions") \
            .saveAsTable("MY_DB.questions")

answers_df.write \
        .bucketBy(10, "ParentId") \
        .format("parquet") \
        .mode("overwrite") \
        .option("path", "/content/tmp_answers") \
        .saveAsTable("MY_DB.answers")

tmp_df_1 = spark.read.table("MY_DB.questions")
tmp_df_2 = spark.read.table("MY_DB.answers")

# Disable broadcast joins so Spark performs a sort-merge join that can use the buckets
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# Rename the question columns so they stay distinguishable from the answer columns after the join
df_renamed_3 = tmp_df_1.withColumnRenamed("Id", "QuestionID") \
                       .withColumnRenamed("OwnerUserId", "QuestionerID") \
                       .withColumnRenamed("Score", "QuestionScore") \
                       .withColumnRenamed("CreationDate", "QuestionCreationDate")

join_expr = df_renamed_3.QuestionID == tmp_df_2.ParentId
df_renamed_3.join(tmp_df_2, join_expr, "inner").show(10)

+--------------------+----------+--------------------+----------+------------+-------------+--------------------+--------------------+--------------------+------------+---+-----------+--------+-----+--------------------+
|                Body|ClosedDate|QuestionCreationDate|QuestionID|QuestionerID|QuestionScore|               Title|                 _id|                Body|CreationDate| Id|OwnerUserId|ParentId|Score|                 _id|
+--------------------+----------+--------------------+----------+------------+-------------+--------------------+--------------------+--------------------+------------+---+-----------+--------+-----+--------------------+
|<p>Are there any ...|2012-12-26|          2008-08-01|        90|          58|          144|Good branching an...|63e5e2280ccaec448...|<p><a href="http:...|  2008-08-01| 92|         61|      90|   13|63e5e24a0ccaec448...|
|<p>I am working o...|      null|          2008-08-02|       330|          63|           29|Should I use nest...|63e

Now query the questions that have more than 5 answers, using `filter()`.

In [None]:
result = df_renamed_3.join(tmp_df_2, join_expr, "inner") \
                    .select("QuestionID", "Id") \
                    .groupBy("QuestionID") \
                    .agg(F.count("Id").alias("Total Answers")) \
                    .filter(F.col("Total Answers") > 5) \
                    .sort(F.asc("QuestionID"))

result.show()

+----------+-------------+
|QuestionID|Total Answers|
+----------+-------------+
+----------+-------------+
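The filter keeps only groups whose count is strictly greater than 5, so the empty table means that, in the data loaded here, no question has more than five matching answers. The same counting logic in plain Python, on hypothetical parent ids; `questions_with_many_answers` is a hypothetical helper.

```python
from collections import Counter


def questions_with_many_answers(parent_ids, min_answers=5):
    """parent_ids: the ParentId of every joined answer.
    Returns (question_id, answer_count) pairs with more than `min_answers` answers,
    sorted by question id, like groupBy + count + filter(> 5) + sort."""
    counts = Counter(parent_ids)
    return sorted((qid, n) for qid, n in counts.items() if n > min_answers)


print(questions_with_many_answers([90] * 6 + [330] * 5 + [470]))  # [(90, 6)]
```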



## (Advanced) Task 6: Find active users

In [None]:
# Create a temp view on the joined DataFrame (only the needed columns) so it can be queried with Spark SQL
df_renamed_3.join(tmp_df_2, join_expr, "inner") \
            .select("QuestionID", "Id", "OwnerUserId", "Score", "QuestionCreationDate", "CreationDate") \
            .createOrReplaceTempView("needed")

# Active users: more than 50 answers, a total score above 500,
# or more than 5 answers to a single question on the day it was created
active_users = spark.sql("""
    select OwnerUserId,
           count(Id) as TotalAnswers,
           sum(Score) as TotalScore
    from needed
    group by OwnerUserId
    having count(Id) > 50 or sum(Score) > 500
        or OwnerUserId in (select OwnerUserId
                           from needed
                           group by QuestionID, QuestionCreationDate, OwnerUserId, CreationDate
                           having count(Id) > 5 and QuestionCreationDate = CreationDate)
""")

active_users.show()

+-----------+------------+----------+
|OwnerUserId|TotalAnswers|TotalScore|
+-----------+------------+----------+
+-----------+------------+----------+
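The three criteria in the SQL query can be spelled out in plain Python. Since a question has exactly one creation date, grouping the subquery by `(QuestionID, QuestionCreationDate, OwnerUserId, CreationDate)` and keeping same-day rows is equivalent to counting same-day answers per `(QuestionID, OwnerUserId)`. A sketch on hypothetical rows; `find_active_users` and its thresholds-as-parameters are illustrative, not part of the notebook.

```python
from collections import defaultdict


def find_active_users(rows, min_answers=50, min_score=500, min_same_day=5):
    """rows: (question_id, answer_id, owner_user_id, score, question_date, answer_date).

    A user is active if they have more than `min_answers` answers, a total score
    above `min_score`, or more than `min_same_day` answers to a single question
    on the day that question was created.
    """
    answer_count = defaultdict(int)
    total_score = defaultdict(int)
    same_day = defaultdict(int)
    for question_id, _answer_id, user, score, question_date, answer_date in rows:
        answer_count[user] += 1
        total_score[user] += score
        if question_date == answer_date:
            same_day[(question_id, user)] += 1
    burst_users = {user for (_, user), n in same_day.items() if n > min_same_day}
    return sorted(
        user for user in answer_count
        if answer_count[user] > min_answers or total_score[user] > min_score or user in burst_users
    )
```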



# HOW TO

## Run this notebook on Colab using MongoDB Cloud

1/ Create an account on MongoDB Cloud and upload the data to your cluster with a tool such as MongoDB Compass or the MongoDB CLI (add `--uri` with your cluster's connection string so that `mongoimport` targets the cloud cluster instead of localhost). For example:

`mongoimport --type csv -d stackoverflow -c questions --headerline --drop Questions.csv`

2/ Upload this notebook to Google Drive, update constants such as `MONGODB_CLOUD_URI`, ..., and run it on Google Colab.



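## Run on a cluster deployed with Docker

### **1/ Prepare:**

* Convert this notebook to a Python file (for example with `jupyter nbconvert --to script`), adapt the Colab-specific parts (installation commands, paths, ngrok), and use `MONGODB_CLUSTER_URI` instead of `MONGODB_CLOUD_URI`.
* Create the folder `/db-data/data/` and copy the data into it.
* Create the folder `/db-data/spark/` and copy the source code into it.
* Create a docker-compose file with the following content, changing values such as `volume`, username, password, ... if needed.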


### **2/ Deploy the cluster with docker-compose**

To deploy the cluster with docker-compose, use the following command:

`docker-compose up -d`

### **3/ Import data**

To import the dataset into MongoDB, use the `mongoimport` tool inside the `mongo` container, as follows:

**3.1. Get the container names with the following command:**

`docker ps`

**3.2. Log in to the `mongo` container, using the container name from the previous step:**

`docker exec -it project_mongo_1 /bin/bash`

**3.3. Import data:**

For the `questions` collection:

`mongoimport "mongodb://<username>:<password>@127.0.0.1/stackoverflow?authSource=admin" -u <username> -p <password> --type csv -d stackoverflow -c questions --headerline --drop Questions.csv`

For the `answers` collection:

`mongoimport "mongodb://<username>:<password>@127.0.0.1/stackoverflow?authSource=admin" -u <username> -p <password> --type csv -d stackoverflow -c answers --headerline --drop Answers.csv`

**Note**: We add the `authSource=admin` option to the URI because the user is authenticated against the `admin` database. See more: [authSource](https://www.mongodb.com/docs/manual/reference/connection-string/#mongodb-urioption-urioption.authSource)

**3.4. Log out of the `mongo` container** (for example with `exit`).

**3.5. Run the code:**

From the host machine, run the script with `spark-submit` using the following command:

`docker exec -it project_spark-master_1 spark-submit --master spark://spark:7077 /usr/local/share/StackOverflow_analyzing.py`