# Social Media ETL & Analytics Pipeline
**Objective:** Fetch and analyze posts from Twitter and YouTube, normalize data, compute engagement metrics, and identify top posts.

## Import Required Libraries
- `requests` for API requests
- `pandas` for data handling
- `logging` to track pipeline execution and errors
- `os` and `datetime` for file paths and timestamps

In [None]:
# import libraries
import requests
import pandas as pd
import logging
from datetime import datetime
import os

## API Keys
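Load the Twitter bearer token and YouTube API key from environment variables (a local `.env` file, if present, is read first via `python-dotenv`).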

In [None]:
import os
from dotenv import load_dotenv

# Pull any KEY=value pairs from a local .env file into os.environ so the
# lookups below can see them.
load_dotenv()

# Credentials come from the environment. The placeholder strings are used only
# when a variable is unset; presumably the APIs will then reject the calls.
TWITTER_BEARER_TOKEN = os.environ.get("TWITTER_BEARER_TOKEN", "your-twitter-token-here")
YOUTUBE_API_KEY = os.environ.get("YOUTUBE_API_KEY", "your-youtube-key-here")

# Bearer-token auth header attached to every Twitter request.
TWITTER_HEADERS = dict(Authorization="Bearer " + TWITTER_BEARER_TOKEN)

## Logging
Set up a log file to record pipeline execution and errors.

In [None]:
# using logging to create a log file and store it in the work directory 
LOG_FOLDER = r"D:\Data_Engineer_SE\logs"
os.makedirs(LOG_FOLDER, exist_ok=True)

logging.basicConfig(
    filename=os.path.join(LOG_FOLDER, 'social_data_pipeline.log'),
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

## Data Storage
Create the data folder and define the path of the master CSV file that stores all posts (the file itself is written on the first save).

In [None]:
# Folder that holds the master CSV. Defaults to the original location; set the
# DATA_FOLDER environment variable to keep the data somewhere else.
DATA_FOLDER = os.getenv("DATA_FOLDER", r"D:\Data_Engineer_SE")
os.makedirs(DATA_FOLDER, exist_ok=True)

# Only the path is defined here; the CSV itself is written by save_data().
MASTER_FILE = os.path.join(DATA_FOLDER, "social_data_master.csv")

## Fetch Functions
- `fetch_twitter_posts()` retrieves tweets
- `fetch_youtube_posts()` retrieves videos

In [None]:
def fetch_twitter_posts(query="Data warehouse", max_results=10, timeout=30):
    """Fetch recent tweets matching *query* from the Twitter API v2.

    Calls the recent-search endpoint with the module-level ``TWITTER_HEADERS``
    and flattens each tweet into the pipeline's common record layout.

    Args:
        query: Search query. It is passed via ``params`` so ``requests``
            URL-encodes it (spaces, ``#``, ``&`` and similar are safe).
        max_results: Number of tweets wanted. The endpoint only accepts values
            from 10 to 100. The request is clamped to that range, and the
            result is trimmed back to ``max_results``.
        timeout: Seconds to wait for the API before giving up.

    Returns:
        A list of dicts with keys post_id, platform, author_id, content, likes,
        comments, shares and post_date. If the request or JSON decoding fails,
        the error is logged and an empty list is returned.
    """
    limit = max(int(max_results), 0)
    params = {
        "query": query,
        # Recent search rejects max_results outside 10..100 with HTTP 400.
        "max_results": min(max(limit, 10), 100),
        "tweet.fields": "public_metrics,created_at,author_id",
    }
    try:
        response = requests.get(
            "https://api.twitter.com/2/tweets/search/recent",
            headers=TWITTER_HEADERS,
            params=params,
            timeout=timeout,  # never block the pipeline indefinitely
        )
        response.raise_for_status()
        tweets = response.json().get("data", [])
    except Exception as e:
        # Best-effort source: log and let the pipeline continue with YouTube.
        logging.error(f"Twitter API request failed: {e}")
        return []

    records = []
    for t in tweets[:limit]:
        metrics = t.get("public_metrics") or {}
        records.append({
            "post_id": str(t.get("id", "")),
            "platform": "Twitter",
            "author_id": str(t.get("author_id", "")),
            "content": str(t.get("text", "")),
            "likes": metrics.get("like_count", 0),
            "comments": metrics.get("reply_count", 0),
            "shares": metrics.get("retweet_count", 0),
            "post_date": pd.to_datetime(t.get("created_at", ""), errors='coerce'),
        })
    logging.info(f"Fetched {len(records)} tweets")
    return records


def _redact_youtube_key(message):
    """Return *message* as text with the YouTube API key masked out.

    Error messages from ``requests`` often include the full request URL, and
    that URL carries the ``key`` query parameter; masking keeps the key out of
    the log file.
    """
    text = str(message)
    return text.replace(YOUTUBE_API_KEY, "***") if YOUTUBE_API_KEY else text


def fetch_youtube_posts(query="Data warehouse ", max_results=10, timeout=30):
    """Fetch videos matching *query* from the YouTube Data API v3.

    Runs one ``search.list`` call to find videos, then one batched
    ``videos.list`` call to get their statistics. Both calls use the
    module-level ``YOUTUBE_API_KEY``.

    Args:
        query: Search query. Surrounding whitespace is stripped; ``requests``
            URL-encodes the rest.
        max_results: ``maxResults`` for the search call.
        timeout: Seconds to wait for each API call before giving up.

    Returns:
        A list of dicts with keys post_id, platform, author_id, content, likes,
        comments, shares (always 0) and post_date, in search-result order.
        Videos without a statistics entry are skipped. If the search fails,
        the error is logged and an empty list is returned.
    """
    try:
        search_response = requests.get(
            "https://www.googleapis.com/youtube/v3/search",
            params={
                "part": "snippet",
                "q": str(query).strip(),
                "type": "video",
                "maxResults": max_results,
                "key": YOUTUBE_API_KEY,
            },
            timeout=timeout,
        )
        # Without this, API errors (bad key, exhausted quota, ...) come back as
        # a JSON error body with no "items" and look like "0 videos found".
        search_response.raise_for_status()
        items = search_response.json().get("items", [])
    except Exception as e:
        logging.error(f"YouTube search request failed: {_redact_youtube_key(e)}")
        return []

    # (video_id, snippet) for actual video hits, keeping search order.
    videos = []
    for item in items:
        video_id = (item.get("id") or {}).get("videoId")
        if video_id:
            videos.append((video_id, item.get("snippet") or {}))

    stats_by_id = {}
    if videos:
        video_ids = [video_id for video_id, _ in videos]
        try:
            # Fetch all statistics in one videos.list call (comma-separated
            # ids) instead of one request per video. videos.list accepts up to
            # 50 ids, matching search.list's own maxResults ceiling.
            stats_response = requests.get(
                "https://www.googleapis.com/youtube/v3/videos",
                params={
                    "part": "statistics",
                    "id": ",".join(video_ids),
                    "key": YOUTUBE_API_KEY,
                },
                timeout=timeout,
            )
            stats_response.raise_for_status()
            stats_by_id = {
                entry.get("id"): entry.get("statistics", {})
                for entry in stats_response.json().get("items", [])
            }
        except Exception as e:
            logging.error(
                f"YouTube stats request failed for videos {', '.join(video_ids)}: "
                f"{_redact_youtube_key(e)}"
            )

    records = []
    for video_id, snippet in videos:
        stats = stats_by_id.get(video_id)
        if stats is None:
            continue  # no statistics returned for this id
        records.append({
            "post_id": video_id,
            "platform": "YouTube",
            "author_id": str(snippet.get("channelId", "")),
            "content": str(snippet.get("title", "")),
            # The API returns counters as strings; absent ones count as 0.
            "likes": int(stats.get("likeCount", 0)),
            "comments": int(stats.get("commentCount", 0)),
            "shares": 0,
            "post_date": pd.to_datetime(snippet.get("publishedAt", ""), errors='coerce'),
        })
    logging.info(f"Fetched {len(records)} YouTube videos")
    return records


## Save Data
- Merge newly fetched posts into a single master CSV file, de-duplicating on `post_id` + `platform` (the newest copy of a post wins).

In [None]:
def save_data(df, master_file=MASTER_FILE):
    """Merge *df* into the master CSV, de-duplicating on (post_id, platform).

    Rows already in the master file are kept. If a post appears again, the
    newly fetched row (with fresher metrics) replaces the old one
    (``keep="last"``).

    If the master file exists but cannot be read, nothing is written. The
    error is logged and the existing file is left untouched, so a parse or I/O
    failure can never wipe the accumulated history.

    Args:
        df: Normalized posts; must contain ``post_id`` and ``platform``.
        master_file: Path of the master CSV (defaults to MASTER_FILE).
    """
    if os.path.exists(master_file):
        try:
            # Read IDs as text. Twitter IDs are all digits and would otherwise
            # be parsed as int64. They would then never equal the str IDs in
            # df, and drop_duplicates would keep both copies of every tweet.
            df_master = pd.read_csv(
                master_file,
                encoding="utf-8-sig",
                dtype={"post_id": str, "author_id": str},
            )
        except Exception as e:
            # Do NOT fall back to writing only df: that would overwrite the
            # whole history with this run's rows.
            logging.error(f"Failed to read master file, leaving it unchanged: {e}")
            return
        df_combined = pd.concat([df_master, df], ignore_index=True)
        df_combined.drop_duplicates(subset=["post_id", "platform"], keep="last", inplace=True)
    else:
        # First run: the new rows become the master file.
        df_combined = df.copy()

    # Post text may be missing or non-string; store it as plain text.
    if "content" in df_combined.columns:
        df_combined["content"] = df_combined["content"].fillna("").astype(str)

    try:
        # utf-8-sig (BOM) so spreadsheet tools detect UTF-8 for non-Latin text.
        df_combined.to_csv(master_file, index=False, encoding="utf-8-sig")
        logging.info(f"Master CSV updated: {master_file}, total rows: {len(df_combined)}")
    except Exception as e:
        logging.error(f"Failed to save master CSV: {e}")


## ETL Pipeline
- Fetch, normalize, compute engagement score, and save to master CSV.

In [None]:
def fetch_and_process_posts(query="Data warehouse", max_results=10):
    """Run the ETL pipeline: fetch, normalize, score and persist posts.

    Fetches posts for *query* from Twitter and YouTube and normalizes them into
    one DataFrame. It then adds ``engagement_score`` (likes + comments +
    shares) and merges the result into the master CSV via ``save_data``.

    Args:
        query: Search query sent to both platforms.
        max_results: Maximum number of posts requested from each platform.

    Returns:
        This run's normalized posts. Returns an empty DataFrame if nothing was
        fetched or any step failed; the reason is logged.
    """
    try:
        twitter_data = fetch_twitter_posts(query=query, max_results=max_results)
        youtube_data = fetch_youtube_posts(query=query, max_results=max_results)
        all_data = twitter_data + youtube_data

        # Both fetchers return [] on failure. Stop here explicitly instead of
        # letting the column accesses below fail with an opaque KeyError.
        if not all_data:
            logging.warning(f"No posts fetched for query {query!r}; nothing to save.")
            return pd.DataFrame()

        df = pd.DataFrame(all_data)

        # Use one text type for IDs: Twitter IDs are numeric, YouTube IDs are
        # alphanumeric.
        df['post_id'] = df['post_id'].astype(str)
        df['author_id'] = df['author_id'].astype(str)
        # Unparseable timestamps become NaT instead of raising.
        df['post_date'] = pd.to_datetime(df['post_date'], errors='coerce')
        # Missing counters count as zero.
        for col in ('likes', 'comments', 'shares'):
            df[col] = df[col].fillna(0).astype(int)
        df['content'] = df['content'].fillna("").astype(str)
        # NOTE: str.title() turns "YouTube" into "Youtube". It is kept because
        # rows already saved to the master CSV use this spelling, and
        # save_data de-duplicates on (post_id, platform).
        df['platform'] = df['platform'].str.title()
        df['engagement_score'] = df['likes'] + df['comments'] + df['shares']

        save_data(df)

        logging.info("Pipeline executed successfully.")
        return df

    except Exception as e:
        # logging.exception also records the traceback, not just the message.
        logging.exception(f"Pipeline execution failed: {e}")
        return pd.DataFrame()

## Analytics
Compute:
- Daily engagement per platform
- Top posts overall and per platform

In [None]:
def compute_daily_engagement(df):
    """Aggregate posts into one row per (calendar date, platform).

    Args:
        df: Posts with ``post_date`` (datetime), ``platform``, ``post_id``,
            ``likes``, ``comments``, ``shares`` and ``engagement_score``.

    Returns:
        A new DataFrame with columns date, platform, daily_posts, daily_likes,
        daily_comments, daily_shares and daily_engagement, sorted by date then
        platform. Posts whose ``post_date`` is NaT are not counted. Empty input,
        or input without ``post_date`` (e.g. the empty frame the pipeline
        returns on failure), yields an empty frame with those columns.
    """
    columns = [
        "date", "platform", "daily_posts", "daily_likes",
        "daily_comments", "daily_shares", "daily_engagement",
    ]
    if df.empty or "post_date" not in df.columns:
        return pd.DataFrame(columns=columns)

    daily = (
        # Group by the date part of the timestamp so all posts from the same
        # day and platform land in one row.
        df.groupby([df['post_date'].dt.date, 'platform'])
          .agg(
              daily_posts=('post_id', 'count'),
              daily_likes=('likes', 'sum'),
              daily_comments=('comments', 'sum'),
              daily_shares=('shares', 'sum'),
              daily_engagement=('engagement_score', 'sum'),
          )
          .reset_index()
          # The date key keeps its source name 'post_date'; expose it as 'date'.
          .rename(columns={'post_date': 'date'})
    )
    return daily


def get_top_posts(df, top_n_overall=5, top_n_per_platform=3):
    """Return the highest-engagement posts overall and per platform.

    Args:
        df: Posts with ``platform`` and ``engagement_score`` columns.
        top_n_overall: Number of posts to keep across all platforms.
        top_n_per_platform: Number of posts to keep for each platform.

    Returns:
        ``(top_overall, top_per_platform)``. ``top_overall`` is sorted by
        engagement_score, highest first. ``top_per_platform`` is grouped by
        platform (alphabetical) and sorted by score within each group. Both
        keep the original row index. Empty input (or input without
        ``engagement_score``) yields two empty frames.
    """
    if df.empty or "engagement_score" not in df.columns:
        empty = df.iloc[0:0]
        return empty.copy(), empty.copy()

    ranked = df.sort_values('engagement_score', ascending=False, kind='stable')
    top_overall = ranked.head(top_n_overall)
    # GroupBy.head keeps the first n rows of each group in the ranked order.
    # Unlike groupby().apply(), it does not trigger pandas' deprecation warning
    # about operating on the grouping column. The stable sort by platform then
    # reproduces the "one platform after another" layout.
    top_per_platform = (
        ranked.groupby('platform', sort=False)
              .head(top_n_per_platform)
              .sort_values('platform', kind='stable')
    )
    return top_overall, top_per_platform


## Run Pipeline and Analytics
- Execute ETL
- Compute daily engagement
- Show top posts

In [None]:
# Execute one ETL run for the "Data warehouse" query, asking each platform
# for up to 10 posts, and keep the normalized frame for the analytics below.
all_posts_df = fetch_and_process_posts(max_results=10, query="Data warehouse")
rows_fetched = len(all_posts_df)
print(f"Pipeline run completed. Rows fetched: {rows_fetched}")

Pipeline run completed. Rows fetched: 20


In [None]:
# Aggregate the fetched posts per (date, platform): post count plus summed
# likes, comments, shares and engagement score for each day.
daily_engagement = compute_daily_engagement(all_posts_df)
print("Daily Engagement per Platform:")
# Last expression in the cell, so the notebook renders the first 5 rows.
daily_engagement.head()

Daily Engagement per Platform:


Unnamed: 0,date,platform,daily_posts,daily_likes,daily_comments,daily_shares,daily_engagement
0,2017-06-22,Youtube,1,10782,265,0,11047
1,2020-02-15,Youtube,1,12242,424,0,12666
2,2020-06-04,Youtube,1,7654,81,0,7735
3,2021-06-04,Youtube,1,2660,31,0,2691
4,2021-10-26,Youtube,1,16374,524,0,16898


In [None]:
# Rank all posts by engagement_score (likes + comments + shares) and keep the
# 5 highest across both platforms. get_top_posts also returns the top 3 per
# platform, which a later cell displays.
top_overall, top_per_platform = get_top_posts(all_posts_df)
print("Top 5 Posts Overall:")
# Last expression in the cell, so the notebook renders these columns.
top_overall[['platform', 'content', 'engagement_score']]


Top 5 Posts Overall:


  top_per_platform = df.groupby('platform', group_keys=False).apply(


Unnamed: 0,platform,content,engagement_score
12,Youtube,Database vs Data Warehouse vs Data Lake | What...,24975
16,Youtube,KNOW the difference between Data Base // Data ...,16898
13,Youtube,What is ETL | What is Data Warehouse | OLTP vs...,12666
17,Youtube,Data Warehouse Tutorial For Beginners | Data W...,11047
19,Youtube,SQL Data Warehouse from Scratch | Full Hands-O...,10736


In [None]:
# Show the top 3 posts of each platform, as computed by get_top_posts in the
# previous cell, ordered by engagement_score within each platform.
print("Top 3 Posts per Platform:")
# Last expression in the cell, so the notebook renders these columns.
top_per_platform[['platform', 'content', 'engagement_score']]


Top 3 Posts per Platform:


Unnamed: 0,platform,content,engagement_score
3,Twitter,RT @propchainmaster: Real estate doesn’t lack ...,109
4,Twitter,RT @propchainmaster: Real estate doesn’t lack ...,109
5,Twitter,RT @propchainmaster: Real estate doesn’t lack ...,109
12,Youtube,Database vs Data Warehouse vs Data Lake | What...,24975
16,Youtube,KNOW the difference between Data Base // Data ...,16898
13,Youtube,What is ETL | What is Data Warehouse | OLTP vs...,12666


In [None]:
# Sanity check: how many rows this run fetched from each platform.
platform_counts = all_posts_df["platform"].value_counts()
print(platform_counts)

platform
Twitter    10
Youtube    10
Name: count, dtype: int64
