In [24]:
import json
import datetime

In [38]:
def mapper_youtube(data):
    """Map YouTube items to ``(("youtube", date), 1)`` pairs.

    The date is the part of ``item["snippet"]["publishedAt"]`` before the
    first ``T``. Items whose snippet has no ``publishedAt`` key are skipped.
    """
    return [
        (("youtube", item["snippet"]["publishedAt"].split("T")[0]), 1)
        for item in data
        if "publishedAt" in item["snippet"]
    ]

def mapper_facebook(data):
    """Map Facebook posts and their comments to ``(("facebook", date), 1)`` pairs.

    Emits one pair per post, dated by its ``created_time``. Also emits one pair
    per comment under ``post["comments"]["data"]``, dated by the comment's
    ``created_time``. The date is the part of the timestamp before the first
    ``T``.

    A post with no ``comments`` field, or a null/empty one, contributes only
    the post itself instead of raising KeyError.
    """
    ret = []
    for post in data:
        ret.append((("facebook", post["created_time"].split("T")[0]), 1))

        # Treat a missing comments edge as "no comments" rather than failing
        # the whole Spark task on one post.
        comments = post.get("comments")
        comment_items = comments.get("data") if isinstance(comments, dict) else None
        for comment in comment_items or []:
            ret.append((("facebook", comment["created_time"].split("T")[0]), 1))

    return ret

def process_twitter_date(created_time):
    """Convert a Twitter ``created_at`` string to ``YYYY-MM-DD``.

    The string is split on whitespace and read positionally:
    token 1 is the English month abbreviation, token 2 the day of month, and
    token 5 the year. For example, ``"Wed Oct 10 20:19:24 +0000 2018"``
    becomes ``"2018-10-10"``. The day token is copied verbatim; this function
    does not pad it.

    Raises IndexError if there are fewer than six tokens, and KeyError for an
    unrecognised month abbreviation.
    """
    month_numbers = {
        abbrev: f"{position:02d}"
        for position, abbrev in enumerate(
            ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
             "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"),
            start=1,
        )
    }
    parts = created_time.split()
    year, day = parts[5], parts[2]
    return f"{year}-{month_numbers[parts[1]]}-{day}"

def mapper_twitter(data):
    """Map tweets to ``(("twitter", date), 1)`` pairs.

    Each tweet's ``created_at`` string is normalised to ``YYYY-MM-DD`` by
    ``process_twitter_date``.
    """
    pairs = []
    for tweet in data:
        pairs.append((("twitter", process_twitter_date(tweet["created_at"])), 1))
    return pairs

def timestamp_to_YYYYMMDD(epoch):
    """Format a Unix epoch timestamp (seconds, int or float) as a UTC ``YYYY-MM-DD`` string.

    ``datetime.datetime.utcfromtimestamp`` is deprecated since Python 3.12.
    An aware UTC datetime from ``fromtimestamp(..., tz=timezone.utc)`` gives
    the same calendar date without the deprecation warning.
    """
    return datetime.datetime.fromtimestamp(epoch, tz=datetime.timezone.utc).strftime("%Y-%m-%d")

def mapper_instagram(data):
    """Map Instagram items to ``(("instagram", date), 1)`` pairs.

    ``created_time`` is passed through ``float`` (so numeric strings are
    accepted) and then formatted as a date by ``timestamp_to_YYYYMMDD``.
    """
    pairs = []
    for media in data:
        epoch = float(media["created_time"])
        pairs.append((("instagram", timestamp_to_YYYYMMDD(epoch)), 1))
    return pairs

def mapper_others(data, social_media_type):
    """Map a ``GraphImages`` export to ``((social_media_type, date), 1)`` pairs.

    Emits one pair per entry of ``data["GraphImages"]``, dated by
    ``taken_at_timestamp``. Also emits one pair per comment under
    ``entry["comments"]["data"]``, dated by ``created_at``. Pairs use the same
    ``((type, date), 1)`` key shape as the other mappers, so the result can be
    fed straight into ``reduceByKey``.

    Parsing is best-effort. An entry or comment whose timestamp is missing,
    ill-typed or out of range is skipped, and the remaining entries and
    comments are still counted.

    Args:
        data: parsed JSON object containing a ``"GraphImages"`` list.
        social_media_type: platform label used as the first key component.

    Returns:
        List of ``((social_media_type, "YYYY-MM-DD"), 1)`` tuples.
    """
    # Errors from a missing/ill-typed field or an unconvertible timestamp.
    # Deliberately narrower than a bare ``except`` so that real failures
    # (and KeyboardInterrupt) still surface.
    skippable = (KeyError, TypeError, ValueError, OverflowError, OSError)
    ret = []

    for post in data["GraphImages"]:
        try:
            date = timestamp_to_YYYYMMDD(post["taken_at_timestamp"])
        except skippable:
            continue  # no usable post date: nothing to count for this entry
        ret.append(((social_media_type, date), 1))

        comments = post.get("comments")
        comment_items = comments.get("data") if isinstance(comments, dict) else None
        for comment in comment_items or []:
            try:
                date = timestamp_to_YYYYMMDD(comment["created_at"])
            except skippable:
                continue  # skip only this comment, keep the rest
            ret.append(((social_media_type, date), 1))

    return ret

In [45]:
# One record per matching JSON file: a (file path, file contents) pair.
social_media_data = sc.wholeTextFiles(path="hdfs://localhost:9000/input-social-media/*.json")

def process_json_data(data):
    """Parse one ``(path, file_contents)`` record into ``(social_media_type, parsed_json)``.

    The platform name is the file's base name up to the first ``.`` and then
    up to the first ``_``. For example, ``.../youtube_2021.json`` yields
    ``"youtube"``.

    The base name is isolated *before* the extension is stripped. Dots
    elsewhere in the path, such as an IP/dotted host in
    ``hdfs://127.0.0.1:9000/...`` or a directory like ``v1.2/``, therefore no
    longer truncate the path to a bogus platform name.
    """
    filename_full, json_raw = data
    basename = filename_full.rsplit("/", 1)[-1]
    filename_relative = basename.split(".")[0]
    social_media_type = filename_relative.split("_")[0]
    data_parsed = json.loads(json_raw)
    return (social_media_type, data_parsed)

def aggregate_according_type(data):
    """Route one ``(social_media_type, parsed_json)`` record to its platform mapper.

    Returns that mapper's list of ``((platform, date), 1)`` pairs, or an empty
    list for any platform name not listed below.
    NOTE(review): ``mapper_others`` is not dispatched here, so files with other
    prefixes contribute nothing.
    """
    social_media_type, dict_data = data
    mappers = {
        "youtube": mapper_youtube,
        "instagram": mapper_instagram,
        "twitter": mapper_twitter,
        "facebook": mapper_facebook,
    }
    mapper = mappers.get(social_media_type)
    if mapper is None:
        return []
    return mapper(dict_data)

# (path, contents) -> (platform, parsed JSON) -> ((platform, date), 1) pairs.
# Each stage gets its own name so `social_media_data` keeps meaning the raw
# file RDD instead of being rebound to the exploded pairs.
social_media_type_and_data = social_media_data.map(process_json_data)
social_media_pairs = social_media_type_and_data.flatMap(aggregate_according_type)

# Count per (platform, date) key, then order by platform and date.
social_media_counts = social_media_pairs.reduceByKey(lambda x, y: x + y)
social_media_data_agg = social_media_counts.sortByKey()

# Flatten ((platform, date), count) into (platform, date, count) rows.
social_media_data_transformed = social_media_data_agg.map(lambda x: (x[0][0], x[0][1], x[1]))

print(social_media_data_transformed.collect())

[('facebook', '2021-01-01', 20), ('facebook', '2021-01-02', 24), ('facebook', '2021-01-03', 6), ('facebook', '2021-01-04', 40), ('facebook', '2021-01-05', 36), ('facebook', '2021-01-06', 48), ('facebook', '2021-01-07', 74), ('facebook', '2021-01-08', 52), ('facebook', '2021-01-09', 8), ('facebook', '2021-01-10', 27), ('facebook', '2021-01-11', 21), ('facebook', '2021-01-12', 6), ('facebook', '2021-01-13', 66), ('facebook', '2021-01-14', 9), ('facebook', '2021-01-15', 65), ('facebook', '2021-01-16', 14), ('facebook', '2021-01-17', 1), ('facebook', '2021-01-18', 95), ('facebook', '2021-01-19', 105), ('facebook', '2021-01-20', 47), ('facebook', '2021-01-21', 36), ('facebook', '2021-01-22', 2), ('facebook', '2021-01-23', 1), ('facebook', '2021-01-25', 22), ('facebook', '2021-01-26', 82), ('facebook', '2021-01-27', 67), ('facebook', '2021-01-28', 26), ('facebook', '2021-01-29', 10), ('facebook', '2021-01-30', 17), ('facebook', '2021-01-31', 30), ('facebook', '2021-02-01', 96), ('facebook', 

In [40]:
def toCSVLine(data):
    """Join the items of ``data`` into one comma-separated string.

    Each item is converted with ``str``. No quoting or escaping is applied, so
    an item that itself contains a comma cannot be told apart from two items.
    """
    return ",".join(map(str, data))

# Render each (platform, date, count) row as one comma-joined text line.
lines = social_media_data_transformed.map(f=toCSVLine)
# HDFS export (currently disabled):
# lines.saveAsTextFile("hdfs://localhost:9000/output-milestone-social-media-spark/output.csv")

In [44]:
import csv

# Bring the (small, aggregated) CSV lines back to the driver.
csv_data = lines.collect()

# newline="" lets csv.writer control line endings. Without it, the "\r\n"
# terminator is translated to "\r\r\n" on Windows, producing blank rows.
with open("./out.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    # Each entry is a comma-joined string; split it back into fields.
    # Assumes no field itself contains a comma.
    writer.writerows(row.split(",") for row in csv_data)