In [49]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql import functions as F
from graphframes import *

In [2]:
spark = SparkSession.builder.appName('sg.edu.smu.is459.assignment2').getOrCreate()

2021-09-15 17:11:36,490 WARN util.Utils: Your hostname, MSI resolves to a loopback address: 127.0.1.1; using 172.22.141.61 instead (on interface eth0)
2021-09-15 17:11:36,492 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
2021-09-15 17:11:37,918 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [309]:
# Load data
posts_df = spark.read.load('/gerard_tan/spark/hardwarezone.parquet')

# Clean the dataframe by removing rows with any null value
# posts_df = posts_df.na.drop()

In [311]:
# Find distinct users
#distinct_author = spark.sql("SELECT DISTINCT author FROM posts")
author_df = posts_df.select('author').distinct()

print('Author number :' + str(author_df.count()))


Author number :4674


In [312]:
# Assign ID to the users
author_id = author_df.withColumn('id', monotonically_increasing_id())
author_id.show()

+-------------+---+
|       author| id|
+-------------+---+
|   FreshMaker|  0|
|  deathdealer|  1|
|xArchangelSGE|  2|
|       vaxvms|  3|
|    r0t1prata|  4|
|      plasmic|  5|
|       Rellit|  6|
|     VeryFree|  7|
|       limboz|  8|
|  moisuperman|  9|
|     kimoasis| 10|
|       reno77| 11|
|      taker42| 12|
|TheSacredSoul| 13|
|  silent_espy| 14|
|       Expert| 15|
|       jt2901| 16|
|      amekago| 17|
| Rothschild12| 18|
|       Fr3ziX| 19|
+-------------+---+
only showing top 20 rows



In [313]:
# Construct connection between post and author
left_df = posts_df.select('topic', 'author') \
    .withColumnRenamed("topic","ltopic") \
    .withColumnRenamed("author","src_author")

right_df =  left_df.withColumnRenamed('ltopic', 'rtopic') \
    .withColumnRenamed('src_author', 'dst_author')

#  Self join on topic to build connection between authors
author_to_author = left_df. \
    join(right_df, left_df.ltopic == right_df.rtopic) \
    .select(left_df.src_author, right_df.dst_author) \
    .distinct()
edge_num = author_to_author.count()
print('Number of edges with duplicate : ' + str(edge_num))



Number of edges with duplicate : 1515450


                                                                                

In [314]:
# Convert it into ids
id_to_author = author_to_author \
    .join(author_id, author_to_author.src_author == author_id.author) \
    .select(author_to_author.dst_author, author_id.id) \
    .withColumnRenamed('id','src')

id_to_id = id_to_author \
    .join(author_id, id_to_author.dst_author == author_id.author) \
    .select(id_to_author.src, author_id.id) \
    .withColumnRenamed('id', 'dst')

id_to_id = id_to_id.filter(id_to_id.src >= id_to_id.dst).distinct()

id_to_id.cache()

DataFrame[src: bigint, dst: bigint]

In [315]:
print("Number of edges without duplciate :" + str(id_to_id.count()))



Number of edges without duplciate :760062


                                                                                

In [316]:
# Build graph with RDDs
graph = GraphFrame(author_id, id_to_id)

In [317]:
# For complex graph queries, e.g., connected components, you need to set
# the checkopoint directory on HDFS, so Spark can handle failures.
# Remember to change to a valid directory in your HDFS
spark.sparkContext.setCheckpointDir('/gerard_tan/spark/spark-checkpoint')

## My Part

### How large are the communities (connected components)?

In [318]:
# Get number of connected components
connected_components = graph.connectedComponents()

2021-09-17 16:39:59,882 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
2021-09-17 16:39:59,883 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
2021-09-17 16:39:59,901 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
2021-09-17 16:39:59,936 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
2021-09-17 16:39:59,938 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
2021-09-17 16:40:00,301 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
2021

2021-09-17 16:40:01,706 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
2021-09-17 16:40:01,745 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
2021-09-17 16:40:01,775 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
2021-09-17 16:40:01,779 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
2021-09-17 16:40:01,853 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
2021-09-17 16:40:01,911 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
2021

In [320]:
# Get total number of communities
num_communities = connected_components.count()
print("Number of Communities : " + str(num_communities))

Number of Communities : 4674


In [321]:
print("How large are the communities : ")
connected_components_group = connected_components.groupBy(["component"]).agg(F.count("id").alias("CommunitySize"))
display(connected_components_group.sort(F.desc("CommunitySize")).show(num_communities))

How large are the communities : 
+-------------+-------------+
|    component|CommunitySize|
+-------------+-------------+
|            0|         4548|
| 403726925828|            3|
| 146028888068|            3|
| 309237645316|            3|
| 678604832775|            3|
| 721554505728|            2|
| 712964571140|            2|
| 103079215111|            2|
| 962072674307|            2|
| 154618822660|            2|
| 146028888071|            2|
| 420906795012|            2|
| 266287972355|            1|
| 661424963596|            1|
|1425929142280|            1|
| 257698037765|            1|
| 618475290624|            1|
|1443109011462|            1|
|1228360646662|            1|
| 197568495620|            1|
| 472446402577|            1|
| 257698037772|            1|
|1623497637895|            1|
| 592705486863|            1|
|1228360646658|            1|
| 798863917060|            1|
|1606317768708|            1|
| 377957122049|            1|
|1228360646659|            1|
|  8589

None

### What are the key words of the community (frequent words)?

In [322]:
import re
from nltk.corpus import stopwords
stop_words = stopwords.words('english')
stop_words.append("-") # add "-" as a stop word to be removed

In [323]:
posts_renamed = posts_df.withColumnRenamed("author", "post_author")

In [324]:
component_topic_df = (posts_renamed
                      .join(connected_components, posts_renamed.post_author == connected_components.author, "left")
                      .select(connected_components.component, posts_renamed.content)
                      .distinct()
                     )

In [325]:
def clean_tokenize_sentence(sentence):
    cleaned_sentence = re.sub("[^-9a-z ]", "" , sentence.lower()).split()
    return [w for w in cleaned_sentence if w not in stop_words]

In [326]:
component_rdd = component_topic_df.rdd

In [327]:
topic_word_count = (component_rdd
                    .map(lambda x: (x[0], clean_tokenize_sentence(x[1])))
                    .flatMap(lambda x: map(lambda e: (x[0], e), x[1]))
                    .map(lambda x: (x,1))
                    .reduceByKey(lambda x, y: x+y)
                    .map(lambda x: (x[0][0], (x[0][1], x[1])))
                   )

In [328]:
topic_word_count_list = (topic_word_count
    .sortBy(lambda x: x[1][1], ascending=False)
    .groupByKey().map(lambda x : (x[0], list(x[1]))).collect()
)

                                                                                

In [364]:
# Retrieve the top n most frequent words in each community
top_n = 5

column_names = ["community"]
for i in range(1,top_n + 1):
    word_col = "(keyword_" + str(i) + ", freq_" + str(i) + ")"
    column_names.append(word_col)

df_rows = []
for row in topic_word_count_list:
    df_row = [row[0]]
    df_row.extend([None] * top_n)
    freq_words = row[1][:top_n]
    df_row[1:len(freq_words)+1] = freq_words
    df_rows.append(df_row)    
    
keyword_df = spark.createDataFrame(df_rows).toDF(*column_names)

In [365]:
print("Key words of each community. Printing the top", top_n, "key words, as well as how many times the word appears")
keyword_df.show(num_communities)

Key words of each community. Printing the top 5 key words, as well as how many times the word appears
+-------------+--------------------+--------------------+-------------------+--------------------+-------------------+
|    community| (keyword_1, freq_1)| (keyword_2, freq_2)|(keyword_3, freq_3)| (keyword_4, freq_4)|(keyword_5, freq_5)|
+-------------+--------------------+--------------------+-------------------+--------------------+-------------------+
|            0|          {u, 11812}|       {play, 11285}|      {game, 10967}|        {like, 9652}|        {got, 9595}|
|1073741824000|           {mths, 3}|          {rated, 3}|           {fan, 3}|            {box, 2}|           {pwm, 2}|
| 223338299403|         {server, 2}|        {working, 2}|          {play, 2}|      {community, 2}|       {private, 2}|
| 214748364803|        {playing, 2}|        {closers, 2}|        {global, 2}|         {online, 2}|            {na, 1}|
| 429496729603|         {people, 2}|        {discord, 2}|        

### How cohesive are the communities (Average # of triangles over every user in a community)?

In [330]:
triangle_count_per_user = graph.triangleCount()
connected_components_renamed = connected_components.withColumnRenamed("id", "cc_id")

In [331]:
triangle_count_component = (triangle_count_per_user
                            .join(connected_components_renamed, triangle_count_per_user.id == connected_components_renamed.cc_id)
                           )

In [332]:
triangle_count_avg = (triangle_count_component
                      .groupBy("component")
                      .avg("count")
                      )

In [510]:
triangle_count_avg.sort(F.desc("avg(count)")).show(num_communities)

2021-09-19 20:17:11,588 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:17:11,590 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:17:11,593 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:17:11,597 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:17:11,599 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:17:11,611 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:17:11,614 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:17:11,624 WARN expressions.RowBasedKeyVal

2021-09-19 20:17:41,598 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:17:43,478 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:17:43,484 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:17:43,511 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:17:43,544 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:17:43,550 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:17:43,617 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:17:43,711 WARN expressions.RowBasedKeyVal

2021-09-19 20:18:39,860 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:18:41,504 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:18:41,756 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:18:42,060 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:18:42,119 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:18:42,343 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:18:42,528 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:18:42,658 WARN expressions.RowBasedKeyVal

2021-09-19 20:20:05,026 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:20:07,421 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:20:07,897 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:20:08,889 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:20:09,433 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:20:13,712 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:20:47,256 WARN expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2021-09-19 20:21:30,982 WARN expressions.RowBasedKeyVal

+-------------+-----------------+
|    component|       avg(count)|
+-------------+-----------------+
|            0|59009.74208443272|
| 146028888068|              1.0|
| 309237645316|              1.0|
| 678604832775|              1.0|
| 403726925828|              1.0|
| 618475290624|              0.0|
| 266287972355|              0.0|
|1425929142280|              0.0|
| 377957122049|              0.0|
| 661424963596|              0.0|
|1443109011462|              0.0|
| 257698037765|              0.0|
| 197568495620|              0.0|
| 670014898184|              0.0|
| 438086664196|              0.0|
|1228360646658|              0.0|
| 798863917060|              0.0|
| 146028888071|              0.0|
| 592705486863|              0.0|
|1228360646659|              0.0|
|1228360646662|              0.0|
| 257698037772|              0.0|
|1073741824000|              0.0|
| 910533066762|              0.0|
| 884763262980|              0.0|
|1116691496969|              0.0|
|  77309411331



### Is there any strange community?

In [487]:
posts_community_df = (posts_renamed
    .join(connected_components, posts_renamed.post_author == connected_components.author, "left")
 )

In [488]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, datediff, to_date, lit, lag, col
from pyspark.sql.window import Window

convert_date_udf = udf(lambda post_date: post_date.split("+")[0].replace("T"," "), StringType())

# Convert date to suitable format for pyspark
posts_community_df = posts_community_df.withColumn("date_published", convert_date_udf(posts_community_df.date_published))

In [489]:
# Find average number of days between posts

In [490]:
posts_community_df = posts_community_df.withColumn("post_freq", datediff(posts_community_df.date_published, lag(posts_community_df.date_published, 1)
    .over(Window.partitionBy("component")
    .orderBy("date_published"))))

In [519]:
avg_post_freq_days_df = (posts_community_df
                         .groupBy("component")
                         .agg(F.round(F.avg("post_freq"), 5).alias("avg_post_freq_days"), 
                              F.count("post_id").alias("number_of_posts"),
                              F.countDistinct("post_author").alias("number_of_authors"),
                              F.min("date_published").alias("first_post_date"),
                              F.max("date_published").alias("last_post_date"),
                              datediff(F.max("date_published"),F.min("date_published")).alias("days_apart")
                             )
                        )
avg_post_freq_days_df = avg_post_freq_days_df.na.fill(0)

In [520]:
avg_post_freq_days_df.sort(F.desc("avg_post_freq_days")).show(num_communities)



+-------------+------------------+---------------+-----------------+-------------------+-------------------+----------+
|    component|avg_post_freq_days|number_of_posts|number_of_authors|    first_post_date|     last_post_date|days_apart|
+-------------+------------------+---------------+-----------------+-------------------+-------------------+----------+
|1039382085632|            1229.0|              2|                1|2018-02-27 17:59:29|2021-07-10 07:55:06|      1229|
| 712964571140|             108.0|              2|                2|2020-05-07 19:00:17|2020-08-23 12:27:47|       108|
| 962072674307|              29.0|              2|                2|2015-07-19 20:03:56|2015-08-17 14:47:44|        29|
| 154618822660|              22.5|              3|                2|2020-09-24 15:18:15|2020-11-08 10:04:53|        45|
|  94489280525|              12.0|              2|                1|2020-08-06 02:19:51|2020-08-18 17:11:06|        12|
| 146028888068|               9.0|      

                                                                                