# ENTREGABLE 3- EXTRAER DATOS DE API DE YOUTUBE E INSERTAR LA DATA EN UNA TABLA DE AWS REDSHIFT.

#### Explicación del Proyecto:

El script siguiente obtiene los 10 videos más populares (con más vistas) de YouTube en el momento de la ejecución del script. La API realiza una búsqueda de videos, ordenándolos por la cantidad de vistas (ordenados en orden descendente). Luego, obtiene los detalles de esos videos y crea un DataFrame con la información relevante, incluida la cantidad de vistas, y lo imprime.

Además, en la tabla resultante de la API le incluyo la columna "Insert_Date", que contiene el día de ejecución del script, para identificar en qué día se obtuvieron los 10 registros con los 10 videos más vistos.

Por último, esos 10 registros son insertados en la tabla "videos" dentro de mi Base de Datos de Redshift, haciendo previamente la conexión a dicha Base de Datos (y creando la tabla en primera instancia).


In [1]:
pip install oauth2client




In [2]:
pip install google-api-python-client

Note: you may need to restart the kernel to use updated packages.


In [10]:
#import pandas as pd
import psycopg2
from psycopg2.extras import execute_values
from airflow.models import DAG, Variable
import datetime

from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import smtplib

In [11]:
from datetime import datetime  # Importa datetime antes de su uso
import pandas as pd
from googleapiclient.discovery import build

In [12]:
# Definición de Variables:

# Variables de Conexión a Redshift:
url="data-engineer-cluster.cyhh5bfevlmn.us-east-1.redshift.amazonaws.com"
data_base="data-engineer-database"
#user=Variable.get("user_redshift")                                 #esta variable fue creada en la interfaz de Airflow por cuestiones de Seguridad
#pwd= Variable.get("secret_pass_redshift")                          #esta variable fue creada en la interfaz de Airflow por cuestiones de Seguridad
user='christian_r_coderhouse'
pwd='3b4LjN1alG'

# Variable de Conexión a la API de Youtube:
#client_API_KEY = Variable.get("client_API_KEY")                    #esta variable fue creada en la interfaz de Airflow por cuestiones de Seguridad
client_API_KEY= 'AIzaSyBXPyx2L67WhXATIaaR8yl3FJZLsXvpDIE' 

In [14]:
# Task 1:
def get_top_videos():
    
    # Definir tu clave de API de YouTube
    API_KEY = client_API_KEY

    # Crear una instancia del servicio de la API de YouTube
    youtube = build('youtube', 'v3', developerKey=API_KEY)

    # Función para obtener el nombre de la categoría a partir del ID
    def get_category_name(youtube, category_id):
        categories_response = youtube.videoCategories().list(
            part='snippet',
            id=category_id
        ).execute()
        if 'items' in categories_response:
            return categories_response['items'][0]['snippet']['title']
        else:
            return 'Desconocida'

    # Función para convertir la duración en formato "PT11M13S" a segundos
    def convert_duration_to_seconds(duration):
        parts = duration[2:].split('T')[-1].split('H')
        hours = int(parts[0]) if len(parts) > 1 else 0
        minutes_parts = parts[-1].split('M')
        minutes = int(minutes_parts[0]) if len(minutes_parts) > 1 else 0
        seconds_parts = minutes_parts[-1].split('S')
        seconds = int(seconds_parts[0]) if len(seconds_parts) > 1 else 0
        total_seconds = hours * 3600 + minutes * 60 + seconds
        return total_seconds

    # Obtener la fecha actual en el formato requerido por la API de YouTube
    #current_date = datetime.datetime.now().strftime('%Y-%m-%dT00:00:00Z')

    # Realizar la búsqueda de videos ordenados por vistas y limitar a 10 resultados
    search_response = youtube.search().list(
        part='id',
        maxResults=10,
        order='viewCount',
        type='video'
    ).execute()

    # Extraer los IDs de los videos obtenidos en la búsqueda
    video_ids = [item['id']['videoId'] for item in search_response['items']]

    # Obtener detalles de los videos
    videos_response = youtube.videos().list(
        part='snippet,statistics,contentDetails',
        id=','.join(video_ids)
    ).execute()

    # Crear una lista de diccionarios con la información de los videos
    video_data = []
    for video in videos_response['items']:
        video_id = video['id']
        video_info = {
            "ID_del_Video": video_id,
            "Título": video['snippet']['title'],
            "Descripción": video['snippet']['description'],
            "Canal_Propietario": video['snippet']['channelTitle'],
            "Fecha_de_Publicación": video['snippet']['publishedAt'],
            "Categoría_ID": video['snippet']['categoryId'],
            "Categoría": get_category_name(youtube, video['snippet']['categoryId']),
            "Duración_segundos": convert_duration_to_seconds(video['contentDetails']['duration']),
            "URL_del_Video": f"https://www.youtube.com/watch?v={video_id}",      
            "Vistas": video['statistics']['viewCount'],
            "Likes": video['statistics'].get('likeCount', 0),
            "Dislikes": video['statistics'].get('dislikeCount', 0),
            "Favorite_Count": video['statistics'].get('favoriteCount', 0),
            "Comment_Count": video['statistics'].get('commentCount', 0),
            #"Insert_Date": current_date
        }
        video_data.append(video_info)

    # Crear un DataFrame a partir de la lista de diccionarios
    df = pd.DataFrame(video_data)

    #Hago las siguientes transformaciones a las columnas del Dataframe df:

    # Recortar la columna "Descripción" y "Título" a 301 caracteres:
    df['Descripción'] = df['Descripción'].str[:301]
    df['Título'] = df['Título'].str[:301]
    
    #from datetime import datetime
    #import pandas as pd
    
    # Convertir la columna "Fecha de Publicación" en objeto datetime y creo la columna "Insert Date" en objeto datetime:
    df['Fecha_de_Publicación'] = pd.to_datetime(df['Fecha_de_Publicación'])
    #df['Insert_Date'] = pd.to_datetime(df['Insert_Date'])
    df['Insert_Date'] = pd.to_datetime(datetime.now().strftime('%Y-%m-%dT00:00:00Z'))
    
    # Formatear la columna "Fecha de Publicación" y "Insert Date" en el formato deseado:
    df['Fecha_de_Publicación'] = df['Fecha_de_Publicación'].dt.strftime('%Y-%m-%d')
    df['Insert_Date'] = df['Insert_Date'].dt.strftime('%Y-%m-%d')
    df=df.to_dict()
    return(df)

In [15]:
get_top_videos()

[[34m2023-10-02T03:11:54.691-0300[0m] {[34m__init__.py:[0m49} INFO[0m - file_cache is only supported with oauth2client<4.0.0[0m


{'ID_del_Video': {0: '0aZ7lPQ5EXs',
  1: 'eNLjdPI9zdE',
  2: 'ebVVuJN1WFM',
  3: 'bX3S-_jUauc',
  4: 'wPNQw8naE2Q',
  5: 'aSjflT_J0Xo',
  6: '4nKcnfw9ggc',
  7: 'NPpELzyP4rw',
  8: 'KATq-Ws3xtM',
  9: 'uTK_7MOFV4s'},
 'Título': {0: 'El Gallo y la Pata - Canciones de la Granja de Zenón 2',
  1: 'La Vaca Lola - Canciones de La Granja de Zenón 2',
  2: 'Bartolito - La Granja de Zenón 3',
  3: 'Paulo Londra ft Lenny Tavarez - Nena Maldicion (Official Video)',
  4: 'El Pollito Pío 3D - Canciones de la Granja de Zenón 2',
  5: 'Paulo Londra - Adan y Eva (Official Video)',
  6: 'Percherón - La Granja de Zenón 3',
  7: 'Paulo Londra - Tal Vez (Official Video)',
  8: 'Patitos Cua Cua Cua - Canciones y clásicos infantiles',
  9: 'El Marinero Baila - Paco El Marinero | El Reino Infantil'},
 'Descripción': {0: '🎁 En estas Navidades, encuentra los productos de La Granja de Zenón en Amazon Store 🎁 \nPeluches ▶️ https://rebrand.ly/AmazonMXPeluchesMusicalesLGDZ\nSábanas y Mantas ▶️ https://rebrand.ly/

In [16]:
# Task 2:
def conectar_Redshift():
    import psycopg2
    url="data-engineer-cluster.cyhh5bfevlmn.us-east-1.redshift.amazonaws.com"
    data_base="data-engineer-database"
    user="christian_r_coderhouse"

    try:
        conn = psycopg2.connect(
            host='data-engineer-cluster.cyhh5bfevlmn.us-east-1.redshift.amazonaws.com',
            dbname=data_base,
            user=user,
            password= pwd,
            port='5439'
        )
        print("Conectado a Redshift con éxito!")

    except Exception as e:
        print("No es posible conectar a Redshift")
        print(e)
    
    #Crear la tabla si no existe:
    with conn.cursor() as cur:
        cur.execute("""
            CREATE TABLE IF NOT EXISTS videos
            (
            Id_del_Video VARCHAR(50) primary key
            ,Título VARCHAR(350)
            ,Descripción VARCHAR(350)   
            ,Canal_Propietario VARCHAR(255)
            ,Fecha_de_Publicación date
            ,Categoría_ID VARCHAR(50)
            ,Categoría VARCHAR(100)
            ,Duración_segundos INTEGER
            ,URL_del_Video NVARCHAR(500)
            ,Vistas INTEGER
            ,Likes INTEGER
            ,Dislikes INTEGER
            ,Favorite_Count INTEGER
            ,Comment_Count INTEGER
            ,Insert_Date date

            )
        """)
        conn.commit()

In [17]:
conectar_Redshift()

Conectado a Redshift con éxito!


In [18]:
# Task 3:
def insert_data():
    
    import psycopg2
    
    
    conn = psycopg2.connect(
            host='data-engineer-cluster.cyhh5bfevlmn.us-east-1.redshift.amazonaws.com',
            dbname=data_base,
            user=user,
            password= pwd,
            port='5439'
        )
    
    
    
    data_dict = get_top_videos()
    df = pd.DataFrame(data_dict)
    #data = [(row['ID_del_Video'], row['Título'], row['Descripción'], row['Canal_Propietario'], row['Fecha_de_Publicación'], row['Categoría_ID'], row['Categoría'], row['Duración_segundos'], row['URL_del_Video'], row['Vistas'],  row['Likes'], row['Dislikes'], row['Favorite_Count'], row['Comment_Count'], row['Insert_Date'] ) for _, row in df.iterrows()]
    print(df)
    
    from psycopg2.extras import execute_values  # Añado esta línea para importar execute_values
    with conn.cursor() as cur:
        try:
            execute_values(
                cur,
                '''
                    INSERT INTO videos (ID_del_Video, Título, Descripción, Canal_Propietario, Fecha_de_Publicación, Categoría_ID, Categoría, Duración_segundos, URL_del_Video, Vistas, Likes, Dislikes, Favorite_Count, Comment_Count, Insert_Date)
                    VALUES %s
                    ''',
                    [tuple(row) for row in df.values],
                    #data,
                    page_size=len(df)
                )
            conn.commit()
            conn.close()
        except Exception as e:
            print("No es posible insertar datos")
            print(e)

In [19]:
insert_data()

[[34m2023-10-02T03:12:21.540-0300[0m] {[34m__init__.py:[0m49} INFO[0m - file_cache is only supported with oauth2client<4.0.0[0m
  ID_del_Video                                             Título  \
0  0aZ7lPQ5EXs  El Gallo y la Pata - Canciones de la Granja de...   
1  eNLjdPI9zdE   La Vaca Lola - Canciones de La Granja de Zenón 2   
2  ebVVuJN1WFM                   Bartolito - La Granja de Zenón 3   
3  bX3S-_jUauc  Paulo Londra ft Lenny Tavarez - Nena Maldicion...   
4  wPNQw8naE2Q  El Pollito Pío 3D - Canciones de la Granja de ...   
5  aSjflT_J0Xo         Paulo Londra - Adan y Eva (Official Video)   
6  4nKcnfw9ggc                   Percherón - La Granja de Zenón 3   
7  NPpELzyP4rw            Paulo Londra - Tal Vez (Official Video)   
8  KATq-Ws3xtM  Patitos Cua Cua Cua - Canciones y clásicos inf...   
9  uTK_7MOFV4s  El Marinero Baila - Paco El Marinero | El Rein...   

                                         Descripción   Canal_Propietario  \
0  🎁 En estas Navidades, encue

In [22]:
# Task 4: 
Pass_Email= Variable.get("secret_pass_gmail")
smtp_server = 'smtp.gmail.com'
smtp_port = 587
sender_email = 'christian.jrivasn@gmail.com'
password = Pass_Email


def send_email():
        try:
            subject = 'Carga de datos de 10 videos mas vistos en Youtube a Redshift'
            body_text = 'Los datos de los 10 videos mas vistos en Youtube fueron cargados a la base de datos de Redshift exitosamente.'

            msg = MIMEMultipart()
            msg['From'] = sender_email
            msg['To'] = sender_email
            msg['Subject'] = subject
            msg.attach(MIMEText(body_text, 'plain'))
            with smtplib.SMTP(smtp_server, smtp_port) as server:
                server.starttls()
                server.login(sender_email, password)
                server.send_message(msg)
            print('El email fue enviado correctamente.')

        except Exception as exception:
            print(exception)
            print('El email no se pudo enviar.')

In [23]:
send_email()

El email fue enviado correctamente.
