In [1]:
import json
import ast
from datetime import datetime

In [2]:
# Handles to the JVM-side classes, reached through the SparkContext's gateway.
jvm = sc._gateway.jvm
URI = jvm.java.net.URI
Path = jvm.org.apache.hadoop.fs.Path
FileSystem = jvm.org.apache.hadoop.fs.FileSystem
Configuration = jvm.org.apache.hadoop.conf.Configuration

# Open the HDFS file system at localhost:9000 and list the raw JSON directory.
fs = FileSystem.get(URI("hdfs://localhost:9000"), Configuration())
status = fs.listStatus(Path('/social_media/raw_json/'))

# Print the path of every listed entry whose file name contains "facebook".
for fileStatus in status:
    file_path = fileStatus.getPath()
    if "facebook" not in file_path.getName():
        continue
    print(file_path)

hdfs://localhost:9000/social_media/raw_json/facebook_post_1641972260556_y83jramqg4.json
hdfs://localhost:9000/social_media/raw_json/facebook_post_1641972326665_xiv4mxxkf1.json
hdfs://localhost:9000/social_media/raw_json/facebook_post_1641972388728_eras2ibu8c.json
hdfs://localhost:9000/social_media/raw_json/facebook_post_1641972461175_dnk01ddzwp.json
hdfs://localhost:9000/social_media/raw_json/facebook_post_1641972529514_qm6evhqmdi.json
hdfs://localhost:9000/social_media/raw_json/facebook_post_1641972597234_dsfnp7rwip.json
hdfs://localhost:9000/social_media/raw_json/facebook_post_1641972659448_plxc9a4dyq.json
hdfs://localhost:9000/social_media/raw_json/facebook_post_1641972728442_cslxg1sf2d.json
hdfs://localhost:9000/social_media/raw_json/facebook_post_1641972796033_xkyaiietm7.json
hdfs://localhost:9000/social_media/raw_json/facebook_post_1641972866646_rkin1huies.json
hdfs://localhost:9000/social_media/raw_json/facebook_post_1641972938845_gx4gmkohlz.json
hdfs://localhost:9000/social_med

In [3]:
# Collect the HDFS URIs of the per-account exports.
# The account names are kept in one place and matched with any(), instead of
# being repeated in a hand-written `or` chain.
OTHER_ACCOUNTS = ('anaktester_go', 'byu.id', 'gridoto_news', 'myxl', 'telkomsel')

others_json = []
for fileStatus in status:
    # fetch the name once per entry rather than once per comparison
    file_name = fileStatus.getPath().getName()
    if any(account in file_name for account in OTHER_ACCOUNTS):
        others_json.append("hdfs://localhost:9000/social_media/raw_json/" + file_name)

print(others_json)

['hdfs://localhost:9000/social_media/raw_json/anaktester_go.json.json', 'hdfs://localhost:9000/social_media/raw_json/byu.id.json.json', 'hdfs://localhost:9000/social_media/raw_json/gridoto_news.json.json', 'hdfs://localhost:9000/social_media/raw_json/myxl.json.json', 'hdfs://localhost:9000/social_media/raw_json/telkomsel.json.json']


In [5]:
def _utc_day(epoch_seconds):
    """Format a Unix timestamp as a 'YYYY-MM-DD' string in UTC."""
    return datetime.utcfromtimestamp(epoch_seconds).strftime('%Y-%m-%d')


def _posts_tagged_with_account(doc):
    """Return copies of the dicts in doc['GraphImages'], each with a 'social_media' key.

    The account name is read from the first entry's 'username' and copied onto
    every entry, because not every entry carries its own 'username' key.
    A document with no entries yields an empty list instead of raising IndexError.
    """
    entries = doc['GraphImages']
    if not entries:
        return []
    account = entries[0]['username']
    return [{**entry, "social_media": account} for entry in entries]


def _comments_tagged_with_account(post):
    """Return copies of post['comments']['data'], each with the post's 'social_media' value."""
    account = post['social_media']
    return [{**comment, "social_media": account} for comment in post['comments']['data']]


# Read every file in others_json whole and parse each one as a single JSON document.
account_docs = sc.wholeTextFiles(','.join(others_json)).values().map(json.loads)

# One flat RDD of entry dicts, each tagged with the account it came from.
tagged_entries = account_docs.flatMap(_posts_tagged_with_account)

# Entries that carry 'taken_at_timestamp' are counted as posts: ((soc_med, day), 1).
post_events = (
    tagged_entries
    .filter(lambda entry: 'taken_at_timestamp' in entry)
    .map(lambda entry: ((entry['social_media'], _utc_day(entry['taken_at_timestamp'])), 1))
)

# Entries that carry 'comments' contribute one ((soc_med, day), 1) per comment.
# No separate "non-empty" filter is needed: flattening an empty comment list
# simply produces no records.
comment_events = (
    tagged_entries
    .filter(lambda entry: 'comments' in entry)
    .flatMap(_comments_tagged_with_account)
    .map(lambda comment: ((comment['social_media'], _utc_day(comment['created_at'])), 1))
)

# Combine posts and comments, sum per (soc_med, day), and flatten the key
# into (soc_med, day, total).
# Nothing here sorts the result; any ordering seen in the collected output is
# incidental and must not be relied on.
RDD_others = (
    post_events
    .union(comment_events)
    .reduceByKey(lambda x, y: x + y)
    .map(lambda pair: (pair[0][0], pair[0][1], pair[1]))
)
RDD_others.collect()

[('anaktester_go', '2022-02-11', 84),
 ('anaktester_go', '2022-02-10', 164),
 ('anaktester_go', '2022-02-07', 51),
 ('anaktester_go', '2022-02-01', 7),
 ('anaktester_go', '2022-01-28', 2),
 ('anaktester_go', '2022-01-27', 7),
 ('anaktester_go', '2022-01-17', 8),
 ('anaktester_go', '2022-01-16', 9),
 ('anaktester_go', '2022-01-15', 13),
 ('anaktester_go', '2022-01-14', 12),
 ('anaktester_go', '2022-01-13', 15),
 ('anaktester_go', '2022-01-09', 11),
 ('anaktester_go', '2022-01-07', 14),
 ('anaktester_go', '2022-01-06', 40),
 ('anaktester_go', '2022-01-03', 4),
 ('anaktester_go', '2022-01-01', 11),
 ('anaktester_go', '2021-12-30', 21),
 ('byu.id', '2022-02-15', 69),
 ('byu.id', '2022-02-14', 67),
 ('byu.id', '2022-02-11', 294),
 ('byu.id', '2022-02-10', 86),
 ('byu.id', '2022-02-07', 116),
 ('byu.id', '2022-02-04', 59),
 ('gridoto_news', '2021-12-27', 1),
 ('gridoto_news', '2021-12-09', 2),
 ('gridoto_news', '2021-12-06', 1),
 ('gridoto_news', '2021-12-02', 2),
 ('gridoto_news', '2021-12-

In [6]:
# Full HDFS URI of every listed file whose name contains "facebook".
facebook_json = [
    "hdfs://localhost:9000/social_media/raw_json/" + name
    for name in (entry.getPath().getName() for entry in status)
    if "facebook" in name
]

In [7]:
# Read every facebook file whole and parse each one as a single JSON document.
facebook_docs = sc.wholeTextFiles(','.join(facebook_json)).values().map(json.loads)

# Each parsed document is iterated element by element, giving one record per post.
facebook_posts = facebook_docs.flatMap(lambda doc: doc)

# One (('facebook', day), 1) pair per post; the first 10 characters of
# 'created_time' are used as the yyyy-mm-dd date.
post_events = facebook_posts.map(
    lambda post: (('facebook', post['created_time'][:10]), 1)
)

# The same pair for every comment found under post['comments']['data'].
comment_events = (
    facebook_posts
    .flatMap(lambda post: post['comments']['data'])
    .map(lambda comment: (('facebook', comment['created_time'][:10]), 1))
)

# Combine both streams, sum per key, and reshape to (socmed, date, num).
RDD_facebook = (
    post_events
    .union(comment_events)
    .reduceByKey(lambda left, right: left + right)
    .map(lambda pair: (pair[0][0], pair[0][1], pair[1]))
)
RDD_facebook.collect()

[('facebook', '2021-01-30', 17),
 ('facebook', '2021-01-28', 26),
 ('facebook', '2021-01-25', 22),
 ('facebook', '2021-01-21', 36),
 ('facebook', '2021-01-19', 105),
 ('facebook', '2021-01-18', 95),
 ('facebook', '2021-01-15', 65),
 ('facebook', '2021-01-10', 27),
 ('facebook', '2021-01-02', 24),
 ('facebook', '2021-02-28', 28),
 ('facebook', '2021-02-25', 57),
 ('facebook', '2021-02-23', 64),
 ('facebook', '2021-02-21', 27),
 ('facebook', '2021-02-15', 29),
 ('facebook', '2021-02-14', 50),
 ('facebook', '2021-02-13', 19),
 ('facebook', '2021-02-11', 52),
 ('facebook', '2021-02-07', 27),
 ('facebook', '2021-02-05', 117),
 ('facebook', '2021-02-04', 50),
 ('facebook', '2021-03-28', 42),
 ('facebook', '2021-03-26', 111),
 ('facebook', '2021-03-22', 84),
 ('facebook', '2021-03-19', 36),
 ('facebook', '2021-03-15', 47),
 ('facebook', '2021-03-11', 36),
 ('facebook', '2021-03-07', 34),
 ('facebook', '2021-03-06', 64),
 ('facebook', '2021-03-05', 84),
 ('facebook', '2021-03-04', 111),
 ('fac

In [8]:
# Full HDFS URI of every listed file whose name contains "instagram".
instagram_json = [
    "hdfs://localhost:9000/social_media/raw_json/" + name
    for name in (entry.getPath().getName() for entry in status)
    if "instagram" in name
]

In [10]:
def _instagram_event(post):
    """Map one record to (('instagram', 'YYYY-MM-DD'), 1) using its 'created_time' as a UTC epoch."""
    day = datetime.utcfromtimestamp(int(post['created_time'])).strftime('%Y-%m-%d')
    return (('instagram', day), 1)


# Read every instagram file whole and parse each one as a single JSON document.
instagram_docs = sc.wholeTextFiles(','.join(instagram_json)).values().map(json.loads)

# Each parsed document is iterated element by element, giving one record per post.
instagram_posts = instagram_docs.flatMap(lambda doc: doc)

# Count records per day and reshape to (socmed, date, num).
RDD_instagram = (
    instagram_posts
    .map(_instagram_event)
    .reduceByKey(lambda left, right: left + right)
    .map(lambda pair: (pair[0][0], pair[0][1], pair[1]))
)
RDD_instagram.collect()

[('instagram', '2021-03-20', 95),
 ('instagram', '2021-03-21', 35),
 ('instagram', '2021-03-22', 53),
 ('instagram', '2021-03-29', 5),
 ('instagram', '2021-04-09', 33),
 ('instagram', '2021-04-22', 9),
 ('instagram', '2021-01-04', 10),
 ('instagram', '2021-01-15', 25),
 ('instagram', '2021-01-16', 89),
 ('instagram', '2021-01-17', 20),
 ('instagram', '2021-01-18', 28),
 ('instagram', '2021-01-19', 16),
 ('instagram', '2021-01-20', 8),
 ('instagram', '2021-01-27', 17),
 ('instagram', '2021-01-13', 21),
 ('instagram', '2021-01-14', 8),
 ('instagram', '2020-12-10', 9),
 ('instagram', '2020-12-11', 2),
 ('instagram', '2020-12-12', 2),
 ('instagram', '2020-12-18', 4),
 ('instagram', '2021-03-12', 21),
 ('instagram', '2021-03-13', 2),
 ('instagram', '2021-02-14', 88),
 ('instagram', '2021-02-15', 18),
 ('instagram', '2021-02-09', 27),
 ('instagram', '2021-02-10', 24),
 ('instagram', '2021-02-11', 10),
 ('instagram', '2021-05-05', 20),
 ('instagram', '2021-05-06', 41),
 ('instagram', '2021-05

In [11]:
# Full HDFS URI of every listed file whose name contains "youtube".
youtube_json = [
    "hdfs://localhost:9000/social_media/raw_json/" + name
    for name in (entry.getPath().getName() for entry in status)
    if "youtube" in name
]

In [12]:
# Read every youtube file whole and parse each one as a single JSON document.
youtube_docs = sc.wholeTextFiles(','.join(youtube_json)).values().map(json.loads)

# Each parsed document is iterated element by element, giving one record per item.
youtube_items = youtube_docs.flatMap(lambda doc: doc)

# Keep only items whose 'snippet' has a 'publishedAt' key.
# The key is tested directly on the mapping; copying the keys into a list
# first is unnecessary work for every record.
dated_items = youtube_items.filter(lambda item: 'publishedAt' in item['snippet'])

# One (('youtube', day), 1) pair per item; the first 10 characters of
# 'publishedAt' are used as the date. Then sum per key and reshape to
# (socmed, date, num).
RDD_youtube = (
    dated_items
    .map(lambda item: (('youtube', item['snippet']['publishedAt'][:10]), 1))
    .reduceByKey(lambda x, y: x + y)
    .map(lambda pair: (pair[0][0], pair[0][1], pair[1]))
)
RDD_youtube.collect()

[('youtube', '2021-07-26', 11),
 ('youtube', '2021-07-22', 8),
 ('youtube', '2021-07-15', 8),
 ('youtube', '2021-06-27', 6),
 ('youtube', '2021-06-23', 10),
 ('youtube', '2021-06-17', 12),
 ('youtube', '2021-06-14', 7),
 ('youtube', '2021-06-12', 172),
 ('youtube', '2021-06-11', 4),
 ('youtube', '2021-06-08', 6),
 ('youtube', '2021-06-06', 14),
 ('youtube', '2021-06-01', 8),
 ('youtube', '2021-05-31', 28),
 ('youtube', '2021-05-20', 13),
 ('youtube', '2021-05-18', 10),
 ('youtube', '2021-05-15', 2),
 ('youtube', '2021-05-11', 34),
 ('youtube', '2021-05-05', 2),
 ('youtube', '2021-05-01', 2),
 ('youtube', '2021-04-30', 2),
 ('youtube', '2021-04-29', 10),
 ('youtube', '2021-04-28', 18),
 ('youtube', '2021-04-26', 86),
 ('youtube', '2021-04-25', 10),
 ('youtube', '2021-04-24', 26),
 ('youtube', '2021-04-23', 22),
 ('youtube', '2021-04-22', 54),
 ('youtube', '2021-04-21', 144),
 ('youtube', '2021-04-20', 366),
 ('youtube', '2021-07-20', 13),
 ('youtube', '2021-07-23', 11),
 ('youtube', '20

In [13]:
# Full HDFS URI of every listed file whose name contains "twitter".
twitter_json = [
    "hdfs://localhost:9000/social_media/raw_json/" + name
    for name in (entry.getPath().getName() for entry in status)
    if "twitter" in name
]

In [14]:
def _twitter_event(tweet):
    """Map one record to (('twitter', 'YYYY-MM-DD'), 1) by parsing its 'created_at' string."""
    created = datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S +0000 %Y')
    return (('twitter', created.strftime('%Y-%m-%d')), 1)


# Read every twitter file whole and parse each one as a single JSON document.
twitter_docs = sc.wholeTextFiles(','.join(twitter_json)).values().map(json.loads)

# Each parsed document is iterated element by element, giving one record per tweet.
tweets = twitter_docs.flatMap(lambda doc: doc)

# Count records per day and reshape to (socmed, date, num).
# Kept under its own name so later cells can use it.
RDD_twitter = (
    tweets
    .map(_twitter_event)
    .reduceByKey(lambda left, right: left + right)
    .map(lambda pair: (pair[0][0], pair[0][1], pair[1]))
)
RDD_twitter.collect()

[('twitter', '2021-01-01', 11),
 ('twitter', '2021-01-10', 5),
 ('twitter', '2021-01-16', 1),
 ('twitter', '2021-01-18', 7),
 ('twitter', '2021-01-19', 24),
 ('twitter', '2021-02-05', 185),
 ('twitter', '2021-01-29', 3),
 ('twitter', '2021-01-31', 7),
 ('twitter', '2021-01-28', 4),
 ('twitter', '2021-02-06', 17),
 ('twitter', '2021-02-12', 3),
 ('twitter', '2021-02-08', 7),
 ('twitter', '2021-03-04', 13),
 ('twitter', '2021-03-01', 43),
 ('twitter', '2021-03-02', 21),
 ('twitter', '2021-03-03', 67),
 ('twitter', '2021-03-12', 27),
 ('twitter', '2021-03-06', 20),
 ('twitter', '2021-03-11', 12),
 ('twitter', '2021-03-07', 35),
 ('twitter', '2021-03-05', 13),
 ('twitter', '2021-03-09', 62),
 ('twitter', '2021-03-08', 28),
 ('twitter', '2021-03-22', 29),
 ('twitter', '2021-03-20', 39),
 ('twitter', '2021-03-18', 35),
 ('twitter', '2021-03-19', 40),
 ('twitter', '2021-03-15', 30),
 ('twitter', '2021-03-17', 81),
 ('twitter', '2021-03-16', 35),
 ('twitter', '2021-03-26', 27),
 ('twitter', '2

In [15]:
# Concatenate the per-source (socmed, date, num) RDDs, in this order.
RDD_all = (
    RDD_others
    .union(RDD_facebook)
    .union(RDD_instagram)
    .union(RDD_youtube)
    .union(RDD_twitter)
)
RDD_all.collect()

[('anaktester_go', '2022-02-11', 84),
 ('anaktester_go', '2022-02-10', 164),
 ('anaktester_go', '2022-02-07', 51),
 ('anaktester_go', '2022-02-01', 7),
 ('anaktester_go', '2022-01-28', 2),
 ('anaktester_go', '2022-01-27', 7),
 ('anaktester_go', '2022-01-17', 8),
 ('anaktester_go', '2022-01-16', 9),
 ('anaktester_go', '2022-01-15', 13),
 ('anaktester_go', '2022-01-14', 12),
 ('anaktester_go', '2022-01-13', 15),
 ('anaktester_go', '2022-01-09', 11),
 ('anaktester_go', '2022-01-07', 14),
 ('anaktester_go', '2022-01-06', 40),
 ('anaktester_go', '2022-01-03', 4),
 ('anaktester_go', '2022-01-01', 11),
 ('anaktester_go', '2021-12-30', 21),
 ('byu.id', '2022-02-15', 69),
 ('byu.id', '2022-02-14', 67),
 ('byu.id', '2022-02-11', 294),
 ('byu.id', '2022-02-10', 86),
 ('byu.id', '2022-02-07', 116),
 ('byu.id', '2022-02-04', 59),
 ('gridoto_news', '2021-12-27', 1),
 ('gridoto_news', '2021-12-09', 2),
 ('gridoto_news', '2021-12-06', 1),
 ('gridoto_news', '2021-12-02', 2),
 ('gridoto_news', '2021-12-

In [16]:
# Name the three tuple positions and convert the combined RDD to a DataFrame.
column_names = ['social_media', 'date', 'count']
dataframe = RDD_all.toDF(column_names)
# Print a preview of the rows.
dataframe.show()

+-------------+----------+-----+
| social_media|      date|count|
+-------------+----------+-----+
|anaktester_go|2022-02-11|   84|
|anaktester_go|2022-02-10|  164|
|anaktester_go|2022-02-07|   51|
|anaktester_go|2022-02-01|    7|
|anaktester_go|2022-01-28|    2|
|anaktester_go|2022-01-27|    7|
|anaktester_go|2022-01-17|    8|
|anaktester_go|2022-01-16|    9|
|anaktester_go|2022-01-15|   13|
|anaktester_go|2022-01-14|   12|
|anaktester_go|2022-01-13|   15|
|anaktester_go|2022-01-09|   11|
|anaktester_go|2022-01-07|   14|
|anaktester_go|2022-01-06|   40|
|anaktester_go|2022-01-03|    4|
|anaktester_go|2022-01-01|   11|
|anaktester_go|2021-12-30|   21|
|       byu.id|2022-02-15|   69|
|       byu.id|2022-02-14|   67|
|       byu.id|2022-02-11|  294|
+-------------+----------+-----+
only showing top 20 rows



In [17]:
# Write the result as CSV with a header row; coalesce(1) collapses the data to
# one partition before writing.
# mode('overwrite') replaces output left by a previous run, so the notebook can
# be re-executed end to end without failing because the target path exists.
(
    dataframe
    .coalesce(1)
    .write.format('com.databricks.spark.csv')
    .options(header='true')
    .mode('overwrite')
    .save('hdfs://localhost:9000/social_media/output_milestone_tubes2')
)