# AWS Glue Studio Notebook
This notebook reads JSON news data from an S3 bucket, flattens the nested schema, splits the records into author, source, and article tables, and writes those tables to Amazon Redshift.


#### Optional: Run this cell to see available notebook commands ("magics").


#### Environment Setup



In [6]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5
%connections "news-connect"



Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.2 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Connections to be included:
news-connect


### Libraries

In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from datetime import datetime
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.types import StructType, StructField, StringType, DateType,IntegerType, LongType
from pyspark.sql.functions import col, monotonically_increasing_id, row_number
from pyspark.sql.window import Window




Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Session ID: 5537d7f8-facc-41ea-8aff-b6c1adfb816a
Applying the following default arguments:
--glue_kernel_version 1.0.2
--enable-glue-datacatalog true
Waiting for session 5537d7f8-facc-41ea-8aff-b6c1adfb816a to get into ready status...
Session 5537d7f8-facc-41ea-8aff-b6c1adfb816a has been created.



#### Initializing context


In [2]:
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)





### Fetching the Current Date

In [3]:
current_date = datetime.now().strftime("%Y-%m-%d")
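
`datetime.now()` returns the session's local time, so a run close to midnight can compute a different date from the one used when the raw file was written. A minimal sketch, assuming the upstream job names its files by UTC date (an assumption this notebook does not confirm), using a hypothetical `resolve_run_date` helper that also accepts an explicit date for backfills:

```python
from datetime import datetime, timezone
from typing import Optional


def resolve_run_date(override: Optional[str] = None) -> str:
    """Return the run date as YYYY-MM-DD (hypothetical helper, not part of Glue)."""
    if override is not None:
        # Parse and re-format so malformed input raises ValueError early
        return datetime.strptime(override, "%Y-%m-%d").strftime("%Y-%m-%d")
    # Assumption: raw files are partitioned by UTC date
    return datetime.now(timezone.utc).strftime("%Y-%m-%d")


current_date = resolve_run_date()
```

Passing a date string such as `resolve_run_date("2023-11-15")` would let the same notebook reprocess an earlier day's file.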




#### Data Loading


In [4]:
input_path = f"s3://news-etl-09-08-23/raw-data/{current_date}/all_news_{current_date}.json"
# Spark infers the JSON schema by default, so only the multi-line option is needed
df = spark.read.option("multiLine", "true").json(input_path)
df.printSchema()


root
 |-- author: string (nullable = true)
 |-- content: string (nullable = true)
 |-- description: string (nullable = true)
 |-- publishedAt: string (nullable = true)
 |-- source: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)
 |-- urlToImage: string (nullable = true)


### Filtering

In [5]:
df_filtered = df.filter(df.author.isNotNull())




### Flattening Data

In [6]:
df_filtered = df_filtered.withColumn("source_name", col("source.name")) \
                         .withColumn("source_id", col("source.id")) \
                         .drop("source")
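
To make the effect of this step concrete, here is a plain-Python sketch of the same flattening applied to a single record. The sample values are invented for illustration; only the field names come from the schema printed earlier.

```python
def flatten_source(record: dict) -> dict:
    """Promote source.name and source.id to top-level keys and drop 'source'."""
    flat = {key: value for key, value in record.items() if key != "source"}
    source = record.get("source") or {}  # 'source' is nullable in the schema
    flat["source_name"] = source.get("name")
    flat["source_id"] = source.get("id")
    return flat


# Invented sample record (not taken from the real data set)
sample = {
    "author": "Jane Doe",
    "title": "Example headline",
    "source": {"id": None, "name": "Example News"},
}
print(flatten_source(sample))
```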




### Validating The Filter

In [7]:
# Count any remaining records with a null author
null_author_count = df_filtered.filter(df_filtered.author.isNull()).count()

# Print the count or raise an error if any null authors are found
if null_author_count > 0:
    raise ValueError(f"Null authors found in the data: {null_author_count}")
else:
    print("No null authors found in the data.")

No null authors found in the data.


### Creating distinct tables for authors and sources with unique IDs

In [8]:
author_table = df_filtered.select("author").distinct().withColumn("author_id", monotonically_increasing_id().cast(LongType()))
source_table = df_filtered.select("source_name").distinct().withColumn("source_id", monotonically_increasing_id().cast(LongType()))
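
The IDs produced by `monotonically_increasing_id()` are unique and increasing but not consecutive. According to the Spark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits, which is why values such as 8589934592 show up. A small plain-Python sketch (the helper name `decode_spark_id` is hypothetical) that splits such a value into its two parts:

```python
RECORD_BITS = 33  # lower 33 bits hold the record number within the partition


def decode_spark_id(value: int) -> tuple:
    """Split a monotonically_increasing_id value into (partition_id, record_number)."""
    partition_id = value >> RECORD_BITS
    record_number = value & ((1 << RECORD_BITS) - 1)
    return partition_id, record_number


for example in (8589934592, 8589934593, 103079215104):
    print(example, decode_spark_id(example))
```

Because the values depend on how the data is partitioned when the expression is evaluated, they may change if a DataFrame is recomputed. Persisting the lookup tables before joining and writing them is one way to keep the IDs stable.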





### Renaming the key columns to avoid ambiguity

In [9]:
author_table = author_table.withColumnRenamed("author", "unique_author")
source_table = source_table.withColumnRenamed("source_name", "unique_source_name")




### Joining Back

In [10]:
# Removing the original 'source_id' and 'author_id' if they exist in df_filtered
if 'source_id' in df_filtered.columns:
    df_filtered = df_filtered.drop('source_id')
if 'author_id' in df_filtered.columns:
    df_filtered = df_filtered.drop('author_id')

# Performing the joins with author_table and source_table
df_joined = df_filtered.join(author_table, df_filtered["author"] == author_table["unique_author"], "left_outer") \
                       .join(source_table, df_filtered["source_name"] == source_table["unique_source_name"], "left_outer") \
                       .drop("unique_author") \
                       .drop("unique_source_name")

# Creating the Articles Table with associated author_id and source_id from joined table
articles_table = df_joined.select(
    monotonically_increasing_id().alias("article_id"), 
    "title", 
    "description", 
    "url", 
    "urlToImage", 
    "publishedAt", 
    "content", 
    "author_id", 
    "source_id"
)

# Show the table for verification
articles_table.show(5)


+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+------------+
|article_id|               title|         description|                 url|          urlToImage|         publishedAt|             content|   author_id|   source_id|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+------------+
|         0|Jake Paul Says Hi...|American YouTuber...|https://bleacherr...|https://media.ble...|2023-11-15T22:51:55Z|Sam Hodde/Getty I...|  8589934592|103079215104|
|         1|How to watch Tyso...|The top heavyweig...|https://www.digit...|https://www.digit...|2023-10-28T16:00:55Z|ESPN
The lineal ...|  8589934593| 68719476736|
|         2|Canelo Alvarez vs...|Saul Alvarez&apos...|https://www.marca...|https://phantom-m...|2023-10-21T04:40:04Z|Saul Alvarez's la...| 51539607552| 85899345920|
|         3

### Writing To Redshift

In [None]:
# Define Redshift connection options
redshift_connection_options = {
    "url": "jdbc:redshift://default-workgroup.505802839350.us-west-1.redshift-serverless.amazonaws.com:5439/dev",
    "database": "dev", 
    "user": "admin",  
    "password": "admin",  
    "aws_iam_role": "arn:aws:iam::505802839350:role/service-role/AWSGlueServiceRole-news"
}

# Helper that converts a DataFrame to a DynamicFrame and writes it to the given Redshift table
def write_to_redshift(dataframe, table_name):
    connection_options = redshift_connection_options.copy()
    connection_options["dbtable"] = table_name
    dynamic_frame = DynamicFrame.fromDF(dataframe, glueContext, table_name)
    glueContext.write_dynamic_frame.from_jdbc_conf(
        frame=dynamic_frame,
        catalog_connection="news-connect",
        connection_options=connection_options,
        redshift_tmp_dir="s3://redshift-temp-buck/glue-temp/"  # Hardcoded TempDir
    )

# Write the author table to Redshift
write_to_redshift(author_table, "public.author")

# Write the source table to Redshift
write_to_redshift(source_table, "public.source")

# Write the article table to Redshift
write_to_redshift(articles_table, "public.article")
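
The connection options in this cell embed the database password in the notebook itself. One common alternative is to read credentials from the environment at run time. A minimal sketch, assuming hypothetical environment variable names (`REDSHIFT_URL`, `REDSHIFT_USER`, `REDSHIFT_PASSWORD`, `REDSHIFT_IAM_ROLE`) that are not defined by this notebook or by AWS Glue:

```python
import os
from typing import Mapping, Optional

# Hypothetical variable names chosen for this illustration
REQUIRED_VARS = ("REDSHIFT_URL", "REDSHIFT_USER", "REDSHIFT_PASSWORD", "REDSHIFT_IAM_ROLE")


def build_redshift_options(
    table_name: str,
    database: str = "dev",
    env: Optional[Mapping[str, str]] = None,
) -> dict:
    """Build the connection options dict from environment variables."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "url": env["REDSHIFT_URL"],
        "database": database,
        "user": env["REDSHIFT_USER"],
        "password": env["REDSHIFT_PASSWORD"],
        "aws_iam_role": env["REDSHIFT_IAM_ROLE"],
        "dbtable": table_name,
    }


# Demonstration with placeholder values instead of real credentials
fake_env = {
    "REDSHIFT_URL": "jdbc:redshift://example-host:5439/dev",
    "REDSHIFT_USER": "placeholder-user",
    "REDSHIFT_PASSWORD": "placeholder-password",
    "REDSHIFT_IAM_ROLE": "arn:aws:iam::000000000000:role/placeholder",
}
print(build_redshift_options("public.author", env=fake_env)["dbtable"])
```

Inside `write_to_redshift`, the returned dict could take the place of the copied `redshift_connection_options`. Since the write call also names a catalog connection, that connection may already carry the credentials, in which case the options could shrink further.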
