# Importing Libraries

In [1]:
from __future__ import print_function

from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SQLContext
import plotly.express as px
from pyspark.sql import SparkSession
# from plotly.graph_objs import Scene
import numpy as np
from kneed import KneeLocator

import requests
import json
import time
from datetime import datetime
# import traceback
from scipy import stats

from pymongo import MongoClient

import os
import webbrowser

# Initialising Spark Session

In [2]:
# Start a Spark session for the clustering step, or reuse one that is already running.
# NOTE(review): the app name looks like a template leftover; it is kept unchanged here.
spark = (
    SparkSession.builder
    .appName("Word count")
    .getOrCreate()
)


# Collecting data from MongoDB Atlas Cluster

In [12]:
# Collect candidate games that share at least one popular tag with the reference game.
start_time = time.time()
client = MongoClient("MongoDB Collection URL Here") # Collection URL Not specified for Data Security
db = client.get_database('steam_data')
game_id="271590" # get game id here sample game id : 271590 has been specified
coll=db.game_data

# Look up the popular tags of the reference game; fail loudly if it is unknown
# instead of hitting an opaque TypeError on a None result further down.
seed_game = coll.find_one({"url_info.id": game_id}, {"popu_tags": 1})
if seed_game is None or not seed_game.get("popu_tags"):
    raise ValueError("No popular tags found for game id %s" % game_id)

tags = list(seed_game["popu_tags"])
tag_length = len(tags)

# Games taken per tag: the number of tags when there are more than 15, else 20.
per_tag_limit = tag_length if tag_length > 15 else 20

em_list = []
for tag in tags:
    # Let the server cap the result size rather than downloading every match
    # and slicing the list afterwards.
    matches = coll.find(
        {"popu_tags": {"$in": [tag]}},
        {"url_info.id": 1, "name": 1, "price": 1},
    ).limit(per_tag_limit)
    em_list.extend(matches)

# De-duplicate by game name (the last occurrence wins) and cap the working set
# at 200 games. Documents without a name cannot be keyed and are dropped.
em_list = list({game['name']: game for game in em_list if 'name' in game}.values())
em_list = em_list[:200]
print("--- %s seconds ---" % (time.time() - start_time))

--- 49.3672251701355 seconds ---


# Cache Deletion Implementation
    - Cached review data that is more than 3 days old is deleted automatically

In [14]:
############ Cache Deletion Implementation

def old_data_clearance(db, max_age_seconds=259200):
    """Delete cached review documents that are older than ``max_age_seconds``.

    Every document in ``db.game_review_data`` is inspected; when the time
    elapsed since its ``epoch`` field exceeds ``max_age_seconds`` all
    documents with the same ``game_id`` are removed and a message is printed.

    Args:
        db: Database handle exposing a ``game_review_data`` collection with
            ``find`` and ``delete_many`` methods.
        max_age_seconds: Maximum allowed age in seconds. The default of
            259200 corresponds to 3 days.
    """
    now_epoch = int(time.time())

    # Materialise the cursor first so documents are not deleted from the
    # collection while it is still being iterated.
    cached_docs = list(db.game_review_data.find({}, {"game_id": 1, "epoch": 1}))

    for doc in cached_docs:
        cached_game_id = doc.get('game_id')
        cached_epoch = doc.get('epoch')
        if cached_game_id is None or cached_epoch is None:
            # The age (or the delete key) cannot be determined; leave it alone.
            continue

        # Age is "now minus creation time"; the reverse difference is
        # negative for any past timestamp and would never exceed the limit.
        if now_epoch - int(cached_epoch) > max_age_seconds:
            db.game_review_data.delete_many({"game_id": cached_game_id})
            print(str(cached_game_id)+' Deleted')

# Run the cache clean-up against the 'steam_data' database of the connected client.
old_data_clearance(client.get_database('steam_data'))    

############ Cache Deletion Implementation

# Data Cleaning

In [None]:
import pandas as pd

start_time = time.time()

REVIEW_URL = 'https://store.steampowered.com/appreviews/'
FEATURE_COLUMNS = ['Price', 'Positive_Reviews', 'Negative_Reviews']

# One complete record per game. A row is appended only after every field has
# been obtained, so a failure half-way through a game can never shift names
# against review counts (which is what independent parallel lists allow).
records = []
for game in em_list:
    try:
        current_id = str(game['url_info']['id'])

        raw_price = game['price']
        if raw_price == 'free':
            price_value = 0
        else:
            try:
                price_value = int(raw_price)
            except (TypeError, ValueError):
                # Unparseable prices are reported and treated as 0.
                print(raw_price)
                price_value = 0

        reviews = db.game_review_data.find_one({'game_id': current_id})
        if reviews is None:
            # Cache miss: query the Steam review endpoint.
            response = requests.get(REVIEW_URL + current_id + '?json=1', timeout=30)
            response.raise_for_status()
            reviews = response.json()
            # Read the summary before caching so a malformed response is
            # never stored in the cache collection.
            summary = reviews['query_summary']
            positive, negative = summary['total_positive'], summary['total_negative']
            reviews['game_id'] = current_id
            reviews['epoch'] = int(time.time())
            db.game_review_data.insert_one(reviews)
            print(current_id + ' Inserted')
        else:
            summary = reviews['query_summary']
            positive, negative = summary['total_positive'], summary['total_negative']
            print(current_id + ' fetched')

        records.append({
            'Name': game['name'],
            'Price': price_value,
            'Positive_Reviews': positive,
            'Negative_Reviews': negative,
        })
    except Exception as exc:
        # Best effort: a single bad game must not abort the run, but the
        # reason for skipping it is reported instead of silently swallowed.
        print('Skipping %r: %s: %s' % (game.get('name'), type(exc).__name__, exc))

df = pd.DataFrame(records, columns=['Name'] + FEATURE_COLUMNS)

# Drop rows lying more than 3 standard deviations from the mean, one numeric
# column after another (each pass works on the already filtered frame).
for column in FEATURE_COLUMNS:
    values = df[column].to_numpy(dtype=float)
    if values.size < 2 or values.std() == 0:
        # A constant column has undefined (NaN) z-scores; filtering on them
        # would discard every row.
        continue
    df = df[np.abs(stats.zscore(values)) < 3]
print("--- %s seconds ---" % (time.time() - start_time))

# K-Means Function
    - The appropriate number of clusters is determined dynamically using the elbow method

In [23]:
def kmeans_3dplot(spark, df, op_html):
    """Cluster games with K-Means and write an interactive 3D scatter plot.

    The number of clusters is picked with the elbow method: K-Means is fitted
    for k = 2..19, the training cost of each fit is recorded, and the knee of
    the cost curve is used as k (2 when no knee is found).

    Args:
        spark: Active SparkSession used to build the Spark DataFrame.
        df: pandas DataFrame with the columns 'Name', 'Price',
            'Positive_Reviews' and 'Negative_Reviews'.
        op_html: Path of the HTML file the plot is written to.
    """
    FEATURES_COL = ['Price', 'Positive_Reviews', 'Negative_Reviews']

    sdf = spark.createDataFrame(df)
    for feature in FEATURES_COL:
        sdf = sdf.withColumn(feature, sdf[feature].cast('float'))
    sdf = sdf.na.drop()

    assembler = VectorAssembler(inputCols=FEATURES_COL, outputCol="features")
    df_features = assembler.transform(sdf)

    # Elbow search. The sample is drawn once (fixed seed) instead of once per k.
    candidate_ks = list(range(2, 20))
    elbow_data = df_features.sample(False, 0.1, seed=42)
    if elbow_data.count() < candidate_ks[-1]:
        # A 10% sample of a small data set has fewer rows than the largest
        # candidate k (or none at all); use every row in that case.
        elbow_data = df_features

    costs = []
    for candidate in candidate_ks:
        estimator = KMeans().setK(candidate).setSeed(1).setFeaturesCol("features")
        costs.append(estimator.fit(elbow_data).summary.trainingCost)

    # The x axis must be the k values themselves so that the knee *is* k.
    # Using 1..18 here would return k - 1, and could yield k = 1.
    kn = KneeLocator(candidate_ks, costs, curve='convex', direction='decreasing')
    k = int(kn.knee) if kn.knee is not None else 2

    kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
    model = kmeans.fit(df_features)

    # Predictions keep the original columns, so no collect/recreate/join round
    # trip is needed and every point stays attached to its own name.
    pddf_pred = (
        model.transform(df_features)
        .select('Name', 'prediction', *FEATURES_COL)
        .toPandas()
        .set_index('Name')
    )

    fig = px.scatter_3d(x=pddf_pred.Price,
                        y=pddf_pred.Positive_Reviews,
                        z=pddf_pred.Negative_Reviews,
                        color=pddf_pred.prediction,
                        # Take the names from the plotted frame so hover
                        # labels follow the same row order as the points.
                        hover_name=list(pddf_pred.index))
    fig.update_layout(scene = dict(
                    xaxis_title='Price',
                    yaxis_title='Positive Reviews',
                    zaxis_title='Negative Reviews'), template="plotly_dark")
    fig.write_html(op_html)


# Final Step
    - Plot the 3D clustering result and open it in the browser

In [24]:
# Write the plot and open that very file: a single path variable keeps the
# file that is written and the file that is opened from drifting apart.
plot_path = 'kmeans_plot.html'
kmeans_3dplot(spark, df, plot_path)
webbrowser.open('file://' + os.path.realpath(plot_path))

True