## Creating an AWS session using boto3 with an IAM user

In [43]:
import json
import time

import boto3

In [44]:
# The 'iam_user_for_projects' profile was configured on this machine through the AWS CLI,
# so no credentials have to be written in the notebook.
console = boto3.Session(profile_name='iam_user_for_projects')

## Creating S3 buckets to store the raw dataset

In [3]:
# S3 client for the ap-south-1 region, built from the profile session.
s3 = console.client('s3', region_name='ap-south-1')

In [9]:
# Create the bucket that will hold the raw CSV files.

bucket_name = "raw-csv-from-jupyter"
location_name = "ap-south-1"
location = dict(LocationConstraint=location_name)

try:
    bucket = s3.create_bucket(
        CreateBucketConfiguration=location,
        Bucket=bucket_name,
    )
except Exception as e:
    # A failure is only reported; the notebook keeps running.
    print(e)
else:
    print("Bucket has been created")

Bucket has been created


In [40]:
# Create the bucket that will hold the raw JSON files.

bucket_name = "raw-json-from-jupyter"
location_name = "ap-south-1"
location = dict(LocationConstraint=location_name)

try:
    bucket = s3.create_bucket(
        CreateBucketConfiguration=location,
        Bucket=bucket_name,
    )
except Exception as e:
    # A failure is only reported; the notebook keeps running.
    print(e)
else:
    print("Bucket has been created")

Bucket has been created


In [196]:
# Print the name of every bucket returned for this account.
bucket_listing = s3.list_buckets()
for entry in bucket_listing.get('Buckets'):
    print(entry['Name'])

raw-csv-from-jupyter
raw-json-from-jupyter


In [30]:
# Upload the raw CSV files (one per region) to the raw CSV bucket.
# The local path must not be wrapped in parentheses inside the string literal:
# a Filename such as '(C:/.../CAvideos.csv)' names a file that does not exist.

csv_dir = r'C:\Users\BALA\Youtube ETL Project\csv_files'
region_codes = ['CA', 'DE', 'FR', 'GB', 'IN', 'JP', 'KR', 'MX', 'RU', 'US']

for region_code in region_codes:
    csv_name = f'{region_code}videos.csv'
    # The object key is the bare file name, so the files land at the bucket root.
    s3.upload_file(Filename=f'{csv_dir}\\{csv_name}', Bucket='raw-csv-from-jupyter', Key=csv_name)
print('Successfully Uploaded')

Successfully Uploaded


In [33]:
# Send every region's category-id JSON file to the raw JSON bucket.

json_folder = r'C:\Users\BALA\Youtube ETL Project\json_files'

for region in ('CA', 'DE', 'FR', 'GB', 'IN', 'JP', 'KR', 'MX', 'RU', 'US'):
    object_key = region + '_category_id.json'
    s3.upload_file(Filename=json_folder + '\\' + object_key, Bucket='raw-json-from-jupyter', Key=object_key)
print('Successfully uploaded')

Successfully Uploaded


In [200]:
# List the keys in the raw JSON bucket.
# The response of an empty bucket has no 'Contents' key, so default to an empty list.
for i in s3.list_objects(Bucket='raw-json-from-jupyter').get('Contents', []):
    print(i['Key'])

CA_category_id.json
DE_category_id.json
FR_category_id.json
GB_category_id.json
IN_category_id.json
JP_category_id.json
KR_category_id.json
MX_category_id.json
RU_category_id.json
US_category_id.json


In [201]:
# List the keys in the raw CSV bucket.
# The response of an empty bucket has no 'Contents' key, so default to an empty list.
for i in s3.list_objects(Bucket='raw-csv-from-jupyter').get('Contents', []):
    print(i['Key'])

CAvideos.csv
DEvideos.csv
FRvideos.csv
GBvideos.csv
INvideos.csv
JPvideos.csv
KRvideos.csv
MXvideos.csv
RUvideos.csv
USvideos.csv


# Creating glue crawler to investigate the raw data

## creating a role for glue and attaching required permission policies

In [13]:
# IAM client built from the profile session.
iam = console.client('iam', region_name='ap-south-1')

In [67]:
# Trust policy (as a JSON string) that lets the Glue service assume the role.

import json

glue_trust_statement = {
    "Effect": "Allow",
    "Principal": {'Service': 'glue.amazonaws.com'},
    "Action": "sts:AssumeRole",
}
document = json.dumps({"Version": "2012-10-17", "Statement": [glue_trust_statement]})

In [68]:
# Create the Glue role with the trust policy document built in the previous cell.

iam.create_role(
    AssumeRolePolicyDocument=document,
    RoleName='role-for-glue',
)

{'Role': {'Path': '/',
  'RoleName': 'role-for-glue',
  'RoleId': 'AROA5AKEZQQNRMA7ZBCQL',
  'Arn': 'arn:aws:iam::894034347035:role/role-for-glue',
  'CreateDate': datetime.datetime(2022, 12, 20, 8, 51, 14, tzinfo=tzutc()),
  'AssumeRolePolicyDocument': {'Version': '2012-10-17',
   'Statement': [{'Effect': 'Allow',
     'Principal': {'Service': 'glue.amazonaws.com'},
     'Action': 'sts:AssumeRole'}]}},
 'ResponseMetadata': {'RequestId': 'cd16675f-5ea0-4686-b635-4515e236c732',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'cd16675f-5ea0-4686-b635-4515e236c732',
   'content-type': 'text/xml',
   'content-length': '778',
   'date': 'Tue, 20 Dec 2022 08:51:14 GMT'},
  'RetryAttempts': 0}}

In [71]:
# Attach the managed policies the Glue role needs: AmazonS3FullAccess and AWSGlueServiceRole.

s3_full_access_arn = 'arn:aws:iam::aws:policy/AmazonS3FullAccess'
glue_service_role_arn = 'arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole'

iam.attach_role_policy(PolicyArn=s3_full_access_arn, RoleName='role-for-glue')
iam.attach_role_policy(PolicyArn=glue_service_role_arn, RoleName='role-for-glue')

{'ResponseMetadata': {'RequestId': '29805cd8-82dc-4c17-8b45-fef668a17f98',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '29805cd8-82dc-4c17-8b45-fef668a17f98',
   'content-type': 'text/xml',
   'content-length': '212',
   'date': 'Tue, 20 Dec 2022 09:07:05 GMT'},
  'RetryAttempts': 0}}

## Creating Glue crawler

In [4]:
# Glue client built from the profile session.
glue = console.client('glue', region_name='ap-south-1')

In [80]:
# Create a Glue crawler that builds a catalog table from the raw JSON files.
# The JSON objects are uploaded with keys that have no 'json_files/' prefix, so the
# crawler has to target the bucket root; a 'json_files/' path would match nothing.

try:
    glue.create_crawler(Name='crawler1',
                        Role='role-for-glue',
                        DatabaseName='crawler_db',
                        Targets={'S3Targets':[{'Path':'s3://raw-json-from-jupyter'}]})
    print('Succesfully Created')

except Exception as e:
    # A failure (e.g. the crawler already exists) is only reported.
    print(e)

Succesfully Created


In [82]:
# Start crawler1 and wait for the run to end.
# start_crawler returns immediately, and the following cells read the table the
# crawler creates, so running them straight away would find an empty database.

try:
    glue.start_crawler(Name='crawler1')
    # The crawler reports the READY state again once the run has finished.
    while glue.get_crawler(Name='crawler1')['Crawler']['State'] != 'READY':
        time.sleep(15)
except Exception as e:
    print(e)

In [117]:
# Getting the schema of the table populated on the raw json files.
# The table list is fetched here instead of being read from `glue_tables`, because that
# variable is only assigned in a later cell and is undefined on a fresh top-to-bottom run.

glue.get_tables(DatabaseName='crawler_db')['TableList'][0]['StorageDescriptor']['Columns']

[{'Name': 'kind', 'Type': 'string'},
 {'Name': 'etag', 'Type': 'string'},
 {'Name': 'items',
  'Type': 'array<struct<kind:string,etag:string,id:string,snippet:struct<channelId:string,title:string,assignable:boolean>>>'}]

In [98]:

# Print name, comment (empty when absent) and type of every column of every table
# in the 'crawler_db' database.
glue_tables = glue.get_tables(DatabaseName='crawler_db')

for table in glue_tables['TableList']:
    for column in table['StorageDescriptor']['Columns']:
        print(column['Name'], column.get('Comment', ''), column['Type'])

kind  string
etag  string
items  array<struct<kind:string,etag:string,id:string,snippet:struct<channelId:string,title:string,assignable:boolean>>>


In [31]:
# Create the crawler that populates a catalog table from the raw CSV files.

csv_crawler_targets = {'S3Targets': [{'Path': 's3://raw-csv-from-jupyter'}]}

try:
    glue.create_crawler(
        Name='crawler2',
        DatabaseName='crawler_db_for_csv',
        Role='role-for-glue',
        Targets=csv_crawler_targets,
    )
except Exception as e:
    print(e)
else:
    print('Succesfully Created')

Succesfully Created


In [32]:
# Start crawler2 and wait for the run to end.
# start_crawler returns immediately, and the next cell reads the table the crawler
# creates, so running it straight away would find an empty database.

try:
    glue.start_crawler(Name='crawler2')
    # The crawler reports the READY state again once the run has finished.
    while glue.get_crawler(Name='crawler2')['Crawler']['State'] != 'READY':
        time.sleep(15)
except Exception as e:
    print(e)

In [35]:
# Show the column schema of the first table in the raw CSV catalog database.

csv_tables = glue.get_tables(DatabaseName='crawler_db_for_csv')
csv_tables['TableList'][0]['StorageDescriptor']['Columns']

[{'Name': 'video_id', 'Type': 'string'},
 {'Name': 'trending_date', 'Type': 'string'},
 {'Name': 'title', 'Type': 'string'},
 {'Name': 'channel_title', 'Type': 'string'},
 {'Name': 'category_id', 'Type': 'bigint'},
 {'Name': 'publish_time', 'Type': 'string'},
 {'Name': 'tags', 'Type': 'string'},
 {'Name': 'views', 'Type': 'bigint'},
 {'Name': 'likes', 'Type': 'bigint'},
 {'Name': 'dislikes', 'Type': 'bigint'},
 {'Name': 'comment_count', 'Type': 'bigint'},
 {'Name': 'thumbnail_link', 'Type': 'string'},
 {'Name': 'comments_disabled', 'Type': 'boolean'},
 {'Name': 'ratings_disabled', 'Type': 'boolean'},
 {'Name': 'video_error_or_removed', 'Type': 'boolean'},
 {'Name': 'description', 'Type': 'string'}]

# What's about the raw data?

## What I learned about the raw JSON and CSV files: the JSON files are nested, and a nested JSON cannot be read in a columnar format, so it has to be flattened first. The CSV files need no special treatment.

# What need to be done with the raw data to get the final analytical data?

## The plan is to merge the CSV and JSON files on their id columns. The JSON files are the problem: they have to be flattened, and the data type of their id column has to be changed from string to integer in order to merge it with the category_id column of the CSV files, which is an integer. I'm going to use a Lambda function to process the JSON files and Glue to merge both sets of files into the final data for analytics.

# Creating Lambda function for processing JSON files.

In [14]:
# Lambda client built from the profile session.
lamda = console.client('lambda', region_name='ap-south-1')

In [145]:
# Trust policy (as a JSON string) that lets the Lambda service assume the role.

lambda_trust_statement = {
    "Effect": "Allow",
    "Principal": {'Service': 'lambda.amazonaws.com'},
    "Action": "sts:AssumeRole",
}
document = json.dumps({"Version": "2012-10-17", "Statement": [lambda_trust_statement]})

In [146]:
# Create the Lambda role with the trust policy document built in the previous cell.

iam.create_role(
    AssumeRolePolicyDocument=document,
    RoleName='role-for-lambda',
)

{'Role': {'Path': '/',
  'RoleName': 'role-for-lambda',
  'RoleId': 'AROA5AKEZQQN356KPWKNF',
  'Arn': 'arn:aws:iam::894034347035:role/role-for-lambda',
  'CreateDate': datetime.datetime(2022, 12, 21, 6, 59, 25, tzinfo=tzutc()),
  'AssumeRolePolicyDocument': {'Version': '2012-10-17',
   'Statement': [{'Effect': 'Allow',
     'Principal': {'Service': 'lambda.amazonaws.com'},
     'Action': 'sts:AssumeRole'}]}},
 'ResponseMetadata': {'RequestId': '211f8c81-8df2-412c-be1c-0def8a3bbfb5',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '211f8c81-8df2-412c-be1c-0def8a3bbfb5',
   'content-type': 'text/xml',
   'content-length': '784',
   'date': 'Wed, 21 Dec 2022 06:59:25 GMT'},
  'RetryAttempts': 0}}

In [148]:
# Attach the managed policies the Lambda role needs: AmazonS3FullAccess and AWSLambdaBasicExecutionRole.

s3_full_access_arn = 'arn:aws:iam::aws:policy/AmazonS3FullAccess'
lambda_basic_execution_arn = 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'

iam.attach_role_policy(PolicyArn=s3_full_access_arn, RoleName='role-for-lambda')
iam.attach_role_policy(PolicyArn=lambda_basic_execution_arn, RoleName='role-for-lambda')

{'ResponseMetadata': {'RequestId': 'a7f68a2c-b61e-442e-8e27-5f59dbd1038d',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'a7f68a2c-b61e-442e-8e27-5f59dbd1038d',
   'content-type': 'text/xml',
   'content-length': '212',
   'date': 'Wed, 21 Dec 2022 07:10:45 GMT'},
  'RetryAttempts': 0}}

In [11]:
# Create the bucket that stores the Lambda deployment package.

bucket_name = "bucket-for-lambda-function"
location_name = "ap-south-1"
location = dict(LocationConstraint=location_name)

try:
    bucket = s3.create_bucket(
        CreateBucketConfiguration=location,
        Bucket=bucket_name,
    )
except Exception as e:
    # A failure is only reported; the notebook keeps running.
    print(e)
else:
    print("Bucket has been created")

Bucket has been created


In [6]:
# Create the bucket that receives the cleaned JSON data written by the Lambda function.

bucket_name = "cleaned-json-as-csv"
location_name = "ap-south-1"
location = dict(LocationConstraint=location_name)

try:
    bucket = s3.create_bucket(
        CreateBucketConfiguration=location,
        Bucket=bucket_name,
    )
except Exception as e:
    # A failure is only reported; the notebook keeps running.
    print(e)
else:
    print("Bucket has been created")

Bucket has been created


## Lambda Function Script 
 What i am doing here in the lambda script is, first getting the name and key of the files from the event.
 
 Using the name and key getting the object from the bucket.
 
 Using the pandas read_json method, I read the JSON file from the object.
 
 Then I normalised the nested JSON and changed the data type of the id column so that it can be merged with the CSV file, because in the CSV file the category id is an integer.
 
 Then I wrote the dataframe as a CSV file to the cleaned JSON bucket, using an in-memory buffer and the put object method.
 
 And I put the csv file inside a folder inorder to create partitioned table.

import pandas as pd

import boto3

from io import StringIO

from urllib.parse import unquote_plus

s3 = boto3.resource(service_name='s3',region_name='ap-south-1')

CLEANED_BUCKET = 'cleaned-json-as-csv'     # destination bucket for the flattened files
RAW_SUFFIX = '_category_id.json'           # raw keys look like '<REGION>_category_id.json'


def lambda_handler(event, context):
    """Flatten each uploaded category JSON file and store it as CSV.

    For every record in the S3 event, the object is read, its 'items' array is
    flattened with pandas.json_normalize, the 'id' column is cast to int64 and the
    result is written to the cleaned bucket under '<REGION>/<REGION>_category_id'.

    Parameters:
        event: S3 notification event; must contain a 'Records' list.
        context: Lambda context object (unused).

    Returns:
        List of the object keys written to the cleaned bucket.

    Raises:
        KeyError: if the event does not have the S3 notification structure.
    """
    written_keys = []

    for record in event['Records']:          # a single event can carry several records
        bucketname = record['s3']['bucket']['name']
        # Object keys are URL-encoded in S3 event notifications.
        filename = unquote_plus(record['s3']['object']['key'])

        if not filename.endswith(RAW_SUFFIX):
            # The region folder is derived from the file name, so a key with another
            # shape cannot be placed; report and skip it.
            print('Skipping unexpected key:', filename)
            continue

        obj = s3.Object(bucket_name=bucketname,key=filename).get()

        df = pd.read_json(obj['Body'])             # reading the json file from the object
        df = pd.json_normalize(df['items'])        # flattening the nested json
        df['id'] = df['id'].astype('int64')        # integer id, to match category_id of the csv files

        csv_buffer = StringIO()
        df.to_csv(csv_buffer,index=False)          # csv text without the index column

        # '<REGION>/<REGION>_category_id': one folder per region, to get a partitioned table.
        region = filename[:-len(RAW_SUFFIX)]
        f_name = region+'/'+filename[:-len('.json')]

        s3.Object(bucket_name=CLEANED_BUCKET,key=f_name).put(Body=csv_buffer.getvalue())
        written_keys.append(f_name)

    return written_keys


In [9]:
# Read the Lambda deployment zip into an in-memory bytes buffer (`buf`) for the upload below.

from io import BytesIO

zip_path = r'C:\Users\BALA\aws_deployment_package\lambda_function.zip'

with open(zip_path, mode="rb") as zip_file:
    zip_bytes = zip_file.read()

buf = BytesIO(zip_bytes)

In [13]:
# Putting the script in the s3 bucket.
# The resource is created from the `console` session so that it uses the same named
# profile as the clients in this notebook instead of the default credentials.

s3_resource = console.resource(service_name='s3',region_name='ap-south-1')
s3_resource.Object(bucket_name='bucket-for-lambda-function',key='lambda_zipfile').put(Body=buf.getvalue())

{'ResponseMetadata': {'RequestId': 'KFNCE0FHBG50NDD2',
  'HostId': 'NvRFxZNstsZ6Y5drbO546cHbQrvzKYCFCYs8MGHkqxGn1W4Fh0UQvv135iP24arAD7+Cv1lhkj0=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'NvRFxZNstsZ6Y5drbO546cHbQrvzKYCFCYs8MGHkqxGn1W4Fh0UQvv135iP24arAD7+Cv1lhkj0=',
   'x-amz-request-id': 'KFNCE0FHBG50NDD2',
   'date': 'Sun, 25 Dec 2022 12:51:25 GMT',
   'etag': '"656228c8c123ed9565b8ab1e539b8631"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"656228c8c123ed9565b8ab1e539b8631"'}

In [17]:
# Register the Lambda function from the zip stored in S3.

function_code = {'S3Bucket': 'bucket-for-lambda-function', 'S3Key': 'lambda_zipfile'}

lamda.create_function(
    FunctionName='lambda-function-for-json-processing',
    Runtime='python3.9',
    Handler='lambda_function.lambda_handler',
    Role='arn:aws:iam::894034347035:role/role-for-lambda',
    Code=function_code,
)

{'ResponseMetadata': {'RequestId': 'e9168ae3-7d19-4d03-b962-f883439558aa',
  'HTTPStatusCode': 201,
  'HTTPHeaders': {'date': 'Sun, 25 Dec 2022 13:01:58 GMT',
   'content-type': 'application/json',
   'content-length': '1027',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'e9168ae3-7d19-4d03-b962-f883439558aa'},
  'RetryAttempts': 0},
 'FunctionName': 'lambda-function-for-json-processing',
 'FunctionArn': 'arn:aws:lambda:ap-south-1:894034347035:function:lambda-function-for-json-processing',
 'Runtime': 'python3.9',
 'Role': 'arn:aws:iam::894034347035:role/role-for-lambda',
 'Handler': 'lambda_function.lambda_handler',
 'CodeSize': 519,
 'Description': '',
 'Timeout': 3,
 'MemorySize': 128,
 'LastModified': '2022-12-25T13:01:58.861+0000',
 'CodeSha256': 'LYIH/kIs2MW67kREYPvrSIyaewBRzC2XwzASc1XVE3M=',
 'Version': '$LATEST',
 'TracingConfig': {'Mode': 'PassThrough'},
 'RevisionId': '748632df-7117-4424-9ab5-4ecc9921b089',
 'State': 'Pending',
 'StateReason': 'The function is being 

In [20]:
# Invoking the function with a sample S3 event.
# The handler reads event['Records'], so an invocation without a payload always fails
# inside the function. Such a failure is reported through 'FunctionError' in the
# response rather than raised as an exception, so it has to be checked explicitly.

sample_event = {'Records': [{'s3': {'bucket': {'name': 'raw-json-from-jupyter'},
                                    'object': {'key': 'CA_category_id.json'}}}]}

try:
    response = lamda.invoke(FunctionName='lambda-function-for-json-processing',
                            Payload=json.dumps(sample_event).encode('utf-8'))
    if 'FunctionError' in response:
        # The payload of a failed invocation carries the error details.
        print('Lambda Function failed:', response['Payload'].read().decode('utf-8'))
    else:
        print('Lambda Function Successfully Invoked')
except Exception as e:
    print(e)

Lambda Function Successfully Invoked


In [99]:
# Upload the JSON files to the raw bucket again; the upload is meant to trigger the
# Lambda function (assumes an S3 trigger is configured - it is not set up in this notebook).

json_folder = r'C:\Users\BALA\Youtube ETL Project\json_files'

for region in ('CA', 'DE', 'FR', 'GB', 'IN', 'JP', 'KR', 'MX', 'RU', 'US'):
    object_key = region + '_category_id.json'
    s3.upload_file(Filename=json_folder + '\\' + object_key, Bucket='raw-json-from-jupyter', Key=object_key)
print('Successfully uploaded')

Successfully uploaded


In [100]:
# List the JSON files now present in the raw bucket.
# The response of an empty bucket has no 'Contents' key, so default to an empty list.

for i in s3.list_objects(Bucket='raw-json-from-jupyter').get('Contents', []):
    print(i['Key'])

CA_category_id.json
DE_category_id.json
FR_category_id.json
GB_category_id.json
IN_category_id.json
JP_category_id.json
KR_category_id.json
MX_category_id.json
RU_category_id.json
US_category_id.json


In [25]:
# List the cleaned files (CSV format) written to the cleaned JSON bucket.
# Until the Lambda function has written something the bucket is empty and the response
# has no 'Contents' key, so default to an empty list instead of raising KeyError.

for i in s3.list_objects(Bucket='cleaned-json-as-csv').get('Contents', []):
    print(i['Key'])

CA/CA_category_id
DE/DE_category_id
FR/FR_category_id
GB/GB_category_id
IN/IN_category_id
JP/JP_category_id
KR/KR_category_id
MX/MX_category_id
RU/RU_category_id
US/US_category_id


## Investigating the cleaned json files through the Glue crawler. 

In [102]:
# Create the crawler that catalogs the cleaned JSON data.
cleaned_crawler_targets = {'S3Targets': [{'Path': 's3://cleaned-json-as-csv'}]}

try:
    glue.create_crawler(
        Name='crawler3',
        DatabaseName='crawler_db_for_cleanedjson',
        Role='role-for-glue',
        Targets=cleaned_crawler_targets,
    )
except Exception as e:
    print(e)
else:
    print('Succesfully Created')

Succesfully Created


In [103]:
# Start crawler3 and wait for the run to end.
# start_crawler returns immediately, and the following cells read the table the
# crawler creates, so running them straight away would find an empty database.
try:
    glue.start_crawler(Name='crawler3')
    # The crawler reports the READY state again once the run has finished.
    while glue.get_crawler(Name='crawler3')['Crawler']['State'] != 'READY':
        time.sleep(15)
except Exception as e:
    print(e)

## You can see the flattened json in the csv format in the below cell. Now we can access all the columns. And also you can see the data type of id column is bigint and also category id column of csv file in bigint datatype.

In [105]:
# Column schema of the first table in the cleaned JSON catalog database.
cleaned_tables = glue.get_tables(DatabaseName='crawler_db_for_cleanedjson')
cleaned_tables['TableList'][0]['StorageDescriptor']['Columns']

[{'Name': 'kind', 'Type': 'string'},
 {'Name': 'etag', 'Type': 'string'},
 {'Name': 'id', 'Type': 'bigint'},
 {'Name': 'snippet.channelid', 'Type': 'string'},
 {'Name': 'snippet.title', 'Type': 'string'},
 {'Name': 'snippet.assignable', 'Type': 'boolean'}]

In [106]:
# Column schema of the first table in the raw CSV catalog database, for comparison.
csv_tables = glue.get_tables(DatabaseName='crawler_db_for_csv')
csv_tables['TableList'][0]['StorageDescriptor']['Columns']

[{'Name': 'video_id', 'Type': 'string'},
 {'Name': 'trending_date', 'Type': 'string'},
 {'Name': 'title', 'Type': 'string'},
 {'Name': 'channel_title', 'Type': 'string'},
 {'Name': 'category_id', 'Type': 'bigint'},
 {'Name': 'publish_time', 'Type': 'string'},
 {'Name': 'tags', 'Type': 'string'},
 {'Name': 'views', 'Type': 'bigint'},
 {'Name': 'likes', 'Type': 'bigint'},
 {'Name': 'dislikes', 'Type': 'bigint'},
 {'Name': 'comment_count', 'Type': 'bigint'},
 {'Name': 'thumbnail_link', 'Type': 'string'},
 {'Name': 'comments_disabled', 'Type': 'boolean'},
 {'Name': 'ratings_disabled', 'Type': 'boolean'},
 {'Name': 'video_error_or_removed', 'Type': 'boolean'},
 {'Name': 'description', 'Type': 'string'}]

# Glue job for creating the final data for analytics

In [30]:
# Create the bucket that stores the Glue job script.

bucket_name = "bucket-for-glue-script"
location_name = "ap-south-1"
location = dict(LocationConstraint=location_name)

try:
    bucket = s3.create_bucket(
        CreateBucketConfiguration=location,
        Bucket=bucket_name,
    )
except Exception as e:
    # A failure is only reported; the notebook keeps running.
    print(e)
else:
    print("Bucket has been created")

Bucket has been created


In [33]:
# Create the bucket that receives the final data produced by the Glue job.

bucket_name = "analytical-data-bucket"
location_name = "ap-south-1"
location = dict(LocationConstraint=location_name)

try:
    bucket = s3.create_bucket(
        CreateBucketConfiguration=location,
        Bucket=bucket_name,
    )
except Exception as e:
    # A failure is only reported; the notebook keeps running.
    print(e)
else:
    print("Bucket has been created")

Bucket has been created


## Glue script
## The idea: we have Glue catalogs for both the CSV files and the cleaned JSON files. Here I create dynamic frames from the catalogs and merge them with the Join transform on the id and category_id columns of the two frames. Then I write the joined frame in CSV format to the final S3 bucket, partitioned by region, so that analysis can easily be done within the region partitions.

 import sys

 from awsglue.transforms import *

 from awsglue.utils import getResolvedOptions

 from pyspark.context import SparkContext

 from awsglue.context import GlueContext

 from awsglue.job import Job
 

 # Read the JOB_NAME argument passed to the script on the command line.
 args = getResolvedOptions(sys.argv, ['JOB_NAME'])

 # Build the Spark context and wrap it in a Glue context.
 sc = SparkContext()

 glueContext = GlueContext(sc)

 # Spark session taken from the Glue context.
 spark = glueContext.spark_session

 # Create the job object and initialise it with the job name and the resolved arguments.
 job = Job(glueContext)

 job.init(args['JOB_NAME'], args)
 
#creating dynamic frame from the json file catelog. 

 cleaned_json_frame = glueContext.create_dynamic_frame.from_catalog ( database = "crawler_db_for_cleanedjson",

                                                                table_name="cleaned_json_as_csv",
                                                                
                                                                transformation_ctx="cleaned_json_frame")

#creating dynamic frame from the csv file catelog. 

 raw_csv_frame = glueContext.create_dynamic_frame.from_catalog ( database = "crawler_db_for_csv",

                                                                table_name="raw_csv_from_jupyter",
                                                                
                                                                transformation_ctx="raw_csv_frame")
#Joining both the dynamic frames based on the id columns.

 joined_frame = join.apply(frame1 = cleaned_json_frame,

                        frame2 = raw_csv_frame,
                        
                        keys1 = ['category_id'],
                        
                        keys2 = ['id']
                        
                        transformation_ctx="joined_frame")
                        
#writing the joined frame into the s3 bucket with the partitions.

 S3_target = glueContext.getSink(path="s3://analytical-data-bucket",

                                connection_type="s3",
                                
                                updateBehavior="LOG",
                                
                                partitionKeys=["partition_0"],
                                
                                enableUpdateCatalog=True,
                                
                                transformation_ctx="S3_target")

 S3_target.setFormat("csv")

 S3_target.writeFrame(joined_frame)


# Commit the job that was initialised with job.init above.
job.commit()

In [91]:
# Upload the Glue job script to the script bucket under the 'Scripts/' prefix.

glue_script_path = r'C:\Users\BALA\aws_deployment_package\spark_script_for_glue.py'

try:
    s3.upload_file(
        Filename=glue_script_path,
        Key='Scripts/spark_script_for_glue.py',
        Bucket='bucket-for-glue-script',
    )
except Exception as e:
    print(e)
else:
    print('Uploaded Successfully')

Uploaded Successfully


In [92]:
# List the keys in the Glue script bucket.
# The response of an empty bucket has no 'Contents' key, so default to an empty list.
for i in s3.list_objects(Bucket='bucket-for-glue-script').get('Contents', []):
    print(i['Key'])

Scripts/spark_script_for_glue.py


In [86]:
# Creating the glue job.
# - The entries of the Command dict must be separated by commas; without the comma after
#   'ScriptLocation' the cell is a syntax error.
# - The script was uploaded as 'Scripts/spark_script_for_glue.py', so the location has to
#   include the '.py' extension.

try:
    glue.create_job(Name='glue-job-for-join',
                    Role='arn:aws:iam::894034347035:role/role-for-glue',
                    Command={'Name':'glueetl',
                             'ScriptLocation':'s3://bucket-for-glue-script/Scripts/spark_script_for_glue.py',
                             'PythonVersion': '3'},
                    MaxRetries=0,
                    Timeout=2880,
                    GlueVersion='3.0',
                    NumberOfWorkers=2,
                    WorkerType='G.1X')

    print('Successfully Job created')
except Exception as e:
    # A failure (e.g. the job already exists) is only reported.
    print(e)

Successfully Job created


In [87]:
# Launch a run of the join job; the response (including the JobRunId) is the cell output.

job_name = 'glue-job-for-join'
glue.start_job_run(JobName=job_name)

{'JobRunId': 'jr_45f79044d9317a51efb44f5c7371c3d38d64d4bfcf95200e1f99e73b37b62288',
 'ResponseMetadata': {'RequestId': '758d2ba7-413e-43a4-9657-1c4cfe036e18',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Mon, 02 Jan 2023 06:54:29 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '82',
   'connection': 'keep-alive',
   'x-amzn-requestid': '758d2ba7-413e-43a4-9657-1c4cfe036e18'},
  'RetryAttempts': 0}}

In [68]:
# List the files the Glue job wrote to the analytical data bucket.
# Until the job has written output the bucket is empty and the response has no
# 'Contents' key, so default to an empty list instead of raising KeyError.

for i in s3.list_objects(Bucket='analytical-data-bucket').get('Contents', []):
    print(i['Key'])

partition_0=CA/run-AmazonS3_node1672643785456-1-part-r-00000
partition_0=CA/run-AmazonS3_node1672643785456-1-part-r-00001
partition_0=CA/run-AmazonS3_node1672643785456-1-part-r-00002
partition_0=CA/run-AmazonS3_node1672643785456-1-part-r-00003
partition_0=DE/run-AmazonS3_node1672643785456-1-part-r-00000
partition_0=DE/run-AmazonS3_node1672643785456-1-part-r-00001
partition_0=DE/run-AmazonS3_node1672643785456-1-part-r-00002
partition_0=DE/run-AmazonS3_node1672643785456-1-part-r-00003
partition_0=FR/run-AmazonS3_node1672643785456-1-part-r-00000
partition_0=FR/run-AmazonS3_node1672643785456-1-part-r-00001
partition_0=FR/run-AmazonS3_node1672643785456-1-part-r-00002
partition_0=FR/run-AmazonS3_node1672643785456-1-part-r-00003
partition_0=GB/run-AmazonS3_node1672643785456-1-part-r-00000
partition_0=GB/run-AmazonS3_node1672643785456-1-part-r-00001
partition_0=GB/run-AmazonS3_node1672643785456-1-part-r-00002
partition_0=GB/run-AmazonS3_node1672643785456-1-part-r-00003
partition_0=IN/run-Amazo

In [5]:
# Create the crawler that catalogs the analytical data bucket.

analytical_crawler_targets = {'S3Targets': [{'Path': 's3://analytical-data-bucket'}]}

try:
    glue.create_crawler(
        Name='crawler_db_for_analytical_data',
        DatabaseName='analytical_db',
        Role='role-for-glue',
        Targets=analytical_crawler_targets,
    )
except Exception as e:
    print(e)
else:
    print('Succesfully Created')

Succesfully Created


In [7]:
# Start the analytical-data crawler and wait for the run to end.
# start_crawler returns immediately, and the next cell reads the table the crawler
# creates, so running it straight away would find an empty database.

try:
    glue.start_crawler(Name='crawler_db_for_analytical_data')
    # The crawler reports the READY state again once the run has finished.
    while glue.get_crawler(Name='crawler_db_for_analytical_data')['Crawler']['State'] != 'READY':
        time.sleep(15)
except Exception as e:
    print(e)

## See the final data with all the required columns.

In [8]:
# Column schema of the first table in the analytical catalog database.
analytical_tables = glue.get_tables(DatabaseName='analytical_db')
analytical_tables['TableList'][0]['StorageDescriptor']['Columns']

[{'Name': 'ratings_disabled', 'Type': 'boolean'},
 {'Name': 'snippet.title', 'Type': 'string'},
 {'Name': 'comments_disabled', 'Type': 'boolean'},
 {'Name': 'trending_date', 'Type': 'string'},
 {'Name': 'etag', 'Type': 'string'},
 {'Name': 'video_id', 'Type': 'string'},
 {'Name': 'category_id', 'Type': 'bigint'},
 {'Name': 'thumbnail_link', 'Type': 'string'},
 {'Name': 'kind', 'Type': 'string'},
 {'Name': 'likes', 'Type': 'bigint'},
 {'Name': 'comment_count', 'Type': 'bigint'},
 {'Name': 'snippet.channelid', 'Type': 'string'},
 {'Name': 'description', 'Type': 'string'},
 {'Name': 'views', 'Type': 'bigint'},
 {'Name': 'dislikes', 'Type': 'bigint'},
 {'Name': 'snippet.assignable', 'Type': 'boolean'},
 {'Name': 'channel_title', 'Type': 'string'},
 {'Name': 'title', 'Type': 'string'},
 {'Name': '.partition_0', 'Type': 'string'},
 {'Name': 'publish_time', 'Type': 'string'},
 {'Name': 'tags', 'Type': 'string'},
 {'Name': 'id', 'Type': 'bigint'},
 {'Name': 'video_error_or_removed', 'Type': 'b

# Athena for querying the analytical data

In [9]:
# Athena client built from the profile session; a failure is printed instead of raised.

try:
    athena = console.client('athena', region_name='ap-south-1')
except Exception as e:
    print(e)
else:
    print('Client successfully created')

Client successfully created


In [12]:
# Trust policy (as a JSON string) for the IAM role created for Athena.

import json

athena_trust_statement = {
    "Effect": "Allow",
    "Principal": {'Service': 'athena.amazonaws.com'},
    "Action": "sts:AssumeRole",
}
document1 = json.dumps({"Version": "2012-10-17", "Statement": [athena_trust_statement]})

In [14]:
# Create the Athena role with the trust policy document built in the previous cell.

iam.create_role(
    AssumeRolePolicyDocument=document1,
    RoleName='role-for-athena',
)

{'Role': {'Path': '/',
  'RoleName': 'role-for-athena',
  'RoleId': 'AROA5AKEZQQNWZGG3ORIB',
  'Arn': 'arn:aws:iam::894034347035:role/role-for-athena',
  'CreateDate': datetime.datetime(2023, 1, 3, 6, 44, 50, tzinfo=tzutc()),
  'AssumeRolePolicyDocument': {'Version': '2012-10-17',
   'Statement': [{'Effect': 'Allow',
     'Principal': {'Service': 'athena.amazonaws.com'},
     'Action': 'sts:AssumeRole'}]}},
 'ResponseMetadata': {'RequestId': '9138d11c-ce14-424d-8620-04c4843a3841',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '9138d11c-ce14-424d-8620-04c4843a3841',
   'content-type': 'text/xml',
   'content-length': '784',
   'date': 'Tue, 03 Jan 2023 06:44:50 GMT'},
  'RetryAttempts': 0}}

In [15]:
# Attach the AmazonS3FullAccess managed policy to the Athena role.

s3_full_access_arn = 'arn:aws:iam::aws:policy/AmazonS3FullAccess'
iam.attach_role_policy(PolicyArn=s3_full_access_arn, RoleName='role-for-athena')

{'ResponseMetadata': {'RequestId': 'bc7f08b0-e7d5-47e0-9f60-95df1fb4193b',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'bc7f08b0-e7d5-47e0-9f60-95df1fb4193b',
   'content-type': 'text/xml',
   'content-length': '212',
   'date': 'Tue, 03 Jan 2023 06:45:48 GMT'},
  'RetryAttempts': 0}}

In [20]:
# Create the bucket that stores Athena query results.

bucket_name = "bucket-athena-result"
location_name = "ap-south-1"
location = dict(LocationConstraint=location_name)

try:
    bucket = s3.create_bucket(
        CreateBucketConfiguration=location,
        Bucket=bucket_name,
    )
except Exception as e:
    # A failure is only reported; the notebook keeps running.
    print(e)
else:
    print("Bucket has been created")

Bucket has been created


# Querying the analytical db data

## Here I'm creating a query that returns the details of the 10 most liked videos in a region.

## The Query:

## SELECT "title", "snippet.title","likes","partition_0" FROM "analytical_db"."analytical_data_bucket" WHERE "partition_0"='CA' ORDER BY "likes" DESC limit 10;

In [57]:
# Submit the top-10-most-liked query to Athena; the response (with the QueryExecutionId)
# is the cell output and the results are written to the result bucket.

top_liked_query = '''SELECT "title", "snippet.title","likes","partition_0" FROM "analytical_db"."analytical_data_bucket" WHERE "partition_0"='CA' ORDER BY "likes" DESC limit 10;'''

athena.start_query_execution(
    QueryString=top_liked_query,
    ResultConfiguration={'OutputLocation': 's3://bucket-athena-result'},
    QueryExecutionContext={'Database': 'analytical_db'},
)

{'QueryExecutionId': '4bcf5341-9f53-4367-940b-1dfa3620a0d7',
 'ResponseMetadata': {'RequestId': 'bafb3bb3-57cc-4882-a5fe-548b56d0a4ae',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Wed, 04 Jan 2023 03:54:42 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '59',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'bafb3bb3-57cc-4882-a5fe-548b56d0a4ae'},
  'RetryAttempts': 0}}

In [58]:
# Fetch the rows of the query identified by its QueryExecutionId and print each row's data.

result_set = athena.get_query_results(QueryExecutionId='4bcf5341-9f53-4367-940b-1dfa3620a0d7')['ResultSet']
a = result_set['Rows']
for row in a:
    print(row['Data'])

[{'VarCharValue': 'title'}, {'VarCharValue': 'snippet.title'}, {'VarCharValue': 'likes'}, {'VarCharValue': 'partition_0'}]
[{'VarCharValue': ' KoreaConnect with BTS:http://www.ibighit.com http://twitter.com/BTS_bighit http://twitter.com/BTS_twt https://www.facebook.com/ibighit/http://www.facebook.com/bangtan.officialhttp://instagram.com/BTS.bighitofficialhttp://weibo.com/BTSbighitBU content certified by Big Hit Entertainment"'}, {'VarCharValue': 'Music'}, {'VarCharValue': '5150839'}, {'VarCharValue': 'CA'}]
[{'VarCharValue': ' KoreaConnect with BTS:http://www.ibighit.com http://twitter.com/BTS_bighit http://twitter.com/BTS_twt https://www.facebook.com/ibighit/http://www.facebook.com/bangtan.officialhttp://instagram.com/BTS.bighitofficialhttp://weibo.com/BTSbighitBU content certified by Big Hit Entertainment"'}, {'VarCharValue': 'Music'}, {'VarCharValue': '5053338'}, {'VarCharValue': 'CA'}]
[{'VarCharValue': ' KoreaConnect with BTS:http://www.ibighit.com http://twitter.com/BTS_bighit ht