# Trending YouTube Videos Data ETL Pipeline 


#### Project Summary
YouTube, the world-famous video-sharing website, maintains a list of the top trending videos on the platform. According to Variety magazine, “To determine the year’s top-trending videos, YouTube uses a combination of factors including measuring users interactions (number of views, shares, comments and likes). Note that they’re not the most-viewed videos overall for the calendar year”.



#### Dataset Summary

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [None]:
import pandas as pd
import psycopg2
import configparser
from sql_queries import create_table_queries, drop_table_queries, check_table_queries
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import weekofyear,hour,dayofmonth,month,year,dayofweek
from pyspark.sql.types import IntegerType ,TimestampType
from pyspark.sql.functions import udf, col,from_unixtime

### Step 1: Scope the Project and Gather Data

#### Scope the Project
The goal of the project is to build an automated ETL pipeline that extracts data from raw csv and json files, designs and builds a database schema in an AWS Redshift data warehouse, and transforms and loads the data into the database tables. The resulting tables support analysis of how video trends develop in different countries.

This dataset is a daily record of the top trending YouTube videos. The dataset is publicly available at https://www.kaggle.com/datasnaek/youtube-new

Tools used:
1. Data Warehouse: AWS Redshift
2. Spark SQL
3. Python 3.x

#### Data Description
This dataset includes several months (and counting) of data on daily trending YouTube videos. Data is included for the US, GB, DE, CA, and FR regions (USA, Great Britain, Germany, Canada, and France, respectively), with up to 200 listed trending videos per day.

EDIT: Now includes data from RU, MX, KR, JP and IN regions (Russia, Mexico, South Korea, Japan and India respectively) over the same time period.

Each region’s data is in a separate file. Data includes the video title, channel title, publish time, tags, views, likes and dislikes, description, and comment count.

The data also includes a category_id field, which varies between regions. To retrieve the categories for a specific video, find it in the associated JSON. One such file is included for each of the regions in the dataset.


In [None]:
filepath1="category_id/US_category_id.json"
category_id_df = pd.read_json(filepath1)
category_id_df.head()

In [None]:
filepath2="country_videos/USvideos.csv"
videos_df = pd.read_csv(filepath2,header='infer')
videos_df.head()

### Step 2: Explore and Assess the Data
#### Explore the Data 
When we explore the raw videos csv files, we find characters such as '"' in the tags column, ',' in the title column and '\r' in the description column. These characters cause errors when copying the csv files into the Redshift staging table, so we check for them and remove them before copying the data into Redshift.
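
To see why these characters matter, here is a minimal sketch that uses only Python's standard `csv` module. The sample row and the `clean_field` helper are illustrative and not part of the pipeline. The sketch assumes a loader that splits each line on every comma without honoring quotes, which is my understanding of how a plain `delimiter ','` load behaves.

```python
import csv
import io

# Illustrative row: comma in the title, quotes in the tags, '\r' in the description.
row = ["abc123", "Hello, World", 'music|"live"', "line one\rline two"]

# Write the row the way a csv writer would, quoting fields where needed.
buf = io.StringIO()
csv.writer(buf, lineterminator="\n").writerow(row)
raw_line = buf.getvalue().rstrip("\n")

# A loader that only splits on the delimiter sees one column too many.
naive_fields = raw_line.split(",")

def clean_field(value):
    """Hypothetical helper: drop quotes and '\\r', turn commas into spaces."""
    return str(value).replace('"', "").replace("\r", "").replace(",", " ")

cleaned = [clean_field(v) for v in row]
buf = io.StringIO()
csv.writer(buf, lineterminator="\n").writerow(cleaned)
clean_line = buf.getvalue().rstrip("\n")

# Original column count, naive split of the raw line, naive split after cleaning.
print(len(row), len(naive_fields), len(clean_line.split(",")))
```

After cleaning, a naive split yields the same number of columns as the original row, which is what the staging table expects.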

When we explore the category_id json files, we find that they have a nested structure, with one json file per country. We will extract the category id and category title of each country from these files.
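
As a rough illustration of that nested structure, the sketch below flattens a small hand-written excerpt with the standard `json` module. The excerpt is abbreviated and illustrative only (the real files carry more fields and items), and `extract_categories` is a hypothetical name. Casting the id to `int` is an assumption, made so it compares equal to a numeric category_id column in the videos file.

```python
import json

# Abbreviated, illustrative excerpt of a category_id json file.
sample = """
{
  "kind": "youtube#videoCategoryListResponse",
  "items": [
    {"id": "1",  "snippet": {"title": "Film & Animation"}},
    {"id": "10", "snippet": {"title": "Music"}}
  ]
}
"""

def extract_categories(doc, country_code):
    """Flatten the nested items list into (category_id, category_title, country_code) rows."""
    return [
        (int(item["id"]), item["snippet"]["title"], country_code)
        for item in doc["items"]
    ]

rows = extract_categories(json.loads(sample), "US")
print(rows)
```

Each output row matches the columns of the category table used later in the notebook.
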
#### Cleaning Steps

In [None]:
def clean_video_csv(video_df,country_code):
    """
    Remove characters such as '"', ',' and carriage returns that cause errors when copying the csv file into the Redshift staging table.
    
    Parameters:
    video_df: DataFrame returned by read_csv
    country_code: country code to store in the new "country" column
    
    Return:
    video_df: DataFrame with the problematic characters removed
    """
    video_df["tags"] = video_df["tags"].apply(lambda x:x.replace('"',""))
    video_df["title"] = video_df["title"].apply(lambda x:x.replace(',',' '))
    video_df["channel_title"] = video_df["channel_title"].apply(lambda x:x.replace(',',' '))
    video_df["description"] = video_df["description"].apply(lambda x:str(x).replace('\r',''))
    video_df["description"] = video_df["description"].apply(lambda x:str(x).replace(',',' '))
    video_df["description"] = video_df["description"].apply(lambda x:str(x).replace('"',''))
    video_df["country"] = country_code
    return video_df


Select the country codes for ETL. The available country codes are US, GB, DE, CA, FR, RU, MX, KR, JP and IN (USA, Great Britain, Germany, Canada, France, Russia, Mexico, South Korea, Japan and India respectively), all covering the same time period. Note that each csv file may contain different problematic characters, so each file has to be inspected individually. Here I only worked through the USvideos.csv file as a demo.

In [None]:
#Clean videos csv files for selected country code
country_code=['US']
for c in country_code:
    filepath="country_videos/"+c+"videos.csv"
    video_df = pd.read_csv(filepath,header='infer')
    savepath = filepath[:filepath.find(".csv")]+'1.csv'
    video_df = clean_video_csv(video_df,c)
    video_df.to_csv(savepath,index=False)

In [None]:
def category_extract (df,country_code):
    """
    Extract the category id and category title from a category_id json file.
    
    Parameters:
    df: DataFrame returned by read_json
    country_code: country code to store in the "country_code" column
    
    Return:
    category_df: DataFrame with columns: category_id, category_title, country_code
    
    """
    category_id = []
    category_title = []
    for i in range(df.shape[0]):
        category_id.append(df.iloc[i]["items"]['id'])
        category_title.append(df.iloc[i]["items"]["snippet"]["title"])
    category_df = pd.DataFrame()
    category_df["category_id"] = category_id
    category_df["category_title"] = category_title
    category_df.insert(category_df.shape[1],"country_code",country_code)
    return category_df

In [None]:
#Extract category title and id from json file of each country
category_all = pd.DataFrame()
for c in country_code:
    filepath="category_id/"+c+"_category_id.json"
    category_id_df = pd.read_json(filepath)
    category_all = pd.concat([category_all,category_extract(category_id_df,c)])


In [None]:
#category_all.tail()
savepath = "category_id/"+"category_all.csv"
category_all.to_csv(savepath,index=False)

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
After preparing the staging table of videos, we need to design the data model for our Redshift database, that is, the schema of the fact and dimension tables. In the cleaned videos csv files, one video appears several times with different trending_date values, so the staging table (the cleaned csv file) contains redundant data. We can split the staging table into fact and dimension tables. We also need to bring in the category title from the category_id json files, which differs from country to country.

**Fact Tables**
#### videos_trending
*videos_trending_id,video_id,trending_date,views,likes,dislikes,comment_count,comments_disabled,ratings_disabled,video_error_or_removed,country*

**Dimension Tables**
#### videos 
*video_id,title,channel_title,category_title,country,publish_time,tags,thumbnail_link,description*

#### videos_notitle
*video_id,channel_title,category_title,country,publish_time*

#### time
*trending_date,year,month,day*

#### category
*category_id,category_title,country_code*
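
The split into fact and dimension tables can be sketched on a tiny example. The snippet below uses an in-memory SQLite database purely as a stand-in for Spark SQL and Redshift, with a reduced set of columns and made-up rows; only the shape of the two queries is the point.

```python
import sqlite3

demo_conn = sqlite3.connect(":memory:")
demo_conn.executescript("""
CREATE TABLE staging_videos (video_id TEXT, trending_date TEXT, views INTEGER,
                             title TEXT, category_id INTEGER, country TEXT);
CREATE TABLE category (category_id INTEGER, category_title TEXT, country_code TEXT);
INSERT INTO staging_videos VALUES
  ('v1', '17.14.11', 100, 'First video', 10, 'US'),
  ('v1', '17.15.11', 250, 'First video', 10, 'US'),
  ('v2', '17.14.11',  80, 'Second video', 1, 'US');
INSERT INTO category VALUES (10, 'Music', 'US'), (1, 'Film & Animation', 'US');
""")

# Fact rows: one per (video, trending day), carrying the daily measurements.
fact = demo_conn.execute("""
    SELECT DISTINCT video_id, trending_date, views, country
    FROM staging_videos
    ORDER BY video_id, trending_date
""").fetchall()

# Dimension rows: one per video, with the category title resolved per country.
dim = demo_conn.execute("""
    SELECT DISTINCT s.video_id, s.title, c.category_title, s.country
    FROM staging_videos s
    JOIN category c
      ON s.category_id = c.category_id AND s.country = c.country_code
    ORDER BY s.video_id
""").fetchall()

print(len(fact), "fact rows;", len(dim), "dimension rows")
```

The video that trends on two days contributes two fact rows but only one dimension row, which is the redundancy the model removes.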

#### 3.2 Mapping Out Data Pipelines
Here are the steps to move the data into the data model:
1. Create the data model
2. COPY the cleaned videos csv into the staging table
3. COPY the extracted category csv into the category table
4. Transform and load data into the videos_trending, videos and time tables
5. Run data quality checks

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

In [None]:
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

In [None]:
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = conn.cursor()
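
The connection string above is filled positionally from `config['CLUSTER'].values()`, so it depends on the order of the keys in `dwh.cfg`. The sketch below shows that dependency and a named alternative. The file content and every key name in it are assumptions made for illustration, since the real `dwh.cfg` is not shown here.

```python
import configparser

# Hypothetical dwh.cfg content; the key names and values are assumptions.
sample_cfg = """
[CLUSTER]
HOST = example-cluster.example.com
DB_NAME = dev
DB_USER = awsuser
DB_PASSWORD = secret
DB_PORT = 5439
"""

demo_config = configparser.ConfigParser()
demo_config.read_string(sample_cfg)
cluster = demo_config["CLUSTER"]

template = "host={} dbname={} user={} password={} port={}"

# Positional style: correct only while the file keeps exactly this key order.
positional = template.format(*cluster.values())

# Named style: independent of the order of the keys in the file.
named = template.format(
    cluster["HOST"], cluster["DB_NAME"], cluster["DB_USER"],
    cluster["DB_PASSWORD"], cluster["DB_PORT"],
)

print(positional == named)
```

Both strings agree here because the sample file lists the keys in the expected order; the named form would keep working if the keys were reordered.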

In [None]:
def drop_tables(cur,conn):
    for query in drop_table_queries:
        cur.execute(query)
        conn.commit()

In [None]:
def create_tables(cur,conn):
    for query in create_table_queries:
        cur.execute(query)
        conn.commit()

In [None]:
drop_tables(cur,conn)
create_tables(cur,conn)

#### 4.2 COPY cleaned videos csv into staging table

In [None]:
os.environ['AWS_ACCESS_KEY_ID']=config['CREDENTIAL']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['CREDENTIAL']['AWS_SECRET_ACCESS_KEY']
output_datapath_s3a = "s3a://XXX/"
output_datapath_s3 = "s3://XXX/"
os.environ["PYSPARK_PYTHON"] ='/usr/bin/python3'

In [None]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.5") \
        .getOrCreate()

In [None]:
videos_pd = pd.DataFrame()
country_code = ['US']
for c in country_code:
    filepath="country_videos/"+c+"videos1.csv"
    df= spark.read.csv(filepath,inferSchema=True,header=True,sep=',')
    videos_pd =pd.concat([videos_pd,df.toPandas()],axis=0)
df_videos = spark.createDataFrame(videos_pd)
df_videos.write.csv(output_datapath_s3a+"videos_all",mode='append',header=True)

In [None]:
copy_staging_sql = """
copy {} 
from '{}'
ACCESS_KEY_ID '{}'
SECRET_ACCESS_KEY '{}'
delimiter ','
ignoreheader 1
timeformat as 'YYYY-MM-DDTHH24:MI:SS.000Z'
"""


In [None]:
copy_sql = copy_staging_sql.format("youtube.staging_videos",output_datapath_s3+"videos_all/",config['CREDENTIAL']['AWS_ACCESS_KEY_ID'],config['CREDENTIAL']['AWS_SECRET_ACCESS_KEY'])
cur.execute(copy_sql)
conn.commit()
print ("Copying videos csv to staging_videos table is complete.")
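
The same format, execute and commit sequence is repeated for every table in this notebook, so it could be wrapped in a small helper. The following is only a sketch: `copy_to_redshift`, `RecordingCursor` and `NoopConnection` are hypothetical names, and the stub classes stand in for the psycopg2 cursor and connection so that the example runs without a cluster.

```python
COPY_TEMPLATE = """
copy {table}
from '{s3_prefix}'
ACCESS_KEY_ID '{key}'
SECRET_ACCESS_KEY '{secret}'
delimiter ','
ignoreheader 1
timeformat as 'YYYY-MM-DDTHH24:MI:SS.000Z'
"""

def copy_to_redshift(cur, conn, table, s3_prefix, key, secret):
    """Hypothetical helper: fill the COPY template, run it, and commit."""
    sql = COPY_TEMPLATE.format(table=table, s3_prefix=s3_prefix, key=key, secret=secret)
    cur.execute(sql)
    conn.commit()
    return sql

class RecordingCursor:
    """Stand-in for a database cursor that only records the statements it receives."""
    def __init__(self):
        self.statements = []

    def execute(self, sql):
        self.statements.append(sql)

class NoopConnection:
    """Stand-in for a database connection that only counts commits."""
    def __init__(self):
        self.commits = 0

    def commit(self):
        self.commits += 1

stub_cur, stub_conn = RecordingCursor(), NoopConnection()
copy_to_redshift(stub_cur, stub_conn, "youtube.staging_videos",
                 "s3://XXX/videos_all/", "KEY", "SECRET")
print(stub_cur.statements[0])
```

With a real cursor and connection, each later COPY cell would reduce to one call with a different table name and S3 prefix.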

#### 4.3 COPY extracted category csv into category table

In [None]:
filepath="category_id/category_all.csv"
df= spark.read.csv(filepath,inferSchema=True,header=True,sep=',')
df.write.csv(output_datapath_s3a+"category_all",header=True)
df.createOrReplaceTempView("category")

In [None]:
copy_sql = copy_staging_sql.format("youtube.category",output_datapath_s3+"category_all/",config['CREDENTIAL']['AWS_ACCESS_KEY_ID'],config['CREDENTIAL']['AWS_SECRET_ACCESS_KEY'])
cur.execute(copy_sql)
conn.commit()
print ("Copying category to category table is completed")

#### 4.4 Transform and load data into videos_trending, videos,time tables

##### 4.4.1 Transform and load data into videos_trending table

In [None]:
df_videos.printSchema()

In [None]:
df_videos.createOrReplaceTempView("staging_videos")

In [None]:
videos_trending = spark.sql('''
SELECT DISTINCT video_id,trending_date,views,likes,dislikes,comment_count,comments_disabled,ratings_disabled,video_error_or_removed,country
FROM staging_videos s
''')

In [None]:
videos_trending.write.csv(output_datapath_s3a+"videos_trending",header=True)

In [None]:
copy_sql = copy_staging_sql.format("youtube.videos_trending",output_datapath_s3+"videos_trending/",config['CREDENTIAL']['AWS_ACCESS_KEY_ID'],config['CREDENTIAL']['AWS_SECRET_ACCESS_KEY'])
cur.execute(copy_sql)
conn.commit()
print ("Copying videos_trending to videos_trending table is completed.")

##### 4.4.2 Transform and load data into video table

In [None]:
videos = spark.sql('''
SELECT DISTINCT video_id,title,channel_title,category_title,s.country,publish_time,tags,thumbnail_link,description
FROM staging_videos s
JOIN category c
ON s.category_id = c.category_id AND s.country =c.country_code
''')

In [None]:
videos.write.csv(output_datapath_s3a+"videos",header=True)

In [None]:
copy_sql = copy_staging_sql.format("youtube.videos",output_datapath_s3+"videos/",config['CREDENTIAL']['AWS_ACCESS_KEY_ID'],config['CREDENTIAL']['AWS_SECRET_ACCESS_KEY'])
cur.execute(copy_sql)
conn.commit()
print ("Copying videos to videos table is completed.")

We notice that several titles, tags and descriptions may be related to one video_id. We therefore also load the videos without titles, tags and descriptions into the table youtube.videos_notitle.

In [None]:
videos_notitle = spark.sql('''
SELECT DISTINCT video_id,channel_title,category_title,s.country,publish_time
FROM staging_videos s
JOIN category c
ON s.category_id = c.category_id AND s.country =c.country_code
''')

In [None]:
videos_notitle.write.csv(output_datapath_s3a+"videos_notitle",header=True)

In [None]:
copy_sql = copy_staging_sql.format("youtube.videos_notitle",output_datapath_s3+"videos_notitle/",config['CREDENTIAL']['AWS_ACCESS_KEY_ID'],config['CREDENTIAL']['AWS_SECRET_ACCESS_KEY'])
cur.execute(copy_sql)
conn.commit()
print ("Copying videos_notitle to videos_notitle table is completed.")

##### 4.4.3 Transform and load data into time table

In [None]:
time = spark.sql('''
SELECT DISTINCT trending_date
FROM staging_videos
''')
time_pandas = time.toPandas()

In [None]:
time_pandas["year"] = time_pandas["trending_date"].apply(lambda x: "20"+x[0:2])
time_pandas["month"] = time_pandas["trending_date"].apply(lambda x: x[-2:])
time_pandas["day"] = time_pandas["trending_date"].apply(lambda x: x[3:5])
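
The slicing above implies that trending_date is stored as a 'YY.DD.MM' string. Under that assumption, the same split can be sketched with `datetime.strptime`, which also raises an error on malformed values instead of silently returning wrong pieces. `split_trending_date` is a hypothetical helper, and it returns integers rather than the strings produced above.

```python
from datetime import datetime

def split_trending_date(value):
    """Parse a 'YY.DD.MM' string and return (year, month, day) as integers."""
    parsed = datetime.strptime(value, "%y.%d.%m")
    return parsed.year, parsed.month, parsed.day

print(split_trending_date("17.14.11"))  # (2017, 11, 14)
```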

In [None]:
time_pandas.head()

In [None]:
time_df = spark.createDataFrame(time_pandas)

In [None]:
time_df.show(5)

In [None]:
time_df.write.csv(output_datapath_s3a+"time",header=True)

In [None]:
copy_sql = copy_staging_sql.format("youtube.time",output_datapath_s3+"time/",config['CREDENTIAL']['AWS_ACCESS_KEY_ID'],config['CREDENTIAL']['AWS_SECRET_ACCESS_KEY'])
cur.execute(copy_sql)
conn.commit()
print ("Copying time data to time table is completed.")

#### 4.5 Data Quality Checks
The following data quality checks help ensure that the pipeline ran as expected:
 * Each table has a primary key. Redshift does not enforce primary key uniqueness, so we ensure it during data processing. The foreign key is "video_id", which is set as the DISTKEY in the fact and dimension tables to eliminate shuffling and speed up table joins. We also set the time and category tables to DISTSTYLE ALL, because these two tables are small and frequently used.
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
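
Because key uniqueness is not enforced by the warehouse, a duplicate-key query is a natural companion to the row-count check. The sketch below runs both against an in-memory SQLite table, used only as a stand-in for Redshift. `count_rows` and `duplicate_keys` are hypothetical helpers, and the table and column names are interpolated directly into the SQL, so they must come from trusted code rather than user input.

```python
import sqlite3

def count_rows(conn, table):
    """Row-count check: zero rows means the load did not complete."""
    return conn.execute("SELECT COUNT(*) FROM {}".format(table)).fetchone()[0]

def duplicate_keys(conn, table, key):
    """Uniqueness check: return the key values that occur more than once."""
    sql = "SELECT {k} FROM {t} GROUP BY {k} HAVING COUNT(*) > 1 ORDER BY {k}".format(
        k=key, t=table
    )
    return [row[0] for row in conn.execute(sql)]

demo = sqlite3.connect(":memory:")
demo.executescript("""
CREATE TABLE videos (video_id TEXT, title TEXT);
INSERT INTO videos VALUES ('v1', 'First'), ('v2', 'Second'), ('v2', 'Second, renamed');
""")

print(count_rows(demo, "videos"), duplicate_keys(demo, "videos", "video_id"))
```

An empty list from `duplicate_keys` would indicate that the key column is unique in that table.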
 
Run Quality Checks

In [None]:
def table_check(table):
    table_check_sql1 = """
    SELECT * 
    FROM {}
    LIMIT 1
    """
    table_check_sql2 = """
    SELECT COUNT(*)
    FROM {}
    """
    check_sql = table_check_sql1.format(table)
    cur.execute(check_sql)
    conn.commit()
    rows = cur.fetchall()
    print ("{} table check:\n".format(table))
    for r in rows:
        print (r)
    check_sql = table_check_sql2.format(table)
    cur.execute(check_sql)
    conn.commit()
    rowcount = cur.fetchall()
    print ("\n{} table has {} rows\n".format(table,rowcount[0][0]))
    

In [None]:
for table in check_table_queries:
    table_check(table)

### Step 5: Complete Project Write Up

##### 1. The rationale for the choice of tools and technologies for the project.

* In this project, I chose Redshift as the data warehouse and Spark for the ETL. Redshift is the AWS data warehouse, and it is easy to obtain at low cost.

* I chose Spark for the ETL rather than Redshift SQL because Spark can infer the schema directly from files in the data lake, which is an advantage when loading large data sets into a relational database, and this approach is much faster than the SQL INSERT command. With Spark, I built the data files in the data lake and the database in Redshift at the same time.

* Before copying the csv files to the Redshift staging tables, I found that the original csv files contain many characters that cause errors during the copy, including '"', ',' and '\r'. Pandas handles this problem well: it builds the DataFrame correctly despite these characters and makes it easy to remove them. The original csv files of each country have different problems and may need to be inspected individually. I only worked through USvideos.csv as a demo, but the cleaning method is general.

##### 2. The data should be updated daily according to "trending_date"

##### 3. Discussion:
1. If the data were increased by 100x, it might exceed what pandas can handle. In that case I can use Spark's distributed computing to clean the original csv files as well as to run the ETL tasks.
2. If the data populates a dashboard that must be updated daily by 7am, a suitable approach is to run the daily ETL with Airflow and schedule email reports for any task errors.
3. If the database needs to be accessed by 100+ people, Redshift can serve them at the same time.