# Project Title
### Data Engineering Capstone Project

#### Project Summary
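This project builds an ETL pipeline that downloads the Anime Recommendation Database 2020 dataset from AWS S3, explores and cleans it with pandas, and loads it into a PostgreSQL data model. The data can later be used to build an anime recommendation system.

The project follows these steps: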
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [13]:
# Do all imports and installs here
!pip install boto3
!pip install numpy
!pip install pandas
!pip install plotly
!pip install psycopg2

Collecting plotly
  Downloading plotly-5.6.0-py2.py3-none-any.whl (27.7 MB)
Installing collected packages: plotly
Successfully installed plotly-5.6.0


In [1]:
import boto3
import pandas as pd
import configparser
import sys
import os
import plotly.express as px

from sql_queries import *
import psycopg2

### Step 1: Scope the Project and Gather Data

#### Scope 
The project aims to design an ETL pipeline that processes data which can later be used to build an Anime Recommendation System.

#### Anime Recommendation Dataset 
For this project, we use a modified version of the Kaggle dataset [Anime Recommendation Database 2020](https://www.kaggle.com/hernan4444/anime-recommendation-database-2020).

The dataset is stored in AWS S3 and consists of four CSV files:
* anime.csv: general information about each anime.
* animelist.csv: every anime registered by each user, with the user's score, watching status and number of episodes watched.
* watching_status.csv: description of every possible value of the `watching_status` column in animelist.csv.
* anime_with_synopsis.csv: the synopsis of each anime.

In [2]:
#Reading in the configuration file
config = configparser.ConfigParser()
config.read('credentials.cfg')

AWS_ACCESS_KEY_ID=config['S3']['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY=config['S3']['AWS_SECRET_ACCESS_KEY']
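The notebook reads two sections from `credentials.cfg`: `[S3]` (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`) and, later on, `[DB]` (`HOST`, `PORT`, `DATABASE`, `USER`, `PASSWORD`). The sketch below assumes exactly that layout. The placeholder values and the `missing_keys` helper are hypothetical and only show one way to fail early when a key is absent, instead of hitting a `KeyError` halfway through the notebook.

```python
import configparser

# Placeholder template inferred from the keys this notebook reads; the values are not real.
CONFIG_TEMPLATE = """
[S3]
AWS_ACCESS_KEY_ID = <your-access-key-id>
AWS_SECRET_ACCESS_KEY = <your-secret-access-key>

[DB]
HOST = localhost
PORT = 5432
DATABASE = <database-name>
USER = <user>
PASSWORD = <password>
"""

# Keys the notebook looks up, grouped by section
REQUIRED_KEYS = {
    "S3": ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"],
    "DB": ["HOST", "PORT", "DATABASE", "USER", "PASSWORD"],
}


def missing_keys(cfg, required=REQUIRED_KEYS):
    """Return 'SECTION.KEY' strings for every required entry absent from cfg."""
    missing = []
    for section, keys in required.items():
        for key in keys:
            # has_option() returns False when the section itself is missing
            if not cfg.has_option(section, key):
                missing.append(f"{section}.{key}")
    return missing


cfg = configparser.ConfigParser()
cfg.read_string(CONFIG_TEMPLATE)
print(missing_keys(cfg))  # → []
```

In the notebook itself, `missing_keys(config)` right after `config.read('credentials.cfg')` should return an empty list.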

In [3]:
# Start an AWS session with the credentials read above
session = boto3.Session(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)

s3_resource = session.resource('s3')

In [4]:
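# S3 helper: download every object under a prefix, mirroring the S3 key layout locally
def downloadDirectoryFroms3(bucketName, remoteDirectoryName):
    files = []

    bucket = s3_resource.Bucket(bucketName)
    for obj in bucket.objects.filter(Prefix=remoteDirectoryName):
        # Skip "folder" placeholder keys (ending in '/'), which are not files
        if obj.key.endswith('/'):
            continue
        # Create the local directory if needed; dirname is '' for top-level keys
        local_dir = os.path.dirname(obj.key)
        if local_dir:
            os.makedirs(local_dir, exist_ok=True)
        bucket.download_file(obj.key, obj.key)
        files.append(obj.key)

    return files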

In [5]:
datasetfiles = downloadDirectoryFroms3('anime-recommendation-dataset', 'Anime_Recommendation_Dataset')
print(datasetfiles)

['Anime_Recommendation_Dataset/anime.csv', 'Anime_Recommendation_Dataset/anime_with_synopsis.csv', 'Anime_Recommendation_Dataset/animelist.csv', 'Anime_Recommendation_Dataset/watching_status.csv']


In [32]:
# Reading in the data
anime_data_org = pd.read_csv('./Anime_Recommendation_Dataset/anime.csv')
anime_watching_status_org = pd.read_csv('./Anime_Recommendation_Dataset/watching_status.csv')
anime_with_synposis_org = pd.read_csv('./Anime_Recommendation_Dataset/anime_with_synopsis.csv')
animelist_org = pd.read_csv('./Anime_Recommendation_Dataset/animelist.csv')

### Step 2: Explore and Assess the Data
#### Explore the Data 
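For each file, check the number of rows, the number of duplicate rows, and the number of unique and null values in each column.

#### Cleaning Steps
* Keep only the columns of interest in anime.csv and rename `Rating` to `Audience_Age_Rating`.
* Drop the `sypnopsis` column from anime_with_synopsis.csv.
* Save each processed file to `./processed_output`.
* Before loading the data (Step 4), replace the "Unknown" placeholder values with 0.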
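The same three checks (row count, duplicate rows, per-column unique and null counts) are repeated for every file below. A minimal sketch of a reusable helper, assuming only pandas; the name `profile_dataframe` is hypothetical and not part of the original notebook.

```python
import pandas as pd


def profile_dataframe(df):
    """Return (row count, duplicate row count, per-column unique/null summary)."""
    summary = pd.DataFrame({
        # nunique(dropna=False) counts NaN as one value, like len(df[col].unique())
        "unique_values": df.nunique(dropna=False),
        "null_values": df.isna().sum(),
    })
    return len(df), int(df.duplicated().sum()), summary


# Example on a tiny, made-up frame with one duplicate row and one missing name
sample_df = pd.DataFrame({"id": [1, 2, 2, 3], "name": ["a", "b", "b", None]})
rows, dups, summary = profile_dataframe(sample_df)
print(rows, dups)  # → 4 1
print(summary)
```

Called as `profile_dataframe(anime_data_org)`, it would cover the size, duplicate and per-column cells for that file in one step.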

In [29]:
os.makedirs('./processed_output',exist_ok=True)

#### anime.csv

In [19]:
anime_data_org.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 17562 entries, 0 to 17561
Data columns (total 36 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   Unnamed: 0     17562 non-null  int64 
 1   MAL_ID         17562 non-null  int64 
 2   Name           17562 non-null  object
 3   Score          17562 non-null  object
 4   Genres         17562 non-null  object
 5   English name   17562 non-null  object
 6   Japanese name  17562 non-null  object
 7   Type           17562 non-null  object
 8   Episodes       17562 non-null  object
 9   Aired          17562 non-null  object
 10  Premiered      17562 non-null  object
 11  Producers      17562 non-null  object
 12  Licensors      17562 non-null  object
 13  Studios        17562 non-null  object
 14  Source         17562 non-null  object
 15  Duration       17562 non-null  object
 16  Rating         17562 non-null  object
 17  Ranked         17562 non-null  object
 18  Popularity     17562 non-n

In [33]:
#checking the size of the dataset
print("Size(Number of rows) of anime dataset : ",len(anime_data_org))

#checking for duplicates
print("Number of duplicate rows found : ",len(anime_data_org[anime_data_org.duplicated()==True]))

Size(Number of rows) of anime dataset :  17562
Number of duplicate rows found :  0


In [64]:
# Keep only the columns needed downstream
interested_columns_anime = ['MAL_ID','Name','Score','Genres','Type','Episodes','Aired','Licensors',
                            'Premiered','Studios','Duration','Rating','Ranked','Popularity','Score-10',
                            'Score-9','Score-8','Score-7','Score-6','Score-5','Score-4','Score-3','Score-2','Score-1']

# Drop the other columns and rename Rating, which holds the audience age rating rather than a score.
# Assigning the renamed selection avoids renaming a column subset in place.
anime_data_org = anime_data_org[interested_columns_anime].rename(columns={'Rating': 'Audience_Age_Rating'})

In [65]:
anime_data_org.head(3)

Unnamed: 0,MAL_ID,Name,Score,Genres,Type,Episodes,Aired,Licensors,Premiered,Studios,...,Score-10,Score-9,Score-8,Score-7,Score-6,Score-5,Score-4,Score-3,Score-2,Score-1
0,1,Cowboy Bebop,8.78,"Action, Adventure, Comedy, Drama, Sci-Fi, Space",TV,26,"Apr 3, 1998 to Apr 24, 1999","Funimation, Bandai Entertainment",Spring 1998,Sunrise,...,229170.0,182126.0,131625.0,62330.0,20688.0,8904.0,3184.0,1357.0,741.0,1580.0
1,5,Cowboy Bebop: Tengoku no Tobira,8.39,"Action, Drama, Mystery, Sci-Fi, Space",Movie,1,"Sep 1, 2001",Sony Pictures Entertainment,Unknown,Bones,...,30043.0,49201.0,49505.0,22632.0,5805.0,1877.0,577.0,221.0,109.0,379.0
2,6,Trigun,8.24,"Action, Sci-Fi, Adventure, Comedy, Drama, Shounen",TV,26,"Apr 1, 1998 to Sep 30, 1998","Funimation, Geneon Entertainment USA",Spring 1998,Madhouse,...,50229.0,75651.0,86142.0,49432.0,15376.0,5838.0,1965.0,664.0,316.0,533.0


In [66]:
#listing out properties of the columns (Null values and unique values)
for col in anime_data_org.columns:
    print("Col : ",col," ----- Unique Values Count : ",len(anime_data_org[col].unique())," ----- Null Values : ",anime_data_org[col].isna().sum())

Col :  MAL_ID  ----- Unique Values Count :  17562  ----- Null Values :  0
Col :  Name  ----- Unique Values Count :  17558  ----- Null Values :  0
Col :  Score  ----- Unique Values Count :  533  ----- Null Values :  0
Col :  Genres  ----- Unique Values Count :  5034  ----- Null Values :  0
Col :  Type  ----- Unique Values Count :  7  ----- Null Values :  0
Col :  Episodes  ----- Unique Values Count :  201  ----- Null Values :  0
Col :  Aired  ----- Unique Values Count :  11947  ----- Null Values :  0
Col :  Licensors  ----- Unique Values Count :  231  ----- Null Values :  0
Col :  Premiered  ----- Unique Values Count :  231  ----- Null Values :  0
Col :  Studios  ----- Unique Values Count :  1090  ----- Null Values :  0
Col :  Duration  ----- Unique Values Count :  313  ----- Null Values :  0
Col :  Audience_Age_Rating  ----- Unique Values Count :  7  ----- Null Values :  0
Col :  Ranked  ----- Unique Values Count :  10490  ----- Null Values :  0
Col :  Popularity  ----- Unique Values C

In [67]:
# Visualize how often each type of anime appears in the dataset
type_df = anime_data_org['Type'].value_counts().rename_axis('Type').reset_index(name='Freq')
fig = px.bar(type_df, x='Type', y='Freq')
fig.show()

In [68]:
#saving the processed file to csv
anime_data_org.to_csv('./processed_output/anime.csv',index=False)
#deleting the dataframe to free up memory
del anime_data_org

#### anime_with_synopsis.csv

In [21]:
anime_with_synposis_org.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 16214 entries, 0 to 16213
Data columns (total 6 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   Unnamed: 0  16214 non-null  int64 
 1   MAL_ID      16214 non-null  int64 
 2   Name        16214 non-null  object
 3   Score       16214 non-null  object
 4   Genres      16214 non-null  object
 5   sypnopsis   16206 non-null  object
dtypes: int64(2), object(4)
memory usage: 760.2+ KB


In [37]:
#checking the size of the dataset
print("Size(Number of rows) of anime_with_synposis : ",len(anime_with_synposis_org))

#checking for duplicates
print("Number of duplicate rows found : ",len(anime_with_synposis_org[anime_with_synposis_org.duplicated()==True]))

Size(Number of rows) of anime_with_synposis :  16214
Number of duplicate rows found :  0


In [38]:
anime_with_synposis_org.head(3)

Unnamed: 0.1,Unnamed: 0,MAL_ID,Name,Score,Genres,sypnopsis
0,0,1,Cowboy Bebop,8.78,"Action, Adventure, Comedy, Drama, Sci-Fi, Space","In the year 2071, humanity has colonized sever..."
1,1,5,Cowboy Bebop: Tengoku no Tobira,8.39,"Action, Drama, Mystery, Sci-Fi, Space","other day, another bounty—such is the life of ..."
2,2,6,Trigun,8.24,"Action, Sci-Fi, Adventure, Comedy, Drama, Shounen","Vash the Stampede is the man with a $$60,000,0..."


In [57]:
# Drop the synopsis text column (spelled `sypnopsis` in the source CSV)
anime_with_synposis_org.drop(['sypnopsis'],axis=1, inplace=True)

In [58]:
#listing out properties of the columns (Null values and unique values)
for col in anime_with_synposis_org.columns:
    print("Col : ",col," ----- Unique Values Count : ",len(anime_with_synposis_org[col].unique())," ----- Null Values : ",anime_with_synposis_org[col].isna().sum())

Col :  Unnamed: 0  ----- Unique Values Count :  16214  ----- Null Values :  0
Col :  MAL_ID  ----- Unique Values Count :  16214  ----- Null Values :  0
Col :  Name  ----- Unique Values Count :  16210  ----- Null Values :  0
Col :  Score  ----- Unique Values Count :  532  ----- Null Values :  0
Col :  Genres  ----- Unique Values Count :  4857  ----- Null Values :  0


In [59]:
#Visualizing genres data
genre_freq_dict = {}

def update_dict(dictionary, key):
    if key in dictionary.keys():
        dictionary[key] = dictionary[key]+1
    else:
        dictionary[key] = 1
    return dictionary

def update_freq(dictionary, keylist):
    for i in keylist:
        dictionary = update_dict(dictionary, i)
    return dictionary

for index, row in anime_with_synposis_org.iterrows():
    # Look the column up by name instead of by position
    genre = row['Genres'].split(',')
    genre_freq_dict = update_freq(genre_freq_dict, genre)

print("Total number of different genres present in the dataset : ",len(genre_freq_dict))


Total number of different genres present in the dataset :  81
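Splitting the `Genres` string on `','` keeps the space that follows each comma, so a genre that appears first in one row (`'Adventure'`) and later in another (`' Adventure'`) is counted under two different keys. The total of 81 above therefore likely includes such near-duplicates. A sketch that strips whitespace before counting, using `collections.Counter` in place of the hand-written dictionary helpers; `count_genres` is a hypothetical name.

```python
from collections import Counter

import pandas as pd


def count_genres(genres):
    """Count genre occurrences in a Series of comma-separated genre strings."""
    counter = Counter()
    for value in genres.dropna():
        # strip() removes the space after each comma, e.g. " Adventure" -> "Adventure"
        counter.update(g.strip() for g in value.split(",") if g.strip())
    return counter


sample_genres = pd.Series(["Action, Adventure, Comedy", "Adventure, Action"])
print(sorted(count_genres(sample_genres).items()))  # → [('Action', 2), ('Adventure', 2), ('Comedy', 1)]
```

Applied to `anime_with_synposis_org['Genres']`, `len()` of the result gives the number of distinct genres, and `most_common(20)` gives the 20 most frequent genres for the pie chart rather than the first 20 encountered.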


In [60]:
df_temp1 = pd.DataFrame({'genres':genre_freq_dict.keys(),'freq': genre_freq_dict.values()})
fig = px.pie(df_temp1.head(20), values='freq', names='genres')
fig.show()

In [47]:
df_temp1.head()

Unnamed: 0,genres,freq
0,Action,3846
1,Adventure,1473
2,Comedy,2884
3,Drama,1801
4,Sci-Fi,2019


In [63]:
#saving the processed file to csv
anime_with_synposis_org.to_csv('./processed_output/anime_summary.csv',index=False)
#deleting the dataframe to free up memory
del anime_with_synposis_org

#### watching_status.csv

In [20]:
anime_watching_status_org.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5 entries, 0 to 4
Data columns (total 3 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   Unnamed: 0    5 non-null      int64 
 1   status        5 non-null      int64 
 2    description  5 non-null      object
dtypes: int64(2), object(1)
memory usage: 248.0+ bytes


In [48]:
#checking the size of the dataset
print("Size(Number of rows) of anime_watching_status : ",len(anime_watching_status_org))

#checking for duplicates
print("Number of duplicate rows found : ",len(anime_watching_status_org[anime_watching_status_org.duplicated()==True]))

Size(Number of rows) of anime_watching_status :  5
Number of duplicate rows found :  0


In [49]:
anime_watching_status_org.head()

Unnamed: 0.1,Unnamed: 0,status,description
0,0,1,Currently Watching
1,1,2,Completed
2,2,3,On Hold
3,3,4,Dropped
4,4,6,Plan to Watch


In [50]:
#listing out properties of the columns (Null values and unique values)
for col in anime_watching_status_org.columns:
    print("Col : ",col," ----- Unique Values Count : ",len(anime_watching_status_org[col].unique())," ----- Null Values : ",anime_watching_status_org[col].isna().sum())

Col :  Unnamed: 0  ----- Unique Values Count :  5  ----- Null Values :  0
Col :  status  ----- Unique Values Count :  5  ----- Null Values :  0
Col :   description  ----- Unique Values Count :  5  ----- Null Values :  0


In [62]:
#saving the processed file to csv
anime_watching_status_org.to_csv('./processed_output/anime_watching_status.csv',index=False)
#deleting the dataframe to free up memory
del anime_watching_status_org

#### animelist.csv

In [22]:
animelist_org.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8000000 entries, 0 to 7999999
Data columns (total 6 columns):
 #   Column            Dtype
---  ------            -----
 0   Unnamed: 0        int64
 1   user_id           int64
 2   anime_id          int64
 3   rating            int64
 4   watching_status   int64
 5   watched_episodes  int64
dtypes: int64(6)
memory usage: 366.2 MB


In [51]:
#checking the size of the dataset
print("Size(Number of rows) of animelist : ",len(animelist_org))

#checking for duplicates
print("Number of duplicate rows found : ",len(animelist_org[animelist_org.duplicated()==True]))

Size(Number of rows) of animelist :  8000000
Number of duplicate rows found :  0


In [52]:
animelist_org.head(3)

Unnamed: 0.1,Unnamed: 0,user_id,anime_id,rating,watching_status,watched_episodes
0,37557744,122007,28841,8,2,1
1,108104054,349795,15039,0,6,0
2,41733663,135290,3210,7,2,2


In [53]:
#listing out properties of the columns (Null values and unique values)
for col in animelist_org.columns:
    print("Col : ",col," ----- Unique Values Count : ",len(animelist_org[col].unique())," ----- Null Values : ",animelist_org[col].isna().sum())

Col :  Unnamed: 0  ----- Unique Values Count :  8000000  ----- Null Values :  0
Col :  user_id  ----- Unique Values Count :  309558  ----- Null Values :  0
Col :  anime_id  ----- Unique Values Count :  17418  ----- Null Values :  0
Col :  rating  ----- Unique Values Count :  11  ----- Null Values :  0
Col :  watching_status  ----- Unique Values Count :  7  ----- Null Values :  0
Col :  watched_episodes  ----- Unique Values Count :  1068  ----- Null Values :  0
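The column profile shows 7 distinct `watching_status` codes in animelist.csv, while watching_status.csv defines only 5 (1, 2, 3, 4 and 6), so some user entries carry a status with no description. Similarly, it may be worth confirming that every `anime_id` exists as a `MAL_ID` in anime.csv. A hedged sketch of such a referential check; `orphan_values` is a hypothetical helper, and it must run before the DataFrames are deleted (or on the CSVs in `./processed_output`).

```python
import pandas as pd


def orphan_values(child, child_col, parent, parent_col):
    """Return the sorted distinct values of child[child_col] that are missing from parent[parent_col]."""
    child_values = set(child[child_col].dropna().unique().tolist())
    parent_values = set(parent[parent_col].dropna().unique().tolist())
    return sorted(child_values - parent_values)


# Made-up frames for illustration only
animelist_sample = pd.DataFrame({"anime_id": [1, 5, 99], "watching_status": [2, 6, 0]})
status_sample = pd.DataFrame({"status": [1, 2, 3, 4, 6]})
print(orphan_values(animelist_sample, "watching_status", status_sample, "status"))  # → [0]
```

On the real data the calls would be `orphan_values(animelist_org, 'watching_status', anime_watching_status_org, 'status')` and `orphan_values(animelist_org, 'anime_id', anime_data_org, 'MAL_ID')`.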


In [61]:
#saving the processed file to csv
animelist_org.to_csv('./processed_output/animeratinglist.csv',index=False)
#deleting the dataframe to free up memory
del animelist_org

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
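The data model, shown below, consists of five PostgreSQL tables. Three of them describe each anime and share its `MAL_ID`: `anime_master` (name, score and genres), `anime_score` (the number of users who gave each score from 10 down to 1) and `anime_meta` (type, episodes, airing dates, licensors, premiere season, studios, duration, audience age rating, rank and popularity). `anime_list` records each user's anime with its watching status and number of episodes watched, and `anime_watching_status` maps each status code to its description.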

![Schema](./static/schema.jpg "Database Schema")

![Table Overview](./static/data_model.png "Data model")

#### 3.2 Mapping Out Data Pipelines
The following steps load the processed data into the data model:

* Create the tables by running create_tables.py.
* Read in the processed data files from `./processed_output`.
* Replace the "Unknown" values with 0.
* Insert the data into the tables.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [3]:
# Reading in the data
anime_data_processed = pd.read_csv('./processed_output/anime.csv')
anime_watching_status_processed = pd.read_csv('./processed_output/anime_watching_status.csv')
anime_with_synposis_processed = pd.read_csv('./processed_output/anime_summary.csv')
animelist_processed = pd.read_csv('./processed_output/animeratinglist.csv')

In [4]:
# Connect to PostgreSQL with the [DB] settings from credentials.cfg
conn = psycopg2.connect(
    host=config["DB"]["HOST"],
    port=config["DB"]["PORT"],
    database=config["DB"]["DATABASE"],
    user=config["DB"]["USER"],
    password=config["DB"]["PASSWORD"],
)

conn.set_session(autocommit=True)

In [5]:
curr = conn.cursor()

In [6]:
#Replace the Unknown values
anime_data_processed = anime_data_processed.replace("Unknown",0)
anime_watching_status_processed = anime_watching_status_processed.replace("Unknown",0)
animelist_processed = animelist_processed.replace("Unknown",0)
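Replacing "Unknown" with 0 makes an unknown value indistinguishable from a real 0 (for example, a score). If the target columns accept NULL, which is an assumption about the schema in create_tables.py (not shown here), one alternative is to coerce the affected columns with `pd.to_numeric(..., errors="coerce")`, which turns non-numeric strings into NaN, and then map NaN to `None`, which psycopg2 sends as SQL NULL. This is a swapped-in technique, not the notebook's method, and the helper names are hypothetical.

```python
import math

import pandas as pd


def coerce_numeric(df, columns):
    """Return a copy of df with the given columns converted to numbers; 'Unknown' becomes NaN."""
    out = df.copy()
    for col in columns:
        out[col] = pd.to_numeric(out[col], errors="coerce")
    return out


def nan_to_none(values):
    """Replace float NaN with None so that psycopg2 sends NULL instead of a NaN float."""
    return [None if isinstance(v, float) and math.isnan(v) else v for v in values]


score_sample = pd.DataFrame({"MAL_ID": [1, 2], "Score": ["8.78", "Unknown"]})
clean = coerce_numeric(score_sample, ["Score"])
print(nan_to_none(clean["Score"].tolist()))  # → [8.78, None]
```

The original frame is left unchanged, so the 0-replacement and the NULL approach can be compared side by side.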

In [35]:
# Insert each anime into anime_master, anime_score and anime_meta.
# Columns by position: 0 MAL_ID, 1 Name, 2 Score, 3 Genres, 4-13 Type to Popularity,
# and the last ten Score-10 to Score-1.
for index, row in anime_data_processed.iterrows():
    curr.execute(INSERT_ANIME_MASTER, [row.iloc[0], row.iloc[1], row.iloc[2], row.iloc[3]])

    curr.execute(INSERT_ANIME_SCORE, [row.iloc[0]] + [float(value) for value in row.iloc[-10:]])

    curr.execute(INSERT_ANIME_META, [row.iloc[0]] + row.iloc[4:14].tolist())

In [8]:
# Insert the status codes and their descriptions into anime_watching_status
for index, row in anime_watching_status_processed.iterrows():
    # Columns by position: 1 status, 2 description
    curr.execute(INSERT_ANIME_WATCHING_STATUS, [row.iloc[1], row.iloc[2]])

In [10]:
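# Insert each user's anime entry into anime_list (the rating column is not loaded)
for index, row in animelist_processed.iterrows():
    curr.execute(INSERT_ANIME_LIST, [int(row['user_id']), int(row['anime_id']), int(row['watching_status']), int(row['watched_episodes'])])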
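Issuing one `INSERT` per row through `iterrows()` with autocommit enabled means one round trip and one commit for each of the 8 million animelist rows. A common alternative is to send rows in batches with `psycopg2.extras.execute_values`. The sketch below assumes a table `anime_list` with the columns named in the SQL string; those column names are hypothetical, because `INSERT_ANIME_LIST` in sql_queries.py is not shown.

```python
from itertools import islice

import pandas as pd


def iter_batches(df, columns, batch_size=10_000):
    """Yield lists of plain tuples, batch_size rows at a time."""
    # itertuples(index=False, name=None) yields plain tuples of Python scalars,
    # which psycopg2 can adapt (numpy.int64 values it cannot adapt by default)
    rows = df[columns].itertuples(index=False, name=None)
    while True:
        batch = list(islice(rows, batch_size))
        if not batch:
            return
        yield batch


def load_in_batches(cursor, insert_sql, df, columns, batch_size=10_000):
    """Insert df[columns] with execute_values; insert_sql must contain a single 'VALUES %s'."""
    # Imported here so that iter_batches can be used without psycopg2 installed
    from psycopg2.extras import execute_values

    for batch in iter_batches(df, columns, batch_size):
        execute_values(cursor, insert_sql, batch, page_size=batch_size)


# Hypothetical column names; adjust them to the real anime_list schema.
ANIME_LIST_BATCH_INSERT = (
    "INSERT INTO anime_list (user_id, anime_id, watching_status, watched_episodes) VALUES %s"
)
# load_in_batches(curr, ANIME_LIST_BATCH_INSERT, animelist_processed,
#                 ["user_id", "anime_id", "watching_status", "watched_episodes"])
```

Keeping the batching in a separate generator makes it easy to test without a database and to tune `batch_size` independently of the SQL.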

#### 4.2 Data Quality Checks
The following kinds of data quality checks help ensure that the pipeline ran as expected:
 * Integrity constraints on the relational database (e.g., unique keys, data types)
 * Unit tests for the scripts to ensure they do the right thing
 * Source/count checks to ensure completeness

The checks below verify that each table contains data.

In [13]:
# Warn about any table that contains no rows
curr.execute(DC_ANIME_MASTER)
if curr.rowcount < 1:
    print("No data found in table anime_master")
    
curr.execute(DC_ANIME_SCORE)
if curr.rowcount < 1:
    print("No data found in table anime_score")

curr.execute(DC_ANIME_META)
if curr.rowcount < 1:
    print("No data found in table anime_meta")

curr.execute(DC_ANIME_LIST)
if curr.rowcount < 1:
    print("No data found in table anime_list")

curr.execute(DC_ANIME_WATCHING_STATUS)
if curr.rowcount < 1:
    print("No data found in table anime_watching_status")
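If the `DC_*` queries in sql_queries.py are `SELECT COUNT(*)` statements (they are not shown, so this is an assumption), `cursor.rowcount` is 1 even for an empty table and the checks above would never fire. A sketch of a source/count check that compares each table's row count with the number of rows in the DataFrame it was loaded from. It works with any DB-API cursor and must run before `curr.close()`; the function name and the table-to-DataFrame mapping are assumptions based on the insert cells above, and the counts would differ if the inserts skip conflicting rows.

```python
def count_mismatches(cursor, expected_counts):
    """Return {table: (expected, actual)} for every table whose row count differs."""
    mismatches = {}
    for table, expected in expected_counts.items():
        # Table names come from a fixed, trusted mapping, never from user input
        cursor.execute(f"SELECT COUNT(*) FROM {table}")
        actual = cursor.fetchone()[0]
        if actual != expected:
            mismatches[table] = (expected, actual)
    return mismatches


# Assumed mapping: one table row per source row
# expected = {
#     "anime_master": len(anime_data_processed),
#     "anime_score": len(anime_data_processed),
#     "anime_meta": len(anime_data_processed),
#     "anime_watching_status": len(anime_watching_status_processed),
#     "anime_list": len(animelist_processed),
# }
# print(count_mismatches(curr, expected))  # an empty dict means every count matched
```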


In [14]:
curr.close()
conn.close()

#### 4.3 Data dictionary 
The data dictionary below describes each field of the data model.

![data_dict](./static/data_dict.png "Data Dict")

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

> In this project, we have used the following tools and technologies:
> - PostgreSQL
> - Pandas
> - Python3

> These tools are sufficient for a dataset of this size (the largest file has 8M rows) and for a small number of users.

---

* Propose how often the data should be updated and why.

> Ideally, the data should be updated daily. Assuming a system is in place that records daily user activity (which user is watching which anime), Airflow can schedule a daily job that collects this data and processes it with the ETL pipeline. The processed data can then be used to retrain and update the recommendation algorithm.

---

* Write a description of how you would approach the problem differently under the following scenarios:
 
 * The data was increased by 100x.
 > Use Spark to process the data in a distributed way, e.g. on Amazon EMR.

 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 > Use Airflow with a DAG that runs the pipeline described above on a daily schedule, so that it finishes before 7am.

 * The database needed to be accessed by 100+ people.
 > Use Amazon Redshift so that many people can query the data efficiently and concurrently.