Install Pyspark

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
!tar xf spark-3.3.1-bin-hadoop3.tgz
!pip install -q findspark

Set Environment

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

Import Pyspark

In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

Upload Dataset

In [4]:
from google.colab import files
uploaded = files.upload()

Saving Dataset.zip to Dataset.zip


In [5]:
!unzip Dataset.zip

Archive:  Dataset.zip
   creating: Dataset/
 extracting: Dataset/cities1.csv     
  inflating: Dataset/cities1_schema.csv  
 extracting: Dataset/cities2.csv     
   creating: Dataset/exercise/
  inflating: Dataset/exercise/AkunTwitter_POS.csv  
  inflating: Dataset/exercise/HashtagTwitter_POS.csv  
  inflating: Dataset/exercise/Instagram_POS.json  
   creating: Dataset/json/
 extracting: Dataset/json/user1.json  
 extracting: Dataset/json/user2.json  
 extracting: Dataset/pcodes.csv      
  inflating: Dataset/people-no-pcode.csv  
  inflating: Dataset/purplecow.txt   
 extracting: Dataset/zcodes.csv      


Initiate Spark

In [6]:
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("test") \
    .getOrCreate()

In [7]:
spark

Load Dataframe

In [8]:
# dataframe akun twitter
DFakun= spark.read. \
  format("csv"). \
  option("inferSchema","true"). \
  option("header","true"). \
  load("Dataset/exercise/AkunTwitter_POS.csv")

In [9]:
# dataframe hashtag twitter
DFtag= spark.read. \
  format("csv"). \
  option("inferSchema","true"). \
  option("header","true"). \
  load("Dataset/exercise/HashtagTwitter_POS.csv")

In [10]:
# dataframe instagram
DFinsta= spark.read.format("json").load("Dataset/exercise/Instagram_POS.json")

Filter Dataframe

In [165]:
# filter dataframe akun
DFakunfilter= DFakun.select('username', col('tweet').alias('content')).where(col('username')!='posindonesia')
DFakunfilter= DFakunfilter.withColumn('source', lit('twitter'))
DFakunfilter.na.drop('any')
DFakunfilter.distinct()
DFakunfilter.show(truncate=False)

+------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|username    |content                                                                                                                                                                                                                                                                                                          |source |
+------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|kabarinegeri

In [157]:
# filter dataframe hashtag
DFtagfilter= DFtag.select('username', col('tweet').alias('content')).where(col('hashtags').like('%posindonesia%') & ~col('username').like('%pos%'))
DFtagfilter= DFtagfilter.withColumn('source', lit('twitter'))
DFtagfilter.na.drop('any')
DFtagfilter.distinct()
DFtagfilter.show(truncate=False)

+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|username       |content                                                                                                                                                                                                                                                                                                                                                                                                                                    |source |
+---------------+---------------------------------------------------------------------------

In [73]:
# fungsi untuk memisahkan struct menjadi array
def FlatDF(schema, prefix=None):
        fields = []
        for field in schema.fields:
            name = prefix + '.' + field.name if prefix else field.name
            dtype = field.dataType
            if isinstance(dtype, ArrayType):
                dtype = dtype.elementType

            if isinstance(dtype, StructType):
                fields += FlatDF(dtype, prefix=name)
            else:
                fields.append(name)

        return fields

In [116]:
# flaten dataframe instagram
DFinstaflat= DFinsta.select(FlatDF(DFinsta.schema))

In [159]:
# filter dataframe instagram
DFinstafilter= DFinstaflat.select(col('author')[1].alias('username'), col('comment')[1].alias('content')).where(col('author')[1]!='posindonesia.ig')
DFinstafilter= DFinstafilter.withColumn('source', lit('instagram'))
DFinstafilter.na.drop('any')
DFinstafilter.distinct()
DFinstafilter.show(truncate=False)

+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+
|username           |content                                                                                                                                                                                          |source   |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+
|griyakulakannganjuk|Kirim paket belum di terima tapi sudah bilang di tolak alasan rusak. Itu gimama yaa kak. Di terima aja belum tapi sudah bilang rusak.                                                            |instagram|
|ojombokfolou       |SANGAT KECEWA. saya kirim barang dr Jakarta ke Makassar dng kilat khusus, t

Union Dataframe

In [167]:
# gabung dataframe yang telah difilter
DFkonten= DFtagfilter.union(DFakunfilter).union(DFinstafilter)
DFkonten.show()

+---------------+--------------------+-------+
|       username|             content| source|
+---------------+--------------------+-------+
|       detikcom|Resi Pos merupaka...|twitter|
|         k59300|#Repost posindone...|twitter|
| amantepatwaktu|Kenali Pos sedari...|twitter|
|         k59300|#Repost posindone...|twitter|
|      tokondutz|Sale cd audio  #s...|twitter|
|      tokondutz|Sale cd audio  #s...|twitter|
| aboben_variasi|Kita ga cuma mela...|twitter|
|   therapistsby|Ready stock #dild...|twitter|
|      hardy49jr|Riding lagi.. ngu...|twitter|
|hanayuniartii11|At-- #PosIndonesi...|twitter|
|      agityagit|IKATAN MOTOR POS ...|twitter|
|         k59300|Acara Reward Q1 T...|twitter|
|    kosasihsony|@PosIndonesia sti...|twitter|
| amantepatwaktu|Cinta adalah jeni...|twitter|
|         k59300|#Repost posindone...|twitter|
|ruritandiansyah|Sehingga lanjut b...|twitter|
|   penembaklove|Ramadhan sebentar...|twitter|
|       gembulzs|Ini cuma genangan...|twitter|
|       gembu