In [1]:
import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [2]:
# Get path to pyspark package and its jars directory
spark_jars_dir = os.path.join(os.path.dirname(pyspark.__file__), "jars")
print(f"Spark jars directory: {spark_jars_dir}")

# Download the connector JAR to the Spark jars directory
!wget -q https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar -P {spark_jars_dir}

# Create Spark session
spark = SparkSession.builder \
    .appName("Read GCS Parquet") \
    .config("spark.jars", "gcs-connector-hadoop3-latest.jar") \
    .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.1-s_2.12") \
    .config("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
    .config("spark.hadoop.fs.gs.auth.type", "NONE") \
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "false") \
    .config("spark.hadoop.fs.gs.project.id", "") \
    .config("spark.hadoop.google.cloud.auth.null.enable", "true") \
    .getOrCreate()

Spark jars directory: /usr/local/lib/python3.11/dist-packages/pyspark/jars


In [3]:
# Load the data
posts_df = spark.read.parquet("gs://rotman-data/so-small/posts.parquet")

In [4]:
posts_df.printSchema()
posts_df.show(10)

root
 |-- _AcceptedAnswerId: long (nullable = true)
 |-- _AnswerCount: long (nullable = true)
 |-- _Body: string (nullable = true)
 |-- _ClosedDate: timestamp (nullable = true)
 |-- _CommentCount: long (nullable = true)
 |-- _CommunityOwnedDate: timestamp (nullable = true)
 |-- _ContentLicense: string (nullable = true)
 |-- _CreationDate: timestamp (nullable = true)
 |-- _Id: long (nullable = true)
 |-- _LastActivityDate: timestamp (nullable = true)
 |-- _LastEditDate: timestamp (nullable = true)
 |-- _LastEditorDisplayName: string (nullable = true)
 |-- _LastEditorUserId: long (nullable = true)
 |-- _OwnerDisplayName: string (nullable = true)
 |-- _OwnerUserId: long (nullable = true)
 |-- _ParentId: long (nullable = true)
 |-- _PostTypeId: long (nullable = true)
 |-- _Score: long (nullable = true)
 |-- _Tags: string (nullable = true)
 |-- _Title: string (nullable = true)
 |-- _ViewCount: long (nullable = true)

+-----------------+------------+--------------------+-----------+---------

In [5]:
tags_df = spark.read.parquet("gs://rotman-data/so-small/tags.parquet")

In [6]:
tags_df.printSchema()
tags_df.show(20)

root
 |-- _Count: long (nullable = true)
 |-- _ExcerptPostId: long (nullable = true)
 |-- _Id: long (nullable = true)
 |-- _TagName: string (nullable = true)
 |-- _WikiPostId: long (nullable = true)

+-------+--------------+---+----------+-----------+
| _Count|_ExcerptPostId|_Id|  _TagName|_WikiPostId|
+-------+--------------+---+----------+-----------+
| 341481|       3624959|  1|      .net|    3607476|
|1191229|       3673183|  2|      html|    3673182|
|2537512|       3624960|  3|javascript|    3607052|
| 808824|       3644670|  4|       css|    3644669|
|1469725|       3624936|  5|       php|    3607050|
| 408021|       3624961|  8|         c|    3607013|
|1626156|       3624962|  9|        c#|    3607007|
| 814806|       3624963| 10|       c++|    3606997|
| 229389|       3624964| 12|      ruby|    3607043|
|   7037|       3656743| 14|      lisp|    3656742|
|2219286|       3624965| 16|    python|    3607014|
|1923031|       3624966| 17|      java|    3607018|
| 261134|       3624

Option 2: Tag Co-occurrence Networks

Precise Question: Which technologies are most commonly used together, and what distinct technology ecosystems can be identified from tag co-occurrence patterns?

Expected Output:
1. A DataFrame with columns: tag1, tag2, co_occurrence_count, similarity
- tag1, tag2: Pairs of tags that appear together
- co_occurrence_count: Number of posts where both tags appear
- similarity: Similarity score calculated as intersection/union of posts (Jaccard similarity, TF-IDF, etc)
2. A list of the top 30 most strongly connected tag pairs, ranked by similarity
3. A visualization or table showing at least 3 identified technology clusters with their core and peripheral technologies

Key Spark Concepts: Complex aggregations, self-joins, graph analysis techniques

### A DataFrame with columns: tag1, tag2, co_occurrence_count, similarity

In [7]:
from pyspark.sql import functions as F

In [8]:
# Change the tags string into lists
posts_with_tags_df = posts_df.withColumn(
    "tags",
    F.split(                                # Step 3: split on commas
        F.regexp_replace(                  # Step 2: replace >< with ,
            F.regexp_replace(             # Step 1: remove leading/trailing <>
                F.col("_Tags"),
                "^<|>$",                  # regex for start-of-string < or end-of-string >
                ""
            ),
            "><",
            ","
        ),
        ","
    )
)

In [9]:
posts_with_tags_df = posts_with_tags_df.select("_Id", "tags")

In [10]:
posts_with_tags_df.show(10)

+------+--------------------+
|   _Id|                tags|
+------+--------------------+
|917569|                NULL|
|917421|                NULL|
|917603|                NULL|
|917621|                NULL|
|917689|                NULL|
|917391|[c++, qt, syntax-...|
|917714|                NULL|
|917801|                NULL|
|917484|[java, maps, geos...|
|917281|                NULL|
+------+--------------------+
only showing top 10 rows



In [12]:
# Filter out non_null tags
non_null_posts = posts_with_tags_df.filter(F.col("tags").isNotNull())

In [13]:
non_null_posts.show(5)

+------+--------------------+
|   _Id|                tags|
+------+--------------------+
|917391|[c++, qt, syntax-...|
|917484|[java, maps, geos...|
|917283|[interop, marshal...|
|917419|    [xhtml, ms-word]|
|917616|[c#, delegates, l...|
+------+--------------------+
only showing top 5 rows



In [14]:
# Make sure each post’s tag array has no duplicates
distinct_tags_df = non_null_posts.select(
    F.col("_Id"),
    F.array_distinct(F.col("tags")).alias("tags")
)

In [15]:
# transforming each element of the list into its own row
exploded = distinct_tags_df.select(
    "_Id",
    F.explode("tags").alias("tag")
)

In [16]:
# Self-join on postId to find pairs
pairs_df = (
    exploded.alias("df1")
    .join(exploded.alias("df2"), on="_Id")
    .filter(F.col("df1.tag") < F.col("df2.tag"))  # enforce ordering tag1 < tag2
    .select(
        F.col("df1.tag").alias("tag1"),
        F.col("df2.tag").alias("tag2")
    )
)

In [19]:
# grouping by each unique pair of tags (tag1, tag2) and then counting how many times each pair appear
cooccurrence_df = pairs_df.groupBy("tag1", "tag2").count()
cooccurrence_df = cooccurrence_df.withColumnRenamed("count", "co_occurrence_count")

In [None]:
# rename columns for clarity
tags_count_df = tags_df.select(
    F.col("_TagName").alias("tag"),
    F.col("_Count").alias("usage_count")
)

In [21]:
# Join the co-occurrence counts to usage counts
coocc_with_counts_df = (
    cooccurrence_df
    .join(tags_count_df.alias("t1"), cooccurrence_df["tag1"] == F.col("t1.tag"))
    .join(tags_count_df.alias("t2"), cooccurrence_df["tag2"] == F.col("t2.tag"))
    .select(
        F.col("tag1"),
        F.col("tag2"),
        F.col("co_occurrence_count"),
        F.col("t1.usage_count").alias("count_tag1"),
        F.col("t2.usage_count").alias("count_tag2")
    )
)

In [22]:
# Calculate Jaccard similarity
coocc_with_similarity_df = coocc_with_counts_df.withColumn(
    "similarity",
    F.col("co_occurrence_count") / (
        F.col("count_tag1") + F.col("count_tag2") - F.col("co_occurrence_count")
    )
)

In [23]:
MAIN_DF = coocc_with_similarity_df.drop("count_tag1", "count_tag2")

This is the desired DataFrame with columns: tag1, tag2, co_occurrence_count,similarity.

In [24]:
MAIN_DF.show(5)

+--------+------------+-------------------+--------------------+
|    tag1|        tag2|co_occurrence_count|          similarity|
+--------+------------+-------------------+--------------------+
|    html|radio-button|                259|2.152908005160329...|
|     sql|     tagging|                  5|7.393562277453775E-6|
|   shell|        unix|               1195|0.008557964994700507|
|     git|       macos|                174|6.385766347011351E-4|
|database|  text-files|                 10|4.939613228284225E-5|
+--------+------------+-------------------+--------------------+
only showing top 5 rows



### A list of the top 30 most strongly connected tag pairs, ranked by similarity

In [25]:
top_30_pairs_df = (
    MAIN_DF
    .orderBy(F.desc("similarity"))   # Sort descending by similarity
    .limit(30)                       # Keep only top 30
)

In [26]:
top_30_list = top_30_pairs_df.collect()

# Now top_30_list is a list of Row objects. You can convert each Row to a tuple if desired:
top_30_tuples = [
    (row["tag1"], row["tag2"], row["similarity"])
    for row in top_30_list
]

Here is a list of the top 30 most strongly connected tag pairs, ranked by similarity.

In [27]:
print(top_30_tuples)
print(len(top_30_tuples))

[('compress4j', 'jarchivelib', 1.0), ('harvester', 'harvester-hci', 1.0), ('fabric-lakehouse', 'fabric-warehouse', 1.0), ('appenvoy', 'mobileposse', 1.0), ('node-ottoman', 'ottoman', 1.0), ('mutablemapping', 'shadowsocks', 1.0), ('isolated-context', 'rebrowser', 1.0), ('android-xr', 'scenecore', 1.0), ('field-device-tool', 'xdr-schema', 0.5), ('ton-gas', 'tvm-ton', 0.5), ('intel-dl-frameworks', 'intel-dnn', 0.3333333333333333), ('android-compose-switch', 'jetpack-compose-switch', 0.25), ('resourcesloader', 'resourcesprovider', 0.25), ('camelot-.net-connector', 'camelot-php-tools', 0.25), ('angular-moviemasher', 'moviemasher', 0.25), ('drakon', 'drakon-editor', 0.2), ('netrexx', 'oorexx', 0.2), ('android-lockscreenwidget', 'flutter-lockscreenwidget', 0.2), ('tus-js-client', 'tusdotnet', 0.2), ('mostjs', 'motorcycle', 0.2), ('palmdb', 'pdb-palm', 0.16666666666666666), ('jetbrains-space', 'jetbrains-space-automation', 0.16666666666666666), ('mainlooper', 'robolectric-shadows', 0.166666666

### A visualization or table showing at least 3 identified technology clusters with their core and peripheral technologies

In [28]:
# In order to form more meaningful clusters, first filter out low similarity pairs
filtered_df = MAIN_DF.filter(F.col("similarity") >= 0.025)

In [29]:
filtered_df.show(5)

+-----------------+-----------------+-------------------+--------------------+
|             tag1|             tag2|co_occurrence_count|          similarity|
+-----------------+-----------------+-------------------+--------------------+
|           livedb|          racerjs|                  2| 0.14285714285714285|
|        fluent-ui|   fluentui-react|                 24|0.031413612565445025|
|easyphp-devserver|easyphp-webserver|                  1|0.047619047619047616|
|          ggbreak|        manhattan|                  1| 0.02631578947368421|
|          sunspot|    sunspot-rails|                 32|0.026122448979591838|
+-----------------+-----------------+-------------------+--------------------+
only showing top 5 rows



In [30]:
# Prepare edges for an undirected graph (src, dst, weight)
edges_df = filtered_df.select(
    F.col("tag1").alias("src"),
    F.col("tag2").alias("dst"),
    F.col("similarity").alias("weight")
)

# Add reverse edges for undirected analysis
reverse_edges_df = edges_df.select(
    F.col("dst").alias("src"),
    F.col("src").alias("dst"),
    F.col("weight")
)

In [31]:
# include all edges
all_edges_df = edges_df.union(reverse_edges_df).distinct()

In [32]:
all_edges_df.show(5)

+---------------+-----------------+--------------------+
|            src|              dst|              weight|
+---------------+-----------------+--------------------+
| catalystbyzoho|     zohocatalyst| 0.04081632653061224|
|      iso-image|          xorriso|0.045454545454545456|
|   swagger-node|    swagger-tools|                0.04|
|resourcesloader|resourcesprovider|                0.25|
|           iiif|          mirador| 0.04878048780487805|
+---------------+-----------------+--------------------+
only showing top 5 rows



In [33]:
# Retreive counts for each tags form tag_df
tags_count_df = tags_df.select(
    F.col("_TagName").alias("id"),
    F.col("_Count").alias("usage_count")
).distinct()

In [34]:
# Import GraphFrame for graph analysis
from graphframes import GraphFrame

In [35]:
# Build the GraphFrame
g = GraphFrame(tags_count_df, all_edges_df)



In [36]:
# detect communities using label propagation for 5 iterations
communities_df = g.labelPropagation(maxIter=5)



In [37]:
print("Columns:", communities_df.columns)
communities_df.printSchema()

Columns: ['id', 'usage_count', 'label']
root
 |-- id: string (nullable = true)
 |-- usage_count: long (nullable = true)
 |-- label: long (nullable = true)



In [38]:
# Import window for window functions
from pyspark.sql.window import Window

In [39]:
# Step 1. Rank each tag within its cluster by usage_count (highest first)
window_by_label = Window.partitionBy("label").orderBy(F.desc("usage_count"))
ranked_df = communities_df.withColumn("rank", F.row_number().over(window_by_label))


In [40]:
ranked_df.show(10)

+--------------------+-----------+-----+----+
|                  id|usage_count|label|rank|
+--------------------+-----------+-----+----+
|                .lib|        131|    0|   1|
|          agility.js|          8|    1|   1|
|        allow-modals|         12|    2|   1|
|amazon-cognito-fa...|         65|    3|   1|
|      android-alarms|       1182|    4|   1|
|android-jetpack-c...|          5|    5|   1|
|    android-manifest|       6111|    6|   1|
|    android-security|        660|    7|   1|
|android-shortcutm...|         19|    8|   1|
|       angular-chart|        300|    9|   1|
+--------------------+-----------+-----+----+
only showing top 10 rows



In [41]:
# Step 2. Define "core" as the top 1 tag per cluster, and "peripheral" as the remaining tags.
core_df = ranked_df.filter(F.col("rank") == 1)
peripheral_df = ranked_df.filter(F.col("rank") > 1)

In [42]:
# Step 3. Aggregate the tags for each cluster.
# Use collect_list to create an array of core and peripheral tags.
core_agg = core_df.groupBy("label").agg(F.collect_list("id").alias("core_tags"))
peripheral_agg = peripheral_df.groupBy("label").agg(F.collect_list("id").alias("peripheral_tags"))


In [43]:
# Join the core and peripheral aggregations on the cluster label.
clusters_df = core_agg.join(peripheral_agg, on="label", how="outer")

In [50]:
clusters_df.show(5)

+-----+--------------------+---------------+
|label|           core_tags|peripheral_tags|
+-----+--------------------+---------------+
|    0|              [.lib]|           NULL|
|    1|        [agility.js]|           NULL|
|    5|[android-jetpack-...|           NULL|
|    6|  [android-manifest]|           NULL|
|    7|  [android-security]|           NULL|
+-----+--------------------+---------------+
only showing top 5 rows



In [44]:
# Filter out clusters that have multiple peripheral tags
clusters_df_filtered = clusters_df.filter(
    (F.col("peripheral_tags").isNotNull()) & (F.size("peripheral_tags") > 1)
)

In [45]:
clusters_df_filtered.show(5)

+------------+--------------------+--------------------+
|       label|           core_tags|     peripheral_tags|
+------------+--------------------+--------------------+
|103079215205|    [fastlane-pilot]|[unity3d-cloud-bu...|
|214748364928|           [sciruby]|[daru, iruby, nma...|
|283467841539|            [scully]|[scullyio, angula...|
|283467841542|[android-virtuald...|[android-multi-di...|
|326417514680|        [neshan-map]|[neshan, neshan-p...|
+------------+--------------------+--------------------+
only showing top 5 rows



Here is A table showing 5 identified technology clusters with their core and peripheral technologies

In [46]:
DF_show = clusters_df_filtered.limit(5)

In [47]:
summary_df = DF_show.toPandas()

In [48]:
import pandas as pd

In [49]:
summary_df.head()

Unnamed: 0,label,core_tags,peripheral_tags
0,103079215205,[fastlane-pilot],"[unity3d-cloud-build, unity-cloud-build]"
1,214748364928,[sciruby],"[daru, iruby, nmatrix]"
2,283467841539,[scully],"[scullyio, angular-scully, analogjs]"
3,283467841542,[android-virtualdisplay],"[android-multi-display, android-presentation]"
4,326417514680,[neshan-map],"[neshan, neshan-platform, neshan-android-sdk]"
