# AWS Movie Analytics Pipeline

An end-to-end demonstration of a movie analytics workflow built on AWS Glue, Amazon Athena, and Amazon S3.

## Steps Covered
1. **Data Ingestion**: Uploaded the raw movie dataset files (`credits.csv`, `keywords.csv`, etc.) to the `raw/` prefix of the S3 bucket `movieanalysis25`.
2. **Flattening**: Used AWS Glue PySpark scripts to parse the JSON columns (e.g., `cast`, `crew`, `keywords`) and flatten them into one row per array element.
3. **Storage**: Wrote the flattened data as CSV files under `s3://movieanalysis25/cleaned/` for downstream analytics.
4. **Athena Integration**: Created external tables in Athena that point at the cleaned CSV locations.
5. **Analysis**: Ran SQL queries in Athena against the flattened tables, such as ranking cast members by number of appearances.
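
To make the flattening step concrete, here is a minimal local sketch in plain Python of what "one row per array element" means for the `cast` column. It is not the Glue job itself: `flatten_cast` is a hypothetical helper, the sample row is made up, and the output column names are taken from the Glue script in this document.

```python
import json


def flatten_cast(movie_id, cast_json):
    """Return one flat dict per cast member found in a JSON array string."""
    try:
        members = json.loads(cast_json)
    except (TypeError, ValueError):
        # An unparseable or missing cell contributes no rows.
        return []
    if not isinstance(members, list):
        return []
    return [
        {
            "id": movie_id,
            "cast_id": m.get("cast_id"),
            "character": m.get("character"),
            "credit_id": m.get("credit_id"),
            "gender": m.get("gender"),
            "name": m.get("name"),
            "order_num": m.get("order"),  # renamed, as in the Glue script
        }
        for m in members
        if isinstance(m, dict)
    ]


# Made-up sample cell, not taken from the dataset.
sample = json.dumps([
    {"cast_id": 1, "character": "Hero", "credit_id": "c1",
     "gender": 2, "id": 100, "name": "A. Actor", "order": 0},
    {"cast_id": 2, "character": "Sidekick", "credit_id": "c2",
     "gender": 1, "id": 101, "name": "B. Actor", "order": 1},
])

rows = flatten_cast("42", sample)
for row in rows:
    print(row)
```

Each input row with N cast members becomes N output rows that repeat the movie `id`, which is the shape the Athena table later expects.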


```python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import pyspark.sql.functions as F

# Glue job parameters
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Load raw credits CSV from S3
df = glueContext.create_dynamic_frame.from_options(
    format_options={"quoteChar": '"', "withHeader": True, "separator": ","},
    connection_type="s3",
    format="csv",
    connection_options={"paths": ["s3://movieanalysis25/raw/credits.csv"], "recurse": True}
).toDF()

# Schema of the 'cast' JSON array
cast_schema = (
    "array<struct<cast_id:int,character:string,credit_id:string,"
    "gender:int,id:int,name:string,order:int>>"
)

# Flatten 'cast' JSON array
df = df.withColumn("cast_exploded", F.explode(F.from_json(F.col("cast"), cast_schema)))
df_flat = df.select(
    "id",
    F.col("cast_exploded.cast_id").alias("cast_id"),
    F.col("cast_exploded.character").alias("character"),
    F.col("cast_exploded.credit_id").alias("credit_id"),
    F.col("cast_exploded.gender").alias("gender"),
    F.col("cast_exploded.name").alias("name"),
    F.col("cast_exploded.order").alias("order_num")
)

# Write flattened CSV back to S3
df_flat.write.mode("overwrite").option("header", True).csv("s3://movieanalysis25/cleaned/credits/cast_csv/")

job.commit()
```
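
The schema string passed to `from_json` is long and easy to mistype, and the same pattern is needed again for the other JSON columns. One possible way to keep it manageable is to build the string from a list of `(name, type)` pairs. This is a sketch only: `struct_array_ddl` is a hypothetical helper, and the field list for `keywords` is an assumption that should be checked against the actual file.

```python
def struct_array_ddl(fields):
    """Build an 'array<struct<...>>' schema string from (name, type) pairs."""
    inner = ",".join(f"{name}:{dtype}" for name, dtype in fields)
    return f"array<struct<{inner}>>"


# Fields of each 'cast' element, as used in the Glue script.
CAST_FIELDS = [
    ("cast_id", "int"),
    ("character", "string"),
    ("credit_id", "string"),
    ("gender", "int"),
    ("id", "int"),
    ("name", "string"),
    ("order", "int"),
]
CAST_SCHEMA = struct_array_ddl(CAST_FIELDS)

# Assumed shape of each 'keywords' element; verify before relying on it.
KEYWORD_FIELDS = [("id", "int"), ("name", "string")]
KEYWORDS_SCHEMA = struct_array_ddl(KEYWORD_FIELDS)

print(CAST_SCHEMA)
print(KEYWORDS_SCHEMA)
```

Inside a Glue job the result would be used in place of the literal, for example `F.from_json(F.col("keywords"), KEYWORDS_SCHEMA)`.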

## Athena Table Creation

```sql
CREATE EXTERNAL TABLE movie_analysis.cast_data (
    id STRING,
    cast_id STRING,
    character STRING,
    credit_id STRING,
    gender INT,
    name STRING,
    order_num INT
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  "separatorChar" = ",",
  "quoteChar"     = "\""
)
LOCATION 's3://movieanalysis25/cleaned/credits/cast_csv/'
TBLPROPERTIES ('skip.header.line.count'='1');
```
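
Athena maps CSV fields to table columns by position, not by header name, and the header row is simply skipped. The column order in the DDL therefore has to match the order in which the Glue job writes them. The following sketch shows one way to check a downloaded sample of the output locally. `header_matches` is a hypothetical helper and the sample text is made up.

```python
import csv
import io

# Column order declared in the CREATE EXTERNAL TABLE statement.
EXPECTED_COLUMNS = [
    "id", "cast_id", "character", "credit_id", "gender", "name", "order_num",
]


def header_matches(csv_text, expected=EXPECTED_COLUMNS):
    """Return True if the first CSV record equals the expected column list."""
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader, None)  # None when the text is empty
    return header == list(expected)


# Made-up sample content, not taken from the dataset.
good = (
    "id,cast_id,character,credit_id,gender,name,order_num\n"
    "42,1,Hero,c1,2,A. Actor,0\n"
)
swapped = (
    "id,cast_id,credit_id,character,gender,name,order_num\n"
    "42,1,c1,Hero,2,A. Actor,0\n"
)

print(header_matches(good))
print(header_matches(swapped))
```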


## Example Athena Query
```sql
SELECT name, COUNT(*) AS appearances
FROM movie_analysis.cast_data
GROUP BY name
ORDER BY appearances DESC
LIMIT 10;
```
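
The same query can also be submitted from Python instead of the Athena console. The sketch below uses the boto3 Athena client and assumes that AWS credentials and a region are already configured. The results location `s3://movieanalysis25/athena-results/` is a hypothetical prefix that does not appear elsewhere in this document, and `build_query_request` and `run_query` are illustrative helper names.

```python
import time

TOP_ACTORS_SQL = """
SELECT name, COUNT(*) AS appearances
FROM movie_analysis.cast_data
GROUP BY name
ORDER BY appearances DESC
LIMIT 10
"""


def build_query_request(sql, database, output_location):
    """Assemble keyword arguments for Athena's start_query_execution call."""
    return {
        "QueryString": sql.strip(),
        "QueryExecutionContext": {"Database": database},
        "ResultConfiguration": {"OutputLocation": output_location},
    }


def run_query(request, poll_seconds=2.0):
    """Submit the query, wait until it finishes, and return the result rows."""
    import boto3  # imported lazily so the request builder needs no AWS setup

    athena = boto3.client("athena")
    query_id = athena.start_query_execution(**request)["QueryExecutionId"]
    while True:
        execution = athena.get_query_execution(QueryExecutionId=query_id)
        state = execution["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(poll_seconds)
    if state != "SUCCEEDED":
        raise RuntimeError(f"Athena query {query_id} ended in state {state}")
    # For SELECT queries the first returned row holds the column names.
    return athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]


request = build_query_request(
    TOP_ACTORS_SQL,
    database="movie_analysis",
    output_location="s3://movieanalysis25/athena-results/",  # hypothetical prefix
)
print(request["QueryString"])
```

Calling `run_query(request)` would then execute the query. It is left out of the example so that nothing runs against an AWS account by accident.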
