In [1]:
import os
from pyspark.sql.types import *
from pyspark.sql import SparkSession,Window
from pyspark.sql import functions as f
from pyspark.sql.functions import *
import re

import findspark
findspark.init()

In [2]:
spark = SparkSession \
    .builder \
    .master('local') \
    .appName('MyApp') \
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
    .config('spark.mongodb.input.uri', 'mongodb://localhost:27017/Spark_ASM1') \
    .config('spark.mongodb.output.uri', 'mongodb://localhost:27017/Spark_ASM1') \
    .getOrCreate()
    

In [3]:
spark.version

'3.4.1'

# Tạo cấu trúc cho bảng Questions

In [4]:
schema = StructType([
    StructField("Id", IntegerType()),
    StructField("OwnerUserId", StringType()),
    StructField("CreationDate", StringType()),
    StructField("ClosedDate", StringType()),
    StructField("Score", IntegerType()),
    StructField("Title", StringType()),
    StructField("Body", StringType())
])

# Tạo cấu trúc cho bảng Answers

In [5]:
schema_1 = StructType([
    StructField("Id", IntegerType()),
    StructField("OwnerUserId", StringType()),
    StructField("CreationDate", StringType()),
    StructField("ParentId", IntegerType()),
    StructField("Score", IntegerType()),
    StructField("Body", StringType())
])

# Đọc bảng Questions

In [6]:
df = spark.read \
    .format('com.mongodb.spark.sql.DefaultSource') \
    .option("spark.mongodb.input.collection","Questions") \
    .schema(schema) \
    .load()
df.show()

+----+-----------+--------------------+--------------------+-----+--------------------+--------------------+
|  Id|OwnerUserId|        CreationDate|          ClosedDate|Score|               Title|                Body|
+----+-----------+--------------------+--------------------+-----+--------------------+--------------------+
|  90|         58|2008-08-01T14:41:24Z|2012-12-26T03:45:49Z|  144|Good branching an...|<p>Are there any ...|
| 180|    2089740|2008-08-01T18:42:19Z|                  NA|   53|Function for crea...|<p>This is someth...|
| 120|         83|2008-08-01T15:50:08Z|                  NA|   21|   ASP.NET Site Maps|<p>Has anyone got...|
|  80|         26|2008-08-01T13:57:07Z|                  NA|   26|SQLStatement.exec...|<p>I've written a...|
| 470|         71|2008-08-02T15:11:47Z|2016-03-26T05:23:29Z|   13|Homegrown consump...|<p>I've been writ...|
| 260|         91|2008-08-01T23:22:08Z|                  NA|   49|Adding scripting ...|<p>I have a littl...|
| 330|         63|2

# Đọc bảng Answers

In [7]:
df_aws = spark.read \
    .format('com.mongodb.spark.sql.DefaultSource') \
    .option("spark.mongodb.input.collection","Answers") \
    .schema(schema_1) \
    .load()
df_aws.show()

+---+-----------+--------------------+--------+-----+--------------------+
| Id|OwnerUserId|        CreationDate|ParentId|Score|                Body|
+---+-----------+--------------------+--------+-----+--------------------+
| 92|         61|2008-08-01T14:45:37Z|      90|   13|<p><a href="http:...|
|124|         26|2008-08-01T16:09:47Z|      80|   12|<p>I wound up usi...|
|199|         50|2008-08-01T19:36:46Z|     180|    1|<p>I've read some...|
|269|         91|2008-08-01T23:49:57Z|     260|    4|<p>Yes, I thought...|
|307|         49|2008-08-02T01:49:46Z|     260|   28|<p><a href="http:...|
|332|         59|2008-08-02T03:00:24Z|     330|   19|<p>I would be a b...|
|344|        100|2008-08-02T04:18:15Z|     260|    6|<p>You might be a...|
|473|         49|2008-08-02T15:33:13Z|     470|    8|<p>No, what you'r...|
|529|         86|2008-08-02T18:16:07Z|     180|    3|<p>Isn't it also ...|
|359|        119|2008-08-02T06:16:23Z|     260|    5|<P>You could use ...|
|585|        149|2008-08-

# Đổi lại DataType và loại bỏ các giá trị trống

In [8]:
df_fix_aws = df_aws \
    .withColumn("OwnerUserId", when(col("OwnerUserId") == "NA", None).otherwise(col("OwnerUserId")).cast(IntegerType())) \
    .withColumn('CreationDate',when(col('CreationDate') == "NA", None )
                            .otherwise(df_aws['CreationDate'].cast(TimestampType()))) 
# df_fix_aws.printSchema()
# df_fix_aws.show()

# Chọn ra Cột ID và ParentID của bảng Answers lưu vào bucket để lát nữa join

In [9]:
# df_fix_aws1 = df_fix_aws.select('id','ParentId')
# # df_fix_aws1.show()
# df_fix_aws1.write.bucketBy(5, "Id").saveAsTable(
#     "bucketed", format="csv", mode="overwrite")

# Chỉnh lại dataType và loại bỏ các giá trị trống của bảng Questions

In [10]:
df_fix = df \
    .withColumn("OwnerUserId", when(col("OwnerUserId") == "NA", None).otherwise(col("OwnerUserId")).cast(IntegerType())) \
    .withColumn('CreationDate',when(col('CreationDate') == "NA", None )
                            .otherwise(df['CreationDate'].cast(TimestampType()))) \
    .withColumn('ClosedDate',when(col('ClosedDate') == "NA", None )
                            .otherwise(df['ClosedDate'].cast(TimestampType())))
df_fix.printSchema()
df_fix.show()


root
 |-- Id: integer (nullable = true)
 |-- OwnerUserId: integer (nullable = true)
 |-- CreationDate: timestamp (nullable = true)
 |-- ClosedDate: timestamp (nullable = true)
 |-- Score: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- Body: string (nullable = true)

+----+-----------+-------------------+-------------------+-----+--------------------+--------------------+
|  Id|OwnerUserId|       CreationDate|         ClosedDate|Score|               Title|                Body|
+----+-----------+-------------------+-------------------+-----+--------------------+--------------------+
|  90|         58|2008-08-01 21:41:24|2012-12-26 10:45:49|  144|Good branching an...|<p>Are there any ...|
| 180|    2089740|2008-08-02 01:42:19|               null|   53|Function for crea...|<p>This is someth...|
| 120|         83|2008-08-01 22:50:08|               null|   21|   ASP.NET Site Maps|<p>Has anyone got...|
|  80|         26|2008-08-01 20:57:07|               null|   26|SQLSta

# Chọn ra 2 cột ID và OwnerUserId của bảng Questions lưu vào bucketed1

In [11]:
df_fix1 = df_fix.select('Id','OwnerUserId')
df_fix1.write.bucketBy(5, "Id").saveAsTable(
    "bucketed1", format="csv", mode="overwrite")

# Sử dụng regex để tách ra các ngôn ngữ lt và website được nhắc đến

In [18]:
languages_regex = r'\b(Python|Java|C\+\+|JavaScript|C#|Ruby|Swift|Go|Kotlin|TypeScript|CSS|HTML|SQL|PHP)\b'
regex_web = r"(?<=://)([^/]+)"

def extract_languages(text):
    matches = re.findall(languages_regex, text)
    return matches 
extract_languages_udf = udf(extract_languages,ArrayType(StringType()))

def extract_websites(text):
    matches = re.findall(regex_web, text)
    return matches 
extract_website_udf = udf(extract_websites,ArrayType(StringType()))


In [19]:
df_with_array = df_fix\
    .withColumn('mid_Languages', extract_languages_udf(df_fix['Body']))\
    .withColumn('mid_Webs', extract_website_udf(df_fix['Body']))

# Đếm số lần được nhắc đến với mỗi ngôn ngữ LT

In [20]:
df_explode_languages = df_with_array\
    .withColumn("Languages", explode('mid_Languages'))\
    .select('ID','Languages')\
    .groupBy("Languages").count().orderBy(desc("count"))
df_explode_languages.show()

+----------+-----+
| Languages|count|
+----------+-----+
|      HTML|58214|
|       SQL|57622|
|       PHP|51774|
|      Java|50974|
|    Python|33772|
|       CSS|29976|
|JavaScript|21076|
|      Ruby|10663|
|     Swift| 4912|
|        Go| 4519|
|       C++| 1874|
|TypeScript|  994|
|    Kotlin|  222|
|        C#|  129|
+----------+-----+



# Đếm số lần được nhắc đến với mỗi Website

In [21]:
df_explode_websites = df_with_array\
    .withColumn("Websites", explode('mid_Webs'))\
    .select('ID','Websites')\
    .groupBy("Websites").count().orderBy(desc("count"))
df_explode_websites.show()

+--------------------+------+
|            Websites| count|
+--------------------+------+
|   i.stack.imgur.com|125387|
|   stackoverflow.com| 58694|
|          github.com| 35921|
|        jsfiddle.net| 35371|
|          www.w3.org| 18154|
| schemas.android.com| 15781|
|www.springframewo...| 12649|
|  msdn.microsoft.com|  9045|
|           localhost|  7436|
|schemas.microsoft...|  5687|
|        java.sun.com|  5685|
|    en.wikipedia.org|  5521|
|        pastebin.com|  5502|
| ajax.googleapis.com|  5394|
|         example.com|  4651|
|     code.google.com|  4412|
|         i.imgur.com|  4165|
|developers.google...|  3870|
|developer.android...|  3775|
|      localhost:8080|  3725|
+--------------------+------+
only showing top 20 rows



# Tạo window để chứa những dữ liệu cần cho việc tính điểm cho từng user

In [None]:
df_window = df_fix\
    .withColumn('CreationDate',col('CreationDate').cast(DateType()))\
    .filter(col('OwnerUserId').isNotNull())\
    .select(col('OwnerUserId'),col('CreationDate'),col('score'))

df_window.show()

+-----------+------------+-----+
|OwnerUserId|CreationDate|score|
+-----------+------------+-----+
|         58|  2008-08-01|  144|
|    2089740|  2008-08-02|   53|
|         83|  2008-08-01|   21|
|         26|  2008-08-01|   26|
|         71|  2008-08-02|   13|
|         91|  2008-08-02|   49|
|         63|  2008-08-02|   29|
|        233|  2008-08-04|    9|
|         91|  2008-08-03|   21|
|         67|  2008-08-04|   14|
|        143|  2008-08-03|   79|
|        245|  2008-08-04|   28|
|        120|  2008-08-04|   36|
|        236|  2008-08-04|   17|
|        281|  2008-08-04|   17|
|         91|  2008-08-04|   23|
|         60|  2008-08-04|   18|
|        194|  2008-08-05|   13|
|        230|  2008-08-05|   18|
|        254|  2008-08-04|   42|
+-----------+------------+-----+
only showing top 20 rows



# Dùng running total để tính tổng số điểm cho từng user

In [None]:
running_total_window = Window.partitionBy("OwnerUserId") \
    .orderBy("CreationDate") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_window.withColumn("TotalScore",f.sum(col("score")).over(running_total_window)) \
    .show()

+-----------+------------+-----+----------+
|OwnerUserId|CreationDate|score|TotalScore|
+-----------+------------+-----+----------+
|          1|  2008-11-26|   10|        10|
|          1|  2009-01-08|   20|        30|
|          1|  2009-10-08|   28|        58|
|          4|  2009-01-01|    4|         4|
|          4|  2009-02-14|    9|        13|
|          4|  2010-07-02|   66|        79|
|          5|  2008-12-29|    0|         0|
|          5|  2009-04-09|   12|        12|
|          5|  2011-03-29|   11|        23|
|          5|  2011-04-06|    2|        25|
|          9|  2012-01-20|    2|         2|
|         17|  2008-08-06|   14|        14|
|         17|  2010-09-05|    1|        15|
|         17|  2011-01-27|    0|        15|
|         20|  2010-09-22|    2|         2|
|         20|  2011-04-22|    2|         4|
|         20|  2011-05-20|    3|         7|
|         20|  2013-08-03|    0|         7|
|         22|  2012-04-27|    1|         1|
|         23|  2008-12-17|   27|

# Tính số điểm cho từng user theo ngày bắt đầu và ngày kết thúc

Lọc ra những caai hỏi có thời gian tương úng

In [None]:
START = '2008-01-01'
END = '2009-01-01'
df_date = df_fix\
    .withColumn('CreationDate',col('CreationDate').cast(DateType()))\
    .withColumn('ClosedDate',col('ClosedDate').cast(DateType()))\
    .filter(col('OwnerUserId').isNotNull())\
    .filter(col('ClosedDate').isNotNull())\
    .filter(col('CreationDate') > START)\
    .filter(col('ClosedDate') < END)\
    .select(col('OwnerUserId'),col('score'))

Tính điểm cho từng user trong df chứa dữ liệu vừa lọc

In [None]:
running_date_window = Window.partitionBy("OwnerUserId") \
    .orderBy("OwnerUserId") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_date.withColumn("TotalScore",f.sum(col("score")).over(running_date_window)) \
    .show()

+-----------+-----+----------+
|OwnerUserId|score|TotalScore|
+-----------+-----+----------+
|         83|    0|         0|
|        342|    0|         0|
|        342|    0|         0|
|       2128|    2|         2|
|       2424|    1|         1|
|       2757|    2|         2|
|       3153|    0|         0|
|      13850|    0|         0|
|      13913|    3|         3|
|      19731|    3|         3|
|      20003|    1|         1|
|      21709|    0|         0|
|      26880|    1|         1|
|      28149|    7|         7|
|      31671|    2|         2|
|      32037|    0|         0|
|      32136|    2|         2|
|      36182|    9|         9|
|      37509|    6|         6|
|      39655|    1|         1|
+-----------+-----+----------+
only showing top 20 rows



# Vô hiệu hóa BroadcastJoin và tạo 2 df mới chứa dữ liệu trong Bucketed và Bucketed1

In [None]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold",-1)
df1 = spark.table("bucketed")
df1_rename = df1.withColumnRenamed('id', 'answers_id')
df2 = spark.table("bucketed1")
# df1_rename.show()
df2.show()



+--------+-----------+
|      Id|OwnerUserId|
+--------+-----------+
|35400350|    2054434|
|35400390|     405790|
|35400620|    4048006|
|35400700|    2603268|
|35400790|     630301|
|35400840|    2684718|
|35401060|    4726942|
|35401530|    5359841|
|35402040|    4172460|
|35402100|    2232085|
|35402350|    3167859|
|35402400|    5928348|
|35402570|     980050|
|35402600|    3369417|
|35402850|    5928466|
|35402950|    5856565|
|35403210|    5653217|
|35403220|    3765130|
|35403240|    5167845|
|35403310|    5928568|
+--------+-----------+
only showing top 20 rows



# Tạo điều kiện join và Join 2 df vừa tạo

In [None]:

join_expr = df1_rename.ParentId == df2.Id
join_df = df1_rename.join(df2,join_expr,"inner")

join_df.show()


+----------+--------+---+-----------+
|answers_id|ParentId| Id|OwnerUserId|
+----------+--------+---+-----------+
|    202317|      90| 90|         58|
|   1466832|      90| 90|         58|
|        92|      90| 90|         58|
|   4289753|     260|260|         91|
|  11527418|     260|260|         91|
|      3637|     260|260|         91|
|      7217|     260|260|         91|
|       269|     260|260|         91|
|       344|     260|260|         91|
|     79013|     260|260|         91|
|       307|     260|260|         91|
|       359|     260|260|         91|
|    123101|     330|330|         63|
|    136881|     330|330|         63|
|      2019|     330|330|         63|
|     82288|     330|330|         63|
|       332|     330|330|         63|
|      2023|     330|330|         63|
|    109887|     330|330|         63|
|     70727|     330|330|         63|
+----------+--------+---+-----------+
only showing top 20 rows



# Nhóm các DF mới theo ID của DF questions và lọc ra những câu hỏi có nhiều hơn 5 câu trả lời

In [None]:
join_df_agg = join_df\
    .groupBy('Id')\
    .count().orderBy(col('Id').asc())\

    
join_df_agg.filter(col('count')>5).show()


+----+-----+
|  Id|count|
+----+-----+
| 180|    9|
| 260|    9|
| 330|   10|
| 580|   14|
| 650|    6|
| 930|    7|
|1040|    7|
|1160|   12|
|1300|    7|
|1390|    6|
|1610|    8|
|1760|   12|
|1970|    8|
|2120|    7|
|2300|    6|
|2530|   38|
|2550|    7|
|2630|   13|
|2750|    8|
|2840|    6|
+----+-----+
only showing top 20 rows

