# Setting up PySpark in Colab

Install Java (OpenJDK 8, headless)

In [33]:
!sudo apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null


Install Apache Spark 3.0.1 with Hadoop 2.7

In [2]:
!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz

In [3]:
!tar xf spark-3.0.1-bin-hadoop2.7.tgz

Install the findspark library, which locates Spark on the system and makes pyspark importable like a regular library

In [4]:
!pip install -q findspark

Set the JAVA_HOME and SPARK_HOME environment variables so that PySpark can run in the Colab environment

In [34]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"
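
Before pointing the environment at these directories, it can help to confirm that they exist, since a failed install otherwise only shows up later as a less obvious error. The following is a minimal sketch using only the standard library; the helper name `missing_dirs` is hypothetical and not part of findspark or Spark.

```python
import os

def missing_dirs(env_paths):
    """Return the names from env_paths whose directory does not exist."""
    return [name for name, path in env_paths.items() if not os.path.isdir(path)]

# Paths taken from the cell above; adjust them if the installed versions differ.
expected = {
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.1-bin-hadoop2.7",
}

missing = missing_dirs(expected)
if missing:
    print("Missing directories for:", ", ".join(missing))
else:
    os.environ.update(expected)
    print("JAVA_HOME and SPARK_HOME are set")
```

If a name is reported as missing, rerunning the install or extraction cell for that component is the likely fix.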

Import findspark and call findspark.init()

In [35]:
import findspark
findspark.init()

If you want to know the location where Spark is installed, use findspark.find()

In [36]:
findspark.find()

'/content/spark-3.0.1-bin-hadoop2.7'

Import SparkSession from pyspark.sql and create a SparkSession, which is the entry point to Spark

In [37]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

Print the SparkSession variable.

In [38]:
spark

Load the dataset using the read.csv method

In [39]:
df = spark.read.csv("/content/data.csv", header=True, inferSchema= True)
df.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)



# Transformations

## Conditional If Statement

Add a column that flags whether each row's payment method is a check (1 = check, 0 = no check)

In [44]:
from pyspark.sql import functions as F

df = spark.read.csv("/content/data.csv", header=True, inferSchema=True)

# Flag rows whose payment method mentions a check (case-insensitive)
df = df.withColumn(
    'has check',
    F.when(F.lower(F.col('PaymentMethod')).contains('check'), F.lit(1))
     .otherwise(F.lit(0)),
)
df.select('has check', 'PaymentMethod').show(50)

+---------+--------------------+
|has check|       PaymentMethod|
+---------+--------------------+
|        1|    Electronic check|
|        1|        Mailed check|
|        1|        Mailed check|
|        0|Bank transfer (au...|
|        1|    Electronic check|
|        1|    Electronic check|
|        0|Credit card (auto...|
|        1|        Mailed check|
|        1|    Electronic check|
|        0|Bank transfer (au...|
|        1|        Mailed check|
|        0|Credit card (auto...|
|        0|Credit card (auto...|
|        0|Bank transfer (au...|
|        1|    Electronic check|
|        0|Credit card (auto...|
|        1|        Mailed check|
|        0|Bank transfer (au...|
|        0|Credit card (auto...|
|        1|    Electronic check|
|        1|    Electronic check|
|        0|Bank transfer (au...|
|        1|        Mailed check|
|        0|Credit card (auto...|
|        0|Credit card (auto...|
|        0|Bank transfer (au...|
|        1|    Electronic check|
|        1
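
The `when(...).otherwise(...)` expression applies the same rule to every row. As a rough illustration of that row-level logic outside Spark, the plain-Python sketch below mirrors it for a single value. `has_check` is a hypothetical helper, and the last three sample values are made up for illustration rather than taken from the dataset.

```python
def has_check(payment_method):
    """Mirror F.when(F.lower(col).contains('check'), 1).otherwise(0) for one value."""
    # A null input makes the when() condition null (not true), so otherwise() yields 0.
    if payment_method is None:
        return 0
    return 1 if "check" in payment_method.lower() else 0

# The first two values appear in the output above; the rest are hypothetical.
samples = ["Electronic check", "Mailed check", "MAILED CHECK", "Cash", None]
for value in samples:
    print(value, has_check(value))
```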

## Sorting

Sort the data based on the MonthlyCharges column, followed by the TotalCharges column

In [12]:
from pyspark.sql import functions as F
df = spark.read.csv("/content/data.csv", header=True, inferSchema= True)
df.sort(F.col("MonthlyCharges"), F.col("TotalCharges")).show(50, truncate=False)

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+-------------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity     |OnlineBackup       |DeviceProtection   |TechSupport        |StreamingTV        |StreamingMovies    |Contract      |PaperlessBilling|PaymentMethod            |MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+-------------------------+--------------+------------+-----+
|6823-SIDFQ|Male  |0            |No     |No        |28 
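
One caveat: the schema printed earlier lists `TotalCharges` as a string, so when it is used as the tie-breaker it is compared as text rather than as a number. The plain-Python sketch below shows the difference with made-up rows (not taken from the dataset). In Spark, sorting on `F.col("TotalCharges").cast("double")` is one way to get numeric order, assuming the values parse as numbers.

```python
# Hypothetical rows: (MonthlyCharges, TotalCharges stored as a string)
rows = [(20.0, "100.5"), (20.0, "20.0"), (19.5, "300.0")]

# Text comparison on the second key: "100.5" sorts before "20.0".
as_text = sorted(rows, key=lambda r: (r[0], r[1]))

# Numeric comparison on the second key: 20.0 sorts before 100.5.
as_number = sorted(rows, key=lambda r: (r[0], float(r[1])))

print(as_text)
print(as_number)
```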

## Filtering

Keep only the rows that have both phone service and internet service

In [29]:
df = spark.read.csv("/content/data.csv", header=True, inferSchema= True)

df = df.sort(F.col("MonthlyCharges"))
df = df.filter(df.PhoneService != 'No')
df = df.filter(df.InternetService != 'No')

df.show(40, truncate=False)

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+-------------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract      |PaperlessBilling|PaymentMethod            |MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+-------------------------+--------------+------------+-----+
|6959-UWKHF|Male  |0            |No     |No        |1     |Yes         |No           |DSL            |No            |No          |No              |No         |N
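
Chaining two `filter` calls keeps only the rows that pass both conditions, which is equivalent to combining them with a logical AND. The plain-Python sketch below illustrates this on made-up rows; the row contents and the `keep` helper are hypothetical.

```python
# Hypothetical rows; only the two columns used by the filter are modelled.
rows = [
    {"customerID": "A", "PhoneService": "Yes", "InternetService": "DSL"},
    {"customerID": "B", "PhoneService": "No", "InternetService": "DSL"},
    {"customerID": "C", "PhoneService": "Yes", "InternetService": "No"},
]

def keep(row):
    """True when the row has both phone service and internet service."""
    return row["PhoneService"] != "No" and row["InternetService"] != "No"

kept = [row["customerID"] for row in rows if keep(row)]
print(kept)
```

In PySpark the same combination can be written in a single call with the `&` operator, for example `df.filter((df.PhoneService != 'No') & (df.InternetService != 'No'))`, with each condition wrapped in parentheses.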

# Actions

## Collect

Collect the customerID column, returning its values to the driver as a list of Row objects

In [57]:
df = spark.read.csv("/content/data.csv", header=True, inferSchema= True)

df.select('customerID').collect()

[Row(customerID='7590-VHVEG'),
 Row(customerID='5575-GNVDE'),
 Row(customerID='3668-QPYBK'),
 Row(customerID='7795-CFOCW'),
 Row(customerID='9237-HQITU'),
 Row(customerID='9305-CDSKC'),
 Row(customerID='1452-KIOVK'),
 Row(customerID='6713-OKOMC'),
 Row(customerID='7892-POOKP'),
 Row(customerID='6388-TABGU'),
 Row(customerID='9763-GRSKD'),
 Row(customerID='7469-LKBCI'),
 Row(customerID='8091-TTVAX'),
 Row(customerID='0280-XJGEX'),
 Row(customerID='5129-JLPIS'),
 Row(customerID='3655-SNQYZ'),
 Row(customerID='8191-XWSZG'),
 Row(customerID='9959-WOFKT'),
 Row(customerID='4190-MFLUW'),
 Row(customerID='4183-MYFRB'),
 Row(customerID='8779-QRDMV'),
 Row(customerID='1680-VDCWW'),
 Row(customerID='1066-JKSGK'),
 Row(customerID='3638-WEABW'),
 Row(customerID='6322-HRPFA'),
 Row(customerID='6865-JZNKO'),
 Row(customerID='6467-CHFZW'),
 Row(customerID='8665-UTDHZ'),
 Row(customerID='5248-YGIJN'),
 Row(customerID='8773-HHUOZ'),
 Row(customerID='3841-NFECX'),
 Row(customerID='4929-XIHVW'),
 Row(cus
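
Because `collect()` brings every row back to the driver as a `Row` object, a common follow-up is to read the field off each row to get plain Python values. The sketch below uses a `namedtuple` as a stand-in for `pyspark.sql.Row` (which also supports attribute access) so that it runs without a Spark session.

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row; an assumption made so the sketch runs without Spark.
Row = namedtuple("Row", ["customerID"])

# First three IDs copied from the output above.
collected = [Row("7590-VHVEG"), Row("5575-GNVDE"), Row("3668-QPYBK")]

# Extract the plain string from each row.
ids = [row.customerID for row in collected]
print(ids)
```

On a large dataset, `collect()` can exhaust driver memory, so it is usually reserved for small results.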

## Collect List & Collect Set


Use collect_list to return all values from a column **with** duplicates

In [68]:
df = spark.read.csv("/content/data.csv", header=True, inferSchema= True)

from pyspark.sql import functions as F

df.select(F.collect_list("TotalCharges")).show(truncate=False)


Use collect_set to return all values from a column **without** duplicates

In [69]:
df = spark.read.csv("/content/data.csv", header=True, inferSchema= True)

from pyspark.sql import functions as F

df.select(F.collect_set("TotalCharges")).show(truncate=False)

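
The difference between the two functions is roughly the difference between a Python list and a Python set. The sketch below uses made-up values to show this; as with a set, the order of elements returned by `collect_set` should not be relied on.

```python
# Hypothetical values standing in for a column that contains repeats.
values = ["29.85", "56.95", "29.85", "42.3"]

with_duplicates = list(values)    # analogous to collect_list
without_duplicates = set(values)  # analogous to collect_set; order not guaranteed

print(len(with_duplicates), len(without_duplicates))
```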

## First & Last

Use the first function to return the first value of a column

In [70]:
df = spark.read.csv("/content/data.csv", header=True, inferSchema= True)

from pyspark.sql import functions as F

df.select(F.first("MonthlyCharges")).show(truncate=False)

+---------------------+
|first(MonthlyCharges)|
+---------------------+
|29.85                |
+---------------------+



Use the last function to return the last value of a column

In [71]:
df = spark.read.csv("/content/data.csv", header=True, inferSchema= True)

from pyspark.sql import functions as F

df.select(F.last("MonthlyCharges")).show(truncate=False)

+--------------------+
|last(MonthlyCharges)|
+--------------------+
|105.65              |
+--------------------+
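
Both `F.first` and `F.last` accept an `ignorenulls` argument (default `False`) that controls whether null values are skipped. The plain-Python sketch below illustrates that behaviour; the helpers `first_value` and `last_value` are hypothetical, and the sample list is made up apart from reusing the two values shown above.

```python
def first_value(values, ignorenulls=False):
    """Return the first element, skipping None entries when ignorenulls is True."""
    for value in values:
        if value is not None or not ignorenulls:
            return value
    return None  # empty input, or only None values with ignorenulls=True

def last_value(values, ignorenulls=False):
    """Return the last element, using the same rule as first_value."""
    return first_value(list(reversed(values)), ignorenulls)

# Hypothetical column with nulls at both ends.
column = [None, 29.85, 105.65, None]

print(first_value(column), first_value(column, ignorenulls=True))
print(last_value(column), last_value(column, ignorenulls=True))
```

Without an explicit ordering, which row Spark treats as first or last depends on how the data is partitioned and read, so sorting beforehand is advisable when the result must be reproducible.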

