In [2]:
import csv
import os
import random
from datetime import datetime, timedelta

# Create input directory if it doesn't exist
os.makedirs("input", exist_ok=True)

# Dedicated, seeded generator: re-running this cell reproduces the same users
# and post metrics without altering the global `random` module state.
# (Post timestamps are still relative to the current time, so they shift
# between runs.)
rng = random.Random(42)

# Sample users
user_data = []
usernames = [
    "@techie42", "@critic99", "@daily_vibes", "@designer_dan", "@rage_user",
    "@meme_lord", "@social_queen", "@calm_mind", "@pixel_pusher", "@stream_bot"
]
age_groups = ["Teen", "Adult", "Senior"]
countries = ["US", "UK", "Canada", "India", "Germany", "Brazil"]
verified_status = [True, False]

# One user record per username, with UserIDs starting at 1. Iterating over the
# list itself (rather than a hard-coded range) guarantees that every UserID a
# post can reference below has a matching row in users.csv.
for user_id, username in enumerate(usernames, start=1):
    user = {
        "UserID": user_id,
        "Username": username,
        "AgeGroup": rng.choice(age_groups),
        "Country": rng.choice(countries),
        "Verified": rng.choice(verified_status)
    }
    user_data.append(user)

# Write users.csv (explicit utf-8, same as posts.csv below)
with open("input/users.csv", mode="w", newline="", encoding="utf-8") as file:
    writer = csv.DictWriter(file, fieldnames=user_data[0].keys())
    writer.writeheader()
    writer.writerows(user_data)

# Sample posts
hashtags_pool = ["#tech", "#fail", "#design", "#UX", "#cleanUI", "#mood", "#bug", "#love", "#social", "#AI"]
contents = [
    "Loving the new update!",
    "This app keeps crashing. So annoying.",
    "Just another day...",
    "Absolutely love the UX!",
    "Worst experience ever.",
    "Such a smooth interface!",
    "Great performance on mobile.",
    "Can’t stop using it!",
    "Needs dark mode ASAP!",
    "I’m impressed with the speed."
]

posts_data = []
base_time = datetime.now()

# 100 posts with PostIDs 101..200
for post_id in range(101, 201):
    # Author is drawn from the users generated above, so the ID always exists
    uid = rng.randint(1, len(user_data))
    # Timestamp falls somewhere in the last 240 hours (10 days)
    timestamp = (base_time - timedelta(hours=rng.randint(0, 240))).strftime("%Y-%m-%d %H:%M:%S")
    content = rng.choice(contents)
    likes = rng.randint(0, 150)
    retweets = rng.randint(0, 50)
    # Random score in [-1, 1]; it is not derived from the post content
    sentiment = round(rng.uniform(-1, 1), 2)
    # 1 to 3 distinct hashtags, stored as one comma-separated string
    hashtags = ",".join(rng.sample(hashtags_pool, rng.randint(1, 3)))

    post = {
        "PostID": post_id,
        "UserID": uid,
        "Content": content,
        "Timestamp": timestamp,
        "Likes": likes,
        "Retweets": retweets,
        "Hashtags": hashtags,
        "SentimentScore": sentiment
    }
    posts_data.append(post)

# Write posts.csv
with open("input/posts.csv", mode="w", newline="", encoding="utf-8") as file:
    writer = csv.DictWriter(file, fieldnames=posts_data[0].keys())
    writer.writeheader()
    writer.writerows(posts_data)

print("✅ Dataset generation complete: 'users.csv' and 'posts.csv' created in /input/")

✅ Dataset generation complete: 'users.csv' and 'posts.csv' created in /input/


In [3]:
import pandas as pd

# Read the posts file written by the dataset-generation cell
posts_df = pd.read_csv("input/posts.csv")

# Turn each comma-separated Hashtags string into a list, then give every
# individual tag its own row
hashtags = posts_df['Hashtags'].str.split(',').explode()

# Tally how often each tag occurs (most frequent first) as a two-column frame
hashtag_count = (
    hashtags
    .value_counts()
    .rename_axis('Hashtag')
    .reset_index(name='Count')
)

# Keep the ten most frequent tags
top_hashtags = hashtag_count.iloc[:10]
print(top_hashtags)


    Hashtag  Count
0     #tech     27
1  #cleanUI     26
2     #love     25
3      #bug     23
4     #mood     21
5       #UX     20
6       #AI     15
7   #design     15
8     #fail     15
9   #social     13


In [4]:
# Read both generated CSV files
posts_df = pd.read_csv("input/posts.csv")
users_df = pd.read_csv("input/users.csv")

# Attach each post's author attributes (default inner join on UserID)
merged_df = posts_df.merge(users_df, on="UserID")

# Mean likes/retweets per age group, ranked by the sum of the two means.
# total_engagement is only the sort key; it is not part of the printed table.
age_group_engagement = (
    merged_df
    .groupby('AgeGroup', as_index=False)
    .agg(avg_likes=('Likes', 'mean'), avg_retweets=('Retweets', 'mean'))
    .assign(total_engagement=lambda frame: frame['avg_likes'] + frame['avg_retweets'])
    .sort_values('total_engagement', ascending=False)
)

report_columns = ['AgeGroup', 'avg_likes', 'avg_retweets']
print(age_group_engagement[report_columns])


  AgeGroup  avg_likes  avg_retweets
2     Teen  79.500000     30.388889
1   Senior  64.782609     28.086957
0    Adult  67.351351     23.729730


In [5]:
# Load the posts dataset (the only file this sentiment analysis reads)
posts_df = pd.read_csv("input/posts.csv")

# Categorize Sentiment
def categorize_sentiment(score, threshold=0.2):
    """Map a numeric sentiment score to a coarse category label.

    Parameters
    ----------
    score : float
        Sentiment score to classify.
    threshold : float, optional
        Symmetric cut-off. Scores strictly above ``threshold`` are positive,
        scores strictly below ``-threshold`` are negative. Defaults to 0.2.

    Returns
    -------
    str
        ``'Positive'``, ``'Negative'`` or ``'Neutral'``. A score exactly on
        either boundary is ``'Neutral'``. NaN also yields ``'Neutral'``
        because both comparisons evaluate to False for NaN.
    """
    if score > threshold:
        return 'Positive'
    if score < -threshold:
        return 'Negative'
    return 'Neutral'

# Label every post with its sentiment bucket
posts_df['SentimentCategory'] = posts_df['SentimentScore'].map(categorize_sentiment)

# Mean likes and retweets within each sentiment bucket
sentiment_engagement = (
    posts_df
    .groupby('SentimentCategory', as_index=False)
    .agg(avg_likes=('Likes', 'mean'), avg_retweets=('Retweets', 'mean'))
)

print(sentiment_engagement)


  SentimentCategory  avg_likes  avg_retweets
0          Negative  66.978723     23.787234
1           Neutral  75.750000     23.375000
2          Positive  71.378378     28.351351


In [6]:
# Read both generated CSV files
posts_df = pd.read_csv("input/posts.csv")
users_df = pd.read_csv("input/users.csv")

# Attach each post's author attributes (default inner join on UserID)
merged_df = posts_df.merge(users_df, on="UserID")

# Keep posts by verified authors and add per-post reach (Likes + Retweets).
# The boolean filter followed by assign() yields a new frame, so the merged
# frame is left unmodified.
is_verified = merged_df['Verified'].eq(True)
verified_users = merged_df[is_verified].assign(
    TotalReach=lambda frame: frame['Likes'] + frame['Retweets']
)

# Sum reach per username
user_reach = (
    verified_users
    .groupby('Username', as_index=False)
    .agg(total_reach=('TotalReach', 'sum'))
)

# Five usernames with the highest summed reach
ranked_reach = user_reach.sort_values('total_reach', ascending=False)
top_verified_users = ranked_reach.head(5)

print(top_verified_users)


       Username  total_reach
2  @daily_vibes         1314
3    @rage_user         1055
1     @critic99         1031
0    @calm_mind         1001


In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col, trim

# ✅ Initialize Spark Session
spark = SparkSession.builder \
    .appName("HashtagTrends") \
    .getOrCreate()

# ✅ Set log level to hide warnings
spark.sparkContext.setLogLevel("ERROR")

# ✅ Bypass Windows-specific Hadoop native IO errors
# NOTE(review): these keys are written directly into the Hadoop configuration
# of the already-created session. The "spark.hadoop." prefix is normally a
# SparkConf convention, so the third key may have no effect in this form —
# confirm before relying on it.
spark._jsc.hadoopConfiguration().set("hadoop.home.dir", "C:/hadoop")
spark._jsc.hadoopConfiguration().set("dfs.client.use.datanode.hostname", "true")
spark._jsc.hadoopConfiguration().set("spark.hadoop.fs.file.impl.disable.cache", "true")
spark._jsc.hadoopConfiguration().set("hadoop.native.lib", "false")

# ✅ Load posts data (no schema inference: every column is read as a string)
posts_df = spark.read.option("header", True).csv("input/posts.csv")

# ✅ Handle missing hashtags: drop rows whose Hashtags value is null
posts_df = posts_df.filter(col("Hashtags").isNotNull())

# ✅ Split the 'Hashtags' column and explode it to create individual rows
hashtag_df = posts_df.withColumn("Hashtag", explode(split(col("Hashtags"), ",")))

# ✅ Remove leading/trailing spaces from hashtags
hashtag_df = hashtag_df.withColumn("Hashtag", trim(col("Hashtag")))

# ✅ Handle empty hashtags: a value such as "a,,b" or a trailing comma produces
# an empty token after the split; drop those so "" is never counted as a tag
hashtag_df = hashtag_df.filter(col("Hashtag") != "")

# ✅ Count the frequency of each hashtag
hashtag_counts = hashtag_df.groupBy("Hashtag").count()

# ✅ Sort by frequency in descending order; ties are broken alphabetically so
# the written file and the printed top 10 are the same on every run
hashtag_counts = hashtag_counts.orderBy(col("count").desc(), col("Hashtag").asc())

# ✅ Write result safely (coalesce(1) reduces the result to a single partition
# before writing; "overwrite" replaces any previous output at this path)
hashtag_counts.coalesce(1).write.mode("overwrite").option("header", True).csv("outputs/hashtag_trends.csv")

# ✅ Optional: Print top 10 to console
print("Top hashtags:")
hashtag_counts.show(10)

# ✅ Stop Spark session
spark.stop()


Top hashtags:
+--------+-----+
| Hashtag|count|
+--------+-----+
|   #tech|   27|
|#cleanUI|   26|
|   #love|   25|
|    #bug|   23|
|   #mood|   21|
|     #UX|   20|
|     #AI|   15|
| #design|   15|
|   #fail|   15|
| #social|   13|
+--------+-----+



In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col

spark = SparkSession.builder.appName("EngagementByAgeGroup").getOrCreate()

# Read both CSV files with a header row and inferred column types
posts_df = spark.read.csv("input/posts.csv", header=True, inferSchema=True)
users_df = spark.read.csv("input/users.csv", header=True, inferSchema=True)

# Attach author attributes to every post; posts without a matching user are
# dropped by the inner join
joined_df = posts_df.join(users_df, on="UserID", how="inner")

# Per-age-group means, ordered by average likes (highest first)
mean_likes = avg("Likes").alias("Avg Likes")
mean_retweets = avg("Retweets").alias("Avg Retweets")
engagement_df = (
    joined_df
    .groupBy("AgeGroup")
    .agg(mean_likes, mean_retweets)
    .orderBy(col("Avg Likes").desc())
)

# Persist as a single-partition CSV with a header, replacing earlier output
(
    engagement_df
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv("outputs/engagement_by_age.csv")
)


In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, avg, col

spark = SparkSession.builder.appName("SentimentVsEngagement").getOrCreate()

# Load posts data (header row present, column types inferred)
posts_df = spark.read.option("header", True).csv("input/posts.csv", inferSchema=True)

# Symmetric cut-off for the sentiment buckets. Kept at 0.2 so this Spark job
# agrees with the pandas version of the same analysis in this notebook
# (categorize_sentiment uses > 0.2 / < -0.2); previously this cell used 0.3,
# which put the same posts into different categories in the two results.
SENTIMENT_THRESHOLD = 0.2

# Categorize posts into Positive, Neutral, and Negative sentiment.
# Rows matching neither condition (including null scores) fall to "Neutral".
posts_df = posts_df.withColumn(
    "SentimentCategory",
    when(col("SentimentScore") > SENTIMENT_THRESHOLD, "Positive")
    .when(col("SentimentScore") < -SENTIMENT_THRESHOLD, "Negative")
    .otherwise("Neutral")
)

# Calculate average likes and retweets per sentiment category
sentiment_stats = posts_df.groupBy("SentimentCategory").agg(
    avg("Likes").alias("Avg Likes"),
    avg("Retweets").alias("Avg Retweets")
)

# Save result as a single-partition CSV with a header, replacing earlier output
sentiment_stats.coalesce(1).write.mode("overwrite").csv("outputs/sentiment_engagement.csv", header=True)


In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum

spark = SparkSession.builder.appName("TopVerifiedUsers").getOrCreate()

# Read both CSV files with a header row and inferred column types
posts_df = spark.read.csv("input/posts.csv", header=True, inferSchema=True)
users_df = spark.read.csv("input/users.csv", header=True, inferSchema=True)

# Restrict the user table to verified accounts first, then inner-join so only
# posts written by those accounts remain
verified_only = users_df.filter(col("Verified") == True)
verified_users_df = posts_df.join(verified_only, on="UserID", how="inner")

# Reach per username = total likes + total retweets across that user's posts
total_reach = (_sum("Likes") + _sum("Retweets")).alias("Total Reach")
user_reach_df = verified_users_df.groupBy("Username").agg(total_reach)

# Highest reach first, keep the top five
ranked_reach_df = user_reach_df.orderBy(col("Total Reach").desc())
top_verified = ranked_reach_df.limit(5)

# Persist as a single-partition CSV with a header, replacing earlier output
(
    top_verified
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv("outputs/top_verified_users.csv")
)
