In [1]:
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType, StructField, StringType, LongType, ArrayType, MapType

from pyspark.ml.feature import Tokenizer

from pyspark.sql.window import Window

from pyspark.sql.functions import lit,col,to_timestamp,count,isnan, when,lower,trim,split, explode, regexp_replace,collect_list,struct,sum,lit, array, explode,concat_ws,countDistinct,avg,lag,unix_timestamp

# create a spark session
spark = SparkSession.builder.appName("Twitter Trends Analysis").getOrCreate()

# create a list of file paths and their corresponding source hashtags
file_sources = [
    ("s3://twitterdatadownload/AdvanceHBDMaheshBabu.json", "AdvanceHBDMaheshBabu"),
    ("s3://twitterdatadownload/gobackmodi.json", "gobackmodi"),
    ("s3://twitterdatadownload/HBDBelovedAlluArjun.json", "HBDBelovedAlluArjun"),
    ("s3://twitterdatadownload/HappyBirthdayNTR.json", "HappyBirthdayNTR"),
    ("s3://twitterdatadownload/HBDDarlingPrabhas.json", "HBDDarlingPrabhas"),
    ("s3://twitterdatadownload/HBDLeaderPawanKalyan.json", "HBDLeaderPawanKalyan"),
    ("s3://twitterdatadownload/HBDMaheshBabu.json", "HBDMaheshBabu"),
    ("s3://twitterdatadownload/NTRBdayFestBegins.json", "NTRBdayFestBegins"),
    ("s3://twitterdatadownload/tnwelcomesmodi.json", "tnwelcomesmodi")
]

schema = StructType([
    StructField("Unnamed: 0", LongType(), True),
    StructField("card", StringType(), True),
    StructField("cashtags", StringType(), True),
    StructField("content", StringType(), True),
    StructField("conversationId", LongType(), True),
    StructField("coordinates", StringType(), True),
    StructField("date", StringType(), True),
    StructField("hashtags", StringType(), True),
    StructField("id", LongType(), True),
    StructField("inReplyToTweetId", StringType(), True),
    StructField("inReplyToUser", StringType(), True),
    StructField("json", StringType(), True),
    StructField("lang", StringType(), True),
    StructField("likeCount", LongType(), True),
    StructField("links", StringType(), True),
    StructField("mentionedUsers", StringType(), True),
    StructField("place", StringType(), True),
    StructField("quoteCount", LongType(), True),
    StructField("quotedTweet", StringType(), True),
    StructField("rawContent", StringType(), True),
    StructField("renderedContent", StringType(), True),
    StructField("replyCount", LongType(), True),
    StructField("retweetCount", LongType(), True),
    StructField("retweetedTweet", StringType(), True),
    StructField("source", StringType(), True),
    StructField("user", StringType(), True),
    StructField("username", StringType(), True),
    StructField("vibe", StringType(), True),
    StructField("viewCount", StringType(), True),
    StructField("sourcehashtag", StringType(), True)
])

# create an empty dataframe to store the combined data
twitterdataanalysis = spark.createDataFrame([], schema=schema)

# iterate over the files and append the data to the combined dataframe
for file_path, source in file_sources:
    df = spark.read.json(file_path)
    df = df.withColumn('sourcehashtag', lit(source))
    twitterdataanalysis = twitterdataanalysis.unionAll(df)

23/05/03 22:26:16 WARN Utils: Your hostname, Rkcs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.0.0.177 instead (on interface en0)
23/05/03 22:26:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/03 22:26:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [2]:
# check the number of missing values in each column for the twitterdataanalysis dataframe
twitterdataanalysis.groupBy("sourcehashtag").agg(*[count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in twitterdataanalysis.columns]).show()

23/05/03 22:26:36 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+--------------------+----------+------+--------+-------+--------------+-----------+----+--------+----+----------------+-------------+----+----+---------+------+--------------+------+----------+-----------+----------+---------------+----------+------------+--------------+------+----+--------+------+---------+-------------+
|       sourcehashtag|Unnamed: 0|  card|cashtags|content|conversationId|coordinates|date|hashtags|  id|inReplyToTweetId|inReplyToUser|json|lang|likeCount| links|mentionedUsers| place|quoteCount|quotedTweet|rawContent|renderedContent|replyCount|retweetCount|retweetedTweet|source|user|username|  vibe|viewCount|sourcehashtag|
+--------------------+----------+------+--------+-------+--------------+-----------+----+--------+----+----------------+-------------+----+----+---------+------+--------------+------+----------+-----------+----------+---------------+----------+------------+--------------+------+----+--------+------+---------+-------------+
|AdvanceHBDMaheshBabu|   

                                                                                

In [3]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# drop the 'Unnamed: 0' column
twitterdataanalysis = twitterdataanalysis.drop('Unnamed: 0')

# cast the 'date' column to a timestamp data type
twitterdataanalysis = twitterdataanalysis.withColumn('date', to_timestamp(col('date'), 'yyyy-MM-dd HH:mm:ss'))

In [4]:
# set timeParserPolicy to LEGACY to parse timestamp in yyyy-MM-dd HH:mm:ss format
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# drop unwanted columns
columns_to_drop = ['Unnamed: 0', 'card', 'cashtags', 'coordinates', 'inReplyToTweetId', 'inReplyToUser',
                   'json', 'links', 'mentionedUsers', 'quotedTweet', 'rawContent', 'renderedContent',
                   'retweetedTweet', 'source', 'viewCount','place', 'vibe']
twitterdataanalysis = twitterdataanalysis.drop(*columns_to_drop)

# cast the 'date' column to a timestamp data type
twitterdataanalysis = twitterdataanalysis.withColumn('date', to_timestamp(col('date'), 'yyyy-MM-dd HH:mm:ss'))

In [5]:
# fill missing values in numeric columns with 0
twitterdataanalysis = twitterdataanalysis.fillna({'likeCount': 0, 'quoteCount': 0, 'replyCount': 0, 'retweetCount': 0})

# replace empty strings with None values
twitterdataanalysis = twitterdataanalysis.withColumn('content', when(twitterdataanalysis['content'] == '', None).otherwise(twitterdataanalysis['content']))

In [6]:
# Convert text to lowercase
twitterdataanalysis = twitterdataanalysis.withColumn("content", lower(twitterdataanalysis.content))

# Remove leading/trailing spaces and duplicate whitespaces
twitterdataanalysis = twitterdataanalysis.withColumn("content", trim(regexp_replace(twitterdataanalysis.content, " +", " ")))

In [7]:
twitterdataanalysis = twitterdataanalysis.withColumn("content", lower(trim(twitterdataanalysis.content)))

In [8]:
tokenizer = Tokenizer(inputCol="content", outputCol="tokens")
twitterdataanalysis = tokenizer.transform(twitterdataanalysis)

In [9]:
# Remove square brackets and quotes from hashtags
df = twitterdataanalysis.withColumn('hashtags', regexp_replace('hashtags', "[\[\]']", ''))

# Split the hashtags string into a list of hashtags
df = df.withColumn('hashtags', split(df.hashtags, ', '))

# Explode the hashtags list into individual rows
df = df.select(explode(df.hashtags).alias('hashtag'), 'sourcehashtag')

# Count the occurrences of each hashtag for each sourcehashtag
df = df.groupBy('sourcehashtag', 'hashtag').agg(count('*').alias('count'))

# Sort the dataframe in descending order by count
df = df.orderBy('sourcehashtag', df['count'].desc())

# Extract the top 10 hashtags and their counts for each sourcehashtag
top_hashtags = df.groupBy('sourcehashtag').agg(collect_list(struct(df['hashtag'], df['count'])).alias('top_hashtags'))

In [10]:
# Group by sourcehashtag and sum up the counts
aggregations = twitterdataanalysis.groupBy('sourcehashtag') \
    .agg(sum('likeCount').alias('sum_of_likes'), \
         sum('quoteCount').alias('sum_of_quotes'), \
         sum('replyCount').alias('sum_of_replies'), \
         sum('retweetCount').alias('sum_of_retweets'))

# Add the metric column
aggregations = aggregations.select('sourcehashtag', \
                                   concat_ws('_', lit('sum_of_likes'), lit('likes')).alias('metric'), \
                                   aggregations['sum_of_likes'].alias('value')) \
                           .union( \
                          aggregations.select('sourcehashtag', \
                                              concat_ws('_', lit('sum_of_quotes'), lit('quotes')).alias('metric'), \
                                              aggregations['sum_of_quotes'].alias('value'))) \
                           .union( \
                          aggregations.select('sourcehashtag', \
                                              concat_ws('_', lit('sum_of_replies'), lit('replies')).alias('metric'), \
                                              aggregations['sum_of_replies'].alias('value'))) \
                           .union( \
                          aggregations.select('sourcehashtag', \
                                              concat_ws('_', lit('sum_of_retweets'), lit('retweets')).alias('metric'), \
                                              aggregations['sum_of_retweets'].alias('value')))

                                                                                

+--------------------+--------------------+--------+
|       sourcehashtag|              metric|   value|
+--------------------+--------------------+--------+
|AdvanceHBDMaheshBabu|  sum_of_likes_likes|279056.0|
|          gobackmodi|  sum_of_likes_likes|289639.0|
| HBDBelovedAlluArjun|  sum_of_likes_likes| 50367.0|
|    HappyBirthdayNTR|  sum_of_likes_likes|190636.0|
|   HBDDarlingPrabhas|  sum_of_likes_likes|220664.0|
|HBDLeaderPawanKalyan|  sum_of_likes_likes|251178.0|
|       HBDMaheshBabu|  sum_of_likes_likes|107317.0|
|   NTRBdayFestBegins|  sum_of_likes_likes|337283.0|
|      tnwelcomesmodi|  sum_of_likes_likes|114410.0|
|AdvanceHBDMaheshBabu|sum_of_quotes_quotes| 12923.0|
|          gobackmodi|sum_of_quotes_quotes| 17658.0|
| HBDBelovedAlluArjun|sum_of_quotes_quotes|  1398.0|
|    HappyBirthdayNTR|sum_of_quotes_quotes|  5532.0|
|   HBDDarlingPrabhas|sum_of_quotes_quotes|  6769.0|
|HBDLeaderPawanKalyan|sum_of_quotes_quotes|  8660.0|
|       HBDMaheshBabu|sum_of_quotes_quotes|  3

In [11]:
distinct_users_df = twitterdataanalysis.groupBy("sourcehashtag") \
    .agg(countDistinct(col("user")).alias("value")) \
    .withColumn("metric", lit("distinct_users")) \
    .select("sourcehashtag", "metric", "value")

# Add dummy column to make number of columns same as aggregations DataFrame
distinct_users_df = distinct_users_df.select("sourcehashtag", "metric", "value")

[Stage 28:>                                                         (0 + 4) / 4]

+--------------------+--------------+-----+
|       sourcehashtag|        metric|value|
+--------------------+--------------+-----+
|AdvanceHBDMaheshBabu|distinct_users| 9921|
|   HBDDarlingPrabhas|distinct_users| 6706|
|    HappyBirthdayNTR|distinct_users| 4768|
|       HBDMaheshBabu|distinct_users| 4480|
|          gobackmodi|distinct_users|14162|
|HBDLeaderPawanKalyan|distinct_users| 9565|
|   NTRBdayFestBegins|distinct_users|10200|
| HBDBelovedAlluArjun|distinct_users| 1840|
|      tnwelcomesmodi|distinct_users| 5992|
+--------------------+--------------+-----+



                                                                                

In [12]:
# Average retweets per account
avg_retweets_per_account = twitterdataanalysis \
    .groupBy("sourcehashtag", "user") \
    .agg(avg("retweetCount").alias("avg_retweets_per_user")) \
    .groupBy("sourcehashtag") \
    .agg(avg("avg_retweets_per_user").alias("value")) \
    .withColumn("metric", lit("avg_retweets_per_account")) \
    .select("sourcehashtag", "metric", "value")



+--------------------+--------------------+------------------+
|       sourcehashtag|              metric|             value|
+--------------------+--------------------+------------------+
|AdvanceHBDMaheshBabu|avg_retweets_per_...|12.179774266599992|
|   HBDDarlingPrabhas|avg_retweets_per_...|12.745843092260998|
|    HappyBirthdayNTR|avg_retweets_per_...|14.238305419174075|
|       HBDMaheshBabu|avg_retweets_per_...| 8.943458270995455|
|          gobackmodi|avg_retweets_per_...| 4.074804174057102|
|HBDLeaderPawanKalyan|avg_retweets_per_...|10.698713563882816|
|   NTRBdayFestBegins|avg_retweets_per_...| 9.757457759241372|
| HBDBelovedAlluArjun|avg_retweets_per_...|  15.6386417885033|
|      tnwelcomesmodi|avg_retweets_per_...|3.4122734286787764|
+--------------------+--------------------+------------------+



                                                                                

In [13]:
# total tweets for each hashtag
total_tweets_df = twitterdataanalysis.groupBy("sourcehashtag") \
    .agg(count("*").alias("value")) \
    .withColumn("metric", lit("total_tweets")) \
    .select("sourcehashtag", "metric", "value")



+--------------------+------------+------+
|       sourcehashtag|      metric| value|
+--------------------+------------+------+
|AdvanceHBDMaheshBabu|total_tweets|302595|
|          gobackmodi|total_tweets| 57202|
| HBDBelovedAlluArjun|total_tweets| 28639|
|    HappyBirthdayNTR|total_tweets| 61397|
|   HBDDarlingPrabhas|total_tweets| 73862|
|HBDLeaderPawanKalyan|total_tweets| 95524|
|       HBDMaheshBabu|total_tweets| 61261|
|   NTRBdayFestBegins|total_tweets|316052|
|      tnwelcomesmodi|total_tweets| 32493|
+--------------------+------------+------+



                                                                                

In [14]:
# Calculate total number of tweets for each sourcehashtag category
total_tweets = twitterdataanalysis.groupBy('sourcehashtag').agg(count('*').alias('total_tweets'))

# Calculate number of tweets where like count is less than retweet count for each sourcehashtag category
less_likes_than_retweets = twitterdataanalysis.filter(twitterdataanalysis.likeCount < twitterdataanalysis.retweetCount)
less_likes_than_retweets = less_likes_than_retweets.groupBy('sourcehashtag').agg(count('*').alias('less_likes_than_retweets'))

# Join the two dataframes and calculate the percentage for each sourcehashtag category
percentage_df = total_tweets.join(less_likes_than_retweets, on='sourcehashtag', how='left_outer')
percentage_df = percentage_df.withColumn('percentage', (percentage_df.less_likes_than_retweets / percentage_df.total_tweets) * 100)

# Add a new column "metric" with the constant value "less_likes_than_retweets"
percentage_df = percentage_df.withColumn("metric", lit("less_likes_than_retweets"))

# Select the required columns in the required order
percentage_df = percentage_df.select("sourcehashtag", "metric", "percentage")

                                                                                

+--------------------+--------------------+------------------+
|       sourcehashtag|              metric|        percentage|
+--------------------+--------------------+------------------+
|AdvanceHBDMaheshBabu|less_likes_than_r...|  83.1976734579223|
|          gobackmodi|less_likes_than_r...|17.602531380021677|
| HBDBelovedAlluArjun|less_likes_than_r...| 56.24148887880164|
|    HappyBirthdayNTR|less_likes_than_r...| 42.43529814160301|
|   HBDDarlingPrabhas|less_likes_than_r...| 50.18548103219518|
|HBDLeaderPawanKalyan|less_likes_than_r...| 63.80700138185168|
|       HBDMaheshBabu|less_likes_than_r...|40.955909959027764|
|   NTRBdayFestBegins|less_likes_than_r...| 76.14759596522092|
|      tnwelcomesmodi|less_likes_than_r...|17.554550210814636|
+--------------------+--------------------+------------------+



In [15]:
# Define a window function partitioned by user and ordered by date
window = Window.partitionBy("user").orderBy(col("date"))

# Calculate the time difference between consecutive tweets for each user
time_diff = (unix_timestamp(col("date")) - unix_timestamp(lag("date").over(window)))*1.00 / 60.0

# Calculate the average time difference between tweets for each user and sourcehashtag
avg_time_diff = twitterdataanalysis.select("sourcehashtag", time_diff.alias("time_diff")) \
                                    .groupBy("sourcehashtag") \
                                    .agg({"time_diff": "avg"}) \
                                    .withColumnRenamed("avg(time_diff)", "avg_time_diff") \
                                    .orderBy(["sourcehashtag", "avg_time_diff"], ascending=[True, True])

# Add the metric and value columns
avg_time_diff = avg_time_diff.withColumn("metric", lit("avg_time_diff")) \
                            .withColumn("value", avg_time_diff["avg_time_diff"])
                             
# Show the output
avg_time_diff=avg_time_diff.drop('avg_time_diff')

[Stage 49:>                                                         (0 + 8) / 8]

+--------------------+-------------+------------------+
|       sourcehashtag|       metric|             value|
+--------------------+-------------+------------------+
|AdvanceHBDMaheshBabu|avg_time_diff|3230.2392469750457|
| HBDBelovedAlluArjun|avg_time_diff|2967.3877690850495|
|   HBDDarlingPrabhas|avg_time_diff| 921.9395510889245|
|HBDLeaderPawanKalyan|avg_time_diff|189.34797009826252|
|       HBDMaheshBabu|avg_time_diff|  7.03099833955998|
|    HappyBirthdayNTR|avg_time_diff| 3018.183768379457|
|   NTRBdayFestBegins|avg_time_diff| 4689.098666002743|
|          gobackmodi|avg_time_diff|2248.3842617367086|
|      tnwelcomesmodi|avg_time_diff| 454.8028068547393|
+--------------------+-------------+------------------+



                                                                                

In [16]:
# Group by sourcehashtag and content of the tweet and count the number of unique users
duplicates = twitterdataanalysis.groupBy("sourcehashtag", "content").agg(countDistinct("username").alias("count"))

# Filter only tweets with more than one user tweeting the same content
duplicates = duplicates.filter(duplicates["count"] > 1)

# Group by sourcehashtag and count the number of duplicate tweets
duplicates = duplicates.groupBy("sourcehashtag").agg(sum("count").alias("count"))

# Sort the dataframe in descending order of the count column
duplicates = duplicates.sort("count", ascending=False)

# Create a new dataframe with the required columns
duplicates_df = duplicates.selectExpr("sourcehashtag as sourcehashtag", "'duplicates' as metric", "count as value")

[Stage 58:>                                                         (0 + 9) / 9]

+--------------------+----------+-----+
|sourcehashtag       |metric    |value|
+--------------------+----------+-----+
|NTRBdayFestBegins   |duplicates|58919|
|AdvanceHBDMaheshBabu|duplicates|38719|
|HBDDarlingPrabhas   |duplicates|22028|
|HappyBirthdayNTR    |duplicates|18033|
|HBDLeaderPawanKalyan|duplicates|14410|
|tnwelcomesmodi      |duplicates|12967|
|HBDMaheshBabu       |duplicates|12213|
|gobackmodi          |duplicates|10975|
|HBDBelovedAlluArjun |duplicates|3715 |
+--------------------+----------+-----+



                                                                                

In [17]:
datasets_count_total=twitterdataanalysis.groupBy("sourcehashtag").agg(count("*").alias("count"))



+--------------------+------+
|       sourcehashtag| count|
+--------------------+------+
|AdvanceHBDMaheshBabu|302595|
|          gobackmodi| 57202|
| HBDBelovedAlluArjun| 28639|
|    HappyBirthdayNTR| 61397|
|   HBDDarlingPrabhas| 73862|
|HBDLeaderPawanKalyan| 95524|
|       HBDMaheshBabu| 61261|
|   NTRBdayFestBegins|316052|
|      tnwelcomesmodi| 32493|
+--------------------+------+



                                                                                

In [18]:
# Inner join datasets_count_total and duplicates_df on sourcehashtag column
joined_df = datasets_count_total.join(duplicates_df, "sourcehashtag", "inner")

# Calculate the percentage of duplicate tweets in total tweets
percentage_df_duplicates = joined_df.withColumn("value", (col("value")/col("count"))*100 ) \
                        .withColumn("metric", lit("percentage of duplicate tweets in total tweets")) \
                        .select("sourcehashtag", "metric", "value")

[Stage 74:>                                                         (0 + 8) / 9]

+--------------------+--------------------+------------------+
|       sourcehashtag|              metric|             value|
+--------------------+--------------------+------------------+
|AdvanceHBDMaheshBabu|percentage of dup...|12.795650952593402|
|   HBDDarlingPrabhas|percentage of dup...|29.823183775148248|
|    HappyBirthdayNTR|percentage of dup...|29.371141912471295|
|       HBDMaheshBabu|percentage of dup...|19.936011491813716|
|          gobackmodi|percentage of dup...| 19.18639208419286|
|HBDLeaderPawanKalyan|percentage of dup...|15.085214187010596|
|   NTRBdayFestBegins|percentage of dup...|18.642185463151634|
| HBDBelovedAlluArjun|percentage of dup...|12.971821641817103|
|      tnwelcomesmodi|percentage of dup...| 39.90705690456406|
+--------------------+--------------------+------------------+



                                                                                

In [20]:
# Union the two DataFrames
result = aggregations.union(distinct_users_df).union(avg_retweets_per_account).union(total_tweets_df).union(percentage_df).union(avg_time_diff).union(duplicates_df).union(percentage_df_duplicates)
result = result.withColumn("value", col("value").cast("float"))

                                                                                

+--------------------+--------------------+--------+
|       sourcehashtag|              metric|   value|
+--------------------+--------------------+--------+
|AdvanceHBDMaheshBabu|  sum_of_likes_likes|279056.0|
|          gobackmodi|  sum_of_likes_likes|289639.0|
| HBDBelovedAlluArjun|  sum_of_likes_likes| 50367.0|
|    HappyBirthdayNTR|  sum_of_likes_likes|190636.0|
|   HBDDarlingPrabhas|  sum_of_likes_likes|220664.0|
|HBDLeaderPawanKalyan|  sum_of_likes_likes|251178.0|
|       HBDMaheshBabu|  sum_of_likes_likes|107317.0|
|   NTRBdayFestBegins|  sum_of_likes_likes|337283.0|
|      tnwelcomesmodi|  sum_of_likes_likes|114410.0|
|AdvanceHBDMaheshBabu|sum_of_quotes_quotes| 12923.0|
|          gobackmodi|sum_of_quotes_quotes| 17658.0|
| HBDBelovedAlluArjun|sum_of_quotes_quotes|  1398.0|
|    HappyBirthdayNTR|sum_of_quotes_quotes|  5532.0|
|   HBDDarlingPrabhas|sum_of_quotes_quotes|  6769.0|
|HBDLeaderPawanKalyan|sum_of_quotes_quotes|  8660.0|
|       HBDMaheshBabu|sum_of_quotes_quotes|  3

In [22]:
result.write.format("parquet").mode("overwrite").save("s3://twitterdatanalytics/twitter_data_analytics_metrcs.parquet")

23/05/03 22:37:56 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/05/03 22:37:56 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/05/03 22:37:56 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
                                                                                

In [44]:
# Group by sourcehashtag and content of the tweet and count the number of unique users
duplicates = twitterdataanalysis.groupBy("sourcehashtag", "content").agg(countDistinct("username").alias("count"))

# Filter only tweets with more than one user tweeting the same content
duplicates = duplicates.filter(duplicates["count"] > 1)

# Group by sourcehashtag and count the number of duplicate tweets
duplicates = duplicates.groupBy("sourcehashtag","content").agg(sum("count").alias("count"))

# Sort the dataframe in descending order of the count column
duplicates = duplicates.sort("count", ascending=False)



+--------------------+-------------------------------+-----+
|sourcehashtag       |content                        |count|
+--------------------+-------------------------------+-----+
|gobackmodi          |#gobackmodi                    |4424 |
|NTRBdayFestBegins   |#ntrbdayfestbegins             |2401 |
|tnwelcomesmodi      |#tnwelcomesmodi                |2379 |
|HBDLeaderPawanKalyan|#hbdleaderpawankalyan          |2323 |
|AdvanceHBDMaheshBabu|#advancehbdmaheshbabu          |1412 |
|HBDDarlingPrabhas   |#hbddarlingprabhas             |1396 |
|HBDMaheshBabu       |#hbdmaheshbabu                 |1120 |
|HappyBirthdayNTR    |#happybirthdayntr              |986  |
|gobackmodi          |#gobackmodi #gobacksadistmodi  |338  |
|NTRBdayFestBegins   |jai ntr #ntrbdayfestbegins     |333  |
|HBDBelovedAlluArjun |#hbdbelovedalluarjun           |320  |
|NTRBdayFestBegins   |jai ntr\n#ntrbdayfestbegins    |214  |
|AdvanceHBDMaheshBabu|jai babu #advancehbdmaheshbabu |173  |
|NTRBdayFestBegins   |#n

23/05/03 22:58:28 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 351260 ms exceeds timeout 120000 ms
23/05/03 22:58:28 WARN SparkContext: Killing executors is not supported by current scheduler.
23/05/03 22:58:32 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:322)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:641)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1111)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:244)
	at sc

In [24]:
duplicates.write.format("parquet").mode("overwrite").save("s3://twitterdatanalytics/twitter_data_duplicates.parquet")

                                                                                

In [25]:
# Join the duplicates dataframe with the original dataframe to get all duplicate tweets
duplicate_tweets = duplicates.join(twitterdataanalysis, on=["content","sourcehashtag"], how="inner")

# Group by username and sourcehashtag and count the number of duplicate tweets tweeted by each user
top_duplicate_tweet_users = duplicate_tweets.groupBy("username", "sourcehashtag").agg(count("content").alias("count"))



+---------------+--------------------+-----+
|       username|       sourcehashtag|count|
+---------------+--------------------+-----+
| URSTRULYMBFAN4|AdvanceHBDMaheshBabu|    3|
|   _Ganesh_here|AdvanceHBDMaheshBabu|   28|
|   urstruly_123|AdvanceHBDMaheshBabu|   42|
|       Mrmhere_|AdvanceHBDMaheshBabu|    9|
|   SHearstealer|AdvanceHBDMaheshBabu|   10|
|      AlwaysAA9| HBDBelovedAlluArjun|   12|
| naresh77590983|   HBDDarlingPrabhas|   57|
|   Manideep1008|   HBDDarlingPrabhas|   23|
|Nagabhu31159066|HBDLeaderPawanKalyan|   15|
|     drdheerajj|HBDLeaderPawanKalyan|    2|
|   anantharao12|HBDLeaderPawanKalyan|    7|
|  Prasadgandam1|       HBDMaheshBabu|   19|
| kalidasu_pavan|    HappyBirthdayNTR|    4|
| itsme_sabarish|   NTRBdayFestBegins|    2|
|Aravind27307201|   NTRBdayFestBegins|    1|
|  sunilreddy_21|   NTRBdayFestBegins|    3|
|    Sai91571164|   NTRBdayFestBegins|    2|
|Srinath69385196|   NTRBdayFestBegins|    3|
|        NtrVnay|   NTRBdayFestBegins|    2|
|MaheshBod

                                                                                

In [26]:
top_duplicate_tweet_users.write.format("parquet").mode("overwrite").save("s3://twitterdatanalytics/twitter_data_duplicates_users.parquet")

                                                                                

In [27]:
distinct_users = twitterdataanalysis.groupBy('sourcehashtag').agg(countDistinct('username').alias('total_distinct_users'))



+--------------------+--------------------+
|       sourcehashtag|total_distinct_users|
+--------------------+--------------------+
|AdvanceHBDMaheshBabu|                9921|
|   HBDDarlingPrabhas|                6706|
|    HappyBirthdayNTR|                4768|
|       HBDMaheshBabu|                4480|
|          gobackmodi|               14162|
|HBDLeaderPawanKalyan|                9565|
|   NTRBdayFestBegins|               10200|
| HBDBelovedAlluArjun|                1840|
|      tnwelcomesmodi|                5992|
+--------------------+--------------------+



                                                                                

In [28]:
# Get the usernames for each hashtag
hb_users = twitterdataanalysis.filter(twitterdataanalysis["sourcehashtag"].isin(["AdvanceHBDMaheshBabu", "HBDMaheshBabu"])) \
    .groupBy("sourcehashtag", "username") \
    .agg(countDistinct("id").alias("count")) \
    .groupBy("username") \
    .agg(countDistinct(when(col("sourcehashtag") == "AdvanceHBDMaheshBabu", col("username"))).alias("AdvanceHBDMaheshBabu_count"),
         countDistinct(when(col("sourcehashtag") == "HBDMaheshBabu", col("username"))).alias("HBDMaheshBabu_count")) \
    .na.fill(0)

# Get the common usernames and the sum of distinct usernames
common_users = hb_users.filter((col("AdvanceHBDMaheshBabu_count") > 0) & (col("HBDMaheshBabu_count") > 0))
common_count = common_users.count()

distinct_users_sum = hb_users.selectExpr("sum(AdvanceHBDMaheshBabu_count + HBDMaheshBabu_count) as distinct_users_sum") \
    .collect()[0]["distinct_users_sum"]

# Show the results
# common_users.show()
print(f"Number of common usernames: {common_count}")
print(f"Sum of distinct usernames who participated in both categories: {distinct_users_sum}")
print(f"Percentage of users in repeated participation of trends:",(common_count/distinct_users_sum)*100)



Number of common usernames: 589
Sum of distinct usernames who participated in both categories: 14401
Percentage of users in repeated participation of trends: 4.089993750433997


                                                                                

In [29]:
# Get the usernames for each hashtag
ntr_users = twitterdataanalysis.filter(twitterdataanalysis["sourcehashtag"].isin(["HappyBirthdayNTR", "NTRBdayFestBegins"])) \
    .groupBy("sourcehashtag", "username") \
    .agg(countDistinct("id").alias("count")) \
    .groupBy("username") \
    .agg(countDistinct(when(col("sourcehashtag") == "HappyBirthdayNTR", col("username"))).alias("HappyBirthdayNTR_count"),
         countDistinct(when(col("sourcehashtag") == "NTRBdayFestBegins", col("username"))).alias("NTRBdayFestBegins_count")) \
    .na.fill(0)

# Get the common usernames and the sum of distinct usernames
common_users = ntr_users.filter((col("HappyBirthdayNTR_count") > 0) & (col("NTRBdayFestBegins_count") > 0))
common_count = common_users.count()

distinct_users_sum = ntr_users.selectExpr("sum(HappyBirthdayNTR_count + NTRBdayFestBegins_count) as distinct_users_sum") \
    .collect()[0]["distinct_users_sum"]

print(f"Number of common usernames: {common_count}")
print(f"Sum of distinct usernames who participated in both categories: {distinct_users_sum}")
print(f"Percentage of users in repeated participation of trends:",(common_count/distinct_users_sum)*100)



Number of common usernames: 1227
Sum of distinct usernames who participated in both categories: 14968
Percentage of users in repeated participation of trends: 8.19748797434527


                                                                                

In [30]:
from pyspark.sql.functions import collect_set

# Filter tweets related to tnwelcomesmodi and gobackmodi hashtags
tnwelcomesmodi_users = twitterdataanalysis.filter(twitterdataanalysis.sourcehashtag == "tnwelcomesmodi").select("username")
gobackmodi_users = twitterdataanalysis.filter(twitterdataanalysis.sourcehashtag == "gobackmodi").select("username")

# Find common and distinct usernames between tnwelcomesmodi and gobackmodi tweets
common_users = tnwelcomesmodi_users.intersect(gobackmodi_users).distinct().count()
distinct_users = tnwelcomesmodi_users.union(gobackmodi_users).agg(collect_set('username')).collect()[0][0]

# Calculate counts
common_user_count = common_users
distinct_user_count = len(distinct_users)

# Print the results
print("Common usernames between tnwelcomesmodi and gobackmodi: {}".format(common_user_count))
print("Distinct usernames in tnwelcomesmodi and gobackmodi: {}".format(distinct_user_count))
print(f"Percentage of users in repeated participation of trends:",(common_user_count/distinct_user_count)*100)



Common usernames between tnwelcomesmodi and gobackmodi: 561
Distinct usernames in tnwelcomesmodi and gobackmodi: 19593
Percentage of users in repeated participation of trends: 2.863267493492574


                                                                                