#**Skills Gained from this Project**

* **Python scripting:** You will gain proficiency in Python programming by developing scripts to interact with the YouTube API, fetch data, and process it for storage and analysis.

* **Data collection:** You will learn how to retrieve data from the YouTube API, including video details, comments, likes, views, and other relevant information.

* **MongoDB:** You will gain hands-on experience with MongoDB, a NoSQL database, and learn how to store and manage YouTube data efficiently using MongoDB's document-oriented approach.

* **Streamlit:** You will learn how to create interactive web applications using Streamlit, a Python library for building data-driven apps. You will design and develop a user-friendly interface to visualize and explore the YouTube data.

* **API integration:** You will understand how to integrate with the YouTube API, authenticate requests, and retrieve data using API endpoints.

* **Data management using MongoDB Atlas:** You will learn how to set up and manage a MongoDB Atlas cluster for cloud-based storage and access to your YouTube data.

* **SQL:** You will gain knowledge of SQL (Structured Query Language) and learn how to use it alongside MongoDB for advanced data analysis and querying.

# **Steamlit Docs for Beginners:**



```
https://docs.streamlit.io/library/get-started
```



# **REQUIREMENTS**

**Set up your development environment:**

* Ensure you have Python installed on your system.
* Install the necessary libraries using pip, such as Streamlit, pymongo, SQLAlchemy, and google-api-python-client.
*Set up a MongoDB database (you can use MongoDB Atlas for convenience) and obtain the connection string.
*Set up a SQL database (e.g., MySQL, PostgreSQL) and create the necessary tables.

In [None]:
import streamlit as st
import pymongo
from pymongo import MongoClient
from googleapiclient.discovery import build
from sqlalchemy import create_engine


In [None]:
# Replace <MONGODB_CONNECTION_STRING> with your actual connection string
client = MongoClient("<MONGODB_CONNECTION_STRING>")
db = client["youtube_data"]  # Replace "youtube_data" with your database name


In [None]:
# Replace <SQL_DATABASE_URL> with your actual database URL (e.g., mysql://user:password@host:port/database)
engine = create_engine("<SQL_DATABASE_URL>")


In [None]:
#Define functions to interact with the YouTube API and retrieve channel and video data:

# Replace <API_KEY> with your actual YouTube Data API key
def youtube_data_api():
    api_key = "<API_KEY>"
    youtube = build("youtube", "v3", developerKey=api_key)
    return youtube

def get_channel_data(youtube, channel_id):
    # Implement the logic to retrieve channel data using the YouTube API
    # Return the relevant channel data as a dictionary

def get_video_data(youtube, video_id):
    # Implement the logic to retrieve video data using the YouTube API
    # Return the relevant video data as a dictionary


In [None]:
# Create the Streamlit app and define the UI elements:

def main():
    st.title("YouTube Data Harvesting and Warehousing App")

    # Define Streamlit UI elements, such as input fields, buttons, and data display sections
    # Implement the logic to interact with the app and perform the required actions

if __name__ == "__main__":
    main()


Implement the logic for data collection, storage, migration, and retrieval based on the user's actions:

* Use the functions defined earlier to retrieve data from the YouTube API.
* Store the retrieved data in MongoDB using the insert_one or insert_many methods.
* Migrate data from MongoDB to the SQL database using SQL queries and the SQLAlchemy engine.
* Query the SQL database based on user input and display the results in the Streamlit app.

# **STEP BY STEP IMPLEMENTATION**

* streamlit
* pymongo
* google-api-python-client
* sqlalchemy

* Google Developers: UC_x5XG1OV2P6uZZ5FSM9Ttw
* TED: UCsT0YIqwnpJCM-mx7-gSA4Q
* National Geographic: UCpVm7bg6pXKo1Pr6k5kxG9A
* NASA: UCDNewsroom


In [None]:
import streamlit as st
from googleapiclient.discovery import build
import pymongo
from pymongo import MongoClient
from sqlalchemy import create_engine
from googleapiclient.errors import HttpError


In [None]:
# Connect to MongoDB
client = pymongo.MongoClient('mongodb://localhost:27017/')
db = client['youtube_data']
collection = db['channel_data']


In [None]:
# Set Streamlit app title
st.title("YouTube Data Harvesting and Warehousing")

# Display input field for YouTube channel ID
channel_id = st.text_input("Enter YouTube Channel ID")

# Retrieve channel data using YouTube API
if st.button("Retrieve Channel Data"):
    # Initialize YouTube Data API client
    youtube = build('youtube', 'v3', developerKey='YOUR_API_KEY')

    # Make API request to get channel data
    request = youtube.channels().list(
        part='snippet,statistics',
        id=channel_id
    )
    response = request.execute()

    # Extract relevant data
    channel_data = {
        'Channel Name': response['items'][0]['snippet']['title'],
        'Subscribers': response['items'][0]['statistics']['subscriberCount'],
        'Total Videos': response['items'][0]['statistics']['videoCount']
    }

    # Store data in MongoDB
    collection.insert_one(channel_data)

    # Display channel data
    st.write("Channel Name:", channel_data['Channel Name'])
    st.write("Subscribers:", channel_data['Subscribers'])
    st.write("Total Videos:", channel_data['Total Videos'])


In [None]:
import streamlit as st
from googleapiclient.discovery import build
import pymongo

# Connect to MongoDB
client = pymongo.MongoClient('mongodb://localhost:27017/')
db = client['youtube_data']
collection = db['channel_data']

# Set Streamlit app title
st.title("YouTube Data Harvesting and Warehousing")

# Display input field for YouTube channel ID
channel_id = st.text_input("Enter YouTube Channel ID")

# Retrieve videos for a given YouTube channel ID
def get_channel_videos(youtube, channel_id, api_key):
    videos = []
    request = youtube.search().list(
        part='id',
        channelId=channel_id,
        maxResults=10
    )
    response = request.execute()

    video_ids = [item['id']['videoId'] for item in response['items']]
    video_request = youtube.videos().list(
        part='snippet,statistics,contentDetails',
        id=','.join(video_ids)
    )
    video_response = video_request.execute()

    videos.extend(video_response['items'])
    return videos

# Retrieve channel data using YouTube API
if st.button("Retrieve Channel Data"):
    try:
        # Initialize YouTube Data API client
        youtube = build('youtube', 'v3', developerKey='Your_API')

        # Make API request to get channel data
        request = youtube.channels().list(
            part='snippet,statistics,contentDetails',
            id=channel_id
        )
        response = request.execute()

        if 'items' in response:
            channel_data = response['items'][0]
            snippet = channel_data['snippet']
            statistics = channel_data['statistics']
            content_details = channel_data.get('contentDetails', {})
            related_playlists = content_details.get('relatedPlaylists', {})

            # Extract relevant data
            data = {
                'Channel_Name': {
                    'Channel_Name': snippet.get('title', ''),
                    'Channel_Id': channel_id,
                    'Subscription_Count': int(statistics.get('subscriberCount', 0)),
                    'Channel_Views': int(statistics.get('viewCount', 0)),
                    'Channel_Description': snippet.get('description', ''),
                    'Playlist_Id': related_playlists.get('uploads', '')
                }
            }

            # Retrieve video data
            videos = get_channel_videos(youtube, channel_id, 'YOUR API')
            for video in videos:
                video_id = video['id']
                video_data = {
                    'Video_Id': video_id,
                    'Video_Name': video['snippet'].get('title', ''),
                    'Video_Description': video['snippet'].get('description', ''),
                    'Tags': video['snippet'].get('tags', []),
                    'PublishedAt': video['snippet'].get('publishedAt', ''),
                    'View_Count': int(video['statistics'].get('viewCount', 0)),
                    'Like_Count': int(video['statistics'].get('likeCount', 0)),
                    'Dislike_Count': int(video['statistics'].get('dislikeCount', 0)),
                    'Favorite_Count': int(video['statistics'].get('favoriteCount', 0)),
                    'Comment_Count': int(video['statistics'].get('commentCount', 0)),
                    'Duration': video['contentDetails'].get('duration', ''),
                    'Thumbnail': video['snippet'].get('thumbnails', {}).get('default', {}).get('url', ''),
                    'Caption_Status': video['snippet'].get('localized', {}).get('localized', 'Not Available'),
                    'Comments': {}
                }
                data[video_id] = video_data


            st.write("Channel Name:", data['Channel_Name']['Channel_Name'])
            st.write("Subscribers:", data['Channel_Name']['Subscription_Count'])
            st.write("Total Videos:", len(videos))

            # Display video data
            st.subheader("Video Data:")
            for video_id, video_data in data.items():
                if video_id != 'Channel_Name':
                    st.write("Video Name:", video_data['Video_Name'])
                    st.write("Video Description:", video_data['Video_Description'])
                    st.write("Published At:", video_data['PublishedAt'])
                    st.write("View Count:", video_data['View_Count'])
                    st.write("Like Count:", video_data['Like_Count'])
                    st.write("Dislike Count:", video_data['Dislike_Count'])
                    st.write("Comment Count:", video_data['Comment_Count'])
                    st.write("Duration:", video_data['Duration'])
                    st.write("Thumbnail:", video_data['Thumbnail'])

            # Store data in MongoDB
            if st.button("Store Data in MongoDB"):
                collection.insert_one(data)
                st.success("Data stored in MongoDB!")

    except Exception as e:
        st.error(f"Error retrieving channel data: {str(e)}")



# **What is MongoDB Atlas**

> MongoDB Atlas is a fully managed cloud database service provided by MongoDB, the company behind the MongoDB database. It allows you to deploy, scale, and manage MongoDB databases effortlessly in the cloud. MongoDB Atlas takes care of the operational aspects of running a database, such as infrastructure provisioning, setup, configuration, monitoring, backups, and security, so you can focus on building your applications.

Here are some key features and benefits of MongoDB Atlas:

* **Fully Managed:** MongoDB Atlas is a fully managed database service, meaning MongoDB handles all the operational tasks like database setup, scaling, patching, backups, and monitoring. You don't need to worry about infrastructure management, allowing you to focus on developing your application.

* **Scalability:** MongoDB Atlas provides seamless scalability to handle growing data workloads. You can easily scale your cluster vertically by adding more powerful instances or horizontally by adding more nodes to the cluster. This allows you to handle increased traffic and data storage requirements without downtime.

* **Global Deployment:** MongoDB Atlas allows you to deploy your databases across multiple cloud providers and regions around the world. This enables you to place your data closer to your users, reducing latency and providing a better user experience.

* **Security:** MongoDB Atlas offers various security features to protect your data. It includes encryption at rest, encryption in transit, network isolation, fine-grained access controls.

* **Monitoring and Alerts:** MongoDB Atlas provides built-in monitoring and alerting capabilities to help you track the performance and health of your databases. You can view metrics, set up custom alerts, and integrate with popular monitoring tools to gain insights into your database performance.



MongoDB Atlas offers both free-tier and paid-tier options, making it accessible for developers and businesses of all sizes. It provides a reliable, scalable, and secure platform for hosting your MongoDB databases in the cloud.


# **How to Setup MongoDB Atlas**


To get MongoDB Atlas username, password, and cluster details, you need to follow these steps:

* Sign up for MongoDB Atlas: Go to the MongoDB Atlas website (https://www.mongodb.com/cloud/atlas) and click on the "Get started free" button. Follow the prompts to create an account and sign up for MongoDB Atlas.

* Create a Cluster: Once you have signed up and logged in to MongoDB Atlas, click on the "Build a Cluster" button or the "Create Cluster" button to create a new MongoDB cluster. Select your preferred cloud provider, region, and cluster configuration options. You can choose the free-tier option or select a paid plan based on your requirements.

* Set Up Cluster: After creating the cluster, you will be directed to the Cluster Overview page. Here, you can configure additional settings like cluster name, disk size, backup options, and more. Review and adjust the settings as per your needs.

* Create a Database User: To access your MongoDB cluster, you need to create a database user with appropriate privileges. Go to the "Database Access" section under the "Security" tab in the left sidebar. Click on the "Add New Database User" button to create a new user. Provide a username and password for the user and assign the necessary roles and privileges.

* Whitelist IP Address: In the "Network Access" section under the "Security" tab, click on the "Add IP Address" button. You can choose to allow access from anywhere (0.0.0.0/0) or specify a specific IP address or IP range that will be allowed to connect to your MongoDB cluster.

* Retrieve Connection Details: Once you have set up the database user and whitelisted IP address, you can obtain the connection details for your MongoDB cluster. Go to the "Clusters" section in the left sidebar and click on the "Connect" button for your cluster. You will be presented with connection options.

* To connect using MongoDB drivers, select the driver of your choice (e.g., Python, Node.js, Java) and follow the instructions to get the connection string. The connection string contains the username, password, cluster hostname, and other necessary details.


In the context of MongoDB Atlas, "database" and "data lake" refer to different concepts related to data storage and management.

* **Database:** A database in MongoDB is a container for storing structured or semi-structured data. It is a logical entity that organizes and stores collections of documents. In MongoDB, a database is not limited to a specific schema, meaning each document within a collection can have a different structure. Databases provide a way to organize and manage related data and enable efficient querying and indexing.
Within MongoDB Atlas, you can create multiple databases to segregate different sets of data. Each database can contain multiple collections, and you can define security settings and access controls at the database level.

* **Data Lake:** A data lake is a centralized repository that stores large amounts of raw, unprocessed data in its native format. Unlike traditional databases, data lakes are designed to store vast volumes of data in various formats, such as text files, images, videos, JSON, or CSV files. Data lakes provide a cost-effective and scalable solution for storing and analyzing big data.
In the context of MongoDB Atlas, the term "data lake" refers to the ability to store data in a flexible manner using MongoDB Atlas Data Lake. With Data Lake, you can directly query and analyze data stored in external cloud object storage, such as Amazon S3 or Google Cloud Storage, using the MongoDB Query Language (MQL) and the MongoDB Atlas Data Lake API. It allows you to leverage the power of MongoDB's rich query capabilities and indexes on data stored in external storage without needing to import it into a MongoDB database.

Data lakes are particularly useful when dealing with large volumes of unstructured or semi-structured data that don't fit well into traditional database schemas. By leveraging the data lake capabilities, you can efficiently process and analyze diverse data sources without the need to transform or load them into a structured database beforehand.



# **Using MongoDB Atlas**

In [None]:
import streamlit as st
from googleapiclient.discovery import build
import pymongo
from pymongo import MongoClient
from bson import ObjectId

# Connect to MongoDB Atlas
atlas_username = 'your_username'
atlas_password = 'your_password'
atlas_cluster = 'your_cluster_name'
client = MongoClient(f"mongodb+srv://{atlas_username}:{atlas_password}@{atlas_cluster}.rjsvnlc.mongodb.net/")
# mongodb+srv://ashispalailearning2022:<password>@cluster0.rjsvnlc.mongodb.net/
db = client['youtube_data']
collection = db['channel_data']

# Set Streamlit app title
st.title("YouTube Data Harvesting and Warehousing")

# Display input field for YouTube channel ID
channel_id = st.text_input("Enter YouTube Channel ID")

# Retrieve videos for a given YouTube channel ID
def get_channel_videos(youtube, channel_id, api_key):
    videos = []
    request = youtube.search().list(
        part='id',
        channelId=channel_id,
        maxResults=10
    )
    response = request.execute()

    video_ids = [item['id']['videoId'] for item in response['items']]

    video_request = youtube.videos().list(
        part='snippet,statistics,contentDetails',
        id=','.join(video_ids)
    )
    video_response = video_request.execute()

    videos.extend(video_response['items'])
    return videos

def parse_duration(duration):
    duration = duration[2:]  # Remove 'PT' prefix
    hours = duration.count('H')
    minutes = duration.count('M')
    seconds = duration.count('S')

    duration_str = ''
    if hours > 0:
        duration_str += f'{hours}h '
    if minutes > 0:
        duration_str += f'{minutes}m '
    if seconds > 0:
        duration_str += f'{seconds}s'

    return duration_str.strip()


    # Initialize YouTube Data API client
youtube = build('youtube', 'v3', developerKey='Your_API')

# Make API request to get channel data
request = youtube.channels().list(
            part='snippet,statistics,contentDetails',
            id=channel_id
        )
response = request.execute()

if 'items' in response:
            channel_data = response['items'][0]
            snippet = channel_data['snippet']
            statistics = channel_data['statistics']
            content_details = channel_data.get('contentDetails', {})
            related_playlists = content_details.get('relatedPlaylists', {})

            # Extract relevant data
            data = {
                'Channel_Name': {
                    'Channel_Name': snippet.get('title', ''),
                    'Channel_Id': channel_id,
                    'Subscription_Count': int(statistics.get('subscriberCount', 0)),
                    'Channel_Views': int(statistics.get('viewCount', 0)),
                    'Channel_Description': snippet.get('description', ''),
                    'Playlist_Id': related_playlists.get('uploads', '')
                }
            }

            # Retrieve video data
            videos = get_channel_videos(youtube, channel_id, 'YOUR_API')
            for video in videos:
                video_id = video['id']
                video_data = {
                    'Video_Id': video_id,
                    'Video_Name': video['snippet'].get('title', ''),
                    'Video_Description': video['snippet'].get('description', ''),
                    'Tags': video['snippet'].get('tags', []),
                    'PublishedAt': video['snippet'].get('publishedAt', ''),
                    'View_Count': int(video['statistics'].get('viewCount', 0)),
                    'Like_Count': int(video['statistics'].get('likeCount', 0)),
                    'Dislike_Count': int(video['statistics'].get('dislikeCount', 0)),
                    'Favorite_Count': int(video['statistics'].get('favoriteCount', 0)),
                    'Comment_Count': int(video['statistics'].get('commentCount', 0)),
                    'Duration': parse_duration(video['contentDetails'].get('duration', '')),
                    'Thumbnail': video['snippet'].get('thumbnails', {}).get('default', {}).get('url', ''),
                    'Caption_Status': video['snippet'].get('localized', {}).get('localized', 'Not Available'),
                    'Comments': {}
                }
                data[video_id] = video_data

# Retrieve channel data using YouTube API
if st.button("Retrieve Channel Data"):
    try:


            # Display channel data
            st.write("Channel Name:", data['Channel_Name']['Channel_Name'])
            st.write("Subscribers:", data['Channel_Name']['Subscription_Count'])
            st.write("Total Videos:", len(videos))

            # Display video data
            st.subheader("Video Data:")
            for video_id, video_data in data.items():
              if video_id != 'Channel_Name':
                st.write("Video Name:", video_data['Video_Name'])
                st.write("Video Description:", video_data['Video_Description'])
                st.write("Published At:", video_data['PublishedAt'])
                st.write("View Count:", video_data['View_Count'])
                st.write("Like Count:", video_data['Like_Count'])
                st.write("Dislike Count:", video_data['Dislike_Count'])
                st.write("Comment Count:", video_data['Comment_Count'])
                st.write("Duration:", video_data['Duration'])
                st.write("Thumbnail:", video_data['Thumbnail'])
    except Exception as e:
        st.error(f"Error retrieving channel data: {str(e)}")

# Store data in MongoDB Atlas
if st.button("Store Data in MongoDB Atlas"):
    collection.insert_one(data)
    st.success("Data stored successfully in MongoDB Atlas!")

# # Retrieve data from MongoDB Atlas
# if st.button("Retrieve Data from MongoDB Atlas"):
#     retrieved_data = collection.find_one({'Channel_Name.Channel_Id': channel_id})
#     if retrieved_data:
#         st.subheader("Retrieved Data:")
#         st.write("Channel Name:", retrieved_data['Channel_Name']['Channel_Name'])
#         st.write("Subscribers:", retrieved_data['Channel_Name']['Subscription_Count'])
#         st.write("Total Videos:", len(videos))
#         for video_id, video_data in retrieved_data.items():
#             if video_id != 'Channel_Name' and not isinstance(video_data, ObjectId):
#                 st.write("Video Name:", video_data['Video_Name'])
#                 st.write("Video Description:", video_data['Video_Description'])
#                 st.write("Published At:", video_data['PublishedAt'])
#                 st.write("View Count:", video_data['View_Count'])
#                 st.write("Like Count:", video_data['Like_Count'])
#                 st.write("Dislike Count:", video_data['Dislike_Count'])
#                 st.write("Comment Count:", video_data['Comment_Count'])
#                 st.write("Duration:", video_data['Duration'])
#                 st.write("Thumbnail:", video_data['Thumbnail'])
#     else:
#         st.warning("Data not found in MongoDB Atlas!")



In [None]:
# Connect to SQL data warehouse
sql_host = 'your_sql_host'
sql_user = 'your_sql_username'
sql_password = 'your_sql_password'
sql_database = 'your_sql_database'

cnx = mysql.connector.connect(
    host=sql_host,
    user=sql_user,
    password=sql_password,
    database=sql_database
)

cursor = cnx.cursor()


In [None]:
# Migrate data to SQL data warehouse
    for video_id, video_data in data.items():
        if video_id != 'Channel_Name' and not isinstance(video_data, ObjectId):
            query = """
                INSERT INTO videos (
                    video_id,
                    video_name,
                    video_description,
                    tags,
                    published_at,
                    view_count,
                    like_count,
                    dislike_count,
                    favorite_count,
                    comment_count,
                    duration,
                    thumbnail,
                    caption_status
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """
            values = (
                video_id,
                video_data['Video_Name'],
                video_data['Video_Description'],
                ','.join(video_data['Tags']),
                video_data['PublishedAt'],
                video_data['View_Count'],
                video_data['Like_Count'],
                video_data['Dislike_Count'],
                video_data['Favorite_Count'],
                video_data['Comment_Count'],
                video_data['Duration'],
                video_data['Thumbnail'],
                video_data['Caption_Status']
            )
            cursor.execute(query, values)

    cnx.commit()
    st.success("Data stored successfully in MongoDB Atlas and migrated to SQL data warehouse!")

# ...

In [None]:
import streamlit as st
from sqlalchemy import create_engine
import pandas as pd

# Set up the connection to the SQL data warehouse
sql_host = 'your_sql_host'
sql_username = 'your_sql_username'
sql_password = 'your_sql_password'
sql_database = 'your_sql_database'
engine = create_engine(f'mysql+pymysql://{sql_username}:{sql_password}@{sql_host}/{sql_database}')

# Get user input for the channel name
channel_name = st.text_input("Enter Channel Name")

# Query the SQL data warehouse
if st.button("Query SQL Data Warehouse"):
    try:
        # Execute SQL query to retrieve data for the specified channel
        query = """
            SELECT *
            FROM videos
            WHERE channel_name = %s
        """
        df = pd.read_sql_query(query, engine, params=[channel_name])

        # Display the retrieved data
        if not df.empty:
            st.subheader("Data for Channel: " + channel_name)
            st.dataframe(df)
        else:
            st.warning("No data found for the specified channel.")
    except Exception as e:
        st.error(f"Error querying the SQL data warehouse: {str(e)}")


## **Modified Parse Duration Method:**

In [None]:
def parse_duration(duration):
    duration_str = ""
    hours = 0
    minutes = 0
    seconds = 0

    # Remove 'PT' prefix from duration
    duration = duration[2:]

    # Check if hours, minutes, and/or seconds are present in the duration string
    if "H" in duration:
        hours_index = duration.index("H")
        hours = int(duration[:hours_index])
        duration = duration[hours_index+1:]
    if "M" in duration:
        minutes_index = duration.index("M")
        minutes = int(duration[:minutes_index])
        duration = duration[minutes_index+1:]
    if "S" in duration:
        seconds_index = duration.index("S")
        seconds = int(duration[:seconds_index])

    # Format the duration string
    if hours > 0:
        duration_str += f"{hours}h "
    if minutes > 0:
        duration_str += f"{minutes}m "
    if seconds > 0:
        duration_str += f"{seconds}s"

    return duration_str.strip()
