In [1]:
pip install findspark


Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


Import findspark and related parts

In [1]:
import findspark
findspark.init()

In [2]:
import re
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as f

2. Read database from mongodb with Spark

In [9]:
spark = SparkSession \
    .builder \
    .master('local') \
    .appName('MyApp') \
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.0') \
    .getOrCreate()

3. Standardized data

In [10]:
answers_schema = StructType([
	StructField("Id", StringType(), True),
	StructField("OwnerUserId", StringType(), True),
	StructField("CreationDate", StringType(), True),
	StructField("ParentId", StringType(), True),
	StructField("Score", StringType(), True),
	StructField("Body", StringType(), True),])

questions_schema = StructType([
	StructField("Id", StringType(), True),
	StructField("OwnerUserId", StringType(), True),
	StructField("CreationDate", StringType(), True),
	StructField("ClosedDate", StringType(), True),
	StructField("Score", StringType(), True),
	StructField("Title", StringType(), True),
	StructField("Body", StringType(), True),])

In [11]:
answers_df = spark.read \
    .format("com.mongodb.spark.sql.DefaultSource") \
    .option("uri", "mongodb://127.0.0.1/ASM1_DEP303.answers_reduced") \
	.schema(answers_schema) \
	.load()

questions_df = spark.read \
    .format("com.mongodb.spark.sql.DefaultSource") \
    .option("uri", "mongodb://127.0.0.1/ASM1_DEP303.questions_reduced") \
	.schema(questions_schema) \
	.load()

3.1 Transform column "CreationDate", "ClosedDate" from StringType to DateType

In [12]:
def to_date_df(df, dateformat, datetype):
	return df.withColumn(datetype, to_date(col(datetype), dateformat))

def replace(column, value):
    return when(column != value, column).otherwise(lit(None))

new_answers_df = answers_df.withColumn("CreationDate",to_date(col("CreationDate"),"yyyy-MM-dd'T'HH:mm:ss'Z'")) \
							.withColumn("OwnerUserId", replace(col("OwnerUserId"), "NA"))\
							.withColumnRenamed("Id","AnswerId")\
							.withColumn("AnswerId",col("AnswerId").cast("Integer"))\
							.withColumn("OwnerUserId",col("OwnerUserId").cast("Integer"))\
							.withColumn("ParentId",col("ParentId").cast("Integer"))\
							.withColumn("Score",col("Score").cast("Integer"))

new_answers_df.show(10)

new_questions_df = questions_df.withColumn("ClosedDate",to_date(col("ClosedDate"),"yyyy-MM-dd'T'HH:mm:ss'Z'")) \
                                .withColumn("CreationDate",to_date(col("CreationDate"),"yyyy-MM-dd'T'HH:mm:ss'Z'")) \
								.withColumn("OwnerUserId", replace(col("OwnerUserId"), "NA"))\
								.withColumnRenamed("Id","QuestionId")\
								.withColumn("QuestionId",col("QuestionId").cast("Integer"))\
								.withColumn("OwnerUserId",col("OwnerUserId").cast("Integer"))\
								.withColumn("Score",col("Score").cast("Integer"))					
new_questions_df.show(10)

+--------+-----------+------------+--------+-----+--------------------+
|AnswerId|OwnerUserId|CreationDate|ParentId|Score|                Body|
+--------+-----------+------------+--------+-----+--------------------+
|      92|         61|  2008-08-01|      90|   13|<p><a href="http:...|
|     124|         26|  2008-08-01|      80|   12|<p>I wound up usi...|
|     199|         50|  2008-08-01|     180|    1|<p>I've read some...|
|     269|         91|  2008-08-01|     260|    4|<p>Yes, I thought...|
|     332|         59|  2008-08-02|     330|   19|<p>I would be a b...|
|     344|        100|  2008-08-02|     260|    6|<p>You might be a...|
|     359|        119|  2008-08-02|     260|    5|<P>You could use ...|
|     473|         49|  2008-08-02|     470|    8|<p>No, what you'r...|
|     529|         86|  2008-08-02|     180|    3|<p>Isn't it also ...|
|     307|         49|  2008-08-02|     260|   28|<p><a href="http:...|
+--------+-----------+------------+--------+-----+--------------

4. Count the number of Programming Languages appear in column "Body" in "question" collection

In [8]:
prog_lang_re = r"Javascript|Java|Python|C\+\+|C#|Go|Ruby|PHP|HTML|CSS|SQL"

def prog_lang(string):
	return re.findall(prog_lang_re,string)
parse_prog_lang_udf = udf(prog_lang, ArrayType(StringType()))
prog_lang_df = new_questions_df.withColumn("Body", parse_prog_lang_udf("Body"))\

prog_lang_df.select(prog_lang_df.QuestionId, explode(prog_lang_df.Body).alias("Programming Language"))\
			.groupBy("Programming Language")\
			.count()\
			.show()

+--------------------+-----+
|Programming Language|count|
+--------------------+-----+
|                  C#| 6738|
|                 C++| 5227|
|          Javascript| 1544|
|                 CSS| 2864|
|                HTML| 8056|
|                 PHP| 7755|
|                 SQL|16719|
|                  Go| 5699|
|                Ruby| 2606|
|              Python| 4388|
|                Java|10364|
+--------------------+-----+



5. Find the most used domains in the "question" collection 

In [16]:
domain_re = r'\bhttps?://(?:www\.|ww2\.)?((?:[\w-]+\.){1,}\w+)\b'
def domain(string):
	return re.findall(domain_re,string)
parse_domain_udf = udf(domain, ArrayType(StringType()))
domain_df = new_questions_df.withColumn("Body", parse_domain_udf("Body"))
domain_df = domain_df.select(domain_df.QuestionId, explode(domain_df.Body).alias("Domain"))\
					#.groupBy("Domain")\
					#.count()\
					#.sort("Domain")

domain_df.show()

+----------+--------------------+
|QuestionId|              Domain|
+----------+--------------------+
|        90|svnbook.red-bean.com|
|        80|    en.wikipedia.org|
|        80|    en.wikipedia.org|
|       260|    en.wikipedia.org|
|       580|        red-gate.com|
|      1390|       microsoft.com|
|      2120|   stackoverflow.com|
|      2970|             98hs.ru|
|      2970|             98hs.ru|
|      2970|whois.domaintools...|
|      2970|             98hs.ru|
|      2970|             98hs.ru|
|      2970|             98hs.ru|
|      2970|             98hs.ru|
|      2970|             98hs.ru|
|      2970|             98hs.ru|
|      2970|             porv.ru|
|      2970|             98hs.ru|
|      2970|             porv.ru|
|      2970|             98hs.ru|
+----------+--------------------+
only showing top 20 rows



In [17]:
domain_re = r'\bhttps?://(?:www\.|ww2\.)?((?:[\w-]+\.){1,}\w+)\b'
def domain(string):
	return re.findall(domain_re,string)
parse_domain_udf = udf(domain, ArrayType(StringType()))
domain_df = new_questions_df.withColumn("Body", parse_domain_udf("Body"))
domain_df = domain_df.select(domain_df.QuestionId, explode(domain_df.Body).alias("Domain"))\
					.groupBy("Domain")\
					.count()\
					.sort(col("count").desc())

domain_df.show()

+--------------------+-----+
|              Domain|count|
+--------------------+-----+
|   stackoverflow.com| 6091|
|   i.stack.imgur.com| 1867|
|              w3.org| 1841|
|  msdn.microsoft.com| 1577|
|    en.wikipedia.org| 1417|
|     code.google.com|  851|
|         example.com|  843|
|          github.com|  750|
|schemas.microsoft...|  731|
|          google.com|  644|
|        jsfiddle.net|  623|
|        java.sun.com|  481|
| schemas.android.com|  380|
|        pastebin.com|  350|
| springframework.org|  343|
|      blogs.msdn.com|  284|
|          domain.com|  276|
|developer.android...|  267|
| developer.apple.com|  255|
|          mysite.com|  238|
+--------------------+-----+
only showing top 20 rows



6. Calculate the score Users get by day

In [34]:

score_user_df = new_questions_df.select("OwnerUserId", "CreationDate", "Score")\
								.withColumn("OwnerUserId",col("OwnerUserId").cast("Integer"))\
								.withColumn("Score",col("Score").cast("Integer"))\

running_total_window = Window.partitionBy("OwnerUserId")\
	.orderBy("OwnerUserId","CreationDate")\
	.rowsBetween(Window.unboundedPreceding, Window.currentRow)

score_user_df\
    .na.drop(how="any")\
    .withColumn("Score_total",f.sum("Score") .over(running_total_window))\
	.show()
# delete row null

+-----------+------------+-----+-----------+
|OwnerUserId|CreationDate|Score|Score_total|
+-----------+------------+-----+-----------+
|          1|  2008-11-26|   10|         10|
|          1|  2009-01-08|   20|         30|
|          1|  2009-10-08|   28|         58|
|          4|  2009-01-01|    4|          4|
|          4|  2009-02-14|    9|         13|
|          4|  2010-07-02|   66|         79|
|          5|  2008-12-28|    0|          0|
|          5|  2009-04-08|   12|         12|
|          5|  2011-03-28|   11|         23|
|          5|  2011-04-06|    2|         25|
|         17|  2008-08-05|   14|         14|
|         17|  2011-01-27|    0|         14|
|         20|  2011-04-21|    2|          2|
|         23|  2008-12-16|   27|         27|
|         25|  2008-08-23|   10|         10|
|         25|  2009-04-13|   71|         81|
|         25|  2010-12-21|    2|         83|
|         26|  2008-08-01|   26|         26|
|         26|  2008-08-14|    1|         27|
|         

7. Calculate the total of the score Users get for a period of time

In [19]:
START = '2008-01-01'
END = '2009-01-01'
score_user_df2 = score_user_df.filter(score_user_df.CreationDate > START)\
                                .filter(score_user_df.CreationDate < END)\

running_total_window2 = Window.partitionBy("OwnerUserId")\
	.orderBy("OwnerUserId","CreationDate")\
	.rowsBetween(Window.unboundedPreceding, Window.currentRow)

score_user_df2\
    .na.drop(how="any")\
    .withColumn("Score_total",f.sum("Score") .over(running_total_window2))\
    .show(5)

+-----------+------------+-----+-----------+
|OwnerUserId|CreationDate|Score|Score_total|
+-----------+------------+-----+-----------+
|          1|  2008-11-26|   10|         10|
|          5|  2008-12-28|    0|          0|
|         17|  2008-08-05|   14|         14|
|         23|  2008-12-16|   27|         27|
|         25|  2008-08-23|   10|         10|
+-----------+------------+-----+-----------+
only showing top 5 rows



8. Find the questions have number of answers more than 5


8.1. Bucket Join

In [20]:
spark.sql("CREATE DATABASE IF NOT EXISTS MY_DB")
spark.sql("USE MY_DB")

new_answers_df.coalesce(1).write\
    .bucketBy(5, "AnswerId")\
    .mode("overwrite")\
    .saveAsTable("MY_DB.answer_data")

new_questions_df.coalesce(1).write\
    .bucketBy(5, "QuestionId")\
    .mode("overwrite")\
    .saveAsTable("MY_DB.question_data")

8.2. Inner join 2 tables "answer" and "question"

In [21]:

new_answers_df2 = spark.read.table("MY_DB.answer_data")
new_questions_df2 = spark.read.table("MY_DB.question_data")

join_expr = new_questions_df2.QuestionId == new_answers_df2.ParentId

join_df = new_answers_df2.join(new_questions_df2, join_expr,"inner")\
                        .select("AnswerId","QuestionId")\
                        .groupBy("QuestionId")\
                        .count()\
                        .withColumn("count",col("count").cast("Integer"))
join_df.printSchema()
join_df.filter(col("count") > 5)\
        .sort(col("QuestionId"))\
        .show(5)


root
 |-- QuestionId: integer (nullable = true)
 |-- count: integer (nullable = false)

+----------+-----+
|QuestionId|count|
+----------+-----+
|       180|    7|
|       260|    8|
|       330|   10|
|       580|   14|
|       930|    6|
+----------+-----+
only showing top 5 rows



9. Find the Active User

In [22]:
Active_User_df1 = new_answers_df\
                                .na.drop(how="any")\
                                .groupBy(col("OwnerUserId").alias("UserId_answer"))\
                                .count()\
                                .withColumn("count",col("count").cast("Integer"))\
                                .sort("UserId_answer")\
                                .filter(col("count") > 50)

Active_User_df1.show(5)

Active_User_df2 = new_answers_df\
                                .na.drop(how="any")\
                                .groupBy(col("OwnerUserId").alias("UserId_score"))\
                                .agg(f.sum("Score").alias("TotalScore"))\
                                .withColumn("TotalScore",col("TotalScore").cast("Integer"))\
                                .sort("UserId_score")\
                                .filter(col("TotalScore") > 500)
Active_User_df2.show(5)

Active_User_df3 = new_answers_df\
                                .na.drop(how="any")\
                                .groupBy(col("OwnerUserId").alias("UserId_answer_date"),"CreationDate")\
                                .count()\
                                .withColumn("count",col("count").cast("Integer"))\
                                .sort(col("UserId_answer_date"))\
                                .filter(col("count") > 5)

Active_User_df3.show(5)


+-------------+-----+
|UserId_answer|count|
+-------------+-----+
|           13|  101|
|           67|  112|
|           91|   62|
|          267|  175|
|          304|   87|
+-------------+-----+
only showing top 5 rows

+------------+----------+
|UserId_score|TotalScore|
+------------+----------+
|          13|      1088|
|          29|      1635|
|          67|       817|
|          77|      1085|
|          91|       525|
+------------+----------+
only showing top 5 rows

+------------------+------------+-----+
|UserId_answer_date|CreationDate|count|
+------------------+------------+-----+
|               893|  2010-12-28|    6|
|              1114|  2011-06-10|    6|
|              1591|  2011-07-08|    6|
|              1659|  2008-09-24|    6|
|              1944|  2008-09-19|    8|
+------------------+------------+-----+
only showing top 5 rows



In [23]:
Active_User_df4 = Active_User_df1.select(col("UserId_answer").alias("ActiveUserId"))
Active_User_df5 = Active_User_df2.select(col("UserId_score").alias("ActiveUserId"))
Active_User_df6 = Active_User_df3.select(col("UserId_answer_date").alias("ActiveUserId"))

Active_User_df = Active_User_df4.union(Active_User_df5).distinct()
Active_User_df = Active_User_df.union(Active_User_df6).distinct()

Active_User_df.sort("ActiveUserId").show()


+------------+
|ActiveUserId|
+------------+
|          13|
|          29|
|          67|
|          77|
|          91|
|         142|
|         157|
|         184|
|         267|
|         304|
|         312|
|         357|
|         372|
|         419|
|         459|
|         476|
|         572|
|         615|
|         714|
|         740|
+------------+
only showing top 20 rows



In [33]:
number_answers_per_question_df = new_answers_df.select(col("ParentId").alias("Id"))\
                                .groupBy(col("Id"))\
                                .count()\
                                .withColumnRenamed("count","Number of answers")\
                                .sort(col("Id"))
number_answers_per_question_df.show(10)


+---+-----------------+
| Id|Number of answers|
+---+-----------------+
| 80|                3|
| 90|                3|
|120|                1|
|180|                7|
|260|                8|
|330|               10|
|470|                1|
|580|               14|
|650|                5|
|810|                4|
+---+-----------------+
only showing top 10 rows

