### Libraries

In [None]:
import requests
import re
import time
import pandas as pd
from pymongo import MongoClient
import logging
from pymongo.errors import BulkWriteError
import ollama
from langdetect import detect
from googletrans import Translator
import os
import openai

### Constants

In [None]:
# Constants
# Hashtag endpoint of the Instagram scraper API hosted on RapidAPI.
INSTAGRAM_API_URL = "https://instagram-scraper-api2.p.rapidapi.com/v1/hashtag"
# RapidAPI request headers. "YOUR_RAPIDAPI_KEY" is a placeholder and must be
# replaced with a real key before the API can be called.
HEADERS = {
    "x-rapidapi-key": "YOUR_RAPIDAPI_KEY",
    "x-rapidapi-host": "instagram-scraper-api2.p.rapidapi.com"
}
# MongoDB connection string (local server, default port).
MONGO_URI = "mongodb://localhost:27017/"
# Database and collection names used for the scraped posts.
DB_NAME = "ig_post"
COLLECTION_NAME = "instagram_posts"

### Extract data from API

In [None]:
# Function to Fetch Data for One Hashtag
def fetch_all_data_for_hashtag(hashtag):
    """Fetch every page of posts the hashtag endpoint returns for one hashtag.

    Args:
        hashtag: Hashtag to query; sent as the ``hashtag`` query parameter.

    Returns:
        A list containing the ``data.items`` entries of every page fetched.
        Paging stops at the first request error, the first non-200 response,
        or the first page without a ``data.next_page`` token; items collected
        up to that point are still returned.
    """
    all_items = []
    querystring = {"hashtag": hashtag}

    while True:
        try:
            # Use the module-level constants (the names `url` / `headers` are
            # not defined anywhere). The timeout keeps a stalled connection
            # from hanging the notebook forever.
            response = requests.get(
                INSTAGRAM_API_URL,
                headers=HEADERS,
                params=querystring,
                timeout=30,
            )
        except requests.RequestException as exc:
            print(f"Error fetching {hashtag}: {exc}")
            break

        if response.status_code != 200:
            print(f"Error fetching {hashtag}: {response.status_code}")
            break

        # `or {}` / `or []` also cover keys that are present but null.
        payload = response.json().get("data") or {}
        all_items.extend(payload.get("items") or [])

        # Check if there is a next page
        next_page = payload.get("next_page")
        if not next_page:
            break  # Stop if no more pages

        querystring["next_page"] = next_page  # Use next page token
        time.sleep(1)  # Avoid hitting API rate limits

    return all_items

In [None]:
# Function to Extract Insights
def extract_insights(items, hashtag):
    """Keep the posts whose caption mentions *hashtag* and flatten their fields.

    Args:
        items: Iterable of post dicts as returned by the API. A post whose
            ``caption`` (or caption ``text``) is missing or ``None`` is
            treated as having an empty caption.
        hashtag: Hashtag to look for (without ``#``); matched
            case-insensitively against the lower-cased caption.

    Returns:
        A list of dicts with the keys ``text`` (lower-cased caption),
        ``hashtags`` (tags found in the caption, without ``#``), ``id``,
        ``comment_count``, ``feed_type``, ``is_video``, ``like_count``,
        ``media_name``, ``product_type`` and ``video_duration``.
    """
    target = hashtag.lower()
    insights = []
    for item in items:
        # The API may send "caption": null, so .get("caption", {}) is not enough.
        caption = item.get("caption") or {}
        caption_text = (caption.get("text") or "").lower()

        # Extract hashtags from the caption. Python's \w does not match
        # combining marks (e.g. Thai vowel signs), so the Thai block is added
        # explicitly to keep tags such as "#กรุงเทพ" intact.
        hashtags = re.findall(r"#([\w\u0E00-\u0E7F]+)", caption_text)

        # Skip posts that do not mention the hashtag in text or tags.
        if target not in caption_text and target not in hashtags:
            continue

        insights.append({
            "text": caption_text,
            "hashtags": hashtags,
            "id": item.get("id", None),
            "comment_count": item.get("comment_count", 0),
            "feed_type": item.get("feed_type", ""),
            "is_video": item.get("is_video", False),
            "like_count": item.get("like_count", 0),
            "media_name": item.get("media_name", ""),
            "product_type": item.get("product_type", ""),
            "video_duration": item.get("video_duration", 0.0),
        })

    return insights

In [None]:
# Hashtags to fetch, plus the mapping that will hold the insights per hashtag
hashtags = [
    "thailand",
    "bangkok",
    "ไทย",
    "กรุงเทพ",
    "bkk",
    "bangkokcity",
    "thai",
    "bangkokthailand",
    "amazingthailand",
    "y2kthailand",
    "thailandtravel",
]
all_insights = {}

In [None]:
# Fetch the posts for every hashtag and keep only the relevant ones
for hashtag in hashtags:
    print(f"Fetching data for #{hashtag}...")
    data_items = fetch_all_data_for_hashtag(hashtag)
    insights = all_insights[hashtag] = extract_insights(data_items, hashtag)
    print(f"Total posts retrieved for #{hashtag}: {len(insights)}\n")

# Total number of relevant posts across all hashtags
total_posts = sum(map(len, all_insights.values()))
print(f"Total number of relevant posts: {total_posts}")

### Connect to MongoDB

In [None]:
#Install pymongo for notebook
!pip install pymongo

In [None]:
# Connect to MongoDB
# Open a client for MONGO_URI, then select the database and collection that
# the rest of the notebook reads from and writes to.
client = MongoClient(MONGO_URI)
db = client[DB_NAME]
collection = db[COLLECTION_NAME]

### Save raw data to MongoDB

In [None]:
import logging
from pymongo import UpdateOne
from pymongo.errors import BulkWriteError

# all_insights maps each hashtag to its list of post dicts; flatten it into a
# single list of posts.
all_data = [post for insights in all_insights.values() for post in insights]

# Build one upsert per post, keyed on the post's "id" field.
upsert_operations = []
for doc in all_data:
    # Drop any "_id" key so that $set never tries to modify MongoDB's _id.
    doc.pop("_id", None)
    if doc.get("id") is None:
        # A filter of {"id": None} is shared by every id-less post, so such
        # posts would keep overwriting one another.
        logging.warning("Skipping a post without an id.")
        continue
    upsert_operations.append(
        UpdateOne({"id": doc["id"]}, {"$set": doc}, upsert=True)
    )

logging.info("Attempting to upsert %d documents into MongoDB.", len(upsert_operations))

# bulk_write rejects an empty operation list, so only call it when there is work.
if upsert_operations:
    try:
        # One bulk request instead of one round trip per post. Operations run
        # in order (the default), so a later duplicate id still wins. Unlike
        # update_one, bulk_write really does raise BulkWriteError.
        collection.bulk_write(upsert_operations)
        logging.info("All new data upserted into MongoDB!")
    except BulkWriteError as bwe:
        logging.error("Bulk write error occurred during update operation.")
        logging.error(bwe.details)

### Load data from MongoDB

In [None]:
# Load Data from MongoDB
# Only "_id" and "text" are projected. Naming the columns explicitly keeps the
# "text" column present even when the collection is empty, so the cleaning
# step below does not fail with a KeyError.
data = list(collection.find({}, {"text": 1}))
df = pd.DataFrame(data, columns=["_id", "text"])

### Data cleaning

In [None]:
# Clean Text
def clean_text(text):
    """Strip URLs, hashtags and @mentions from a caption.

    Args:
        text: Caption string. Anything that is not a non-empty string (for
            example ``None`` or a pandas ``NaN``) is treated as missing.

    Returns:
        The caption without URLs, hashtags and mentions, with leading and
        trailing whitespace removed; ``""`` for missing input.
    """
    if not isinstance(text, str) or not text:
        return ""
    # The dot after "www" is escaped so ordinary words such as "awww" are not
    # mistaken for a URL.
    text = re.sub(r"http\S+|www\.\S+", "", text)  # Remove URLs
    # \w does not match combining marks (e.g. Thai vowel signs); the Thai
    # block is added so Thai hashtags are removed in full.
    text = re.sub(r"#[\w\u0E00-\u0E7F]+", "", text)  # Remove hashtags
    text = re.sub(r"@\w+", "", text)  # Remove mentions
    return text.strip()

# Add a "clean_text" column holding the cleaned version of every caption.
df["clean_text"] = df["text"].apply(clean_text)

### Sentiment Analysis with OpenAI API

In [None]:
# Read the API key from the OPENAI_API_KEY environment variable.
# NOTE: os.getenv only looks at the process environment; nothing in this
# notebook loads a .env file, so the variable must already be set.
openai.api_key = os.getenv("OPENAI_API_KEY")

In [None]:
# Function to Use ChatGPT for Sentiment Analysis
def get_sentiment_with_chatgpt(text):
    """Ask the OpenAI chat model to classify the sentiment of a caption.

    Args:
        text: Cleaned caption. A falsy value skips the API call.

    Returns:
        The raw message content of the first completion choice, or
        ``"Neutral"`` when *text* is falsy or the API call raises.
    """
    fallback = "Neutral"
    if not text:
        return fallback

    try:
        conversation = [
            {"role": "system", "content": "You are a sentiment analysis assistant. Classify the sentiment of the given text."},
            {"role": "user", "content": f"Classify this Instagram caption into one of these categories: Positive, Negative, Neutral:\n\n{text}"},
        ]
        completion = openai.chat.completions.create(
            model="gpt-3.5-turbo-1106",
            messages=conversation,
            temperature=0.1,  # Low temperature for deterministic response
        )
        first_choice = completion.choices[0]
        return first_choice.message.content
    except Exception as err:
        print(f"Error calling OpenAI API: {err}")
        return fallback  # Default fallback

In [None]:
# Apply Sentiment Analysis using ChatGPT API
# Calls get_sentiment_with_chatgpt once per row of "clean_text".
df["sentiment"] = df["clean_text"].apply(get_sentiment_with_chatgpt)

In [None]:
# Ensure data quality of the model response
def extract_sentiment(text):
    """Reduce a free-form model response to one canonical sentiment label.

    Args:
        text: Raw response text. Non-string values (``None``, ``NaN``) are
            accepted and yield ``None`` instead of raising.

    Returns:
        ``"Positive"``, ``"Negative"`` or ``"Neutral"`` for the first of
        these labels (checked in that order) found case-insensitively in
        *text*; ``None`` when no label is found.
    """
    if not isinstance(text, str):
        return None

    # Lower-case once for a case-insensitive substring check.
    lowered = text.lower()
    for label in ("Positive", "Negative", "Neutral"):
        if label.lower() in lowered:
            return label  # Return the label in its canonical capitalisation.

    return None

In [None]:
# Normalize each raw model response to a canonical sentiment label
# (extract_sentiment returns None when no label is found).
df["sentiment"] = df["sentiment"].apply(extract_sentiment)

In [None]:
# Store Processed Data Back in MongoDB
# Match on _id rather than on the caption text: several posts can share the
# same caption, and update_one would only touch the first of them. Every row
# was loaded from this collection, so no upsert is needed.
for index, row in df.iterrows():
    collection.update_one({"_id": row["_id"]}, {"$set": {"sentiment": row["sentiment"]}})

print("Sentiment Analysis Completed using ChatGPT API!")

### Topic Analysis with Locally Run DeepSeek

In [None]:
# Function to Use Deepseek for Topic Analysis
def get_topic_with_deepseek(text):
    """Ask the local DeepSeek model (via Ollama) for a one-word topic.

    Args:
        text: Cleaned caption. A falsy value skips the model call.

    Returns:
        The content of the model's reply, or ``"Unknown"`` when *text* is
        falsy, the reply has no ``message`` entry, or the call raises.
    """
    # Return the same sentinel as the error path instead of an implicit None,
    # so empty captions do not end up as missing topics downstream.
    if not text:
        return "Unknown"

    try:
        response = ollama.chat(
            model="deepseek-v2:16b",
            messages=[
                {"role": "system", "content": "You are a topic analysis assistant. Classify the topic of the given text."},
                {"role": "user", "content": f"Analyze the given multi-language Instagram caption and classify it into only one topic. The topic should be specific enough to provide meaningful insights but not overly niche. If multiple topics are highly related, consolidate them into a common broader category instead of listing them separately. Return only one word representing the topic in English. If the caption cannot be analyzed, return only the word Unknown (without quotes or additional explanation).:\n\n{text}"}
            ],
            options={"temperature": 0}
        )
        time.sleep(0.5)  # Avoid overloading Ollama with fast requests
        return response["message"]["content"] if "message" in response else "Unknown"
    except Exception as e:
        print(f"Error calling DeepSeek: {e}")
        return "Unknown"  # Default fallback in case of errors

In [None]:
# Batch Topic Analysis
batch_size = 10  # Adjust batch size based on available memory
# Ceiling division: no extra empty batch when len(df) is a multiple of batch_size.
num_batches = -(-len(df) // batch_size)

# Make sure the target column exists before filling it batch by batch.
if "topic" not in df.columns:
    df["topic"] = None

for i in range(num_batches):
    start_idx = i * batch_size
    end_idx = min((i + 1) * batch_size, len(df))

    # Pick the rows by position. A label slice df.loc[start_idx:end_idx]
    # includes BOTH endpoints, so the boundary row of every batch would be
    # sent to the model twice.
    batch_labels = df.index[start_idx:end_idx]
    df.loc[batch_labels, "topic"] = df.loc[batch_labels, "clean_text"].apply(get_topic_with_deepseek)

    print(f"Processed batch {i+1}/{num_batches}")

In [None]:
# Store Processed Data Back in MongoDB
# Match on _id rather than on the caption text: several posts can share the
# same caption, and update_one would only touch the first of them. Every row
# was loaded from this collection, so no upsert is needed.
for index, row in df.iterrows():
    collection.update_one({"_id": row["_id"]}, {"$set": {"topic": row["topic"]}})

print("Topic Analysis Completed using Local DeepSeek!")

In [None]:
# Load data from MongoDB
# Rebinds df to a fresh frame holding only "_id" and "topic" for every
# document; the previous df (text, clean_text, sentiment) is replaced.
df = pd.DataFrame(list(collection.find({}, {"_id": 1, "topic": 1})))

In [None]:
# Regex matching one or more characters from the listed emoji code-point ranges
emoji_regex = re.compile(
    r"[\U0001F300-\U0001F5FF\U0001F600-\U0001F64F\U0001F680-\U0001F6FF"
    r"\U0001F700-\U0001F77F\U0001F780-\U0001F7FF\U0001F800-\U0001F8FF"
    r"\U0001F900-\U0001F9FF\U0001FA00-\U0001FA6F\U0001FA70-\U0001FAFF"
    r"\U00002702-\U000027B0]+",
    flags=re.UNICODE,
)

# Boolean mask of rows whose "topic" contains an emoji (missing topics -> False)
has_emoji = df["topic"].str.contains(emoji_regex, na=False)

# Report how many topics contain an emoji
df_with_emoji = df[has_emoji]
print(len(df_with_emoji))

# Topics containing an emoji are not usable labels; reset them to Unknown
df.loc[has_emoji, "topic"] = "Unknown"

In [None]:
# Check text length and replace topic with "Unknown" if over 50 characters
too_long = df["topic"].str.len() > 50
# Count the True entries; len() of the mask would just be the number of rows.
print(int(too_long.sum()))
df.loc[too_long, "topic"] = "Unknown"

In [None]:
# Write the corrected topics back, matching each document by its _id
for doc_id, topic in zip(df["_id"], df["topic"]):
    collection.update_one({"_id": doc_id}, {"$set": {"topic": topic}})

print("Updated topics in MongoDB for long texts.")

### Translate Non-English Topics with the googletrans API

In [None]:
# Initialize translator
# Single googletrans Translator instance shared by translate_if_needed.
translator = Translator()

# Function to translate only non-English text
def translate_if_needed(text):
    """Return *text* in English, translating only when it is not detected as English.

    Args:
        text: Text to check and, if necessary, translate.

    Returns:
        *text* unchanged when the detected language is ``"en"``, otherwise
        its English translation; ``"Translation Error"`` if detection or
        translation raises.
    """
    # NOTE(review): assumes a googletrans release whose detect/translate calls
    # are synchronous - confirm against the installed version.
    try:
        source_lang = translator.detect(text).lang
        if source_lang != "en":
            return translator.translate(text, dest="en").text
        return text
    except Exception as err:
        print(f"Error: {err}")
        return "Translation Error"

In [None]:
# Apply translation function
# Replaces every topic with the value returned by translate_if_needed.
df["topic"] = df["topic"].apply(translate_if_needed)

In [None]:
# Write the translated topics back, matching each document by its _id
for doc_id, topic in zip(df["_id"], df["topic"]):
    collection.update_one({"_id": doc_id}, {"$set": {"topic": topic}})

print("Updated topics in MongoDB for non eng to eng")