In [62]:
import zipfile
import os

# Compressed tweet dump and the directory its contents are unpacked into.
zip_file_path = "Data/tweet_8.json.zip"
extraction_path = "Data/Tweets"

# Unpack every member of the archive into the target directory; the context
# manager closes the archive once extraction has finished.
with zipfile.ZipFile(zip_file_path, mode="r") as archive:
    archive.extractall(path=extraction_path)

In [63]:
import lmod
await lmod.purge(force=True)
await lmod.load('jdk/17.0.5')

In [64]:
import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

# Configure the session: application name plus executor and driver memory settings.
session_builder = (
    SparkSession.builder
    .appName("Our First Spark Example")
    .config("spark.executor.memory", "96g")
    .config("spark.driver.memory", "64g")
)

# Reuse an already running session if there is one, otherwise start a new one.
spark = session_builder.getOrCreate()

# Bare expression so the notebook renders the session summary as the cell output.
spark

In [65]:
from pyspark.sql import functions as F
from pyspark.sql import Window

In [66]:
# Load the tweets. The "multiline" option lets Spark parse JSON records that span
# several lines instead of expecting one record per line.
# NOTE(review): the extraction cell unpacks Data/tweet_8.json.zip, while this reads
# tweet_1.json — confirm that file is actually present in Data/Tweets.
json_reader = spark.read.option("multiline", "true")
tweet_data = json_reader.json("Data/Tweets/tweet_1.json")

                                                                                

In [67]:
# Per-author averages of how many URLs, hashtags, symbols and user mentions a tweet carries.
entity_fields = ("urls", "hashtags", "symbols", "user_mentions")

# Keep the author id and lift each entity list out of the nested "entities" struct
# into its own top-level column.
subset = tweet_data.select(
    "author_id",
    *[F.col(f"entities.{field}").alias(field) for field in entity_fields],
)

# Replace missing entity lists with an empty array so they count as zero items.
for field in entity_fields:
    subset = subset.withColumn(
        field,
        F.when(F.col(field).isNull(), F.array()).otherwise(F.col(field)),
    )

# Count the items in each entity list for every tweet.
for field in entity_fields:
    subset = subset.withColumn(f"num_{field}", F.size(F.col(field)))

# Average the per-tweet counts for each author.
result_df = subset.groupBy("author_id").agg(
    *[F.avg(f"num_{field}").alias(f"avg_num_{field}") for field in entity_fields]
)

In [68]:
# Most frequently used language per author, joined onto the entity averages.

# Number of tweets per (author, language) pair.
lang_count_df = tweet_data.groupBy("author_id", "lang").count()

# Order each author's languages by tweet count, most used first. Ties are broken
# alphabetically (null languages last) so the selected language is deterministic.
window_spec = Window.partitionBy("author_id").orderBy(
    F.desc("count"), F.asc_nulls_last("lang")
)

# row_number() gives position 1 to exactly one row per author. rank() would give 1 to
# every tied language, producing several rows for that author and duplicating the
# author in the join below.
lang_ranked_df = lang_count_df.withColumn("rank", F.row_number().over(window_spec))

# Keep only the top language for each author.
most_used_lang_df = lang_ranked_df.filter(F.col("rank") == 1).drop("rank")

# Left join so every author in result_df is kept, with its top language attached.
final_df = result_df.join(
    most_used_lang_df.select("author_id", "lang"), on="author_id", how="left"
)

In [69]:
# Patterns for the parts of a tweet that are stripped before measuring its text.
retweet_pattern = r"^RT\s+@\w+:"   # Leading retweet marker, e.g. 'RT @user:'
mention_pattern = r"@\w+"          # User mentions, e.g. '@username'
url_pattern = r"http\S+|www\.\S+"  # URLs starting with 'http' or a literal 'www.'
hashtag_pattern = r"#\w+"          # Hashtags

# Apply the removals in a fixed order. The retweet marker goes first because its
# pattern includes the '@user' handle, which the mention pattern would otherwise
# remove and leave a dangling 'RT :' behind.
removal_order = (retweet_pattern, mention_pattern, url_pattern, hashtag_pattern)

clean_text_col = F.col("text")
for pattern in removal_order:
    clean_text_col = F.regexp_replace(clean_text_col, pattern, "")

# Trim the whitespace left at either end after the removals.
df_cleaned = tweet_data.withColumn("clean_text", F.trim(clean_text_col))



In [70]:
# Character count of each cleaned tweet.
clean_length_col = F.length(F.col("clean_text"))
df_cleaned = df_cleaned.withColumn("clean_text_length", clean_length_col)

# Mean cleaned-text length per author.
mean_length_col = F.avg("clean_text_length").alias("avg_clean_text_length")
avg_tweet_length_df = df_cleaned.groupBy("author_id").agg(mean_length_col)

# Attach the average length to the per-author features; the left join keeps
# every author already present in final_df.
final_combined_df = final_df.join(
    other=avg_tweet_length_df,
    on="author_id",
    how="left",
)

In [71]:
# Lift each engagement counter out of the nested "public_metrics" struct into a
# top-level column, substituting 0 where the counter is missing.
metric_names = ("retweet_count", "reply_count", "like_count", "quote_count")

df_metric = tweet_data
for metric in metric_names:
    df_metric = df_metric.withColumn(
        metric,
        F.coalesce(F.col(f"public_metrics.{metric}"), F.lit(0)),
    )

In [72]:
# Average engagement (retweets, replies, likes, quotes) per author.
avg_metrics_df = df_metric.groupBy("author_id").agg(
    *[
        F.avg(metric).alias(f"avg_{metric}")
        for metric in ("retweet_count", "reply_count", "like_count", "quote_count")
    ]
)

# Build the combined feature table from its inputs rather than joining onto
# final_combined_df itself: the self-referential form joined the metric columns again
# on every re-run of this cell, leaving duplicate column names. The first join is the
# same one that originally produced final_combined_df, so a top-to-bottom run yields
# the same table as before.
final_combined_df = (
    final_df
    .join(avg_tweet_length_df, on="author_id", how="left")
    .join(avg_metrics_df, on="author_id", how="left")
)

In [73]:
# Reduce to a single partition before writing, and replace any output already
# present at the destination.
single_partition_df = final_combined_df.coalesce(1)
single_partition_df.write.mode("overwrite").parquet("Data/SparkOutput")

                                                                                