
Homework PySpark - Muhammad Atho'illah

**1. Install Java and Spark**

In [24]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [25]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

**2. Import Dependencies**

In [26]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

**3. Upload and Unzip Data**

In [27]:
from google.colab import files
uploaded = files.upload()

In [28]:
!unzip Dataset.zip

**4. Initialize Spark**

In [29]:
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("homework") \
    .getOrCreate()

**5. Read Data**

In [30]:
# Spark's built-in CSV reader (Spark 2.0+) is already based on the univocity parser
# and uses "," as its default delimiter, so only the header option is needed
dft_h = spark.read.option("header", "true").csv("Dataset/exercise/HashtagTwitter_POS.csv")
dft_a = spark.read.option("header", "true").csv("Dataset/exercise/AkunTwitter_POS.csv")
dfig  = spark.read.json("Dataset/exercise/Instagram_POS.json")

In [31]:
dft_h.printSchema()
dft_a.printSchema()
dfig.printSchema()

root
 |-- id: string (nullable = true)
 |-- conversation_id: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- timezone: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- username: string (nullable = true)
 |-- name: string (nullable = true)
 |-- place: string (nullable = true)
 |-- tweet: string (nullable = true)
 |-- mentions: string (nullable = true)
 |-- urls: string (nullable = true)
 |-- photos: string (nullable = true)
 |-- replies_count: string (nullable = true)
 |-- retweets_count: string (nullable = true)
 |-- likes_count: string (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- cashtags: string (nullable = true)
 |-- link: string (nullable = true)
 |-- retweet: string (nullable = true)
 |-- quote_url: string (nullable = true)
 |-- video: string (nullable = true)
 |-- user_rt_id: string (nullable = true)
 |-- near: string (nullable = true)
 |-- geo: string (nullable = true)

**6. Flatten the DataFrame (Nested Structs to Columns)**

In [32]:
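def FlatDF(schema, prefix=None):
    """Return the dotted path of every leaf field in `schema`.

    Structs (including structs inside arrays) are expanded recursively, so
    selecting the returned paths turns nested fields into top-level columns;
    a struct field reached through an array becomes an array column.
    """
    fields = []
    for field in schema.fields:
        name = prefix + '.' + field.name if prefix else field.name
        dtype = field.dataType
        # Look through an array to its element type
        if isinstance(dtype, ArrayType):
            dtype = dtype.elementType

        if isinstance(dtype, StructType):
            # Recurse into the nested struct, extending the dotted prefix
            fields += FlatDF(dtype, prefix=name)
        else:
            fields.append(name)

    return fields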

In [33]:
dfig = dfig.select(FlatDF(dfig.schema))
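To see what `FlatDF` returns without starting Spark, its recursion can be sketched in plain Python. This is an illustration under assumptions, not Spark code: a schema is modelled as a dict (a nested dict stands for a `StructType`, a one-element list `[T]` for an `ArrayType` of `T`), and the `comments` field below is a hypothetical layout for the Instagram JSON, not read from the dataset.

```python
def flat_fields(schema, prefix=None):
    """Plain-Python mirror of FlatDF: return dotted paths to every leaf field,
    descending into structs and into structs that sit inside arrays."""
    fields = []
    for name, dtype in schema.items():
        path = f"{prefix}.{name}" if prefix else name
        # Look through an array (modelled as [element_type]) to its element type.
        if isinstance(dtype, list):
            dtype = dtype[0]
        if isinstance(dtype, dict):
            # A dict models a struct: recurse with the extended prefix.
            fields += flat_fields(dtype, prefix=path)
        else:
            fields.append(path)
    return fields


# Hypothetical nested layout for one Instagram post (an assumption).
post_schema = {
    "caption": "string",
    "comments": [{"author": "string", "comment": "string"}],
    "datetime": "string",
    "img_urls": ["string"],
    "key": "string",
}

print(flat_fields(post_schema))
# ['caption', 'comments.author', 'comments.comment', 'datetime', 'img_urls', 'key']
```

In Spark, selecting a path such as `comments.author` through an array of structs yields an `array<string>` column named `author`, which would be consistent with the `author` and `comment` array columns in the schema printed below.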

In [34]:
dfig.printSchema()
dfig.show()

root
 |-- caption: string (nullable = true)
 |-- author: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- comment: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- datetime: string (nullable = true)
 |-- img_urls: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- key: string (nullable = true)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|             caption|              author|             comment|            datetime|            img_urls|                 key|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Oranger menjaga s...|[mamah.asyraf, po...|[Cek DM, Baik, Sa...|2019-07-13T04:06:...|[https://scontent...|https://www.insta...|
|Oranger, layanan ...|[rizqimuhammad77,...|[@ari_hr04 @adi04...|2019-07-12T09:55:...|[https://scontent...|https://www.insta...|

In [35]:
dfig.select(col("author")[1], col("comment")[1]).show()

+-------------------+--------------------+
|          author[1]|          comment[1]|
+-------------------+--------------------+
|    posindonesia.ig|Baik, Sahabat, mo...|
|griyakulakannganjuk|Kirim paket belum...|
|       ojombokfolou|SANGAT KECEWA. sa...|
|    posindonesia.ig|Hai sahabat, kiri...|
|   jovian_aryodhito|@mamah.asyraf trs...|
|    posindonesia.ig|Hai sahabat, kiri...|
|    posindonesia.ig|Halo Sahabat pos....|
|    posindonesia.ig|Layanan tersebut ...|
|          leylyeyle|@posindonesia.ig ...|
|      irenerufianti|Min.. kirim parce...|
|               null|                null|
|               null|                null|
|    posindonesia.ig|Baik Sahabat, jik...|
|    posindonesia.ig|Sama-sama Sahabat...|
|             xxyn99|@syamsul_manalu23...|
|        nyomandante|Semoga mendapat t...|
|     intanardianty_|Tingkatkan pelaya...|
|        nana_munzir|Ka. Kenapa ya pak...|
|    posindonesia.ig|Mohon maaf atas k...|
|    posindonesia.ig|Halo sahabat pos....|
+-------------------+--------------------+
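Indexing an array column with `[1]` is 0-based, so the cell above picks each post's second author and second comment; a post with fewer than two entries (or none at all) gives `null`, as in two of the rows shown. A plain-Python sketch of that lookup rule, where the helper `get_item` is hypothetical and mirrors Spark's default behaviour of returning null instead of raising:

```python
def get_item(arr, index):
    """Sketch of col(name)[index] on an array column: 0-based, returning
    None (null) when the array is null or the index is out of range."""
    if arr is None or index < 0 or index >= len(arr):
        return None
    return arr[index]


print(get_item(["mamah.asyraf", "posindonesia.ig"], 1))  # posindonesia.ig
print(get_item(["mamah.asyraf"], 1))                     # None
```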

**7. Keep Only Author and Comment, and Add Source (Instagram)**

In [63]:
# Pair the i-th author with the i-th comment of each post, then emit one row per pair
dfig_clean = dfig.withColumn("tmp", arrays_zip("author", "comment")) \
    .withColumn("tmp", explode("tmp")) \
    .select(col("tmp.author"), col("tmp.comment"))
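The `arrays_zip` + `explode` combination can be sketched in plain Python to show which rows survive. The behaviour modelled here follows the PySpark documentation as I understand it: `arrays_zip` pads shorter arrays with null and returns null if any input array is null, and `explode` emits nothing for a null or empty array. The functions and the sample posts (with made-up comment text) are illustrative, not Spark's implementation.

```python
from itertools import zip_longest


def arrays_zip(*arrays):
    """Sketch of arrays_zip: the i-th tuple holds the i-th element of every
    input array. Shorter arrays are padded with None; a None input gives None."""
    if any(a is None for a in arrays):
        return None
    return list(zip_longest(*arrays))


def explode(rows, column):
    """Sketch of explode: one output row per element of row[column].
    Rows whose array is None or empty produce no output rows."""
    out = []
    for row in rows:
        for item in row[column] or []:
            out.append({**row, column: item})
    return out


# Illustrative posts (comment text is made up); the second post has no comments.
posts = [
    {"author": ["mamah.asyraf", "posindonesia.ig"], "comment": ["Cek DM", "reply text"]},
    {"author": None, "comment": None},
]
rows = [{"tmp": arrays_zip(p["author"], p["comment"])} for p in posts]
pairs = [{"author": r["tmp"][0], "comment": r["tmp"][1]} for r in explode(rows, "tmp")]
print(pairs)
```

The post without comments disappears entirely; `explode_outer` (available since Spark 2.3) would instead keep it as a row with null author and comment.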

In [64]:
dfig_clean = dfig_clean.withColumn("source", lit("Instagram"))
dfig_clean.show()

+-------------------+--------------------+---------+
|             author|             comment|   source|
+-------------------+--------------------+---------+
|       mamah.asyraf|              Cek DM|Instagram|
|    posindonesia.ig|Baik, Sahabat, mo...|Instagram|
|    akhirini_husein|Kiriman saya tida...|Instagram|
|    posindonesia.ig|Halo, Sahabat, bi...|Instagram|
|             figha_|Pengambilan paket...|Instagram|
|    posindonesia.ig|Halo, Sahabat. Mo...|Instagram|
|             figha_|@posindonesia.ig ...|Instagram|
|    posindonesia.ig|Sama-sama Sahabat...|Instagram|
|       phytastoreso|Kak tolong cek dm...|Instagram|
|    posindonesia.ig|Baik, telah kami ...|Instagram|
|             figha_|Kalo mengambil pa...|Instagram|
|    rizqimuhammad77|@ari_hr04 @adi047...|Instagram|
|griyakulakannganjuk|Kirim paket belum...|Instagram|
|             snttaa|Tolong cek STR040...|Instagram|
|           widie489|https://ecoracing...|Instagram|
|    posindonesia.ig|Boleh diinformasi...|Instagram|

**8. Keep Only Username and Tweet, and Add Source (Twitter Account)**

In [65]:
dft_a_clean = dft_a.select("username", "tweet", lit("Twitter").alias("source"))
dft_a_clean.show()

+------------+--------------------+-------+
|    username|               tweet| source|
+------------+--------------------+-------+
|posindonesia|Baik sahabat, tel...|Twitter|
|posindonesia|Hai sahabat. Kiri...|Twitter|
|posindonesia|/ handphone pener...|Twitter|
|posindonesia|Hai sahabat, kiri...|Twitter|
|posindonesia|Oranger menjaga s...|Twitter|
|posindonesia|Hai sahabat. Untu...|Twitter|
|posindonesia|Terima kasih tela...|Twitter|
|posindonesia|Hai sahabat. Moho...|Twitter|
|posindonesia|Hai sahabat. Moho...|Twitter|
|posindonesia|Sama - sama sahab...|Twitter|
|posindonesia|Hai sahabat. Kiri...|Twitter|
|posindonesia|biaya pelalubeaan...|Twitter|
|posindonesia|Halo Sahabat, seb...|Twitter|
|posindonesia|Berdasarkan hasil...|Twitter|
|posindonesia|Mohon diinformasi...|Twitter|
|posindonesia|kami informasikan...|Twitter|
|posindonesia|Baik Sahabat, Moh...|Twitter|
|posindonesia|Halo Sahabat. Moh...|Twitter|
|posindonesia|Halo Sahabat pos....|Twitter|
|posindonesia|Baik sahabat moho...|Twitter|

**9. Keep Only Username and Tweet, and Add Source (Twitter Hashtag)**

In [66]:
dft_h_clean = dft_h.select("username", "tweet", lit("Twitter").alias("source"))
dft_h_clean.show()

+--------------+--------------------+-------+
|      username|               tweet| source|
+--------------+--------------------+-------+
|  kantorpospbg|Ada Kiriman Uang?...|Twitter|
|      detikcom|Resi Pos merupaka...|Twitter|
|  posindonesia|Oranger, layanan ...|Twitter|
|        k59300|#Repost posindone...|Twitter|
|  posindonesia|Kiriman lewat Pos...|Twitter|
|amantepatwaktu|Kenali Pos sedari...|Twitter|
|  kantorpospbg|#Repost posindone...|Twitter|
|  posindonesia|Mau layanan Pos I...|Twitter|
|  posindonesia|Oranger Pos Indon...|Twitter|
|        k59300|#Repost posindone...|Twitter|
|     tokondutz|Sale cd audio  #s...|Twitter|
|     tokondutz|Sale cd audio  #s...|Twitter|
|  kantorpospbg|#Repost posindone...|Twitter|
|  posindonesia|Hati-hati dalam m...|Twitter|
|aboben_variasi|Kita ga cuma mela...|Twitter|
|  therapistsby|Ready stock #dild...|Twitter|
|  posindonesia|Kantor pos sekara...|Twitter|
|  posindonesia|Pengen kirim untu...|Twitter|
|     hardy49jr|Riding lagi.. ngu...|Twitter|

In [67]:
# Give every source the same column names in the same order: username, content, source
dfig_clean = dfig_clean.select(col("author").alias("username"), col("comment").alias("content"), "source")
dft_a_clean = dft_a_clean.select("username", col("tweet").alias("content"), "source")
dft_h_clean = dft_h_clean.select("username", col("tweet").alias("content"), "source")

**10. Union Data from All Sources**

In [68]:
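from functools import reduce
from pyspark.sql import DataFrame

dfs = [dfig_clean, dft_a_clean, dft_h_clean]

# union() (equivalent to the older unionAll()) pairs columns by position,
# so every DataFrame must list username, content, source in the same order
df_union = reduce(DataFrame.union, dfs)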
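`union` (and its alias `unionAll`) appends rows by column position, not by name, which is why the three DataFrames are given identical column orders first. A plain-Python sketch of that rule, modelling a DataFrame as a `(columns, rows)` pair (an assumption for illustration; only the column count is checked, whereas Spark also checks type compatibility):

```python
from functools import reduce


def union(left, right):
    """Sketch of DataFrame.union on (columns, rows) pairs: rows are appended
    by position and the left-hand column names are kept."""
    left_cols, left_rows = left
    right_cols, right_rows = right
    if len(left_cols) != len(right_cols):
        raise ValueError("union needs the same number of columns on both sides")
    return left_cols, left_rows + right_rows


cols = ["username", "content", "source"]
ig = (cols, [("mamah.asyraf", "Cek DM", "Instagram")])
tw = (cols, [("k59300", "some tweet", "Twitter")])
# Same columns in a different order: union does NOT reorder them.
tw_swapped = (["content", "username", "source"], [("some tweet", "k59300", "Twitter")])

print(reduce(union, [ig, tw]))
print(union(ig, tw_swapped))  # "some tweet" ends up under the username column
```

`DataFrame.unionByName` (Spark 2.3+) matches columns by name instead, which guards against ordering mistakes once the column names agree.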

In [69]:
df_union.show()

+-------------------+--------------------+---------+
|           username|             content|   source|
+-------------------+--------------------+---------+
|       mamah.asyraf|              Cek DM|Instagram|
|    posindonesia.ig|Baik, Sahabat, mo...|Instagram|
|    akhirini_husein|Kiriman saya tida...|Instagram|
|    posindonesia.ig|Halo, Sahabat, bi...|Instagram|
|             figha_|Pengambilan paket...|Instagram|
|    posindonesia.ig|Halo, Sahabat. Mo...|Instagram|
|             figha_|@posindonesia.ig ...|Instagram|
|    posindonesia.ig|Sama-sama Sahabat...|Instagram|
|       phytastoreso|Kak tolong cek dm...|Instagram|
|    posindonesia.ig|Baik, telah kami ...|Instagram|
|             figha_|Kalo mengambil pa...|Instagram|
|    rizqimuhammad77|@ari_hr04 @adi047...|Instagram|
|griyakulakannganjuk|Kirim paket belum...|Instagram|
|             snttaa|Tolong cek STR040...|Instagram|
|           widie489|https://ecoracing...|Instagram|
|    posindonesia.ig|Boleh diinformasi...|Instagram|

**11. Remove Special Characters and Exclude Pos Indonesia's Own Accounts**

In [72]:
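# Strip every character that is neither a word character nor whitespace from all
# string columns (raw string, so "\w" and "\s" reach the regex engine unchanged)
for c, t in df_union.dtypes:
    if t == "string":
        df_union = df_union.withColumn(c, regexp_replace(c, r"[^\w\s]", ""))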
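Spark's `regexp_replace` uses Java regular expressions, in which `\w` and `\s` are ASCII-only by default. The pattern `[^\w\s]` therefore removes punctuation such as `.`, `@`, `:`, `/` and `-`, and it also removes any non-ASCII letters or emoji. The plain-Python sketch below reproduces that with the `re.ASCII` flag; the sample strings come from the outputs in this notebook, and `strip_special` is a hypothetical helper name.

```python
import re

# Java's regex engine (used by Spark's regexp_replace) treats \w and \s as
# ASCII-only by default; re.ASCII gives Python's re the same behaviour.
SPECIAL_CHARS = re.compile(r"[^\w\s]", re.ASCII)


def strip_special(text):
    """Remove every character that is neither [A-Za-z0-9_] nor whitespace;
    None (null) passes through unchanged, as it does in Spark."""
    return None if text is None else SPECIAL_CHARS.sub("", text)


for sample in ["posindonesia.ig", "@ari_hr04", "Sama-sama Sahabat", "https://ecoracing"]:
    print(sample, "->", strip_special(sample))
```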

In [73]:
df_union.show()

+-------------------+--------------------+---------+
|           username|             content|   source|
+-------------------+--------------------+---------+
|        mamahasyraf|              Cek DM|Instagram|
|     posindonesiaig|Baik Sahabat moho...|Instagram|
|    akhirini_husein|Kiriman saya tida...|Instagram|
|     posindonesiaig|Halo Sahabat bisa...|Instagram|
|             figha_|Pengambilan paket...|Instagram|
|     posindonesiaig|Halo Sahabat Moho...|Instagram|
|             figha_|posindonesiaig te...|Instagram|
|     posindonesiaig|Samasama Sahabat ...|Instagram|
|       phytastoreso|Kak tolong cek dm...|Instagram|
|     posindonesiaig|Baik telah kami r...|Instagram|
|             figha_|Kalo mengambil pa...|Instagram|
|    rizqimuhammad77|ari_hr04 adi04731...|Instagram|
|griyakulakannganjuk|Kirim paket belum...|Instagram|
|             snttaa|Tolong cek STR040...|Instagram|
|           widie489|httpsecoracingwin...|Instagram|
|     posindonesiaig|Boleh diinformasi...|Instagram|

In [76]:
# Exclude Pos Indonesia's own accounts. Usernames were stripped of dots above,
# so the Instagram account is matched as "posindonesiaig".
df_union = df_union.filter(~col("username").isin("posindonesiaig", "posindonesia"))
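The exclusion filter also drops rows whose `username` is null: in Spark, comparing null with `!=` (or testing it with `isin`) yields null, and `filter` keeps only rows where the condition is true. A plain-Python sketch of that rule, with the excluded names taken from the cell above and a hypothetical helper `keep_row`; it also shows that the unstripped form `posindonesia.ig` would not be excluded, which is why the filter runs after the character cleanup.

```python
EXCLUDED = {"posindonesiaig", "posindonesia"}


def keep_row(username):
    """Sketch of filter((username != a) & (username != b)): a null username
    makes the condition null, and filter keeps only rows where it is true."""
    if username is None:
        return False
    return username not in EXCLUDED


usernames = ["mamahasyraf", "posindonesiaig", None, "posindonesia", "figha_"]
print([u for u in usernames if keep_row(u)])  # ['mamahasyraf', 'figha_']
print(keep_row("posindonesia.ig"))            # True: the dotted name slips through
```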

In [77]:
df_union.show()

+-------------------+--------------------+---------+
|           username|             content|   source|
+-------------------+--------------------+---------+
|        mamahasyraf|              Cek DM|Instagram|
|    akhirini_husein|Kiriman saya tida...|Instagram|
|             figha_|Pengambilan paket...|Instagram|
|             figha_|posindonesiaig te...|Instagram|
|       phytastoreso|Kak tolong cek dm...|Instagram|
|             figha_|Kalo mengambil pa...|Instagram|
|    rizqimuhammad77|ari_hr04 adi04731...|Instagram|
|griyakulakannganjuk|Kirim paket belum...|Instagram|
|             snttaa|Tolong cek STR040...|Instagram|
|           widie489|httpsecoracingwin...|Instagram|
|    keziacatherines|Min bis Surat sud...|Instagram|
|      audreygraciam|Hi min tolong lia...|Instagram|
|  ptmitajayamandiri|Pagi mau tanya un...|Instagram|
|  ptmitajayamandiri|untuk kantor yang...|Instagram|
|          lutviyyah|Pagi tolong di ch...|Instagram|
|         wawannnn__|Kak kok paket say...|Instagram|

**12. Save to CSV**

In [78]:
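# Spark writes 'final_data.csv' as a directory; coalesce(1) makes it contain a single part-*.csv file
df_union.coalesce(1).write.format("csv").mode("overwrite").option("header", "true").save('final_data.csv')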
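`save('final_data.csv')` creates a directory named `final_data.csv`; with `coalesce(1)` it holds one `part-*.csv` file next to marker files such as `_SUCCESS`. If a single standalone CSV is wanted (for example to download from Colab), a small helper can copy the part file out. `collect_single_csv` and the target file name are hypothetical; this is a sketch, not part of the original notebook.

```python
import glob
import os
import shutil


def collect_single_csv(output_dir, target_path):
    """Hypothetical helper: copy the single part-*.csv file that a
    coalesce(1) write leaves in output_dir to a standalone file."""
    parts = glob.glob(os.path.join(output_dir, "part-*.csv"))
    if len(parts) != 1:
        raise FileNotFoundError(
            f"expected exactly one part-*.csv in {output_dir}, found {len(parts)}"
        )
    shutil.copyfile(parts[0], target_path)
    return target_path


# Usage in the notebook (the target name is an assumption):
# collect_single_csv("final_data.csv", "final_data_single.csv")
```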