# Real-Time Twitter Data Pipeline with Kafka, PostgreSQL, and PySpark

## Project Overview
This project demonstrates a real-time ETL (Extract, Transform, Load) pipeline that:
1.  **Extracts** tweets from the Twitter API using `tweepy`.
2.  **Streams** the data to a **Kafka** topic (`tweet-crawl`) using a Python producer.
3.  **Consumes** the data from Kafka and **Loads** it into a **PostgreSQL** database (`tweet_merge` table).
4.  **Transforms** the stored data using **PySpark** (e.g., text normalization) and writes the results back to PostgreSQL (`tweet_transformed` table).

## Tech Stack
- **Source**: Twitter API v2
- **Streaming**: Apache Kafka
- **Storage**: PostgreSQL
- **Processing**: PySpark
- **Language**: Python 3.12

## Prerequisites
- Running Kafka Broker (e.g., via Docker)
- Running PostgreSQL Instance
- Twitter Developer Account (Bearer Token)

---


In [None]:
# Install required packages (run once)
!pip install -q tweepy kafka-python psycopg2-binary pyspark


## Configuration
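Credentials and connection settings are read from environment variables so that secrets stay out of the notebook. The fallback values in the cell below are placeholders for local development only.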


In [None]:
import os
import json
import time

# Twitter API
BEARER_TOKEN = os.getenv("TWITTER_BEARER_TOKEN", "<YOUR_BEARER_TOKEN>")

# Kafka
KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
KAFKA_TOPIC = "tweet-crawl"

# PostgreSQL
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = os.getenv("DB_PORT", "5432")
DB_NAME = os.getenv("DB_NAME", "twitter_db")
DB_USER = os.getenv("DB_USER", "admin")
DB_PASS = os.getenv("DB_PASS", "admin123")

print("Configuration loaded.")
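
Because every setting above falls back to a default, a missing environment variable goes unnoticed until a connection fails. A minimal sketch of an early check is shown here; the helper `missing_settings` and the `REQUIRED_VARS` tuple are hypothetical names introduced for illustration and are not part of the original notebook.

```python
import os

# Hypothetical list of variables that should never rely on a fallback value.
REQUIRED_VARS = ("TWITTER_BEARER_TOKEN", "DB_USER", "DB_PASS")


def missing_settings(env=os.environ, required=REQUIRED_VARS):
    """Return the names in `required` that are absent or blank in `env`."""
    return [name for name in required if not env.get(name, "").strip()]
```

Calling `missing_settings()` before starting the pipeline returns an empty list when everything is set, or the names that still need a value.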

## Part 1: Kafka Producer (Twitter -> Kafka)
Fetches up to 10 recent English-language tweets matching "AI" (retweets excluded) and publishes each one as a JSON message to the `tweet-crawl` Kafka topic.


In [None]:
import tweepy
from kafka import KafkaProducer

def run_producer():
    if BEARER_TOKEN == "<YOUR_BEARER_TOKEN>":
        print("Please set your Twitter Bearer Token.")
        return

    try:
        # Initialize Twitter Client
        client = tweepy.Client(bearer_token=BEARER_TOKEN)
        
        # Initialize Kafka Producer
        producer = KafkaProducer(
            bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
            value_serializer=lambda v: json.dumps(v).encode("utf-8")
        )
        
        query = "AI -is:retweet lang:en"
        response = client.search_recent_tweets(query=query, tweet_fields=["id", "text", "created_at"], max_results=10)
        
        if response.data:
            for tweet in response.data:
                tweet_data = {
                    "id": tweet.id,
                    "text": tweet.text,
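                    # ISO 8601 string, or None so the database stores NULL instead of the text "None"
                    "created_at": tweet.created_at.isoformat() if tweet.created_at else None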
                }
                producer.send(KAFKA_TOPIC, tweet_data)
                print(f"Produced -> ID: {tweet.id}")
        
        # Block until buffered messages are delivered, then release the connection
        producer.flush()
        producer.close()
        print("Producer finished.")
        
    except Exception as e:
        print(f"Producer Error: {e}")

# Uncomment to run the producer
# run_producer()
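
The payload construction and JSON serialization used by `run_producer` can be checked without a Twitter token or a running broker. The sketch below assumes a tweet is any object with `id`, `text`, and `created_at` attributes; `tweet_to_payload`, `serialize`, and `deserialize` are hypothetical helper names, and `SimpleNamespace` stands in for a real tweepy tweet.

```python
import json
from datetime import datetime, timezone
from types import SimpleNamespace


def tweet_to_payload(tweet):
    """Build a JSON-safe dict from an object with id, text, and created_at."""
    created = tweet.created_at
    return {
        "id": tweet.id,
        "text": tweet.text,
        # Emit None rather than the string "None" when the timestamp is missing
        "created_at": created.isoformat() if created else None,
    }


def serialize(value):
    """Same encoding as the producer's value_serializer."""
    return json.dumps(value).encode("utf-8")


def deserialize(raw):
    """Same decoding as the consumer's value_deserializer."""
    return json.loads(raw.decode("utf-8"))


# Stand-in tweet for illustration only
fake_tweet = SimpleNamespace(
    id=1,
    text="AI and data",
    created_at=datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc),
)
payload = tweet_to_payload(fake_tweet)
round_tripped = deserialize(serialize(payload))
```

If `round_tripped` equals `payload`, the producer and consumer agree on the message format.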

## Part 2: Kafka Consumer (Kafka -> PostgreSQL)
Consumes messages from Kafka and inserts them into the `tweet_merge` table, skipping tweet IDs that are already stored. For this demo the consumer stops after 10 messages.


In [None]:
import psycopg2
from kafka import KafkaConsumer

def run_consumer():
    try:
        # Connect to PostgreSQL
        conn = psycopg2.connect(
            dbname=DB_NAME,
            user=DB_USER,
            password=DB_PASS,
            host=DB_HOST,
            port=DB_PORT
        )
        cursor = conn.cursor()
        
        # Create table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS tweet_merge (
                id BIGINT PRIMARY KEY,
                text TEXT,
                created_at TIMESTAMP
            );
        """)
        conn.commit()
        
        # Initialize Consumer
        consumer = KafkaConsumer(
            KAFKA_TOPIC,
            bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
            auto_offset_reset="earliest",
            enable_auto_commit=True,
            group_id="tweet-consumer-group",
            # Stop iterating after 10 s without messages so the demo cannot hang
            consumer_timeout_ms=10000,
            value_deserializer=lambda v: json.loads(v.decode("utf-8"))
        )
        
        print("Consumer started. Listening for messages...")
        # Note: In a real app, this would run indefinitely. Here we consume a few for demo.
        for i, message in enumerate(consumer):
            tweet = message.value
            try:
                cursor.execute(
                    "INSERT INTO tweet_merge (id, text, created_at) VALUES (%s, %s, %s) ON CONFLICT (id) DO NOTHING;",
                    (tweet["id"], tweet["text"], tweet.get("created_at"))
                )
                conn.commit()
                print(f"Inserted -> {tweet['id']}")
            except Exception as e:
                print(f"DB Error: {e}")
                conn.rollback()
            
            if i >= 9: # Stop after 10 messages for demo
                break
                
        # Release Kafka and database resources
        consumer.close()
        cursor.close()
        conn.close()
        
    except Exception as e:
        print(f"Consumer Error: {e}")

# Uncomment to run the consumer
# run_consumer()
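
The `ON CONFLICT (id) DO NOTHING` clause is what makes the load step safe to repeat: if Kafka redelivers a message, or the consumer is restarted from an earlier offset, the duplicate row is skipped instead of raising a primary-key error. The sketch below illustrates that behavior using an in-memory SQLite database as a stand-in for PostgreSQL, so it runs without a server. This is an assumption made for portability; SQLite uses `?` placeholders where psycopg2 uses `%s`, and `insert_tweets` is a hypothetical helper.

```python
import sqlite3


def insert_tweets(conn, tweets):
    """Insert tweets, skipping ids already stored; return how many rows were added."""
    before = conn.execute("SELECT COUNT(*) FROM tweet_merge").fetchone()[0]
    conn.executemany(
        "INSERT INTO tweet_merge (id, text, created_at) VALUES (?, ?, ?) "
        "ON CONFLICT (id) DO NOTHING",
        [(t["id"], t["text"], t.get("created_at")) for t in tweets],
    )
    conn.commit()
    after = conn.execute("SELECT COUNT(*) FROM tweet_merge").fetchone()[0]
    return after - before


# In-memory stand-in for the PostgreSQL table
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tweet_merge (id INTEGER PRIMARY KEY, text TEXT, created_at TEXT)"
)

first_batch = [
    {"id": 1, "text": "first", "created_at": "2024-01-01T12:00:00+00:00"},
    {"id": 2, "text": "second", "created_at": None},
]
# A replayed batch: id 2 is a duplicate, id 3 is new
replayed_batch = [
    {"id": 2, "text": "second", "created_at": None},
    {"id": 3, "text": "third"},
]

added_first = insert_tweets(conn, first_batch)
added_replay = insert_tweets(conn, replayed_batch)
```

Only the new ID from the replayed batch is stored, so running the consumer twice over the same messages leaves the table unchanged.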

## Part 3: PySpark Transformation (PostgreSQL -> PySpark -> PostgreSQL)
Reads `tweet_merge`, adds a `text_uppercase` column containing the uppercased tweet text, and writes the result to `tweet_transformed`, replacing any previous contents of that table.


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper

def run_spark_job():
    try:
        # Initialize Spark Session with the PostgreSQL JDBC driver.
        # Note: "spark.jars" takes a path to the driver jar; the relative path
        # below assumes the jar sits in the current working directory.
        spark = SparkSession.builder \
            .appName("TweetTransformation") \
            .config("spark.jars", "postgresql-42.2.18.jar") \
            .getOrCreate()
        
        jdbc_url = f"jdbc:postgresql://{DB_HOST}:{DB_PORT}/{DB_NAME}"
        db_props = {
            "user": DB_USER,
            "password": DB_PASS,
            "driver": "org.postgresql.Driver"
        }
        
        # Read Data
        tweets_df = spark.read.jdbc(url=jdbc_url, table="tweet_merge", properties=db_props)
        
        # Transform Data
        transformed_df = tweets_df.withColumn("text_uppercase", upper(col("text")))
        
        print("--- Transformed Data Preview ---")
        transformed_df.select("id", "text_uppercase").show(5, truncate=False)
        
        # Write Data Back (mode="overwrite" replaces the table on every run)
        transformed_df.write.jdbc(
            url=jdbc_url,
            table="tweet_transformed",
            mode="overwrite",
            properties=db_props
        )
        print("Transformation job completed successfully.")
        
        spark.stop()
        
    except Exception as e:
        print(f"Spark Error: {e}. If this is a driver error, check that the PostgreSQL JDBC driver jar is available to Spark.")

# Uncomment to run the Spark job
# run_spark_job()
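
The job above only uppercases the text. If the transformation were extended toward the text normalization mentioned in the overview, the rules could be prototyped in plain Python first. The sketch below is one possible set of rules (strip URLs, collapse whitespace, lowercase), chosen here for illustration rather than taken from the original pipeline; `normalize_text` and the pattern names are hypothetical. The two patterns use only syntax shared by Python and Java regular expressions, so they could likely be reused with `pyspark.sql.functions.regexp_replace`, `trim`, and `lower`.

```python
import re

# Hypothetical normalization rules, for illustration only
URL_PATTERN = r"https?://\S+"
WHITESPACE_PATTERN = r"\s+"


def normalize_text(text):
    """Remove URLs, collapse runs of whitespace, trim, and lowercase."""
    without_urls = re.sub(URL_PATTERN, "", text)
    collapsed = re.sub(WHITESPACE_PATTERN, " ", without_urls)
    return collapsed.strip().lower()
```

For example, `normalize_text("Check  THIS out https://t.co/abc  now")` returns `"check this out now"`.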