#### Install Spark on Collab

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

#### Set Environment Variables

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

##### Get Data

Will be using the titanic dataset [here](https://www.kaggle.com/competitions/titanic/data?select=train.csv)

In [4]:
# load the dataframe

df = spark.read.csv("train.csv", header=True)
df

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen ...",male,22.0,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. Joh...",female,38.0,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. ...",female,26.0,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Ja...",female,35.0,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. Willia...",male,35.0,0,0,373450,8.05,,S
6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
7,0,1,"McCarthy, Mr. Tim...",male,54.0,0,0,17463,51.8625,E46,S
8,0,3,"Palsson, Master. ...",male,2.0,3,1,349909,21.075,,S
9,1,3,"Johnson, Mrs. Osc...",female,27.0,0,2,347742,11.1333,,S
10,1,2,"Nasser, Mrs. Nich...",female,14.0,1,0,237736,30.0708,,C


Note: Spark will show only the above 20 results unless we ask for more through the table's schema

In [5]:
# getting the schema
df.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



From the above we can see that our schema data is read as strings, so we need to inferschema to correct that

In [6]:
#infering Schema
df = spark.read.csv("train.csv", header=True, inferSchema=True)
df

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen ...",male,22.0,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. Joh...",female,38.0,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. ...",female,26.0,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Ja...",female,35.0,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. Willia...",male,35.0,0,0,373450,8.05,,S
6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
7,0,1,"McCarthy, Mr. Tim...",male,54.0,0,0,17463,51.8625,E46,S
8,0,3,"Palsson, Master. ...",male,2.0,3,1,349909,21.075,,S
9,1,3,"Johnson, Mrs. Osc...",female,27.0,0,2,347742,11.1333,,S
10,1,2,"Nasser, Mrs. Nich...",female,14.0,1,0,237736,30.0708,,C


In [7]:
#checking our scema again
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [11]:
df.head(5)

[Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S'),
 Row(PassengerId=2, Survived=1, Pclass=1, Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age=38.0, SibSp=1, Parch=0, Ticket='PC 17599', Fare=71.2833, Cabin='C85', Embarked='C'),
 Row(PassengerId=3, Survived=1, Pclass=3, Name='Heikkinen, Miss. Laina', Sex='female', Age=26.0, SibSp=0, Parch=0, Ticket='STON/O2. 3101282', Fare=7.925, Cabin=None, Embarked='S'),
 Row(PassengerId=4, Survived=1, Pclass=1, Name='Futrelle, Mrs. Jacques Heath (Lily May Peel)', Sex='female', Age=35.0, SibSp=1, Parch=0, Ticket='113803', Fare=53.1, Cabin='C123', Embarked='S'),
 Row(PassengerId=5, Survived=0, Pclass=3, Name='Allen, Mr. William Henry', Sex='male', Age=35.0, SibSp=0, Parch=0, Ticket='373450', Fare=8.05, Cabin=None, Embarked='S')]

In [12]:
df.limit(5)

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen ...",male,22.0,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. Joh...",female,38.0,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. ...",female,26.0,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Ja...",female,35.0,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. Willia...",male,35.0,0,0,373450,8.05,,S


In [14]:
# sql select statement

df.select('Pclass').limit(3)

Pclass
3
1
3


In [21]:
#where condition
from pyspark.sql.functions import col

df.where(col('Pclass') < 3)


PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
2,1,1,"Cumings, Mrs. Joh...",female,38.0,1,0,PC 17599,71.2833,C85,C
4,1,1,"Futrelle, Mrs. Ja...",female,35.0,1,0,113803,53.1,C123,S
7,0,1,"McCarthy, Mr. Tim...",male,54.0,0,0,17463,51.8625,E46,S
10,1,2,"Nasser, Mrs. Nich...",female,14.0,1,0,237736,30.0708,,C
12,1,1,"Bonnell, Miss. El...",female,58.0,0,0,113783,26.55,C103,S
16,1,2,"Hewlett, Mrs. (Ma...",female,55.0,0,0,248706,16.0,,S
18,1,2,"Williams, Mr. Cha...",male,,0,0,244373,13.0,,S
21,0,2,"Fynney, Mr. Joseph J",male,35.0,0,0,239865,26.0,,S
22,1,2,"Beesley, Mr. Lawr...",male,34.0,0,0,248698,13.0,D56,S
24,1,1,"Sloper, Mr. Willi...",male,28.0,0,0,113788,35.5,A6,S


In [23]:
df.select('PassengerId', 'Survived').limit(5)

PassengerId,Survived
1,0
2,1
3,1
4,1
5,0


In [25]:
df.where((df.Age > 20) & (df.Survived == 1)).limit(5)

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
2,1,1,"Cumings, Mrs. Joh...",female,38.0,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. ...",female,26.0,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Ja...",female,35.0,1,0,113803,53.1,C123,S
9,1,3,"Johnson, Mrs. Osc...",female,27.0,0,2,347742,11.1333,,S
12,1,1,"Bonnell, Miss. El...",female,58.0,0,0,113783,26.55,C103,S


In [26]:
df.agg({'Age':'avg'})

avg(Age)
29.69911764705882


In [27]:
df.agg({'Fare':'avg'})

avg(Fare)
32.2042079685746


In [28]:
df.groupBy('Pclass').agg({'Fare':'avg'}).orderBy('Pclass', ascending=True)

Pclass,avg(Fare)
1,84.15468749999992
2,20.66218315217391
3,13.675550101832997


In [36]:
from pyspark.sql.functions import avg, col

result = df.filter(col("age") > 28).agg(avg(col("age")).alias("average_age"))

# Show the result
result.show()

+-----------------+
|      average_age|
+-----------------+
|41.13636363636363|
+-----------------+



In [37]:
df.filter(df.Age > 25).agg({'Fare':'avg'})

avg(Fare)
37.61960169491524


In [38]:
# rounding down the Fare column using udf

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def round_float_down(x):
  return int(x)

round_float_down_udf = udf(round_float_down, IntegerType())

df.select('PassengerId', 'Fare', round_float_down_udf('Fare').alias('Fare Rounded Down')).limit(5)

PassengerId,Fare,Fare Rounded Down
1,7.25,7
2,71.2833,71
3,7.925,7
4,53.1,53
5,8.05,8


Directly using SQl 

In [39]:
# First create a temp view of our dataframe and name it

df.createOrReplaceTempView("Titanic")
    

In [40]:
#then use the normal sql syntax
spark.sql('select * from Titanic')

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen ...",male,22.0,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. Joh...",female,38.0,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. ...",female,26.0,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Ja...",female,35.0,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. Willia...",male,35.0,0,0,373450,8.05,,S
6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
7,0,1,"McCarthy, Mr. Tim...",male,54.0,0,0,17463,51.8625,E46,S
8,0,3,"Palsson, Master. ...",male,2.0,3,1,349909,21.075,,S
9,1,3,"Johnson, Mrs. Osc...",female,27.0,0,2,347742,11.1333,,S
10,1,2,"Nasser, Mrs. Nich...",female,14.0,1,0,237736,30.0708,,C
