In [None]:
# importing required libraries

'''
This notebook loads video data from MongoDB, adds year/month partition columns, and uploads it to AWS S3 as CSV files.

'''

import boto3
import pandas as pd
from pymongo import MongoClient
import configparser
from datetime import datetime
from settings import CHANNEL_ID, MAX_RESULTS, DB_NAME, VIDEO_COLLECTION, COMMENT_COLLECTION

In [2]:
# Read connection settings and credentials from credentials.ini.
#
# ConfigParser.read() does not raise when the file is missing: it skips the
# file and returns the list of files it actually parsed. Check that list so a
# missing file fails here with a clear message instead of surfacing later as
# a KeyError on config['MONGODB'] or config['AWS'].

config = configparser.ConfigParser()
loaded_files = config.read('credentials.ini')
if not loaded_files:
    raise FileNotFoundError(
        "credentials.ini was not found or could not be read; "
        "it must provide the [MONGODB] and [AWS] sections"
    )

# Keep the list of parsed files as the cell's displayed output.
loaded_files

['credentials.ini']

In [None]:
# MongoDB setup: connect with the URI stored under [MONGODB] in the loaded
# config, then select the database named by DB_NAME.

mongo_uri = config['MONGODB']['uri']
client = MongoClient(mongo_uri)
db = client[DB_NAME]

In [None]:
 # Get videos and convert to DataFrame

# Run an unfiltered find() on the video collection, exhaust the result into a
# list of documents, and build the DataFrame from that list.
video_collection = db[VIDEO_COLLECTION]
videos = list(video_collection.find())
video_df = pd.DataFrame(videos)

In [None]:
# show the DataFrame's column names as a plain Python list

print(list(video_df.columns))

['_id', 'kind', 'etag', 'id', 'snippet']


In [None]:
 # Add partition columns for S3 bucket

# Pull the 'publishedAt' value out of each snippet dict and parse it into a
# datetime, then derive the year and month columns used to partition the
# S3 upload.
published_at = pd.to_datetime(
    video_df['snippet'].map(lambda snippet: snippet['publishedAt'])
)
video_df['publishedAt'] = published_at
video_df['year'] = published_at.dt.year
video_df['month'] = published_at.dt.month

In [None]:
# column names again, now including the derived partition columns

print(list(video_df.columns))

['_id', 'kind', 'etag', 'id', 'snippet', 'publishedAt', 'year', 'month']


In [None]:
# number of rows (records) held in the DataFrame

video_df.shape[0]

742

In [None]:
# S3 client for the partitioned upload.
# The access key pair is read from the [AWS] section of the loaded config.

aws_settings = config['AWS']
s3 = boto3.client(
    's3',
    aws_access_key_id=aws_settings['access_key'],
    aws_secret_access_key=aws_settings['secret_key'],
)

In [None]:
# Write the DataFrame to S3 as one CSV object per (year, month) partition,
# under the key videos/<year>/<month>/data.csv.

# The bucket name is the same for every partition, so look it up once.
bucket_name = config['AWS']['bucket_name']

for (year, month), partition_df in video_df.groupby(['year', 'month']):
    # to_csv() without a path returns the CSV text; encode it explicitly so
    # put_object receives bytes.
    csv_bytes = partition_df.to_csv(index=False).encode('utf-8')
    # int() keeps the key free of a ".0" suffix should the partition columns
    # ever come through as floats (NOTE(review): presumably only possible if
    # some publishedAt values are missing - confirm against the data).
    s3_key = f"videos/{int(year)}/{int(month)}/data.csv"  # writing as .csv files
    s3.put_object(
        Bucket=bucket_name,
        Key=s3_key,
        Body=csv_bytes,
    )