In [1]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 2

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Bootstrap the Spark/Glue runtime for this interactive session.
# getOrCreate() reuses the kernel's existing SparkContext if one is already running.
sc = SparkContext.getOrCreate()
# GlueContext wraps the SparkContext and provides the Glue readers/writers used later
# (create_dynamic_frame_from_options, getSink).
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# NOTE(review): job.init()/job.commit() are not called in the cells shown here, so job
# bookmarks are not in effect; add them if this notebook is promoted to a scheduled job.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.8 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 5.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 2
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 2
Idle Timeout: 2880
Session ID: 060ca9c4-a6fd-4ee0-98f8-d7fd0b7055d0
Applying the following default arguments:
--glue_kernel_version 1.0.8
--enable-glue-datacatalog true
Waiting for session 060ca9c4-a6fd-4ee0-98f8-d7fd0b7055d0 to get into ready status...
Session 060ca9c4-a6fd-4ee0-98f8-d7fd0b7055d0 ha

In [2]:
from awsglue.dynamicframe import DynamicFrame
import boto3 as boto
import json
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime




In [3]:
# Low-level S3 client; in the cells shown it is used to fetch the job's JSON config object.
client = boto.client(service_name='s3')




In [4]:
# Location of the JSON config that drives this job (input/output paths, processed file name).
# Named constants so the notebook can be pointed at another bucket/key in one place.
CONFIG_BUCKET = 'aws-glue-sandy-demo'
CONFIG_KEY = 'config.json'

config_object = client.get_object(Bucket=CONFIG_BUCKET, Key=CONFIG_KEY)
# Read the streaming body fully and decode it as text. 'utf-8-sig' is identical to
# 'utf-8' for normal files but also strips a leading BOM (common when the file was
# saved by some Windows editors), which json.loads would otherwise reject.
file_contents = config_object['Body'].read().decode('utf-8-sig')




In [5]:
# Parse the JSON config text into a Python object.
config = json.loads(file_contents)
# Later cells read settings with config['...'], so require a top-level JSON object;
# a list or scalar config would otherwise fail later with a confusing TypeError.
if not isinstance(config, dict):
    raise TypeError(f"config must be a JSON object, got {type(config).__name__}")




In [6]:
# Pull the settings the rest of the notebook depends on out of the config.
# All required keys are checked up front so a bad config reports every missing key
# at once instead of failing on the first one.
required_config_keys = ('input_path', 'output_path', 'processed_file_name')
missing_config_keys = [key for key in required_config_keys if key not in config]
if missing_config_keys:
    raise KeyError(f"config is missing required key(s): {missing_config_keys}")

input_path = config['input_path']  # S3 location of the source CSV
output_path = config['output_path']  # S3 path the Parquet output is written to
processed_file_name = config['processed_file_name']  # stamped onto every row as processed_filename




In [7]:
# Extract the source CSV (the S3 path comes from config's input_path) into a DynamicFrame.
# withHeader=True takes the column names from the first row of the file.
# NOTE(review): if any quoted field (e.g. description) contains embedded newlines,
# "multiLine": True is likely needed in the format options — confirm against the data.
csv_connection_options = {"paths": [input_path]}
csv_format_options = {"withHeader": True}
dyf = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options=csv_connection_options,
    format="csv",
    format_options=csv_format_options,
)




In [8]:
# Convert the DynamicFrame to a PySpark DataFrame so the pyspark.sql column functions
# used in the following cells (withColumn, filter, when, ...) can be applied.
df = dyf.toDF()



In [9]:
# Cast every column read from the CSV to its target Spark type.
# Casts are applied one withColumn per entry, in this order; replacing an existing
# column keeps its position, so df's column order is unchanged.
# NOTE(review): with ANSI mode off (Spark's default), values that cannot be parsed
# as the target type become null rather than raising — confirm that is acceptable.
column_types = {
    'index': IntegerType(),
    'id': StringType(),
    'title': StringType(),
    'type': StringType(),
    'description': StringType(),
    'release_year': IntegerType(),
    'age_certification': StringType(),
    'runtime': IntegerType(),
    'imdb_id': StringType(),
    'imdb_score': FloatType(),
    'imdb_votes': IntegerType(),
}
for column_name, target_type in column_types.items():
    df = df.withColumn(column_name, col(column_name).cast(target_type))




In [10]:
# Keep only rows of type MOVIE released in 2000 or later (the bound is inclusive).
# Rows with a null type or release_year evaluate to null and are dropped by filter().
is_recent_movie = (col('type') == 'MOVIE') & (col('release_year') >= 2000)
df = df.filter(is_recent_movie)




In [11]:
# Stamp every row with when this run processed the data and which file it came from.
# datetime.now() is evaluated once on the driver, so all rows share a single value.
# It is a naive local time stored as text — presumably UTC on Glue; confirm if it matters.
date_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
audit_columns = {
    'processed_timestamp': lit(date_time),
    'processed_filename': lit(processed_file_name),
}
for audit_name, audit_value in audit_columns.items():
    df = df.withColumn(audit_name, audit_value)




In [12]:
# Flag (True/False) whether each movie is children friendly, based on its age certification.
# 'R' (Restricted) is not children friendly while 'G' is, so the previous list of
# ('PG', 'R', '') is corrected to ('G', 'PG', '').
# NOTE(review): blank certifications ('') are still counted as friendly to match the
# previous behaviour — confirm whether unrated titles should really be included.
# A null certification makes isin() return null, which when() treats as no match -> False.
CHILDREN_FRIENDLY_CERTIFICATIONS = ['G', 'PG', '']
df = df.withColumn(
    'children_friendly',
    when(col('age_certification').isin(CHILDREN_FRIENDLY_CERTIFICATIONS), True).otherwise(False),
)




In [15]:
# Drop columns not published in the output table. type and age_certification were only
# needed for the MOVIE filter and the children_friendly flag computed earlier.
columns_to_drop = ('id', 'index', 'age_certification', 'type', 'description', 'imdb_id')
df = df.drop(*columns_to_drop)




In [16]:
# Append a surrogate row id. Spark guarantees these ids are unique and monotonically
# increasing, but NOT consecutive: they depend on partitioning, so they will have gaps
# and can differ between runs.
df = df.select('*', monotonically_increasing_id().alias('mono_id'))




In [17]:
# Fix the output column order before writing to S3; select() also drops any column not listed.
select_list = [
    'mono_id',
    'title',
    'release_year',
    'children_friendly',
    'runtime',
    'imdb_score',
    'imdb_votes',
    'processed_timestamp',
    'processed_filename',
]
df = df.select(*select_list)




In [19]:
# The Glue S3 sink used for the write takes a DynamicFrame, so wrap the final
# Spark DataFrame back into one.
dyf = DynamicFrame.fromDF(df, glueContext)




In [21]:
# Write the final DynamicFrame to S3 as Snappy-compressed Parquet, partitioned by
# release_year, and update the Glue Data Catalog table so the files can be queried
# from Athena.
# The catalog database/table can be overridden via config ('catalog_database',
# 'catalog_table'); the defaults are netflix_stats_db.netflix_movies.
# NOTE(review): this sink appears to add new files on each run rather than replacing
# output_path, so re-running the notebook likely duplicates rows — confirm, and purge
# output_path first if the job must be safely re-runnable.
catalog_database = config.get('catalog_database', 'netflix_stats_db')
catalog_table = config.get('catalog_table', 'netflix_movies')

s3output = glueContext.getSink(
  path=output_path,
  connection_type="s3",
  updateBehavior="UPDATE_IN_DATABASE",
  partitionKeys=['release_year'],
  compression="snappy",
  enableUpdateCatalog=True
)
s3output.setCatalogInfo(
  catalogDatabase=catalog_database, catalogTableName=catalog_table
)
s3output.setFormat("glueparquet")
# writeFrame returns a DynamicFrame; keep it in a variable instead of letting the
# notebook echo its object repr as cell output.
written_dyf = s3output.writeFrame(dyf)

<awsglue.dynamicframe.DynamicFrame object at 0x7fd33c709690>
