In [0]:
dbutils.fs.mount(
    source='wasbs://game-sales@assignment2akila.blob.core.windows.net',
    mount_point='/mnt/game-sales',
    extra_configs = {'fs.azure.account.key.assignment2akila.blob.core.windows.net': dbutils.secrets.get('assignment2scope1', 'assignment2secret1')}
)


[0;31m---------------------------------------------------------------------------[0m
[0;31mExecutionError[0m                            Traceback (most recent call last)
File [0;32m<command-3088483906151393>, line 1[0m
[0;32m----> 1[0m [43mdbutils[49m[38;5;241;43m.[39;49m[43mfs[49m[38;5;241;43m.[39;49m[43mmount[49m[43m([49m
[1;32m      2[0m [43m    [49m[43msource[49m[38;5;241;43m=[39;49m[38;5;124;43m'[39;49m[38;5;124;43mwasbs://game-sales@assignment2akila.blob.core.windows.net[39;49m[38;5;124;43m'[39;49m[43m,[49m
[1;32m      3[0m [43m    [49m[43mmount_point[49m[38;5;241;43m=[39;49m[38;5;124;43m'[39;49m[38;5;124;43m/mnt/game-sales[39;49m[38;5;124;43m'[39;49m[43m,[49m
[1;32m      4[0m [43m    [49m[43mextra_configs[49m[43m [49m[38;5;241;43m=[39;49m[43m [49m[43m{[49m[38;5;124;43m'[39;49m[38;5;124;43mfs.azure.account.key.assignment2akila.blob.core.windows.net[39;49m[38;5;124;43m'[39;49m[43m:[49m[43m [49m[43mdbutils

In [0]:
action = spark.read.format("csv").load("/mnt/game-sales/raw-data/vgsales.csv")
action.show()

+----+--------------------+--------+----+------------+--------------------+--------+--------+--------+-----------+------------+
| _c0|                 _c1|     _c2| _c3|         _c4|                 _c5|     _c6|     _c7|     _c8|        _c9|        _c10|
+----+--------------------+--------+----+------------+--------------------+--------+--------+--------+-----------+------------+
|Rank|                Name|Platform|Year|       Genre|           Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
|   1|          Wii Sports|     Wii|2006|      Sports|            Nintendo|   41.49|   29.02|    3.77|       8.46|       82.74|
|   2|   Super Mario Bros.|     NES|1985|    Platform|            Nintendo|   29.08|    3.58|    6.81|       0.77|       40.24|
|   3|      Mario Kart Wii|     Wii|2008|      Racing|            Nintendo|   15.85|   12.88|    3.79|       3.31|       35.82|
|   4|   Wii Sports Resort|     Wii|2009|      Sports|            Nintendo|   15.75|   11.01|    3.28|  

##Initial transformation##

###Fixing Schema###
**The schema is not inferred correctly in the dataset. So we will be loading in the table with a new schema**

###Setting data types###
**The relevant data types are set so that it helps transformation down the line**     
*Float is used instead of Double for the sales figures because Float consumes less memory than Double and is better for faster data manipulation and we need precision only upto two decimal places*

###Renaming columns###
**Column headings are renamed for easier understanding**     

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
new_schema = StructType([
    StructField("Rank", StringType(), True),
    StructField("Name", StringType(), True),
    StructField("Platform", StringType(), True),
    StructField("Year", StringType(), True),
    StructField("Genre", StringType(), True),
    StructField("Publisher", StringType(), True),
    StructField("North_American_Sales", FloatType(), True),
    StructField("European_Sales", FloatType(), True),
    StructField("Japanese_Sales", FloatType(), True),
    StructField("Other_Sales", FloatType(), True),
    StructField("Global_Sales", FloatType(), True),

])
action = spark.read.csv('/mnt/game-sales/raw-data/vgsales.csv', schema=new_schema, header=True)
action.show()


+----+--------------------+--------+----+------------+--------------------+--------------------+--------------+--------------+-----------+------------+
|Rank|                Name|Platform|Year|       Genre|           Publisher|North_American_Sales|European_Sales|Japanese_Sales|Other_Sales|Global_Sales|
+----+--------------------+--------+----+------------+--------------------+--------------------+--------------+--------------+-----------+------------+
|   1|          Wii Sports|     Wii|2006|      Sports|            Nintendo|               41.49|         29.02|          3.77|       8.46|       82.74|
|   2|   Super Mario Bros.|     NES|1985|    Platform|            Nintendo|               29.08|          3.58|          6.81|       0.77|       40.24|
|   3|      Mario Kart Wii|     Wii|2008|      Racing|            Nintendo|               15.85|         12.88|          3.79|       3.31|       35.82|
|   4|   Wii Sports Resort|     Wii|2009|      Sports|            Nintendo|             

##Secondary transformation##

###Replacing inconsistent entries###
**The Atari 2600 console is recorded as '2600' in the Platform column. This would be problematic for future data manipulations as its an outlier compared to other data in the column. So we are replacing it with 'Atari 2600'**

###Re-numbering the Rank###
**The 'Rank' column is repopulated after dropping the previous Rank column which is not monotonously ascending**

In [0]:
from pyspark.sql.functions import when
from pyspark.sql.functions import monotonically_increasing_id

action = action.drop('Rank')
action = action.withColumn('Rank', monotonically_increasing_id() + 1)
action = action.withColumn("Platform", when(action["Platform"] == "2600", "Atari 2600").otherwise(action["Platform"]))
action.show()

+--------------------+--------+----+------------+--------------------+--------------------+--------------+--------------+-----------+------------+----+
|                Name|Platform|Year|       Genre|           Publisher|North_American_Sales|European_Sales|Japanese_Sales|Other_Sales|Global_Sales|Rank|
+--------------------+--------+----+------------+--------------------+--------------------+--------------+--------------+-----------+------------+----+
|          Wii Sports|     Wii|2006|      Sports|            Nintendo|               41.49|         29.02|          3.77|       8.46|       82.74|   1|
|   Super Mario Bros.|     NES|1985|    Platform|            Nintendo|               29.08|          3.58|          6.81|       0.77|       40.24|   2|
|      Mario Kart Wii|     Wii|2008|      Racing|            Nintendo|               15.85|         12.88|          3.79|       3.31|       35.82|   3|
|   Wii Sports Resort|     Wii|2009|      Sports|            Nintendo|               15.

In [0]:
action.write.mode("overwrite").option("header",'true').csv("/mnt/game-sales/transformed") 

##Visualisations##

###Popularity by genre###
Now that we have cleaned the data and the games are already ranked by popularity, the count per each genre give the which genre contains the most number of popular games

In [0]:
import plotly.express as px 
action.createOrReplaceGlobalTempView("action")
spark.sql("CREATE TABLE IF NOT EXISTS workingdf AS SELECT * FROM global_temp.action") 

CountByGenre = spark.sql("SELECT Global_Sales, Name, Genre FROM workingdf") 
pandas_df = CountByGenre.toPandas() 

grouped_df = pandas_df.groupby("Genre").size().to_frame(name="count").reset_index() 
fig = px.bar(grouped_df, x="Genre", y="count", text="count")
fig.update_traces(texttemplate='%{text:.2s}', textposition='outside')
fig.show()


###Sales by Platform###
We use Spark to select global sales figure of each game and their plaform and then use Pandas to add the sales per each platform and present it in a pie chart. 

In [0]:
import plotly.express as px 
action.createOrReplaceGlobalTempView("action")
spark.sql("CREATE TABLE IF NOT EXISTS workingdf AS SELECT * FROM global_temp.action") 

SalesbyPlatform = spark.sql("SELECT Global_Sales, Platform FROM workingdf")  
grouped_df = SalesbyPlatform.toPandas().groupby("Platform")["Global_Sales"].sum().reset_index() 
fig = px.pie(grouped_df, values='Global_Sales', names='Platform', title='Distribution of Global Sales by Platform')
fig.show()

###Rank of each game###
Getting the Rank is straightforward as the dataset is properly ranked. We take the first 10 records. This is information is best conveyed in this tabular format. 

In [0]:
import plotly.express as px 
action.createOrReplaceGlobalTempView("action")
spark.sql("CREATE TABLE IF NOT EXISTS workingdf AS SELECT * FROM global_temp.action") 

Rank = spark.sql("SELECT Rank, Name FROM workingdf DESC LIMIT 10 ")
Rank.show()

+----+--------------------+
|Rank|                Name|
+----+--------------------+
|   1|          Wii Sports|
|   2|   Super Mario Bros.|
|   3|      Mario Kart Wii|
|   4|   Wii Sports Resort|
|   5|Pokemon Red/Pokem...|
|   6|              Tetris|
|   7|New Super Mario B...|
|   8|            Wii Play|
|   9|New Super Mario B...|
|  10|           Duck Hunt|
+----+--------------------+



###Further Insight###
##Top 10 game publishers by Global Sales##
We get the sum of global sales as total sales, and group total sales by publisher and take the top 10 

In [0]:
import plotly.express as px 
action.createOrReplaceGlobalTempView("action")
spark.sql("CREATE TABLE IF NOT EXISTS workingdf AS SELECT * FROM global_temp.action") 
sales_by_publisher = spark.sql("""SELECT Publisher, SUM(Global_Sales) as Total_Sales FROM workingdf GROUP BY Publisher ORDER BY Total_Sales DESC LIMIT 10""")
sales_by_publisher.show()
grouped_df = sales_by_publisher.toPandas().reset_index() 

fig = px.bar(grouped_df, x="Publisher", y="Total_Sales", title='Top 10 Game Publishers by Global Sales')
fig.show()

+--------------------+------------------+
|           Publisher|       Total_Sales|
+--------------------+------------------+
|            Nintendo|1750.0500054359436|
|     Electronic Arts|1009.5799981355667|
|          Activision|  633.739998281002|
|Sony Computer Ent...| 548.5999993979931|
|             Ubisoft| 386.2700009942055|
|Take-Two Interactive|365.05999690294266|
|                 THQ|268.18999913334846|
|Microsoft Game St...|231.46000024676323|
|                Sega|209.93000000715256|
|Konami Digital En...|199.80000069737434|
+--------------------+------------------+



##Global sales by Franchise##
Here we are selecting games by four selected franchises using a text matcher and building a dataframe with the column "Franchise" to get a visualisation of sales of each franchise, and also get a count of games in each franchise to put it in the visualisation for comparison.

In [0]:
import plotly.express as px
from pyspark.sql.functions import col, count, sum, when

# Create a temporary view for the action DataFrame
action.createOrReplaceGlobalTempView("action")

# Create the workingdf DataFrame by selecting all columns from the global_temp.action table
workingdf = spark.sql("SELECT * FROM global_temp.action")

# Apply the transformations on the workingdf DataFrame
df_transformed = workingdf.select(
    when(col("Name").like("Call of Duty%"), "Call of Duty")
    .when(col("Name").like("Grand Theft Auto%"), "Grand Theft Auto")
    .when(col("Name").like("Pokemon%"), "Pokemon")
    .when(col("Name").like("FIFA%"), "FIFA")
    .otherwise("Other").alias("Franchise"),
    col("Global_Sales")
)

# Perform aggregation on the transformed DataFrame
df_aggregated = df_transformed.filter(col("Franchise") != "Other") \
    .groupBy("Franchise") \
    .agg(count("*").alias("Game_Count"), sum("Global_Sales").alias("Total_Franchise_Sales")) \
    .orderBy("Total_Franchise_Sales", ascending=False)

# Convert the Spark DataFrame to a Pandas DataFrame for plotting
df_pandas = df_aggregated.toPandas()

# Create the Plotly visualization
fig = px.bar(df_pandas, x='Franchise', y=['Total_Franchise_Sales', 'Game_Count'], title='Total Franchise Sales')
fig.show()

##Average sales of publishers in different regions##
In this visualisation we explore the average sales of each publisher in 2016. This aggregates the game sales of the games of each publisher by first taking the average of the sales in different regions.

In [0]:
import plotly.express as px
from pyspark.sql.functions import col, count, sum, when, avg, round

# Create a temporary view for the action DataFrame
action.createOrReplaceGlobalTempView("action")

# Create the workingdf DataFrame by selecting all columns from the global_temp.action table
workingdf = spark.sql("SELECT * FROM global_temp.action")

# Transform and aggregate the data
regional_sales = workingdf.groupBy("Name", "Year", "Publisher") \
    .agg(
        sum("North_American_Sales").alias("NA_Sales"),
        sum("European_Sales").alias("EU_Sales"),
        sum("Japanese_Sales").alias("JP_Sales"),
        sum("Other_Sales").alias("Other_Sales")
    )

# Aggregate the data again
df_aggregated = regional_sales.groupBy("Publisher", "Year") \
    .agg(
        round(avg("NA_Sales"), 2).alias("Avg_NA_Sales"),
        round(avg("EU_Sales"), 2).alias("Avg_EU_Sales"),
        round(avg("JP_Sales"), 2).alias("Avg_JP_Sales"),
        round(avg("Other_Sales"), 2).alias("Avg_Other_Sales")
    ) \
    .orderBy("Year", ascending=False)

# Filter the DataFrame for the year 2016
df_2016 = df_aggregated.filter(col("Year") == 2016)

# Convert the Spark DataFrame to a Pandas DataFrame
df_pandas = df_2016.toPandas()

# Calculate the total average sales for each publisher
df_pandas['Total_Avg_Sales'] = df_pandas[['Avg_NA_Sales', 'Avg_EU_Sales', 'Avg_JP_Sales', 'Avg_Other_Sales']].sum(axis=1)

# Sort the DataFrame in ascending order of total average sales
df_pandas = df_pandas.sort_values('Total_Avg_Sales')

# Create a bar chart with Plotly
fig = px.bar(df_pandas, x='Publisher', y='Total_Avg_Sales', title='Average Sales of Publishers in 2016',
             color='Publisher', text='Total_Avg_Sales')

# Update layout to display text on top of the bars
fig.update_traces(texttemplate='%{text:.2s}', textposition='outside')

fig.show()


#