In [1]:
pwd()

'/home/hx152/Project/tweet_project/XinranZhao/Tweet-database-project'

In [1]:
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, TimestampType, LongType, ArrayType
import json
from datetime import datetime

## Create Spark Session

In [2]:
# Set the warehouse location
warehouse_location = "file:///home/hx152/Project/tweet_project/Dataset"
metastore_db_location = "/home/hx152/Project/tweet_project/Dataset/metastore_db"
# warehouse_location = "hdfs://data/"

spark = SparkSession.builder \
    .enableHiveSupport() \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .config("spark.sql.legacy.createHiveTableByDefault", False) \
    .config("javax.jdo.option.ConnectionURL", f"jdbc:derby:;databaseName={metastore_db_location};create=true") \
    .appName("UserInformation").getOrCreate()
sc = spark.sparkContext
sparksql = SQLContext(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/26 03:16:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable




## Create DataBase

In [3]:
a = sparksql.sql('SHOW DATABASES')
a.collect()

[Row(namespace='default'),
 Row(namespace='user_data'),
 Row(namespace='userinformation')]

In [5]:
sql_query = 'CREATE DATABASE IF NOT EXISTS user_data'
a = sparksql.sql(sql_query)
a.collect()

[]

delete_db_query = 'DROP DATABASE IF EXISTS user_data_version2 CASCADE'

In [4]:
a = sparksql.sql('Use user_data')
a.collect()

[]

## Create tables in selected DataBase
In Spark SQL, __Parquet__ is a columnar storage format that is very well suited for __big data processing__. Parquet is a binary file format designed for efficiently storing large amounts of structured data. Its __columnar storage__ nature enables compression and encoding techniques, thus improving data read performance.

In [26]:
sparksql.sql('''
            CREATE TABLE IF NOT EXISTS user_beta(
             timestamp LONG,
             tweet_id LONG,
             user_id LONG,
             name STRING,
             screen_name STRING,
             location STRING,
             url STRING,
             description STRING,
             verified BOOLEAN,
             created_at TIMESTAMP,
             followers_count INT,
             friends_count INT,
             listed_count INT,
             favourites_count INT,
             statuses_count INT 
            ) USING parquet
            ''')


DataFrame[]

In [6]:
sparksql.sql('''
            CREATE TABLE IF NOT EXISTS new_user_beta(
             user_id LONG,
             name STRING,
             screen_name STRING,
             location STRING,
             url STRING,
             description STRING,
             verified BOOLEAN,
             created_at TIMESTAMP,
             followers_count INT,
             friends_count INT,
             listed_count INT,
             favourites_count INT,
             statuses_count INT,
             public_tweet_id_list ARRAY<LONG>
            ) USING parquet
            ''')

DataFrame[]

sparksql.sql('''
    DROP TABLE user_beta
''')


In [27]:
a = sparksql.sql('SHOW TABLES')
a.collect()

[Row(namespace='user_data', tableName='new_user_basic', isTemporary=False),
 Row(namespace='user_data', tableName='new_user_beta', isTemporary=False),
 Row(namespace='user_data', tableName='user_basic', isTemporary=False),
 Row(namespace='user_data', tableName='user_beta', isTemporary=False)]

## Insert User Data from the tweet data

### ----------------------- ‚≠êÔ∏èSchames‚≠êÔ∏è -----------------------

In [30]:
user_basic_schema = StructType([
    StructField("timestamp", LongType()),
    StructField("tweet_id", LongType()),
    StructField("user_id", LongType()),
    StructField("name", StringType()),
    StructField("screen_name", StringType()),
    StructField("location", StringType()),
    StructField("url", StringType()),
    StructField("description", StringType()),
    StructField("verified", BooleanType()),
    StructField("created_at", TimestampType()),
    StructField("followers_count", IntegerType()),
    StructField("friends_count", IntegerType()),
    StructField("listed_count", IntegerType()),
    StructField("favourites_count", IntegerType()),
    StructField("statuses_count", IntegerType())
])

In [9]:
new_user_basic_schema = StructType([
    StructField("user_id", LongType(), True),
    StructField("name", StringType()),
    StructField("screen_name", StringType()),
    StructField("location", StringType()),
    StructField("url", StringType()),
    StructField("description", StringType()),
    StructField("verified", BooleanType()),
    StructField("created_at", TimestampType()),
    StructField("followers_count", IntegerType()),
    StructField("friends_count", IntegerType()),
    StructField("listed_count", IntegerType()),
    StructField("favourites_count", IntegerType()),
    StructField("statuses_count", IntegerType()),
    StructField("public_tweet_id_list", ArrayType(LongType()))
])

### ------------------------ ‚≠êÔ∏èFunctions‚≠êÔ∏è ------------------------

In [10]:
# CONST DEFINITION
DATETIME_FORMAT = "%a %b %d %H:%M:%S %z %Y"

In [28]:
def unify_data_type(data, timestamp):
    timestamp = int(timestamp)
    created_at_datetime = datetime.strptime(data['user']['created_at'], DATETIME_FORMAT)
    user_data_row = (timestamp, data['id'], data['user']['id'], data['user']['name'], data['user']['screen_name'], 
                     data['user']['location'], data['user']['url'], data['user']['description'], 
                     data['user']["verified"], created_at_datetime, data['user']["followers_count"], 
                     data['user']["friends_count"], data['user']["listed_count"], data['user']["favourites_count"], 
                     data['user']["statuses_count"])
    return user_data_row

def insert_with_df(df, schema, table_name):    
    user_df = spark.createDataFrame(df, schema)
    user_df.write.mode("overwrite").insertInto(table_name)
    

In [22]:
int('1587817301803')

1587817301803

### An example for insert directly with SQL query
query = """ \
    INSERT INTO basic (id, name, screen_name) \
    VALUES (1, 'Alice', 'alice01'), \
           (2, 'Bob', 'bob02'), \
           (3, 'Charlie', 'charlie03'), \
           (4, 'David', 'david04') \
""" \
\
spark.sql(query)

Using __DataFrames__ is more efficient then inserting data directly into a table using a SQL INSERT INTO statement, especially when dealing with __large amounts of data__.

In [6]:
# count the file rows
import json

with open("/home/hx152/Project/tweet_project/Dataset/corona-out-3", "r") as f1:
    
    count = 0
    for line in f1:
        try:
            data = json.loads(line)
            count += 1
        except json.JSONDecodeError:
            pass

In [7]:
# Number of rows in corona-out-3 file
count

101916

# recursion

In [16]:
def recursive_fectch_data(data, timestamp):
    users = [unify_data_type(data, timestamp)]
    
    if "quoted_status" in data:
        users_in_quoted_status_field = recursive_fectch_data(data['quoted_status'], timestamp)
        users += users_in_quoted_status_field
        
    if "retweeted_status" in data:
        users_in_retweeted_status_field = recursive_fectch_data(data['retweeted_status'], timestamp)
        users += users_in_retweeted_status_field

    return users

In [31]:
with open("/home/hx152/Project/tweet_project/Dataset/corona-out-3", "r") as f1:
    
    all_users = []
    for line in f1:
        try:
            data = json.loads(line)
            users_for_one_record = recursive_fectch_data(data, data['timestamp_ms'])
            all_users += users_for_one_record
            
        except json.JSONDecodeError:
            pass
    # finished set up df at this stage
    
    
    # parameters: 
    df, schema, table_name = all_users, user_basic_schema, 'user_beta'
    insert_with_df(df, schema, table_name)

23/04/26 03:27:57 WARN TaskSetManager: Stage 0 contains a task of very large size (9315 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

In [23]:
with open("/home/hx152/Project/tweet_project/Dataset/corona-out-3", "r") as f1:
    
    result = []
    for line in f1:
        try:
            data = json.loads(line)
            user_data_row = unify_data_type(data, data['timestamp_ms'])
            result.append(user_data_row)
            
            if 'retweeted_status' in data:
                data = data['retweeted_status']
                user_data_row = unify_data_type(data)
                result.append(user_data_row)
                
            if 'quoted_status' in data:
                data = data['quoted_status']
                user_data_row = unify_data_type(data)
                result.append(user_data_row)

        except json.JSONDecodeError:
            pass
    # parameters: 
    df, schema, table_name = result, user_basic_schema, 'user_basic'
    insert_with_df(df, schema, table_name)

23/04/24 03:00:48 WARN TaskSetManager: Stage 3 contains a task of very large size (8684 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

In [32]:
spark.sql("SELECT count(*) number_rows FROM user_basic").show()

+-----------+
|number_rows|
+-----------+
|     179140|
+-----------+



In [33]:
spark.sql("SELECT count(*) number_rows FROM user_beta").show()

+-----------+
|number_rows|
+-----------+
|     187443|
+-----------+



In [10]:
spark.sql("SELECT user_id, name, count(tweet_id) as tweets_count FROM user_basic GROUP BY user_id, name ORDER BY tweets_count DESC LIMIT 5").show()


[Stage 3:>                                                          (0 + 4) / 4]

+----------+--------------------+------------+
|   user_id|                name|tweets_count|
+----------+--------------------+------------+
| 732819391|           Quirinale|        1745|
| 112047805|           Brit Hume|        1589|
|    851211|          Ben Wikler|        1152|
|  31079332|dr. Shela Putri S...|         897|
|1540461966|               Funda|         805|
+----------+--------------------+------------+



                                                                                

# Add a column 'public_tweet_id_list'
Summrize all tweet_id for one user, keep the latest data

In [26]:
# Check the schema of the new_user_basic table
spark.sql("DESCRIBE user_data.new_user_basic").show()

# Check the schema of the result_df DataFrame
result_df.printSchema()


+--------------------+-------------+-------+
|            col_name|    data_type|comment|
+--------------------+-------------+-------+
|             user_id|       bigint|   null|
|                name|       string|   null|
|         screen_name|       string|   null|
|            location|       string|   null|
|                 url|       string|   null|
|         description|       string|   null|
|            verified|      boolean|   null|
|          created_at|    timestamp|   null|
|     followers_count|          int|   null|
|       friends_count|          int|   null|
|        listed_count|          int|   null|
|    favourites_count|          int|   null|
|      statuses_count|          int|   null|
|public_tweet_id_list|array<bigint>|   null|
+--------------------+-------------+-------+

root
 |-- user_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- screen_name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- url: string (nullable =

In [34]:
from pyspark.sql.functions import collect_list, max, struct

# Read data from the user_basic table
user_basic_df = sparksql.table("user_beta")

# Group by user_id and perform the necessary transformations
grouped_df = user_basic_df.groupBy("user_id")\
    .agg(
        max(struct("timestamp", *user_basic_df.columns[1:])).alias("latest_data"),
        collect_list("tweet_id").alias("public_tweet_id_list")
    )

# Extract the latest_data and add the public_tweet_id_list column
result_df = grouped_df.select("latest_data.*", "public_tweet_id_list")

# Drop the unnecessary columns
result_df = result_df.drop("timestamp", "tweet_id")

# Write the transformed data to the new_user_basic table
result_df.write.mode("overwrite").insertInto("new_user_beta")


23/04/26 03:30:58 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

In [11]:
spark.sql("SELECT count(*) number_rows FROM new_user_basic").show()

+-----------+
|number_rows|
+-----------+
|      90336|
+-----------+



In [35]:
spark.sql("SELECT count(*) number_rows FROM new_user_beta").show()


+-----------+
|number_rows|
+-----------+
|      90336|
+-----------+



In [12]:
spark.sql("""
    SELECT user_id, name, size(public_tweet_id_list) AS number_tweets 
    FROM new_user_basic 
    ORDER BY number_tweets DESC 
    LIMIT 5
""").show()


+----------+--------------------+-------------+
|   user_id|                name|number_tweets|
+----------+--------------------+-------------+
| 732819391|           Quirinale|         1745|
| 112047805|           Brit Hume|         1589|
|    851211|          Ben Wikler|         1152|
|  31079332|dr. Shela Putri S...|          897|
|1540461966|               Funda|          805|
+----------+--------------------+-------------+



In [36]:
spark.sql("""
    SELECT user_id, name, size(public_tweet_id_list) AS number_tweets 
    FROM new_user_beta
    ORDER BY number_tweets DESC 
    LIMIT 5
""").show()


+---------+--------------------+-------------+
|  user_id|                name|number_tweets|
+---------+--------------------+-------------+
|732819391|           Quirinale|         1895|
|112047805|           Brit Hume|         1622|
|   851211|          Ben Wikler|         1223|
| 75077164|       VOA Indonesia|          980|
| 31079332|dr. Shela Putri S...|          898|
+---------+--------------------+-------------+



----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 41630)
Traceback (most recent call last):
  File "/home/hx152/anaconda3/envs/pyspark/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/home/hx152/anaconda3/envs/pyspark/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/home/hx152/anaconda3/envs/pyspark/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/home/hx152/anaconda3/envs/pyspark/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/home/hx152/anaconda3/envs/pyspark/lib/python3.10/site-packages/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/home/hx152/anaconda3/envs/pyspark/lib/python3.10/site-packages/pyspark/accumulators.py", line 253, in 

We got the same result as summary for user_basic table. But number of rows decreases significantly.

# Write interface for searching

In [3]:
def search_by_name(search_string, table_name="user_data.user_basic"):
    # Escape single quotes in the search string to avoid SQL injection
    search_string = search_string.replace("'", "''")

    # Execute the SQL query
    query = f"""
        SELECT *
        FROM {table_name}
        WHERE name LIKE '%{search_string}%' OR screen_name LIKE '%{search_string}%'
    """
    result = spark.sql(query)

    # Display the result
    return result

# Check whether keep latest user data in new_user_basic table

In [19]:
spark.sql("""
    SELECT user_id, name, screen_name, size(public_tweet_id_list) AS number_tweets
    FROM new_user_basic
    GROUP BY user_id, name, screen_name, public_tweet_id_list
    HAVING size(public_tweet_id_list) = 2
    LIMIT 5
""").show()


+----------+--------------------+--------------+-------------+
|   user_id|                name|   screen_name|number_tweets|
+----------+--------------------+--------------+-------------+
|   8533902|    barbara ondrisek|   electrobabe|            2|
| 166511977|         Gbolahan üòé|Gbolahanguitar|            2|
| 501151352|pastoredvaldosoli...|PrEdvaldoSouza|            2|
| 948723169|          Laat Mario|     1975Raman|            2|
|1174762118|‚≠êÔ∏è‚≠êÔ∏è‚≠êÔ∏èDeplorable Eva|     Ettan1945|            2|
+----------+--------------------+--------------+-------------+



In [30]:
search_by_name("electrobabe", "user_data.user_basic")

+-------------------+-------------------+-------+----------------+-----------+---------------+--------------------+--------------------+--------+-------------------+---------------+-------------+------------+----------------+--------------+
|  tweet_create_time|           tweet_id|user_id|            name|screen_name|       location|                 url|         description|verified|         created_at|followers_count|friends_count|listed_count|favourites_count|statuses_count|
+-------------------+-------------------+-------+----------------+-----------+---------------+--------------------+--------------------+--------+-------------------+---------------+-------------+------------+----------------+--------------+
|2020-04-25 12:35:56|1254026357098545156|8533902|barbara ondrisek|electrobabe|Vienna, Austria|https://electroba...|software engineer...|   false|2007-08-30 13:17:15|           1765|          555|         158|            4920|         16669|
|2020-04-25 12:35:56|125402635709854

In [31]:
search_by_name("electrobabe", "user_data.new_user_basic")

+-------+----------------+-----------+---------------+--------------------+--------------------+--------+-------------------+---------------+-------------+------------+----------------+--------------+--------------------+
|user_id|            name|screen_name|       location|                 url|         description|verified|         created_at|followers_count|friends_count|listed_count|favourites_count|statuses_count|public_tweet_id_list|
+-------+----------------+-----------+---------------+--------------------+--------------------+--------+-------------------+---------------+-------------+------------+----------------+--------------+--------------------+
|8533902|barbara ondrisek|electrobabe|Vienna, Austria|https://electroba...|software engineer...|   false|2007-08-30 13:17:15|           1765|          556|         158|            4925|         16672|[1254026357098545...|
+-------+----------------+-----------+---------------+--------------------+--------------------+--------+-------