In [1]:
import ast
import decimal
import json
import os
from datetime import datetime

import numpy as np
import pandas as pd
from psycopg2 import connect, sql

In [2]:
# Let pandas render up to 3000 rows before truncating displayed output
pd.options.display.max_rows = 3000

In [3]:
# Connect to the PostgreSQL database.
# Connection settings are taken from the PGHOST / PGDATABASE / PGUSER / PGPASSWORD
# environment variables so credentials can be supplied without editing the notebook.
# The fallbacks reproduce the previous hard-coded local-development values.
# NOTE(review): the fallback password is still stored in the notebook; drop it once
# PGPASSWORD is set in the environment the kernel runs in.
conn = connect(
    host=os.environ.get("PGHOST", "localhost"),
    database=os.environ.get("PGDATABASE", "postgres"),
    user=os.environ.get("PGUSER", "postgres"),
    password=os.environ.get("PGPASSWORD", "kueen"),
)

In [4]:
# Open a cursor to perform database operations.
# This single module-level cursor is the one used by create_table,
# insert_user_info and the partition-count cell.
cur = conn.cursor()

In [5]:
def create_table(file_path, table_name):
    """Recreate a table by dropping it (if present) and running a SQL script.

    Uses the module-level ``cur`` and ``conn``. The drop and the script are
    committed separately, in that order.

    Args:
        file_path: Path to a text file whose whole content is sent to the
            database in a single ``execute`` call.
        table_name: Name of the table to drop before the script runs. It is
            looked up in ``information_schema.tables`` by exact name.

    Raises:
        OSError: If the script file cannot be read (raised before any
            database statement is issued, so an existing table is untouched).
        Exception: Any database error is re-raised after the open transaction
            has been rolled back, so the connection stays usable.
    """
    # Read the script first: a missing/unreadable file must not leave us with
    # the old table already dropped.
    with open(file_path, 'r') as f:
        query = f.read()

    try:
        # check if the table exists
        cur.execute(
            "SELECT EXISTS(SELECT * FROM information_schema.tables WHERE table_name=%s)",
            (table_name,),
        )
        table_exists = cur.fetchone()[0]

        # if the table exists, drop it
        if table_exists:
            # Compose the name as a quoted identifier instead of pasting it into
            # the statement text: this matches the exact-name lookup above and
            # keeps unusual names from being interpreted as SQL.
            cur.execute(sql.SQL("DROP TABLE {}").format(sql.Identifier(table_name)))
            conn.commit()
            print("Table dropped successfully")

        # execute the SQL commands in the script file
        cur.execute(query)
        conn.commit()
        print("Table created successfully")
    except Exception:
        # A failed statement leaves the transaction aborted; reset it so later
        # cells do not fail with "current transaction is aborted".
        conn.rollback()
        raise

In [6]:
# Drop any existing 'twitter_users_partitioned' table, then run the SQL script.
# NOTE(review): presumably the script creates that table and its partitions
# (twitter_users_partitioned_1 .. _4 are queried later) -- confirm against the script.
create_table('../utils/Partitioning.sql', 'twitter_users_partitioned')

In [7]:
# Secondary indexes on the partitioned users table. The statements run in the
# order listed and each one is committed on its own.
index_statements = (
    # single-column index on "name"
    "CREATE INDEX name_idx_p ON twitter_users_partitioned (name);",
    # compound index on "followers_count" and "verified", both descending
    "CREATE INDEX followers_verified_idx_p ON twitter_users_partitioned (followers_count DESC, verified DESC);",
)

for statement in index_statements:
    cur.execute(statement)
    conn.commit()

In [8]:
# In-memory copy of every user record that insert_user_info stores successfully
users = list()

In [9]:
def insert_user_info(tweet):
    """Insert the author of ``tweet`` into ``twitter_users_partitioned``.

    On success the same record is appended to the module-level ``users`` list.
    On failure the error is printed and the tweet is skipped; nothing is
    raised for per-row problems.

    The insert runs inside a savepoint. Rows are not committed here, so a
    plain ``conn.rollback()`` on failure would also discard every earlier
    insert that has not been committed yet while ``users`` kept them. Rolling
    back to the savepoint undoes only the failed statement.

    Args:
        tweet: Mapping with a ``created_at`` entry and a ``user`` mapping that
            provides ``id``, ``name``, ``screen_name``, ``created_at``,
            ``location``, ``description``, ``verified``, ``followers_count``,
            ``friends_count``, ``listed_count``, ``favourites_count`` and
            ``lang``.
    """
    try:
        user = tweet['user']
        record = {'user_id': user['id'],
                  'name': user['name'],
                  'screen_name': user['screen_name'],
                  'date': tweet['created_at'],
                  'twitter_join_date': user['created_at'],
                  'location': user['location'],
                  'description': user['description'],
                  'verified': user['verified'],
                  'followers_count': user['followers_count'],
                  'friends_count': user['friends_count'],
                  'listed_count': user['listed_count'],
                  'favourites_count': user['favourites_count'],
                  'language': user['lang']}
    except Exception as e:
        # Malformed input: no statement was sent, so there is nothing to undo.
        print(e)
        return

    try:
        cur.execute("SAVEPOINT insert_user_info")
        cur.execute("""
        INSERT INTO twitter_users_partitioned 
        (user_id, name, screen_name, date, twitter_join_date, location, description, 
        verified, followers_count, friends_count, listed_count, favourites_count, language)
        VALUES (%(user_id)s, %(name)s, %(screen_name)s, %(date)s, %(twitter_join_date)s,
                %(location)s, %(description)s, %(verified)s, %(followers_count)s,
                %(friends_count)s, %(listed_count)s, %(favourites_count)s, %(language)s)
    """, record)
        cur.execute("RELEASE SAVEPOINT insert_user_info")
    except Exception as e:
        print(e)
        try:
            # Undo only the failed insert and free the savepoint.
            cur.execute("ROLLBACK TO SAVEPOINT insert_user_info")
            cur.execute("RELEASE SAVEPOINT insert_user_info")
        except Exception:
            # The savepoint was never established (for example the transaction
            # was already aborted): fall back to resetting the transaction.
            conn.rollback()
        return

    # Only records that reached the database are mirrored in memory.
    users.append(record)

In [10]:
# strptime pattern used by load_data to parse the 'created_at' strings of a tweet
input_format = "%a %b %d %H:%M:%S %z %Y"
# strftime pattern load_data rewrites those timestamps into before they are stored
output_format = "%Y-%m-%d %H:%M:%S %Z%z"

In [11]:
def _normalize_dates(tweet):
    """Rewrite the tweet's and its user's ``created_at`` from input_format to output_format, in place."""
    tweet['created_at'] = datetime.strptime(tweet['created_at'], input_format).strftime(output_format)
    tweet['user']['created_at'] = datetime.strptime(tweet['user']['created_at'], input_format).strftime(output_format)


def load_data(file_path):
    """Read a file of JSON tweets (one per line) and store their users.

    For every line that parses, the tweet's timestamps are normalised and the
    author is passed to ``insert_user_info``. If the tweet text starts with
    ``RT`` and a ``retweeted_status`` is present, the retweeted tweet gets the
    same treatment, so both records carry timestamps in ``output_format``.

    Lines that are not valid JSON, lack the expected keys, or carry
    timestamps that do not match ``input_format`` are skipped silently.
    Anything else (database/connection errors escaping ``insert_user_info``,
    ``KeyboardInterrupt``) propagates instead of being swallowed.

    Args:
        file_path: Path of the UTF-8 text file to read.
    """
    # Load the JSON data from file
    with open(file_path, "r", encoding="utf-8") as f:
        for line in f:
            try:
                tweet = json.loads(line)
                _normalize_dates(tweet)
                # insert user entry into database
                insert_user_info(tweet)
                # if there is a retweet, get original user from retweet
                if tweet['text'].startswith('RT'):
                    original_tweet = tweet.get("retweeted_status")
                    if original_tweet is not None:
                        _normalize_dates(original_tweet)
                        insert_user_info(original_tweet)
            except (ValueError, KeyError, TypeError, AttributeError):
                # ValueError covers json.JSONDecodeError and strptime mismatches;
                # the others cover records with missing or wrongly typed fields.
                continue

In [12]:
# Parse the tweets in corona-out-2 line by line and insert their users
load_data("../../corona-out-2")

In [13]:
# Same for corona-out-3; its records are appended to the same table and `users` list
load_data("../../corona-out-3")

In [14]:
# Count the rows held by each partition (twitter_users_partitioned_1 .. _4).
# The counts run on the same connection as the inserts, so they include rows
# that have not been committed yet.
row_counts = []

# loop through the partitions
for i in range(1, 5):
    # generate the partition name
    partition_name = 'twitter_users_partitioned_' + str(i)

    # count the rows in the partition
    count_query = sql.SQL("SELECT COUNT(*) FROM {}").format(sql.Identifier(partition_name))
    cur.execute(count_query)
    row_count = cur.fetchone()[0]
    row_counts.append(row_count)

# print the row counts; numbering starts at 1 so each label matches the
# partition suffix queried above (it previously printed 0..3)
for i, row_count in enumerate(row_counts, start=1):
    print("Partition {}: {} rows".format(i, row_count))

In [15]:
# Commit whatever is still pending, then release the cursor and the connection.
# No database access is possible after this cell; the cells below only use the
# in-memory `users` list.
conn.commit()
cur.close()
conn.close()

In [16]:
# Create a pandas DataFrame from the parsed data: one row per record that
# insert_user_info appended to `users`
df_users = pd.DataFrame(users)

In [17]:
# Preview the first rows of the collected user records
df_users.head()

In [18]:
# Parse the 'date' strings (the tweet's created_at) into pandas datetimes
df_users['date'] = pd.to_datetime(df_users['date'])

In [19]:
# Parse the account creation timestamps, then count records per year the account was created
df_users['twitter_join_date'] = pd.to_datetime(df_users['twitter_join_date'])
df_users.groupby(df_users['twitter_join_date'].dt.year)['twitter_join_date'].count()

In [20]:
# Number of records per hour of day of the tweet timestamp
df_users.groupby(df_users['date'].dt.hour)['date'].count()

In [21]:
# Number of records per calendar year of the tweet timestamp
df_users.groupby(df_users['date'].dt.year)['date'].count()

In [22]:
# Earliest and latest tweet timestamp among the collected records
min_date = df_users['date'].min()
max_date = df_users['date'].max()
print(min_date)
print(max_date)

In [23]:
# Records per month of the tweet timestamp, keeping only the bins whose label
# lies between 2020-01-01 and 2020-05-25.
# NOTE(review): with freq='M' the bins appear to be labelled by month end, so the
# May 2020 bin (2020-05-31) would fall outside this slice -- confirm that dropping
# May is intended.
# NOTE(review): newer pandas releases deprecate the 'M' alias in favour of 'ME' --
# check against the pandas version this notebook is pinned to.
dates_2020 = df_users.groupby(pd.Grouper(key='date', freq='M'))['date'].count().loc['2020-01-01':'2020-05-25']
dates_2020

In [24]:
# Write the collected user records to CSV, leaving out the DataFrame index
df_users.to_csv('../data/users.csv', index=False)