### Import datasets
- Question dataset

`mongoimport --type csv -d ASM1_dev -c Questions --headerline --drop Questions.csv`

- Answer dataset

`mongoimport --type csv -d ASM1_dev -c Answers --headerline --drop Answers.csv`

### Import thư viện cần thiết

In [1]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import expr, explode, desc, count, sum, col, date_format, to_date

### Khởi tạo Spark Session

In [2]:
# địa chỉ ip của mongodb
mongo_host = '172.31.48.1'

spark = SparkSession \
  .builder \
  .master("local") \
  .appName("MyAssessment1") \
  .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
  .config("spark.mongodb.input.uri", f"mongodb://{mongo_host}:27017/ASM1_dev") \
  .config("spark.mongodb.output.uri", f"mongodb://{mongo_host}:27017/ASM1_dev") \
  .getOrCreate()

24/03/14 22:07:25 WARN Utils: Your hostname, VNHCM1QANB017 resolves to a loopback address: 127.0.1.1; using 172.31.49.251 instead (on interface eth0)
24/03/14 22:07:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/baodk/Programs/spark-3.5.1-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/baodk/.ivy2/cache
The jars for the packages stored in: /home/baodk/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3a79a479-d88d-4e9b-a0d2-baa633425079;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
:: resolution report :: resolve 166ms :: artifacts dl 9ms
	:: modules in use:
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   

### Schema cho input

```cs
Question
 |-- Id: integer (nullable = true)
 |-- OwnerUserId: integer (nullable = true)
 |-- CreationDate: date (nullable = true)
 |-- ClosedDate: date (nullable = true)
 |-- Score: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- Body: string (nullable = true)
```

```cs
Answer
 |-- Body: string (nullable = true)
 |-- CreationDate: date (nullable = true)
 |-- Id: integer (nullable = true)
 |-- OwnerUserId: integer (nullable = true)
 |-- ParentId: integer (nullable = true)
 |-- Score: integer (nullable = true)
 ```

In [3]:
question_schema = StructType([
  StructField("Id", IntegerType()),
  StructField("OwnerUserId", StringType()),
  StructField("CreationDate", StringType()),
  StructField("ClosedDate", StringType()),
  StructField("Score", IntegerType()),
  StructField("Title", StringType()),
  StructField("Body", StringType())
])

answer_schema = StructType([
  StructField("Id", IntegerType()),
  StructField("OwnerUserId", StringType()),
  StructField("CreationDate", StringType()),
  StructField("ClosedDate", StringType()),
  StructField("Score", IntegerType()),
  StructField("ParentId", IntegerType()),
  StructField("Body", StringType())
])


### Đọc dữ liệu

In [4]:
# Import dữ liệu từ 2 collection Questions và Answers
questions_df = spark.read \
    .format("mongo") \
    .option("collection", "Questions") \
    .schema(question_schema) \
    .load()

answers_df = spark.read \
    .format("mongo") \
    .option("collection", "Answers") \
    .schema(answer_schema) \
    .load()

In [5]:
# Chuẩn hóa dạng dữ liệu do chứa giá trị 'NA'
questions_df = questions_df \
    .withColumn("OwnerUserId", expr("CASE WHEN OwnerUserId=='NA' THEN NULL ELSE OwnerUserId END").cast("int")) \
    .withColumn("CreationDate", expr("CASE WHEN CreationDate=='NA' THEN NULL ELSE CreationDate END").cast("timestamp")) \
    .withColumn("ClosedDate", expr("CASE WHEN ClosedDate=='NA' THEN NULL ELSE ClosedDate END").cast("timestamp"))

answers_df = answers_df \
    .withColumn("OwnerUserId", expr("CASE WHEN OwnerUserId=='NA' THEN NULL ELSE OwnerUserId END").cast("int")) \
    .withColumn("CreationDate", expr("CASE WHEN CreationDate=='NA' THEN NULL ELSE CreationDate END").cast("timestamp")) \
    .withColumn("ClosedDate", expr("CASE WHEN ClosedDate=='NA' THEN NULL ELSE ClosedDate END").cast("timestamp"))

### Yêu cầu 1: Tính số lần xuất hiện của các ngôn ngữ lập trình

Với yêu cầu này, bạn sẽ cần đếm số lần mà các ngôn ngữ lập trình xuất hiện trong nội dung của các câu hỏi. Các ngôn ngữ lập trình cần kiểm tra là:

`Java, Python, C++, C#, Go, Ruby, Javascript, PHP, HTML, CSS, SQL`

Để hoàn thành yêu cầu này, bạn có thể sử dụng regex để trích xuất các ngôn ngữ lập trình đã xuất hiện trong từng câu hỏi. Sau đó sử dụng các phép Aggregation để tính tổng theo từng ngôn ngữ.
```cs
+-------------------+------+                                                    
|Programing Language| Count|
+-------------------+------+
|                 C#| 32414|
|                C++| 28866|
|                CSS| 33556|
|               HTML| 89646|
|                PHP| 63479|
|                SQL|146094|
|                 Go| 79912|
|               Ruby| 16318|
|             Python| 44817|
|               Java|106659|
+-------------------+------+
```

In [6]:
# Sau nhiều lần sửa lỗi thì regexp_extract_all chỉ dùng được trong expr với pattern có 2 dấu ' bao trùm
pattern1 = r"(Java|Python|C\\+\\+|C#|Go|Ruby|Javascript|PHP|HTML|CSS|SQL)"

# Hướng đi: Extract tất cả cụm từ trong nội dung Body được nêu ở đề bài thành dạng mảng, 
# sau đó explode ra thành hàng để count số lần xuất hiện của nó
questions_df.select("Body") \
    .withColumn("body_extract", expr(f"regexp_extract_all(Body, '{pattern1}')")) \
    .withColumn("Programing Language", explode("body_extract")) \
    .groupBy("Programing Language").agg(count("*").alias("Count")) \
    .show()



+-------------------+------+
|Programing Language| Count|
+-------------------+------+
|                 C#| 32414|
|                C++| 28866|
|                CSS| 33556|
|               HTML| 89646|
|                PHP| 63479|
|                SQL|146094|
|                 Go| 79912|
|               Ruby| 16318|
|             Python| 44817|
|               Java|106659|
+-------------------+------+



                                                                                

### Yêu cầu 2 : Tìm các domain được sử dụng nhiều nhất trong các câu hỏi

Trong các câu hỏi thường chúng ta sẽ dẫn link từ các trang web khác vào. Ở yêu cầu này, bạn sẽ cần tìm xem 20 domain nào được người dùng sử dụng nhiều nhất. Chú ý rằng các domain sẽ chỉ gồm tên domain, các bạn sẽ không cần trích xuất những tham số phía sau. Ví dụ về một domain: www.google.com, www.facebook.com,...

Để hoàn thành được yêu cầu này, bạn có thể sử dụng regex để trích xuất các url, sau đó áp dung một số biện pháp xử lý chuỗi để lấy ra được tên của domain, cuối cùng là dùng Aggregation để gộp nhóm lại. Kết quả sẽ như sau:
```cs
+--------------------+-----+                                                    
|              Domain|Count|
+--------------------+-----+
|   www.cs.bham.ac.uk|    4|
|groups.csail.mit.edu|    7|
|     fiddlertool.com|    1|
|   www.dynagraph.org|    1|
| images.mydomain.com|    1|
|  img7.imageshack.us|    3|
+--------------------+-----+
```

In [7]:
# pattern kiểm tra :// ở đầu và / ở cuối sau đó check các chữ có ngăn cách nhau bởi dấu chấm
pattern2 = r"\\b((?<=\\://)([a-z0-9]{2,}\\.)+([a-z]{2,}))(?=/)\\b"

# hướng đi giống yêu cầu 1, nhưng chỉ lấy group 1 là group bao trùm cả cụm domain
questions_df.select("Body").withColumn("ExtractBody", expr(f"regexp_extract_all(Body, '{pattern2}', 1)")) \
    .withColumn("Domain", explode("ExtractBody")) \
    .groupBy("Domain") \
    .agg(count("*").alias("Count")) \
    .sort(desc("Count")) \
    .show()



+--------------------+-----+
|              Domain|Count|
+--------------------+-----+
|   stackoverflow.com|58694|
|          github.com|35921|
|        jsfiddle.net|35371|
|          www.w3.org|18154|
| schemas.android.com|15781|
|www.springframewo...|12649|
|  msdn.microsoft.com| 9045|
|schemas.microsoft...| 5687|
|        java.sun.com| 5685|
|    en.wikipedia.org| 5521|
|        pastebin.com| 5502|
| ajax.googleapis.com| 5394|
|         example.com| 4651|
|     code.google.com| 4412|
|developers.google...| 3870|
|developer.android...| 3775|
|     www.youtube.com| 3540|
|     www.example.com| 3120|
|    maven.apache.org| 3028|
|            plnkr.co| 2995|
+--------------------+-----+
only showing top 20 rows



                                                                                

### Yêu cầu 3 : Tính tổng điểm của User theo từng ngày

Bạn cần biết được xem đến ngày nào đó thì User đạt được bao nhiêu điểm. Ví dụ với dữ liệu như sau:
```cs
+-----------+------------+-----+
|OwnerUserId|CreationDate|Score|
+-----------+------------+-----+
|         26|  2008-08-01|   26|
|         26|  2008-08-01|  144|
|         83|  2008-08-01|   21|
|    	  83|  2008-08-02|   53|
|         26|  2008-08-02|   29|
+-----------+------------+-----+
```
Thì bạn sẽ có được kết quả:
```cs
+-----------+------------+----------+
|OwnerUserId|CreationDate|TotalScore|
+-----------+------------+----------+
|         26|  2008-08-01|       170|
|         26|  2008-08-02|       199|
|         83|  2008-08-01|        21|
|         83|  2008-08-02|        74|
+-----------+------------+----------+
```
Để hoàn thành yêu cầu này, bạn sẽ cần sử dụng các thao tác **Windowing** và các thao tác **Aggregation**, bạn có thể tham khảo bài Bài 9 : Data Aggregations và Join trên Spark. Kết quả sẽ cần được sắp xếp theo trường **OwnerUserId** và **CreationDate**.

In [8]:
# chia khung dữ liệu theo "OwnerUserId" và được sắp xếp theo "CreationDate"
# Window được tính từ hàng đầu tiên đến hàng hiện tại (Running on total)
running_totalscore_window = Window.partitionBy("OwnerUserId") \
    .orderBy("OwnerUserId", "CreationDate") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Loại bỏ các user bị NA, cột CreationDate được định dạng date để phù hợp với yêu cầu tính tổng theo ngày
valid_user_score_df = questions_df.dropna(subset=["OwnerUserId"]) \
    .select("OwnerUserId", col("CreationDate").cast("date"), "Score")

# Bắt đầu tính tổng cột score theo Window đã định nghĩa ở trên
score_per_day_df = valid_user_score_df.withColumn("TotalScore", sum("Score").over(running_totalscore_window))
score_per_day_df \
    .select("OwnerUserId", "CreationDate", "TotalScore") \
    .orderBy("OwnerUserId", "CreationDate") \
    .show()

[Stage 8:>                                                          (0 + 1) / 1]

+-----------+------------+----------+
|OwnerUserId|CreationDate|TotalScore|
+-----------+------------+----------+
|          1|  2008-11-26|        10|
|          1|  2009-01-08|        30|
|          1|  2009-10-08|        58|
|          4|  2009-01-01|         4|
|          4|  2009-02-14|        13|
|          4|  2010-07-02|        79|
|          5|  2008-12-29|         0|
|          5|  2009-04-09|        12|
|          5|  2011-03-29|        23|
|          5|  2011-04-06|        25|
|          9|  2012-01-20|         2|
|         17|  2008-08-06|        14|
|         17|  2010-09-05|        15|
|         17|  2011-01-27|        15|
|         20|  2010-09-22|         2|
|         20|  2011-04-22|         4|
|         20|  2011-05-20|         7|
|         20|  2013-08-03|         7|
|         22|  2012-04-27|         1|
|         23|  2008-12-17|        27|
+-----------+------------+----------+
only showing top 20 rows



                                                                                

### Yêu cầu 4: Tính tổng số điểm mà User đạt được trong một khoảng thời gian

Ở yêu cầu này, bạn sẽ cần tính tổng điểm mà User đạt được khi đặt câu hỏi trong một khoảng thời gian. Ví dụ như bạn muốn tính xem từ ngày 01-01-2008 đến 01-01-2009 thì các user đạt được bao nhiêu điểm từ việc đặt câu hỏi. Các khoảng thời gian này sẽ được khai báo trực tiếp trong code, ví dụ như sau:

```python
START = '01-01-2008'
END = '01-01-2009'

if __name__ == '__main__':
    pass
```
Để hoàn thành yêu cầu này, bạn sẽ cần sử dụng filter() để lọc ra các dữ liệu thỏa mãn từ khung dữ liệu, sau đó có thể làm theo yêu cầu 4. Kết quả sẽ cần được sắp xếp theo trường OwnerUserId, ví dụ:
```cs
+-----------+----------+
|OwnerUserId|TotalScore|
+-----------+----------+
|       1580|         5|
|       4101|        11|
|      18051|         2|
|      18866|         6|
|    2376109|         5|
+-----------+----------+
```

In [9]:
START = '01-01-2008'
END = '01-01-2009'

# lỗi trên spark 3.5.1 nên dùng filter expression cùng với 2 dấu '' bao trùm
questions_df \
    .filter(f"CreationDate >= to_date('{START}', 'dd-MM-yyyy') AND CreationDate <= to_date('{END}', 'dd-MM-yyyy') ") \
    .groupBy("OwnerUserId").agg(sum("Score").alias("TotalScore")) \
    .sort("OwnerUserId") \
    .show()



+-----------+----------+
|OwnerUserId|TotalScore|
+-----------+----------+
|       NULL|      4636|
|          1|        10|
|          5|         0|
|         17|        14|
|         23|        27|
|         25|        10|
|         26|        34|
|         27|         9|
|         29|       206|
|         33|       222|
|         35|        25|
|         39|        16|
|         40|         7|
|         41|        16|
|         45|        12|
|         49|        22|
|         51|        30|
|         56|        28|
|         58|       171|
|         60|        22|
+-----------+----------+
only showing top 20 rows



                                                                                

### Yêu cầu 5: Tìm các câu hỏi có nhiều câu trả lời

Một câu hỏi tốt sẽ được tính số lượng câu trả lời của câu hỏi đó, nếu như câu hỏi có nhiều hơn 5 câu trả lời thì sẽ được tính là tốt. Bạn sẽ cần tìm xem có bao nhiêu câu hỏi đang được tính là tốt,  

Để hoàn thành yêu cầu này, bạn sẽ cần sử dụng các thao tác Join để gộp dữ liệu từ **Answers** với **Collections**, sau đó dụng các thao tác **Aggregation** để gộp nhóm, tính xem mỗi câu hỏi đã có bao nhiêu câu trả lời. Cuối cùng là dùng hàm **filter()** để lọc ra các câu hỏi có nhiều hơn 5 câu trả lời. 

Lưu ý: Do thao tác có thể tốn rất nhiều thời gian, nên bạn hãy sử dụng cơ chế Bucket Join để phân vùng cho các dữ liệu trước. Bạn có thể tham khảo Bài 9 : Data Aggregations và Join để hiểu rõ hơn về cơ chế này.

Kết quả sẽ cần được sắp xếp theo ID của các câu hỏi

In [10]:
bucketSize = 5

renamed_answers_df = answers_df.select("Id", "ParentId") \
    .withColumnRenamed("Id", "AnswerId") \
    .withColumnRenamed("ParentId", "Id") \
    .groupBy("Id").agg(count("*").alias("Answers"))
# Ta chạy ra bảng Id câu hỏi với tổng số câu trả lời > 5 trước sau đó join question Id với Id của bảng group này
renamed_answers_df \
    .filter(renamed_answers_df.Answers > 5) \
    .write.bucketBy(bucketSize, "Id").saveAsTable("Answer")

questions_df.write.bucketBy(bucketSize, "Id").saveAsTable("Question")
# tạo ra 2 bảng bucket với cột Id để tăng tốc Join
good_questions_df = spark.table("Question").join(spark.table("Answer"), "Id")
good_questions_df \
    .sort("Id") \
    .show()

# drop 2 bảng booket
spark.sql("DROP TABLE IF EXISTS Answer")
spark.sql("DROP TABLE IF EXISTS Question")


                                                                                

+----+-----------+-------------------+-------------------+-----+--------------------+--------------------+-------+
|  Id|OwnerUserId|       CreationDate|         ClosedDate|Score|               Title|                Body|Answers|
+----+-----------+-------------------+-------------------+-----+--------------------+--------------------+-------+
| 180|    2089740|2008-08-02 01:42:19|               NULL|   53|Function for crea...|<p>This is someth...|      9|
| 260|         91|2008-08-02 06:22:08|               NULL|   49|Adding scripting ...|<p>I have a littl...|      9|
| 330|         63|2008-08-02 09:51:36|               NULL|   29|Should I use nest...|<p>I am working o...|     10|
| 580|         91|2008-08-03 06:30:59|               NULL|   21|Deploying SQL Ser...|<p>I wonder how y...|     14|
| 650|        143|2008-08-03 18:12:52|               NULL|   79|Automatically upd...|<p>I would like t...|      6|
| 930|        245|2008-08-04 07:47:25|               NULL|   28|How do I connect

DataFrame[]

### (Nâng cao) Yêu cầu 6: Tìm các Active User
Một User được tính là Active sẽ cần thỏa mãn một trong các yêu cầu sau:

Có nhiều hơn 50 câu trả lời hoặc tổng số điểm đạt được khi trả lời lớn hơn 500.
Có nhiều hơn 5 câu trả lời ngay trong ngày câu hỏi được tạo.
Bạn hãy lọc các User thỏa mãn điều kiện trên.

In [11]:
# Tạo bucket cho bảng owneruserid với chi tiết câu hỏi
bucketSize = 5

# Với yêu cầu này ban đầu ta tìm ra các user có điểm tổng điểm trả lời > 500
user_scoregt500_df = answers_df.dropna(subset=["OwnerUserId"]) \
    .groupBy("OwnerUserId").agg(sum("Score").alias("TotalAnswerScoreOfUser")) \
    .filter("TotalAnswerScoreOfUser > 500")

# Từ user ta sẽ tìm được các câu hỏi của user đó
questionswithcreation_df = questions_df \
    .select("Id",col("OwnerUserId").alias("UserId"), 
            date_format("CreationDate", 'yyyy-MM-dd').alias("CreatedDate"))
users_withquestions_df = user_scoregt500_df \
    .join(questionswithcreation_df, 
          user_scoregt500_df.OwnerUserId == questionswithcreation_df.UserId, 
          "inner") \
    .drop("UserId") \

# Tạo bucket cho bảng bên trên để join question Id, join 1 cột sẽ nhanh hơn nên ta sẽ join 1 cột trước
users_withquestions_df.write \
    .bucketBy(bucketSize, "Id") \
    .mode("overwrite").saveAsTable("UsersWithQuestions")

# Tạo bucket cho bảng tổng số câu trả lời theo question Id và từng ngày trước, nhưng để join sau
answercounting_bydate_df = answers_df \
    .select(answers_df.ParentId.alias("Id"), 
            date_format("CreationDate", 'yyyy-MM-dd').alias("CreatedDate")) \
    .groupBy("Id", "CreatedDate").agg(count("*").alias("AnswersWithinDate"))

answercounting_bydate_df \
    .filter(answercounting_bydate_df.AnswersWithinDate > 5) \
    .write.bucketBy(10, "Id", "CreatedDate") \
    .mode("overwrite").saveAsTable("UsersWithQuestionsMore50Answers")

# Group tiếp tục để lấy tổng số câu trả lời cho từng câu hỏi để hạn chế group trên bảng nhiều dòng hơn
# Và tạo bucket cho bảng tổng này để cho mục đích join id
answercounting_gt50_df = answercounting_bydate_df.select("Id", "AnswersWithinDate") \
    .groupBy("Id").agg(sum("AnswersWithinDate").alias("Answers"))
answercounting_gt50_df \
    .filter(answercounting_gt50_df.Answers > 50) \
    .write.bucketBy(bucketSize, "Id") \
    .mode("overwrite").saveAsTable("AnswerCountingGt50ByQId")

# Ta bắt đầu join question Id để tìm ra tổng số câu trả lời theo câu hỏi
# Tạo bucket để phục vụ cho việc join question Id và ngày tạo
spark.table("UsersWithQuestions") \
    .join(spark.table("AnswerCountingGt50ByQId"), "Id") \
    .write.bucketBy(bucketSize, "Id", "CreatedDate") \
    .mode("overwrite").saveAsTable("AnswersWithinDateByQId")

# Cuối cùng, ta join question Id và ngày tạo để tìm ra tổng số câu trả lời > 5 trong cùng ngày đặt câu hỏi
active_users_df = spark.table("AnswersWithinDateByQId") \
    .join(spark.table("UsersWithQuestionsMore50Answers"), ["Id", "CreatedDate"]) 

# Cuối cùng dùng union chứ không dùng join
active_users_df.show()

# Drop 4 bảng bucket
spark.sql("DROP TABLE IF EXISTS UsersWithQuestions")
spark.sql("DROP TABLE IF EXISTS AnswersWithinDateByQId")
spark.sql("DROP TABLE IF EXISTS AnswerCountingGt50ByQId")
spark.sql("DROP TABLE IF EXISTS UsersWithQuestionsMore50Answers")

                                                                                

+------+-----------+-----------+----------------------+-------+-----------------+
|    Id|CreatedDate|OwnerUserId|TotalAnswerScoreOfUser|Answers|AnswersWithinDate|
+------+-----------+-----------+----------------------+-------+-----------------+
|100420| 2008-09-19|       9611|                   542|    100|               17|
|406760| 2009-01-02|      22656|                 30190|    408|               59|
+------+-----------+-----------+----------------------+-------+-----------------+



DataFrame[]