# Trending YouTube Video Statistics
### Data Engineering Capstone Project

#### Project Summary
The project uses a [dataset from Kaggle](https://www.kaggle.com/datasnaek/youtube-new) containing statistics for trending videos on YouTube, collected daily.

Download the dataset and extract it into a `data` folder inside the project directory. The files should be laid out as follows:
```
project_dir/
    |- data
    |   |- countries.json
    |   |- CA_category_id.json
    |   |- CAvideos.csv
    ....
    
```

The goal of this project is to structure and persist these data in a data warehouse. For example, an advertising company can query the data to discover the most frequently trending artists, or to decide which YouTube channel or artist to choose for an ad campaign.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [None]:
# Do all imports and installs here
import pandas as pd
import json
from sqlalchemy import create_engine
import datetime as dt
from sqlalchemy.orm import sessionmaker
import psycopg2
import uuid
import numpy


from model.db import *

### Step 1: Scope the Project and Gather Data

#### Scope 
I'm using data from this [dataset from Kaggle](https://www.kaggle.com/datasnaek/youtube-new). These data make it possible to analyze statistics such as views, likes, dislikes and comments by country, category, date and so on. I want to create a data warehouse where a business user can query data, create reports and make data-driven decisions.

#### About the dataset

#### Context

YouTube (the world-famous video sharing website) maintains a list of the [top trending videos](https://www.youtube.com/feed/trending) on the platform. [According to Variety magazine](http://variety.com/2017/digital/news/youtube-2017-top-trending-videos-music-videos-1202631416/), “To determine the year’s top-trending videos, YouTube uses a combination of factors including measuring users interactions (number of views, shares, comments and likes). Note that they’re not the most-viewed videos overall for the calendar year”. Top performers on the YouTube trending list are music videos (such as the famously viral “Gangnam Style”), celebrity and/or reality TV performances, and the random dude-with-a-camera viral videos that YouTube is well-known for.

This dataset is a daily record of the top trending YouTube videos.

Note that this dataset is a structurally improved version of [this dataset](https://www.kaggle.com/datasnaek/youtube).

#### Content

This dataset includes several months (and counting) of data on daily trending YouTube videos. Data is included for the US, GB, DE, CA, and FR regions (USA, Great Britain, Germany, Canada, and France, respectively), with up to 200 listed trending videos per day.

EDIT: Now includes data from RU, MX, KR, JP and IN regions (Russia, Mexico, South Korea, Japan and India respectively) over the same time period.

Each region’s data is in a separate file. Data includes the video title, channel title, publish time, tags, views, likes and dislikes, description, and comment count.

The data also includes a `category_id` field, which varies between regions. To retrieve the categories for a specific video, find it in the associated `JSON`. One such file is included for each of the five regions in the dataset.
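
As a minimal sketch of that lookup, assuming the category file has the `items` → `id` / `snippet.title` layout that the notebook reads later on, the mapping can be built with the standard `json` module. The inline sample and the helper name `load_category_titles` are illustrative and not part of the dataset.

```python
import json

# Hypothetical two-item sample mimicking the layout of a *_category_id.json file.
SAMPLE = """
{"items": [
  {"id": "1", "snippet": {"title": "Film & Animation"}},
  {"id": "10", "snippet": {"title": "Music"}}
]}
"""


def load_category_titles(raw_json):
    """Return {category_id (int): title} from the text of a category file."""
    data = json.loads(raw_json)
    return {int(item["id"]): item["snippet"]["title"] for item in data["items"]}


titles = load_category_titles(SAMPLE)
# Unknown ids fall back to a placeholder instead of raising a KeyError.
print(titles.get(10, "unknown"))
```

Because the ids differ between regions, the mapping should be built from the JSON file of the same region as the CSV being processed.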

For more information on specific columns in the dataset refer to the [column metadata](https://www.kaggle.com/datasnaek/youtube-new/data).

An example of video data (csv files).

In [None]:
# Read in the data here
df = pd.read_csv("data/CAvideos.csv")

In [None]:
df.head()

An example of json files, that describe the categories.

In [None]:
# Read in the data here
df_cat = pd.read_json("data/CA_category_id.json")

In [None]:
df_cat.head()

In [None]:
df_cat["items"].head().to_dict()

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [None]:
# Total rows
len(df.index)


In [None]:
# Checking null values

df.isna().sum()

# The dataset is very usable: the only null values are in the description column.
# This is plausible, because not every video has a description.

In [None]:
df[df["description"].isna()].head()

In [None]:
# I decide to replace these NaN values with an empty string, because it's more appropriate. It's a string field, not a number.
df.description = df.description.fillna('')

In [None]:
# Some rows use the literal "[none]" to mean "no tags".
# Replace it with an empty string (assigning back avoids relying on inplace=True).
df["tags"] = df["tags"].replace("[none]", "")

In [None]:
# Checking duplicates values
df[df.duplicated()]
old = len(df.index)
df = df.drop_duplicates(["video_id", "trending_date"])
print("Duplicated items found: " + str(old - len(df.index)))
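
The cleaning steps above can be gathered into one reusable function, which is handy when the same pre-processing has to run on every country file. This is a sketch only: the function name `clean_videos` and the tiny sample frame are hypothetical, and the column names are the ones used in this notebook.

```python
import pandas as pd


def clean_videos(df):
    """Return a cleaned copy: no NaN descriptions, no "[none]" tags, no duplicate rows."""
    out = df.copy()
    out["description"] = out["description"].fillna("")
    out["tags"] = out["tags"].replace("[none]", "")
    # A video may trend on several days, so the date is part of the key.
    return out.drop_duplicates(subset=["video_id", "trending_date"])


# Hypothetical sample: one duplicated row, one "[none]" tag, one missing description.
sample = pd.DataFrame(
    {
        "video_id": ["a", "a", "b"],
        "trending_date": ["17.14.11", "17.14.11", "17.14.11"],
        "tags": ["music", "music", "[none]"],
        "description": ["first", "first", None],
    }
)
cleaned = clean_videos(sample)
print(f"Duplicated items found: {len(sample) - len(cleaned)}")
```

Working on a copy keeps the raw frame available for comparison, for example to count how many rows each step removed.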

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
I'm creating a data warehouse, so I structured the data model as a star schema.
![ER diagram data model](er_capstone_project.png)

**Dimensions:**
- Time: a standard dimension in a data warehouse. The fact table references it twice: trending_date (the date when the video was trending on YouTube) and publication_date (the date when the video was published).
- Category: describes the video's category.
- Country: the country where the video was trending on a specific date.
- Channel: describes the channel that published the video. It has only one attribute (title), because that is the only channel information available in the dataset. I still modeled it as a dimension because analyzing the data by channel is useful, and a future pipeline could add more data about the channels, enriching this dimension.

The "Video" table is the fact table, where the facts are:
- view: number of views
- like: number of likes
- dislike: number of dislikes
- comments_count: number of comments
- comments_disabled: True if the comments are disabled on the video
- ratings_disabled: True if the ratings are disabled for the video
- video_error_or_removed: True if the video was removed or there was some errors during video processing

Every video also has two attributes: title and description.
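
To make the star schema concrete, here is a minimal sketch in plain SQL, run through the standard `sqlite3` module so it works without a cluster. The column names follow this notebook; the table names, column types and the composite primary key on the fact table are assumptions, since the real definitions live in `model/db.py`. SQLite only stands in for Redshift here.

```python
import sqlite3

# Assumed DDL: "view" and "like" are quoted because they are SQL keywords.
DDL = """
CREATE TABLE country (country_id TEXT PRIMARY KEY, complete_name TEXT);
CREATE TABLE category (category_id INTEGER PRIMARY KEY, etag TEXT, title TEXT);
CREATE TABLE channel (channel_id TEXT PRIMARY KEY, title TEXT);
CREATE TABLE time (time_id TEXT PRIMARY KEY, day INTEGER, month INTEGER,
                   year INTEGER, hour INTEGER, minute INTEGER);
CREATE TABLE video (
    video_id TEXT, title TEXT, description TEXT,
    "view" INTEGER, "like" INTEGER, dislike INTEGER, comments_count INTEGER,
    comments_disabled BOOLEAN, ratings_disabled BOOLEAN,
    video_error_or_removed BOOLEAN,
    category_id INTEGER REFERENCES category (category_id),
    country_id TEXT REFERENCES country (country_id),
    channel_id TEXT REFERENCES channel (channel_id),
    trending_date TEXT REFERENCES time (time_id),
    publication_date TEXT REFERENCES time (time_id),
    PRIMARY KEY (video_id, trending_date, country_id)
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(DDL)

# Hypothetical rows. SQLite does not enforce foreign keys by default,
# so the sample skips the time and category dimensions.
conn.execute("INSERT INTO country VALUES ('CA', 'Canada')")
conn.executemany(
    "INSERT INTO channel VALUES (?, ?)", [("c1", "Channel A"), ("c2", "Channel B")]
)
conn.executemany(
    "INSERT INTO video (video_id, channel_id, country_id, trending_date) "
    "VALUES (?, ?, ?, ?)",
    [("v1", "c1", "CA", "t1"), ("v1", "c1", "CA", "t2"), ("v2", "c2", "CA", "t1")],
)

# Which channels appear most often in the trending list of one country?
top_channels = conn.execute(
    """
    SELECT ch.title, COUNT(*) AS trending_rows
    FROM video v
    JOIN channel ch ON ch.channel_id = v.channel_id
    WHERE v.country_id = 'CA'
    GROUP BY ch.title
    ORDER BY trending_rows DESC, ch.title
    """
).fetchall()
print(top_channels)
```

The query counts fact rows rather than summing views, because the view count is cumulative and a video that trends on several days would otherwise be counted more than once.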


#### 3.2 Mapping Out Data Pipelines
Let's imagine that we receive data like this every month. We need a data pipeline that processes the new data and loads them into the data warehouse.

The steps to create and populate the data warehouse are:
1. Create and populate the "Country" dimension. This step is needed only the first time we populate the data warehouse, because it is independent of the new data. To populate this table, I'm going to use the "countries.json" file, which maps every country to its two-letter code.
2. Create and populate the "Category" dimension. In this step I'll read the JSON file describing the categories for each country and insert the data into the category table.
3. For every CSV file, read the file, pre-process it (as described in Step 2 of this notebook) and extract the data to populate the Video, Channel and Time tables.
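
Steps 2 and 3 need to know which files belong to which country. A possible sketch, assuming the `<CC>videos.csv` and `<CC>_category_id.json` naming shown in the project layout, pairs the files by country code. The helper name `discover_inputs` is hypothetical, and the demonstration runs on empty placeholder files in a temporary directory.

```python
import tempfile
from pathlib import Path


def discover_inputs(data_dir):
    """Map each country code to its video CSV and category JSON paths."""
    inputs = {}
    for csv_path in sorted(Path(data_dir).glob("*videos.csv")):
        # "CAvideos.csv" -> "CA"
        country = csv_path.name[: -len("videos.csv")]
        inputs[country] = {
            "videos": csv_path,
            "categories": csv_path.with_name(f"{country}_category_id.json"),
        }
    return inputs


# Demonstration on a throwaway directory with empty placeholder files.
with tempfile.TemporaryDirectory() as tmp:
    for name in (
        "CAvideos.csv",
        "CA_category_id.json",
        "USvideos.csv",
        "US_category_id.json",
        "countries.json",
    ):
        (Path(tmp) / name).touch()
    found = discover_inputs(tmp)

print(sorted(found))
```

A production pipeline could loop over this mapping and run the category and video loading steps once per country.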

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

In the notebook I test the data pipeline only for a subset of files. It should be industrialized if we want to use it in production and on every file.

**4.1.1 Connection to Postgres**

In [None]:
# Complete this dictionary with your cluster information
conn_data = {
        "host": "",
        "dbname": "",
        "user": "",
        "password": "",
        "port": "5439"
    }
DATABASE_URI = 'redshift+psycopg2://{user}:{password}@{host}:{port}/{dbname}'.format(**conn_data)
engine = create_engine(DATABASE_URI)
Session = sessionmaker(bind=engine)
s = Session()
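
One caveat with formatting the URI directly: a password containing characters such as `@` or `/` would break the URL. A small sketch of a safer builder, using `urllib.parse.quote_plus` from the standard library, follows. The function name `build_database_uri` and the credentials are hypothetical.

```python
from urllib.parse import quote_plus


def build_database_uri(conn_data):
    """Format the connection URI, percent-encoding the user and password."""
    safe = dict(conn_data)
    safe["user"] = quote_plus(conn_data["user"])
    safe["password"] = quote_plus(conn_data["password"])
    return "redshift+psycopg2://{user}:{password}@{host}:{port}/{dbname}".format(**safe)


# Hypothetical credentials, for illustration only.
example = {
    "host": "example-cluster.local",
    "dbname": "dev",
    "user": "etl_user",
    "password": "p@ss/word",
    "port": "5439",
}
uri = build_database_uri(example)
print(uri)
```

The resulting string can be passed to `create_engine` in the same way as `DATABASE_URI` above.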

In [None]:
Base.metadata.create_all(engine)

**4.1.2 Country dimension**

In [None]:
countries=[]
with open('data/countries.json') as json_file:
    data = json.load(json_file)
    for item in data:
        c = Country()
        c.country_id = item["Code"]
        c.complete_name = item["Name"]
        countries.append(c)

In [None]:
for i,c in enumerate(countries):
    if i%10==0:
        print(f"Insert n. {i}")
    s.add(c)
    s.commit()

**4.1.3 Category dimension**

In [None]:
# Test only on one json file
file = "data/CA_category_id.json"
categories=[]
with open(file) as json_file:
    data = json.load(json_file)
    for item in data["items"]:
        cat = Category()
        cat.category_id = item["id"]
        cat.etag = item["etag"].replace('"', '')
        cat.title = item["snippet"]["title"]
        categories.append(cat)

In [None]:
for i, cat in enumerate(categories):
    print(f"Inserting item n. {i}")
    s.add(cat)
    s.commit()

**Read csv and populate tables**

The notebook implementation processes only one CSV file.

In [None]:
filename = "data/CAvideos.csv"
df = pd.read_csv(filename)

Pre-processing

In [None]:
count = len(df.index)
print(f"Items: {count}")

# I decide to replace these NaN values with an empty string, because it's more appropriate. It's a string field, not a number.
df.description = df.description.fillna('')

# Checking duplicates values
df = df.drop_duplicates(["video_id", "trending_date"])
print("Duplicated items found: " + str(count - len(df.index)))

**Channels**

In [None]:
channels = df.channel_title.unique().tolist()
len(channels)

In [None]:
# Only for tests
# channels = channels[:10]

In [None]:
channels_to_insert = [Channel(channel_id=str(uuid.uuid4()),title=c) for c in channels]
for i,c in enumerate(channels_to_insert):
    if i%10==0:
        print(f"Insert n. {i}")
    s.add(c)
    s.commit()

**Time**

In [None]:
times = set()
# The datetime module is imported as `dt`, so the class is `dt.datetime`.
trending_date_format = "%y.%d.%m"
df.trending_date = df.trending_date.apply(
    lambda ts: dt.datetime.strptime(ts, trending_date_format)
)
times.update(set(df.trending_date.unique().tolist()))
pub_date_format = "%Y-%m-%dT%H:%M:%S.000Z"
df.publish_time = df.publish_time.apply(
    lambda ts: dt.datetime.strptime(ts, pub_date_format)
)
times.update(set(df.publish_time.unique().tolist()))
len(times)

In [None]:
# Only test
# times = list(times)[:30]

In [None]:
epoch = dt.datetime(1970, 1, 1)
def unix_time_millis_str(t):
    return str((t - epoch).total_seconds() * 1000.0)

In [None]:
for i, time in enumerate(times):
    if i % 10 == 0:
        print(f"Insert n. {i}")
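    # `time` is an integer number of nanoseconds since the epoch. Convert it
    # without applying the local timezone, so that time_id matches the keys
    # that build_fact_row writes into the fact table.
    time = epoch + dt.timedelta(microseconds=time // 1000)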
    t = Time()
    t.time_id = unix_time_millis_str(time)
    t.day = time.day
    t.month = time.month
    t.year = time.year
    t.hour = time.hour
    t.minute = time.minute
    s.add(t)
    s.commit()
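
The Time dimension only works if the key computed here is identical to the key stored in the fact table. The sketch below, using only the standard library, shows the two conversions side by side. The sample date strings are hypothetical values in the dataset's formats, and `from_epoch_ns` and `time_id` are illustrative names (`time_id` mirrors `unix_time_millis_str`).

```python
import datetime as dt

EPOCH = dt.datetime(1970, 1, 1)


def from_epoch_ns(ns):
    """Convert integer nanoseconds since the epoch to a naive UTC datetime."""
    return EPOCH + dt.timedelta(microseconds=ns // 1000)


def time_id(t):
    """Milliseconds since the epoch as a string, like unix_time_millis_str."""
    return str((t - EPOCH).total_seconds() * 1000.0)


# trending_date uses year.day.month, publish_time uses an ISO-like format.
trending = dt.datetime.strptime("17.14.11", "%y.%d.%m")
published = dt.datetime.strptime("2017-11-10T17:00:03.000Z", "%Y-%m-%dT%H:%M:%S.000Z")

# The same instant, as the nanosecond integer obtained from the DataFrame column.
ns = 1_510_617_600_000_000_000
same_key = time_id(from_epoch_ns(ns)) == time_id(trending)
print(trending, published, same_key)
```

Using `dt.datetime.fromtimestamp` instead would shift the value by the machine's UTC offset, and the dimension and fact keys would no longer match on a machine that is not set to UTC.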

**Video**

In [None]:
country = filename.split("/")[-1].split("videos")[0]
country

In [None]:
def build_fact_row(row):
    video = Video()
    video.video_id = row["video_id"]
    video.title = row["title"]
    video.description = row["short_description"]
    video.view = int(row["views"])
    video.like = int(row["likes"])
    video.dislike = int(row["dislikes"])
    video.comments_count = int(row["comment_count"])
    video.comments_disabled = bool(row["comments_disabled"])
    video.ratings_disabled = bool(row["ratings_disabled"])
    video.video_error_or_removed = bool(row["video_error_or_removed"])
    video.category_id = int(row["category_id"])
    video.trending_date = unix_time_millis_str(row["trending_date"])
    video.publication_date = unix_time_millis_str(row["publish_time"])
    video.country_id = country
    # Look up the channel by title; leave the key empty when there is no match.
    channel = s.query(Channel).filter_by(title=row["channel_title"]).first()
    video.channel_id = channel.channel_id if channel is not None else None

    return video

In [None]:
df['short_description'] = df['description'].str[:200]

In [None]:
# Test for one item
# v = build_fact_row(df.iloc[0])
# v.channel_id
# s.add(v)
# s.commit()

In [None]:
# df = df.head(10) # Only for tests
df["db_items"] = df.apply(lambda row: build_fact_row(row), axis=1)

In [None]:
videos = df.db_items.tolist()
len(videos)

In [None]:
for i,v in enumerate(videos):
    if i%10==0:
        print(f"Insert n. {i}")
    
    s.add(v)
    s.commit()
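
Committing after every row means one transaction per video, which becomes slow on a full file. One possible alternative is to commit in batches. The generator below is a generic sketch (the name `chunked` and the batch size are arbitrary choices), and the session calls are shown as comments because they need a live database.

```python
from itertools import islice


def chunked(items, size):
    """Yield successive lists of at most `size` items."""
    it = iter(items)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


batches = list(chunked(range(25), 10))
print([len(b) for b in batches])

# With the notebook's session this could become (not executed here):
# for batch in chunked(videos, 1000):
#     s.add_all(batch)
#     s.commit()
```

Larger batches mean fewer round trips, at the cost of redoing more work if one batch fails.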

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here
assert s.query(Country).count() == len(countries)
assert s.query(Category).count() == len(categories)
assert s.query(Channel).count() > 0
assert s.query(Video).count() > 0

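# Use the SQL-level NULL test: the Python expression `Video.country_id is not None`
# is evaluated immediately to True and would not filter anything.
res = s.query(Video, Country).join(Country).filter(Video.country_id.isnot(None)).limit(1)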
for r in res[0]:
    assert r is not None
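
A further check worth considering is referential integrity between the fact table and a dimension, since `build_fact_row` stores an empty channel key when the lookup fails. The sketch below runs the idea on an in-memory SQLite database with hypothetical rows. The table and column names follow the notebook, but the real schema is defined in `model/db.py`, and the same `LEFT JOIN` would have to be adapted to the actual Redshift tables.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE channel (channel_id TEXT PRIMARY KEY, title TEXT);
    CREATE TABLE video (video_id TEXT, channel_id TEXT);
    INSERT INTO channel VALUES ('c1', 'Channel A');
    INSERT INTO video VALUES ('v1', 'c1'), ('v2', 'missing'), ('v3', NULL);
    """
)

# Fact rows whose channel key is NULL or points to no existing channel.
ORPHANS_SQL = """
SELECT v.video_id
FROM video v
LEFT JOIN channel ch ON ch.channel_id = v.channel_id
WHERE ch.channel_id IS NULL
ORDER BY v.video_id
"""
orphans = [row[0] for row in conn.execute(ORPHANS_SQL)]
print(orphans)
```

An empty result means every fact row can be joined to its channel; the same pattern applies to the category, country and time keys.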

#### 4.3 Data dictionary 
The data dictionary is the Excel file called "capstone_project_data_catalog.xlsx".


#### Step 5: Complete Project Write Up
##### Tools and technologies
For this project, I chose Redshift to store the data. The main goal of the project is to create a data warehouse where business users can query statistics about YouTube trending videos. Redshift fits this case well: it is a columnar warehouse designed for analytical queries, it can scale to very large data sizes and it lets business users query data with SQL.

For data processing, I chose Pandas, a data analysis library for exploring and manipulating data. In the notebook I process only one file at a time, and Pandas can handle a file of that size.

I also used SQLAlchemy as the ORM for Python. Once the star schema was defined, I wrote the model classes in a Python file. This makes it easier to work with database objects, queries and transactions.
 
##### Pipeline scheduling
This dataset is a collection of data from November 2017 to June 2018, but the data have a daily granularity. So, the data can be updated every day or, if the analyses are not so urgent, the data can be updated every week. A cronjob can generate the dataset (using the process described here [https://github.com/mitchelljy/Trending-YouTube-Scraper](https://github.com/mitchelljy/Trending-YouTube-Scraper))  and the ETL pipeline can populate the data warehouse.

##### Different scenario: data increased by 100x
In this case, Pandas on a single machine would probably no longer be enough. To pre-process the data, I would use a Spark cluster and convert the pre-processing step into a Spark job. To orchestrate the jobs, I would use an Airflow data pipeline. SQLAlchemy could still be used for the data model. For data storage, I would continue using Redshift, because it can scale horizontally and handle the data increase.

##### Different scenario: The data populates a dashboard that must be updated on a daily basis by 7am every day
In this case, the data pipeline should be scheduled for updating the data warehouse every day. The pipeline could be scheduled, for example, at 1 a.m. to retrieve data of the past day. After the data warehouse is updated, an Airflow pipeline scheduled, for example, at 3 a.m. can update the dashboard.

##### Different scenario: The database needed to be accessed by 100+ people
To support more people accessing the data warehouse, I considered some possible improvements:
- Generate OLAP cubes to speed up common queries
- Ask users whether there are standard queries that they will certainly run every day, so that we can execute them only once and generate a report for all interested users
- Scale Redshift horizontally to support more queries
