# Task-2 Notebook
Author: M. Abdi Haryadi. H (13519156)

In [1]:
import json
import pyspark.sql.functions as psf
import time

In [2]:
def hadoop_to_absolute_path(hadoop_path: str, namenode_uri: str = "hdfs://localhost:9000") -> str:
    """Turn an HDFS path into a fully qualified ``hdfs://`` URI.

    Args:
        hadoop_path: Path inside HDFS, expected to start with ``/``
            (glob patterns are passed through untouched).
        namenode_uri: Scheme, host and port of the namenode. Defaults to the
            local single-node setup used throughout this notebook.

    Returns:
        ``namenode_uri`` followed directly by ``hadoop_path``.
    """
    # Drop a trailing slash on the namenode so an override such as
    # "hdfs://host:8020/" does not produce a double slash in the result.
    return f"{namenode_uri.rstrip('/')}{hadoop_path}"

In [3]:
def read_json_from_hadoop_path(hadoop_path: str):
    """Read the JSON file(s) at an HDFS path into a Spark DataFrame.

    Args:
        hadoop_path: Path inside HDFS; it is expanded to a full URI with
            ``hadoop_to_absolute_path`` before being handed to Spark.

    Returns:
        Whatever ``spark.read.json`` returns for that URI.
    """
    # `spark` is the session object provided by the notebook environment.
    return spark.read.json(hadoop_to_absolute_path(hadoop_path))

## Instagram Dataframe

### General-Data Instagram Dataframe

In [4]:
# Load every general Instagram export that matches the glob below.
print("Reading Instagram general contents ...")
instagram_general_raw_df = read_json_from_hadoop_path("/social_media/raw_json/instagram_*.json")
print("Reading done.")

# Collapse to one partition, then keep only the id and the creation day
# (`created_time` rendered as "yyyy-MM-dd" by from_unixtime).
general_id_date_df = instagram_general_raw_df.coalesce(1).select(
    "id",
    psf.from_unixtime("created_time", "yyyy-MM-dd").alias("date"),
)

Reading Instagram contents ...
Reading done.


### Specific-Data Instagram Dataframe
Five files have a `.json.json` suffix. Each of them is missing the opening curly bracket (`{`) at the start of the file; `anaktester_go.json.json` is additionally broken at the end. The code below therefore prepends `{` to every file and keeps only the ones that then parse as valid JSON, which means `anaktester_go.json.json` is ignored.

In [5]:
def fixed_json_flat_map(path_content_couple):
    """Repair one whole-file record by restoring its missing opening brace.

    Args:
        path_content_couple: A ``(path, content)`` pair; only the content
            is used.

    Returns:
        A one-element list holding ``"{" + content`` when that string parses
        as JSON, otherwise an empty list (so ``flatMap`` drops the file).
    """
    _, raw_text = path_content_couple
    repaired_text = "{" + raw_text
    try:
        # Parsed purely as a validity check; the decoded value is not needed.
        json.loads(repaired_text)
    except json.JSONDecodeError:
        # Still malformed after the repair: deliberately skip this file.
        return []
    return [repaired_text]

# Read each `.json.json` file whole so it can be repaired before parsing.
whole_files_rdd = sc.wholeTextFiles(hadoop_to_absolute_path("/social_media/raw_json/*.json.json"))
repaired_rdd = whole_files_rdd.flatMap(fixed_json_flat_map)

print("Reading Instagram specific contents ...")
instagram_specific_raw_df = spark.read.json(repaired_rdd)
print("Reading done.")

# One row per element of the `GraphImages` array, with that element's
# fields promoted to top-level columns.
graph_images_df = (
    instagram_specific_raw_df.coalesce(1)
    .select(psf.explode("GraphImages").alias("GraphImages"))
    .select("GraphImages.*")
)

# (id, date) taken from each exploded `GraphImages` row.
specific_contents_id_date_df = graph_images_df.select(
    "id",
    psf.from_unixtime("taken_at_timestamp", "yyyy-MM-dd").alias("date"),
)

# (id, date) taken from each element of the nested `comments.data` array.
comment_rows_df = (
    graph_images_df.select("comments.*")
    .select(psf.explode("data").alias("data"))
    .select("data.*")
)
specific_comments_id_date_df = comment_rows_df.select(
    "id",
    psf.from_unixtime("created_at", "yyyy-MM-dd").alias("date"),
)

# Stack both (id, date) tables and collapse to a single partition.
specific_id_date_df = specific_contents_id_date_df.union(specific_comments_id_date_df).coalesce(1)

### Merging

In [6]:
# Stack the general and specific (id, date) tables into one partition.
instagram_id_date_df = general_id_date_df.union(specific_id_date_df).coalesce(1)

# Drop duplicate (id, date) rows, then count the ids that fall on each date.
instagram_df = (
    instagram_id_date_df.distinct()
    .groupBy("date")
    .agg(psf.count("id").alias("count"))
)

## Facebook Dataframe

In [7]:
print("Reading Facebook posts ...")
facebook_raw_df = read_json_from_hadoop_path("/social_media/raw_json/facebook_post_*.json")
print("Reading done.")

# Keep the id and the calendar date parsed from `created_time`.
facebook_id_date_df = facebook_raw_df.coalesce(1).select(
    "id",
    psf.to_date("created_time").alias("date"),
)

# Drop duplicate (id, date) rows, then count the ids that fall on each date.
facebook_df = (
    facebook_id_date_df.distinct()
    .groupBy("date")
    .agg(psf.count("id").alias("count"))
)

Reading Facebook posts ...
Reading done.


## YouTube Dataframe

### YouTube Comment Dataframe

In [8]:
print("Reading Youtube comments ...")
youtube_comment_raw_df = read_json_from_hadoop_path("/social_media/raw_json/youtube_comment_*.json")
print("Reading done.")

# Both fields live under `snippet.topLevelComment`; selecting the nested
# `id` by its dotted path yields a column that is simply named `id`.
youtube_comment_id_date_df = youtube_comment_raw_df.coalesce(1).select(
    "snippet.topLevelComment.id",
    psf.to_date("snippet.topLevelComment.snippet.publishedAt").alias("date"),
)

# Drop duplicate (id, date) rows, then count the ids that fall on each date.
youtube_comment_df = (
    youtube_comment_id_date_df.distinct()
    .groupBy("date")
    .agg(psf.count("id").alias("count"))
)

Reading Youtube comments ...
Reading done.


### YouTube Video Dataframe

In [9]:
print("Reading YouTube videos ...")
youtube_video_raw_df = read_json_from_hadoop_path("/social_media/raw_json/youtube_video_*.json")
print("Reading done.")

# Keep the video id and the calendar date parsed from `snippet.publishedAt`.
youtube_video_id_date_df = youtube_video_raw_df.coalesce(1).select(
    "id",
    psf.to_date("snippet.publishedAt").alias("date"),
)

# Drop duplicate (id, date) rows, then count the ids that fall on each date.
youtube_video_df = (
    youtube_video_id_date_df.distinct()
    .groupBy("date")
    .agg(psf.count("id").alias("count"))
)

Reading YouTube videos ...
Reading done.


### Merging

In [10]:
# Comments and videos were counted separately, so after a plain union a date
# that has both would appear twice. Add the two partial counts together so
# that every date yields exactly one YouTube row, like the other platforms.
youtube_partial_counts_df = youtube_comment_df.union(youtube_video_df).coalesce(1)

youtube_df = (
    youtube_partial_counts_df.groupBy("date")
    .agg(psf.sum("count").alias("count"))
)

## Twitter Dataframe

In [11]:
print("Reading Twitter status ...")
twitter_raw_df = read_json_from_hadoop_path("/social_media/raw_json/twitter_status_*.json")
print("Reading done.")

# Only the 26 characters of `created_at` starting at position 5 are parsed,
# using the pattern "MMM dd HH:mm:ss Z yyyy".
# NOTE(review): presumably this skips a leading day-of-week token - confirm
# against the raw data.
created_at_tail = psf.substring("created_at", 5, 26)
twitter_id_date_df = twitter_raw_df.coalesce(1).select(
    "id",
    psf.to_date(created_at_tail, "MMM dd HH:mm:ss Z yyyy").alias("date"),
)

# Drop duplicate (id, date) rows, then count the ids that fall on each date.
twitter_df = (
    twitter_id_date_df.distinct()
    .groupBy("date")
    .agg(psf.count("id").alias("count"))
)

Reading Twitter status ...
Reading done.


## Merging

In [12]:
# Seed the combined table with the Instagram rows, laid out as
# (date, social_media, count).
merged_df = instagram_df.select(
    "date",
    psf.lit("instagram").alias("social_media"),
    "count",
)

In [13]:
# Tag the Facebook counts with their platform name ...
facebook_labeled_df = facebook_df.select(
    "date",
    psf.lit("facebook").alias("social_media"),
    "count",
)

# ... and put them in front of what has been merged so far.
# NOTE: `merged_df` is rebuilt from its own previous value, so running this
# cell a second time adds the Facebook rows again.
merged_df = facebook_labeled_df.union(merged_df).coalesce(1)

In [14]:
# Tag the YouTube counts with their platform name ...
youtube_labeled_df = youtube_df.select(
    "date",
    psf.lit("youtube").alias("social_media"),
    "count",
)

# ... and put them in front of what has been merged so far.
# NOTE: `merged_df` is rebuilt from its own previous value, so running this
# cell a second time adds the YouTube rows again.
merged_df = youtube_labeled_df.union(merged_df).coalesce(1)

In [15]:
# Tag the Twitter counts with their platform name ...
twitter_labeled_df = twitter_df.select(
    "date",
    psf.lit("twitter").alias("social_media"),
    "count",
)

# ... and put them in front of what has been merged so far.
# NOTE: `merged_df` is rebuilt from its own previous value, so running this
# cell a second time adds the Twitter rows again.
merged_df = twitter_labeled_df.union(merged_df).coalesce(1)

In [16]:
print("Writing ...")
# perf_counter is a monotonic clock, so the measured duration cannot be
# distorted by system clock adjustments during the write.
start_time = time.perf_counter()
# Overwrite any output left by a previous run; with the default save mode the
# write fails when the directory already exists, which breaks a full re-run.
merged_df.write.csv(hadoop_to_absolute_path("/social_media/task_2_output"), mode="overwrite")
end_time = time.perf_counter()
print("Done!")
print("Duration: {} s".format(end_time - start_time))

Writing ...
Done!
Duration: 36.08039116859436 s
