In [28]:
from pyspark.sql import SparkSession, Window, Row, functions as f
from pyspark.sql.functions import *
from pyspark.sql.types import *
import re

In [29]:
spark = SparkSession \
    .builder \
    .master('local[2]') \
    .appName('Stack Over Flow Tracking') \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
	.config('spark.jars.packages','org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2') \
	.config('spark.jars.packages','org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
    .getOrCreate()

In [31]:
# 2. Đọc dữ liệu từ MongoDB với Spark
questions = spark.read \
    .format('mongo') \
    .option("spark.mongodb.input.uri",'mongodb://127.0.0.1/dep303-assignment1.questions').load()

answers = spark.read \
    .format('mongo') \
    .option("spark.mongodb.input.uri",'mongodb://127.0.0.1/dep303-assignment1.answers').load()

22/05/15 18:31:11 WARN MongoInferSchema: Field 'OwnerUserId' contains conflicting types converting to StringType
22/05/15 18:31:14 WARN MongoInferSchema: Field 'OwnerUserId' contains conflicting types converting to StringType


In [32]:
# 3. Chuẩn hóa dữ liệu
question_df = questions \
	.withColumn("ClosedDate", to_date(expr("case when ClosedDate == 'NA' then NULL else ClosedDate end"), "yyyy-MM-dd'T'HH:mm:ss'Z'")) \
	.withColumn("CreationDate", to_date(expr("case when CreationDate == 'NA' then NULL else CreationDate end"), "yyyy-MM-dd'T'HH:mm:ss'Z'")) \
	.withColumn("OwnerUserId", expr("case when OwnerUserId == 'NA' then NULL else CAST(OwnerUserId as int) end"))

In [42]:
# 4. Yêu cầu 1: Tính số lần xuất hiện của các ngôn ngữ lập trình
# Select only cột Body để nhẹ dữ liệu
onlybody_df = question_df.select("body")
# Split cột ra thành array tập hợp từ
splitquestion_df = onlybody_df.select(split(onlybody_df.body, '\s+').alias('split'))
# Chuyển array tập hợp từ sang word từng dòng
splitquestion_df = splitquestion_df.select(explode_outer('split').alias('word'))
# Chỉ chọn các word trong tập hợp
list = ['Java', 'Python', 'C++', 'C#', 'Go', 'Ruby', 'Javascript', 'PHP', 'HTML', 'CSS', 'SQL']
splitquestion_df = splitquestion_df.where(splitquestion_df.word.isin(list))
# splitquestion_df.show()
splitquestion_df = splitquestion_df.groupBy('word').agg(f.count('word').alias('count')).show()



+----------+-----+
|      word|count|
+----------+-----+
|        C#|19144|
|       C++|16442|
|Javascript| 7608|
|       CSS|16541|
|      HTML|34597|
|       PHP|33869|
|       SQL|44276|
|        Go| 1849|
|      Ruby| 6049|
|    Python|18703|
|      Java|32880|
+----------+-----+



                                                                                

In [34]:
# 5. Yêu cầu 2 : Tìm các domain được sử dụng nhiều nhất trong các câu hỏi

def parsing_domain(str):
    # regex = r'([a-zA-Z0-9.]*[a-zA-Z0-9\-]+\.[a-zA-Z0-9]+)(?=\/?)(?:[a-zA-Z0-9])'
    regex = r'https?://(?:[-\w.]|(?:%[\da-fA-F]2))+'
    result = re.findall(regex,str)
    return result if result else []

In [35]:
parsing_domain_udf = udf(parsing_domain, returnType=ArrayType(StringType()))
matched = onlybody_df.withColumn('matched',parsing_domain_udf('body'))
website = matched.select(explode('matched').alias('domain'))
website = website.filter((website.domain.isNotNull()) & (website.domain != ""))
website = website.filter(~(website.domain.contains('...')))\
        .withColumn("domain", substring_index("domain", "//", -1)) \
        .groupBy('domain').agg(f.count("*").alias('count'))
website = website.sort(col('count').desc())

In [41]:
website.show(5)

[Stage 9:>                                                          (0 + 2) / 2]

+-----------------+------+
|           domain| count|
+-----------------+------+
|i.stack.imgur.com|125386|
|stackoverflow.com| 58862|
|       github.com| 35417|
|     jsfiddle.net| 35385|
|        localhost| 24149|
+-----------------+------+
only showing top 5 rows



                                                                                

In [9]:
# 6. Yêu cầu 3 : Tính tổng điểm của User theo từng ngày

running_total_window = Window.partitionBy("OwnerUserId").orderBy("CreationDate") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

running_score = question_df.withColumn("TotalScore",f.sum("Score").over(running_total_window))
running_score = running_score.select("OwnerUserId","CreationDate","TotalScore")

In [12]:
running_score.filter(running_score.OwnerUserId.isNotNull()).show()



+-----------+------------+----------+
|OwnerUserId|CreationDate|TotalScore|
+-----------+------------+----------+
|          1|  2008-11-26|        10|
|          1|  2009-01-08|        30|
|          1|  2009-10-08|        58|
|         20|  2010-09-22|         2|
|         20|  2011-04-21|         4|
|         20|  2011-05-19|         7|
|         20|  2013-08-02|         7|
|         22|  2012-04-27|         1|
|         26|  2008-08-01|        26|
|         26|  2008-08-14|        27|
|         26|  2008-08-15|        31|
|         26|  2008-09-22|        34|
|         26|  2009-01-02|        44|
|         26|  2009-01-09|        45|
|         26|  2009-02-15|        45|
|         26|  2009-03-10|        51|
|         26|  2009-03-10|        52|
|         26|  2009-04-03|        52|
|         26|  2009-12-02|        64|
|         26|  2010-01-28|        68|
+-----------+------------+----------+
only showing top 20 rows



                                                                                

In [14]:
# 7. Yêu cầu 4: Tính tổng số điểm mà User đạt được trong một khoảng thời gian
START = '2008-01-01'
END = '2009-01-01'
range = (START,END)
date_from,date_to = [to_date(lit(s)).cast(DateType()) for s in range]
filterdate_df = question_df.filter((question_df.CreationDate>=date_from) & (question_df.CreationDate<=date_to))
select_filterdate_df = filterdate_df.select("OwnerUserId","Score")
select_filterdate_df.groupBy('OwnerUserID').agg(f.sum('Score').alias('TotalScore'))

In [18]:
select_filterdate_df.show()

+-----------+-----+
|OwnerUserId|Score|
+-----------+-----+
|         26|   26|
|         83|   21|
|    2089740|   53|
|         58|  144|
|         91|   49|
|         63|   29|
|         71|   13|
|         91|   21|
|        143|   79|
|        245|   28|
|        233|    9|
|         67|   14|
|        254|   42|
|        120|   36|
|        236|   17|
|        281|   17|
|         91|   23|
|         60|   18|
|        230|   18|
|        328|   63|
+-----------+-----+
only showing top 20 rows



In [6]:
# 8. Yêu cầu 5: Tìm các câu hỏi có nhiều câu trả lời
answers.show()

+--------------------+--------------------+---+-----------+--------+-----+--------------------+
|                Body|        CreationDate| Id|OwnerUserId|ParentId|Score|                 _id|
+--------------------+--------------------+---+-----------+--------+-----+--------------------+
|<p><a href="http:...|2008-08-01T14:45:37Z| 92|         61|      90|   13|{627a956f97fbe8e4...|
|<p>I've read some...|2008-08-01T19:36:46Z|199|         50|     180|    1|{627a956f97fbe8e4...|
|<p>Yes, I thought...|2008-08-01T23:49:57Z|269|         91|     260|    4|{627a956f97fbe8e4...|
|<p>I wound up usi...|2008-08-01T16:09:47Z|124|         26|      80|   12|{627a956f97fbe8e4...|
|<p><a href="http:...|2008-08-02T01:49:46Z|307|         49|     260|   28|{627a956f97fbe8e4...|
|<p>I would be a b...|2008-08-02T03:00:24Z|332|         59|     330|   19|{627a956f97fbe8e4...|
|<p>You might be a...|2008-08-02T04:18:15Z|344|        100|     260|    6|{627a956f97fbe8e4...|
|<P>You could use ...|2008-08-02T06:16:2

In [15]:
answers_df = answers \
	.withColumn("CreationDate", to_date(expr("case when CreationDate == 'NA' then NULL else CreationDate end"), "yyyy-MM-dd'T'HH:mm:ss'Z'")) \
	.withColumn("OwnerUserId", expr("case when OwnerUserId == 'NA' then NULL else CAST(OwnerUserId as int) end"))

In [8]:
answers_df.printSchema()

root
 |-- Body: string (nullable = true)
 |-- CreationDate: date (nullable = true)
 |-- Id: integer (nullable = true)
 |-- OwnerUserId: integer (nullable = true)
 |-- ParentId: integer (nullable = true)
 |-- Score: integer (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)



In [16]:
more_5_ans_df = answers_df.select('ParentId','Id')
more_5_ans_df = more_5_ans_df.groupBy('ParentId').agg(f.count('Id').alias('count_ans'))
more_5_ans_df = more_5_ans_df.filter(more_5_ans_df.count_ans>5)

In [19]:
more_5_ans_df.show()



+--------+---------+
|ParentId|count_ans|
+--------+---------+
|   57020|       11|
|   10230|       13|
|   11500|       12|
|   61250|       13|
|     580|       14|
|   24200|        9|
|   27030|        6|
|   33550|       14|
|   35950|        7|
|   45030|       19|
|   46030|        8|
|   11520|        6|
|   14770|        8|
|   44630|       11|
|   64170|        9|
|   75500|       13|
|   65820|       31|
|   66800|       12|
|   70460|       12|
|   70850|        9|
+--------+---------+
only showing top 20 rows



                                                                                

In [11]:
spark.sql('CREATE DATABASE IF NOT EXISTS MY_DB')
spark.sql('USE MY_DB')

question_df.coalesce(1).write \
    .bucketBy(3,"Id") \
    .mode('overwrite') \
    .saveAsTable('MY_DB.questions')

                                                                                

In [12]:
bucket_questions = spark.read.table('MY_DB.questions')

In [17]:
spark.conf.set('spark.sql.autoBroadcastJoinThreshold',-1)
join_expr = bucket_questions.Id == more_5_ans_df.ParentId
join_df = bucket_questions.join(more_5_ans_df,join_expr,'inner')

In [18]:
join_df.sort(col('Id').asc()).show()



+--------------------+----------+------------+----+-----------+-----+--------------------+--------------------+--------+---------+
|                Body|ClosedDate|CreationDate|  Id|OwnerUserId|Score|               Title|                 _id|ParentId|count_ans|
+--------------------+----------+------------+----+-----------+-----+--------------------+--------------------+--------+---------+
|<p>This is someth...|      null|  2008-08-01| 180|    2089740|   53|Function for crea...|{627a9579f3cf4a44...|     180|        9|
|<p>I have a littl...|      null|  2008-08-01| 260|         91|   49|Adding scripting ...|{627a9579f3cf4a44...|     260|        9|
|<p>I am working o...|      null|  2008-08-02| 330|         63|   29|Should I use nest...|{627a9579f3cf4a44...|     330|       10|
|<p>I wonder how y...|      null|  2008-08-02| 580|         91|   21|Deploying SQL Ser...|{627a9579f3cf4a44...|     580|       14|
|<p>I would like t...|      null|  2008-08-03| 650|        143|   79|Automatically 

                                                                                

In [36]:
# 9. (Nâng cao) Yêu cầu 6: Tìm các Active User
# list users >50 câu trả lời:
ans_over_50 = answers_df.select('OwnerUserId','ParentId')
ans_over_50 = ans_over_50.groupBy('OwnerUserId').agg(f.count('ParentId').alias('count_ans'))
ans_over_50 = ans_over_50.filter(ans_over_50.count_ans > 50)

In [37]:
ans_over_50.show()



+-----------+---------+
|OwnerUserId|count_ans|
+-----------+---------+
|      17389|      169|
|      14148|      100|
|      17028|      103|
|       2525|       76|
|      20481|       77|
|      14660|      113|
|      31317|       54|
|      22364|       64|
|      16391|       54|
|      13313|       79|
|      16800|       85|
|       4725|      221|
|      12943|      117|
|       3043|      521|
|      25324|      207|
|       1175|       61|
|      12030|      231|
|       8123|       58|
|      12725|      161|
|      12711|      370|
+-----------+---------+
only showing top 20 rows



                                                                                

In [24]:
# list users có tổng số điểm đạt được khi trả lời lớn hơn 500
score_over_500 = answers_df.select('OwnerUserId','Score')
score_over_500 = score_over_500.groupBy('OwnerUserId').agg(f.sum('score').alias('sum_score'))
score_over_500 = score_over_500.filter(score_over_500.sum_score > 500)

In [25]:
score_over_500.show()



+-----------+---------+
|OwnerUserId|sum_score|
+-----------+---------+
|      17389|      549|
|      18529|      612|
|      20789|     1007|
|      21632|     1414|
|       2238|      655|
|      23303|      793|
|      25507|      512|
|       4725|     1004|
|       1199|      820|
|      34509|     3089|
|      20471|      521|
|       3043|     2415|
|       5597|      688|
|      25324|      614|
|       9450|      619|
|      12030|      606|
|       8123|      700|
|      12725|      764|
|      12711|     1822|
|      12597|     2186|
+-----------+---------+
only showing top 20 rows



                                                                                

In [30]:
# Có nhiều hơn 5 câu trả lời ngay trong ngày câu hỏi được tạo.
spark.conf.set('spark.sql.autoBroadcastJoinThreshold',-1)
join_expr_2 = (bucket_questions.Id == answers_df.ParentId,bucket_questions.CreationDate == answers_df.CreationDate)
join_df_2 = bucket_questions.join(answers_df,join_expr,'inner').select(answers_df.OwnerUserId,bucket_questions.Id)

In [31]:
join_df_2.show()



+-----------+---+
|OwnerUserId| Id|
+-----------+---+
|      12734|120|
|         59|330|
|        342|330|
|        370|330|
|      11521|330|
|      12416|330|
|      16434|330|
|       null|330|
|       9706|330|
|      21242|330|
|      13760|330|
|         49|470|
|         34|580|
|        149|580|
|        116|580|
|        111|580|
|         76|580|
|        307|580|
|         26|580|
|       1219|580|
+-----------+---+
only showing top 20 rows



                                                                                

In [32]:
over_5_aday = join_df_2.groupBy('OwnerUserId').agg(f.count('Id').alias('count_ans'))
over_5_aday = over_5_aday.filter(over_5_aday.count_ans > 5)

In [33]:
over_5_aday.show()

[Stage 40:>                                                         (0 + 3) / 3]

+-----------+---------+
|OwnerUserId|count_ans|
+-----------+---------+
|      33412|        7|
|      18866|       10|
|      22346|        6|
|      25462|        6|
|     427387|        7|
|      20382|       41|
|      13285|       13|
|      17389|      169|
|      20135|       19|
|     367456|      423|
|      23571|        7|
|     336508|       52|
|     718922|       23|
|      55283|       10|
|      62985|       10|
|      53963|        7|
|       9465|       31|
|      64121|       27|
|      74757|      137|
|       1088|       42|
+-----------+---------+
only showing top 20 rows



                                                                                

In [38]:
user_1 = ans_over_50.select('OwnerUserId')
user_2 = score_over_500.select('OwnerUserId')
user_3 = over_5_aday.select('OwnerUserId')
result = user_1.union(user_2).union(user_3)

In [40]:
result.distinct().show()



+-----------+
|OwnerUserId|
+-----------+
|      17389|
|      74757|
|      91299|
|     175201|
|     230513|
|      15619|
|     336508|
|     327563|
|     244128|
|     402253|
|     367456|
|     477127|
|     479863|
|     507519|
|     520612|
|     544198|
|     418413|
|     720912|
|     751090|
|     247985|
+-----------+
only showing top 20 rows



                                                                                