# RDD Social Media
### Muhammad Naufal Satriandana 13520068

## Initial Setup

In [1]:
import datetime
import time

url = "hdfs://127.0.0.1:9000/socmed_input_alt/"
out_url = "hdfs://127.0.0.1:9000/socmed_output_alt/"
spark

## Instagram

In [2]:
ig = spark.read.json(f"{url}instagram*.json").rdd
# map the lines to a key-value pair
def map(line):
    date = line['created_time']
    # format date
    date = datetime.datetime.fromtimestamp(int(date)).strftime('%Y-%m-%d')
    return (("instagram", date), 1)

ig = ig.map(map)

ig = ig.reduceByKey(lambda x, y: x + y)

ig = ig.map(lambda x: (x[0][0], x[0][1], x[1]))

ig.take(5)

[('instagram', '2022-01-12', 439),
 ('instagram', '2022-01-26', 1235),
 ('instagram', '2022-01-19', 1357),
 ('instagram', '2022-01-04', 446),
 ('instagram', '2021-11-20', 150)]

## Youtube

In [3]:
yt = spark.read.json(f"{url}youtube*.json").rdd
# map the lines to a key-value pair
def map(line):
    snippet = line['snippet']
    # check if comment is a reply
    if snippet['topLevelComment'] is not None:
        snippet = snippet['topLevelComment']['snippet']
    # get date from snippet.publishedAt
    date = snippet['publishedAt'][0:10]
    return (("youtube", date), 1)

yt = yt.map(map)

yt = yt.reduceByKey(lambda x, y: x + y)

yt = yt.map(lambda x: (x[0][0], x[0][1], x[1]))

yt.take(5)

[('youtube', '2021-10-11', 41),
 ('youtube', '2021-04-14', 68),
 ('youtube', '2021-05-23', 16),
 ('youtube', '2021-04-16', 22),
 ('youtube', '2021-04-17', 16)]

## Twitter

In [4]:
twt = spark.read.json(f"{url}twitter*.json").rdd
# map the lines to a key-value pair
def map(line):
    date = line['created_at']
    # format date
    date = datetime.datetime.strptime(date, '%a %b %d %H:%M:%S %z %Y').strftime('%Y-%m-%d')
    return (("twitter", date), 1)

twt = twt.map(map)

twt = twt.reduceByKey(lambda x, y: x + y)

twt = twt.map(lambda x: (x[0][0], x[0][1], x[1]))

twt.take(5)

[('twitter', '2021-12-23', 51),
 ('twitter', '2021-12-22', 208),
 ('twitter', '2022-01-26', 593),
 ('twitter', '2021-12-29', 209),
 ('twitter', '2021-12-31', 77)]

## Facebook

In [5]:
fb = spark.read.json(f"{url}facebook*.json").rdd

def map(line):
    result = []
    postDate = line["created_time"][0:10]
    comments = line["comments"]["data"]
    result.append((("facebook", postDate), 1))
    for comment in comments:
        date = comment["created_time"][0:10]
        result.append((("facebook", date), 1))
    return result

fb = fb.flatMap(map)

fb = fb.reduceByKey(lambda x, y: x + y)

fb = fb.map(lambda x: (x[0][0], x[0][1], x[1]))
fb.take(5)

[('facebook', '2022-01-01', 125),
 ('facebook', '2021-12-08', 209),
 ('facebook', '2021-12-05', 32),
 ('facebook', '2021-11-24', 27),
 ('facebook', '2021-11-22', 28)]

## Others

In [6]:
others = spark.read.option("multiline","true").json(f"{url}*.json.json").rdd
# map the lines to a key-value pair
def map(line):
    graphImages = line['GraphImages']
    result = []
    if(graphImages is not None):
        for image in graphImages:
            if image['taken_at_timestamp'] is not None:
                date = datetime.datetime.fromtimestamp(image['taken_at_timestamp']).strftime('%Y-%m-%d')
                result.append((("instagram", date), 1))
            if image['comments'] is not None:
                comments = image['comments']['data']
                for comment in comments:
                    date = datetime.datetime.fromtimestamp(comment['created_at']).strftime('%Y-%m-%d')
                    result.append((("instagram", date), 1))
    return result

others = others.flatMap(map)

others = others.reduceByKey(lambda x, y: x + y)

others = others.map(lambda x: (x[0][0], x[0][1], x[1]))

others.take(5)

[(('instagram', '2022-02-12'), 572),
 (('instagram', '2022-02-05'), 68),
 (('instagram', '2022-01-09'), 13),
 (('instagram', '2022-01-07'), 13),
 (('instagram', '2021-12-01'), 1)]

## Combined

In [12]:
RDD = others.union(ig.union(yt.union(twt.union(fb))))
RDD.collect()

[(('instagram', '2022-02-12'), 572),
 (('instagram', '2022-02-05'), 68),
 (('instagram', '2022-01-09'), 13),
 (('instagram', '2022-01-07'), 13),
 (('instagram', '2021-12-01'), 1),
 (('instagram', '2021-11-20'), 3),
 (('instagram', '2021-11-06'), 2),
 (('instagram', '2021-10-15'), 1),
 (('instagram', '2021-09-18'), 1),
 (('instagram', '2021-09-15'), 1),
 (('instagram', '2021-09-10'), 2),
 (('instagram', '2022-02-10'), 529),
 (('instagram', '2022-02-07'), 478),
 (('instagram', '2022-02-06'), 211),
 (('instagram', '2022-01-27'), 8),
 (('instagram', '2022-01-26'), 7),
 (('instagram', '2022-01-22'), 24),
 (('instagram', '2022-02-02'), 1),
 (('instagram', '2022-01-20'), 12),
 (('instagram', '2022-01-14'), 11),
 (('instagram', '2022-01-06'), 40),
 (('instagram', '2022-01-03'), 8),
 (('instagram', '2021-12-31'), 7),
 (('instagram', '2021-12-30'), 21),
 (('instagram', '2021-12-27'), 2),
 (('instagram', '2021-12-06'), 1),
 (('instagram', '2021-11-30'), 1),
 (('instagram', '2021-11-11'), 3),
 (('

## Export

In [8]:
df = RDD.toDF(['social_media', 'date', 'count'])
df.show()

+------------+----------+-----+
|social_media|      date|count|
+------------+----------+-----+
|   instagram|2022-01-12|  439|
|   instagram|2022-01-26| 1235|
|   instagram|2022-01-19| 1357|
|   instagram|2022-01-04|  446|
|   instagram|2021-11-20|  150|
|   instagram|2021-11-12|  148|
|   instagram|2021-11-09|   65|
|   instagram|2021-11-03|   96|
|   instagram|2021-10-26|   44|
|   instagram|2021-10-19|   65|
|   instagram|2021-10-07|   38|
|   instagram|2021-08-05|    6|
|   instagram|2021-07-16|  110|
|   instagram|2021-06-24|   13|
|   instagram|2021-07-13|   10|
|   instagram|2021-04-11|    9|
|   instagram|2021-02-24|   21|
|   instagram|2021-02-18|   17|
|   instagram|2021-07-30|   14|
|   instagram|2021-08-20|    4|
+------------+----------+-----+
only showing top 20 rows



In [10]:
df.write.csv(out_url)

Py4JJavaError: An error occurred while calling o310.csv.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:696)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:305)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:291)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:249)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:684)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 35 in stage 58.0 failed 1 times, most recent failure: Lost task 35.0 in stage 58.0 (TID 1584, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 3 fields are required while 2 values are provided.
	at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$makeFromJava$15$$anonfun$apply$15.applyOrElse(EvaluatePython.scala:184)
	at org.apache.spark.sql.execution.python.EvaluatePython$.org$apache$spark$sql$execution$python$EvaluatePython$$nullSafeConvert(EvaluatePython.scala:208)
	at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$makeFromJava$15.apply(EvaluatePython.scala:180)
	at org.apache.spark.sql.SparkSession$$anonfun$6$$anonfun$apply$5.apply(SparkSession.scala:752)
	at org.apache.spark.sql.SparkSession$$anonfun$6$$anonfun$apply$5.apply(SparkSession.scala:752)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
	... 10 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
	... 33 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 3 fields are required while 2 values are provided.
	at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$makeFromJava$15$$anonfun$apply$15.applyOrElse(EvaluatePython.scala:184)
	at org.apache.spark.sql.execution.python.EvaluatePython$.org$apache$spark$sql$execution$python$EvaluatePython$$nullSafeConvert(EvaluatePython.scala:208)
	at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$makeFromJava$15.apply(EvaluatePython.scala:180)
	at org.apache.spark.sql.SparkSession$$anonfun$6$$anonfun$apply$5.apply(SparkSession.scala:752)
	at org.apache.spark.sql.SparkSession$$anonfun$6$$anonfun$apply$5.apply(SparkSession.scala:752)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
	... 10 more
