### Create the Glue context and job

In [None]:
# import packages 
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType

# Resolve the JOB_NAME argument that is passed to this Glue job via sys.argv.
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
# Build the Spark context, wrap it in a GlueContext, and expose the
# SparkSession it provides.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Create and initialise the Glue job handle under JOB_NAME; job.commit() is
# called at the very end of the script.
job = Job(glueContext)
job.init(args["JOB_NAME"], args)


### Create a Spark DataFrame from the Data Catalog table


In [None]:
# Load the source table "initial_dataset" of the "video-games-data" catalog
# database as a Glue DynamicFrame.
DataCatalogtable_node1 = glueContext.create_dynamic_frame.from_catalog(
    transformation_ctx="DataCatalogtable_node1",
    table_name="initial_dataset",
    database="video-games-data",
)

# Convert to a Spark DataFrame so the DataFrame API can be used from here on.
df = DataCatalogtable_node1.toDF()

### Drop duplicate rows and unnecessary columns

In [None]:
# Deduplicate on the identifying columns first, while Name and Publisher still
# exist. Then cast Year to an integer and drop the columns the report never
# uses. NOTE(review): with Spark's non-ANSI casting, unparseable Year values
# become null and are removed by the null filter that follows.
df = df.dropDuplicates(["Name", "Year", "Genre", "Platform", "Publisher"])
df = df.withColumn("Year", F.col("Year").cast(IntegerType())).drop(
    "Name", "Publisher", "Rank"
)

### Filter out records with a null year, genre or platform, or a year after 2015

In [None]:
# Keep only rows that have a Year, a Genre and a Platform, and whose Year is
# 2015 or earlier. The comparison is parenthesised because `&` binds tighter
# than `<=` in Python.
df_final = df.filter(
    F.col("Year").isNotNull()
    & F.col("Genre").isNotNull()
    & F.col("Platform").isNotNull()
    & (F.col("Year") <= 2015)
)

### Group the data by year, platform and genre 

In [None]:
# Aggregate per (Year, Platform, Genre): the number of games, plus each
# regional sales column summed and rounded to two decimals. The output columns
# appear in the same order as the tuples below.
df_group = (
    df_final
    .groupBy("Year", "Platform", "Genre")
    .agg(
        F.count("*").alias("Total_games"),
        *[
            F.round(F.sum(source_col), 2).alias(report_col)
            for source_col, report_col in (
                ("NA_Sales", "North_america_sales(millions)"),
                ("EU_Sales", "Europe_sales(millions)"),
                ("JP_Sales", "Japan_sales(millions)"),
                ("Other_Sales", "Rest_of_world_sales(millions)"),
                ("Global_Sales", "Worldwide_sales(millions)"),
            )
        ],
    )
    .orderBy("Year")
)

### Create the window specification

In [None]:
# One window per (Year, Platform) pair, ordered so that the genre with the
# highest rounded worldwide sales comes first.
sales_window = Window.partitionBy("Year", "Platform").orderBy(
    F.desc("Worldwide_sales(millions)")
)

### Select top genre per year and platform

In [None]:
# Keep the best-selling genre(s) for every (Year, Platform) pair.
# rank() gives tied rows the same rank, so genres that tie on the rounded
# worldwide total all survive as rank 1. The helper genre_rank column is
# dropped by the explicit select.
final_df = (
    df_group
    .withColumn("genre_rank", F.rank().over(sales_window))
    .where(F.col("genre_rank") == 1)
    .select(
        "Year",
        "Platform",
        "Genre",
        "Total_games",
        "North_america_sales(millions)",
        "Europe_sales(millions)",
        "Japan_sales(millions)",
        "Rest_of_world_sales(millions)",
        "Worldwide_sales(millions)",
    )
    .orderBy("Year")
)

### Create the Glue DynamicFrame

In [None]:
# Wrap the top-genre-per-(Year, Platform) report in a Glue DynamicFrame for
# the catalog-aware S3 sink. This must be final_df, not df_final: df_final
# holds the filtered rows before aggregation.
# NOTE(review): the report's column names contain parentheses, e.g.
# "Worldwide_sales(millions)". Confirm that the glueparquet writer and the
# downstream catalog consumers accept these names.
glue_dynamic_frame_final = DynamicFrame.fromDF(final_df, glueContext, "glue_etl_vg_sales")

### Write the data to S3 and update the Data Catalog

In [None]:
# Configure an S3 sink that writes Snappy-compressed output under the given
# prefix and, because enableUpdateCatalog=True, registers it in the Glue
# Data Catalog.
S3bucket_node2 = glueContext.getSink(
    path="s3://video-game-etl/parquet-format-data/",
    connection_type="s3",
    # NOTE(review): presumably allows updating an existing catalog table
    # definition in place; confirm against the AWS Glue docs.
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=[],  # no partition columns
    compression="snappy",
    enableUpdateCatalog=True,
    transformation_ctx="S3bucket_node2",
)
# Catalog database/table that the sink writes the table definition to.
S3bucket_node2.setCatalogInfo(
    catalogDatabase="video-games-data",
    catalogTableName="vg-sales-report-parquet-format",
)
# Use Glue's Parquet writer, then write the frame built in the previous step.
S3bucket_node2.setFormat("glueparquet")
S3bucket_node2.writeFrame(glue_dynamic_frame_final)
# Commit the job run after the write; this is the last statement of the job.
job.commit()