## 1. PySpark first impression using Colab

In [1]:
# Download Java and Spark

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
# Set up the paths

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

In [3]:
# Create a Spark session

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark.conf.set("spark.sql.caseSensitive", True) # Avoid error "Found duplicate column(s) in the data schema"
spark

### 1.0 Import data from csv

In [5]:
# Upload 'Airline-Sentiment.csv' to the runtime.
# Note that the data will be deleted automatically if the runtime is inactive for a while.

# Load the csv into a dataframe
myDF = spark.read.csv("Airline-Sentiment.csv",header=True,encoding='latin1',escape="\"",multiLine=True)

In [6]:
# Take a look

myDF.show()

+-----------------+--------------+-------------+--------------------+-----------+-------------+--------------------+--------------------+
|airline_sentiment|       airline|retweet_count|                text|tweet_coord|tweet_created|      tweet_location|       user_timezone|
+-----------------+--------------+-------------+--------------------+-----------+-------------+--------------------+--------------------+
|          neutral|Virgin America|            0|@VirginAmerica Wh...|       null|2/24/15 11:35|                null|Eastern Time (US ...|
|         positive|Virgin America|            0|@VirginAmerica pl...|       null|2/24/15 11:15|                null|Pacific Time (US ...|
|          neutral|Virgin America|            0|@VirginAmerica I ...|       null|2/24/15 11:15|           Lets Play|Central Time (US ...|
|         negative|Virgin America|            0|@VirginAmerica it...|       null|2/24/15 11:15|                null|Pacific Time (US ...|
|         negative|Virgin America|

In [7]:
# Check the type

type(myDF)

In [8]:
# Check the total number of rows

myDF.count()

14640

In [9]:
# Understand the data types of each column

myDF.printSchema()

root
 |-- airline_sentiment: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- retweet_count: string (nullable = true)
 |-- text: string (nullable = true)
 |-- tweet_coord: string (nullable = true)
 |-- tweet_created: string (nullable = true)
 |-- tweet_location: string (nullable = true)
 |-- user_timezone: string (nullable = true)



In [10]:
# Subset dataframe by columns

myDF.select(["airline","airline_sentiment"]).show()

+--------------+-----------------+
|       airline|airline_sentiment|
+--------------+-----------------+
|Virgin America|          neutral|
|Virgin America|         positive|
|Virgin America|          neutral|
|Virgin America|         negative|
|Virgin America|         negative|
|Virgin America|         negative|
|Virgin America|         positive|
|Virgin America|          neutral|
|Virgin America|         positive|
|Virgin America|         positive|
|Virgin America|          neutral|
|Virgin America|         positive|
|Virgin America|         positive|
|Virgin America|         positive|
|Virgin America|         positive|
|Virgin America|         negative|
|Virgin America|         positive|
|Virgin America|         negative|
|Virgin America|         positive|
|Virgin America|         positive|
+--------------+-----------------+
only showing top 20 rows



In [11]:
# Get all the airlines names

airlines = myDF.select("airline").distinct()
airlines.show()

+--------------+
|       airline|
+--------------+
|         Delta|
|Virgin America|
|        United|
|    US Airways|
|     Southwest|
|      American|
+--------------+



In [12]:
# Count number of tweets per airline

tweetsPerAirline = myDF.groupBy("airline").count()
tweetsPerAirline.show()

+--------------+-----+
|       airline|count|
+--------------+-----+
|         Delta| 2222|
|Virgin America|  504|
|        United| 3822|
|    US Airways| 2913|
|     Southwest| 2420|
|      American| 2759|
+--------------+-----+



In [13]:
# GroupBy multiple columns

myDF.groupBy("airline","airline_sentiment").count().show()

+--------------+-----------------+-----+
|       airline|airline_sentiment|count|
+--------------+-----------------+-----+
|     Southwest|         negative| 1186|
|      American|         negative| 1960|
|Virgin America|          neutral|  171|
|    US Airways|          neutral|  381|
|        United|         positive|  492|
|    US Airways|         negative| 2263|
|      American|         positive|  336|
|         Delta|         negative|  955|
|Virgin America|         positive|  152|
|     Southwest|          neutral|  664|
|        United|          neutral|  697|
|     Southwest|         positive|  570|
|         Delta|         positive|  544|
|         Delta|          neutral|  723|
|    US Airways|         positive|  269|
|        United|         negative| 2633|
|      American|          neutral|  463|
|Virgin America|         negative|  181|
+--------------+-----------------+-----+



### <font color=blue>Use groupBy to show the number of tweets per sentiment category (i.e., positive, negative, neutral)</font>

In [14]:
myDF.groupBy("airline_sentiment").count().show()

+-----------------+-----+
|airline_sentiment|count|
+-----------------+-----+
|         positive| 2363|
|          neutral| 3099|
|         negative| 9178|
+-----------------+-----+



### <font color=blue>Sort by airlines and then by sentiments for the results returned by myDF.groupBy("airline","airline_sentiment").count().show()</font>

In [15]:
# Simply use orderBy

myDF.groupBy("airline","airline_sentiment").count().orderBy("airline","airline_sentiment").show()

+--------------+-----------------+-----+
|       airline|airline_sentiment|count|
+--------------+-----------------+-----+
|      American|         negative| 1960|
|      American|          neutral|  463|
|      American|         positive|  336|
|         Delta|         negative|  955|
|         Delta|          neutral|  723|
|         Delta|         positive|  544|
|     Southwest|         negative| 1186|
|     Southwest|          neutral|  664|
|     Southwest|         positive|  570|
|    US Airways|         negative| 2263|
|    US Airways|          neutral|  381|
|    US Airways|         positive|  269|
|        United|         negative| 2633|
|        United|          neutral|  697|
|        United|         positive|  492|
|Virgin America|         negative|  181|
|Virgin America|          neutral|  171|
|Virgin America|         positive|  152|
+--------------+-----------------+-----+



### 1.1 Import data from json

### Please go to https://nijianmo.github.io/amazon/index.html and download the review data for "Magazine Subscriptions" (5-core, 2,375 reviews). Please then upload this file to the Colab runtime before running the code below.

In [16]:
# Compressed json file can be read directly with spark!

myReview = spark.read.json("Magazine_Subscriptions_5.json.gz")

In [17]:
# Take a look

myReview.show()

+----------+-----+-------+--------------------+-----------+--------------+------------------+-----------------+--------------------+--------------+--------+----+
|      asin|image|overall|          reviewText| reviewTime|    reviewerID|      reviewerName|            style|             summary|unixReviewTime|verified|vote|
+----------+-----+-------+--------------------+-----------+--------------+------------------+-----------------+--------------------+--------------+--------+----+
|B00005N7P0| null|    4.0|I'm old, and so i...|02 26, 2014| A5QQOOZJOVPSF| John L. Mehlmauer|             null|   Cheapskates guide|    1393372800|    true|null|
|B00005N7PS| null|    5.0|There's nothing t...| 03 6, 2004| A5RHZE7B8SV5Q|    gorillazfan249|             null|The best mature M...|    1078531200|   false|   3|
|B00005N7PS| null|    1.0|If you're the kin...|07 15, 2003| A1RPTVW5VEOSI|Michael J. Edelman|             null|THE Magazine for ...|    1058227200|   false|  17|
|B00005N7PS| null|    1.0|No

In [18]:
# Check the total number of rows

myReview.count()

2375

In [19]:
# Understand the data types of each column

myReview.printSchema()

root
 |-- asin: string (nullable = true)
 |-- image: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- style: struct (nullable = true)
 |    |-- Format:: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- vote: string (nullable = true)



In [20]:
# Subset dataframe by rows
# SQL can be supplied as string input for where() function

# Return the reviews with rating >= 4.0
myReview.where("overall >= 4").show()

+----------+-----+-------+--------------------+-----------+--------------+-----------------+-----------------+--------------------+--------------+--------+----+
|      asin|image|overall|          reviewText| reviewTime|    reviewerID|     reviewerName|            style|             summary|unixReviewTime|verified|vote|
+----------+-----+-------+--------------------+-----------+--------------+-----------------+-----------------+--------------------+--------------+--------+----+
|B00005N7P0| null|    4.0|I'm old, and so i...|02 26, 2014| A5QQOOZJOVPSF|John L. Mehlmauer|             null|   Cheapskates guide|    1393372800|    true|null|
|B00005N7PS| null|    5.0|There's nothing t...| 03 6, 2004| A5RHZE7B8SV5Q|   gorillazfan249|             null|The best mature M...|    1078531200|   false|   3|
|B00005N7P0| null|    5.0|When PC Magazine ...| 10 5, 2010|A1IU9VPCBKZPE8|     Randolph Eck|             null|Excellent Compute...|    1286236800|    true|   2|
|B00005N7PS| null|    4.0|Details 

In [21]:
# Another example counting how many reviews for a particular product
# Don't forget the quotes within the statement quotes

myReview.where("asin = 'B00005N7P0'").count()

6

In [22]:
# More operations are available from Spark's SQL functions

from pyspark.sql import functions as f

In [23]:
# A slightly more complicated one, but very informative
# It shows how many reviews are provided by each unique reviewer and their average ratings.
# Note that in Python you can break lines within brackets for better visual

myReview.groupBy("reviewerID").agg(
    f.collect_set("asin").alias("products"),
    f.count("asin").alias("num_reviews"),
    f.avg("overall").alias("rating")
).sort("num_reviews",ascending=False).show()

+--------------+--------------------+-----------+------------------+
|    reviewerID|            products|num_reviews|            rating|
+--------------+--------------------+-----------+------------------+
|A3JPFWKS83R49V|[B00005N7QI, B000...|         30|3.7333333333333334|
|A2OTUWUSH49XIN|[B000W3MB5M, B000...|         20|              3.95|
|A3GA09FYFKL4EY|[B00005N7TB, B000...|         19|3.6842105263157894|
| AA14AMM03HMXW|[B001LF4EVO, B000...|         19|3.6842105263157894|
| AVF9FV7AMRP5C|[B00007AX0N, B000...|         19| 4.052631578947368|
| AKMEY1BSHSDG7|[B001LF4EVO, B000...|         18|               4.5|
| A8N76G7E26DM4|[B00005NIN8, B000...|         16|            3.5625|
|A2BSKT65F16LAQ|[B000XXDJ70, B000...|         15|               4.8|
|A21FFUC8BEMDJ7|[B00005N7PN, B000...|         14| 4.785714285714286|
|A281NPSIMI1C2R|[B00005N7SL, B000...|         14| 4.857142857142857|
| AXV3OJG9BZ2JW|[B00005N7TG, B000...|         13|               5.0|
|A1PGC1GEBY9CZU|[B000069YW9, B000.

### <font color=blue>Summarise the average ratings for each unique product/asin in the year 2014.</font>

In [24]:
# Subset by filtering for the year 2014
mySet = myReview.where(f.col("reviewTime").contains("2014"))

# Now summarise the average ratings for each product
mySet.groupBy("asin").agg(
    f.count("overall").alias("num_reviews"),
    f.avg("overall").alias("rating")
).sort("num_reviews",ascending=False).show()

+----------+-----------+------------------+
|      asin|num_reviews|            rating|
+----------+-----------+------------------+
|B00005N7OV|         14| 4.642857142857143|
|B000ILY9LW|         13| 4.769230769230769|
|B00005N7QG|         10|               4.3|
|B00005N7PN|         10|               4.7|
|B000IJ84F6|         10|               4.7|
|B00005NIOO|          9| 3.888888888888889|
|B00007B10Y|          9| 4.777777777777778|
|B000OPOEEE|          9| 3.888888888888889|
|B00005R8BR|          9| 4.555555555555555|
|B001LF4EVO|          8|              4.75|
|B00005NIOH|          8|              4.75|
|B0037STB02|          8|             4.625|
|B000IJ7RQ8|          8|             4.625|
|B01CF3ECNK|          7|3.7142857142857144|
|B00005N7SG|          7|3.7142857142857144|
|B000FTJ7JQ|          7|3.7142857142857144|
|B000IOE9Y6|          6| 4.833333333333333|
|B00EVV77A0|          5|               5.0|
|B0001MS2D4|          5|               5.0|
|B00005N7RA|          5|        

## 2. PySpark dataframe operations

In [25]:
# Again make sure 'Airline-Sentiment.csv' is in the runtime.

# Load the csv into a dataframe
# This time we add "inferSchema=True" so that "retweet_count" is recognised as integer type
mySparkDF = spark.read.csv("Airline-Sentiment.csv",header=True,encoding='latin1',escape="\"",multiLine=True,inferSchema=True)
mySparkDF.show(5)

+-----------------+--------------+-------------+--------------------+-----------+-------------+--------------+--------------------+
|airline_sentiment|       airline|retweet_count|                text|tweet_coord|tweet_created|tweet_location|       user_timezone|
+-----------------+--------------+-------------+--------------------+-----------+-------------+--------------+--------------------+
|          neutral|Virgin America|            0|@VirginAmerica Wh...|       null|2/24/15 11:35|          null|Eastern Time (US ...|
|         positive|Virgin America|            0|@VirginAmerica pl...|       null|2/24/15 11:15|          null|Pacific Time (US ...|
|          neutral|Virgin America|            0|@VirginAmerica I ...|       null|2/24/15 11:15|     Lets Play|Central Time (US ...|
|         negative|Virgin America|            0|@VirginAmerica it...|       null|2/24/15 11:15|          null|Pacific Time (US ...|
|         negative|Virgin America|            0|@VirginAmerica an...|       

In [26]:
# Produce summary statistics

mySparkDF.describe("retweet_count")

summary,retweet_count
count,14640.0
mean,0.0826502732240437
stddev,0.7457781608465299
min,0.0
max,44.0


In [27]:
# Previously we used "printSchema" to show data types
# Below is another way to show data types for each column

mySparkDF.dtypes

[('airline_sentiment', 'string'),
 ('airline', 'string'),
 ('retweet_count', 'int'),
 ('text', 'string'),
 ('tweet_coord', 'string'),
 ('tweet_created', 'string'),
 ('tweet_location', 'string'),
 ('user_timezone', 'string')]

In [28]:
# Get the columns

mySparkDF.columns

['airline_sentiment',
 'airline',
 'retweet_count',
 'text',
 'tweet_coord',
 'tweet_created',
 'tweet_location',
 'user_timezone']

In [29]:
# Get a single column
# Add truncate=False to show more content

mySparkDF.select('text').show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                                        |
+--------------------------------------------------------------------------------------------------------------------------------------------+
|@VirginAmerica What @dhepburn said.                                                                                                         |
|@VirginAmerica plus you've added commercials to the experience... tacky.                                                                    |
|@VirginAmerica I didn't today... Must mean I need to take another trip!                                                                     |
|@VirginAmerica it's really aggressive to blast obnoxious "entertainment" in your guests' faces &amp; they have little recourse              |

In [30]:
# Rename a column

mySparkDF.withColumnRenamed("text", "tweet").show()

+-----------------+--------------+-------------+--------------------+-----------+-------------+--------------------+--------------------+
|airline_sentiment|       airline|retweet_count|               tweet|tweet_coord|tweet_created|      tweet_location|       user_timezone|
+-----------------+--------------+-------------+--------------------+-----------+-------------+--------------------+--------------------+
|          neutral|Virgin America|            0|@VirginAmerica Wh...|       null|2/24/15 11:35|                null|Eastern Time (US ...|
|         positive|Virgin America|            0|@VirginAmerica pl...|       null|2/24/15 11:15|                null|Pacific Time (US ...|
|          neutral|Virgin America|            0|@VirginAmerica I ...|       null|2/24/15 11:15|           Lets Play|Central Time (US ...|
|         negative|Virgin America|            0|@VirginAmerica it...|       null|2/24/15 11:15|                null|Pacific Time (US ...|
|         negative|Virgin America|

In [31]:
# Where as an alias of filter

mySparkDF.where("retweet_count > 1").count()

127

In [32]:
# Try with filter

mySparkDF.filter("retweet_count > 1").orderBy("retweet_count",ascending=False).show()

+-----------------+----------+-------------+--------------------+-----------+-------------+--------------------+--------------------+
|airline_sentiment|   airline|retweet_count|                text|tweet_coord|tweet_created|      tweet_location|       user_timezone|
+-----------------+----------+-------------+--------------------+-----------+-------------+--------------------+--------------------+
|         negative|US Airways|           44|@USAirways 5 hr f...|       null|2/17/15 20:06|                null|Eastern Time (US ...|
|         negative|US Airways|           32|@USAirways of cou...|       null|2/17/15 20:50|                null|Eastern Time (US ...|
|         negative|     Delta|           31|STOP. USING.THIS....|       null|2/23/15 10:30|        New York, NY|                null|
|          neutral|US Airways|           28|@USAirways with t...|       null|2/22/15 11:46|Airports Around T...|Eastern Time (US ...|
|         positive| Southwest|           22|@SouthwestAir bea.

In [33]:
# Multiple conditions

mySparkDF.filter((mySparkDF["retweet_count"] > 1) &
                 (mySparkDF["airline"] == 'Delta')
                 ).orderBy("retweet_count",ascending=False).show()

+-----------------+-------+-------------+--------------------+-----------+-------------+--------------------+--------------------+
|airline_sentiment|airline|retweet_count|                text|tweet_coord|tweet_created|      tweet_location|       user_timezone|
+-----------------+-------+-------------+--------------------+-----------+-------------+--------------------+--------------------+
|         negative|  Delta|           31|STOP. USING.THIS....|       null|2/23/15 10:30|        New York, NY|                null|
|         negative|  Delta|           22|can you not? RT @...|       null|2/23/15 10:51|         Raleigh, NC|Eastern Time (US ...|
|         negative|  Delta|           18|Just in case you ...|       null|2/23/15 12:04|Sunny Cali by way...|Pacific Time (US ...|
|          neutral|  Delta|           15|X____x RT @JetBlu...|       null|2/23/15 11:03|      i'm only human|Eastern Time (US ...|
|         positive|  Delta|           11|@JetBlue what a g...|       null|2/22/15 1

In [34]:
# Spark dataframe can be converted to Pandas dataframe

myPandasDF = mySparkDF.toPandas()
type(myPandasDF)

In [35]:
# Create a SQL temp view from the dataframe

mySparkDF.createOrReplaceTempView("airlineSentiment")

In [36]:
# Find out which tables are available in this Spark session

spark.catalog.listTables()

[Table(name='airlineSentiment', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [37]:
# Now able to use explicit SQL queries

spark.sql("SELECT * FROM airlineSentiment ORDER BY retweet_count DESC")

airline_sentiment,airline,retweet_count,text,tweet_coord,tweet_created,tweet_location,user_timezone
negative,US Airways,44,@USAirways 5 hr f...,,2/17/15 20:06,,Eastern Time (US ...
negative,US Airways,32,@USAirways of cou...,,2/17/15 20:50,,Eastern Time (US ...
negative,Delta,31,STOP. USING.THIS....,,2/23/15 10:30,"New York, NY",
neutral,US Airways,28,@USAirways with t...,,2/22/15 11:46,Airports Around T...,Eastern Time (US ...
positive,Southwest,22,@SouthwestAir bea...,,2/20/15 12:27,"Bellevue, WA",Hawaii
negative,Delta,22,can you not? RT @...,,2/23/15 10:51,"Raleigh, NC",Eastern Time (US ...
negative,Delta,18,Just in case you ...,,2/23/15 12:04,Sunny Cali by way...,Pacific Time (US ...
neutral,Delta,15,X____x RT @JetBlu...,,2/23/15 11:03,i'm only human,Eastern Time (US ...
positive,Delta,11,@JetBlue what a g...,,2/22/15 12:35,Palo Alto CA,Pacific Time (US ...
positive,US Airways,9,@USAirways Wow un...,,2/23/15 10:42,TotalWorldJaxxina...,


### <font color=blue>Upload the file "titanic_train.csv" to the runtime</font>

### <font color=blue>Use PySpark dataframe operations to get information for all male passengers with age 50 and above</font>

In [38]:
# First load the data (be sure it is uploaded in the runtime)
titanic_train = spark.read.csv('titanic_train.csv', header = True, inferSchema=True)

# Use multiple conditions to get the information required
titanic_train.filter((titanic_train["Age"] >= 50) &
                     (titanic_train["Sex"] == 'male')
                     ).show()

+-----------+--------+------+--------------------+----+----+-----+-----+-----------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name| Sex| Age|SibSp|Parch|     Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+----+----+-----+-----+-----------+-------+-----+--------+
|          7|       0|     1|McCarthy, Mr. Tim...|male|54.0|    0|    0|      17463|51.8625|  E46|       S|
|         34|       0|     2|Wheadon, Mr. Edwa...|male|66.0|    0|    0| C.A. 24579|   10.5| null|       S|
|         55|       0|     1|Ostby, Mr. Engelh...|male|65.0|    0|    1|     113509|61.9792|  B30|       C|
|         95|       0|     3|   Coxon, Mr. Daniel|male|59.0|    0|    0|     364500|   7.25| null|       S|
|         97|       0|     1|Goldschmidt, Mr. ...|male|71.0|    0|    0|   PC 17754|34.6542|   A5|       C|
|        117|       0|     3|Connors, Mr. Patrick|male|70.5|    0|    0|     370369|   7.75| null|       Q|
|        125|       0|     1

### <font color=blue>Use SQL statements to get information for the oldest passenger in first class</font>

In [39]:
# Create a SQL temp view from the dataframe
titanic_train.createOrReplaceTempView("titanicTable")

# Use SQL query to retrieve the information
spark.sql("SELECT * FROM titanicTable WHERE Pclass = 1 ORDER BY Age DESC")

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
631,1,1,"Barkworth, Mr. Al...",male,80.0,0,0,27042,30.0,A23,S
494,0,1,"Artagaveytia, Mr....",male,71.0,0,0,PC 17609,49.5042,,C
97,0,1,"Goldschmidt, Mr. ...",male,71.0,0,0,PC 17754,34.6542,A5,C
746,0,1,"Crosby, Capt. Edw...",male,70.0,1,1,WE/P 5735,71.0,B22,S
55,0,1,"Ostby, Mr. Engelh...",male,65.0,0,1,113509,61.9792,B30,C
457,0,1,"Millet, Mr. Franc...",male,65.0,0,0,13509,26.55,E38,S
439,0,1,"Fortune, Mr. Mark",male,64.0,1,4,19950,263.0,C23 C25 C27,S
546,0,1,"Nicholson, Mr. Ar...",male,64.0,0,0,693,26.0,,S
276,1,1,"Andrews, Miss. Ko...",female,63.0,1,0,13502,77.9583,D7,S
830,1,1,"Stone, Mrs. Georg...",female,62.0,0,0,113572,80.0,B28,
