Install Pyspark

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

Set Pyspark Environment

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

Import Pyspark Package

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

Upload File

In [0]:
from google.colab import files
uploaded = files.upload()

Saving Dataset.zip to Dataset.zip


In [0]:
!unzip Dataset.zip

Archive:  Dataset.zip
   creating: Dataset/
 extracting: Dataset/cities1.csv     
  inflating: Dataset/cities1_schema.csv  
 extracting: Dataset/cities2.csv     
   creating: Dataset/exercise/
  inflating: Dataset/exercise/AkunTwitter_POS.csv  
  inflating: Dataset/exercise/HashtagTwitter_POS.csv  
  inflating: Dataset/exercise/Instagram_POS.json  
   creating: Dataset/json/
 extracting: Dataset/json/user1.json  
 extracting: Dataset/json/user2.json  
 extracting: Dataset/pcodes.csv      
  inflating: Dataset/people-no-pcode.csv  
  inflating: Dataset/purplecow.txt   
 extracting: Dataset/zcodes.csv      


Initiate Spark

In [0]:
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("test") \
    .getOrCreate()

RDD

In [0]:
myRDD = spark.sparkContext. \
  textFile("Dataset/purplecow.txt")

In [0]:
for line in myRDD.take(4):
  print(line)

I've never seen a purple cow.
I never hope to see one;
But I can tell you, anyhow,
I'd rather see than be one.


In [0]:
userRDD = spark.sparkContext. \
  wholeTextFiles("Dataset/json")

In [0]:
for x in userRDD.collect():
  print(x)

('file:/content/Dataset/json/user1.json', '{ "firstName":"Fred", "lastName":"Flintstone", "userid":"123" }')
('file:/content/Dataset/json/user2.json', '{ "firstName":"Barney", "lastName":"Rubble", "userid":"234" }')


In [0]:
sc = spark.sparkContext

In [0]:
myData = ["Alice","Carlos","Frank","Barbara"]
myRDD = sc.parallelize(myData)

In [0]:
for line in myRDD.take(2):
  print(line)

Alice
Carlos


In [0]:
distinctRDD = sc.textFile("Dataset/cities1.csv"). \
  distinct()

for city in distinctRDD.collect():
  print(city)

Boston,MA
Palo Alto,CA
Santa Fe,NM


In [0]:
unionRDD = sc.textFile("Dataset/cities2.csv"). \
  union(distinctRDD)

for city in unionRDD.collect():
  print(city)

Calgary,AB
Chicago,IL
Palo Alto,CA
Boston,MA
Palo Alto,CA
Santa Fe,NM


Dataframe

In [0]:
myDF = spark.read.format("csv").load("Dataset/cities1_schema.csv")
myDF.show()
myDF.printSchema()

+---------+-----+----------+
|      _c0|  _c1|       _c2|
+---------+-----+----------+
|     City|State|Population|
|   Boston|   MA|      1000|
|Palo Alto|   CA|      2000|
| Santa Fe|   NM|      3000|
|Palo Alto|   CA|      4000|
+---------+-----+----------+

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)



In [0]:
myDF = spark.read. \
  format("csv"). \
  option("inferSchema","true"). \
  option("header","true"). \
  load("Dataset/cities1_schema.csv")
myDF.show()
myDF.printSchema()

+---------+-----+----------+
|     City|State|Population|
+---------+-----+----------+
|   Boston|   MA|      1000|
|Palo Alto|   CA|      2000|
| Santa Fe|   NM|      3000|
|Palo Alto|   CA|      4000|
+---------+-----+----------+

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Population: integer (nullable = true)



In [0]:
myDF = spark.read.format("json").load("Dataset/json/user1.json")

In [0]:
myDF.printSchema()
myDF.show()

root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- userid: string (nullable = true)

+---------+----------+------+
|firstName|  lastName|userid|
+---------+----------+------+
|     Fred|Flintstone|   123|
+---------+----------+------+



In [0]:
mydata = [["Josiah","Bartlett",33],["Harry","Potter",20]]
myDF = spark.createDataFrame(mydata)
myDF.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)



In [0]:
columnsList = [
  StructField("firstName", StringType()),
  StructField("lastName", StringType()),
  StructField("age", IntegerType())]

schema = StructType(columnsList)

In [0]:
myDF = spark.createDataFrame(mydata, schema)
myDF.printSchema()
myDF.show()

root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- age: integer (nullable = true)

+---------+--------+---+
|firstName|lastName|age|
+---------+--------+---+
|   Josiah|Bartlett| 33|
|    Harry|  Potter| 20|
+---------+--------+---+



In [0]:
myDF.write. \
  mode("append"). \
  saveAsTable("my_db.my_table")

AnalysisException: ignored

In [0]:
myDF.write.json("mydata")

In [0]:
myDF.coalesce(1).write.mode("overwrite").json("mydata")

In [0]:
myDF.select("lastName", "age").show()

+--------+---+
|lastName|age|
+--------+---+
|Bartlett| 33|
|  Potter| 20|
+--------+---+



In [0]:
myDF.where(myDF["age"] > 20).show()

+---------+--------+---+
|firstName|lastName|age|
+---------+--------+---+
|   Josiah|Bartlett| 33|
+---------+--------+---+



In [0]:
myDF.select(myDF["lastName"].alias("Last Name"), col("age")*10).show()

+---------+----------+
|Last Name|(age * 10)|
+---------+----------+
| Bartlett|       330|
|   Potter|       200|
+---------+----------+



In [0]:
myDF = myDF.withColumn("age_10", col("age")*10)
myDF.show()

+---------+--------+---+------+
|firstName|lastName|age|age_10|
+---------+--------+---+------+
|   Josiah|Bartlett| 33|   330|
|    Harry|  Potter| 20|   200|
+---------+--------+---+------+



In [0]:
myDF = myDF.withColumn("age_group", when(col("age") > 25, lit("Adult")).otherwise(lit("Kid")))
myDF.show()

+---------+--------+---+------+---------+
|firstName|lastName|age|age_10|age_group|
+---------+--------+---+------+---------+
|   Josiah|Bartlett| 33|   330|    Adult|
|    Harry|  Potter| 20|   200|      Kid|
+---------+--------+---+------+---------+



In [0]:
myDF = myDF.withColumn("fullName", concat(col("firstName"), lit(" "), col("lastName")))
myDF.show()

+---------+--------+---+------+---------+---------------+
|firstName|lastName|age|age_10|age_group|       fullName|
+---------+--------+---+------+---------+---------------+
|   Josiah|Bartlett| 33|   330|    Adult|Josiah Bartlett|
|    Harry|  Potter| 20|   200|      Kid|   Harry Potter|
+---------+--------+---+------+---------+---------------+



In [0]:
peopleDF = spark.read.option("header","true").csv("Dataset/people-no-pcode.csv")
pcodesDF = spark.read.option("header","true").csv("Dataset/pcodes.csv")
peopleDF.show()
pcodesDF.show()

+-----+--------+---------+---+
|pcode|lastName|firstName|age|
+-----+--------+---------+---+
|02134|  Hopper|    Grace| 52|
| null|  Turing|     Alan| 32|
|94020|Lovelace|      Ada| 28|
|87501| Babbage|  Charles| 49|
|02134|   Wirth|  Niklaus| 48|
+-----+--------+---------+---+

+-----+---------+-----+
|pcode|     city|state|
+-----+---------+-----+
|02134|   Boston|   MA|
|94020|Palo Alto|   CA|
|87501| Santa Fe|   NM|
|60645|  Chicago|   IL|
+-----+---------+-----+



In [0]:
peopleDF.join(pcodesDF, "pcode").show()
peopleDF.join(pcodesDF, "pcode", "left_outer") \
.where(col("city").isNotNull()) \
.show()

+-----+--------+---------+---+---------+-----+
|pcode|lastName|firstName|age|     city|state|
+-----+--------+---------+---+---------+-----+
|02134|  Hopper|    Grace| 52|   Boston|   MA|
|94020|Lovelace|      Ada| 28|Palo Alto|   CA|
|87501| Babbage|  Charles| 49| Santa Fe|   NM|
|02134|   Wirth|  Niklaus| 48|   Boston|   MA|
+-----+--------+---------+---+---------+-----+

+-----+--------+---------+---+---------+-----+
|pcode|lastName|firstName|age|     city|state|
+-----+--------+---------+---+---------+-----+
|02134|  Hopper|    Grace| 52|   Boston|   MA|
|94020|Lovelace|      Ada| 28|Palo Alto|   CA|
|87501| Babbage|  Charles| 49| Santa Fe|   NM|
|02134|   Wirth|  Niklaus| 48|   Boston|   MA|
+-----+--------+---------+---+---------+-----+



In [0]:
zcodesDF = spark.read.option("header","true").csv("Dataset/zcodes.csv")
zcodesDF.show()

+-----+---------+-----+
|  zip|     city|state|
+-----+---------+-----+
|02134|   Boston|   MA|
|94020|Palo Alto|   CA|
|87501| Santa Fe|   NM|
|60645|  Chicago|   IL|
+-----+---------+-----+



In [0]:
peopleDF.join(zcodesDF, peopleDF["pcode"] == zcodesDF["zip"]).show()
pcodesDF.alias("left").join(zcodesDF.alias("right"), pcodesDF["pcode"] == zcodesDF["zip"]) \
.select("left.*").show()

+-----+--------+---------+---+-----+---------+-----+
|pcode|lastName|firstName|age|  zip|     city|state|
+-----+--------+---------+---+-----+---------+-----+
|02134|  Hopper|    Grace| 52|02134|   Boston|   MA|
|94020|Lovelace|      Ada| 28|94020|Palo Alto|   CA|
|87501| Babbage|  Charles| 49|87501| Santa Fe|   NM|
|02134|   Wirth|  Niklaus| 48|02134|   Boston|   MA|
+-----+--------+---------+---+-----+---------+-----+

+-----+---------+-----+
|pcode|     city|state|
+-----+---------+-----+
|02134|   Boston|   MA|
|94020|Palo Alto|   CA|
|87501| Santa Fe|   NM|
|60645|  Chicago|   IL|
+-----+---------+-----+



Exercise 1

Load dataframe dari 3 data yang berada di folder exercise:
1. AkunTwitter_POS.csv
2. HashtagTwitter_POS.csv
3. Instagram_POS.json

Check schema dari masing-masing dataframe. Data tersebut berasal dari 2 sosial media yang berbeda: twitter & instagram. 

Gabungan 3 dataframe diatas agar menjadi 1 format. Menjadi format:

**| Username | Content  | Source    |**

| posindo    | Hi, posindo | Twitter   |

| fathur12   | Halo, tolong     | Instagram |


Hint: sesuaikan 'content' agar merepresentasikan aspirasi user di masing-masing sosial media

https://stackoverflow.com/questions/37471346/automatically-and-elegantly-flatten-dataframe-in-spark-sql
https://stackoverflow.com/questions/41027315/pyspark-split-multiple-array-columns-into-rows