Setting up PySpark in Colab

Spark is written in the Scala programming language and requires the Java Virtual Machine (JVM) to run. Therefore, our first task is to install Java. Here we use the headless OpenJDK 8 package, which provides the JVM without the GUI libraries.

In [1]:
!sudo apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

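To confirm that Java is available before going further, a small check like the one below can be run in a cell. It is a sketch that assumes only that a `java` executable ends up on the `PATH`; `java_version` is a hypothetical helper name, not part of any library.

```python
import shutil
import subprocess


def java_version():
    """Return the first line of `java -version`, or None if Java is not on the PATH."""
    java = shutil.which("java")
    if java is None:
        return None
    # `java -version` prints its banner to stderr rather than stdout
    result = subprocess.run([java, "-version"], capture_output=True, text=True)
    lines = (result.stderr or result.stdout).splitlines()
    return lines[0] if lines else None


print(java_version())
```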

Next, we will download Apache Spark 3.0.1 pre-built for Hadoop 2.7.

In [2]:
# Older releases such as 3.0.1 are kept on archive.apache.org
!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
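The release name appears in three places: the download URL, the `tar` command and `SPARK_HOME`. A minimal sketch that derives all of them from one pair of version strings keeps them in sync; `spark_package` is a hypothetical helper, and it assumes the archive follows the `dist/spark/spark-<version>/` layout used above.

```python
SPARK_VERSION = "3.0.1"
HADOOP_VERSION = "2.7"


def spark_package(spark_version, hadoop_version):
    """Return (folder name, download URL) for a pre-built Spark release."""
    name = f"spark-{spark_version}-bin-hadoop{hadoop_version}"
    url = f"https://archive.apache.org/dist/spark/spark-{spark_version}/{name}.tgz"
    return name, url


name, url = spark_package(SPARK_VERSION, HADOOP_VERSION)
print(url)             # https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
print("/content/" + name)  # /content/spark-3.0.1-bin-hadoop2.7
```

In a notebook, the same values can then be interpolated into the shell commands and into `os.environ["SPARK_HOME"]`, so upgrading Spark means editing a single line.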

Now we just need to extract the archive.

In [3]:
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
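The same extraction can be done from Python with the standard `tarfile` module, which is handy outside a notebook shell. This is a sketch; `extract_tgz` is a hypothetical helper that also reports the top-level folder, which should match the folder later used as `SPARK_HOME`.

```python
import tarfile


def extract_tgz(archive_path, dest_dir="."):
    """Extract a .tgz archive into dest_dir and return its top-level entry names."""
    with tarfile.open(archive_path, "r:gz") as tar:
        top_level = sorted({m.name.split("/")[0] for m in tar.getmembers()})
        tar.extractall(dest_dir)
    return top_level
```

For example, `extract_tgz("spark-3.0.1-bin-hadoop2.7.tgz", "/content")` should return a single folder name.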

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


There is one last thing we need to install: the findspark library. It locates the Spark installation and adds PySpark to `sys.path`, so that `pyspark` can be imported like a regular library.

In [5]:
!pip install -q findspark

Now that all the necessary dependencies are installed in Colab, it is time to set the environment variables `JAVA_HOME` and `SPARK_HOME`. They tell PySpark where to find Java and Spark in the Colab environment.

In [6]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"
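A typo in either path usually surfaces later as a confusing error from findspark or from Spark startup. A quick sketch that checks both variables point at existing directories can catch it early; `check_env` is a hypothetical helper name.

```python
import os


def check_env(*names):
    """Map each environment variable name to True if it points at an existing directory."""
    return {name: os.path.isdir(os.environ.get(name, "")) for name in names}


print(check_env("JAVA_HOME", "SPARK_HOME"))
```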

Next, we need to locate Spark on the system. For that, we import findspark and call `findspark.init()`, which uses `SPARK_HOME` to find the installation.

In [7]:
import findspark
findspark.init()
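Conceptually, `findspark.init()` puts two things at the front of `sys.path`: Spark's bundled `python` folder and the py4j zip shipped in `python/lib`. The sketch below is a simplified illustration of that idea, not findspark's actual source; `add_spark_to_path` is a hypothetical name.

```python
import glob
import os
import sys


def add_spark_to_path(spark_home=None):
    """Prepend Spark's Python sources and its bundled py4j zip to sys.path."""
    spark_home = spark_home or os.environ["SPARK_HOME"]
    spark_python = os.path.join(spark_home, "python")
    # Spark ships py4j as python/lib/py4j-<version>-src.zip
    py4j_zips = sorted(glob.glob(os.path.join(spark_python, "lib", "py4j-*.zip")))
    new_entries = [spark_python] + py4j_zips[:1]
    sys.path[:0] = [p for p in new_entries if p not in sys.path]
    return new_entries
```

After this, `import pyspark` resolves to the copy inside `SPARK_HOME`, which keeps the Python package and the JVM side at the same version.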

To check where Spark is installed, use `findspark.find()`.

In [8]:
findspark.find()

'/content/spark-3.0.1-bin-hadoop2.7'

Now, we can import SparkSession from pyspark.sql and create a SparkSession, which is the entry point to Spark.

You can give a name to the session using appName() and add some configurations with config() if you wish.

In [9]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

Finally, print the SparkSession variable.

In [10]:
spark

In [23]:
# Load data.csv, using the first row as the header and inferring column types
df = spark.read.csv("/content/data.csv", header=True, inferSchema=True)
df.show()

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|            0|    Yes|        No|     1|  
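With `inferSchema=True`, Spark makes one extra pass over the data to pick a type for each column. A column becomes numeric only if every non-null value parses as a number; otherwise it stays a string, which is why `TotalCharges` comes back as a string in this dataset. The sketch below is a heavily simplified model of that rule (Spark also detects booleans, timestamps and more); `infer_type` is hypothetical and treats empty strings as nulls.

```python
def infer_type(values):
    """Return "int", "double" or "string" for a list of raw CSV strings.

    Empty strings are treated as nulls and ignored, as Spark's CSV reader
    does by default.
    """
    non_null = [v for v in values if v != ""]
    if not non_null:
        return "string"
    for type_name, parse in (("int", int), ("double", float)):
        try:
            for v in non_null:
                parse(v)
            return type_name
        except ValueError:
            continue  # some value failed to parse: try the next, wider type
    return "string"


print(infer_type(["1", "11", "4", "66"]))       # int
print(infer_type(["29.6", "74.4"]))             # double
print(infer_type(["346.45", "306.6", " "]))     # string (a blank cell blocks numeric parsing)
```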

In [18]:
# show() only prints and returns None, so keep the DataFrame and call show() separately
df1 = df.select("customerID", "gender")
df1.show()


+----------+------+
|customerID|gender|
+----------+------+
|7590-VHVEG|Female|
|5575-GNVDE|  Male|
|3668-QPYBK|  Male|
|7795-CFOCW|  Male|
|9237-HQITU|Female|
|9305-CDSKC|Female|
|1452-KIOVK|  Male|
|6713-OKOMC|Female|
|7892-POOKP|Female|
|6388-TABGU|  Male|
|9763-GRSKD|  Male|
|7469-LKBCI|  Male|
|8091-TTVAX|  Male|
|0280-XJGEX|  Male|
|5129-JLPIS|  Male|
|3655-SNQYZ|Female|
|8191-XWSZG|Female|
|9959-WOFKT|  Male|
|4190-MFLUW|Female|
|4183-MYFRB|Female|
+----------+------+
only showing top 20 rows



# **TRANSFORMATIONS**





**Transformation1**


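A lambda function is a small anonymous function. It can take any number of arguments but can contain only one expression. Lambdas are most useful as throwaway functions passed to another function. Here, a lambda is wrapped in a user-defined function (UDF) that converts the Yes/No strings in Churn, Partner and Dependents into 1.0/0.0, and the customerID column is dropped.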

Note that `show()` truncates string values longer than 20 characters by default; pass `truncate=False` to see them in full.
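A Spark UDF calls its Python function once per value. A null input arrives as `None`, and a `None` return becomes null in the result. With `binary_map[k]`, an unexpected value or a null would raise `KeyError` and fail the job, whereas `dict.get` returns `None`. The mapping can be tried in plain Python first:

```python
binary_map = {'Yes': 1.0, 'No': 0.0, 'True': 1.0, 'False': 0.0}

# Same lambda shape as in the UDF, but dict.get returns None for unknown keys
to_num = lambda k: binary_map.get(k)

print([to_num(v) for v in ['Yes', 'No', 'Maybe', None]])
# [1.0, 0.0, None, None]
```

For simple mappings like this, built-in column expressions such as `pyspark.sql.functions.when` avoid the round trip to a Python worker altogether.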



In [48]:

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf

# Map Yes/No (and True/False) strings to 1.0/0.0; any other value or null becomes null
binary_map = {'Yes': 1.0, 'No': 0.0, 'True': 1.0, 'False': 0.0}
toNum = udf(lambda k: binary_map.get(k), DoubleType())

# Store the result in a new variable so that later cells still see the original df
df_num = (df.drop('customerID')
            .withColumn('Churn', toNum(df['Churn']))
            .withColumn('Partner', toNum(df['Partner']))
            .withColumn('Dependents', toNum(df['Dependents']))
            .cache())

df_num.show()

+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|Female|            0|    1.0|       0.0|     1|          No|No phone service|            DSL|

**Transformation2**


A groupBy operation splits the rows into groups by the values of one or more columns, applies an aggregate function to each group, and combines the results into a new DataFrame. Here, `count()` returns the number of customers for each PaymentMethod.
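The split-apply-combine steps behind `groupBy(...).count()` can be sketched in plain Python. Spark does the same thing in a distributed way (roughly, partial counts per partition merged after a shuffle). `group_count` and the rows are hypothetical, not taken from data.csv.

```python
from collections import defaultdict


def group_count(rows, key):
    """Split rows by the value of `key`, apply len() to each group, combine into a dict."""
    groups = defaultdict(list)
    for row in rows:                      # split
        groups[row[key]].append(row)
    return {value: len(members) for value, members in groups.items()}  # apply + combine


rows = [
    {"PaymentMethod": "Electronic check"},
    {"PaymentMethod": "Mailed check"},
    {"PaymentMethod": "Electronic check"},
]
print(group_count(rows, "PaymentMethod"))
# {'Electronic check': 2, 'Mailed check': 1}
```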

In [31]:

# Count the customers in each PaymentMethod group
transformation2 = df.groupBy("PaymentMethod").count()
print("Transformation 2: Grouping data based on PaymentMethod")
transformation2.show()



Transformation 2: Grouping data based on PaymentMethod
+--------------------+-----+
|       PaymentMethod|count|
+--------------------+-----+
|Credit card (auto...| 1522|
|        Mailed check| 1612|
|Bank transfer (au...| 1544|
|    Electronic check| 2365|
+--------------------+-----+



**Transformation3**


Filtering the data for MonthlyCharges less than 50 USD.
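`filter` is a transformation, so Spark only records it; no rows are examined until an action such as `show()` runs. Python's built-in `filter` is lazy in a similar way, which makes for a small analogy (Spark itself builds and optimizes a query plan rather than a Python iterator). The MonthlyCharges values below are hypothetical.

```python
calls = []


def under_50(row):
    calls.append(row)                 # record every time the predicate actually runs
    return row["MonthlyCharges"] < 50


rows = [{"MonthlyCharges": 29.6}, {"MonthlyCharges": 74.4}, {"MonthlyCharges": 45.0}]

filtered = filter(under_50, rows)     # like df.filter(...): only describes the work
print(len(calls))                     # 0, nothing has been evaluated yet

result = list(filtered)               # like .show() or .count(): forces evaluation
print(len(calls), result)             # 3 [{'MonthlyCharges': 29.6}, {'MonthlyCharges': 45.0}]
```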

In [32]:
Transformation3 = df.filter(df['MonthlyCharges'] < 50)
print("Transformation 3: Filtering data based on MonthlyCharges which is less than 50 USD")
Transformation3.show(20)

Transformation 3: Filtering data based on MonthlyCharges which is less than 50 USD
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+-------------

**Transformation4**


Combining two transformations: filtering followed by grouping.

In [35]:
# Customers paying more than 100 USD per month (defined once, reused below)
high_charges = df.filter(df['MonthlyCharges'] > 100)

# Number of these customers per InternetService
Transformation4 = high_charges.groupBy('InternetService').count()
print("Transformation4: Filtering data based on MonthlyCharges which is greater than 100 USD")
Transformation4.show()

# Number of these customers per Contract type
Transformation4 = high_charges.groupBy('Contract').count()
print("Transformation4: Filtering data based on MonthlyCharges which is greater than 100 USD")
Transformation4.show()

Transformation4: Filtering data based on MonthlyCharges which is greater than 100 USD
+---------------+-----+
|InternetService|count|
+---------------+-----+
|    Fiber optic|  902|
+---------------+-----+

Transformation4: Filtering data based on MonthlyCharges which is greater than 100 USD
+--------------+-----+
|      Contract|count|
+--------------+-----+
|Month-to-month|  324|
|      One year|  273|
|      Two year|  305|
+--------------+-----+
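Both groupings partition the same filtered rows, so their counts must add up to the same total. The two outputs above agree, and they also show that every customer paying more than 100 USD in this dataset uses fiber optic. A quick arithmetic check with the counts copied from the outputs:

```python
# Group counts copied from the two outputs above (MonthlyCharges > 100)
by_internet = {"Fiber optic": 902}
by_contract = {"Month-to-month": 324, "One year": 273, "Two year": 305}

# Each grouping partitions the same filtered rows, so the totals must match
total = sum(by_contract.values())
assert total == sum(by_internet.values())
print(total)  # 902
```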



# ACTIONS

**ACTION1**

Retrieving the first five values of InternetService with `take(5)`, which returns them to the driver as a list of Row objects.



In [63]:
df.select('InternetService').take(5)


[Row(InternetService='DSL'),
 Row(InternetService='DSL'),
 Row(InternetService='DSL'),
 Row(InternetService='DSL'),
 Row(InternetService='Fiber optic')]
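`take(n)` only needs the first n rows, so Spark can stop scanning early instead of reading the whole dataset. A plain-Python sketch of the same contract with `itertools.islice`; `take` here is a hypothetical helper, and the sixth value in the example list is made up.

```python
from itertools import count, islice


def take(rows, n):
    """Return the first n rows as a list, consuming nothing beyond them."""
    return list(islice(rows, n))


print(take(["DSL", "DSL", "DSL", "DSL", "Fiber optic", "DSL"], 5))
# ['DSL', 'DSL', 'DSL', 'DSL', 'Fiber optic']
print(take(count(), 3))  # [0, 1, 2], works even on an endless stream
```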

**ACTION2**

Retrieving the first row of InternetService with `first()`, which returns a single Row.


In [64]:
df.select('InternetService').first()


Row(InternetService='DSL')
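In PySpark, `first()` behaves like `head()`: it returns the first Row, or `None` when the DataFrame is empty rather than raising an error. The same contract in plain Python, as a hypothetical helper:

```python
def first(rows):
    """Return the first row, or None when there are no rows."""
    return next(iter(rows), None)


print(first(["DSL", "Fiber optic"]))  # DSL
print(first([]))                      # None
```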

**ACTION3**

Getting the last three rows with `tail(3)`, which is available since Spark 3.0.

In [36]:
action3 = df.tail(3)
print("action3: Getting last 3 data points from data set")
action3

action3: Getting last 3 data points from data set


[Row(customerID='4801-JZAZL', gender='Female', SeniorCitizen=0, Partner='Yes', Dependents='Yes', tenure=11, PhoneService='No', MultipleLines='No phone service', InternetService='DSL', OnlineSecurity='Yes', OnlineBackup='No', DeviceProtection='No', TechSupport='No', StreamingTV='No', StreamingMovies='No', Contract='Month-to-month', PaperlessBilling='Yes', PaymentMethod='Electronic check', MonthlyCharges=29.6, TotalCharges='346.45', Churn='No'),
 Row(customerID='8361-LTMKD', gender='Male', SeniorCitizen=1, Partner='Yes', Dependents='No', tenure=4, PhoneService='Yes', MultipleLines='Yes', InternetService='Fiber optic', OnlineSecurity='No', OnlineBackup='No', DeviceProtection='No', TechSupport='No', StreamingTV='No', StreamingMovies='No', Contract='Month-to-month', PaperlessBilling='Yes', PaymentMethod='Mailed check', MonthlyCharges=74.4, TotalCharges='306.6', Churn='Yes'),
 Row(customerID='3186-AJIEK', gender='Male', SeniorCitizen=0, Partner='No', Dependents='No', tenure=66, PhoneService=
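Like `collect()`, `tail(n)` moves rows into the driver process, and the PySpark documentation warns that a very large n can crash the driver with an OutOfMemoryError. A plain-Python sketch of a tail that streams through the data while holding at most n rows, using `collections.deque`; `tail` is a hypothetical helper.

```python
from collections import deque


def tail(rows, n):
    """Return the last n rows while holding at most n rows in memory."""
    return list(deque(rows, maxlen=n))


print(tail(range(10), 3))  # [7, 8, 9]
```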

**ACTION4**

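Printing summary statistics with `summary()`, which by default computes count, mean, stddev, min, approximate 25%, 50% and 75% percentiles, and max for each column.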

In [37]:
action4 = df.summary()
print("action4: Getting all the summary of data set")
action4.show()

action4: Getting all the summary of data set
+-------+----------+------+------------------+-------+----------+------------------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+------------------+------------------+-----+
|summary|customerID|gender|     SeniorCitizen|Partner|Dependents|            tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|    MonthlyCharges|      TotalCharges|Churn|
+-------+----------+------+------------------+-------+----------+------------------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+------------------+------------------+-----+
|  count|      7043|  704
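The non-percentile rows of `summary()` for a numeric column can be reproduced in plain Python; Spark's `stddev` is the sample standard deviation, which matches `statistics.stdev`. This sketch uses the two MonthlyCharges values visible in the `tail(3)` output; `numeric_summary` is a hypothetical helper.

```python
import statistics


def numeric_summary(values):
    """count, mean, sample stddev, min and max of a list of numbers."""
    return {
        "count": len(values),
        "mean": statistics.mean(values),
        "stddev": statistics.stdev(values),  # sample standard deviation (n - 1 denominator)
        "min": min(values),
        "max": max(values),
    }


print(numeric_summary([29.6, 74.4]))
```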