In [1]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import  *
from pyspark.sql.types import *
import re
import findspark

findspark.init()
findspark.find()

# Hàm lấy ngôn ngữ lập trình bằng regex
def extract_language(string) :
    language_regex = r"Java|Python|C\+\+|C\#|Go|Ruby|Javascript|PHP|HTML|CSS|SQL"
    if string is not None :
        return re.findall(language_regex, string)
# Hàm lấy domain bằng regex 
def extract_domain(string):
    domain_regex = r"href=\"(\S+)\""
    domains = re.findall(domain_regex, string)
    output = []
  
    for domain in domains:
        try:
            output.append(domain.split('/')[2])
        except IndexError as e:
            pass
           
    return output

if __name__ == "__main__":
    #Đọc dữ liệu từ Mongo
    spark = SparkSession \
            .builder \
            .master('local[*]') \
            .appName('DEP3') \
            .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
            .config('spark.mongodb.input.uri', 'mongodb://localhost:27017/ASM3.questions') \
            .config("spark.memory.offHeap.enabled","true") \
            .config("spark.memory.offHeap.size","10g") \
            .getOrCreate()
        

In [2]:
#Đọc dữ liệu từ colection questions và chuẩn hóa liệu
df1 = spark.read \
        .format('com.mongodb.spark.sql.DefaultSource') \
        .load()

questions_df = df1.withColumn('OwnerUserId', expr("case when OwnerUserId == 'NA' then null else cast(OwnerUserId as int) end")) \
        .withColumn('CreationDate', col('CreationDate').cast(DateType())) \
        .withColumn('ClosedDate', expr("case when ClosedDate == 'NA' then null else cast(CreationDate as date) end"))
questions_df.printSchema()

root
 |-- Body: string (nullable = true)
 |-- ClosedDate: date (nullable = true)
 |-- CreationDate: date (nullable = true)
 |-- Id: integer (nullable = true)
 |-- OwnerUserId: integer (nullable = true)
 |-- Score: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)



In [3]:
#Đọc dữ liệu từ colection answers và chuẩn hóa liệu
df2 = spark.read \
      .format('com.mongodb.spark.sql.DefaultSource') \
      .option("uri", "mongodb://127.0.0.1/ASM3.answers") \
      .load()

answers_df = df2.withColumn('OwnerUserId', col('OwnerUserId').cast(IntegerType())) \
      .withColumn('CreationDate', col('CreationDate').cast(DateType()))
answers_df.printSchema()

root
 |-- Body: string (nullable = true)
 |-- CreationDate: date (nullable = true)
 |-- Id: integer (nullable = true)
 |-- OwnerUserId: integer (nullable = true)
 |-- ParentId: integer (nullable = true)
 |-- Score: integer (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)



In [46]:
#4. Yêu cầu 1: Tính số lần xuất hiện của các ngôn ngữ lập trình
extract_language_udf = udf(extract_language, ArrayType(StringType()))
   
languages_df = questions_df.withColumn('Programing_Language_Array', extract_language_udf(col('Body'))) \
    .withColumn('Programing_Language', explode(col('Programing_Language_Array'))) \
    .groupBy('Programing_Language') \
    .agg(count('Programing_Language').alias('Count')) \
    .show()


+-------------------+------+
|Programing_Language| Count|
+-------------------+------+
|                 C#| 32414|
|                C++| 28866|
|                CSS| 33556|
|               HTML| 89646|
|                PHP| 63479|
|                SQL|146094|
|                 Go| 79912|
|               Ruby| 16318|
|             Python| 44817|
|               Java|106659|
+-------------------+------+



In [48]:
# 5. Yêu cầu 2 : Tìm các domain được sử dụng nhiều nhất trong các câu hỏi
questions_df1 = questions_df.repartition(10)
extract_domains_udf = udf(extract_domain, ArrayType(StringType()))
domains_df = questions_df1.withColumn('Domain_Array', extract_domains_udf(col('Body'))) \
        .withColumn('Domain', explode(col('Domain_Array'))) \
        .groupBy('Domain') \
        .agg(count('Domain').alias('Count')) \
        .orderBy(col('Count').desc()) \
        .show()

+--------------------+-----+
|              Domain|Count|
+--------------------+-----+
|   stackoverflow.com|56042|
|   i.stack.imgur.com|38972|
|          github.com|23365|
|        jsfiddle.net|21726|
|  msdn.microsoft.com| 6988|
|    en.wikipedia.org| 4554|
|        pastebin.com| 3260|
|     code.google.com| 3011|
|developer.android...| 2758|
|developers.google...| 2559|
|            plnkr.co| 1915|
|          codepen.io| 1815|
|     gist.github.com| 1813|
| developer.apple.com| 1713|
|     docs.oracle.com| 1566|
|     www.youtube.com| 1548|
|         i.imgur.com| 1375|
|         example.com| 1310|
|           localhost| 1293|
|developers.facebo...| 1189|
+--------------------+-----+
only showing top 20 rows



In [30]:
#6. Yêu cầu 3 : Tính tổng điểm của User theo từng ngày
running_total_window = Window.partitionBy('OwnerUserId') \
        .orderBy('OwnerUserId', 'CreationDate') \
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
questions_df.dropna(subset=['OwnerUserId']) \
        .select('OwnerUserId', 'CreationDate', 'Score') \
        .orderBy('OwnerUserId', 'CreationDate').show()
scorePerDay_df = questions_df.withColumn("TotalScore", sum('Score').over(running_total_window))
scorePerDay_df.dropna(subset=['OwnerUserId']) \
        .select('OwnerUserId', 'CreationDate', 'TotalScore') \
        .orderBy('OwnerUserId', 'CreationDate').show()

+-----------+------------+-----+
|OwnerUserId|CreationDate|Score|
+-----------+------------+-----+
|          1|  2008-11-26|   10|
|          1|  2009-01-08|   20|
|          1|  2009-10-08|   28|
|          4|  2009-01-01|    4|
|          4|  2009-02-14|    9|
|          4|  2010-07-02|   66|
|          5|  2008-12-28|    0|
|          5|  2009-04-08|   12|
|          5|  2011-03-28|   11|
|          5|  2011-04-06|    2|
|          9|  2012-01-19|    2|
|         17|  2008-08-05|   14|
|         17|  2010-09-05|    1|
|         17|  2011-01-27|    0|
|         20|  2010-09-22|    2|
|         20|  2011-04-21|    2|
|         20|  2011-05-19|    3|
|         20|  2013-08-02|    0|
|         22|  2012-04-27|    1|
|         23|  2008-12-16|   27|
+-----------+------------+-----+
only showing top 20 rows

+-----------+------------+----------+
|OwnerUserId|CreationDate|TotalScore|
+-----------+------------+----------+
|          1|  2008-11-26|        10|
|          1|  2009-01-08|    

In [27]:
#7. Yêu cầu 4: Tính tổng số điểm mà User đạt được trong một khoảng thời gian

START = "2010-05-27"
END = '2011-11-27'

questions_df.dropna(subset=['OwnerUserId']) \
        .select('OwnerUserId', 'CreationDate', 'Score') \
        .orderBy('OwnerUserId', 'CreationDate') \
        .show()
scorePerRange_df = questions_df.filter((START < col('CreationDate')) & (col('CreationDate') < END)) \
        .dropna(subset=['OwnerUserId']) \
        .groupBy('OwnerUserId') \
        .agg(sum('Score').alias('TotalScore')) \
        .orderBy('OwnerUserId') \
        .show()

+-----------+------------+-----+
|OwnerUserId|CreationDate|Score|
+-----------+------------+-----+
|          1|  2008-11-26|   10|
|          1|  2009-01-08|   20|
|          1|  2009-10-08|   28|
|          4|  2009-01-01|    4|
|          4|  2009-02-14|    9|
|          4|  2010-07-02|   66|
|          5|  2008-12-28|    0|
|          5|  2009-04-08|   12|
|          5|  2011-03-28|   11|
|          5|  2011-04-06|    2|
|          9|  2012-01-19|    2|
|         17|  2008-08-05|   14|
|         17|  2010-09-05|    1|
|         17|  2011-01-27|    0|
|         20|  2010-09-22|    2|
|         20|  2011-04-21|    2|
|         20|  2011-05-19|    3|
|         20|  2013-08-02|    0|
|         22|  2012-04-27|    1|
|         23|  2008-12-16|   27|
+-----------+------------+-----+
only showing top 20 rows

+-----------+----------+
|OwnerUserId|TotalScore|
+-----------+----------+
|          4|        66|
|          5|        13|
|         17|         1|
|         20|         7|
|      

In [4]:
#8. Yêu cầu 5: Tìm các câu hỏi có nhiều câu trả lời

join_expr = questions_df.Id == answers_df.ParentId
questions_df.join(answers_df, join_expr,"inner") \
                .select(questions_df['Id'].alias('Question_Id'), answers_df['Id'].alias('Answer_Id')) \
                .groupBy('Question_Id') \
                .agg(count('Answer_Id').alias('Total_Answer')) \
                .orderBy(col('Question_Id').asc()) \
                .filter(col('Total Answer') > 5) \
                .show()

+-----------+------------+
|Question_Id|Total Answer|
+-----------+------------+
|        180|           9|
|        260|           9|
|        330|          10|
|        580|          14|
|        650|           6|
|        930|           7|
|       1040|           7|
|       1160|          12|
|       1300|           7|
|       1390|           6|
|       1610|           8|
|       1760|          12|
|       1970|           8|
|       2120|           7|
|       2300|           6|
|       2530|          38|
|       2550|           7|
|       2630|          13|
|       2750|           8|
|       2840|           6|
+-----------+------------+
only showing top 20 rows



In [20]:
#9. (Nâng cao) Yêu cầu 6: Tìm các Active User
#Tìm các User Có nhiều hơn 50 câu trả lời
request_1 = answers_df.select(col('OwnerUserId').alias('User_Id'),col('Score')) \
    .groupBy('User_Id') \
    .agg(count('User_Id').alias('Total_Answers')) \
    .filter((col('Total_Answers') > 50)) 
    
# Tìm các User có tổng số điểm đạt được khi trả lời lớn hơn 500
request_2 = answers_df.select(col('OwnerUserId').alias('User_Id'),col('Score')) \
    .groupBy('User_Id') \
    .agg(sum('Score').alias('Total_Scores')) \
    .filter(col('Total_Scores') > 500) 


# join_expr = questions_df.Id == answers_df.ParentId
# Tìm các User Có nhiều hơn 5 câu trả lời ngay trong ngày câu hỏi được tạo.
request_3 = questions_df.join(answers_df, join_expr, 'inner') \
    .filter(questions_df['CreationDate'] == answers_df['CreationDate']) \
    .groupBy(questions_df['Id']) \
    .agg(count(answers_df['Id']).alias('Total_Answers')) \
    .filter(col('Total_Answers') > 5) 
    

union_df = request_1.union(request_2) \
    .union(request_3) \
    .distinct()\
    .select(col('User_Id')) \
    .dropna(subset=['User_Id']) \
    .orderBy(col('User_Id')) \
    .show()

+-------+
|User_Id|
+-------+
|     13|
|     13|
|     29|
|     60|
|     67|
|     67|
|     77|
|     91|
|     91|
|    142|
|    157|
|    184|
|    267|
|    267|
|    304|
|    312|
|    357|
|    369|
|    372|
|    377|
+-------+
only showing top 20 rows

