In [6]:
### Creating tables for original_tweet_meta_data, retweet_meta_data and user_meta_data

import psycopg2

# Connection parameters
conn_params = {
    'host': "localhost",
    'dbname': "postgres",
    'user': "postgres",
    'password': "Yashu@419",
    'port': "5432"
}

conn = psycopg2.connect(**conn_params)
cur = conn.cursor()

# Drop tables if they exist to avoid constraints issues during development/testing
cur.execute("DROP TABLE IF EXISTS original_tweet_meta_data CASCADE;")
cur.execute("DROP TABLE IF EXISTS retweet_meta_data CASCADE;")
cur.execute("DROP TABLE IF EXISTS user_meta_data CASCADE;")

# Create user_meta_data table
cur.execute("""
    CREATE TABLE IF NOT EXISTS user_meta_data (
        user_id BIGINT PRIMARY KEY,
        screen_name TEXT,
        created_at TIMESTAMP WITH TIME ZONE,
        followers INT
    )
""")

# Create original_tweet_meta_data table
cur.execute("""
    CREATE TABLE IF NOT EXISTS original_tweet_meta_data (
        tweet_id BIGINT PRIMARY KEY,
        user_id BIGINT,
        created_at TIMESTAMP WITH TIME ZONE,
        language VARCHAR(10),
        hashtags TEXT,
        retweet_count INT,
        FOREIGN KEY (user_id) REFERENCES user_meta_data(user_id)
    )
""")

# Create retweet_meta_data table
cur.execute("""
    CREATE TABLE IF NOT EXISTS retweet_meta_data (
        tweet_id BIGINT PRIMARY KEY,
        user_id BIGINT,
        created_at TIMESTAMP WITH TIME ZONE,
        language VARCHAR(10),
        hashtags TEXT,
        retweet_count INT,
        FOREIGN KEY (user_id) REFERENCES user_meta_data(user_id)
    )
""")

# Commit the changes
conn.commit()
print("Tables created and foreign keys set successfully.")

# Clean up: Close cursor and connection
cur.close()
conn.close()


Tables created and foreign keys set successfully.


In [17]:
# Population of Databases (SQL + ELastic Search)

import json
import psycopg2
from elasticsearch import Elasticsearch
import re

from datetime import datetime

def parse_twitter_date(date_str):
    """Convert Twitter's date string into a format compatible with Elasticsearch."""
    return datetime.strptime(date_str, '%a %b %d %H:%M:%S +0000 %Y').isoformat()


# Connect to Elastic Search
es = Elasticsearch("http://localhost:9200")  

# Define your database connection parameters
db_params = {
    'dbname': 'postgres',
    'user': 'postgres',
    'password': 'Yashu@419',
    'host': 'localhost',
    'port': '5432'
}

# Connect to the PostgreSQL database
conn = psycopg2.connect(**db_params)
cur = conn.cursor()

# File path to your tweet data file
file_path = r"/Users/sidsy04/Desktop/DataBase_Project/corona-out-3"

# Function to tokenize tweet text for elastic search
def tokenize_text(text):
    """ Tokenize text using simple regex split. """
    return re.split(r'\W+', text)

# Function to find retweet count attribute in JSON object
def find_deepest_retweet_count(data, key='retweet_count'):
    queue = [data]
    last_value = None
    while queue:
        current = queue.pop(0)
        for k, v in current.items():
            if k == key:
                last_value = v
            if isinstance(v, dict):
                queue.append(v)
    return last_value

# Variable to keep track of unique user IDs
seen_user_ids = set()

# Process the tweet file, one JSON object at a time
with open(file_path, 'r', encoding='utf-8') as file:
    for line in file:
        try:
            tweet = json.loads(line)
            user_info = tweet.get('user', {})
            user_id = user_info.get('id')
            if user_id is None:
                continue  # Skip tweets where user_id is not present

            # Populate user data table if not already processed
            if user_id not in seen_user_ids:
                seen_user_ids.add(user_id)
                user_data = {
                    'user_id': user_id,
                    'screen_name': user_info.get('screen_name'),
                    'created_at': user_info.get('created_at'),
                    'followers': user_info.get('followers_count', 0)
                }
                cur.execute("""
                    INSERT INTO user_meta_data (user_id, screen_name, created_at, followers)
                    VALUES (%(user_id)s, %(screen_name)s, TO_TIMESTAMP(%(created_at)s, 'Dy Mon DD HH24:MI:SS +0000 YYYY'), 
                            %(followers)s)
                    ON CONFLICT (user_id) DO NOTHING;
                """, user_data)

            # Prepare common tweet data including tokenized text
            tokens = tokenize_text(tweet.get('text', ''))
            tweet_data = {
                'tweet_id': tweet['id'],
                'user_id': user_id,
                'created_at': parse_twitter_date(tweet['created_at']),
                'language': tweet.get('lang', None),
                'hashtags': json.dumps([hashtag['text'] for hashtag in tweet.get('entities', {}).get('hashtags', [])]),
                'retweet_count': find_deepest_retweet_count(tweet)
            }

            # Elasticsearch indexing with tokenized text
            es.index(index="dbms-tweets", body={
                "tweet_id": tweet_data['tweet_id'],
                "created_at": tweet_data['created_at'],
                "language": tweet_data['language'],
                "hashtags": tweet_data['hashtags'],
                "text_tokens": tokens
            })

        except json.JSONDecodeError as e:
            print(f"Processing...")

# Commit the changes to the database
conn.commit()

# Close the cursor and connection
cur.close()
conn.close()

print("Tweets have been processed and inserted into their respective databases, and indexed in Elasticsearch.")


  es.index(index="dbms-tweets", body={


Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Proces