In [1]:
import datetime
import time as tm
from pyspark.sql import SparkSession

# One SparkSession shared by every cell below. The debug field limit is raised so
# that Spark's debug string output (plans/schemas) of wide nested JSON records
# is not cut short with a "... N more fields" placeholder.
spark = (
    SparkSession.builder
    .appName("dataframe-spark")
    .getOrCreate()
)
spark.conf.set("spark.sql.debug.maxToStringFields", 1000)

In [2]:
# FACEBOOK
# Count posts plus their comments per (platform, day). The day is the first
# 10 characters (the YYYY-MM-DD prefix) of each record's `created_time` string.
facebook_data = spark.read.json("hdfs://localhost:9000/json/facebook/*.json").rdd


def _facebook_comments(post):
    """Return the comment records nested under ``post['comments']['data']``.

    Spark's JSON reader yields null for a missing ``comments`` object (or a
    missing ``data`` list inside it). Such posts contribute no comments instead
    of failing the job with ``TypeError: 'NoneType' object is not subscriptable``.
    """
    comments = post['comments']
    if comments is None or comments['data'] is None:
        return []
    return comments['data']


facebook_post = facebook_data.map(lambda x: (("facebook", x['created_time'][:10]), 1))

facebook_comment = (
    facebook_data
    .flatMap(_facebook_comments)
    .map(lambda c: (("facebook", c['created_time'][:10]), 1))
)

mapped_fb = facebook_post.union(facebook_comment)
reduced_fb = mapped_fb.reduceByKey(lambda x, y: x + y)
reduced_fb.collect()

                                                                                

[(('facebook', '2021-12-20'), 27),
 (('facebook', '2021-12-14'), 148),
 (('facebook', '2021-12-08'), 209),
 (('facebook', '2021-12-05'), 32),
 (('facebook', '2021-12-04'), 34),
 (('facebook', '2021-11-01'), 286),
 (('facebook', '2021-10-31'), 105),
 (('facebook', '2021-10-15'), 29),
 (('facebook', '2021-10-11'), 49),
 (('facebook', '2021-03-26'), 111),
 (('facebook', '2021-09-25'), 26),
 (('facebook', '2021-09-14'), 38),
 (('facebook', '2021-09-13'), 59),
 (('facebook', '2021-07-10'), 36),
 (('facebook', '2021-07-05'), 25),
 (('facebook', '2021-07-03'), 15),
 (('facebook', '2021-04-26'), 53),
 (('facebook', '2021-04-25'), 28),
 (('facebook', '2021-04-21'), 69),
 (('facebook', '2021-04-10'), 62),
 (('facebook', '2021-02-25'), 57),
 (('facebook', '2021-02-21'), 27),
 (('facebook', '2021-02-05'), 117),
 (('facebook', '2021-01-28'), 26),
 (('facebook', '2021-01-25'), 22),
 (('facebook', '2021-01-18'), 95),
 (('facebook', '2021-01-15'), 65),
 (('facebook', '2021-05-27'), 24),
 (('facebook',

In [3]:
# YOUTUBE
# Each record can contribute up to two per-day counts, both keyed as
# ("youtube", YYYY-MM-DD), where the day is the first 10 characters of a
# `publishedAt` string:
#   - one when it carries snippet.topLevelComment (dated by that comment),
#   - one when it has its own snippet.publishedAt.
youtube_data = spark.read.json("hdfs://localhost:9000/json/youtube/").rdd

youtube_comment = (
    youtube_data
    .filter(lambda rec: rec['snippet']['topLevelComment'] is not None)
    .map(lambda rec: (("youtube", rec['snippet']['topLevelComment']['snippet']['publishedAt'][:10]), 1))
)

youtube_replies = (
    youtube_data
    .filter(lambda rec: rec['snippet']['publishedAt'] is not None)
    .map(lambda rec: (("youtube", rec['snippet']['publishedAt'][:10]), 1))
)

mapped_yt = youtube_replies.union(youtube_comment)
reduced_yt = mapped_yt.reduceByKey(lambda a, b: a + b)
reduced_yt.collect()

                                                                                

[(('youtube', '2021-10-08'), 350),
 (('youtube', '2021-05-14'), 30),
 (('youtube', '2021-04-14'), 68),
 (('youtube', '2021-05-23'), 16),
 (('youtube', '2021-10-18'), 15),
 (('youtube', '2021-10-17'), 13),
 (('youtube', '2021-04-22'), 98),
 (('youtube', '2021-04-24'), 60),
 (('youtube', '2021-04-23'), 62),
 (('youtube', '2021-06-22'), 4),
 (('youtube', '2021-06-05'), 7),
 (('youtube', '2021-06-02'), 20),
 (('youtube', '2021-05-22'), 7),
 (('youtube', '2021-10-25'), 1),
 (('youtube', '2021-02-15'), 1),
 (('youtube', '2021-11-29'), 4),
 (('youtube', '2021-05-17'), 1),
 (('youtube', '2021-09-20'), 1),
 (('youtube', '2021-08-23'), 1),
 (('youtube', '2021-12-03'), 2),
 (('youtube', '2021-04-18'), 4),
 (('youtube', '2022-01-15'), 1),
 (('youtube', '2021-07-11'), 2),
 (('youtube', '2021-05-29'), 6),
 (('youtube', '2022-01-27'), 1),
 (('youtube', '2021-12-30'), 1),
 (('youtube', '2021-10-09'), 168),
 (('youtube', '2021-05-24'), 15),
 (('youtube', '2021-07-15'), 10),
 (('youtube', '2021-06-14'),

In [4]:
# INSTAGRAM
# Count records per (platform, day). `created_time` is parsed with int() and
# treated as a Unix timestamp (presumably seconds; verify against the export).
instagram_data = spark.read.json("hdfs://localhost:9000/json/instagram/").rdd

# Convert in UTC. datetime.fromtimestamp() without `tz` uses the executor's
# local timezone, which would shift day buckets depending on the machine
# running the job.
# NOTE(review): assumes the string timestamps used for the other platforms
# are UTC as well; confirm.
ig_data = instagram_data.map(
    lambda x: (
        ("instagram",
         datetime.datetime.fromtimestamp(int(x['created_time']), tz=datetime.timezone.utc).date().isoformat()),
        1,
    )
)

reduced_ig = ig_data.reduceByKey(lambda x, y: x + y)
reduced_ig.collect()


                                                                                

[(('instagram', '2022-01-12'), 484),
 (('instagram', '2022-01-26'), 1224),
 (('instagram', '2022-01-19'), 1362),
 (('instagram', '2022-01-04'), 452),
 (('instagram', '2021-11-20'), 120),
 (('instagram', '2021-11-12'), 169),
 (('instagram', '2021-11-09'), 59),
 (('instagram', '2021-11-03'), 94),
 (('instagram', '2021-10-26'), 66),
 (('instagram', '2021-10-19'), 63),
 (('instagram', '2021-10-07'), 80),
 (('instagram', '2021-08-05'), 6),
 (('instagram', '2021-07-16'), 108),
 (('instagram', '2021-06-24'), 13),
 (('instagram', '2021-07-13'), 10),
 (('instagram', '2021-04-11'), 9),
 (('instagram', '2021-02-24'), 21),
 (('instagram', '2021-02-18'), 17),
 (('instagram', '2021-07-30'), 14),
 (('instagram', '2021-08-20'), 4),
 (('instagram', '2022-02-02'), 2052),
 (('instagram', '2022-02-13'), 963),
 (('instagram', '2022-01-28'), 1305),
 (('instagram', '2022-01-03'), 391),
 (('instagram', '2021-11-29'), 136),
 (('instagram', '2021-10-04'), 48),
 (('instagram', '2021-09-25'), 40),
 (('instagram',

In [None]:
# TWITTER
# Count tweets per (platform, day). `created_at` is parsed with a format that
# expects a literal "+0000" offset (e.g. "Wed Oct 10 20:19:24 +0000 2018").
# It is then re-emitted as YYYY-MM-DD so keys line up with the other platforms.
# `tm` is the `time` module imported in the first cell.
twitter_data = spark.read.json("hdfs://localhost:9000/json/twitter/twitter/").rdd

# reduceByKey needs (key, value) pairs. The key is the nested ("twitter", day)
# tuple and the count of 1 sits outside it.
mapped_tweet = twitter_data.map(
    lambda x: (
        ("twitter",
         tm.strftime('%Y-%m-%d', tm.strptime(x['created_at'], '%a %b %d %H:%M:%S +0000 %Y'))),
        1,
    )
)
reduced_tweet = mapped_tweet.reduceByKey(lambda x, y: x + y)
reduced_tweet.collect()

In [5]:
# Stack the per-platform ((platform, day), count) RDDs and flatten each pair
# into a (platform, day, count) row for the DataFrame.
# NOTE(review): reduced_tweet from the Twitter cell is not part of this union.
# Confirm whether that is intended.
combinedData = (
    reduced_ig
    .union(reduced_fb)
    .union(reduced_yt)
    .map(lambda kv: (kv[0][0], kv[0][1], kv[1]))
)
combinedDf = combinedData.toDF(['social_media', 'date', 'count'])

combinedDf.show()

+------------+----------+-----+
|social_media|      date|count|
+------------+----------+-----+
|   instagram|2022-01-12|  484|
|   instagram|2022-01-26| 1224|
|   instagram|2022-01-19| 1362|
|   instagram|2022-01-04|  452|
|   instagram|2021-11-20|  120|
|   instagram|2021-11-12|  169|
|   instagram|2021-11-09|   59|
|   instagram|2021-11-03|   94|
|   instagram|2021-10-26|   66|
|   instagram|2021-10-19|   63|
|   instagram|2021-10-07|   80|
|   instagram|2021-08-05|    6|
|   instagram|2021-07-16|  108|
|   instagram|2021-06-24|   13|
|   instagram|2021-07-13|   10|
|   instagram|2021-04-11|    9|
|   instagram|2021-02-24|   21|
|   instagram|2021-02-18|   17|
|   instagram|2021-07-30|   14|
|   instagram|2021-08-20|    4|
+------------+----------+-----+
only showing top 20 rows



In [6]:
# coalesce(1) yields a single partition, so the CSV output (with a header row)
# at this path is written as one part file. 'overwrite' replaces any previous run.
(
    combinedDf
    .coalesce(1)
    .write
    .mode('overwrite')
    .option('header', 'true')
    .csv('hdfs://localhost:9000/json/result.csv')
)

                                                                                