In [2]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, length, from_json, expr, split, lit, to_date, explode, count, lower, trim, regexp_replace
from pyspark.sql.functions import substring, max as spark_max, ceil, input_file_name, from_unixtime, regexp_extract, concat
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, StructType, StructField, MapType, ArrayType, DoubleType, DateType, IntegerType

In [3]:
import xml.etree.ElementTree as ET
import requests
import os
import collections
import time
import html
import json

import pandas as pd
import yake
import numpy as np
from tqdm import tqdm
from graphdatascience import GraphDataScience

In [4]:
import utils

In [5]:
AWS_ACCESS_KEY_ID = 'test_key_id'
AWS_SECRET_ACCESS_KEY = 'test_access_key'
HOST = 's3'
ENDPOINT_URL = f'http://{HOST}:4566'

TEMP_DIR = './local_data'
DOWNLOAD_FROM_S3 = False

In [6]:
spark = (
    SparkSession
    .builder
    .config("spark.jars.packages", "org.neo4j:neo4j-connector-apache-spark_2.12:5.3.0_for_spark_3")
    .getOrCreate()
)

# Collect data from S3

In [7]:
CONTENTS = ['movie', 'boardgame', 'videogame', 'anime']

In [8]:
def download_raw_data_of_content(content):
    print(f'Downloading raw-data of {content}...')
    
    target_dir = f"{TEMP_DIR}/{content}"
    
    if not os.path.exists(target_dir):
        os.makedirs(target_dir)
        
    s3 = utils.S3_conn()

    paginator = s3.s3_client.get_paginator('list_objects_v2')
    page_iterator = paginator.paginate(Bucket='raw-data', Prefix=content)
    for page in page_iterator:
        if 'Contents' in page:
            for obj in tqdm(page['Contents']):
                key = obj['Key']
                local_file_path = f'{target_dir}/{key[len(content) + 1:]}'# os.path.join(target_dir, key[len(kind):])
                local_file_dir = os.path.dirname(local_file_path)
                
                if not os.path.exists(local_file_dir):
                    os.makedirs(local_file_dir)
                
                s3.s3_client.download_file('raw-data', key, local_file_path)

In [9]:
%%time
if DOWNLOAD_FROM_S3:
    for content in CONTENTS:
        download_raw_data_of_content(content)

CPU times: user 1 µs, sys: 2 µs, total: 3 µs
Wall time: 8.11 µs


# Transform data

In [10]:
s3 = utils.S3_conn()

In [11]:
def store_processed_parquet(local_directory, prefix):
    bucket_name = 'processed-data'
    
    for root, dirs, files in tqdm(os.walk(local_directory)):
        for filename in files:
            # Construct the full local path
            local_path = os.path.join(root, filename)
            
            # Construct the relative path for S3
            relative_path = os.path.relpath(local_path, local_directory)
            s3_path = os.path.join(prefix, relative_path).replace("\\", "/")  # Ensure Unix-style paths for S3
            
            # Upload the file to S3
            s3.s3_client.upload_file(local_path, bucket_name, s3_path)

## Boardgames

In [12]:
BOARDGAME_USERS_XML_PATH = './local_data/boardgame/collection'
BOARDGAME_USERS_PARQUET_PATH = './local_data/boardgame/processed_data/boardgame_users.parquet'
BOARDGAME_CONTENT_XML_PATH = './local_data/boardgame/boardgame'
BOARDGAME_CONTENT_PARQUET_PATH = './local_data/boardgame/processed_data/boardgame_content.parquet'

In [13]:
def xml_collection_to_dataframe(xml_file) -> pd.DataFrame:
    with open(xml_file, 'r') as f:
        r_text = f.read()
        root = ET.fromstring(r_text)

    df_user_id = []
    df_type = []
    df_content_id = []
    df_rating = []
    df_rating_date = []
    
    for bg in root:
        bg_name = bg[0].text
        coll_id = bg.attrib['collid']
        object_id = bg.attrib['objectid']  # This is the boardgame identifier

        rating_val = None
        for field in bg:
            if field.tag == 'stats':
                rating_val = field[0].attrib['value']
                if rating_val == 'N/A':
                    rating_val = None
            if field.tag == 'yearpublished':
                year_published = field.text
            if field.tag == 'status':
                date_of_rating = field.attrib['lastmodified']  # Not really the rating date, but it is as close as possible with the current information.

        df_user_id.append(xml_file.split('/')[-1][:-4])
        df_type.append('boardgame')
        df_content_id.append(object_id)
        df_rating.append(rating_val)
        df_rating_date.append(date_of_rating)

    return pd.DataFrame({
        'user_id': pd.Series(df_user_id, dtype='str'),
        'type': pd.Series(df_type, dtype='category'),
        'content_id': pd.Series(df_content_id, dtype='str'),
        'rating': pd.Series(df_rating, dtype='float64'),
        'rating_date': pd.Series(df_rating_date, dtype='datetime64[ms]')
    })

In [14]:
def create_boardgame_users_parquet():
    if not os.path.exists(BOARDGAME_USERS_PARQUET_PATH):
        os.makedirs(BOARDGAME_USERS_PARQUET_PATH)
        
    for xml in filter(lambda x: x.endswith('.xml'), os.listdir(BOARDGAME_USERS_XML_PATH)):
        try:
            df = xml_collection_to_dataframe(f'{BOARDGAME_USERS_XML_PATH}/{xml}')
            parquet_path = f'{BOARDGAME_USERS_PARQUET_PATH}/{xml[:-4]}.parquet'
            df.to_parquet(parquet_path)
        except Exception as e:
            print(e)
            print(f'Error: Invalid xml file: {xml}')

In [15]:
def xml_boardgame_to_dataframe():
    df_content_id = []
    df_content_description = []
    df_content_year = []
    df_title = []

    for folder in os.listdir(BOARDGAME_CONTENT_XML_PATH):
        with open(f"{BOARDGAME_CONTENT_XML_PATH}/{folder}/1.xml", 'r') as f:
            r_text = f.read()
        df_content_id.append(folder)
        root = ET.fromstring(r_text)
        for bg in root:
            for field in bg:
                if field.tag == 'name' and field.attrib['type'] == 'primary':
                    df_title.append(field.attrib['value'])
                if field.tag == 'description':
                    df_content_description.append(html.unescape(field.text))
                if field.tag == 'yearpublished':
                    df_content_year.append(int(field.attrib['value']))

    return pd.DataFrame({
        'content_id': pd.Series(df_content_id, dtype='str'),
        'description': pd.Series(df_content_description, dtype='str'),
        'release_year': pd.Series(df_content_year, dtype='Int16'),
        'title': pd.Series(df_title, dtype='str')
    })

In [16]:
def create_boardgame_content_parquet():
    if not os.path.exists(BOARDGAME_CONTENT_PARQUET_PATH):
        os.makedirs(BOARDGAME_CONTENT_PARQUET_PATH)

    schema = StructType([
        StructField("content_id", StringType(), True),
        StructField("description", StringType(), True),
        StructField("release_year", IntegerType(), True),
        StructField("title", StringType(), True)
    ])
    
    df = xml_boardgame_to_dataframe()
    df['description'] = df['description'].astype('str')
    df = df.replace([np.nan], [None])
    
    boardgame_content = (
        spark
        .createDataFrame(df, schema=schema)
        .withColumn('type', lit('boardgame'))
    )
    
    # Save parquet to processed-data zone
    boardgame_content.write.mode('overwrite').parquet(BOARDGAME_CONTENT_PARQUET_PATH)

In [17]:
def get_boardgame_users_df():
    boardgame_users = spark.read.parquet(BOARDGAME_USERS_PARQUET_PATH)
    return boardgame_users

In [18]:
def get_boardgame_content_df():
    boardgame_content = spark.read.parquet(BOARDGAME_CONTENT_PARQUET_PATH)
    return boardgame_content

In [19]:
create_boardgame_users_parquet()

'collid'
Error: Invalid xml file: Century.xml
'collid'
Error: Invalid xml file: Icythistle.xml
'collid'
Error: Invalid xml file: ItsCharlieVP.xml
'collid'
Error: Invalid xml file: nugenet.xml
'collid'
Error: Invalid xml file: marioymia.xml
'collid'
Error: Invalid xml file: RobMcWiz.xml
'collid'
Error: Invalid xml file: zigooloo.xml
'collid'
Error: Invalid xml file: Halenor.xml


In [20]:
create_boardgame_content_parquet()

In [21]:
boardgame_users = get_boardgame_users_df()
boardgame_users.show(5)
boardgame_users.printSchema()

+-----------+---------+----------+------+-------------------+
|    user_id|     type|content_id|rating|        rating_date|
+-----------+---------+----------+------+-------------------+
|zefquaavius|boardgame|    322232|   6.0|2023-08-01 14:52:32|
|zefquaavius|boardgame|    296402|   8.0|2023-08-02 14:18:24|
|zefquaavius|boardgame|    336537|  null|2023-08-02 14:18:38|
|zefquaavius|boardgame|    314445|  null|2023-08-02 14:18:54|
|zefquaavius|boardgame|    296404|  null|2023-08-02 14:19:11|
+-----------+---------+----------+------+-------------------+
only showing top 5 rows

root
 |-- user_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- content_id: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- rating_date: timestamp_ntz (nullable = true)



In [22]:
boardgame_content = get_boardgame_content_df()
boardgame_content.show(5)
boardgame_content.printSchema()

+----------+--------------------+------------+--------------------+---------+
|content_id|         description|release_year|               title|     type|
+----------+--------------------+------------+--------------------+---------+
|    189314|Set of seven prom...|        2015|Fallen: Outlands ...|boardgame|
|    157661|Grifters is a han...|        2015|            Grifters|boardgame|
|    174391|Exposed is a quic...|        2016|             Exposed|boardgame|
|    168054|Alone is a sci-fi...|        2019|               Alone|boardgame|
|    191932|From the official...|        2012|Soccero (Second E...|boardgame|
+----------+--------------------+------------+--------------------+---------+
only showing top 5 rows

root
 |-- content_id: string (nullable = true)
 |-- description: string (nullable = true)
 |-- release_year: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- type: string (nullable = true)



In [23]:
store_processed_parquet(BOARDGAME_USERS_PARQUET_PATH, prefix='boardgame')

1it [00:05,  5.97s/it]


In [24]:
store_processed_parquet(BOARDGAME_CONTENT_PARQUET_PATH, prefix='boardgame')

1it [00:00,  1.58it/s]


## Movies

In [25]:
MOVIE_BASE_PARQUET_PATH = './local_data/movie/review'
MOVIE_BASE_INFO_PATH = './local_data/movie/info'
MOVIE_USERS_PARQUET_PATH = "./local_data/movie/processed_data/movie_users.parquet"
MOVIE_CONTENT_PARQUET_PATH = "./local_data/movie/processed_data/movie_content.parquet"

In [26]:
def create_movie_users_parquet():
    schema = ArrayType(
        StructType([
            StructField("author", StringType(), True),
            StructField("author_details", StructType([
                StructField("rating", StringType(), True)
            ]), True),
            StructField("created_at", StringType(), True),
        ])
    )
    
    movie_users = spark.read.parquet(MOVIE_BASE_PARQUET_PATH)\
              .filter(length("results")>2)\
              .withColumn("results_test", col('results'))\
              .withColumn("results_parsed", from_json(col("results_test"), schema))\
              .withColumn("result_exploded", explode(col("results_parsed")))\
              .withColumn('result_exploded', col("result_exploded").cast(StringType()))
    
    split_col = split(movie_users['result_exploded'], ', ')
    
    movie_users = movie_users.withColumn('author', split_col.getItem(0)) \
               .withColumn('author', expr("substring(author,2, length(author) -1)")) \
               .withColumn('rating', split_col.getItem(1)) \
               .withColumn("rating", expr("substring(rating, 2, length(rating) - 2)"))\
               .withColumn("rating", col('rating').cast(DoubleType()))\
               .withColumn('rating_date', split_col.getItem(2))\
               .withColumn('rating_date', expr("substring(rating_date,1, length(rating_date) -1)"))\
               .withColumn("rating_date", to_date(col("rating_date"), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))\
               .select(col('author').alias('user_id'), lit('movie').alias('type'), col('id').alias('content_id').cast(StringType()), 'rating', 'rating_date')

    if not os.path.exists(MOVIE_USERS_PARQUET_PATH):
        os.makedirs(MOVIE_USERS_PARQUET_PATH)

    movie_users.repartition(1).write.mode('overwrite').parquet(MOVIE_USERS_PARQUET_PATH)

In [27]:
def create_movie_content_parquet():
    if not os.path.exists(MOVIE_CONTENT_PARQUET_PATH):
        os.makedirs(MOVIE_CONTENT_PARQUET_PATH)
    
    movie_content = (
        spark
        .read.parquet(MOVIE_BASE_INFO_PATH)
        .select(col('id').alias('content_id'), col('overview').alias('description'), col('release_date').alias('release_year'),col('original_title').alias('title'))
        .withColumn('release_year', substring("release_year", 1, 4))
        .withColumn('type', lit('movie'))
        .repartition(1).write.mode('overwrite').parquet(MOVIE_CONTENT_PARQUET_PATH)
    )

In [28]:
def get_movie_users_df():
    movie_users = spark.read.parquet(MOVIE_USERS_PARQUET_PATH)
    return movie_users

In [29]:
def get_movie_content_df():
    movie_content = spark.read.parquet(MOVIE_CONTENT_PARQUET_PATH)
    return movie_content

In [30]:
create_movie_users_parquet()

In [31]:
create_movie_content_parquet()

In [32]:
movie_users = get_movie_users_df()
movie_users.show(5)
movie_users.printSchema()

+----------------+-----+----------+------+-----------+
|         user_id| type|content_id|rating|rating_date|
+----------------+-----+----------+------+-----------+
|         SWITCH.|movie|    577922|   6.0| 2020-08-25|
|Manuel São Bento|movie|    577922|   8.0| 2020-08-27|
|    Simon Massey|movie|    577922|   9.0| 2020-08-27|
|          Daniel|movie|    577922|  10.0| 2020-09-04|
|Stephen Campbell|movie|    577922|   2.0| 2020-09-30|
+----------------+-----+----------+------+-----------+
only showing top 5 rows

root
 |-- user_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- content_id: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- rating_date: date (nullable = true)



In [33]:
movie_content = get_movie_content_df()
movie_content.show(5)
movie_content.printSchema()

+----------+--------------------+------------+----------------------+-----+
|content_id|         description|release_year|                 title| type|
+----------+--------------------+------------+----------------------+-----+
|    137193|In a cabin of the...|        2012|  Le Jour des Corne...|movie|
|     17771|The story of a Je...|        1999|              Sunshine|movie|
|    605389|It is the early 1...|        2019|    アルキメデスの大戦|movie|
|    710488|A cinematic journ...|        2019|        Citizen Europe|movie|
|    126963|The events of Bat...|        2013|ドラゴンボールZ 神と神|movie|
+----------+--------------------+------------+----------------------+-----+
only showing top 5 rows

root
 |-- content_id: long (nullable = true)
 |-- description: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- title: string (nullable = true)
 |-- type: string (nullable = true)



In [34]:
store_processed_parquet(MOVIE_USERS_PARQUET_PATH, prefix='movie')

1it [00:00, 14.77it/s]


In [35]:
store_processed_parquet(MOVIE_CONTENT_PARQUET_PATH, prefix='movie')

1it [00:00,  8.97it/s]


## Anime

In [38]:
ANIME_BASE_CONTENT_PATH = './local_data/anime/info'
ANIME_BASE_USERS_PATH = './local_data/anime/user_info'
ANIME_TEMP_PARQUET_PATH = './local_data/anime/temp'
ANIME_USERS_PARQUET_PATH = "./local_data/anime/processed_data/anime_users.parquet"
ANIME_CONTENT_PARQUET_PATH = "./local_data/anime/processed_data/anime_content.parquet"

In [39]:
def create_anime_users_parquet():
    if not os.path.exists(ANIME_USERS_PARQUET_PATH):
        os.makedirs(ANIME_USERS_PARQUET_PATH)
    path_for_anime_lists = ANIME_BASE_USERS_PATH
    user_anime_lists_paths = os.listdir(path_for_anime_lists)
    
    df = spark.read.json(
        path = [f'{path_for_anime_lists}/{i}' for i in user_anime_lists_paths],
        multiLine = True, 
        mode = 'DROPMALFORMED'
    ).withColumn('file_name', input_file_name()).select(
        from_unixtime(col('updated_at')).alias('rating_date'),
        col('score').alias('rating'),
        col('anime_id').alias('content_id'),
        regexp_extract(col('file_name'), '\/([^\/]+)\.json$', 1).alias('user_id'),
    )\
    .withColumn('type', lit('anime'))\
    .coalesce(1).write.mode('overwrite').parquet(ANIME_USERS_PARQUET_PATH)

In [40]:
def create_anime_content_parquet():
    if not os.path.exists(ANIME_CONTENT_PARQUET_PATH):
        os.makedirs(ANIME_CONTENT_PARQUET_PATH)

    if not os.path.exists(ANIME_TEMP_PARQUET_PATH):
        os.makedirs(ANIME_TEMP_PARQUET_PATH)
    
    path_for_animes = ANIME_BASE_CONTENT_PATH
    anime_paths = os.listdir(path_for_animes)
    
    batch_size = 1000
    cnt = 0
    
    while len(anime_paths) > cnt * batch_size :
        df = spark.read.json(
            path = [f'{path_for_animes}/{i}' for i in anime_paths][cnt * batch_size: (cnt + 1) * batch_size],
            multiLine = True, 
            mode = 'DROPMALFORMED'
        )\
        .dropna(subset=['data.aired.prop.from.year'])
        df.write.mode('overwrite').parquet(f'{ANIME_TEMP_PARQUET_PATH}/{cnt}')
        cnt += 1
    
    parquet_files_path = ANIME_TEMP_PARQUET_PATH
    parquet_files = os.listdir(parquet_files_path)
    df = spark.read.parquet(*[f'{parquet_files_path}/{i}' for i in parquet_files])
    df.select(
        col('data.synopsis').alias('description'),
        col('data.title').alias('title'),
        col('data.mal_id').cast(StringType()).alias('content_id'),
        col('data.aired.prop.from.year').alias('release_year')
    )\
    .withColumn('type', lit('anime'))\
    .coalesce(1).write.mode('overwrite').parquet(ANIME_CONTENT_PARQUET_PATH)

In [41]:
def get_anime_users_df():
    anime_users = spark.read.parquet(ANIME_USERS_PARQUET_PATH)
    return anime_users

In [42]:
def get_anime_content_df():
    anime_content = spark.read.parquet(ANIME_CONTENT_PARQUET_PATH)
    return anime_content

In [43]:
create_anime_users_parquet()

In [44]:
create_anime_content_parquet()

In [45]:
anime_users = get_anime_users_df()
anime_users.show(5)
anime_users.printSchema()

+-------------------+------+----------+---------+-----+
|        rating_date|rating|content_id|  user_id| type|
+-------------------+------+----------+---------+-----+
|2023-03-17 00:35:34|     0|       918|Nabil_967|anime|
|2023-08-13 12:15:08|    10|        21|Nabil_967|anime|
|2022-07-05 23:35:51|     0|     48583|Nabil_967|anime|
|2023-07-29 14:19:26|     9|     52034|Nabil_967|anime|
|2022-03-01 19:53:09|     5|     41380|Nabil_967|anime|
+-------------------+------+----------+---------+-----+
only showing top 5 rows

root
 |-- rating_date: string (nullable = true)
 |-- rating: long (nullable = true)
 |-- content_id: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- type: string (nullable = true)



In [46]:
anime_content = get_anime_content_df()
anime_content.show(5)
anime_content.printSchema()

+--------------------+--------------------+----------+------------+-----+
|         description|               title|content_id|release_year| type|
+--------------------+--------------------+----------+------------+-----+
|During their ques...|InuYasha Movie 1:...|       452|        2001|anime|
|In the year Cosmi...|Kidou Senshi Gund...|        93|        2002|anime|
|The final battle ...|Sword Art Online:...|     40540|        2020|anime|
|On his way to a c...|Tensei Kizoku no ...|     52608|        2023|anime|
|Awaking to absolu...|Sokushi Cheat ga ...|     53730|        2024|anime|
+--------------------+--------------------+----------+------------+-----+
only showing top 5 rows

root
 |-- description: string (nullable = true)
 |-- title: string (nullable = true)
 |-- content_id: string (nullable = true)
 |-- release_year: long (nullable = true)
 |-- type: string (nullable = true)



In [47]:
store_processed_parquet(ANIME_USERS_PARQUET_PATH, prefix='anime')

1it [00:00, 11.52it/s]


In [48]:
store_processed_parquet(ANIME_CONTENT_PARQUET_PATH, prefix='anime')

1it [00:00,  8.52it/s]


## Videogames

In [49]:
VIDEOGAME_BASE_SUMMARIES_PATH = './local_data/videogame/player_profile.json'
VIDEOGAME_BASE_PROFILES_PATH = './local_data/videogame/games_played.json'
VIDEOGAME_BASE_GAMES_PATH = './local_data/videogame/steam_games.json'
VIDEOGAME_USERS_PARQUET_PATH = "./local_data/videogame/processed_data/v_users.parquet"
VIDEOGAME_CONTENT_PARQUET_PATH = "./local_data/videogame/processed_data/videogame_content.parquet"

In [50]:
def create_videogame_users_parquet():
    # Load player_summaries.json
    with open(VIDEOGAME_BASE_SUMMARIES_PATH, 'r') as f:
        player_summaries_data = json.load(f)
    
    # Load steam_profiles.json
    with open(VIDEOGAME_BASE_PROFILES_PATH, 'r') as f:
        steam_profiles_data = json.load(f)
    
    # Initialize list to store data
    common_rows = []
    
    # Process data from steam_profiles_data
    for steam_profiles in steam_profiles_data:
        steamid = list(steam_profiles.keys())[0]
        games = steam_profiles[steamid]
        player_summary = next((summary for summary in player_summaries_data if steamid in summary), None)
        # Check if the player summary data is available and not empty
        if player_summary and player_summary[steamid]:
            personaname = player_summary[steamid].get('personaname', 'Unknown')
            for game in games:
                appid = game['appid']
                playtime_forever = game['playtime_forever']
                if playtime_forever > 0:  # Skip if playtime_forever is 0
                    common_rows.append({'user_id': personaname, 'type': 'videogame', 'content_id': appid, 'temp_rating': playtime_forever})

    common_df = spark.createDataFrame(common_rows)
    max_playtime = common_df.groupBy('user_id').agg(spark_max('temp_rating').alias('max_temp_rating'))
    common_df = common_df.join(max_playtime, on='user_id')
    common_df = common_df.withColumn('rating', (col('temp_rating') / col('max_temp_rating')) * 10)
    common_df = common_df.withColumn('rating', ceil(col('rating')))
    common_df = common_df.drop('temp_rating', 'max_temp_rating')
    common_df = common_df.withColumn('rating_date', lit(None).cast('string'))
    common_df.write.mode('overwrite').parquet(VIDEOGAME_USERS_PARQUET_PATH)

In [51]:
def create_videogame_content_parquet():
    # Load the dataset from games.json
    dataset = {}
    if os.path.exists(VIDEOGAME_BASE_GAMES_PATH):
        with open(VIDEOGAME_BASE_GAMES_PATH, 'r', encoding='utf-8') as fin:
            text = fin.read()
            if len(text) > 0:
                dataset = json.loads(text)
    
    # Initialize list to store data
    rows = []
    
    # Extract the relevant data
    for app_id, game_info in dataset.items():
        name = game_info.get('name', '')
        release_date = game_info.get('release_date', '')
        # Extract the year from the release_date
        if release_date:
            release_year = release_date.split()[-1]
        else:
            release_year = ''
        description = game_info.get('detailed_description', '')
    
        rows.append(Row(content_id=app_id, title=name, release_year=release_year, description=description))
    
    # Create Spark DataFrame
    df = spark.createDataFrame(rows)
    df = df.withColumn('type', lit('videogame'))
    df.write.mode('overwrite').parquet(VIDEOGAME_CONTENT_PARQUET_PATH)

In [52]:
def get_videogame_users_df():
    videogame_users = spark.read.parquet(VIDEOGAME_USERS_PARQUET_PATH)
    return videogame_users

In [53]:
def get_videogame_content_df():
    videogame_content = spark.read.parquet(VIDEOGAME_CONTENT_PARQUET_PATH)
    return videogame_content

In [54]:
create_videogame_users_parquet()

In [55]:
create_videogame_content_parquet()

In [56]:
videogame_users = get_videogame_users_df()
videogame_users.show(5)
videogame_users.printSchema()

+-------+----------+---------+------+-----------+
|user_id|content_id|     type|rating|rating_date|
+-------+----------+---------+------+-----------+
|   Fooo|       300|videogame|     1|       null|
|   Fooo|      4000|videogame|     1|       null|
|   Fooo|      2600|videogame|     1|       null|
|   Fooo|       220|videogame|     1|       null|
|   Fooo|       500|videogame|     1|       null|
+-------+----------+---------+------+-----------+
only showing top 5 rows

root
 |-- user_id: string (nullable = true)
 |-- content_id: long (nullable = true)
 |-- type: string (nullable = true)
 |-- rating: long (nullable = true)
 |-- rating_date: string (nullable = true)



In [57]:
videogame_content = get_videogame_content_df()
videogame_content.show(5)
videogame_content.printSchema()

+----------+--------------------+------------+--------------------+---------+
|content_id|               title|release_year|         description|     type|
+----------+--------------------+------------+--------------------+---------+
|    837390|        My zero trip|        2018|My zero trip is a...|videogame|
|   1564580|Nevertales: Faryo...|        2021|Mad Head Games re...|videogame|
|    263560|      Paper Sorcerer|        2014|Paper Sorcerer is...|videogame|
|    831230|    Doors Quest Demo|        2018|Doors Quest Demo ...|videogame|
|   1738500|       velvet clouds|        2021|Velvet Clouds - a...|videogame|
+----------+--------------------+------------+--------------------+---------+
only showing top 5 rows

root
 |-- content_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- description: string (nullable = true)
 |-- type: string (nullable = true)



In [58]:
store_processed_parquet(VIDEOGAME_USERS_PARQUET_PATH, prefix='videogame')

1it [00:00, 10.85it/s]


In [59]:
store_processed_parquet(VIDEOGAME_CONTENT_PARQUET_PATH, prefix='videogame')

1it [00:00,  1.23it/s]


# Merging all content

In [60]:
merged_users = (
    boardgame_users
    .union(movie_users.select(['user_id', 'type', 'content_id', 'rating', 'rating_date']))
    .union(anime_users.select(['user_id', 'type', 'content_id', 'rating', 'rating_date']))
    .union(videogame_users.select(['user_id', 'type', 'content_id', 'rating', 'rating_date']))
    .withColumn('user_id', trim(lower(col('user_id'))))
    .withColumn('type', lower(col('type')))
)

merged_users.printSchema()
merged_users.show()

root
 |-- user_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- content_id: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- rating_date: string (nullable = true)

+-----------+---------+----------+------+-------------------+
|    user_id|     type|content_id|rating|        rating_date|
+-----------+---------+----------+------+-------------------+
|zefquaavius|boardgame|    322232|   6.0|2023-08-01 14:52:32|
|zefquaavius|boardgame|    296402|   8.0|2023-08-02 14:18:24|
|zefquaavius|boardgame|    336537|  null|2023-08-02 14:18:38|
|zefquaavius|boardgame|    314445|  null|2023-08-02 14:18:54|
|zefquaavius|boardgame|    296404|  null|2023-08-02 14:19:11|
|zefquaavius|boardgame|    296406|  null|2023-08-01 14:56:42|
|zefquaavius|boardgame|    322429|  null|2023-08-02 14:19:32|
|zefquaavius|boardgame|    309782|   6.0|2023-08-01 14:57:01|
|zefquaavius|boardgame|    314446|  null|2023-08-02 14:19:57|
|zefquaavius|boardgame|    296407|  null|2023-08-01 

In [61]:
limit_rows = 5000
merged_content = (
    boardgame_content.limit(limit_rows)
    .unionByName(movie_content.limit(limit_rows).select(['content_id', 'description', 'release_year', 'type','title']))
    .unionByName(anime_content.limit(limit_rows).select(['content_id', 'description', 'release_year', 'type','title']))
    .unionByName(videogame_content.limit(limit_rows).select(['content_id', 'description', 'release_year', 'type','title']))
)

merged_content.printSchema()
merged_content.show()

root
 |-- content_id: string (nullable = true)
 |-- description: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- title: string (nullable = true)
 |-- type: string (nullable = true)

+----------+--------------------+------------+--------------------+---------+
|content_id|         description|release_year|               title|     type|
+----------+--------------------+------------+--------------------+---------+
|    189314|Set of seven prom...|        2015|Fallen: Outlands ...|boardgame|
|    157661|Grifters is a han...|        2015|            Grifters|boardgame|
|    174391|Exposed is a quic...|        2016|             Exposed|boardgame|
|    168054|Alone is a sci-fi...|        2019|               Alone|boardgame|
|    191932|From the official...|        2012|Soccero (Second E...|boardgame|
|    181495|The theme of the ...|        2015|Yo Fui a EGB: El ...|boardgame|
|    131581|A quick trivia ga...|        2014|                Slap|boardgame|
|    210179|

In [62]:
(
    merged_users
    .select('type')
    .groupBy(col('type'))
    .count()
    .sort(col('count'), ascending=False)
    .show()
)

+---------+------+
|     type| count|
+---------+------+
|    anime|388227|
|videogame|289674|
|boardgame|258496|
|    movie|  1164|
+---------+------+



In [63]:
(
    merged_content
    .select('type')
    .groupBy(col('type'))
    .count()
    .sort(col('count'), ascending=False)
    .show()
)

+---------+-----+
|     type|count|
+---------+-----+
|boardgame| 5000|
|    movie| 5000|
|    anime| 5000|
|videogame| 5000|
+---------+-----+



In [64]:
merged_users.groupBy(col('user_id')).count().sort(col('count'), ascending=False).show()

+------------------+-----+
|           user_id|count|
+------------------+-----+
|       zefquaavius| 5547|
|      akapastorguy| 4636|
|   piston smashed™| 4615|
|        slashbunny| 3447|
|          naarnold| 3212|
|       invader_bzz| 3190|
|             tydel| 3171|
|         adrimetum| 2959|
|          doccabet| 2840|
|            huffa2| 2434|
|              muyf| 2372|
|           domi123| 2243|
|       donnie lama| 2224|
|          fitadine| 2158|
|kreikkaturkulainen| 2004|
|         doomfarer| 1994|
|            d0gb0t| 1974|
|    saxophonechapa| 1951|
|            landru| 1922|
|     thechrisglass| 1919|
+------------------+-----+
only showing top 20 rows



In [65]:
(
    merged_users
    .select('user_id', 'type')
    .withColumn('user_id', trim(regexp_replace(lower(col('user_id')), '[^a-zA-Z0-9]', '')))
    .distinct()
    .groupBy(col('user_id'))
    .count()
    .sort(col('count'), ascending=False)
    .show()
)

+---------------+-----+
|        user_id|count|
+---------------+-----+
|            rob|    2|
|               |    2|
|   luckyoneputt|    1|
|      eagle1207|    1|
|nicaustinqueerq|    1|
|      astronoud|    1|
|         sqicky|    1|
| spacebutterfly|    1|
|         r4yn3x|    1|
|    soxrivotril|    1|
|     thetheredk|    1|
|       satertek|    1|
|        hukaers|    1|
|    monkayylmao|    1|
|            gua|    1|
|       jettison|    1|
|    simonknight|    1|
|           holz|    1|
|         buddah|    1|
|     scarlsberg|    1|
+---------------+-----+
only showing top 20 rows



In [66]:
(
    merged_users
    .filter(merged_users['user_id'] == 'daimyo')
    .groupBy(col('type'))
    .count()
    .show()
)

+---------+-----+
|     type|count|
+---------+-----+
|videogame|  251|
+---------+-----+



# Yake

## RDD Way
It took 2 min and 7 seconds

In [67]:
# df = merged_content
# rddK = df.rdd.map(lambda x: (x['content_id'], get_kw(x['description'])))
# rddK = spark.createDataFrame(rddK).select(col('_1').alias('content_id'), col('_2').alias('keyword'))
# dfK = (rddK.withColumn("keyword_1", expr("keyword[0]"))
#                 .withColumn("keyword_2", expr("keyword[1]"))
#                 .withColumn("keyword_3", expr("keyword[2]"))
#                 .withColumn("keyword_4", expr("keyword[3]"))
#                 .withColumn("keyword_5", expr("keyword[4]"))
#                 .select('content_id','keyword_1','keyword_2','keyword_3','keyword_4','keyword_5' )
#       )
# dfK.show()

In [68]:
# %%time
# dfK.write.mode('overwrite').parquet('./foo/keywords_full_rdd')

- [ ] connect directly spark to neo4j (using the right connector)
- [ ] maybe provide some analytics about the users' profile

In [69]:
# Check what the RS does for the NULL values.
# - We could impute something, like the average score the user gives.

## Dataframe UDF way
It took 2 min and 5 s

In [70]:
# %%time
# merged_content.withColumn('yake',get_kw(col('description')))\
# .select(
#     expr("yake[0]").alias('keyword_0'),
#     expr("yake[1]").alias('keyword_1'),
#     expr("yake[2]").alias('keyword_2'),
#     expr("yake[3]").alias('keyword_3'),
#     expr("yake[4]").alias('keyword_4'),
# )\
# .show()
# .write.mode('overwrite').parquet('./foo/keywords_full_not_exploded')

## Explode way 
because it is easier to upload in neo4j
Takes a lot longer, 3 min 53 seconds.

In [71]:
%%time
def get_kw(text):
   kw_extractor = yake.KeywordExtractor(
       lan='en',
       n=1,  # Max n-gram size
       top=15  # Number of keywords
   )
    
   return list(map(lambda x: str.lower(x[0]) if x else '', kw_extractor.extract_keywords(text)))

get_kw = udf(get_kw, ArrayType(StringType()))

# merged_content.withColumn('yake',get_kw(col('description')))\
# .select(
#     explode(col('yake')).alias('keywords'),
#     'description',
#     'release_year',
#     'type',
#     'title',
#     'content_id'
# )#.write.mode('overwrite').parquet('./foo/keywords_full_2')

CPU times: user 358 µs, sys: 52 µs, total: 410 µs
Wall time: 976 µs


# Neo4j

In [85]:
# Utility code to drop the Neo4j database
# gds.run_cypher('MATCH (n) DETACH DELETE n;')
# gds.graph.drop('embedding-projection')

graphName                                             embedding-projection
database                                                             neo4j
databaseLocation                                                     local
memoryUsage                                                               
sizeInBytes                                                             -1
nodeCount                                                            67626
relationshipCount                                                   590278
configuration            {'relationshipProjection': {'like': {'aggregat...
density                                                           0.000129
creationTime                           2024-06-10T08:31:07.568375238+00:00
modificationTime                       2024-06-10T08:31:08.932890105+00:00
schema                   {'graphProperties': {}, 'nodes': {'content': {...
schemaWithOrientation    {'graphProperties': {}, 'nodes': {'content': {...
Name: 0, dtype: object

In [73]:
NEO4J_URL = 'bolt://neo4j:7687'

In [74]:
gds = GraphDataScience("neo4j://neo4j",aura_ds=False)

## Uploads Keywords

In [75]:
gds.run_cypher('CREATE INDEX content_id IF NOT EXISTS FOR (n:content) ON (n.content_id)')
gds.run_cypher('CREATE INDEX keywords IF NOT EXISTS FOR (n:keyword) ON (n.keyword)')
gds.run_cypher('CREATE INDEX users_id IF NOT EXISTS FOR (n:users) ON (n.users_id)')
gds.run_cypher('CREATE INDEX content_type IF NOT EXISTS FOR (n:content) ON (n.type)')

In [76]:
%%time
kw_df = (
    merged_content
    .withColumn('yake',get_kw(col('description')))
    .select(
        explode(col('yake')).alias('keywords'),
        'description',
        'release_year',
        'type',
        'title',
        'content_id'
    )
)

for kind in CONTENTS:
    (
        kw_df.filter(col('type') == kind).write
        # // Create new relationships
        .mode('Append')
        .format("org.neo4j.spark.DataSource")
        # // Assign a type to the relationships
        .option("relationship", "has_keyword")
        # // Use `keys` strategy
        .option("relationship.save.strategy", "keys")
        # // Overwrite source nodes and assign them a label
        .option("relationship.source.save.mode", "Overwrite")
        # .option("relationship.source.labels", f":{kind}")
        .option("relationship.source.labels", ':content')
        # // Map the DataFrame columns to node properties
        .option("relationship.source.node.properties", "title,content_id,type,description")
        # // Node keys are mandatory for overwrite save mode
        .option("relationship.source.node.keys", "content_id,type")
        # // Overwrite target nodes and assign them a label
        .option("relationship.target.save.mode", "Overwrite")
        .option("relationship.target.labels", ":keyword")
        # // Map the DataFrame columns to node properties
        .option("relationship.target.node.properties", "keywords")
        # // Node keys are mandatory for overwrite save mode
        .option("relationship.target.node.keys", "keywords")
        # // Map the DataFrame columns to relationship properties
        # .option("relationship.properties", "quantity,order")
        .option("url", NEO4J_URL)
        .save()
    )

CPU times: user 294 ms, sys: 131 ms, total: 425 ms
Wall time: 43min 24s


## Upload users

In [77]:
filter_users = merged_users\
.join(
    merged_content, 
    (merged_content.type == merged_users.type) & (merged_content.content_id == merged_users.content_id), 
    how='right'
).select(['user_id',merged_users['type'],merged_users['content_id'],'rating'])

In [83]:
%%time
# TODO: Fix the "double run" bug
for kind in CONTENTS:
    for r_type in ['like', 'dislike']:
        if r_type == 'like':
            df = filter_users.filter(col('rating') >= 8)
        else :
            df = filter_users.filter((col('rating') >= 1) & (col('rating')<8))
        (
        df.filter(col('type') == kind).write
          # // Create new relationships
          .mode('Append')
          .format("org.neo4j.spark.DataSource")
          # // Assign a type to the relationships
          .option("relationship", r_type)
          # // Use `keys` strategy
          .option("relationship.save.strategy", "keys")
          # // Overwrite source nodes and assign them a label
          .option("relationship.source.save.mode", "Overwrite")
          # .option("relationship.source.labels", f":{kind}")
          .option("relationship.source.labels", ':content')
          # // Map the DataFrame columns to node properties
          # .option("relationship.source.node.properties", "content_id,type")
          # // Node keys are mandatory for overwrite save mode
          .option("relationship.source.node.keys", "content_id,type")
          # // Overwrite target nodes and assign them a label
          .option("relationship.target.save.mode", "Overwrite")
          .option("relationship.target.labels", ":users")
          # // Map the DataFrame columns to node properties
          # .option("relationship.target.node.properties", "user_id,type")
          # // Node keys are mandatory for overwrite save mode
          .option("relationship.target.node.keys", "user_id,type")
          # // Map the DataFrame columns to relationship properties
          # .option("relationship.properties", "rating_date")
          .option("url", NEO4J_URL)
          .save()
        )

CPU times: user 41.6 ms, sys: 25.7 ms, total: 67.3 ms
Wall time: 3min 55s


## Neo4j Data Science

In [86]:
# Project the graph embedding
g0, res = gds.graph.project(
    'embedding-projection', 
    ['content', 'users','keyword' ],
    {
        'like':{'orientation':'UNDIRECTED'},
        # 'dislike':{'orientation':'UNDIRECTED'},
        'has_keyword':{'orientation':'UNDIRECTED'},
    },
)
res

nodeProjection            {'content': {'label': 'content', 'properties':...
relationshipProjection    {'like': {'aggregation': 'DEFAULT', 'orientati...
graphName                                              embedding-projection
nodeCount                                                             71089
relationshipCount                                                    840430
projectMillis                                                           109
Name: 0, dtype: object

In [87]:
%%time
# gds.beta.graphSage.mutate(g0, mutateProperty='embedding', embeddingDimension=50, randomSeed=7474, relationshipWeightProperty='weight');
# gds.beta.graphSage.mutate(g0, mutateProperty='embedding',modelName='exampleTrainModel');
gds.fastRP.mutate(g0, mutateProperty='embedding', embeddingDimension=256, randomSeed=7474);

CPU times: user 662 µs, sys: 3.45 ms, total: 4.11 ms
Wall time: 170 ms


In [88]:
gds.graph.writeNodeProperties(g0, ["embedding"], ["content","users"])

writeMillis                           588
graphName            embedding-projection
nodeProperties                [embedding]
propertiesWritten                   23575
Name: 0, dtype: object

## Centrality

In [89]:
%%time
# ArticleRank is a variant of the Page Rank algorithm, which measures the transitive influence of nodes.
gds.run_cypher('''
    CALL gds.articleRank.mutate('embedding-projection', {
      mutateProperty: 'centrality'
    })
    YIELD nodePropertiesWritten, ranIterations
    '''
)

gds.graph.writeNodeProperties(g0, ["centrality"], ["content"])

CPU times: user 2.4 ms, sys: 4.73 ms, total: 7.13 ms
Wall time: 409 ms


writeMillis                            66
graphName            embedding-projection
nodeProperties               [centrality]
propertiesWritten                   19880
Name: 0, dtype: object

## Page Rank

In [90]:
%%time
gds.run_cypher('''
CALL gds.pageRank.mutate('embedding-projection', {
  maxIterations: 20,
  dampingFactor: 0.85,
  mutateProperty: 'pagerank'
})
YIELD nodePropertiesWritten, ranIterations
''')
gds.graph.writeNodeProperties(g0, ["pagerank"], ["content"])

CPU times: user 5.67 ms, sys: 198 µs, total: 5.86 ms
Wall time: 285 ms


writeMillis                            61
graphName            embedding-projection
nodeProperties                 [pagerank]
propertiesWritten                   19880
Name: 0, dtype: object

In [None]:
def get_recommendation_raw(content_id, origin_kind, destination_kind, limit):
    result = gds.run_cypher( '''
    MATCH (c1:content {content_id:$CONTENT_ID, type:$ORIGIN_KIND})
    MATCH (c2:content {type:$DESTINATION_KIND})
    WHERE (id(c1) <> id(c2))
    WITH c1, c2, gds.similarity.cosine(c1.embedding, c2.embedding) AS cosineSimilarity
    RETURN c2.type,c2.title,c2.content_id, cosineSimilarity
    order by cosineSimilarity DESC
    limit $LIMIT
        ''', params={
        'CONTENT_ID': content_id, 
        'ORIGIN_KIND':origin_kind, 
        'DESTINATION_KIND':destination_kind,
        'LIMIT':limit
        })
    result = result.dropna()
    return [tuple(r)[:3] for r in result.values]

def get_id_from_text(text, kind):
    result = gds.run_cypher( '''
        MATCH (n:content)
        WHERE toLower(n.title) CONTAINS toLower($text)
        RETURN n.content_id, n.type
        ''', params={'text': text}
    )

    result = result.dropna()
    return result.values[0]


def get_recommendation(text, kind):
    idd, origin_kind = get_id_from_text(text, kind)
    result = []
    for destination_kind in CONTENTS:
        result += get_recommendation_raw(idd, origin_kind, destination_kind, 8)
    return result


def get_recommendation_for(content_name, kind):
    """
    This function controls the recommendations being used for a given content.
    It provides a content_name and kind.
    """
    return get_recommendation(content_name, kind)[:8]


get_recommendation_for('berserk', 'anime')

In [None]:
merged_content.filter(merged_content['type'] == 'anime').orderBy('title').show(500)

In [None]:
foo = gds.run_cypher('MATCH(n:content) RETURN n.content_id, n.embedding LIMIT 1000')

In [None]:
# Sanity check, embeddings should be different

# import seaborn as sns
foo['sum'] = foo['n.embedding'].apply(np.sum)
foo['sum'].value_counts()

In [None]:
def get_recommendation_raw(content_id, origin_kind, destination_kind, limit):
    '''
        given the origin content_id and origin type
        give me all the results 
    '''
    result = gds.run_cypher( '''
    MATCH (c1:content {content_id:$CONTENT_ID, type:$ORIGIN_KIND})
    MATCH (c2:content {type:$DESTINATION_KIND})
    WHERE (id(c1) <> id(c2))
    WITH c1, c2, gds.similarity.cosine(c1.embedding, c2.embedding) AS cosineSimilarity
    RETURN c2.type,c2.title,c2.content_id, cosineSimilarity
    order by cosineSimilarity DESC
    limit $LIMIT
        ''', params={
        'CONTENT_ID': content_id, 
        'ORIGIN_KIND':origin_kind, 
        'DESTINATION_KIND':destination_kind,
        'LIMIT':limit
        })
    result = result.dropna()
    return [tuple(r)[:3] for r in result.values]

content_id = '34096'
origin_kind = 'anime'
destination_kind = 'videogame'
get_recommendation_raw(content_id, origin_kind, destination_kind, 10)

In [None]:
def get_id_from_text(text):
    result = gds.run_cypher( '''
    MATCH (n:content)
    WHERE toLower(n.title) CONTAINS toLower($text)
    RETURN n.content_id, n.type
        ''', params={'text': text})
    result = result.dropna()
    return result.values[0]

def get_recommendation(text):
    id, origin_kind = get_id_from_text(text)
    result = []
    for destination_kind in CONTENTS:
        result += get_recommendation_raw(id, origin_kind, destination_kind, 8)
    return result

get_recommendation('berserk')