# Project 1

Data Source: https://rapidapi.com/mrngstar/api/instagram-scraper-api3

Instagram: @marcitocastro (482001976) - https://www.instagram.com/marcitocastro/

- Profile Data (twice)
- General Information About the Last 24 Posts (twice)
- Comments from the Last Post (twice)
- Classification of Comments from the Last Post (twice)
  - Data and Time of Data Extraction:
    - 2024-09-18 / 05:03:20.26
    - 2024-09-21 / 19:04:30.89

## Install Dependencies

In [None]:
!pip install requests
!apt-get update # Update apt-get repository.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Spark.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz # Unzip the tgz file.
!pip install pyspark==3.1.1
!pip install -q findspark # Install findspark. Adds PySpark to the System path during runtime.
!pip install -U google-generativeai # Install or update the Google Generative AI package.
!ls


## Configure Environment

In [None]:
# Import libraries
import requests
import json
import findspark
import os
import pandas as pd
import pyspark
from pyspark.sql import DataFrame, SparkSession
from typing import List, Tuple, Dict
from array import ArrayType
import pyspark.sql.types as T
import pyspark.sql.functions as F
import google.generativeai as genai # Import the Google Generative AI module
from google.colab import userdata
from google.colab import drive

# Configure environment variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

drive.mount('/content/drive')

# Initialize Spark session
findspark.init()
spark = SparkSession.builder.appName("Instagram Data Analysis").getOrCreate()
spark


Mounted at /content/drive


## Get Infos from Instagram

### Get PROFILE

In [None]:
def get_influencer_profile(influencer: str, url: str, headers: dict):
    """
    Fetches the profile information of the given Instagram influencer.

    Args:
        influencer (str): The username of the Instagram influencer.
        url (str): The base URL for the Instagram API.
        headers (dict): Headers required for the API request.
    """
    try:
        querystring = {"username_or_id": influencer}
        method = 'user_info'
        response = requests.get(f'{url}{method}', headers=headers, params=querystring)
        response.raise_for_status() # Checks if there was an HTTP error

        response_instagram = response.json()

        profile = response_instagram.get("data", {})

        full_name = profile.get('full_name')
        profile_id  = profile.get('id')
        biography = profile.get('biography')
        category = profile.get('category')
        bio_links = profile.get('bio_links')
        follower_count = profile.get('follower_count')
        following_count = profile.get('following_count')

        hd_profile_pic_versions = profile.get('hd_profile_pic_versions', [])
        hd_profile_pic_versions_url = hd_profile_pic_versions[0].get('url') if hd_profile_pic_versions else None

        media_count = profile.get('media_count')
        is_verified = profile.get('is_verified')
        threads_profile_glyph_url = profile.get('threads_profile_glyph_url')

        data_profile = [(
            full_name, profile_id, biography, category, bio_links, follower_count, following_count, hd_profile_pic_versions_url,
            media_count, is_verified, threads_profile_glyph_url
        )]

        return data_profile

    except requests.RequestException as e:
        print(f"Error fetching influencer profile: {e}")
        return []


### Get POSTS

In [None]:
def get_influencer_posts(influencer: str, url: str, headers: dict):
    """
    Fetches up to 24 posts of the given Instagram influencer

    Args:
        influencer (str): The username of the Instagram influencer.
        url (str): The base URL for the Instagram API.
        headers (dict): Headers required for the API request.
    """
    try:
        method = 'user_posts'
        latest_post_code = ''
        next_max_id = None
        data_posts = []

        for i in range(2):  # Limit to two requests
            querystring = {"username_or_id": influencer, "count": "12"}

            if next_max_id:
                querystring["max_id"] = next_max_id

            response = requests.get(f'{url}{method}', headers=headers, params=querystring)
            response.raise_for_status() # Checks if there was an HTTP error

            response_instagram = response.json()

            items = response_instagram.get('data', {}).get('items', [])
            next_max_id = response_instagram.get('data', {}).get('next_max_id')

            # Process each post
            for item in items:
                post_id = item.get('id')
                code = item.get('code')

                # Update the latest post code if not pinned
                if item.get('timeline_pinned_user_ids') == None:
                    if latest_post_code == '':
                        latest_post_code = code

                device_timestamp = item.get('device_timestamp')
                like_and_view_counts_disabled = item.get('like_and_view_counts_disabled')
                caption = item.get('caption')
                image_versions2 = item.get('image_versions2')
                product_type = item.get('product_type')
                coauthor_producers = item.get('coauthor_producers')
                like_count = item.get('like_count')
                comment_count = item.get('comment_count')
                reshare_count = item.get('reshare_count')
                timeline_pinned_user_ids = item.get('timeline_pinned_user_ids')

                data_posts.append((
                    post_id, code, device_timestamp, like_and_view_counts_disabled, caption, image_versions2, product_type,
                    coauthor_producers, like_count, comment_count, reshare_count, timeline_pinned_user_ids
                ))

            if not next_max_id:
                break

        return data_posts, latest_post_code

    except requests.RequestException as e:
        print(f"Error fetching influencer posts: {e}")
        return [], None


### Get COMMENTS

In [None]:
def get_comments_for_latest_post(latest_post_code: str, url: str, headers: dict):
    """
    Fetches comments for the latest post of the given Instagram influencer.

    Args:
        latest_post_code (str): The code of the latest post.
        url (str): The base URL for the Instagram API.
        headers (dict): Headers required for the API request.
    """
    method = 'media_comments'
    min_id = None
    data_comments = []

    while True:
        try:
            querystring = {"code_or_id_or_url": latest_post_code, "sort_order": "recent", "min_id": min_id}
            response = requests.get(f'{url}{method}', headers=headers, params=querystring)
            response.raise_for_status() # Checks if there was an HTTP error

            response_instagram = response.json()
            array_comments = response_instagram.get('data', {}).get('comments', [])

            # Loop through each comment in the response and extract relevant details
            for comment in array_comments:
                pk = comment.get('pk')
                user_id = comment.get('user_id')
                text = comment.get('text')
                comment_like_count = comment.get('comment_like_count', 0)
                child_comment_count = comment.get('child_comment_count', 0)
                user_username = comment.get('user', {}).get('username')
                user_full_name = comment.get('user', {}).get('full_name')
                user_is_verified = comment.get('user', {}).get('is_verified')

                # Store each comment's data in a dictionary
                data_comments.append((
                    pk, user_id, text, comment_like_count, child_comment_count, user_username, user_full_name, user_is_verified
                ))

            # Check if there is a next page of comments
            min_id = response_instagram.get('data', {}).get('next_min_id')
            if not min_id:
                break

        except requests.RequestException as e:
            print(f"Error fetching comments: {e}")
            break

    return data_comments


## Main Function

In [None]:
def main(influencer: str, url: str, headers: dict):
    """
    Main function to run the workflow: fetching profile, posts, comments for the latest post and saving to Data Lake.

    Args:
        influencer (str): The username of the Instagram influencer.
        url (str): The base URL for the Instagram API.
        headers (dict): Headers required for the API request.
    """
    try:
        # Get influencer profile
        data_profile = get_influencer_profile(influencer, url, headers)
        if not data_profile:
            raise ValueError("Profile data could not be retrieved")
        else:
            # Create and save DataFrame for profile
            schema_profile = T.StructType([
                T.StructField("full_name", T.StringType(), nullable=False),
                T.StructField("id", T.StringType(), nullable=True),
                T.StructField("biography", T.StringType(), nullable=True),
                T.StructField("category", T.StringType(), nullable=True),
                T.StructField("bio_links", T.StringType(), nullable=True),
                T.StructField("follower_count", T.StringType(), nullable=True),
                T.StructField("following_count", T.StringType(), nullable=True),
                T.StructField("hd_profile_pic_versions_url", T.StringType(), nullable=True),
                T.StructField("media_count", T.StringType(), nullable=True),
                T.StructField("is_verified", T.StringType(), nullable=True),
                T.StructField("threads_profile_glyph_url", T.StringType(), nullable=True)
            ])

            df_profile = spark.createDataFrame(data_profile, schema=schema_profile)
            df_profile = df_profile.withColumn('ts_exec', F.current_timestamp())
            df_profile.write.partitionBy("full_name", "ts_exec").mode("append").format('parquet').save('/content/drive/MyDrive/Datalake/Instagram/Profile/')

        # Get posts
        data_posts, latest_post_code = get_influencer_posts(influencer, url, headers)
        if not data_posts or not latest_post_code:
            raise ValueError("Post data or latest post code could not be retrieved")
        else:
            # Create and save DataFrame for posts
            schema_posts = T.StructType([
                T.StructField("id", T.StringType(), nullable=True),
                T.StructField("code", T.StringType(), nullable=True),
                T.StructField("device_timestamp", T.StringType(), nullable=True),
                T.StructField("like_and_view_counts_disabled", T.StringType(), nullable=True),
                T.StructField("caption", T.StringType(), nullable=True),
                T.StructField("image_versions2", T.StringType(), nullable=True),
                T.StructField("product_type", T.StringType(), nullable=True),
                T.StructField("coauthor_producers", T.StringType(), nullable=True),
                T.StructField("like_count", T.StringType(), nullable=True),
                T.StructField("comment_count", T.StringType(), nullable=True),
                T.StructField("reshare_count", T.StringType(), nullable=True),
                T.StructField("timeline_pinned_user_ids", T.StringType(), nullable=True)
            ])

            df_posts = spark.createDataFrame(data_posts, schema=schema_posts)
            df_posts = df_posts.withColumn('ts_exec', F.current_timestamp())
            df_posts.write.partitionBy("ts_exec").mode("append").format('parquet').save('/content/drive/MyDrive/Datalake/Instagram/Posts/')

        # Get comments for the latest post
        comments_data = get_comments_for_latest_post(latest_post_code, url, headers)
        if comments_data:
            # Create and save DataFrame for comments
            schema_comments = T.StructType([
                T.StructField("pk", T.StringType(), nullable=True),
                T.StructField("user_id", T.StringType(), nullable=True),
                T.StructField("text", T.StringType(), nullable=True),
                T.StructField("comment_like_count", T.StringType(), nullable=True),
                T.StructField("child_comment_count", T.StringType(), nullable=True),
                T.StructField("user_username", T.StringType(), nullable=True),
                T.StructField("user_full_name", T.StringType(), nullable=True),
                T.StructField("user_is_verified", T.StringType(), nullable=True)
            ])

            df_comments = spark.createDataFrame(comments_data, schema=schema_comments)
            df_comments = df_comments.withColumn('ts_exec', F.current_timestamp())
            df_comments.write.partitionBy("ts_exec").mode("append").format('parquet').save('/content/drive/MyDrive/Datalake/Instagram/Comments/')
        else:
            print("No comments found for the latest post")

    except ValueError as e:
        print(f"Error in the main workflow: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")


## Classify Comments Using Google Gemini

In [None]:
def classify_comment(text: str, gemini_api_key: str):
    """
    Classifies the sentiment of a comment using the Google Gemini API.

    Args:
        text (str): The comment text to classify.
        gemini_api_key (str): The API key for Google Gemini.
    """
    try:
        genai.configure(api_key=gemini_api_key)
        model = genai.GenerativeModel('gemini-1.5-flash')
        prompt = f'Classify the following comment as (Positive, Negative, Neutral, Humorous). Return only the classification: {text}'
        response = model.generate_content(prompt)

        return response.text.strip()

    except Exception as e:
        print(f"Error classifying comment: {e}")

        return 'Unknown'


In [None]:
def classify_comments_for_the_latest_post(influencer: str, url: str, headers: dict, gemini_api_key: str):
    """
    Classifies the sentiment of comments on the latest Instagram post and stores the results in a Data Lake.

    Args:
        influencer (str): The username of the Instagram influencer.
        url (str): The base URL for the Instagram API.
        headers (dict): Headers required for the API request.
        gemini_api_key (str): The API key for Google Gemini.
    """
    try:
        # Get comments for the latest post
        data_posts, latest_post_code = get_influencer_posts(influencer, url, headers)
        comments_data = get_comments_for_latest_post(latest_post_code, url, headers)
        if comments_data:
            # Classify comments using Google Gemini
            comments_to_classify = comments_data[:20]  # Limit to 20 comments

            classified_comments = [(pk, text, classify_comment(text, gemini_api_key))
                                  for (pk, user_id, text, comment_like_count, child_comment_count, user_username, user_full_name, user_is_verified)
                                  in comments_to_classify]

            # Create and save DataFrame for classified comments
            schema_classified_comments = T.StructType([
                T.StructField("pk", T.StringType(), nullable=True),
                T.StructField("text_original", T.StringType(), nullable=True),
                T.StructField("classification", T.StringType(), nullable=True)
            ])

            df_classified_comments = spark.createDataFrame(classified_comments, schema=schema_classified_comments)
            df_classified_comments = df_classified_comments.withColumn('ts_exec', F.current_timestamp())
            df_classified_comments.write.partitionBy("ts_exec").mode("append").format('parquet').save('/content/drive/MyDrive/Datalake/Instagram/Classified_comments/')
        else:
            print("No comments found for the latest post")

    except ValueError as e:
        print(f"Error in the main workflow: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")


## Define the Parameters and Execute the Workflow

In [None]:
# Define your parameters
influencer = 'marcitocastro'

url = 'https://instagram-scraper-api3.p.rapidapi.com/'

headers = {
    'x-rapidapi-key': userdata.get('x-rapidapi-key'),
    'x-rapidapi-host': 'instagram-scraper-api3.p.rapidapi.com'
}

gemini_api_key = userdata.get('gemini_api_key')

# Execute the workflow
main(influencer, url, headers)
classify_comments_for_the_latest_post(influencer, url, headers, gemini_api_key)


## Analytics

In [None]:
df_profile = spark.read.parquet('/content/drive/MyDrive/Datalake/Instagram/Profile/')

df_profile.createOrReplaceTempView("df_profile")

spark.sql("""
SELECT
    ts_exec,
    id AS profile_id,
    full_name,
    category,
    media_count,
    follower_count,
    following_count,
    is_verified,
    hd_profile_pic_versions_url,
    threads_profile_glyph_url
FROM df_profile
"""
).show(truncate=False)


+-----------------------+----------+--------------+--------+-----------+--------------+---------------+-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------+
|ts_exec                |profile_id|full_name     |category|media_count|follower_count|following_count|is_verified|hd_profile_pic_versions_url                                                                                                                                                                                                                                                                                           

In [None]:
df_posts = spark.read.parquet('/content/drive/MyDrive/Datalake/Instagram/Posts/')

df_posts.createOrReplaceTempView("df_posts")

spark.sql("""
SELECT
    id AS post_id,
    code,
    device_timestamp,
    like_count,
    comment_count,
    reshare_count,
    product_type,
    timeline_pinned_user_ids,
    coauthor_producers,
    like_and_view_counts_disabled,
    image_versions2
FROM df_posts
"""
).show(10, truncate=False)


+-------------------------------+-----------+----------------+----------+-------------+-------------+------------------+------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
df_comments = spark.read.parquet('/content/drive/MyDrive/Datalake/Instagram/Comments/')

df_comments.createOrReplaceTempView("df_comments")

spark.sql("""
SELECT *
FROM df_comments
"""
).show(10, truncate=False)


+-----------------+-----------+------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+-------------------+------------------+-----------------------+----------------+-----------------------+
|pk               |user_id    |text                                                                                                                                                  |comment_like_count|child_comment_count|user_username     |user_full_name         |user_is_verified|ts_exec                |
+-----------------+-----------+------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+-------------------+------------------+-----------------------+----------------+-----------------------+
|18064855975625310|1985716545 |Acreditem , mas eu terminei hj de assistir essa sér

In [None]:
df_classified_comments = spark.read.parquet('/content/drive/MyDrive/Datalake/Instagram/Classified_comments/')

df_classified_comments.createOrReplaceTempView("df_classified_comments")

spark.sql("""
SELECT *
FROM df_classified_comments
"""
).show(10, truncate=False)


+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------+---------------------+
|pk               |text_original                                                                                                                                         |classification                                                   |ts_exec              |
+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------+---------------------+
|18064855975625310|Acreditem , mas eu terminei hj de assistir essa série, fiquei viciado , todos dias saia do trabalho já pensando em vir assistir. Pena que já acabei :/|Positive                                             

In [None]:
# 1. Influencer's Number of Followers
followers_count_query = """
SELECT
    follower_count,
    ts_exec
FROM df_profile
ORDER BY ts_exec DESC
"""
followers_count = spark.sql(followers_count_query)
followers_count.show(truncate=False)


+--------------+-----------------------+
|follower_count|ts_exec                |
+--------------+-----------------------+
|822214        |2024-09-21 19:04:30.894|
|820374        |2024-09-18 05:03:20.26 |
+--------------+-----------------------+



In [None]:
# 2. Engagement Rate (AVG per Post) - (Sum of Interactions across Multiple Posts / (Number of Posts * Number of Followers)) * 100
engagement_rate_query = """
SELECT
    COUNT(post.id) AS qnt_posts_analyzed,
    prof.follower_count AS total_followers,
    SUM(post.like_count + post.comment_count + post.reshare_count) AS sum_likes_comments_reshares,
    (SUM(post.like_count + post.comment_count + post.reshare_count) / (COUNT(post.id) * prof.follower_count)) * 100 AS engagement_rate
FROM df_posts post
CROSS JOIN df_profile prof
WHERE prof.ts_exec = (SELECT MAX(ts_exec) FROM df_profile) -- Último número de seguidores registrado
GROUP BY prof.follower_count
"""
engagement_rate = spark.sql(engagement_rate_query)
engagement_rate.show()


+------------------+---------------+---------------------------+-----------------+
|qnt_posts_analyzed|total_followers|sum_likes_comments_reshares|  engagement_rate|
+------------------+---------------+---------------------------+-----------------+
|                48|         822214|                   888473.0|2.251220991939649|
+------------------+---------------+---------------------------+-----------------+



In [None]:
# 3. AVG Likes per Post
avg_likes_query = """
SELECT
    COUNT(post.id) AS qnt_posts_analyzed,
    SUM(post.like_count) AS total_likes,
    SUM(post.like_count) / COUNT(post.id) AS avg_likes
FROM df_posts post
"""
avg_likes = spark.sql(avg_likes_query)
avg_likes.show()


+------------------+-----------+------------------+
|qnt_posts_analyzed|total_likes|         avg_likes|
+------------------+-----------+------------------+
|                48|   584287.0|12172.645833333334|
+------------------+-----------+------------------+



In [None]:
# 4. AVG Comments per Post
avg_comments_query = """
SELECT
    COUNT(post.id) AS qnt_posts_analyzed,
    SUM(post.comment_count) AS total_comments,
    SUM(post.comment_count) / COUNT(post.id) AS avg_comments
FROM df_posts post
"""
avg_comments = spark.sql(avg_comments_query)
avg_comments.show()


+------------------+--------------+-----------------+
|qnt_posts_analyzed|total_comments|     avg_comments|
+------------------+--------------+-----------------+
|                48|       13124.0|273.4166666666667|
+------------------+--------------+-----------------+



In [None]:
# 5. Follower Growth Rate
follower_growth_query = """
SELECT
    prof.id AS influencer_id,
    MAX(prof.follower_count) AS current_followers_count,
    MIN(prof.follower_count) AS previous_followers_count,
    ((MAX(prof.follower_count) - MIN(prof.follower_count)) / MIN(prof.follower_count)) * 100 AS growth_rate
FROM df_profile prof
GROUP BY prof.id
"""
follower_growth = spark.sql(follower_growth_query)
follower_growth.show()


+-------------+-----------------------+------------------------+------------------+
|influencer_id|current_followers_count|previous_followers_count|       growth_rate|
+-------------+-----------------------+------------------------+------------------+
|    482001976|                 822214|                  820374|0.2242879467169852|
+-------------+-----------------------+------------------------+------------------+



In [None]:
# 6. Sentiment of Comments
sentiment_analysis_query = """
SELECT
    COUNT(c.classification) AS qnt_comments_analyzed,
    SUM(CASE WHEN c.classification = 'Positive' THEN 1 ELSE 0 END) AS positive,
    SUM(CASE WHEN c.classification = 'Negative' THEN 1 ELSE 0 END) AS negative,
    SUM(CASE WHEN c.classification = 'Humorous' THEN 1 ELSE 0 END) AS humorous,
    SUM(CASE WHEN c.classification NOT IN ('Positive', 'Humorous', 'Negative') THEN 1 ELSE 0 END) AS neutral
FROM df_classified_comments c
"""
sentiment_analysis = spark.sql(sentiment_analysis_query)
sentiment_analysis.show()


+---------------------+--------+--------+--------+-------+
|qnt_comments_analyzed|positive|negative|humorous|neutral|
+---------------------+--------+--------+--------+-------+
|                   40|      19|       1|       2|     18|
+---------------------+--------+--------+--------+-------+



In [None]:
# 7. Posting Frequency (Number of Posts per Week)
post_frequency_query = """
SELECT
    COUNT(id) AS qnt_posts_analyzed,
    COUNT(id) / COUNT(DISTINCT WEEKOFYEAR(to_timestamp(device_timestamp / 1000))) AS posts_per_week
FROM df_posts
"""
post_frequency = spark.sql(post_frequency_query)
post_frequency.show()


+------------------+------------------+
|qnt_posts_analyzed|    posts_per_week|
+------------------+------------------+
|                48|2.0869565217391304|
+------------------+------------------+



In [None]:
# 8. Content Variety (Distribution of Media Types)
media_distribution_query = """
SELECT
    product_type,
    COUNT(*) AS count_per_type
FROM df_posts
GROUP BY product_type
"""
media_distribution = spark.sql(media_distribution_query)
media_distribution.show()


+------------------+--------------+
|      product_type|count_per_type|
+------------------+--------------+
|             clips|            27|
|              feed|            20|
|carousel_container|             1|
+------------------+--------------+

