# Goal: for each game category (game group), count the disconnected groups of users — the connected components of a graph linking users who commented under the same video — and sum those counts across all categories.

In [5]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=80fc76a3790ebbd4bbe488a8b9d5248b3362dc41e0722d29d9e6426c6019026a
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [6]:
from pyspark.sql import SparkSession

# Create the notebook's Spark session on the "local" master, or reuse the one
# that already exists when this cell is re-run.
spark = (
    SparkSession.builder
    .appName("colab")
    .master("local")
    .getOrCreate()
)

In [7]:
from pyspark import SparkFiles
from pyspark.sql import functions as sf
from pyspark.sql.window import Window
import networkx as nx

In [9]:
# Directory holding the source CSV files (Colab's default working directory).
DATA_DIR = "/content"


def load_csv(file_name):
    """Register ``DATA_DIR/file_name`` with Spark and read it as a DataFrame.

    The file is distributed with ``SparkContext.addFile`` and then read back
    from the location Spark copied it to.  ``SparkFiles.get`` must be given the
    bare file name: passing an absolute path makes it return that path
    unchanged, which silently bypasses the copy made by ``addFile``.

    Parameters
    ----------
    file_name : str
        Name of a CSV file (with a header row) located in ``DATA_DIR``.

    Returns
    -------
    pyspark.sql.DataFrame
        The file contents, with column types inferred by Spark.
    """
    spark.sparkContext.addFile(f"{DATA_DIR}/{file_name}")
    # SparkFiles.get returns an absolute path, so "file://" + path yields a
    # well-formed file:///... URI.
    local_uri = "file://" + SparkFiles.get(file_name)
    return spark.read.csv(local_uri, header=True, inferSchema=True)


games = load_csv("games.csv")
game_movies = load_csv("game_movies.csv")
game_groups = load_csv("game_groups.csv")
comments = load_csv("comments.csv")

In [13]:
# Join every comment to its movie, the movie's game and the game's group.
# No ordering is applied here: the aggregations below do not depend on row
# order, so sorting this intermediate result would only add a wasted global sort.
comments_with_groups = (
    comments
    .join(game_movies, 'id_movie', 'inner')
    .join(games, 'id_game', 'inner')
    .join(game_groups, 'id_game_group', 'inner')
)

# Sorted view kept under its original name for inspection (e.g. .show()).
# DataFrames are lazy, so this costs nothing unless an action is run on it.
merged_data_game_group = comments_with_groups.orderBy('id_movie')

# Window specifying data partitioning for each game group
window_spec = Window.partitionBy("id_game_group")

# One row per distinct (movie, user, game group); interaction_count is the
# number of such distinct rows in the row's game group.  Game groups with a
# single (movie, user) pair are dropped, then the commenting users are
# collected per (game group, movie).
edges = (
    comments_with_groups
    .select("id_movie", "user", "id_game_group")
    .distinct()
    .withColumn("interaction_count", sf.count("*").over(window_spec))
    .filter(sf.col("interaction_count") > 1)
    .groupBy("id_game_group", "id_movie")
    .agg(sf.collect_list("user").alias("users"))
)

# Creating a graph
def create_graph(rows):
    """Build an undirected user graph from rows that carry a ``'users'`` list.

    For every row, each pair of distinct users in ``row['users']`` is joined
    by an edge.  A row with fewer than two distinct users adds nothing to the
    graph (not even an isolated node).

    Parameters
    ----------
    rows : iterable
        Items supporting ``row['users']`` and yielding an iterable of
        hashable user identifiers.

    Returns
    -------
    networkx.Graph
        Graph whose nodes are the users that share at least one row with
        another user.
    """
    graph = nx.Graph()
    for record in rows:
        # Deduplicate so a user repeated within a row never pairs with itself.
        members = list(set(record['users']))
        graph.add_edges_from(
            (left, right)
            for pos, left in enumerate(members)
            for right in members[pos + 1:]
        )
    return graph

# Build one user graph per game group: rows are grouped by id_game_group and
# each group's rows are handed to create_graph
graph_per_group = (
    edges.rdd
    .groupBy(lambda row: row["id_game_group"])
    .mapValues(create_graph)
)

# Number of connected components (disconnected user groups) in each game group's graph
count_of_interactions_per_group = graph_per_group.mapValues(
    lambda graph: sum(1 for _ in nx.connected_components(graph))
)

# Total number of disconnected user groups across all game groups
sum_of_interactions_per_group = (
    count_of_interactions_per_group
    .map(lambda pair: pair[1])
    .sum()
)

# Displaying the result
print(sum_of_interactions_per_group)


37
