In [4]:
from pyspark import SparkContext
import sys
import json
from datetime import datetime
import csv, io

# rdd to csv formatter
def list_to_csv_str(x):
    """Return the items of *x* as one CSV-formatted line.

    Quoting and escaping are done by ``csv.writer`` with its default
    dialect. Only the writer's trailing line terminator is removed, so
    whitespace that belongs to the first or last field is preserved.

    Args:
        x: Iterable of field values making up a single row.

    Returns:
        The CSV row as a string, without a trailing line terminator.
    """
    output = io.StringIO()
    csv.writer(output).writerow(x)
    # Strip only the "\r\n" terminator; a bare strip() would also eat
    # leading/trailing whitespace that is part of the outer fields.
    return output.getvalue().rstrip("\r\n")

# Input directory: every file under it is read as a (path, content) pair.
rdd = sc.wholeTextFiles("hdfs://localhost:9000/raw_json")

# For each platform: keep the files whose path mentions the platform name,
# parse each file's JSON, flatten the parsed value into individual records,
# and mark the resulting RDD as persisted.

# YouTube
youtube_rdd = rdd.filter(lambda entry: "youtube" in entry[0]).values()
youtube_json = youtube_rdd.flatMap(json.loads)
youtube_json.persist()

# Facebook
facebook_rdd = rdd.filter(lambda entry: "facebook" in entry[0]).values()
facebook_json = facebook_rdd.flatMap(json.loads)
facebook_json.persist()

# Instagram
insta_rdd = rdd.filter(lambda entry: "instagram" in entry[0]).values()
insta_json = insta_rdd.flatMap(json.loads)
insta_json.persist()

# Twitter
twitter_rdd = rdd.filter(lambda entry: "twitter" in entry[0]).values()
twitter_json = twitter_rdd.flatMap(json.loads)
twitter_json.persist()

# process youtube
def youtube_mapper(post):
    """Map one YouTube item to a ``(date, 1)`` pair for per-day counting.

    Items whose ``kind`` is ``"youtube#commentThread"`` are dated from the
    top-level comment's snippet; every other item is dated from
    ``post["snippet"]["publishedAt"]``.

    Args:
        post: One parsed JSON object from the YouTube input.

    Returns:
        ``(date, 1)`` where ``date`` is the part of ``publishedAt`` before
        the ``'T'`` separator, or ``("NoDate", 1)`` when the expected
        attributes are missing or malformed.
    """
    try:
        if post["kind"] == "youtube#commentThread":
            snippet = post["snippet"]["topLevelComment"]["snippet"]
        else:
            snippet = post["snippet"]

        # Keep only the date portion (expected format: yyyy-mm-dd).
        return (snippet["publishedAt"].split('T')[0], 1)

    except (KeyError, TypeError, AttributeError) as e:
        sys.stderr.write("certain attribute doesn't exist: %s\n" % e)
        # Always return a key/value pair: a None record would make the
        # downstream reduceByKey fail when it unpacks the element. "NoDate"
        # is the same sentinel insta_mapper uses for undatable records.
        return ("NoDate", 1)

# Turn every YouTube record into (date, 1), sum the counts per date, and
# tag each resulting row with its source platform.
youtube_mapped = youtube_json.map(youtube_mapper)
youtube_reduced = (
    youtube_mapped
    .reduceByKey(lambda left, right: left + right)
    .map(lambda pair: ("youtube", pair[0], pair[1]))
)

# process twitter
def twitter_mapper(post):
    """Map one tweet to a ``(date, 1)`` pair for per-day counting.

    ``post["created_at"]`` is split on single spaces into exactly six
    fields: day name, month abbreviation, day of month, time, UTC offset
    and year. The month abbreviation is translated to its two-digit number.

    Args:
        post: One parsed JSON object from the Twitter input.

    Returns:
        ``("<year>-<mm>-<day>", 1)``, or ``("NoDate", 1)`` when
        ``created_at`` is missing or does not have the expected layout.
    """
    month_numbers = {
        "Jan": "01", "Feb": "02", "Mar": "03",
        "Apr": "04", "May": "05", "Jun": "06",
        "Jul": "07", "Aug": "08", "Sep": "09",
        "Oct": "10", "Nov": "11", "Dec": "12",
    }

    try:
        # Only month, day and year are needed; the rest is discarded.
        _, month_name, day, _, _, year = post["created_at"].split(' ')
        return ("{}-{}-{}".format(year, month_numbers[month_name], day), 1)

    except (KeyError, TypeError, AttributeError, ValueError) as e:
        # KeyError: missing created_at or unknown month; ValueError: wrong
        # number of fields; TypeError/AttributeError: non-dict or non-string.
        sys.stderr.write("failed while converting: %s\n" % e)
        # Always return a key/value pair: a None record would make the
        # downstream reduceByKey fail when it unpacks the element.
        return ("NoDate", 1)

# Turn every tweet into (date, 1), sum the counts per date, and tag each
# resulting row with its source platform.
twitter_mapped = twitter_json.map(twitter_mapper)
twitter_reduced = (
    twitter_mapped
    .reduceByKey(lambda left, right: left + right)
    .map(lambda pair: ("twitter", pair[0], pair[1]))
)

# process instagram
def insta_mapper(post):
    """Map one Instagram record to a ``(date, 1)`` pair for per-day counting.

    A record is treated as Instagram content when either its own ``link``
    (status, media or post) or its ``parent``'s ``link`` (comment) contains
    the text ``"instagram"``. Such records are dated from ``created_time``,
    which is converted with ``int()`` and interpreted as a UTC Unix
    timestamp.

    Args:
        post: One parsed JSON object from the Instagram input.

    Returns:
        ``("yyyy-mm-dd", 1)`` for a datable Instagram record, otherwise
        ``("NoDate", 1)`` (not recognised as Instagram content, or its
        timestamp is missing or invalid).
    """
    try:
        is_instagram = (
            ("link" in post and "instagram" in post["link"])
            or ("parent" in post and "instagram" in post["parent"]["link"])
        )
    except (KeyError, TypeError) as e:
        # e.g. a parent without a "link", or a link that is not a string.
        sys.stderr.write("certain attribute doesn't exist: %s\n" % e)
        is_instagram = False

    if not is_instagram:
        return ("NoDate", 1)

    try:
        post_timestamp = int(post["created_time"])
        post_date = datetime.utcfromtimestamp(post_timestamp).strftime('%Y-%m-%d')
        return (post_date, 1)

    except (KeyError, TypeError, ValueError, OverflowError, OSError) as e:
        sys.stderr.write("failed converting from timestamp: %s\n" % e)
        # Always return a key/value pair: a None record would make the
        # downstream reduceByKey fail when it unpacks the element.
        return ("NoDate", 1)

# Turn every Instagram record into (date, 1), mark the mapped RDD as
# persisted, then sum the counts per date and tag each row with its source.
insta_mapped = insta_json.map(insta_mapper)
insta_mapped.persist()
insta_reduced = (
    insta_mapped
    .reduceByKey(lambda left, right: left + right)
    .map(lambda pair: ("instagram", pair[0], pair[1]))
)

# process facebook
def facebook_post_mapper(post):
    """Map one Facebook post to a ``(date, 1)`` pair for per-day counting.

    Args:
        post: One parsed JSON object from the Facebook input.

    Returns:
        ``(date, 1)`` where ``date`` is the part of ``created_time`` before
        the ``'T'`` separator, or ``("NoDate", 1)`` when ``created_time`` is
        missing or is not a string.
    """
    try:
        post_date = post["created_time"].split('T')[0]
        return (post_date, 1)

    except (KeyError, TypeError, AttributeError) as e:
        sys.stderr.write("certain attribute doesn't exist: %s\n" % e)
        # Always return a key/value pair: a None record would make the
        # downstream reduceByKey fail when it unpacks the element.
        return ("NoDate", 1)
        
def facebook_comment_mapper(comment):
    """Map one Facebook comment to a ``(date, 1)`` pair for per-day counting.

    Args:
        comment: One comment object taken from a post's comment data.

    Returns:
        ``(date, 1)`` where ``date`` is the part of ``created_time`` before
        the ``'T'`` separator, or ``("NoDate", 1)`` when ``created_time`` is
        missing or is not a string.
    """
    try:
        comment_date = comment["created_time"].split('T')[0]
        return (comment_date, 1)

    except (KeyError, TypeError, AttributeError) as e:
        sys.stderr.write("certain attribute doesn't exist: %s\n" % e)
        # Always return a key/value pair: a None record would make the
        # downstream reduceByKey fail when it unpacks the element.
        return ("NoDate", 1)
        
def _facebook_comments(post):
    """Return the comment objects embedded in *post*, or ``[]`` if none.

    Indexing ``post["comments"]["data"]`` directly raises ``KeyError`` for
    a post that has no ``comments`` object, which would abort the whole
    job; such posts simply contribute no comments instead.
    """
    try:
        return post["comments"]["data"] or []
    except (KeyError, TypeError):
        return []


# Posts: one (date, 1) pair per post.
facebook_post_mapped = facebook_json.map(facebook_post_mapper)
facebook_post_mapped.persist()

# Comments: flatten each post's embedded comments, one (date, 1) pair each.
facebook_comment_json = facebook_json.flatMap(_facebook_comments)
facebook_comment_mapped = facebook_comment_json.map(facebook_comment_mapper)
facebook_comment_mapped.persist()

# Posts and comments are counted together per date, then tagged with the
# source platform.
facebook_mapped = facebook_post_mapped.union(facebook_comment_mapped)
facebook_reduced = facebook_mapped.reduceByKey(lambda x, y: x + y).map(lambda x: ("facebook", x[0], x[1]))

# Combine the per-platform (platform, date, count) rows, format each row as
# a CSV line, and write the lines out as text.
result = (
    youtube_reduced
    .union(facebook_reduced)
    .union(twitter_reduced)
    .union(insta_reduced)
)
result_as_csv = result.map(list_to_csv_str)
result_as_csv.saveAsTextFile(
    "hdfs://localhost:9000/output_m2/milestone2_result2.csv"
)

print("done")


done


#### 