In [1]:
import os
import googleapiclient.discovery
# from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

from datetime import datetime, timedelta
from pprint import pprint

# Note: sign in only once not everytime we make a search..., and then pass the api to any needed function. 

In [2]:
def get_api_key_from_file(filename):
    with open(filename, 'r') as f:
        return f.read().strip()  # Read the key and remove any trailing whitespace

# Load the API key from the file
API_KEY = get_api_key_from_file('api_key.txt')

api_service_name = "youtube"
api_version = "v3"

# Build the YouTube API service
youtube = googleapiclient.discovery.build(
    api_service_name, api_version, developerKey=API_KEY)

In [3]:
# Convert an EST datetime string to UTC format
def est_to_utc(est_str):
    est_time = datetime.fromisoformat(est_str)
    utc_time = est_time + timedelta(hours=5)  # EST is UTC-5
    return utc_time.isoformat() + 'Z'


def fetch_youtube_videos(youtube, keyword, from_date, to_date, 
                         max_results=10, location_radius='10mi', 
                         coordinate=(29.6520, 82.3250)):
    
    # Convert the EST dates to UTC
    published_after = est_to_utc(from_date)
    published_before = est_to_utc(to_date)
    
    request = youtube.search().list(
        part="snippet",
        location=f"{coordinate[0]}, {coordinate[1]}",
        locationRadius=location_radius,
        q=keyword,
        publishedAfter=published_after,
        publishedBefore=published_before,
        maxResults = max_results,
        type="video"
    )
    
    response = request.execute()
    #pprint(response)
    
    video_urls = []
    for item in response['items']:
        video_id = item['id']['videoId']
        video_urls.append(f"https://www.youtube.com/watch?v={video_id}")
    
    return video_urls


def fetch_video_details(youtube, video_id):
    
    request = youtube.videos().list(
        part="snippet,statistics",
        id=video_id
    )
    response = request.execute()
    details = response['items'][0] if 'items' in response and len(response['items']) > 0 else None
    return details



def fetch_video_comments(youtube, video_id):
    comments = []
    page_token = None  # Token for the next page of results

    while True:  # Keep fetching until there's no nextPageToken in the response
        request = youtube.commentThreads().list(
            part="snippet",
            videoId=video_id,
            maxResults=100,  # Fetch the maximum allowed comments per request
            textFormat="plainText",
            pageToken=page_token  # This will be None for the first request
        )
        
        try:
            response = request.execute()
        except HttpError as e:
            # Check if the error message contains 'commentsDisabled'
            if 'commentsDisabled' in str(e):
                print(f"Comments are disabled for video ID {video_id}.")
                return comments
            else:
                # For any other errors, you might want to raise them or handle them differently
                raise
        
        # Extract comments from the current page of results
        comments.extend([item['snippet']['topLevelComment']['snippet']['textDisplay'] for item in response['items']])
        
        # If there's no nextPageToken in the response, we've fetched all comments
        if 'nextPageToken' not in response:
            break

        # Otherwise, update the page_token to fetch the next page of results
        page_token = response['nextPageToken']

    return comments

# def is_video_relevant(video_details, location_name):
#     # Check if location name is mentioned in title or description
#     title = video_details['snippet']['title'].lower()
#     description = video_details['snippet']['description'].lower()
#     return location_name.lower() in title or location_name.lower() in description

In [4]:
# Convert an EST datetime string to UTC format
def est_to_utc(est_str):
    est_time = datetime.fromisoformat(est_str)
    utc_time = est_time + timedelta(hours=5)  # EST is UTC-5
    return utc_time.isoformat() + 'Z'


def fetch_youtube_videos(youtube, keyword, from_date, to_date, 
                         max_results=10, location_radius='10mi', 
                         coordinate=(29.6520, 82.3250)):
    
    # Convert the EST dates to UTC
    published_after = est_to_utc(from_date)
    published_before = est_to_utc(to_date)
    
    request = youtube.search().list(
        part="snippet",
        location=f"{coordinate[0]}, {coordinate[1]}",
        locationRadius=location_radius,
        q=keyword,
        publishedAfter=published_after,
        publishedBefore=published_before,
        maxResults = max_results,
        type="video"
    )
    
    response = request.execute()
    #pprint(response)
    
    video_urls = []
    for item in response['items']:
        video_id = item['id']['videoId']
        video_urls.append(f"https://www.youtube.com/watch?v={video_id}")
    
    return video_urls


def fetch_video_details(youtube, video_id):
    
    request = youtube.videos().list(
        part="snippet,statistics, recordingDetails",
        id=video_id
    )
    response = request.execute()
    # details = response['items'][0] if 'items' in response and len(response['items']) > 0  and len(response['items'][0]['recordingDetails']) > 0 else None
    details = response['items'][0] if 'items' in response and len(response['items']) > 0 else None
    # print(details['recordingDetails'])
    return details



def fetch_video_comments(youtube, video_id, keyword):
    comments = []
    page_token = None  # Token for the next page of results

    while True:  # Keep fetching until there's no nextPageToken in the response
        request = youtube.commentThreads().list(
            part="snippet",
            videoId=video_id,
            maxResults=10,  # Fetch the maximum allowed comments per request
            textFormat="plainText",
            pageToken=page_token  # This will be None for the first request
        )
        
        try:
            response = request.execute()
        except HttpError as e:
            # Check if the error message contains 'commentsDisabled'
            if 'commentsDisabled' in str(e):
                print(f"Comments are disabled for video ID {video_id}.")
                return comments
            else:
                # For any other errors, you might want to raise them or handle them differently
                raise
        
        # Extract comments from the current page of results
        comments.extend([item['snippet']['topLevelComment']['snippet']['textDisplay'] for item in response['items'] if item['snippet']['topLevelComment']['snippet']['textDisplay'].find(keyword) != -1])
        
        # If there's no nextPageToken in the response, we've fetched all comments
        if 'nextPageToken' not in response:
            break

        # Otherwise, update the page_token to fetch the next page of results
        page_token = response['nextPageToken']

    return comments

# def is_video_relevant(video_details, location_name):
#     # Check if location name is mentioned in title or description
#     title = video_details['snippet']['title'].lower()
#     description = video_details['snippet']['description'].lower()
#     return location_name.lower() in title or location_name.lower() in description

# Function to write all YouTube relevant data to a PDF. If you don't have the library, run: "pip install fpdf2" in your terminal. 
 

In [5]:
from fpdf import FPDF

def clean_text(text):
    # Here we simply remove any character outside the Latin-1 range
    # You can modify this function to replace characters or do more advanced cleaning
    return ''.join(c for c in text if ord(c) < 256)

# Function to generate daily PDF report
def generate_pdf_report(videos_data, retailer_name="Walmart", 
                        ):
    
    pdf = FPDF()
    pdf.add_page()
    
    # Set title font and size
    pdf.set_font("Helvetica", 'B', 12)
    
    current_date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    # Print the main report title centered
    pdf.cell(200, 10, f"{retailer_name}-YouTube-Threat-Report", 
             align='C')
    pdf.ln(10)  # Add an extra line break for separation
    
    # Print today's date-time
    pdf.multi_cell(200, 10, f"Today's Date: {current_date}", align='C')
#     pdf.cell(200, 10, f"Date: {current_date}", ln=True)
    pdf.ln(10)
    # pdf.ln(10)  # Add an extra line break for separation
    
    for video_data in videos_data:
        pdf.set_text_color(0,0,0)
        temp_title = video_data['title'].strip()
        # print(temp_title)
        pdf.cell(200, 10, "Title: " + temp_title)
        pdf.ln(10)
        pdf.cell(200, 10, "Video URL: " + video_data['url'])
        pdf.ln(10)
        pdf.cell(200, 10, "Location: " + video_data['location'])
        pdf.ln(10)
        pdf.cell(200, 10, "Published Date: " + video_data['published_at'])
        pdf.ln(10)
        pdf.cell(200, 10, "Views: " + video_data['views'])
        pdf.ln(10)
        pdf.cell(200, 10, "Likes: " + video_data['likes'])
        pdf.ln(10)
        pdf.cell(200, 10, "Comments:")
        pdf.ln(10)
        for comment_index in range(len(video_data['comments'])):
            # For text wrapping multiple lines for longer comments: 
            if comment_index > 5:
                break
            else:
                if comment_index % 2 == 0:
                    pdf.set_text_color(255,0,0)
                    pdf.multi_cell(200, 10, "\t\t\t\t-" + video_data['comments'][comment_index])
                    pdf.ln(10)
                else:
                    continue
        pdf.cell(200, 0, '', 'T')  # Separator line
        pdf.ln(10)
        pdf.add_page()
        
    filename = f"./Reports/{retailer_name}-YouTube-Threat-Report-{current_date}.pdf"
    
    pdf.output(filename)

In [6]:
# Main execution
if __name__ == '__main__':
    
    # Set of unique video URLs: 
    unique_videos = set()
    
    # ESTABLISH LIST OF KEYWORDS TO SEARCH FOR VIDEOS 
    # RELATED TO THEFT, FRAUD, & VIOLENCE AND RETAILER OF CHOICE FOR STUDY
    # LET'S FOCUS ON TARGET or Walmart... (a small number of retailers)
    
    KEYWORDS = ["Walmart", "Walmart Fraud", 
                "Walmart Theft", "Walmart Violence" 
                ]
    
    # Get the current date-time
    current_datetime = datetime.now()
    # Get the date-time 7 days in the past
    seven_days_ago = current_datetime - timedelta(days=7)
    # Convert them to the desired string format
    TO_DATE = current_datetime.strftime('%Y-%m-%dT%H:%M:%S')
    FROM_DATE = seven_days_ago.strftime('%Y-%m-%dT%H:%M:%S')
    
    # print(TO_DATE)
    
    
    COORDINATES = [
                    ((29.6520, 82.3250), "Gainesville, FL"),
                    ((27.2730, 80.3582), "Port St.Lucie, FL"),
                    ((35.0844, 106.6504), "Albuquerque, NM"),
                    ((116.4074, 39.9042), 'Beijing')] 
    
    LOCATION_RADIUS = '10mi'
    MAX_RESULTS = 10
    
    
    videos_data = []
    total_comments = []

    for keyword in KEYWORDS:
        for coords, location in COORDINATES:
            # print(coords)
            videos = fetch_youtube_videos(youtube, keyword, 
                                          FROM_DATE, TO_DATE,
                                          max_results=MAX_RESULTS, 
                                          location_radius=LOCATION_RADIUS,
                                          coordinate=coords)
            
            for url in videos:
                video_id = url.split("v=")[-1]
                if video_id not in unique_videos:
                    unique_videos.add(video_id)
                else:
                    continue 
                    
                video_details = fetch_video_details(youtube, video_id)
                print(video_details)
                comments = fetch_video_comments(youtube, video_id, keyword)
                
                # Clean the video title and comments to remove unsupported chars. 
                cleaned_title = clean_text(video_details['snippet']['title'])
                cleaned_comments = [clean_text(comment) for comment in comments]
                total_comments.append(cleaned_comments)

                video_data = {
                    'url': url,
                    'location': location,
                    'published_at': video_details['snippet']['publishedAt'],
                    'title': cleaned_title,
                    'views': video_details['statistics']['viewCount'],
                    'likes': video_details['statistics']['likeCount'],
                    'comments': cleaned_comments  # Use the cleaned comments
                }
                videos_data.append(video_data)       
                # print(video_data)        
                

{'kind': 'youtube#video', 'etag': '2NMloQFqQx8HiYdM6Xa4hDbEqQ8', 'id': 'q_Hkn9R_oUc', 'snippet': {'publishedAt': '2023-11-27T17:28:23Z', 'channelId': 'UCtdApyWliqAvJRQ5UpnSQnA', 'title': '15 HOT WALMART COUPONING DEALS! ~  4 HOT FREEBIES! ~ NOVEMBER 2023', 'description': 'PRINTABLE BREAKDOWN: https://docs.google.com/document/d/1FF00WH7Aew7wZqRdT2isEPUMeZ3Bvwi0hPZH_gRyhic/edit?usp=sharing\n\nCLOUD WATER REBATE: https://discover.gotoaisle.com/cloud-water-evergreen\n\nAISLE REBATE (to check for body wash deal): https://discover.gotoaisle.com/offers?code=poweredbyaisle\n\nCHECK OUT MY SECOND CHANNEL:\nhttps://www.youtube.com/channel/UCKzNj62T4uz1A-Co1e8zD4g/videos\n\nUse My Fluz referral code to earn THREE 35% vouchers from selected merchants.\nhttps://joinfluz.app.link/OHIOVALLEYCOUPONER\n\nIbotta Referral: AYJJJR \nFetch Referral: C1PNA \nShopkick Referral: WIN430308\nSWAGBUCKS Referral: https://www.swagbucks.com/?rb=48283071\nCOINOUT Referral: https://coinout.com/referrals/new?r=JCJDZ3T

# Make sure you've created a folder called "Reports" to save the daily reports before continuing. 

In [9]:
# Generate PDF report
generate_pdf_report(videos_data)
print(videos_data)

[{'url': 'https://www.youtube.com/watch?v=q_Hkn9R_oUc', 'location': 'Beijing', 'published_at': '2023-11-27T17:28:23Z', 'title': '15 HOT WALMART COUPONING DEALS! ~  4 HOT FREEBIES! ~ NOVEMBER 2023', 'views': '1759', 'likes': '287', 'comments': []}, {'url': 'https://www.youtube.com/watch?v=xpV4PeEJxTM', 'location': 'Beijing', 'published_at': '2023-11-26T17:00:44Z', 'title': 'Osmo X Walmart', 'views': '1282959', 'likes': '110631', 'comments': ['"In Walmart...?"\n"your time starts now"\n"shit nvm "', 'Is it in the Canadian Walmart?', '"Mom why did you take so long?"\n\nSorry honey I was eating a 5 Star meal in Walmart', 'I thought she was confused about finding the ingredients in Walmart at first.. lol ', '"Uhm sir this is Walmart"', "We're going to Walmart with this one\n", 'Nick:"Osmo is in Walmart for a limited time"\n\nBro really just took the last pack in that Walmart', 'Bro cooked a 5 star meal in Walmart', 'Walmart? Yes Walmart has all the ingredients.', 'I declare shanagins!  One m

# Code Notes: Not every YouTube video is Geotagged, so we have some duplicates when we search for YouTube videos using keyword searches. The date-time range works exactly as intended. However, we will not always be able to get videos from a location unless that data is available in the video. To remove duplicates when running the code, I'll add a set that will store the video ID. Add the video ID to the set, and when going to next video check to make sure it hasn't been added yet so our pdf doesn't grow to large. 

# Note: the Geotagging does not seem reliable right now. Nic, start looking into this and test, test, test to prove to yourself the solution works for saving video data to report if geolocation is provided, otherwise, don't save the data to the report. I removed the duplication issue. See all other updates i've made and start experimenting with the code. Also start experimenting with more keywords, we can focus on Walmart as a retailer of interest. 