# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.4 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Session ID: 974bbedc-5bf8-4157-bd24-6da59471b02c
Applying the following default arguments:
--glue_kernel_version 1.0.4
--enable-glue-datacatalog true
Waiting for session 974bbedc-5bf8-4157-bd24-6da59471b02c to get into ready status...
Session 974bbedc-5bf8-4157-bd24-6da59471b02c has been created.



In [2]:
spark.sql("create database if not exists movie_db location 's3://z23747723-fau-cloud-computing-project/moviedb/'")

DataFrame[]


In [5]:
# %%sql
# drop database movie_db

In [3]:
input_path = 's3://z23747723-fau-cloud-computing-project/datasets/movies_metadata.csv'
df = spark.read.format("csv").option("header", "true").load(input_path)
# df.show(truncate=False)




In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
import ast


def extract_genre_names(genre_string):
    try:
        genres_list = ast.literal_eval(genre_string)
        genres_names = ', '.join([genre['name'] for genre in genres_list])
        return f"{genres_names}"
    except Exception as e:
        return None

extract_genre_names_udf = udf(extract_genre_names, StringType())

df_with_formatted_genres = df.withColumn("formatted_genres", extract_genre_names_udf(col("genres")))

df_with_formatted_genres.select("formatted_genres").show(truncate=False)





+--------------------------------+
|formatted_genres                |
+--------------------------------+
|Animation, Comedy, Family       |
|Adventure, Fantasy, Family      |
|Romance, Comedy                 |
|Comedy, Drama, Romance          |
|Comedy                          |
|Action, Crime, Drama, Thriller  |
|Comedy, Romance                 |
|Action, Adventure, Drama, Family|
|Action, Adventure, Thriller     |
|Adventure, Action, Thriller     |
|Comedy, Drama, Romance          |
|Comedy, Horror                  |
|Family, Animation, Adventure    |
|History, Drama                  |
|Action, Adventure               |
|Drama, Crime                    |
|Drama, Romance                  |
|Crime, Comedy                   |
|Crime, Comedy, Adventure        |
|Action, Comedy, Crime           |
+--------------------------------+
only showing top 20 rows


In [26]:
df_with_formatted_genres.show(truncate=False)

+-----+---------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+-----------------------------------------------------------------------------------------------------------------------------+--------------------------------------------+-----+---------+-----------------+------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+--------------------------------+-----------------------------------------------------------------------------------------------

In [5]:
df_with_formatted_genres.printSchema()

root
 |-- adult: string (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nu

In [13]:
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import DateType, IntegerType, FloatType, DecimalType
df_with_formatted_genres = df_with_formatted_genres \
    .withColumn("id", col("id").cast(IntegerType())) \
    .withColumn("release_date", to_date(col("release_date"), "yyyy-MM-dd")) \
    .withColumn("vote_count", col("vote_count").cast(IntegerType())) \
    .withColumn("Budget", col("Budget").cast(DecimalType(38, 2))) \
    .withColumn("Popularity", col("Popularity").cast(FloatType())) \
    .withColumn("Revenue", col("Revenue").cast(DecimalType(38, 2))) \
    .withColumn("Runtime", col("Runtime").cast(FloatType())) \
    .withColumn("Vote Average", col("Vote Average").cast(FloatType()))

# Show the updated DataFrame schema to confirm the changes
df_with_formatted_genres.printSchema()

root
 |-- adult: string (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- Budget: decimal(38,2) (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- Popularity: float (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- Revenue: decimal(38,2) (nullable = true)
 |-- Runtime: float (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count:

In [14]:
df_with_formatted_genres.write \
  .mode("overwrite") \
  .format("parquet") \
  .option("path", "s3://z23747723-fau-cloud-computing-project/moviedb/") \
  .saveAsTable("movie_db.movies_metadata")


