#### Create Glue Context and Job

In [None]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 2

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window
  
# Bootstrap the Glue runtime: obtain (or reuse) the SparkContext, wrap it in a
# GlueContext, and derive the Job handle and Spark session from that context.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
job = Job(glueContext)
spark = glueContext.spark_session

#### Create Spark Dataframe

In [3]:
# Read the source table through the Glue Data Catalog as a DynamicFrame, then
# convert it to a Spark DataFrame so the pyspark.sql API can be used below.
dyf = glueContext.create_dynamic_frame.from_catalog(
    database="glue-etl-from-csv-to-parquet",
    table_name="ufo_reports_source_csv",
)
df_spark = dyf.toDF()

#### Rename columns

In [4]:
# Source column name -> snake_case name used by the rest of the pipeline.
# The renames are applied one at a time, in the order listed here.
column_renames = {
    "Shape Reported": "shape_reported",
    "Colors Reported": "color_reported",
    "State": "state",
    "Time": "time",
}

df_renamed = df_spark
for source_name, target_name in column_renames.items():
    df_renamed = df_renamed.withColumnRenamed(source_name, target_name)

#### Extract year and drop unnecessary columns

In [5]:
# Parse the raw "time" string with the "M/d/yyyy H:mm" pattern and keep only
# its year component.
report_timestamp = F.to_timestamp(F.col("time"), "M/d/yyyy H:mm")

# Once the year is extracted, the raw timestamp string and the city are not
# used by any later step, so both columns are dropped.
df_year_added = (
    df_renamed
    .withColumn("year", F.year(report_timestamp))
    .drop("time", "City")
)

#### Filter out records with a NULL shape_reported or color_reported value

In [6]:
# Keep a row only when BOTH the shape and the color are present; a row missing
# either value is discarded.
has_shape = F.col("shape_reported").isNotNull()
has_color = F.col("color_reported").isNotNull()
df_not_null = df_year_added.where(has_shape & has_color)

#### Group and aggregate occurrences of shape and color

In [7]:
# Both aggregations share the same (year, state) breakdown and differ only in
# the attribute being counted.
breakdown_keys = ["year", "state"]

# Number of reports per (year, state, shape).
shape_grouped = (
    df_not_null
    .groupBy(*breakdown_keys, "shape_reported")
    .agg(F.count("*").alias("shape_occurrence"))
)

# Number of reports per (year, state, color).
color_grouped = (
    df_not_null
    .groupBy(*breakdown_keys, "color_reported")
    .agg(F.count("*").alias("color_occurrence"))
)

#### Join shape and color dataframes

In [8]:
# Inner join on (year, state) only: within each (year, state) every shape row
# is paired with every color row, so the result holds one row per
# (year, state, shape, color) combination.
df_joined = shape_grouped.join(color_grouped, ["year", "state"], "inner")

#### Create window specifications

In [9]:
# Window specifications used to rank the joined (shape x color) rows inside
# each (year, state) partition.
#
# In the joined frame every row of a given shape carries the same
# shape_occurrence (one row per color), and every row of a given color carries
# the same color_occurrence (one row per shape). Ordering by the occurrence
# column alone therefore leaves large ties, and row_number() breaks ties
# arbitrarily: the row numbered 1 for shapes and the row numbered 1 for colors
# could be different rows, so a filter requiring both ranks == 1 could drop the
# whole (year, state) or return a run-dependent pair.
#
# The tie-breakers below are mirrored so that both windows select the same
# first row: the most frequent shape paired with the most frequent color, with
# remaining ties resolved alphabetically by shape and color name.
shape_window_spec = Window.partitionBy("year", "state").orderBy(
    F.col("shape_occurrence").desc(),
    F.col("color_occurrence").desc(),
    F.col("shape_reported").asc(),
    F.col("color_reported").asc(),
)
color_window_spec = Window.partitionBy("year", "state").orderBy(
    F.col("color_occurrence").desc(),
    F.col("shape_occurrence").desc(),
    F.col("color_reported").asc(),
    F.col("shape_reported").asc(),
)

#### Select top occurrences of shape and color per year and state breakdown

In [10]:
# Number the joined rows twice within each (year, state): once by the shape
# window and once by the color window.
ranked_df = (
    df_joined
    .withColumn("shape_rank", F.row_number().over(shape_window_spec))
    .withColumn("color_rank", F.row_number().over(color_window_spec))
)

# A row is kept only when it is first in BOTH rankings.
# NOTE(review): a (year, state) survives only if both windows place rank 1 on
# the same joined row, which depends on the ordering defined in the window
# specifications being deterministic.
is_top_shape = F.col("shape_rank") == 1
is_top_color = F.col("color_rank") == 1

output_columns = [
    "year",
    "state",
    "shape_reported",
    "shape_occurrence",
    "color_reported",
    "color_occurrence",
]

# Drop the helper rank columns and sort by the largest counts first.
final_df = (
    ranked_df
    .where(is_top_shape & is_top_color)
    .select(*output_columns)
    .orderBy(F.col("shape_occurrence").desc(), F.col("color_occurrence").desc())
)

#### Create Glue DynamicFrame

In [None]:
# Wrap the final Spark DataFrame in a Glue DynamicFrame (named "glue_etl") so
# it can be handed to the Glue sink.
MyDynamicFrame = DynamicFrame.fromDF(
    final_df,
    glueContext,
    "glue_etl",
)

#### Write the Data

In [None]:
# Configure an S3 sink that also updates the Glue Data Catalog on write.
# NOTE(review): the S3 path and the catalog database/table names below look
# like placeholders ("bucket_name/folder_name", "demo"/"populations") — confirm
# the real targets before running.
s3output = glueContext.getSink(
    connection_type="s3",
    path="s3://bucket_name/folder_name",
    enableUpdateCatalog=True,
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=[],
    compression="snappy",
    transformation_ctx="s3output",
)

# Catalog table that the sink creates/updates alongside the data files.
s3output.setCatalogInfo(
    catalogDatabase="demo",
    catalogTableName="populations",
)

# Write the frame using the "glueparquet" format.
s3output.setFormat("glueparquet")
s3output.writeFrame(MyDynamicFrame)

# Signal the end of the job run to Glue.
job.commit()