In [None]:
# install java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

# install findspark using pip
!pip install -q findspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 39 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 52.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=de656b63e8371a14717e0dfcaacf842ce29b97f00765543a796a84552ca53b81
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


Retail Data Analytics

In [None]:
# findspark.init() has to run BEFORE pyspark is imported: it adds the
# SPARK_HOME Python libraries to sys.path, which only matters for imports
# that happen afterwards.
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [None]:
# Start a Spark session on the local master (or reuse one that is already active).
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [None]:
# Load the semicolon-delimited retail file and let Spark infer column types.
# No header option is set, so columns are addressed by their default
# names (_c0, _c1, ...) in the cells that follow.
df = (
    spark.read
    .options(delimiter=";", inferSchema="true")
    .csv("retail-data.csv")
)

In [None]:
# Keep only the two columns needed for the aggregation and give them
# readable names: _c3 becomes CustomerID, _c2 becomes Amount.
customerAmount = df.select(
    df["_c3"].alias("CustomerID"),
    df["_c2"].alias("Amount"),
)

In [None]:
# Average amount per customer, ordered by customer id.
results = (
    customerAmount
    .groupBy("CustomerID")
    .agg({"Amount": "avg"})  # produces a column named "avg(Amount)"
    .withColumnRenamed("avg(Amount)", "AvgAmount")
    .sort("CustomerID")
)

In [None]:
# Preview the aggregated result (show()'s defaults spelled out: 20 rows, long values truncated).
results.show(n=20, truncate=True)

Social Media Content Wrangling

In [None]:
# findspark.init() has to run BEFORE pyspark is imported: it adds the
# SPARK_HOME Python libraries to sys.path, which only matters for imports
# that happen afterwards.
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, explode, arrays_zip
from pyspark.sql.types import ArrayType, StructType

In [None]:
# Get a local Spark session; getOrCreate() hands back the active session
# if one already exists instead of building a new one.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [None]:
# Tweets collected per account: CSV with a header row, types inferred by Spark.
akunTwitter = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv("AkunTwitter_POS.csv")
)

In [None]:
# Tweets collected per hashtag: CSV with a header row, types inferred by Spark.
hashtagTwitter = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv("HashtagTwitter_POS.csv")
)

In [None]:
# Instagram posts come as JSON. The JSON reader derives the schema from the
# data itself; "header" and "inferSchema" are CSV reader options and have no
# effect here, so they are not passed.
instagram = spark.read.json("Instagram_POS.json")

In [None]:
# Normalise account tweets to the shared (username, content, source) layout.
akun = akunTwitter.select(
    "username",
    col("tweet").alias("content"),
    lit("Twitter").alias("source"),
)

In [None]:
# Normalise hashtag tweets to the shared (username, content, source) layout.
hashtag = hashtagTwitter.select(
    "username",
    col("tweet").alias("content"),
    lit("Twitter").alias("source"),
)

In [None]:
# Pull the caption plus the author/comment fields nested under "comments".
# The nested fields surface as columns named "author" and "comment", which
# the next step zips and explodes (so they are presumably arrays -- confirm
# against the JSON schema).
ig = instagram.select(
    "caption",
    "comments.author",
    "comments.comment",
)

In [None]:
# Turn the parallel author/comment arrays into one row per comment, then
# reshape to the shared (username, content, source) layout.
igAuthorComment = (
    ig
    # pair up author[i] with comment[i] into an array of structs
    .withColumn("comments", arrays_zip("author", "comment"))
    # one output row per (author, comment) struct
    .withColumn("comments", explode("comments"))
    .select(
        col("comments.author").alias("username"),
        col("comments.comment").alias("content"),
    )
    .withColumn("source", lit("Instagram"))
)

In [None]:
# Stack the three sources into one frame. Columns are matched by position,
# which works because each input was built as (username, content, source).
joinAll = (
    akun
    .union(hashtag)
    .union(igAuthorComment)
)

In [None]:
# Write the combined data with a header row. Spark creates "joinAll.csv" as
# a directory of part files; coalesce(1) collapses the data to one partition
# so that directory holds a single part file.
# mode("overwrite") keeps the cell re-runnable: with the default save mode
# the write raises an error once the output path already exists.
(
    joinAll.coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv("joinAll.csv")
)

In [None]:
# Shut down the Spark session used by this section.
spark.stop()

SparkSQL Data Analytics

In [None]:
# findspark.init() has to run BEFORE pyspark is imported: it adds the
# SPARK_HOME Python libraries to sys.path, which only matters for imports
# that happen afterwards.
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [None]:
# Bring up a local Spark session for the SparkSQL section.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [None]:
# Explicit schema for the full retail file; every column is nullable.
# NOTE(review): "InvoiceData" looks like a typo for "InvoiceDate" -- confirm
# before renaming, since the name becomes the DataFrame column name.
schema = (
    StructType()
    .add("InvoiceNo", IntegerType(), True)
    .add("StockCode", IntegerType(), True)
    .add("Description", StringType(), True)
    .add("Quantity", IntegerType(), True)
    .add("InvoiceData", StringType(), True)
    .add("Amount", FloatType(), True)
    .add("CustomerID", FloatType(), True)
    .add("Country", StringType(), True)
)


In [None]:
# Read the semicolon-delimited full retail file using the explicit schema
# instead of inferring types.
df = (
    spark.read
    .schema(schema)
    .options(delimiter=";")
    .csv("retail-data-full.csv")
)

In [None]:
# Expose the DataFrame to Spark SQL under a table name, then fetch the
# smallest Amount per customer (ordered by customer id) back to the driver.
df.createOrReplaceTempView("retail_data_full")
min_amount_sql = "SELECT CustomerID, min(Amount) FROM retail_data_full GROUP BY CustomerID ORDER BY CustomerID"
results = spark.sql(min_amount_sql).collect()

In [None]:
# Print each collected Row on its own line.
for row in results:
    print(row)

Alternatively, compute the same result with the DataFrame API instead of SQL:

In [None]:
# Same aggregation expressed with DataFrame operations: smallest Amount per
# customer, ordered by customer id, collected to the driver.
minAmount = df.select("CustomerID", "Amount")
results = (
    minAmount
    .groupBy("CustomerID")
    .agg({"Amount": "min"})  # produces a column named "min(Amount)"
    .orderBy("CustomerID")
    .collect()
)

In [None]:
# Print each collected Row on its own line.
for row in results:
    print(row)

In [None]:
# Shut down the Spark session now that the analysis is finished.
spark.stop()