In [0]:
all_games_highest_global_sales = sales.orderBy("NA_Sales", ascending=False).limit(20).show()

## Imports

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType
from pyspark.sql.functions import col, date_format


## Mounting

In [0]:
dbutils.fs.mount(
    source='wasbs://assignment-data@harryassignment2.blob.core.windows.net',
    mount_point='/mnt/assignment-data',
    extra_configs = {'fs.azure.account.key.harryassignment2.blob.core.windows.net': dbutils.secrets.get('harryassignmentscope', 'storageAccountKey')}
)

## Read/loading CSV

In [0]:
sales = spark.read.format("csv").option("header","true").load("/mnt/assignment-data/raw-data/sales.csv")

## Printing scheme of sales data, all strings 

In [0]:
sales.printSchema()

## Changing data types

In [0]:
sales = sales \
    .withColumn("Ranking", col("Rank").cast(IntegerType())) \
    .withColumn("Release_Year", col("Year").cast(IntegerType())) \
    .withColumn("NA_Sales", col("NA_Sales").cast(DoubleType())) \
    .withColumn("EU_Sales", col("EU_Sales").cast(DoubleType())) \
    .withColumn("JP_Sales", col("JP_Sales").cast(DoubleType())) \
    .withColumn("Other_Sales", col("Other_Sales").cast(DoubleType())) \
    .withColumn("Global_Sales", col("Global_Sales").cast(DoubleType()))

## Printing scheme with new data types

In [0]:
sales.printSchema()

## Exploring the data 

In [0]:
all_games_highest_global_sales = sales.orderBy("Global_Sales", ascending=False).limit(20).show()

In [0]:
all_games_highest_global_sales = sales.orderBy("NA_Sales", ascending=False).filter(col("Genre").contains("Sports")).limit(20).show()

In [0]:
all_games_highest_global_sales = sales.orderBy("EU_Sales", ascending=False).filter(col("Year").contains("2006")).limit(20).show()

In [0]:
all_games_highest_global_sales = sales.orderBy("JP_Sales", ascending=False).filter(col("Platform").contains("X360")).limit(20).show()

# Cleaning data

## Searching for null values

In [0]:
# Count null values in each column
null_counts = {col_name: sales.where(col(col_name).isNull()).count() for col_name in sales.columns}

# Print the count of null values for each column
for col_name, count in null_counts.items():
    print(f"Column '{col_name}' has {count} null values.")


## Found "N/A" stored in year of release for some records
### Printing the the count of records with "N/A" for each column

In [0]:
# Count "N/A" values in each column
na_counts = {col_name: sales.where(col(col_name) == "N/A").count() for col_name in sales.columns}

# Print the count of "N/A" values for each column
for col_name, count in na_counts.items():
    print(f"Column '{col_name}' has {count} 'N/A' values.")

## Removing records with "N/A" under year of release

In [0]:
sales = sales.filter(col("Year") != "N/A")

## Removing columns with "N/A" under publisher

In [0]:
sales = sales.filter(col("Publisher") != "N/A")

## Checking for N/A columns again after cleaning the data

In [0]:
# Count "N/A" values in each column
na_counts = {col_name: sales.where(col(col_name) == "N/A").count() for col_name in sales.columns}

# Print the count of "N/A" values for each column
for col_name, count in na_counts.items():
    print(f"Column '{col_name}' has {count} 'N/A' values.")

## Writing the transformed data to Storage Account 

In [0]:
sales.write.option("header",'true').csv("/mnt/assignment-data/transformed-data/sales")

In [0]:
sales.write.mode("overwrite").option("header",'true').csv("/mnt/assignment-data/transformed-data/sales")

## Visualizations

In [0]:
import plotly.express as px

In [0]:
# Read the CSV files as Spark DataFrames
sales = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/mnt/assignment-data/raw-data/sales.csv")

# Create temporary view for querying
sales.createOrReplaceTempView("sales_view")

# Execute SQL queries to aggregate data
query_result = spark.sql("""
SELECT genre,
       COUNT(*) AS genre_count
FROM sales_view
GROUP BY genre
ORDER BY genre_count DESC
""")

# Import Plotly
import plotly.express as px

# Create a Pandas DataFrame for plotting
pandas_df = query_result.toPandas()

# Create the bar chart using Plotly
fig = px.bar(pandas_df, x="genre", y="genre_count", color="genre", title="Genre Popularity")

# Display the plot using display method
display(fig)


In [0]:
# Read the CSV file as a Spark DataFrame
sales = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/mnt/assignment-data/raw-data/sales.csv")

# Create temporary view for querying
sales.createOrReplaceTempView("sales_view")

# Execute SQL queries to aggregate data
query_result = spark.sql("""
SELECT Platform,
       SUM(NA_Sales) AS NA_Sales,
       SUM(EU_Sales) AS EU_Sales,
       SUM(JP_Sales) AS JP_Sales,
       SUM(Other_Sales) AS Other_Sales,
       SUM(Global_Sales) AS Global_Sales
FROM sales_view
GROUP BY Platform
ORDER BY Global_Sales DESC
""")

# Import Plotly Express for visualisation
import plotly.express as px

# Create a Pandas DataFrame for plotting
pandas_df = query_result.toPandas()

# Create the bar chart using Plotly
fig = px.bar(pandas_df, x="Platform", y=["NA_Sales", "EU_Sales", "JP_Sales", "Other_Sales", "Global_Sales"],
             barmode="group", title="Platform Sales")

# Display the plot
display(fig)


In [0]:
# Read the CSV file as a Spark DataFrame
sales = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/mnt/assignment-data/raw-data/sales.csv")

# Create temporary view for querying
sales.createOrReplaceTempView("sales_view")

# Execute SQL query to select rank and global sales
query_result = spark.sql("""
SELECT Rank, Global_Sales
FROM sales_view
ORDER BY Rank ASC
""")

# Import Plotly Express for visualisation
import plotly.express as px

# Create a Pandas DataFrame for plotting
pandas_df = query_result.toPandas()

# Create the scatter plot using Plotly
fig = px.scatter(pandas_df, x="Rank", y="Global_Sales", title="Global Sales vs. Rank",
                 labels={"Rank": "Rank", "Global_Sales": "Global Sales"},
                 color="Global_Sales", color_continuous_scale=px.colors.sequential.Viridis)

# Customize marker size and opacity
fig.update_traces(marker=dict(size=10, opacity=0.7))

# Display the plot
display(fig)


In [0]:
# Read the CSV file as a Spark DataFrame
sales = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/mnt/assignment-data/raw-data/sales.csv")

# Create temporary view for querying
sales.createOrReplaceTempView("sales_view")

# Query to get the total global sales by genre
query_result_genre = spark.sql("""
    SELECT Genre, SUM(Global_Sales) AS Total_Global_Sales
    FROM sales_view
    GROUP BY Genre
    ORDER BY Total_Global_Sales DESC
""")

# Convert the result to a Pandas DataFrame
pandas_df_genre = query_result_genre.toPandas()

# Create the pie chart using Plotly
fig_genre = px.pie(pandas_df_genre, values='Total_Global_Sales', names='Genre', title='Distribution of Game Sales by Genre')
fig_genre.show()  # Display the plot


In [0]:
# Read the CSV file as a Spark DataFrame
sales = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/mnt/assignment-data/raw-data/sales.csv")

# Create temporary view for querying
sales.createOrReplaceTempView("sales_view")

# Query to get the year of release, global sales, and platform of games
query_result_year_sales_platform = spark.sql("""
    SELECT CAST(Year AS INT) AS Year, Global_Sales, Platform
    FROM sales_view
    WHERE Year IS NOT NULL AND Global_Sales IS NOT NULL AND Platform IS NOT NULL
    ORDER BY Year ASC
""")

# Convert the result to a Pandas DataFrame
pandas_df_year_sales_platform = query_result_year_sales_platform.toPandas()

# Create the scatter plot using Plotly with color differentiation based on platform
fig_year_sales_platform = px.scatter(pandas_df_year_sales_platform, x='Year', y='Global_Sales', 
                                     color='Platform', hover_name='Platform',
                                     title='Relationship between Year of Release and Global Sales of Games',
                                     trendline='ols')  # Trendline 
fig_year_sales_platform.show()  # Display the plot

