***Loading Data***

In [None]:
from math import isclose
import dask.array as da
import dask.bag as db
import urllib.request
import numpy as np

# Remote locations of the three gzipped Steam datasets
# (user reviews, users' owned items, game catalogue).
urls = [
    'https://datarepo.eng.ucsd.edu/mcauley_group/data/steam/australian_user_reviews.json.gz',
    'https://datarepo.eng.ucsd.edu/mcauley_group/data/steam/australian_users_items.json.gz',
    'https://cseweb.ucsd.edu/~wckang/steam_games.json.gz',
]

# Local file names, listed in the same order as `urls`.
filenames = [
    "australian_user_reviews.json.gz",
    "australian_users_items.json.gz",
    "steam_games.json.gz",
]

# Download every file, wrap it in a dask bag of text lines, and report
# how many lines it holds. The bags are kept in `text_files`, keyed by file name.
text_files = {}
for url, filename in zip(urls, filenames):
  urllib.request.urlretrieve(url, filename)
  text_file = db.read_text(filename)
  print(f"{filename} loaded with {text_file.count().compute()} lines")
  text_files[filename] = text_file

# Sanity check: show the first raw line of each dataset.
for filename in text_files:
  text_file = text_files[filename]
  print(f"Processing data for {filename}")
  print(text_file.take(1))
  print()

australian_user_reviews.json.gz loaded with 25799 lines
australian_users_items.json.gz loaded with 88310 lines
steam_games.json.gz loaded with 32135 lines
Processing data for australian_user_reviews.json.gz
('{\'user_id\': \'76561197970982479\', \'user_url\': \'http://steamcommunity.com/profiles/76561197970982479\', \'reviews\': [{\'funny\': \'\', \'posted\': \'Posted November 5, 2011.\', \'last_edited\': \'\', \'item_id\': \'1250\', \'helpful\': \'No ratings yet\', \'recommend\': True, \'review\': \'Simple yet with great replayability. In my opinion does "zombie" hordes and team work better than left 4 dead plus has a global leveling system. Alot of down to earth "zombie" splattering fun for the whole family. Amazed this sort of FPS is so rare.\'}, {\'funny\': \'\', \'posted\': \'Posted July 15, 2011.\', \'last_edited\': \'\', \'item_id\': \'22200\', \'helpful\': \'No ratings yet\', \'recommend\': True, \'review\': "It\'s unique and worth a playthrough."}, {\'funny\': \'\', \'posted\'

***Filter the information needed for each bag***

In [None]:
import json
import re

# Converts one raw dataset line (a Python-literal dict) into a JSON string
def convert_to_json(data):
  """Parse a line written as a Python literal and re-serialise it as JSON.

  The dataset files store one Python-style dict per line (single-quoted
  strings, ``True``/``False``), so they are not valid JSON as downloaded.

  Args:
    data: text of one line; surrounding whitespace/newline is ignored.

  Returns:
    The JSON string produced by ``json.dumps`` for the parsed object.

  Raises:
    ValueError / SyntaxError: if the line is not a plain Python literal.
  """
  # Local import keeps this cell self-contained.
  import ast

  # SECURITY: the text comes from a downloaded file, so it must never be
  # executed. literal_eval only accepts literals (dict, list, str, numbers,
  # True/False/None) and rejects calls, names and attribute access.
  python_dict = ast.literal_eval(data.strip())
  return json.dumps(python_dict)

# User data: extracts the 'steam_id' and 'items' fields of one record
def get_user_items(record):
  """Turn one raw users-items line into a trimmed dict.

  Args:
    record: one raw text line, parsed through convert_to_json + json.loads.

  Returns:
    {'steam_id', 'items_count', 'items'} where every entry of 'items'
    keeps only 'item_id', 'item_name' and 'playtime_forever'.
  """
  parsed = json.loads(convert_to_json(record))

  owner_id = parsed['steam_id']
  owned = parsed['items']

  trimmed = []
  for entry in owned:
    trimmed.append({
        'item_id': entry['item_id'],
        'item_name': entry['item_name'],
        'playtime_forever': entry['playtime_forever'],
    })

  return {
      'steam_id': owner_id,
      'items_count': len(owned),
      'items': trimmed,
  }


def get_reviews(record):
  """Turn one raw user-reviews line into a trimmed dict.

  Args:
    record: one raw text line, parsed through convert_to_json + json.loads.

  Returns:
    {'user_id', 'reviews'} where every review keeps only the reviewed
    item's id (renamed to 'game_id') and the 'recommend' flag.
  """
  parsed = json.loads(convert_to_json(record))

  reviewer = parsed['user_id']
  raw_reviews = parsed['reviews']

  slim_reviews = []
  for entry in raw_reviews:
    slim_reviews.append({'game_id': entry['item_id'], 'recommend': entry['recommend']})

  return {'user_id': reviewer, 'reviews': slim_reviews}

# Game details: extracts 'game_name', 'game_id', 'tags' and 'specs'
def get_game_details(record):
  """Turn one raw steam-games line into a trimmed dict.

  The name is taken from 'app_name' when present, otherwise from 'title'.
  Missing 'tags' / 'specs' default to empty lists.

  Args:
    record: one raw text line, parsed through convert_to_json + json.loads.

  Returns:
    {'game_name', 'game_id', 'tags', 'specs'}, or None when extraction
    fails (e.g. the record has no 'id'); the failing record and the error
    are printed in that case.
  """
  data = json.loads(convert_to_json(record))

  try:
    if 'app_name' in data:
      game_name = data['app_name']
    else:
      game_name = data['title']

    return {
      'game_name': game_name,
      'game_id': data['id'],
      'tags': data.get('tags', []),
      'specs': data.get('specs', []),
    }
  except Exception as e:
    # Best effort: report the unusable record and let the caller filter out None.
    print(f'Processing {data}')
    print(e)
    return None




# Parse every dataset into a bag of trimmed dicts.
userItems = text_files['australian_users_items.json.gz'].map(get_user_items)

# get_game_details returns None for unusable records, so drop those right away.
gameDetails = (
    text_files['steam_games.json.gz']
    .map(get_game_details)
    .filter(lambda details: details is not None)
)

reviews = text_files['australian_user_reviews.json.gz'].map(get_reviews)

# Preview one parsed record of each kind.
print('userItems example: ')
print(userItems.take(1))

print('gameDetails example: ')
print(gameDetails.take(1))

print('reviews example: ')
print(reviews.take(1))



userItems example: 
({'steam_id': '76561197970982479', 'items_count': 277, 'items': [{'item_id': '10', 'item_name': 'Counter-Strike', 'playtime_forever': 6}, {'item_id': '20', 'item_name': 'Team Fortress Classic', 'playtime_forever': 0}, {'item_id': '30', 'item_name': 'Day of Defeat', 'playtime_forever': 7}, {'item_id': '40', 'item_name': 'Deathmatch Classic', 'playtime_forever': 0}, {'item_id': '50', 'item_name': 'Half-Life: Opposing Force', 'playtime_forever': 0}, {'item_id': '60', 'item_name': 'Ricochet', 'playtime_forever': 0}, {'item_id': '70', 'item_name': 'Half-Life', 'playtime_forever': 0}, {'item_id': '130', 'item_name': 'Half-Life: Blue Shift', 'playtime_forever': 0}, {'item_id': '300', 'item_name': 'Day of Defeat: Source', 'playtime_forever': 4733}, {'item_id': '240', 'item_name': 'Counter-Strike: Source', 'playtime_forever': 1853}, {'item_id': '3830', 'item_name': 'Psychonauts', 'playtime_forever': 333}, {'item_id': '2630', 'item_name': 'Call of Duty 2', 'playtime_forever':

***Top20games***

In [None]:
# Get the games with the highest playtime for each user (top 20 by default)
def get_user_top20games(data, top_n=20):
  """Keep only a user's most-played games.

  Args:
    data: dict with 'steam_id' and 'items'; every item carries
      'item_id', 'item_name' and 'playtime_forever'.
    top_n: how many games to keep (default 20, the original behaviour).

  Returns:
    {'steam_id', 'items_count', 'items'} where 'items' holds at most
    `top_n` games ordered by descending playtime. Each game's id is
    exposed under the key 'game_id' (renamed from 'item_id').

  Raises:
    ValueError: if `top_n` is negative.
  """
  if top_n < 0:
    # A negative slice bound would silently drop games from the tail instead.
    raise ValueError("top_n must be non-negative")

  # sorted() is stable, so games with equal playtime keep their input order.
  ranked = sorted(data['items'], key=lambda game: game['playtime_forever'], reverse=True)
  top_games = ranked[:top_n]

  return {
      'steam_id': data['steam_id'],
      'items_count': len(top_games),
      'items': [
          {
              'game_id': game['item_id'],
              'item_name': game['item_name'],
              'playtime_forever': game['playtime_forever'],
          }
          for game in top_games
      ],
  }

# Reduce every user's library to the most-played games.
userItemsTop20 = userItems.map(get_user_top20games)
print('userItemsTop20 example: ', userItemsTop20.take(1), sep='\n')
print()

userItemsTop20 example: 
({'steam_id': '76561197970982479', 'items_count': 20, 'items': [{'game_id': '730', 'item_name': 'Counter-Strike: Global Offensive', 'playtime_forever': 23532}, {'game_id': '35450', 'item_name': 'Rising Storm/Red Orchestra 2 Multiplayer', 'playtime_forever': 14194}, {'game_id': '8930', 'item_name': "Sid Meier's Civilization V", 'playtime_forever': 10345}, {'game_id': '1250', 'item_name': 'Killing Floor', 'playtime_forever': 10006}, {'game_id': '232090', 'item_name': 'Killing Floor 2', 'playtime_forever': 6494}, {'game_id': '24960', 'item_name': 'Battlefield: Bad Company 2', 'playtime_forever': 5716}, {'game_id': '24980', 'item_name': 'Mass Effect 2', 'playtime_forever': 5001}, {'game_id': '300', 'item_name': 'Day of Defeat: Source', 'playtime_forever': 4733}, {'game_id': '17450', 'item_name': 'Dragon Age: Origins', 'playtime_forever': 4431}, {'game_id': '3590', 'item_name': 'Plants vs. Zombies: Game of the Year', 'playtime_forever': 4413}, {'game_id': '200510', 

***A-Priori Algorithm***

In [None]:
# Map each basket to the list of all pairs of frequent games it contains
def get_pairs(basket, frequent_items):
  """List every unordered pair of frequent items found in one basket.

  Args:
    basket: iterable of items (duplicates are ignored).
    frequent_items: collection of items that passed the support threshold;
      any iterable is accepted, not only a set.

  Returns:
    A list of 2-tuples. Each tuple is sorted, and the list itself is in
    sorted order, so the result is deterministic for a given input.
  """
  # Local import keeps this cell self-contained.
  from itertools import combinations

  # Sorting the kept items first means combinations() emits already-sorted pairs.
  kept = sorted(set(basket).intersection(frequent_items))
  return list(combinations(kept, 2))

# Get pair counts - A-Priori algorithm
def a_priori(data, support=10):
  """Two-pass A-Priori over a bag of baskets.

  Pass 1 counts single items and keeps those seen at least `support` times.
  Pass 2 counts pairs made only of those frequent items and keeps the pairs
  seen at least `support` times.

  Args:
    data: dask bag whose elements are baskets (lists of items).
    support: minimum count for an item or a pair to be considered frequent.

  Returns:
    (itemCount, pairCounts, itemFrequencies):
      itemCount       - bag of (item, count) for ALL items (not filtered),
      pairCounts      - bag of ((item_a, item_b), count) for frequent pairs,
      itemFrequencies - the computed list behind itemCount.
  """
  # Pass 1: the item counts are computed eagerly because pass 2 needs them.
  itemCount = data.flatten().frequencies()
  itemFrequencies = itemCount.compute()
  frequentItems = {item for item, count in itemFrequencies if count >= support}

  def frequent_pairs_of(basket):
    return get_pairs(basket, frequentItems)

  def meets_support(pair_and_count):
    return pair_and_count[1] >= support

  # Pass 2: count candidate pairs, then apply the support threshold.
  candidatePairs = data.map(frequent_pairs_of).flatten().frequencies()
  pairCounts = candidatePairs.filter(meets_support)

  return itemCount, pairCounts, itemFrequencies



# One basket per user: the names of that user's top games.
itemsFrequent = userItemsTop20.map(lambda user: [game['item_name'] for game in user['items']])
print("First User's game")
print(itemsFrequent.take(1))

# Run A-Priori with the default support threshold.
resIndiv, resPairs, resIndivCompute = a_priori(itemsFrequent)

# Show the first ten (game, count) entries only; the full list is large.
print()
print('item count example:')
for i in resIndivCompute[:10]:
  print(i)

First User's game
(['Counter-Strike: Global Offensive', 'Rising Storm/Red Orchestra 2 Multiplayer', "Sid Meier's Civilization V", 'Killing Floor', 'Killing Floor 2', 'Battlefield: Bad Company 2', 'Mass Effect 2', 'Day of Defeat: Source', 'Dragon Age: Origins', 'Plants vs. Zombies: Game of the Year', 'XCOM: Enemy Unknown', 'Just Cause 2', 'Borderlands', 'Insurgency', 'FINAL FANTASY VII', 'Dirty Bomb', 'Deus Ex: Game of the Year Edition', 'The Elder Scrolls V: Skyrim', 'Call of Duty: Black Ops - Multiplayer', "Deus Ex: Human Revolution - Director's Cut"],)

item count example:
('Counter-Strike: Global Offensive', 38698)
('Rising Storm/Red Orchestra 2 Multiplayer', 2299)
("Sid Meier's Civilization V", 10651)
('Killing Floor', 7257)
('Killing Floor 2', 1735)
('Battlefield: Bad Company 2', 2454)
('Mass Effect 2', 1113)
('Day of Defeat: Source', 1529)
('Dragon Age: Origins', 489)
('Plants vs. Zombies: Game of the Year', 896)


***Confidence***

In [None]:
# conf(I -> j) = sup(I U j) / sup(I)
# sup(I)       = (number of baskets containing I) / n


# Get the confidence score of every rule derived from the frequent pairs
def confidence(itemCounts, pairCounts, n):
  """Compute rule confidences for both directions of every frequent pair.

  Args:
    itemCounts: bag of (item, count).
    pairCounts: bag of ((item_a, item_b), count).
    n: total number of baskets, used to turn counts into supports.

  Returns:
    Bag of ((antecedent, consequent), confidence).
  """
  item_support = itemCounts.map(lambda rec: (rec[0], rec[1] / n))

  # Key every pair by each of its two members in turn, so that both
  # a -> b and b -> a end up as rules.
  keyed_by_first = pairCounts.map(lambda rec: (rec[0][0], (rec[0][1], rec[1] / n)))
  keyed_by_second = pairCounts.map(lambda rec: (rec[0][1], (rec[0][0], rec[1] / n)))
  keyed_pairs = db.concat([keyed_by_first, keyed_by_second])

  # The mapping below reads element [0] of each joined record as the
  # (item, support) entry and element [1] as the keyed pair entry.
  joined = keyed_pairs.join(item_support, lambda rec: rec[0])

  return joined.map(
      lambda match: ((match[1][0], match[1][1][0]), match[1][1][1] / match[0][1])
  )

# Number of baskets (one per user). Hard-coded from the line count printed
# when australian_users_items.json.gz was loaded (88310 lines).
N = 88_310

ruleConfidences = confidence(resIndiv, resPairs, N).compute()
print('Confidences', ruleConfidences, sep='\n')



---


***Cosine similarity***

---

In [None]:
# Assign every game name a column index, following the order of the
# (name, count) entries in resIndivCompute.
gamesDict = {name: position for position, (name, _count) in enumerate(resIndivCompute)}
print('gamesDict: ', gamesDict, sep='\n')
totalNumGames = len(gamesDict)

# The same mapping as (index, name) tuples, wrapped in a bag for later filtering.
gamesList = list(enumerate(name for name, _count in resIndivCompute))
gamesBag = db.from_sequence(gamesList)
print('gamesBag: ')
print(gamesBag.take(1))

gamesDict: 
{'Counter-Strike: Global Offensive': 0, 'Rising Storm/Red Orchestra 2 Multiplayer': 1, "Sid Meier's Civilization V": 2, 'Killing Floor': 3, 'Killing Floor 2': 4, 'Battlefield: Bad Company 2': 5, 'Mass Effect 2': 6, 'Day of Defeat: Source': 7, 'Dragon Age: Origins': 8, 'Plants vs. Zombies: Game of the Year': 9, 'XCOM: Enemy Unknown': 10, 'Just Cause 2': 11, 'Borderlands': 12, 'Insurgency': 13, 'FINAL FANTASY VII': 14, 'Dirty Bomb': 15, 'Deus Ex: Game of the Year Edition': 16, 'The Elder Scrolls V: Skyrim': 17, 'Call of Duty: Black Ops - Multiplayer': 18, "Deus Ex: Human Revolution - Director's Cut": 19, 'Terraria': 20, 'Saints Row: The Third': 21, 'Portal 2': 22, 'Deus Ex: Human Revolution': 23, "Assassin's Creed IV Black Flag": 24, "Assassin's Creed Unity": 25, 'Saints Row IV': 26, 'Tropico 4': 27, 'Batman™: Arkham Knight': 28, 'Far Cry® 3': 29, 'Middle-earth: Shadow of Mordor': 30, "Assassin's Creed II": 31, 'Batman™: Arkham Origins': 32, 'The Witcher 2: Assassins of Kings

In [None]:
# Build one user's row of the user x game matrix
def create_game_matrix(id, items):
  """Encode a user's games as a 0/1 vector over all known games.

  Relies on the notebook-level `gamesDict` (game name -> column index)
  and `totalNumGames` (vector length).

  Args:
    id: the user's steam id; converted with int().
    items: list of dicts, each with an 'item_name'.

  Returns:
    (int(id), row) where row[k] is 1 for every game the user has.
  """
  row = np.zeros(totalNumGames)
  for column in (gamesDict[item['item_name']] for item in items):
    row[column] = 1

  return (int(id), row)

# One (steam_id, 0/1 vector) entry per user.
userGamesMatrix = userItemsTop20.map(
    lambda record: create_game_matrix(record['steam_id'], record['items'])
)
print('userGamesMatrix example:')
print(userGamesMatrix.take(2))

userGamesMatrix example:
((76561197970982479, array([1., 1., 1., ..., 0., 0., 0.])), (76561198035864385, array([0., 0., 0., ..., 0., 0., 0.])))


In [None]:
# Computes cosine similarity between two numpy arrays
def cosine_similarity(u, v):
  """Return the cosine of the angle between vectors `u` and `v`.

  Args:
    u, v: numpy arrays of the same length.

  Returns:
    dot(u, v) / (|u| * |v|), or 0.0 when either vector has zero norm
    (the cosine is undefined there, so 0.0 is used as "no similarity").
  """
  inner = np.dot(u, v)
  length_u = np.linalg.norm(u)
  length_v = np.linalg.norm(v)

  both_nonzero = length_u != 0 and length_v != 0
  return inner / (length_u * length_v) if both_nonzero else 0.0

In [None]:
# def search_user_matrix(id, matrix):
#   res = matrix.filter(lambda x: x[0] in id)
#   return res
# forUserId = [76561197970982479, 76561198035864385]
# testUser = search_user_matrix(forUserId, userGamesMatrix)

# Use the first two users of the matrix as test users
test_user = db.from_sequence(userGamesMatrix.take(2))
print('test_user: ', test_user.compute(), sep='\n')

test_user: 
[(76561197970982479, array([1., 1., 1., ..., 0., 0., 0.])), (76561198035864385, array([0., 0., 0., ..., 0., 0., 0.]))]


***Apply cosine similarity to find recommended games***

In [None]:
# Find the three nearest neighbours and return the names of the games to recommend
def cs_recommend_games(test_user, userGamesMatrix, gamesBag):
  """Recommend games played by the users most similar to `test_user`.

  Args:
    test_user: 0/1 numpy vector of the user to recommend for.
    userGamesMatrix: bag of (steam_id, 0/1 vector) for all users.
    gamesBag: bag of (column index, game name).

  Returns:
    List of game names that at least one neighbour has and the test user
    does not.
  """
  scored = userGamesMatrix.map(
      lambda entry: (entry[0], cosine_similarity(entry[1], test_user), entry[1])
  )
  nearest = scored.topk(4, key=1).compute()

  # The best hit is dropped and the next three are used as neighbours.
  # NOTE(review): this presumes the top hit is the test user itself - confirm
  # that ties with identical vectors cannot displace it.
  neighbour_vectors = [vector for _user_id, _score, vector in nearest[1:]]
  neighbour_games = [np.where(vector > 0)[0] for vector in neighbour_vectors]

  # Union of all neighbours' games, minus what the test user already has.
  candidateIndices = set(neighbour_games[0]).union(*neighbour_games[1:])
  ownedIndices = set(np.where(test_user > 0)[0])
  recommendedIndices = candidateIndices - ownedIndices

  # Translate column indices back into game names.
  matching = gamesBag.filter(lambda game: game[0] in recommendedIndices)
  return matching.map(lambda game: game[1]).compute()

# (steam_id, [recommended game names]) for every test user.
recommendGames = test_user.map(
    lambda entry: (entry[0], cs_recommend_games(entry[1], userGamesMatrix, gamesBag))
).compute()
print('Cosine similarity recommend games: ', recommendGames, sep='\n')


Cosine similarity recommend games: 
[(76561197970982479, ['Terraria', 'Fallout: New Vegas', 'Fallout 4', 'Borderlands 2', 'XCOM 2', 'Grand Theft Auto IV', 'The Witcher 3: Wild Hunt', 'Counter-Strike: Source', 'PAYDAY 2', 'Call of Duty: Black Ops', 'Warframe', 'Left 4 Dead', 'Rocket League', 'Call of Duty: Modern Warfare 2 - Multiplayer', "Garry's Mod", 'Borderlands: The Pre-Sequel', 'Dead Island', 'Arma 2: Operation Arrowhead', 'Dead Island Riptide', 'Arma 3', 'DayZ', 'War of the Roses', 'Call of Duty: Modern Warfare 3 - Multiplayer', 'Mass Effect', "Tom Clancy's The Division", 'LEGO® The Lord of the Rings™']), (76561198035864385, ['Counter-Strike: Global Offensive', "Sid Meier's Civilization V", 'Just Cause 2', 'Borderlands 2', 'FTL: Faster Than Light', 'BioShock Infinite', 'Portal', 'Batman: Arkham City™', 'Call of Duty: Black Ops III', 'PAYDAY 2', 'Grand Theft Auto V', 'ARK: Survival Evolved', 'The Binding of Isaac', 'Dark Souls: Prepare to Die Edition', 'DARK SOULS™ II: Scholar of 

***Get tags for all games and create tags Matrix***

In [None]:
# Example gameDetails record:
# {'game_name': 'Lost Summoner Kitty', 'game_id': '761140', 'tags': ['Strategy', 'Action', 'Indie', 'Casual', 'Simulation'], 'specs': ['Single-player']}

# Collect every distinct tag/spec that appears in any game.
collectTags = (
    gameDetails
    .map(lambda game: game['tags'] + game['specs'] if game is not None else [])
    .flatten()
    .distinct()
    .compute()
)
print(collectTags)

# Assign each tag a column index, following its position in collectTags.
gamesTagsDict = dict(zip(collectTags, range(len(collectTags))))
print(gamesTagsDict)
total_num_tags = len(gamesTagsDict)
print(total_num_tags)

# Create the tag vectors for the recommended games
def createRecommendMatrix(gameTags):
  """Encode each user's recommended-game tags as a 0/1 vector.

  Relies on the notebook-level `gamesTagsDict` (tag -> column index)
  and `total_num_tags` (vector length).

  Args:
    gameTags: iterable of (id, tags) pairs.

  Returns:
    List of (id, vector) where vector[k] is 1 for every tag in `tags`.
  """
  def encode(tags):
    vector = np.zeros(total_num_tags)
    for tag in tags:
      vector[gamesTagsDict[tag]] = 1
    return vector

  return [(owner, encode(tags)) for owner, tags in gameTags]

# Get the tags of the recommended games, looked up by game name
def getGamesTags(games):
  """Collect the distinct tags/specs of each user's recommended games.

  Runs one pass over the notebook-level `gameDetails` bag per user.

  Args:
    games: iterable of (steam_id, [game names]).

  Returns:
    List of (steam_id, [distinct tags and specs of those games]).
  """
  res = []
  for recommendGames in games:
    wanted_names = recommendGames[1]

    def tags_if_wanted(game, wanted=wanted_names):
      # Games that were not recommended contribute nothing.
      if game['game_name'] in wanted:
        return game['tags'] + game['specs']
      return []

    tags = gameDetails.map(tags_if_wanted).flatten().distinct().compute()
    res.append((recommendGames[0], tags))
  return res

# Tags of the games recommended by the cosine-similarity approach.
csRecommendGamesTags = getGamesTags(recommendGames)
print('', csRecommendGamesTags, sep='\n')

# The same tags encoded as one 0/1 vector per user.
csTagsMatrix = createRecommendMatrix(csRecommendGamesTags)
print()
print('cosine_similarity TagsMatrix: ', csTagsMatrix, sep='\n')

['Strategy', 'Action', 'Indie', 'Casual', 'Simulation', 'Single-player', 'Free to Play', 'RPG', 'Card Game', 'Trading Card Game', 'Turn-Based', 'Fantasy', 'Tactical', 'Dark Fantasy', 'Board Game', 'PvP', '2D', 'Competitive', 'Replay Value', 'Character Customization', 'Female Protagonist', 'Difficult', 'Design & Illustration', 'Multi-player', 'Online Multi-Player', 'Cross-Platform Multiplayer', 'Steam Achievements', 'Steam Trading Cards', 'In-App Purchases', 'Sports', 'Multiplayer', 'Stats', 'Adventure', 'Full controller support', 'HTC Vive', 'Oculus Rift', 'Tracked Motion Controllers', 'Room-Scale', 'FPS', 'Shooter', 'Third-Person Shooter', 'Sniper', 'Third Person', 'Downloadable Content', 'Steam Cloud', 'Steam Leaderboards', 'Racing', 'Partial Controller Support', 'Early Access', 'Survival', 'Pixel Graphics', 'Cute', 'Physics', 'Science', 'VR', 'Seated', 'Standing', 'Local Co-op', 'Shared/Split Screen', 'Tutorial', 'Classic', 'Gore', "1990's", 'Singleplayer', 'Sci-fi', 'Aliens', 'Firs

***Apply A-Priori to find recommended games***

In [None]:
##======Search recommend game(Prior)=================================================================
# Get the game names out of one userItemsTop20 record
def get_games(data):
  """Return the 'item_name' of every entry in data['items'], in order."""
  names = []
  for entry in data['items']:
    names.append(entry['item_name'])
  return names

# The first two users as (steam_id, [names of their top games]).
user1_2 = db.from_sequence(userItemsTop20.take(2)).map(
    lambda record: (int(record['steam_id']), get_games(record))
)
print('test for two user:')
print(user1_2.compute())
print(ruleConfidences)

# Return the games recommended by the A-Priori rules
def priorRecomm(userInventory, ruleConfidences, top_n=26):
  """Recommend games from association rules whose antecedent the user owns.

  A rule ((a, b), conf) reads "users who have `a` also have `b`" with
  confidence conf(a -> b). It therefore only supports recommending `b` to
  someone who owns `a`. The reverse direction has its own entry in
  `ruleConfidences` with its own confidence, so a rule is never used
  backwards here.

  Args:
    userInventory: iterable of game names the user already has.
    ruleConfidences: iterable of ((antecedent, consequent), confidence).
    top_n: maximum number of recommendations (default 26, the original cut-off).

  Returns:
    List of (game, confidence), highest confidence first. Each game appears
    at most once, with the best confidence among the rules that suggest it.
  """
  owned = set(userInventory)  # O(1) membership tests inside the loop

  best_confidence = {}
  for (antecedent, consequent), conf in ruleConfidences:
    if antecedent in owned and consequent not in owned:
      # Several owned games may suggest the same title; keep the strongest rule.
      if consequent not in best_confidence or conf > best_confidence[consequent]:
        best_confidence[consequent] = conf

  ranked = sorted(best_confidence.items(), key=lambda entry: entry[1], reverse=True)
  return ranked[:top_n]

# (steam_id, [(game, confidence), ...]) for both test users.
priorRecommend = user1_2.map(
    lambda entry: (entry[0], priorRecomm(entry[1], ruleConfidences))
).compute()

# The same recommendations reduced to game names only.
priorRecommendGames = [
    (steam_id, [game for game, _confidence in scored_games])
    for steam_id, scored_games in priorRecommend
]
print()
print('prior recommend game: ', priorRecommend, sep='\n')
print('prior recommend game name: ', priorRecommendGames, sep='\n')

# Tags of the recommended games, then their 0/1 encoding per user.
priorGamesTags = getGamesTags(priorRecommendGames)
print(priorGamesTags)
priorTagsMatirx = createRecommendMatrix(priorGamesTags)
print()
print('priorTagsMatirx: ', priorTagsMatirx, sep='\n')

***Get tags of games that users frequently play and Create the game tags matrix for user***

In [None]:
# The first two users serve as the test set
user1 = db.from_sequence(userItemsTop20.take(2))

# Get the tags of the games that users frequently play
def get_user_tags(userData):
  """Join each user's games with the tags of those games.

  Uses the notebook-level `gameDetails` bag as the tag source.

  Args:
    userData: bag of user records with 'steam_id' and 'items'
      (each item carrying a 'game_id').

  Returns:
    Bag of joined records matched on game id; each record combines a
    (game_id, tags + specs) entry with a (game_id, steam_id) entry.
  """
  game_owner_pairs = userData.map(
      lambda user: [(game['game_id'], user['steam_id']) for game in user['items']]
  ).flatten()
  game_tag_pairs = gameDetails.map(
      lambda game: (game['game_id'], game['tags'] + game['specs'])
  )
  return game_owner_pairs.join(game_tag_pairs, lambda pair: pair[0])

# Step 1: joined records of the form ((game_id, tags), (game_id, steam_id)).
user_games_tags = get_user_tags(user1)
print(user_games_tags.compute())

# Step 2: group the joined records by steam_id.
user_games_tags = user_games_tags.groupby(lambda joined: joined[1][1])
print(user_games_tags.compute())

# Step 3: keep only the tag list of every game -> (steam_id, [tags, tags, ...]).
user_games_tags = user_games_tags.map(
    lambda group: (group[0], [joined[0][1] for joined in group[1] if len(joined) > 0])
)
print(user_games_tags.compute())

# Create the tag vector for one user
def createTagsMatrix(id, gameTags):
  """Encode the tags of a user's games as a single 0/1 vector.

  Relies on the notebook-level `gamesTagsDict` (tag -> column index)
  and `total_num_tags` (vector length). Prints `id` as a progress trace.

  Args:
    id: the user's steam id; converted with int() in the result.
    gameTags: list of tag lists, one per game.

  Returns:
    (int(id), vector) where vector[k] is 1 for every tag of any game.
  """
  print(id)
  vector = np.zeros(total_num_tags)
  for tags_of_game in gameTags:
    for column in (gamesTagsDict[tag] for tag in tags_of_game):
      vector[column] = 1
  return (int(id), vector)

# One (steam_id, tag vector) entry per test user.
userTagsMatrix = user_games_tags.map(
    lambda entry: createTagsMatrix(entry[0], entry[1])
).compute()
print()
print('userTagsMatrix:', userTagsMatrix, sep='\n')

[(('730', ['FPS', 'Multiplayer', 'Shooter', 'Action', 'Team-Based', 'Competitive', 'Tactical', 'First-Person', 'e-sports', 'PvP', 'Online Co-Op', 'Military', 'Co-op', 'Strategy', 'War', 'Trading', 'Realistic', 'Difficult', 'Fast-Paced', 'Moddable', 'Multi-player', 'Steam Achievements', 'Full controller support', 'Steam Trading Cards', 'Steam Workshop', 'In-App Purchases', 'Valve Anti-Cheat enabled', 'Stats']), ('730', '76561197970982479')), (('35450', ['Realistic', 'World War II', 'FPS', 'Multiplayer', 'Action', 'Historical', 'Tactical', 'First-Person', 'Shooter', 'Military', 'Team-Based', 'War', 'Atmospheric', 'Simulation', 'Tanks', 'Difficult', 'Co-op', 'Strategy', 'Gore', 'Singleplayer', 'Multi-player', 'Online Multi-Player', 'Steam Achievements', 'Steam Trading Cards', 'Steam Workshop', 'Valve Anti-Cheat enabled', 'Stats', 'Includes level editor']), ('35450', '76561197970982479')), (('8930', ['Turn-Based Strategy', 'Strategy', 'Turn-Based', 'Multiplayer', 'Historical', '4X', 'Singl

***Result***

In [None]:
# Result - similarity between the tags of the games a user frequently plays and
# the tags of the games recommended by A-Priori and by cosine similarity
def get_tag_similarity(userTagsMatrix, priorTagsMatirx, csTagsMatrix):
  """Score both recommenders against each user's own tag profile.

  Args:
    userTagsMatrix: dict steam_id -> tag vector of the user's played games.
    priorTagsMatirx: dict steam_id -> tag vector of A-Priori recommendations.
    csTagsMatrix: dict steam_id -> tag vector of cosine-similarity recommendations.

  Returns:
    List of (label, similarity) tuples, two per user: first the A-Priori
    score, then the cosine-similarity score.
  """
  res = []
  for key, own_tags in userTagsMatrix.items():
    priorSimilarity = cosine_similarity(own_tags, priorTagsMatirx[key])
    cosineSimilarity = cosine_similarity(own_tags, csTagsMatrix[key])

    user_label = "UserID: " + str(key)
    res.extend([
        (user_label + "   Prior game tags similarity: ", priorSimilarity),
        (user_label + "   Cosine similarity game tags similarity: ", cosineSimilarity),
    ])

  return res


# Index each list of (steam_id, vector) tuples by steam_id for direct lookup.
userTagsMatrix = dict(userTagsMatrix)
priorTagsMatirx = dict(priorTagsMatirx)
csMatrix = dict(csTagsMatrix)

# Recap of both recommenders' output.
print('Cosine similarity recommend games: ', recommendGames, sep='\n')
print('Cosine similarity recommend games tags: ', csRecommendGamesTags, sep='\n')
print()
print('Prior recommend game name: ', priorRecommendGames, sep='\n')
print('Prior recommend games tags: ', priorGamesTags, sep='\n')

# Final comparison.
print()
print('Result:')
print("Displaying the similarity scores between the user's frequently played game tags and the tags of the recommended games from the Apriori and cosine similarity algorithms.")
print(get_tag_similarity(userTagsMatrix, priorTagsMatirx, csMatrix))

Cosine similarity recommend games: 
[(76561197970982479, ['Terraria', 'Fallout: New Vegas', 'Fallout 4', 'Borderlands 2', 'XCOM 2', 'Grand Theft Auto IV', 'The Witcher 3: Wild Hunt', 'Counter-Strike: Source', 'PAYDAY 2', 'Call of Duty: Black Ops', 'Warframe', 'Left 4 Dead', 'Rocket League', 'Call of Duty: Modern Warfare 2 - Multiplayer', "Garry's Mod", 'Borderlands: The Pre-Sequel', 'Dead Island', 'Arma 2: Operation Arrowhead', 'Dead Island Riptide', 'Arma 3', 'DayZ', 'War of the Roses', 'Call of Duty: Modern Warfare 3 - Multiplayer', 'Mass Effect', "Tom Clancy's The Division", 'LEGO® The Lord of the Rings™']), (76561198035864385, ['Counter-Strike: Global Offensive', "Sid Meier's Civilization V", 'Just Cause 2', 'Borderlands 2', 'FTL: Faster Than Light', 'BioShock Infinite', 'Portal', 'Batman: Arkham City™', 'Call of Duty: Black Ops III', 'PAYDAY 2', 'Grand Theft Auto V', 'ARK: Survival Evolved', 'The Binding of Isaac', 'Dark Souls: Prepare to Die Edition', 'DARK SOULS™ II: Scholar of 

# ***Google Cloud***

In [None]:
%%writefile dask_apriori.py
# the above line tells colab to write the contents of this cell to a file instead of running them
import time
import sys
import multiprocessing
import os
import urllib.request

# def a_priori(text_file_bag, support=1000):
#   # PASTE YOUR CODE HERE (and then remove the following line)!
#   raise(Exception("You forgot to paste your A-Priori code!"))


if __name__ == "__main__":
  # url = 'https://drive.google.com/uc?export=download&confirm=t&id=1Red1JDV7S7smMIylsYerWnvYbUMpCBxJ'
  # filename = "epinions_20000.txt"
  # filename, _ = urllib.request.urlretrieve(url, filename)
  # print(f"downloaded {filename}")

  # Defaults, used when the corresponding command-line argument is absent.
  support_threshold = 10
  n_cpus = multiprocessing.cpu_count()

  # Usage: dask_apriori.py [support_threshold [n_cpus]]
  # The checks use >= so that passing both arguments also applies the first
  # one; with == the support threshold was ignored whenever n_cpus was given.
  if len(sys.argv) >= 2:
    support_threshold = int(sys.argv[1])

  if len(sys.argv) >= 3:
    n_cpus = int(sys.argv[2])

  from dask import bag as db
  from dask.distributed import LocalCluster
  from dask.distributed import Client
  from dask.distributed import get_task_stream

  cluster = LocalCluster(n_workers=n_cpus, threads_per_worker=1) # force to start 8 processes if we have 8 cpus
  print(cluster)
  client = cluster.get_client()

  with get_task_stream(plot='save', filename=f"task-stream-{n_cpus}.html") as ts:
    # force load of text file into partitions so that each process has one chunk
    # file_stats = os.stat(filename)
    # file_mb = file_stats.st_size / (1024 * 1024)
    # b = db.read_text(filename, encoding='unicode_escape', blocksize=str((file_mb / n_cpus))+"MB")
    time_start = time.time()
    # pair_counts = a_priori(b, support_threshold)
    # print(pair_counts.topk(10, key=1).compute())
    print(support_threshold)
    from math import isclose

    import dask.array as da
    import dask.bag as db
    import urllib.request
    import numpy as np

    urls = ['https://datarepo.eng.ucsd.edu/mcauley_group/data/steam/australian_user_reviews.json.gz',
        'https://datarepo.eng.ucsd.edu/mcauley_group/data/steam/australian_users_items.json.gz',
        'https://cseweb.ucsd.edu/~wckang/steam_games.json.gz']

    filenames = ["australian_user_reviews.json.gz",
            "australian_users_items.json.gz",
            "steam_games.json.gz"]

    text_files = {}
    for url,filename in zip(urls, filenames):
      urllib.request.urlretrieve(url, filename)
      text_file = db.read_text(filename)
      print(f"{filename} loaded with {text_file.count().compute()} lines")

      text_files[filename] = text_file


    for filename, text_file in text_files.items():
      print(f"Processing data for {filename}")
      # Example: Print the first few lines
      print(text_file.take(2))
      print()


    import json
    import re

    # Converts one raw dataset line (a Python-literal dict) into a JSON string
    def convert_to_json(data):
      """Parse a line written as a Python literal and re-serialise it as JSON.

      Args:
        data: text of one line; surrounding whitespace/newline is ignored.

      Returns:
        The JSON string produced by ``json.dumps`` for the parsed object.

      Raises:
        ValueError / SyntaxError: if the line is not a plain Python literal.
      """
      # Local import so the function carries its own dependency.
      import ast

      # SECURITY: the text comes from a downloaded file, so it must never be
      # executed. literal_eval only accepts literals and rejects calls,
      # names and attribute access.
      python_dict = ast.literal_eval(data.strip())
      return json.dumps(python_dict)

    #Userdata：extracts 'steam_id', 'items' data
    def get_user_items(record):
      record = convert_to_json(record)
      data = json.loads(record)

      steam_id = data['steam_id']
      items = data['items']
      result = {
          'steam_id': steam_id,
          'items_count': len(items),
          'items': [{'item_id': item['item_id'], 'item_name': item['item_name'], 'playtime_forever': item['playtime_forever']} for item in items]
      }
      return result


    def get_reviews(record):
      record = convert_to_json(record)
      data = json.loads(record)

      user_id = data['user_id']
      reviews = data['reviews']
      result = {
          'user_id': user_id,
          'reviews': [{'game_id': review['item_id'], 'recommend': review['recommend']} for review in reviews]
      }
      return result

    #GameDetail：extracts 'game_name', 'game_id', 'tags', 'specs' data
    def get_game_details(record):
      record = convert_to_json(record)
      data = json.loads(record)

      try:
        game_name = data['app_name'] if 'app_name' in data else data['title']
        game_id = data['id']
        tags = data['tags'] if 'tags' in data else []
        specs = data['specs'] if 'specs' in data else []
        result = {
          'game_name': game_name,
          'game_id': game_id,
          'tags': tags,
          'specs': specs
        }
        return result
      except Exception as e:
        print(f'Processing {data}')
        print(e)




    # gameDetails = get_game_details(text_files['steam_games.json.gz'])
    userItems = text_files['australian_users_items.json.gz'].map(lambda x: get_user_items(x))
    gameDetails = text_files['steam_games.json.gz'].map(lambda x: get_game_details(x))
    gameDetails = gameDetails.filter(lambda x:x is not None)
    reviews = text_files['australian_user_reviews.json.gz'].map(lambda x: get_reviews(x))
    #gameDetails.compute()
    print('userItems example: ')
    print(userItems.take(2))
    print('gameDetails example: ')
    print(gameDetails.take(2))
    print('reviews example: ')
    print(reviews.take(2))


    # ==============================================================================
    # Get the games with the highest playtime for each user (top 20 by default)
    def get_user_top20games(data, top_n=20):
      """Keep only a user's most-played games.

      Args:
        data: dict with 'steam_id' and 'items'; every item carries
          'item_id', 'item_name' and 'playtime_forever'.
        top_n: how many games to keep (default 20, the original behaviour).

      Returns:
        {'steam_id', 'items_count', 'items'} where 'items' holds at most
        `top_n` games ordered by descending playtime. Each game's id is
        exposed under the key 'game_id' (renamed from 'item_id').

      Raises:
        ValueError: if `top_n` is negative.
      """
      if top_n < 0:
        # A negative slice bound would silently drop games from the tail instead.
        raise ValueError("top_n must be non-negative")

      # sorted() is stable, so games with equal playtime keep their input order.
      ranked = sorted(data['items'], key=lambda game: game['playtime_forever'], reverse=True)
      top_games = ranked[:top_n]

      return {
          'steam_id': data['steam_id'],
          'items_count': len(top_games),
          'items': [
              {
                  'game_id': game['item_id'],
                  'item_name': game['item_name'],
                  'playtime_forever': game['playtime_forever'],
              }
              for game in top_games
          ],
      }

    userItemsTop20 = userItems.map(lambda x: get_user_top20games(x))
    print('userItemsTop20 example: ')
    print(userItemsTop20.take(1))
    print()


    # ==============================================================================
    # Map each basket to the list of all pairs of frequent games it contains
    def get_pairs(basket, frequent_items):
      """List every unordered pair of frequent items found in one basket.

      Args:
        basket: iterable of items (duplicates are ignored).
        frequent_items: collection of items that passed the support threshold;
          any iterable is accepted, not only a set.

      Returns:
        A list of 2-tuples. Each tuple is sorted, and the list itself is in
        sorted order, so the result is deterministic for a given input.
      """
      # Local import so the function carries its own dependency.
      from itertools import combinations

      # Sorting the kept items first means combinations() emits already-sorted pairs.
      kept = sorted(set(basket).intersection(frequent_items))
      return list(combinations(kept, 2))

    #Get pair counts - A Prior Algorithm
    def a_priori(data, support=10):
      itemCount = data.flatten().frequencies()
      itemFrequencies = itemCount.compute()
      frequentItems = set(item for item, count in itemFrequencies if count >= support)

      pairCounts = data.map(lambda basket: get_pairs(basket, frequentItems)) \
                      .flatten() \
                      .frequencies() \
                      .filter(lambda x: x[1] >= support)
      return itemCount, pairCounts, itemFrequencies



    itemsFrequent = userItemsTop20.map(lambda x: [item['item_name'] for item in x['items']])
    print("First User's game")
    print(itemsFrequent.take(1))
    resIndiv, resPairs, resIndivCompute = a_priori(itemsFrequent, support_threshold)

    #print(res_pairs.compute())
    print()
    print(resIndivCompute)

    # ==============================================================================

    # conf_I_j = sup(I U j) / sup(I)
    # sup(I) = num. baskets I is in / n

    #Get confidence score for all pairs
    def confidence(itemCounts, pairCounts, n):
      supItem = itemCounts.map(lambda x:(x[0], x[1]/n))
      sup_ij = pairCounts.map(lambda x:(x[0][0], (x[0][1], x[1]/n)))
      sup_ji = pairCounts.map(lambda x:(x[0][1], (x[0][0], x[1]/n)))
      sup = db.concat([sup_ij, sup_ji])
      all = sup.join(supItem,lambda x: x[0])
      confidences = all.map(lambda x: ((x[1][0], x[1][1][0]), x[1][1][1] / x[0][1]))
      return confidences

    #australian_users_items.json.gz loaded with 88310 lines
    N = 88310

    ruleConfidences = confidence(resIndiv, resPairs, N).compute()
    print('Confidences')
    print(ruleConfidences)

    # ==============================================================================

    gamesDict = {gameName[0]:index for index, gameName in enumerate(resIndivCompute)}
    print('gamesDict: ')
    print(gamesDict)
    totalNumGames = len(gamesDict)

    gamesList = [(index, gameName[0]) for index, gameName in enumerate(resIndivCompute)]
    gamesBag = db.from_sequence(gamesList)
    print('gamesBag: ')
    print(gamesBag.take(1))

    # ==============================================================================
    # Create game matrix
    def create_game_matrix(id, items):
      row = np.zeros(totalNumGames)
      for item in items:
        itemName = item['item_name']
        gameIndex = gamesDict[itemName]
        row[gameIndex] = 1

      return (int(id), row)

    userGamesMatrix = userItemsTop20.map(lambda user : create_game_matrix(user['steam_id'],user['items']))
    print('userGamesMatrix example:')
    print(userGamesMatrix.take(2))


    # Computes cosine similarity between two numpy arrays
    def cosine_similarity(u, v):
      """Return the cosine of the angle between vectors u and v.

      Returns 0.0 when either vector has zero norm, which avoids dividing by zero.
      """
      numerator = np.dot(u, v)
      length_u, length_v = np.linalg.norm(u), np.linalg.norm(v)
      if length_u != 0 and length_v != 0:
        return numerator / (length_u * length_v)
      return 0.0

    # ==============================================================================

    test_user = db.from_sequence(userGamesMatrix.take(2))
    print('test_user: ')
    print(test_user.compute())
    # Find three nearest neighbors and return the recommended game name
    def cs_recommend_games(test_user, userGamesMatrix, gamesBag, numNeighbors=3):
      """Recommend games owned by the users most similar to test_user.

      Args:
        test_user: 0/1 numpy vector of the target user's games.
        userGamesMatrix: dask bag of (user_id, vector) records to search.
        gamesBag: dask bag of (index, game_name) records used to turn indices into names.
        numNeighbors: how many nearest neighbours to pool games from (default 3).

      Returns:
        List of game names that at least one neighbour has and test_user does not.
        Empty when no neighbour is found.
      """
      ranked = (userGamesMatrix
                .map(lambda rec: (rec[0], cosine_similarity(rec[1], test_user), rec[1]))
                .topk(numNeighbors + 1, key=1)
                .compute())

      # The top-ranked record is skipped: presumably it is test_user itself (similarity 1.0).
      # NOTE(review): another user with an identical vector can tie for first place.
      neighbourVectors = [vector for _, _, vector in ranked[1:]]

      # Pool the neighbours' game indices; starting from an empty set keeps this safe
      # when the bag yields fewer than two users (the original indexed element 0 directly).
      pooledIndices = set()
      for vector in neighbourVectors:
        pooledIndices.update(np.where(vector > 0)[0].tolist())

      alreadyOwned = set(np.where(test_user > 0)[0].tolist())
      recommendedIndices = pooledIndices - alreadyOwned
      return gamesBag.filter(lambda rec: rec[0] in recommendedIndices).map(lambda rec: rec[1]).compute()

    recommendGames = test_user.map(lambda x: (x[0], cs_recommend_games(x[1], userGamesMatrix, gamesBag))).compute()
    print('Cosine similarity recommend games: ')
    print(recommendGames)

    # ==============================================================================

    #{'game_name': 'Lost Summoner Kitty', 'game_id': '761140', 'tags': ['Strategy', 'Action', 'Indie', 'Casual', 'Simulation'], 'specs': ['Single-player']}
    # Get all tags
    collectTags = gameDetails.map(lambda x: x['tags'] + x['specs'] if x is not None else []).flatten().distinct().compute()
    print(collectTags)

    gamesTagsDict = {gameTag:index for index, gameTag in enumerate(collectTags)}
    print(gamesTagsDict)
    total_num_tags = len(gamesTagsDict)
    print(total_num_tags)

    # Create the game tags matrix
    def createRecommendMatrix(gameTags):
      """Convert (id, tags) records into (id, 0/1 tag vector) records.

      Each vector has total_num_tags entries; position gamesTagsDict[tag] is set to 1
      for every tag of the record. A tag missing from gamesTagsDict raises KeyError.
      """
      def encode(tags):
        # One-hot style encoding of a single tag list.
        vector = np.zeros(total_num_tags)
        for tag in tags:
          vector[gamesTagsDict[tag]] = 1
        return vector

      return [(recordId, encode(tags)) for recordId, tags in gameTags]

    # Get games tags from recommended games name
    def getGamesTags(games):
      """Collect the distinct tags and specs of each user's recommended games.

      Args:
        games: iterable of (steam_id, [game_name, ...]) records.

      Returns:
        List of (steam_id, tags), where tags is the de-duplicated list of the
        'tags' + 'specs' values of every gameDetails record whose 'game_name' is
        in that user's list.
      """
      res = []
      for entry in games:
        steamId = entry[0]
        # Set membership avoids rescanning the name list for every game record.
        wantedNames = set(entry[1])
        # gameDetails records can be None (the tag-collection step guards for it too),
        # so skip them instead of failing on x['game_name'].
        tags = (gameDetails
                .filter(lambda game: game is not None and game['game_name'] in wantedNames)
                .map(lambda game: game['tags'] + game['specs'])
                .flatten()
                .distinct()
                .compute())
        res.append((steamId, tags))
      return res

    csRecommendGamesTags = getGamesTags(recommendGames)
    print()
    print(csRecommendGamesTags)
    csTagsMatrix = createRecommendMatrix(csRecommendGamesTags)
    print()
    print('cosine_similarity TagsMatrix: ')
    print(csTagsMatrix)

    # ==============================================================================

    ##======Search recommend game(Prior)=================================================================
    # Get game name from userItemsTop20 dask bag
    def get_games(data):
      """Return the 'item_name' of every entry in data['items'], in order."""
      names = []
      for entry in data['items']:
        names.append(entry['item_name'])
      return names

    user1_2 = db.from_sequence(userItemsTop20.take(2))
    user1_2 = user1_2.map(lambda x: (int(x['steam_id']), get_games(x)))
    print('test for two user:')
    print(user1_2.compute())
    print(ruleConfidences)

    # Return the games recommended by the Apriori association rules
    def priorRecomm(userInventory, ruleConfidences, topN=26):
      """Recommend games from association rules whose antecedent the user already owns.

      Args:
        userInventory: iterable of game names the user owns.
        ruleConfidences: iterable of ((antecedent, consequent), confidence) records.
          Both directions of a pair are expected to be present as separate records.
        topN: maximum number of recommendations to return (default 26).

      Returns:
        List of (game_name, confidence) sorted by confidence, highest first, truncated
        to topN. A game can appear more than once when several owned games imply it.
      """
      owned = set(userInventory)
      # Only apply a rule in its own direction: the score of antecedent -> consequent
      # says nothing about consequent -> antecedent, which has its own record.
      candidates = [
        (consequent, score)
        for (antecedent, consequent), score in ruleConfidences
        if antecedent in owned and consequent not in owned
      ]
      return sorted(candidates, key=lambda rec: rec[1], reverse=True)[:topN]

    priorRecommend = user1_2.map(lambda x: (x[0], priorRecomm(x[1], ruleConfidences))).compute()

    priorRecommendGames = []
    for user in priorRecommend:
      priorRecommendGames.append((user[0], [games[0] for games in user[1]]))
    print()
    print('prior recommend game: ')
    print(priorRecommend)
    print('prior recommend game name: ')
    print(priorRecommendGames)

    priorGamesTags = getGamesTags(priorRecommendGames)
    print(priorGamesTags)
    priorTagsMatirx = createRecommendMatrix(priorGamesTags)
    print()
    print('priorTagsMatirx: ')
    print(priorTagsMatirx)

    # ==============================================================================

    user1 = db.from_sequence(userItemsTop20.take(2))

    # Get tags of games that users frequently play
    def get_user_tags(userData):
      """Join each user's games with the tags and specs of those games.

      Args:
        userData: dask bag of user records with 'steam_id' and 'items'; every item
          is read through its 'game_id' key.

      Returns:
        Dask bag joining (game_id, steam_id) records with (game_id, tags + specs)
        records on game_id. Callers read each joined record as
        ((game_id, tags), (game_id, steam_id)).
      """
      userGamePairs = userData.map(
        lambda user: [(item['game_id'], user['steam_id']) for item in user['items']]
      ).flatten()
      # gameDetails records can be None (the tag-collection step guards for it too),
      # so drop them before indexing.
      gameTagPairs = (gameDetails
                      .filter(lambda game: game is not None)
                      .map(lambda game: (game['game_id'], game['tags'] + game['specs'])))
      return userGamePairs.join(gameTagPairs, lambda pair: pair[0])

    user_games_tags = get_user_tags(user1)
    print(user_games_tags.compute())
    user_games_tags = user_games_tags.groupby(lambda x: x[1][1])
    print(user_games_tags.compute())
    user_games_tags = user_games_tags.map(lambda x: (x[0], [i[j][1] for i in x[1] for j in range(len(i)) if j == 0]))
    print(user_games_tags.compute())

    # Create the game tags matrix for user
    def createTagsMatrix(id, gameTags):
      """Encode a user's tag lists as one 0/1 vector.

      gameTags is an iterable of tag lists; every tag found in any list sets position
      gamesTagsDict[tag] of a total_num_tags-long vector to 1.
      Returns (int(id), vector). A tag missing from gamesTagsDict raises KeyError.
      """
      tagVector = np.zeros(total_num_tags)
      flattenedTags = (tag for tagList in gameTags for tag in tagList)
      for tag in flattenedTags:
        tagVector[gamesTagsDict[tag]] = 1
      return (int(id), tagVector)

    userTagsMatrix = user_games_tags.map(lambda user : createTagsMatrix(user[0],user[1])).compute()
    print()
    print('userTagsMatrix:')
    print(userTagsMatrix)

    # ==============================================================================
    # Result - Displaying the similarity scores between the user's frequently played game tags and the tags of the recommended games from the Apriori and cosine similarity algorithms
    def get_tag_similarity(userTagsMatrix, priorTagsMatirx, csTagsMatrix):
      """Score each user's tag vector against both recommendation tag vectors.

      All three arguments are indexed by the keys of userTagsMatrix. For every key,
      two (label, score) tuples are appended: first the similarity with
      priorTagsMatirx[key], then the similarity with csTagsMatrix[key].
      """
      report = []
      for userId in userTagsMatrix:
        userVector = userTagsMatrix[userId]
        priorScore = cosine_similarity(userVector, priorTagsMatirx[userId])
        cosineScore = cosine_similarity(userVector, csTagsMatrix[userId])
        label = "UserID: " + str(userId)
        report.extend([
          (label + "   Prior game tags similarity: ", priorScore),
          (label + "   Cosine game tags similarity: ", cosineScore),
        ])
      return report


    userTagsMatrix, priorTagsMatirx, csMatrix = dict(userTagsMatrix), dict(priorTagsMatirx), dict(csTagsMatrix)

    print('Cosine similarity recommend games: ')
    print(recommendGames)
    print('Prior recommend game name: ')
    print(priorRecommendGames)
    print('Prior recommend game name: ')
    print(priorRecommendGames)
    print('Prior recommend games tags: ')
    print(priorGamesTags)

    print()
    print('Result:')
    print("Displaying the similarity scores between the user's frequently played game tags and the tags of the recommended games from the Apriori and cosine similarity algorithms.")
    print(get_tag_similarity(userTagsMatrix, priorTagsMatirx, csMatrix))

    time_end = time.time()
    print(f"elapsed time is {time_end - time_start}")

Overwriting dask_apriori.py


# ***Create a Dataproc cluster***

In [None]:
# Edit this USERNAME string to set your username (for me this is "jatlas" instead of "?")
USERNAME="fli73"
%env REGION=australia-southeast1
%env ZONE=australia-southeast1-a
%env PROJECT=projects-2024-$USERNAME
%env CLUSTER=projects-2024-$USERNAME-cluster

In [None]:
# Run this to prompt a log in so that this colab session can use your credentials to start the cluster
!gcloud auth login

In [None]:
# Set the project environment variable and enable dataproc on it
!gcloud config set project $PROJECT
!gcloud services enable dataproc.googleapis.com cloudresourcemanager.googleapis.com

In [None]:
# This command actually starts the cluster. It may take up to 5-10 minutes before
#  showing the completion dialog which should be
#  "Created [https://dataproc.googleapis.com/v1/projects/] Cluster placed in zone [australia-southeast1-a]"
!gcloud dataproc clusters create $CLUSTER --region=$REGION --zone=$ZONE \
  --master-machine-type n2-standard-8 \
  --master-boot-disk-size 100 \
  --image-version 2.2-ubuntu22 \
  --max-age=30m \
  --public-ip-address \
  --num-masters=1 --num-workers=0

# NOTE: the above arguments should be self-explanatory; you can read more here: https://cloud.google.com/sdk/gcloud/reference/dataproc/clusters/create
# The 30-minute max age (--max-age=30m) ensures the cluster shuts down if you forget or get disconnected from colab
# We only use 1 master node and 0 worker nodes as our 1 master node is an
#  n2-standard-8 which has 8 virtual CPUs

In [None]:
# This command will send your dask_apriori.py file (which was saved in the earlier %%writefile cell)
#  to the cloud and run it within your cluster.
!gcloud dataproc jobs submit pyspark --cluster=$CLUSTER --region=$REGION dask_apriori.py -- 10 1

# NOTE: The first argument (10) is the support threshold and the second (1) is the number of
#  dask processes it will use. For timing tests, change the second value to see how your
#  program scales with increased resources. At 1, only one of the virtual CPUs will be used.

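# If your program is successful, the output should end with the 'Result:' section
#  (the per-user tag similarity scores) followed by the timing line, e.g.:
# elapsed time is 19.15696430206299
# NOTE: a sample such as [(('and', 'the'), 14298), (('a', 'the'), 14087), (('i', 'the'), 13563), ...
#  is word-pair output; this script works on game names, so do not expect those pairs.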