# Instructions: Upload steam_200k.csv to the DBFS file system (under /FileStore/tables/), then run the cells in order.

Install the libraries used below: matplotlib, seaborn, pandas, plotly, and mlflow.

In [0]:
%sh
pip install matplotlib seaborn pandas

In [0]:
%sh
pip install plotly

In [0]:
!pip install mlflow

In [0]:
# Turn on MLflow autologging so each pyspark.ml model fit in this notebook is recorded as an MLflow run
import mlflow

mlflow.pyspark.ml.autolog()

In [0]:
# Confirm the upload: list the contents of /FileStore/tables/ (steam_200k.csv should appear)
dbutils.fs.ls("/FileStore/tables/")

In [0]:
# The file has no header row: let Spark infer the column types, then attach readable names.
# NOTE(review): toDF() needs exactly four columns in the file; the publicly distributed
# steam-200k.csv carries an extra trailing column, so confirm the uploaded copy has four.
steam = (
    spark.read
    .csv("/FileStore/tables/steam_200k.csv", header=False, inferSchema=True)
    .toDF("ID", "game_Name", "mem_Type", "value")
)

In [0]:
# Discard rows containing a null in any column, then preview 20 rows without truncation
steam = steam.dropna()
steam.show(20, truncate=False)

Create a DataFrame of the distinct game names in the steam DataFrame and give each game a numeric game_ID.

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Step 1: Select the game_Name column and get distinct game names
distinct_games_df = steam.select("game_Name").distinct()

# Step 2: Give every game a stable, consecutive game_ID (1..N), ordered by game name.
# monotonically_increasing_id() is not consecutive: it stores the partition index in the
# upper bits, so casting it to int keeps only the per-partition row number and can produce
# the same ID for games in different partitions. It is also re-evaluated every time this
# lazy DataFrame is recomputed. row_number() over a deterministic ordering avoids both.
# The window has no partitionBy, so Spark gathers the rows into one partition; that is
# acceptable here because the frame holds one row per game.
distinct_games_df = distinct_games_df.withColumn(
    "game_ID", row_number().over(Window.orderBy("game_Name"))
).cache()

# Display the resulting DataFrame
distinct_games_df.show(truncate=False)

In [0]:
# Number of distinct games (one row per game_Name in distinct_games_df)
distinct_game_count = distinct_games_df.count()
print(f"Total number of distinct games in distinct_games_df: {distinct_game_count}")

In [0]:
# tail(20) collects the final 20 rows to the driver as a list of Row objects; print each one
last_20_rows = distinct_games_df.tail(20)
for game_row in last_20_rows:
    print(game_row)


# Initial exploratory analysis of the data

In [0]:
# Show the column names and the types inferred when the CSV was read
print("\nSchema of the steam DataFrame:")
steam.printSchema()

In [0]:
# Number of rows currently in steam
total_rows = steam.count()
print(f"\nTotal number of rows in the steam DataFrame: {total_rows}")

In [0]:
# Distinct behaviour types recorded in mem_Type
print("\nUnique values in the mem_Type column:")
steam.select("mem_Type").dropDuplicates().show()

In [0]:
# Distinct game titles (show() prints only the first 20)
print("\nUnique values in the game_Name column:")
steam.select("game_Name").dropDuplicates().show(truncate=False)

In [0]:
# Number of rows for each mem_Type
print("\nGroup by mem_Type and count the number of rows in each group:")
steam.groupby("mem_Type").count().show()

In [0]:
# Keep only the "purchase" events
purchase_df = steam.where(steam["mem_Type"] == "purchase")
purchase_df.show()

In [0]:
# Number of purchase rows per game
purchase_count_df = purchase_df.groupby("game_Name").count()
purchase_count_df.show(truncate=False)

In [0]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Bring the per-game purchase counts to the driver and rank them, most-purchased first
purchase_count_pandas_df = purchase_count_df.toPandas().sort_values(by='count', ascending=False)

# Keep the five most-purchased games
top_5_games = purchase_count_pandas_df.head(5)

# Vertical bar chart of those five games
fig, ax = plt.subplots(figsize=(10, 6))
sns.barplot(x='game_Name', y='count', data=top_5_games, palette='viridis', ax=ax)
ax.set_xlabel('Game Name')
ax.set_ylabel('Count of Purchases')
ax.set_title('Top 5 Games with the Highest Number of Purchases')

# Angle the long game names so the labels do not overlap
plt.setp(ax.get_xticklabels(), rotation=45, ha='right')

fig.tight_layout()
plt.show()

In [0]:
# The same per-game purchase counts, most-purchased first
ordered_purchase_count_df = purchase_count_df.sort(purchase_count_df["count"].desc())
ordered_purchase_count_df.show(truncate=False)

In [0]:
# head() with no argument returns the first Row of the ordered counts, i.e. the most-purchased game
top_game = ordered_purchase_count_df.head()

print(f"The game with the highest number of 'purchase' occurrences is: {top_game['game_Name']}, with {top_game['count']} occurrences.")

In [0]:
# Keep only the "play" events. Nothing from pyspark.sql.functions is needed here, so
# nothing is imported that would shadow Python's built-in sum().
play_df = steam.filter(steam.mem_Type == "play")
play_df.show(truncate=False)

In [0]:
from pyspark.sql import functions as F

# Total play time per member: sum `value` over each ID's "play" rows, rounded to two
# decimal places. Referencing the functions through the F namespace avoids shadowing
# Python's built-in sum() and round() for the rest of the notebook.
cumulative_play_time_df = play_df.groupBy("ID").agg(
    F.round(F.sum("value"), 2).alias("cumulative_play_time")
)

# Display the per-member totals
cumulative_play_time_df.show(truncate=False)

In [0]:
# Re-order members by total play time, highest first
cumulative_play_time_df = cumulative_play_time_df.sort(
    cumulative_play_time_df["cumulative_play_time"].desc()
)
cumulative_play_time_df.show(truncate=False)

In [0]:
# limit(5) on the sorted frame keeps the five members with the most play time
top_5_ids_df = cumulative_play_time_df.limit(5)
top_5_ids_df.show(truncate=False)

In [0]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Bring the five (ID, cumulative_play_time) rows to the driver for plotting
top_5_ids_pandas_df = top_5_ids_df.toPandas()

# Vertical bar chart with member IDs on the x-axis. The IDs are numeric, so seaborn would
# otherwise sort the bars by ID value; `order` keeps them in the frame's row order
# (descending play time).
plt.figure(figsize=(10, 6))
sns.barplot(
    x='ID',
    y='cumulative_play_time',
    data=top_5_ids_pandas_df,
    order=top_5_ids_pandas_df['ID'].tolist(),
    palette='viridis',
)

# Set plot labels and title
plt.xlabel('Member ID')
plt.ylabel('Cumulative Play Time (Hours)')
plt.title('Top 5 Member IDs with Maximum Cumulative Play Time')

# Show the plot
plt.tight_layout()
plt.show()


Attach each game's game_ID to the steam records

In [0]:
# desc/sum/count are also used by the aggregation cells further down
from pyspark.sql.functions import desc, sum, count

# Attach each game's game_ID to the steam records. Passing the key as on="game_Name"
# keeps a single game_Name column in the result; an explicit equality condition keeps
# both sides' copies, which makes any later reference to "game_Name" ambiguous.
joined_df = steam.join(distinct_games_df, on="game_Name", how="inner")
joined_df.show(truncate=False)

In [0]:
# Inner-join the steam rows to their game_IDs, then drop the steam-side copy of game_Name
# so only one game_Name column (the one from distinct_games_df) remains
joined_df = (
    steam.join(distinct_games_df, steam["game_Name"] == distinct_games_df["game_Name"], how="inner")
    .drop(steam["game_Name"])
)

joined_df.show(truncate=False)

In [0]:
# Row count per mem_Type, collected to pandas for printing
mem_type_distribution = joined_df.groupby("mem_Type").count().toPandas()

print("\nDistribution of mem_Type:")
print(mem_type_distribution)

In [0]:
# Row count per game_ID, most frequent first, collected to pandas
game_distribution = joined_df.groupBy("game_ID").count().sort("count", ascending=False).toPandas()

print("\nTop 10 most frequent games:")
print(game_distribution.head(10))

In [0]:
# Row count per game_Name, most frequent first, collected to pandas
game_distribution = joined_df.groupBy("game_Name").count().sort("count", ascending=False).toPandas()

print("\nTop 10 most frequent games:")
print(game_distribution.head(10))

In [0]:
# Total play time per game over "play" rows only, largest first, collected to pandas
top_games_play_time = (
    joined_df.where(joined_df["mem_Type"] == "play")
    .groupBy("game_Name")
    .agg(sum("value").alias("total_play_time"))
    .sort(desc("total_play_time"))
    .toPandas()
)

print("\nTop 10 games by play time:")
print(top_games_play_time.head(10))

In [0]:
from pyspark.sql import functions as F

# Per game_ID: total play time and number of purchases. joined_df mixes "purchase" and
# "play" rows, so each metric is restricted to its own row type. Play time sums `value`
# over "play" rows only, and purchases count "purchase" rows only. Aggregating every row
# would add the purchase rows' `value` into the play time and count play rows as purchases.
play_vs_purchases_df = joined_df.groupBy("game_ID").agg(
    F.sum(F.when(F.col("mem_Type") == "play", F.col("value")).otherwise(0)).alias("total_play_time"),
    F.count(F.when(F.col("mem_Type") == "purchase", F.col("ID"))).alias("total_purchases"),
)

# Convert to Pandas for plotting (one row per game)
play_vs_purchases_df_pandas = play_vs_purchases_df.toPandas()

# Scatter plot: each point is one game
plt.figure(figsize=(10, 6))
sns.scatterplot(x='total_play_time', y='total_purchases', data=play_vs_purchases_df_pandas)
plt.xlabel("Total Play Time")
plt.ylabel("Total Purchases")
plt.title("Total Play Time vs Total Purchases for Each Game ID")
plt.show()

In [0]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from pyspark.sql import functions as F

# Per game_Name: total play time (sum of `value` over "play" rows only) and number of
# purchases (count of "purchase" rows only). Restricting each aggregate to its own row
# type keeps purchase rows out of the play-time total and play rows out of the purchase count.
game_aggregated_df = joined_df.groupBy("game_Name").agg(
    F.sum(F.when(F.col("mem_Type") == "play", F.col("value")).otherwise(0)).alias("total_play_time"),
    F.count(F.when(F.col("mem_Type") == "purchase", F.col("ID"))).alias("total_purchases"),
)

# Collect the per-game aggregates to pandas
game_aggregated_df_pandas = game_aggregated_df.toPandas()

# Correlation matrix (pandas default: Pearson) between the two per-game totals
correlation_matrix = game_aggregated_df_pandas[['total_play_time', 'total_purchases']].corr()

# Annotated heat map of the correlation matrix
plt.figure(figsize=(8, 6))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', square=True, fmt='.2f')
plt.title("Correlation Heatmap: Total Play Time and Total Purchases")
plt.show()


In [0]:
import matplotlib.pyplot as plt
import seaborn as sns

# Collect the per-game aggregates to pandas for plotting
play_vs_purchases_df_pandas = play_vs_purchases_df.toPandas()

# Hexbin density plot: each hexagon's colour reflects how many games fall in that cell
fig, ax = plt.subplots(figsize=(10, 6))
hexbin_plot = ax.hexbin(
    play_vs_purchases_df_pandas['total_play_time'],
    play_vs_purchases_df_pandas['total_purchases'],
    gridsize=20,
    cmap='Oranges',
    mincnt=1,
)

# Colour bar as a legend for the counts
plt.colorbar(hexbin_plot)

ax.set_xlabel("Total Play Time")
ax.set_ylabel("Total Purchases")
ax.set_title("Hexbin Plot of Total Play Time vs Total Purchases for Each Game ID")

# Clip the x-axis at 50,000 so the long tail does not flatten the plot
ax.set_xlim(0, 50000)

plt.show()


In [0]:
from pyspark.sql import SparkSession

# getOrCreate() hands back the already-active session (the notebook uses `spark` from the start)
spark = SparkSession.builder.getOrCreate()

# Make joined_df queryable from Spark SQL under the name "joined_df"
joined_df.createOrReplaceTempView("joined_df")

# Average play time per game_ID over the "play" rows, rounded to 2 decimal places
avg_play_time_per_game = spark.sql("""
    SELECT game_ID, ROUND(AVG(value), 2) AS avg_play_time
    FROM joined_df
    WHERE mem_Type = 'play'
    GROUP BY game_ID
""")

avg_play_time_per_game.show()


# Machine Learning Algorithm for "Purchase" Behavior

### Steps in ML Algorithm

#### Step 1: Filter the data for the 'purchase' type
#### Step 2: Split the data into training (80%) and test (20%) sets
#### Step 3: Train the ALS model
#### Step 4: Evaluate the model's performance using the test data
#### Step 5: Explore some of the resulting recommendations
#### Step 6: Generate the top 10 recommendations for each user
#### Step 7: Display the recommendations for selected users



In [0]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

# Step 1: keep only the "purchase" rows as the modelling data
purchase_joined_df = joined_df.where(col("mem_Type") == "purchase")
purchase_joined_df.show(truncate=False)

In [0]:
# Step 2: reproducible 80/20 train/test split
training, test = purchase_joined_df.randomSplit([0.8, 0.2], seed=100)

In [0]:
from pyspark.ml.recommendation import ALS

# Step 3: baseline ALS with 5 iterations, regParam 0.01 and a fixed seed.
# coldStartStrategy is left at its default, so users/games absent from `training` get NaN
# predictions when the model is applied to new data.
# NOTE(review): this is explicit-feedback ALS; if every purchase row carries the same
# `value`, consider implicitPrefs=True — confirm against the data.
als = ALS(
    userCol="ID",
    itemCol="game_ID",
    ratingCol="value",
    maxIter=5,
    regParam=0.01,
    seed=100,
)
model = als.fit(training)

In [0]:
# Score the test split; dropna() removes rows whose prediction is NaN (cold-start users/games)
predictions = model.transform(test)
predictions = predictions.dropna()
predictions.show()

In [0]:
from pyspark.sql.functions import desc

# Predictions ordered by user ID, highest first
predictions_sorted = predictions.sort(desc("ID"))
predictions_sorted.show()

In [0]:
# Predictions ordered by game_ID, lowest first
ordered_predictions = predictions.sort(predictions["game_ID"].asc())
ordered_predictions.show(truncate=False)


In [0]:
from pyspark.sql.functions import desc, asc

# Sort by user ID descending, breaking ties by game_ID ascending
ordered_predictions = predictions.orderBy(["ID", "game_ID"], ascending=[False, True])
ordered_predictions.show()


In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

# Step 4: RMSE of the baseline model on its test predictions
evaluator = RegressionEvaluator(metricName="rmse", labelCol="value", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(" Root mean square error is = %g" % rmse)

# MAE on the same predictions, using a separate evaluator so `evaluator` stays on RMSE
evaluator_mae = RegressionEvaluator(metricName="mae", labelCol="value", predictionCol="prediction")
mae = evaluator_mae.evaluate(predictions)
print(f"Mean absolute error (MAE) is: {mae}")

In [0]:
# Step 5: highest predicted scores first
ordered_predictions = predictions.sort(predictions["prediction"].desc())
ordered_predictions.show(truncate=False)


In [0]:
# Remove any remaining rows containing nulls, then rank by predicted score
predictions = predictions.dropna()
ordered_predictions = predictions.sort(predictions["prediction"].desc())
ordered_predictions.show(truncate=False)


In [0]:
# Expose the ranked predictions to the %sql cells as "myPredictions"
ordered_predictions.createOrReplaceTempView("myPredictions")

In [0]:
%sql
-- Ten highest-scoring (user, game) predictions
SELECT ID, game_ID, game_Name, prediction
FROM myPredictions
ORDER BY prediction DESC
LIMIT 10

In [0]:
%sql
-- Top predictions restricted to games whose name contains "Wars"
SELECT ID, game_ID, game_Name, prediction
FROM myPredictions
WHERE game_Name LIKE '%Wars%'
ORDER BY prediction DESC
LIMIT 10

In [0]:
%sql
-- Top predictions for two example game_IDs (18 and 24)
SELECT ID, game_ID, game_Name, prediction
FROM myPredictions
WHERE game_ID IN (18, 24)
ORDER BY prediction DESC
LIMIT 10

In [0]:
# DataFrame-API version of the top-10 query: highest predictions first, first 10 rows
ordered_predictions.sort(ordered_predictions["prediction"].desc()).limit(10).show(truncate=False)


In [0]:
# Top 10 predictions among games whose name matches the LIKE pattern '%Wars%'
(
    ordered_predictions
    .sort(ordered_predictions["prediction"].desc())
    .where(ordered_predictions.game_Name.like('%Wars%'))
    .limit(10)
    .show(truncate=False)
)

In [0]:
# Function that gives recommendations filtered by game name
def recommendationsByGameName(name):
    """Show and return the 10 highest predictions whose game_Name contains `name`.

    Args:
        name: text to look for inside game_Name. Matched literally and case-sensitively.

    Returns:
        A Spark DataFrame of at most 10 rows from ordered_predictions, ordered by
        prediction descending. (Previously the function returned the result of
        DataFrame.show(), which is always None.)
    """
    # contains() matches `name` literally; building a LIKE pattern from it would treat
    # any '%' or '_' in the input as wildcards.
    matches = ordered_predictions.filter(ordered_predictions.game_Name.contains(name))
    top_matches = matches.orderBy("prediction", ascending=False).limit(10)
    top_matches.show(truncate=False)
    return top_matches

In [0]:
# Top predictions for games whose name contains "Duty"
recommendationsByGameName('Duty')

In [0]:
# Function that gives recommendations filtered by game ID
def recommendationsByGameID(game_IDs):
    """Show and return the top 10 predictions for the given game_ID value(s).

    Args:
        game_IDs: forwarded to Column.isin, so a single ID or a list of IDs works.

    Returns:
        A Spark DataFrame of at most 10 rows, ordered by prediction descending.
    """
    top_recommendations = (
        ordered_predictions
        .where(ordered_predictions.game_ID.isin(game_IDs))
        .sort("prediction", ascending=False)
        .limit(10)
    )
    top_recommendations.show(truncate=False)
    return top_recommendations

In [0]:
# Top predictions for game_ID 1
recommendationsByGameID(1)

In [0]:
# Function that gives recommendations filtered by user ID
def recommendationsByUserID(ID):
    """Show and return every row of ordered_predictions for one user, best prediction first.

    Args:
        ID: the user ID, compared against the ID column.

    Returns:
        A Spark DataFrame of that user's predictions, ordered by prediction descending.
    """
    ordered_user_recommendations = (
        ordered_predictions
        .where(ordered_predictions.ID == ID)
        .sort("prediction", ascending=False)
    )
    ordered_user_recommendations.show(truncate=False)
    return ordered_user_recommendations


In [0]:
# All test-set predictions for user 59945701
recommendationsByUserID(59945701)

In [0]:
# Step 6: top-10 game recommendations for every user known to the baseline model
userRecs = model.recommendForAllUsers(10)

In [0]:
# Preview the per-user recommendation arrays
userRecs.show(truncate=False)

In [0]:
from pyspark.sql.functions import explode, col

# Function to get game recommendations for a specific user ID and join with game details
def gameRecommendationsForUser(user_ID):
    """Show and return one user's recommendations from userRecs, labelled with game names.

    Args:
        user_ID: value matched against the ID column of userRecs.

    Returns:
        A Spark DataFrame with columns game_ID, game_Name and rating, ordered by rating
        descending (empty if the user has no row in userRecs).
    """
    # Each userRecs row holds an array column `recommendations` whose elements carry
    # game_ID and rating fields; explode it into one row per recommendation
    user_recommendations = userRecs.filter(col("ID") == user_ID)
    user_recommendations_exploded = user_recommendations.select(
        "ID", explode("recommendations").alias("recommendation")
    )

    # Attach game names by matching the recommended game_ID against distinct_games_df
    recommended_games = (
        user_recommendations_exploded
        .join(
            distinct_games_df,
            user_recommendations_exploded["recommendation.game_ID"] == distinct_games_df["game_ID"],
            how="inner",
        )
        .select("game_ID", "game_Name", "recommendation.rating")
        # Sort on the projected "rating" column, not on the struct that select() dropped
        .orderBy(col("rating").desc())
    )

    # Display the recommended games with their predicted ratings
    recommended_games.show(truncate=False)

    return recommended_games

# Example call for user ID 76767
gameRecommendationsForUser(76767)


In [0]:
# Named recommendations for user 109323647
gameRecommendationsForUser(109323647)


In [0]:
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Step 1: Tune on a random 10% sample of the training split to keep the grid search cheap
sample_fraction = 0.1
sample_training = training.sample(withReplacement=False, fraction=sample_fraction, seed=42)

# Step 2: Perform hyperparameter tuning using the sample of the training data

# ALS estimator; coldStartStrategy="drop" removes rows whose user or game was not seen
# during fitting, so the evaluator never receives NaN predictions
als = ALS(userCol="ID", itemCol="game_ID", ratingCol="value", coldStartStrategy="drop", seed=42)

# RMSE evaluator used to rank the candidate models
evaluator = RegressionEvaluator(metricName="rmse", labelCol="value", predictionCol="prediction")

# Parameter grid: 2 x 2 x 2 = 8 candidates. `alpha` is deliberately not searched. It only
# affects ALS when implicitPrefs=True, and this estimator uses the default explicit-feedback
# mode, so every alpha value would train an identical model and only double the fits.
param_grid = ParamGridBuilder()\
    .addGrid(als.rank, [5, 10])\
    .addGrid(als.regParam, [0.01, 0.1])\
    .addGrid(als.maxIter, [5, 10])\
    .build()

# 75% of the sample trains each candidate; the remaining 25% validates it
tvs = TrainValidationSplit()\
    .setSeed(42)\
    .setTrainRatio(0.75)\
    .setEstimatorParamMaps(param_grid)\
    .setEstimator(als)\
    .setEvaluator(evaluator)

# Fit the TrainValidationSplit model with the sample training data
gridsearch_model = tvs.fit(sample_training)

# Step 3: the winning parameter set, refit by TrainValidationSplit on the whole sample
best_model = gridsearch_model.bestModel


In [0]:
# Score best_model on the whole `training` split.
# NOTE(review): best_model is the grid-search winner fitted on a sample of `training`,
# so most of these rows were not used to fit it — confirm how these metrics should be read.
full_training_predictions = best_model.transform(training)

full_training_rmse = evaluator.evaluate(full_training_predictions)
print(f"Root Mean Squared Error (RMSE) on full training data: {full_training_rmse}")

# Switch the shared evaluator to MAE (it is left on "mae" after this cell)
evaluator.setMetricName("mae")
full_training_mae = evaluator.evaluate(full_training_predictions)
print(f"Mean Absolute Error (MAE) on full training data: {full_training_mae}")


In [0]:
# Report the hyperparameters of the model chosen by the grid search
bestModel = gridsearch_model.bestModel

# rank is exposed on the fitted model itself; the remaining values are read from the parent
# ALS estimator through the underscore-prefixed (private) JVM handle `_java_obj`
best_estimator = bestModel._java_obj.parent()

print("Parameters for the best model:")
print("Rank Parameter: %g" % bestModel.rank)
print("RegParam Parameter: %g" % best_estimator.getRegParam())
print("MaxIter Parameter: %g" % best_estimator.getMaxIter())
print("ImplicitPrefs Parameter: %s" % best_estimator.getImplicitPrefs())


In [0]:
# Score the tuned model on the held-out test split
full_test_predictions = best_model.transform(test)

# The shared evaluator may still be set to "mae"; reset it to RMSE first
evaluator.setMetricName("rmse")
full_test_rmse = evaluator.evaluate(full_test_predictions)
print(f"Root Mean Squared Error (RMSE) on full test data: {full_test_rmse}")

# Then switch to MAE on the same predictions
evaluator.setMetricName("mae")
full_test_mae = evaluator.evaluate(full_test_predictions)
print(f"Mean Absolute Error (MAE) on full test data: {full_test_mae}")

In [0]:
# Best model's test-set predictions with null rows removed
bestGeneratedPredictions = bestModel.transform(test).dropna()

# Rank them by predicted score and preview the top 10
sorted_predictions = bestGeneratedPredictions.sort("prediction", ascending=False)
sorted_predictions.show(10, truncate=False)

In [0]:
# Interactive Databricks table of the ranked predictions
display(sorted_predictions)

In [0]:
# Collect the ranked predictions to pandas and print the first 10 rows
sorted_predictions_pandas = sorted_predictions.toPandas()
print(sorted_predictions_pandas.head(10))

In [0]:
import matplotlib.pyplot as plt
import seaborn as sns

# Collect the ranked predictions to pandas for plotting
sorted_predictions_pandas = sorted_predictions.toPandas()

# Bar chart of the 10 highest predictions.
# NOTE(review): the x-axis is game_ID only. If a game appears in several of the top 10
# (user, game) rows, seaborn aggregates them into one bar with an error bar, and the bars
# are ordered by game_ID rather than by prediction — confirm this is the intended view.
fig, ax = plt.subplots()
sns.barplot(data=sorted_predictions_pandas.head(10), x='game_ID', y='prediction', ax=ax)
ax.set_xlabel('Game ID')
ax.set_ylabel('Prediction')
ax.set_title('Top 10 Predictions')
plt.show()


In [0]:
# Pandas copy of the ranked predictions
sorted_predictions_df = sorted_predictions.toPandas()

# Shade the cells with a coolwarm colour gradient
styled_table = sorted_predictions_df.style.background_gradient(cmap='coolwarm')

# Last expression of the cell, so the notebook renders the styled table
styled_table


# Machine Learning Algorithm for "Play" Behavior

In [0]:
from pyspark.sql.functions import col

# Step 1: keep only the "play" rows as the modelling data
play_joined_df = joined_df.where(col("mem_Type") == "play")
play_joined_df.show(truncate=False)


In [0]:
# Reproducible 80/20 train/test split of the play data
train_play, test_play = play_joined_df.randomSplit([0.8, 0.2], seed=100)


In [0]:
from pyspark.ml.recommendation import ALS

# Baseline ALS on the play split, treating `value` as an explicit rating.
# The default coldStartStrategy leaves NaN predictions for users/games unseen in train_play.
als_play = ALS(
    userCol="ID",
    itemCol="game_ID",
    ratingCol="value",
    maxIter=5,
    regParam=0.01,
    seed=100,
)
play_model = als_play.fit(train_play)


In [0]:
# Score the play test split; dropna() removes NaN (cold-start) predictions
play_predictions = play_model.transform(test_play)
play_predictions = play_predictions.dropna()
play_predictions.show()

In [0]:
from pyspark.sql.functions import desc

# Play predictions ordered by user ID, highest first
play_predictions_sorted = play_predictions.sort(desc("ID"))
play_predictions_sorted.show()

In [0]:
# Play predictions ordered by game_ID, lowest first
ordered_play_predictions = play_predictions.sort(play_predictions["game_ID"].asc())
ordered_play_predictions.show(truncate=False)

In [0]:
from pyspark.sql.functions import desc, asc

# Sort by user ID descending, breaking ties by game_ID ascending
ordered_play_predictions = play_predictions.orderBy(["ID", "game_ID"], ascending=[False, True])
ordered_play_predictions.show()

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE of the baseline play model on its test predictions
play_evaluator = RegressionEvaluator(metricName="rmse", labelCol="value", predictionCol="prediction")
play_rmse = play_evaluator.evaluate(play_predictions)
print(f"Root Mean Square Error (RMSE) for 'play' predictions is: {play_rmse:.4f}")

# MAE on the same predictions, with its own evaluator so play_evaluator stays on RMSE
evaluator_play_mae = RegressionEvaluator(metricName="mae", labelCol="value", predictionCol="prediction")
play_mae = evaluator_play_mae.evaluate(play_predictions)
print(f"Mean absolute error (MAE) is: {play_mae}")


In [0]:
# Highest predicted play scores first
ordered_play_predictions = play_predictions.sort(play_predictions["prediction"].desc())
ordered_play_predictions.show(truncate=False)


In [0]:
# Remove any remaining rows containing nulls, then rank by predicted score
play_predictions = play_predictions.dropna()
ordered_play_predictions = play_predictions.sort(play_predictions["prediction"].desc())
ordered_play_predictions.show(truncate=False)

In [0]:
# Expose the ranked play predictions to the %sql cells as "my_play_Predictions"
ordered_play_predictions.createOrReplaceTempView("my_play_Predictions")

In [0]:
%sql
-- Ten highest-scoring (user, game) play predictions
SELECT ID, game_ID, game_Name, prediction
FROM my_play_Predictions
ORDER BY prediction DESC
LIMIT 10

In [0]:
%sql
-- Top play predictions restricted to games whose name contains "Wars"
SELECT ID, game_ID, game_Name, prediction
FROM my_play_Predictions
WHERE game_Name LIKE '%Wars%'
ORDER BY prediction DESC
LIMIT 10

In [0]:
%sql
-- Top play predictions for two example game_IDs (18 and 24)
SELECT ID, game_ID, game_Name, prediction
FROM my_play_Predictions
WHERE game_ID IN (18, 24)
ORDER BY prediction DESC
LIMIT 10

In [0]:
# DataFrame-API version of the top-10 play query
ordered_play_predictions.sort(ordered_play_predictions["prediction"].desc()).limit(10).show(truncate=False)

In [0]:
# Top 10 play predictions among games whose name matches the LIKE pattern '%Wars%'
(
    ordered_play_predictions
    .sort(ordered_play_predictions["prediction"].desc())
    .where(ordered_play_predictions.game_Name.like('%Wars%'))
    .limit(10)
    .show(truncate=False)
)

In [0]:
# Function that gives play recommendations filtered by game name
def recommendationsByGamePlayName(name):
    """Show and return the 10 highest play predictions whose game_Name contains `name`.

    Args:
        name: text to look for inside game_Name. Matched literally and case-sensitively.

    Returns:
        A Spark DataFrame of at most 10 rows from ordered_play_predictions, ordered by
        prediction descending. (Previously the function returned the result of
        DataFrame.show(), which is always None.)
    """
    # contains() matches `name` literally; building a LIKE pattern from it would treat
    # any '%' or '_' in the input as wildcards.
    matches = ordered_play_predictions.filter(ordered_play_predictions.game_Name.contains(name))
    top_matches = matches.orderBy("prediction", ascending=False).limit(10)
    top_matches.show(truncate=False)
    return top_matches

In [0]:
# Top play predictions for games whose name contains "Duty"
recommendationsByGamePlayName('Duty')

In [0]:
# Function that gives play recommendations filtered by game ID
def recommendationsByGamePlayID(game_IDs):
    """Show and return the top 10 play predictions for the given game_ID value(s).

    Args:
        game_IDs: forwarded to Column.isin, so a single ID or a list of IDs works.

    Returns:
        A Spark DataFrame of at most 10 rows, ordered by prediction descending.
    """
    top_recommendations_play = (
        ordered_play_predictions
        .where(ordered_play_predictions.game_ID.isin(game_IDs))
        .sort("prediction", ascending=False)
        .limit(10)
    )
    top_recommendations_play.show(truncate=False)
    return top_recommendations_play

In [0]:
# Top play predictions for game_ID 1
recommendationsByGamePlayID(1)

In [0]:
# Function that gives play recommendations filtered by user ID
def recommendationsByUserPlayID(ID):
    """Show and return every row of ordered_play_predictions for one user, best prediction first.

    Args:
        ID: the user ID, compared against the ID column.

    Returns:
        A Spark DataFrame of that user's play predictions, ordered by prediction descending.
    """
    ordered_user_recommendations_play = (
        ordered_play_predictions
        .where(ordered_play_predictions.ID == ID)
        .sort("prediction", ascending=False)
    )
    ordered_user_recommendations_play.show(truncate=False)
    return ordered_user_recommendations_play


In [0]:
# All test-set play predictions for user 43908860
recommendationsByUserPlayID(43908860)

In [0]:
# Top-10 game recommendations for every user known to the baseline play model
userRecs_play = play_model.recommendForAllUsers(10)

In [0]:
# Preview the per-user play recommendation arrays
userRecs_play.show(truncate=False)

In [0]:
from pyspark.sql.functions import explode, col

# Function to get play-model game recommendations for a specific user ID, with game details
def gameRecommendationsForUserPlay(user_ID):
    """Show and return one user's recommendations from userRecs_play, labelled with game names.

    Args:
        user_ID: value matched against the ID column of userRecs_play.

    Returns:
        A Spark DataFrame with columns game_ID, game_Name and rating, ordered by rating
        descending (empty if the user has no row in userRecs_play).
    """
    # Each userRecs_play row holds an array column `recommendations` whose elements carry
    # game_ID and rating fields; explode it into one row per recommendation
    user_recommendationsPlay = userRecs_play.filter(col("ID") == user_ID)
    user_recommendationsPlay_exploded = user_recommendationsPlay.select(
        "ID", explode("recommendations").alias("recommendation")
    )

    # Attach game names by matching the recommended game_ID against distinct_games_df
    recommended_gamesPlay = (
        user_recommendationsPlay_exploded
        .join(
            distinct_games_df,
            user_recommendationsPlay_exploded["recommendation.game_ID"] == distinct_games_df["game_ID"],
            how="inner",
        )
        .select("game_ID", "game_Name", "recommendation.rating")
        # Sort on the projected "rating" column, not on the struct that select() dropped
        .orderBy(col("rating").desc())
    )

    # Display the recommended games with their predicted ratings
    recommended_gamesPlay.show(truncate=False)

    return recommended_gamesPlay

# Example call for user ID 76767
gameRecommendationsForUserPlay(76767)

In [0]:
# Named play recommendations for user 109323647
gameRecommendationsForUserPlay(109323647)

In [0]:
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Step 1: Tune on a random 10% sample of the play training split to keep the search cheap
sample_fraction = 0.1
sample_train_play = train_play.sample(withReplacement=False, fraction=sample_fraction, seed=42)

# Step 2: Perform hyperparameter tuning using the sample of the training data

# ALS estimator; coldStartStrategy="drop" removes rows whose user or game was not seen
# during fitting, so the evaluator never receives NaN predictions
als = ALS(userCol="ID", itemCol="game_ID", ratingCol="value", coldStartStrategy="drop", seed=42)

# RMSE evaluator used to rank the candidate models
evaluator_play = RegressionEvaluator(metricName="rmse", labelCol="value", predictionCol="prediction")

# Parameter grid: 2 x 2 x 2 = 8 candidates. `alpha` is deliberately not searched. It only
# affects ALS when implicitPrefs=True, and this estimator uses the default explicit-feedback
# mode, so every alpha value would train an identical model and only double the fits.
param_grid = ParamGridBuilder()\
    .addGrid(als.rank, [5, 10])\
    .addGrid(als.regParam, [0.01, 0.1])\
    .addGrid(als.maxIter, [5, 10])\
    .build()

# 75% of the sample trains each candidate; the remaining 25% validates it
tvs_play = TrainValidationSplit()\
    .setSeed(42)\
    .setTrainRatio(0.75)\
    .setEstimatorParamMaps(param_grid)\
    .setEstimator(als)\
    .setEvaluator(evaluator_play)

# Fit the TrainValidationSplit model with the sample training data
gridsearch_model_play = tvs_play.fit(sample_train_play)

# Step 3: the winning parameter set, refit by TrainValidationSplit on the whole sample
best_model_play = gridsearch_model_play.bestModel

In [0]:
# Evaluate the tuned model on the full training split.
# NOTE: best_model_play was fitted on the 10% sample used for tuning (TrainValidationSplit
# refits the best parameters on the data given to fit), so most of train_play was not seen
# during fitting. Refitting ALS with the best parameters on train_play may improve it.

# Make predictions on the full training data
full_training_predictions_play = best_model_play.transform(train_play)

# Set the metric explicitly: this cell (and later ones) leave evaluator_play on "mae",
# so without this a re-run would report MAE under the RMSE label.
evaluator_play.setMetricName("rmse")
full_training_rmse_play = evaluator_play.evaluate(full_training_predictions_play)
print(f"Root Mean Squared Error (RMSE) on full training data: {full_training_rmse_play}")

# Change the metricName of the evaluator to 'mae' to evaluate Mean Absolute Error (MAE)
evaluator_play.setMetricName("mae")

# Evaluate the model on the full training data using MAE
full_training_mae_play = evaluator_play.evaluate(full_training_predictions_play)
print(f"Mean Absolute Error (MAE) on full training data: {full_training_mae_play}")

# To score other data, call best_model_play.transform(...) directly


In [0]:
# Report the hyperparameters chosen by the grid search; regParam, maxIter and implicitPrefs
# are read from the parent ALS estimator through the underlying Java object.
print("Parameters for the best model:")
print(f"Rank Parameter: {best_model_play.rank:g}")
print(f"RegParam Parameter: {best_model_play._java_obj.parent().getRegParam():g}")
print(f"MaxIter Parameter: {best_model_play._java_obj.parent().getMaxIter():g}")
print(f"ImplicitPrefs Parameter: {best_model_play._java_obj.parent().getImplicitPrefs()!s}")

In [0]:
# Score the held-out test split with the tuned model; predictions are cast to double
transformed_test_play = best_model_play.transform(test_play)
transformed_test_play = transformed_test_play.withColumn(
    "prediction", transformed_test_play.prediction.cast("double")
)

# RMSE on the test split (metric set explicitly because the evaluator is shared)
full_test_rmse_play = evaluator_play.setMetricName("rmse").evaluate(transformed_test_play)
print(f"Root Mean Squared Error (RMSE) on full test data: {full_test_rmse_play}")

# MAE on the test split; the evaluator remains configured for "mae" afterwards
full_test_mae_play = evaluator_play.setMetricName("mae").evaluate(transformed_test_play)
print(f"Mean Absolute Error (MAE) on full test data: {full_test_mae_play}")


In [0]:
# Score the test split, discard rows containing nulls, and rank by predicted value
bestGeneratedPlayPredictions = best_model_play.transform(test_play).dropna()
sorted_predictions_play = bestGeneratedPlayPredictions.sort("prediction", ascending=False)

# Ten highest predictions, full column width
sorted_predictions_play.show(10, False)

In [0]:
# Re-display the ten highest-ranked play predictions without truncating values
sorted_predictions_play.show(n=10, truncate=False)


In [0]:
# Collect the ranked test predictions to the driver as pandas and print the first 10 rows
sorted_predictions_pandas = sorted_predictions_play.toPandas()
print(sorted_predictions_pandas.iloc[:10])


In [0]:
import matplotlib.pyplot as plt
import seaborn as sns

# Convert to Pandas DataFrame for plotting
sorted_predictions_pandas = sorted_predictions_play.toPandas()

# Each row is a (user, game) prediction, so the same game_ID can appear several times in the
# top 10. Plot one bar per row in rank order; grouping by game_ID (sns.barplot(x='game_ID'))
# would average duplicates, add error bars and re-sort the bars by game_ID.
top10_play = sorted_predictions_pandas.head(10).reset_index(drop=True)
positions = range(len(top10_play))

fig, ax = plt.subplots()
ax.bar(positions, top10_play["prediction"])
ax.set_xticks(positions)
ax.set_xticklabels(top10_play["game_ID"].astype(str), rotation=45)
ax.set_xlabel('Game ID (ranked by prediction)')
ax.set_ylabel('Prediction')
ax.set_title('Top 10 Predictions')
plt.show()


In [0]:
# Convert sorted_predictions to Pandas DataFrame
sorted_predictions_df = sorted_predictions_play.toPandas()

# Colour-code only the first 50 rows (styling every test prediction renders a very large
# HTML table) and only the rating/prediction columns; by default the gradient would also
# shade numeric identifier columns such as ID and game_ID.
styled_table = sorted_predictions_df.head(50).style.background_gradient(
    cmap='coolwarm', subset=['value', 'prediction']
)

# Display the styled table
styled_table


# Machine Learning Algorithm for "Purchase" and "Play" Behavior

In [0]:
from pyspark.sql.functions import col

# Step 1: Keep both 'purchase' and 'play' rows so one model is trained on both behaviours
purchase_play_df = joined_df.filter(col("mem_Type").isin(["purchase", "play"]))

# Display the purchase_play_df DataFrame
purchase_play_df.show(truncate=False)

In [0]:
# Seeded 80/20 train/test split of the purchase/play rows
train_purchase_play, test_purchase_play = purchase_play_df.randomSplit(weights=[0.8, 0.2], seed=100)

In [0]:
from pyspark.ml.recommendation import ALS

# ALS on the combined purchase/play ratings: 5 iterations, regParam 0.01, fixed seed
als_purchase_play = ALS(
    userCol="ID",
    itemCol="game_ID",
    ratingCol="value",
    maxIter=5,
    regParam=0.01,
    seed=100,
)

# Fit on the purchase/play training split
purchase_play_model = als_purchase_play.fit(train_purchase_play)

In [0]:
# Use the trained model (purchase_play_model) to score the test split (test_purchase_play);
# dropna() removes rows with missing values, e.g. NaN predictions for users or games that
# did not appear in the training split.
purchase_play_predictions = purchase_play_model.transform(test_purchase_play).dropna()

# Display the predictions without truncating column values
purchase_play_predictions.show(truncate=False)

In [0]:
from pyspark.sql.functions import desc

# Order the purchase/play predictions by user ID, highest ID first
purchase_play_predictions_sorted = purchase_play_predictions.orderBy(desc("ID"))

# Display the sorted DataFrame without truncating column values
purchase_play_predictions_sorted.show(truncate=False)

In [0]:
# Purchase/play predictions ordered by game_ID (ascending), shown at full column width
ordered_purchase_play_predictions = purchase_play_predictions.sort("game_ID")
ordered_purchase_play_predictions.show(truncate=False)

In [0]:
from pyspark.sql.functions import desc, asc

# Highest user ID first, then game_ID ascending within each user
ordered_purchase_play_predictions = purchase_play_predictions.sort(desc("ID"), asc("game_ID"))
ordered_purchase_play_predictions.show()

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluator for the combined purchase/play test predictions
purchase_play_evaluator = RegressionEvaluator(metricName="rmse", labelCol="value", predictionCol="prediction")

# Root Mean Square Error on the test predictions
purchase_play_rmse = purchase_play_evaluator.evaluate(purchase_play_predictions)
print(f"Root Mean Square Error (RMSE) for 'purchase' and 'play' predictions is: {purchase_play_rmse:.4f}")

# Switch this evaluator (not any other model's evaluator) to Mean Absolute Error
purchase_play_evaluator.setMetricName("mae")

# Mean Absolute Error on the same test predictions
purchase_play_mae = purchase_play_evaluator.evaluate(purchase_play_predictions)
print(f"Mean Absolute Error (MAE) for 'purchase' and 'play' predictions is: {purchase_play_mae:.4f}")

In [0]:
# Rank every purchase/play prediction from highest to lowest predicted value
ordered_purchase_play_predictions = purchase_play_predictions.sort(purchase_play_predictions.prediction.desc())
ordered_purchase_play_predictions.show(truncate=False)

In [0]:
# Defensive: discard rows with missing values, then rank by prediction (highest first)
purchase_play_predictions = purchase_play_predictions.dropna()
ordered_purchase_play_predictions = purchase_play_predictions.sort("prediction", ascending=False)
ordered_purchase_play_predictions.show(truncate=False)

In [0]:
# Register the ranked predictions so the %sql cells can query them by name
ordered_purchase_play_predictions.createOrReplaceTempView(name="my_purchase_play_Predictions")

In [0]:
%sql
-- Ten highest predicted values across all user/game pairs
SELECT ID, game_ID, game_Name, prediction
FROM my_purchase_play_Predictions
ORDER BY prediction DESC
LIMIT 10

In [0]:
%sql
-- Ten highest predictions for games whose name contains "Wars"
SELECT ID, game_ID, game_Name, prediction
FROM my_purchase_play_Predictions
WHERE game_Name LIKE '%Wars%'
ORDER BY prediction DESC
LIMIT 10

In [0]:
%sql
-- Ten highest predictions restricted to game IDs 18 and 24
SELECT ID, game_ID, game_Name, prediction
FROM my_purchase_play_Predictions
WHERE game_ID IN (18, 24)
ORDER BY prediction DESC
LIMIT 10

In [0]:
# DataFrame-API equivalent of the top-10 SQL query
ordered_purchase_play_predictions.sort("prediction", ascending=False).limit(10).show(truncate=False)

In [0]:
# DataFrame-API equivalent of the "Wars" SQL query
(
    ordered_purchase_play_predictions
    .sort("prediction", ascending=False)
    .where(ordered_purchase_play_predictions.game_Name.like('%Wars%'))
    .limit(10)
    .show(truncate=False)
)

In [0]:
# Function to get recommendations filtered by game name
def recommendationsByGamePurchasePlayName(name):
    """Show and return the 10 highest purchase/play predictions for games matching ``name``.

    Args:
        name: text searched for anywhere in ``game_Name``. It is embedded in a SQL LIKE
            pattern (``%name%``), so ``%`` and ``_`` inside it act as wildcards.

    Returns:
        The top-10 DataFrame, which is also displayed. The DataFrame itself is returned
        (``show()`` yields ``None``) so callers can reuse it, matching the ID-based helpers.
    """
    # Avoid shadowing the builtin `filter`
    pattern = '%' + name + '%'
    recommendationsList = (
        ordered_purchase_play_predictions
        .orderBy("prediction", ascending=False)
        .filter(ordered_purchase_play_predictions.game_Name.like(pattern))
        .limit(10)
    )
    recommendationsList.show(truncate=False)
    return recommendationsList

In [0]:
# Top purchase/play predictions for games with "Duty" in the name
recommendationsByGamePurchasePlayName(name='Duty')

In [0]:
# Recommendations restricted to one or more game IDs
def recommendationsByGamePurchasePlayID(game_IDs):
    """Show and return the 10 highest purchase/play predictions for the given game ID(s).

    Args:
        game_IDs: a game ID or collection of game IDs passed to ``Column.isin``.

    Returns:
        DataFrame of at most 10 rows ordered by ``prediction`` descending (also displayed).
    """
    predictions = ordered_purchase_play_predictions
    top_recommendations_purchase_play = (
        predictions
        .where(predictions.game_ID.isin(game_IDs))
        .sort("prediction", ascending=False)
        .limit(10)
    )
    top_recommendations_purchase_play.show(truncate=False)
    return top_recommendations_purchase_play

In [0]:
# Top purchase/play predictions for game ID 1 (uses the purchase/play helper defined above,
# not the play-only model's helper)
recommendationsByGamePurchasePlayID(1)

In [0]:
# Recommendations restricted to a single user
def recommendationsByUserPurchasePlayID(ID):
    """Show and return the purchase/play predictions for user ``ID``, highest first.

    Args:
        ID: value matched against the ``ID`` column.

    Returns:
        DataFrame of that user's predictions ordered by ``prediction`` descending; ``show``
        displays its first 20 rows.
    """
    predictions = ordered_purchase_play_predictions
    ordered_user_recommendations_purchase_play = (
        predictions
        .where(predictions.ID == ID)
        .sort("prediction", ascending=False)
    )
    ordered_user_recommendations_purchase_play.show(truncate=False)
    return ordered_user_recommendations_purchase_play

In [0]:
# Purchase/play predictions for user 43908860
recommendationsByUserPurchasePlayID(ID=43908860)

In [0]:
# Precompute the top-10 game recommendations for every user known to the purchase/play model
userRecs_purchase_play = purchase_play_model.recommendForAllUsers(numItems=10)

In [0]:
# Inspect the per-user recommendation arrays (first 20 users, full column width)
userRecs_purchase_play.show(n=20, truncate=False)

In [0]:
from pyspark.sql.functions import explode, col

# Function to get game recommendations for a specific user ID and join with game details
def gameRecommendationsForUserPurchasePlay(user_ID):
    """Return the purchase/play model's precomputed recommendations for one user, with names.

    Args:
        user_ID: value of the ``ID`` column to look up in ``userRecs_purchase_play``.

    Returns:
        DataFrame with columns ``game_ID``, ``game_Name`` and ``rating``, ordered by
        ``rating`` descending. The result is also displayed with ``show``.
    """
    # Keep only this user's row of precomputed recommendations
    user_recommendationspurchase_Play = userRecs_purchase_play.filter(col("ID") == user_ID)

    # One row per recommended (game_ID, rating) struct so each game can be joined
    user_recommendationspurchase_Play_exploded = user_recommendationspurchase_Play.select(
        "ID", explode("recommendations").alias("recommendation")
    )

    # Attach game names by matching the struct's game_ID against distinct_games_df
    recommended_gamespurchase_Play = (
        user_recommendationspurchase_Play_exploded
        .join(
            distinct_games_df,
            user_recommendationspurchase_Play_exploded["recommendation.game_ID"] == distinct_games_df["game_ID"],
            how="inner",
        )
        .select("game_ID", "game_Name", "recommendation.rating")
        # The select exposes the nested field as a top-level `rating` column; sort on that
        # projected column rather than re-resolving the struct path that was projected away.
        .orderBy(col("rating").desc())
    )

    # Display the recommended games with their ratings (predictions)
    recommended_gamespurchase_Play.show(truncate=False)

    return recommended_gamespurchase_Play

# Example call for a specific user ID (76767)
gameRecommendationsForUserPurchasePlay(76767)

In [0]:
# Top purchase/play recommendations (with game names) for user 109323647
gameRecommendationsForUserPurchasePlay(user_ID=109323647)

In [0]:
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Step 1: Take a sample of the training data to keep the grid search affordable
# (e.g., 0.1 for 10% of the data). The best parameters are refit on this sample only.
sample_fraction = 0.1
sample_train_purchase_play = train_purchase_play.sample(False, sample_fraction, seed=42)

# Step 2: Perform hyperparameter tuning using the sample of the training data

# ALS estimator to tune; coldStartStrategy="drop" removes NaN predictions for users/games
# missing from a fitting split so the evaluator only sees finite values
als = ALS(userCol="ID", itemCol="game_ID", ratingCol="value", coldStartStrategy="drop", seed=42)

# Define a regression evaluator for the ALS model
evaluator_purchase_play = RegressionEvaluator(metricName="rmse", labelCol="value", predictionCol="prediction")

# Parameter grid. `alpha` is deliberately not searched: ALS only uses it when
# implicitPrefs=True, and this estimator leaves implicitPrefs at Spark's default (False),
# so every alpha value would refit an identical model and double the tuning time.
param_grid = ParamGridBuilder()\
    .addGrid(als.rank, [5, 10])\
    .addGrid(als.regParam, [0.01, 0.1])\
    .addGrid(als.maxIter, [5, 10])\
    .build()

# Define TrainValidationSplit for model tuning (75% fit / 25% validation of the sample)
tvs_purchase_play = TrainValidationSplit()\
    .setSeed(42)\
    .setTrainRatio(0.75)\
    .setEstimatorParamMaps(param_grid)\
    .setEstimator(als)\
    .setEvaluator(evaluator_purchase_play)

# Fit the TrainValidationSplit model with the sample training data
gridsearch_model_purchase_play = tvs_purchase_play.fit(sample_train_purchase_play)

# Step 3: Use the best model for making predictions and further evaluation

# Get the best model from grid search
best_model_purchase_play = gridsearch_model_purchase_play.bestModel


In [0]:
# Score the complete (non-sampled) training split with the tuned purchase/play model
full_training_predictions_purchase_play = best_model_purchase_play.transform(train_purchase_play)

# Make sure the prediction column is a double before evaluating
full_training_predictions_purchase_play = full_training_predictions_purchase_play.withColumn(
    "prediction", full_training_predictions_purchase_play.prediction.cast("double")
)

# RMSE first, then MAE; the evaluator is left configured for "mae" afterwards
full_training_rmse_purchase_play = evaluator_purchase_play.setMetricName("rmse").evaluate(full_training_predictions_purchase_play)
print(f"Root Mean Squared Error (RMSE) on full training data: {full_training_rmse_purchase_play}")

full_training_mae_purchase_play = evaluator_purchase_play.setMetricName("mae").evaluate(full_training_predictions_purchase_play)
print(f"Mean Absolute Error (MAE) on full training data: {full_training_mae_purchase_play}")

In [0]:
# Report the hyperparameters chosen by the grid search; regParam, maxIter and implicitPrefs
# are read from the parent ALS estimator through the underlying Java object.
print("Parameters for the best model:")
print(f"Rank Parameter: {best_model_purchase_play.rank:g}")
print(f"RegParam Parameter: {best_model_purchase_play._java_obj.parent().getRegParam():g}")
print(f"MaxIter Parameter: {best_model_purchase_play._java_obj.parent().getMaxIter():g}")
print(f"ImplicitPrefs Parameter: {best_model_purchase_play._java_obj.parent().getImplicitPrefs()!s}")

In [0]:
# Score the held-out test split with the tuned purchase/play model; cast predictions to double
test_predictions_purchase_play = best_model_purchase_play.transform(test_purchase_play)
test_predictions_purchase_play = test_predictions_purchase_play.withColumn(
    "prediction", test_predictions_purchase_play.prediction.cast("double")
)

# RMSE on the test split (metric set explicitly because the evaluator is shared)
test_rmse_pp = evaluator_purchase_play.setMetricName("rmse").evaluate(test_predictions_purchase_play)
print(f"Root Mean Squared Error (RMSE) on test data: {test_rmse_pp}")

# MAE on the test split; the evaluator remains configured for "mae" afterwards
test_mae_pp = evaluator_purchase_play.setMetricName("mae").evaluate(test_predictions_purchase_play)
print(f"Mean Absolute Error (MAE) on test data: {test_mae_pp}")


In [0]:
# Score the test split, discard rows containing nulls, and rank by predicted value
bestGeneratedPurchasePlayPredictions = best_model_purchase_play.transform(test_purchase_play).dropna()
sorted_predictions_purchase_play = bestGeneratedPurchasePlayPredictions.sort("prediction", ascending=False)

# Ten highest predictions, full column width
sorted_predictions_purchase_play.show(10, False)

In [0]:
# Re-display the ten highest-ranked purchase/play predictions without truncating values
sorted_predictions_purchase_play.show(n=10, truncate=False)

In [0]:
# Collect the ranked purchase/play predictions to the driver as pandas; print the first 10 rows
sorted_predictions_pandas_purchase_play = sorted_predictions_purchase_play.toPandas()
print(sorted_predictions_pandas_purchase_play.iloc[:10])

In [0]:
import matplotlib.pyplot as plt
import seaborn as sns

# Convert to Pandas DataFrame for plotting
sorted_predictions_pandas_purchase_play = sorted_predictions_purchase_play.toPandas()

# Each row is a (user, game, mem_Type) prediction, so the same game_ID can appear several
# times in the top 10. Plot one bar per row in rank order; grouping by game_ID
# (sns.barplot(x='game_ID')) would average duplicates, add error bars and re-sort by game_ID.
top10_purchase_play = sorted_predictions_pandas_purchase_play.head(10).reset_index(drop=True)
positions = range(len(top10_purchase_play))

fig, ax = plt.subplots()
ax.bar(positions, top10_purchase_play["prediction"])
ax.set_xticks(positions)
ax.set_xticklabels(top10_purchase_play["game_ID"].astype(str), rotation=45)
ax.set_xlabel('Game ID (ranked by prediction)')
ax.set_ylabel('Prediction')
ax.set_title('Top 10 Predictions (Purchase Play)')
plt.show()


In [0]:
# Convert sorted_predictions to Pandas DataFrame
sorted_predictions_df_purchase_play = sorted_predictions_purchase_play.toPandas()

# Colour-code only the first 50 rows and only the rating/prediction columns; by default the
# gradient would also shade numeric identifier columns such as ID and game_ID.
styled_table_purchase_play = sorted_predictions_df_purchase_play.head(50).style.background_gradient(
    cmap='coolwarm', subset=['value', 'prediction']
)

# Display the styled table
styled_table_purchase_play

POWER BI DASHBOARD – export the joined data to DBFS as CSV and register it as a SQL table

In [0]:
# Preview the data that will be exported for the dashboard (first 20 rows, full width)
joined_df.show(n=20, truncate=False)

In [0]:
# Write the joined data as CSV (no header option set), replacing any previous export
joined_df.write.csv("/FileStore/tables/steam_with_game_id.csv", mode="overwrite")

In [0]:
%sql
-- Recreate a CSV-backed table over the exported files (no header row; types inferred)
DROP TABLE IF EXISTS steam_with_game_id;
CREATE TABLE steam_with_game_id
USING csv
OPTIONS (
  path "dbfs:/FileStore/tables/steam_with_game_id.csv",
  header "False",
  inferSchema "True"
)

In [0]:
%sql
-- List the tables registered in the current database
show tables

In [0]:
%sql
-- Preview the CSV-backed steam_with_game_id table
select * from steam_with_game_id

In [0]:
# Clean up only what this notebook generated. Recursively removing all of /FileStore/tables/
# would also delete the uploaded input (/FileStore/tables/steam_200k.csv) and any other files
# stored there, so the notebook could not be re-run without re-uploading its data.
# The steam_with_game_id table reads from the CSV output, so drop it first rather than
# leaving a table that points at deleted files.
spark.sql("DROP TABLE IF EXISTS steam_with_game_id")
dbutils.fs.rm("/FileStore/tables/steam_with_game_id.csv", recurse=True)