Setting up PySpark in Colab

Spark is written in the Scala programming language and runs on the Java Virtual Machine (JVM), so our first task is to install Java.

In [None]:
!sudo apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null


Next, we download Apache Spark 3.0.1 prebuilt for Hadoop 2.7.

In [None]:
!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz

Now we just need to extract the archive.

In [None]:
!tar xf spark-3.0.1-bin-hadoop2.7.tgz

One last thing to install is the findspark library. It locates the Spark installation and adds PySpark to Python's module search path, so that pyspark can be imported like a regular library.

In [None]:
!pip install -q findspark

Now that all the dependencies are installed in Colab, it is time to set the environment variables that point to Java and Spark. This will let us run PySpark in the Colab environment.

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

Next, we need to locate Spark on the system. For that, we import findspark and call findspark.init(), which uses SPARK_HOME to find the installation.

In [None]:
import findspark
findspark.init()
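Conceptually, findspark.init() just makes the pyspark package that ships inside SPARK_HOME importable. The sketch below is a simplified illustration of that idea, not findspark's actual source: the helper `add_spark_to_path` is hypothetical, and it assumes the standard Spark layout with a `python/` directory and a `python/lib/py4j-*-src.zip` archive.

```python
import glob
import os
import sys


def add_spark_to_path(spark_home=None):
    """Hypothetical helper: a simplified version of what findspark.init() does."""
    # Fall back to the SPARK_HOME environment variable set earlier.
    spark_home = spark_home or os.environ["SPARK_HOME"]
    spark_python = os.path.join(spark_home, "python")
    # Spark bundles Py4J (the Python <-> JVM bridge) as a zip under python/lib.
    py4j_zips = sorted(glob.glob(os.path.join(spark_python, "lib", "py4j-*-src.zip")))
    if not py4j_zips:
        raise FileNotFoundError(f"No py4j-*-src.zip found under {spark_python}/lib")
    # Prepend both locations so `import pyspark` resolves to this Spark install.
    for path in (py4j_zips[-1], spark_python):
        if path not in sys.path:
            sys.path.insert(0, path)
    return spark_python, py4j_zips[-1]
```

Prepending rather than appending means this Spark installation wins over any other pyspark that might already be on the path.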

If you want to know where Spark is installed, use findspark.find().

In [None]:
findspark.find()

'/content/spark-3.0.1-bin-hadoop2.7'

Now, we can import SparkSession from pyspark.sql and create a SparkSession, which is the entry point to Spark.

You can give a name to the session using appName() and add some configurations with config() if you wish.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

Finally, display the SparkSession object to confirm that the session is running.

In [None]:
spark

In [None]:
# Load the dataset with spark.read.csv().
# inferSchema=True lets Spark determine each column's data type automatically, at the cost of an extra pass over the data.
# To avoid that pass, provide the schema explicitly through the schema parameter instead.

df = spark.read.csv("/content/sample_data/data.csv", header=True, inferSchema=True)
df.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)
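Why does inferSchema cost an extra pass, and why is TotalCharges a string while MonthlyCharges is a double? A rough model: a column gets the narrowest type that every one of its values parses as, so a single non-numeric entry (a likely cause here is a blank value such as " ") forces the whole column to string. The sketch below is a simplified pure-Python illustration of that rule, not Spark's inference code; `infer_column_type` and the sample values are hypothetical.

```python
def infer_column_type(values):
    """Hypothetical, simplified inference over the order integer < double < string."""
    def parses(cast, v):
        try:
            cast(v)
            return True
        except ValueError:
            return False

    non_null = [v for v in values if v != ""]  # treat empty fields as null
    if not non_null:
        return "string"
    # Every value must be inspected, which is why inference needs a pass over the data.
    if all(parses(int, v) for v in non_null):
        return "integer"
    if all(parses(float, v) for v in non_null):
        return "double"
    return "string"


print(infer_column_type(["1", "34", "2"]))           # integer
print(infer_column_type(["29.85", "56.95"]))         # double
print(infer_column_type(["29.85", " ", "1889.5"]))   # string
```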



In [None]:
# Select the customer ID and two categorical columns
df_people_sub = df.select("customerID", "gender", "InternetService")
df_people_sub.show()

+----------+------+---------------+
|customerID|gender|InternetService|
+----------+------+---------------+
|7590-VHVEG|Female|            DSL|
|5575-GNVDE|  Male|            DSL|
|3668-QPYBK|  Male|            DSL|
|7795-CFOCW|  Male|            DSL|
|9237-HQITU|Female|    Fiber optic|
|9305-CDSKC|Female|    Fiber optic|
|1452-KIOVK|  Male|    Fiber optic|
|6713-OKOMC|Female|            DSL|
|7892-POOKP|Female|    Fiber optic|
|6388-TABGU|  Male|            DSL|
|9763-GRSKD|  Male|            DSL|
|7469-LKBCI|  Male|             No|
|8091-TTVAX|  Male|    Fiber optic|
|0280-XJGEX|  Male|    Fiber optic|
|5129-JLPIS|  Male|    Fiber optic|
|3655-SNQYZ|Female|    Fiber optic|
|8191-XWSZG|Female|             No|
|9959-WOFKT|  Male|    Fiber optic|
|4190-MFLUW|Female|            DSL|
|4183-MYFRB|Female|    Fiber optic|
+----------+------+---------------+
only showing top 20 rows



In [None]:
# describe() computes summary statistics for each column: count, mean, standard deviation (stddev), min and max
df.describe().show()

+-------+----------+------+------------------+-------+----------+------------------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+------------------+------------------+-----+
|summary|customerID|gender|     SeniorCitizen|Partner|Dependents|            tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|    MonthlyCharges|      TotalCharges|Churn|
+-------+----------+------+------------------+-------+----------+------------------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+------------------+------------------+-----+
|  count|      7043|  7043|              7043|   7043|      7043|     
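For a numeric column, describe() reports the number of non-null values, the mean, the sample standard deviation (divisor n - 1), the minimum and the maximum. A minimal pure-Python sketch of those five statistics for one column, assuming nulls are represented as None; `describe_column` and the sample values are hypothetical.

```python
import math


def describe_column(values):
    """Hypothetical helper mirroring describe()'s five statistics for one numeric column."""
    present = [v for v in values if v is not None]  # nulls are excluded from every statistic
    n = len(present)
    if n == 0:
        return {"count": 0, "mean": None, "stddev": None, "min": None, "max": None}
    mean = sum(present) / n
    # Sample standard deviation: squared deviations divided by n - 1 (undefined for n = 1).
    stddev = math.sqrt(sum((v - mean) ** 2 for v in present) / (n - 1)) if n > 1 else float("nan")
    return {"count": n, "mean": mean, "stddev": stddev,
            "min": min(present), "max": max(present)}


print(describe_column([1, 34, 2, None, 45]))
```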

In [None]:
# Apply the filter() transformation to split the dataset by gender
df_people_female = df.filter(df.gender == "Female")
df_people_male = df.filter(df.gender == "Male")
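filter() is a transformation, so the two lines above only describe new DataFrames; nothing is read or filtered until an action such as count() runs. Python generators behave similarly, which gives a rough single-machine analogy (this is not how Spark executes query plans): the hypothetical `rows_read` counter stays at zero until the result is consumed.

```python
rows_read = 0  # hypothetical counter of how many records were actually scanned


def scan(rows):
    """Yield rows one at a time, counting each read."""
    global rows_read
    for row in rows:
        rows_read += 1
        yield row


rows = [{"gender": "Female"}, {"gender": "Male"}, {"gender": "Female"}]

# "Transformation": builds a lazy pipeline and reads nothing yet.
females = (r for r in scan(rows) if r["gender"] == "Female")
print(rows_read)  # 0

# "Action": consuming the pipeline forces the scan.
female_count = sum(1 for _ in females)
print(rows_read, female_count)  # 3 2
```

One consequence carries over to Spark: counting df_people_female and then df_people_male typically reads the CSV twice, unless df is cached first with df.cache().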

In [None]:
# Count the female customers (count() is an action, so this triggers the computation)
df_people_female.count()


3488

In [None]:
# Count the male customers
df_people_male.count()

3555

In [None]:
# Apply the groupBy() transformation to count how many customers use each internet service
df_provider = df.groupBy("InternetService")
df_provider.count().show()

+---------------+-----+
|InternetService|count|
+---------------+-----+
|    Fiber optic| 3096|
|             No| 1526|
|            DSL| 2421|
+---------------+-----+
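groupBy(...).count() counts the rows for each distinct value of the grouping column. On a single machine the same result comes from collections.Counter; the toy rows below are hypothetical, not taken from data.csv. Spark does not guarantee the order of groups in the output, which is why the table above is unsorted; adding .orderBy("count", ascending=False) gives a stable order.

```python
from collections import Counter

# Hypothetical toy rows; only the grouping column matters here.
rows = [
    {"customerID": "a", "InternetService": "DSL"},
    {"customerID": "b", "InternetService": "Fiber optic"},
    {"customerID": "c", "InternetService": "No"},
    {"customerID": "d", "InternetService": "Fiber optic"},
]

# Equivalent of df.groupBy("InternetService").count()
service_counts = Counter(row["InternetService"] for row in rows)

# most_common() sorts by count, like adding .orderBy("count", ascending=False)
for service, count in service_counts.most_common():
    print(service, count)
```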



In [None]:
# withColumnRenamed() returns a new DataFrame with one column renamed; we reassign it to df
df = df.withColumnRenamed("gender", "sex")
df.show(10)

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|   sex|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|            0|    Yes|        No|     1|          No|No phone service|            DSL|            No|         Yes|              No|         No|    
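DataFrames are immutable: withColumnRenamed() returns a new DataFrame and leaves the original untouched, which is why the result is assigned back to df. Renaming a column that does not exist is a no-op rather than an error. The sketch below models both properties on a single row stored as a dict; `with_column_renamed` is a hypothetical helper, not part of PySpark.

```python
def with_column_renamed(row, existing, new):
    """Hypothetical helper: return a copy of `row` with one key renamed.

    Mirrors two properties of DataFrame.withColumnRenamed(): the input is not
    modified, and a missing column name is silently ignored.
    """
    if existing not in row:
        return dict(row)
    # Rebuild the dict so the renamed column keeps its original position.
    return {(new if key == existing else key): value for key, value in row.items()}


original = {"customerID": "7590-VHVEG", "gender": "Female", "tenure": 1}
renamed = with_column_renamed(original, "gender", "sex")
print(list(renamed))         # ['customerID', 'sex', 'tenure']
print("gender" in original)  # True: the original row is unchanged
```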

In [None]:
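# Import the PySpark modules used below
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# getOrCreate() returns the session created earlier if it is still active,
# so this appName only takes effect when no session exists yet
spark = SparkSession.builder.appName("pyspark - example reducebykey()").getOrCreate()
sc = spark.sparkContext

avengers = [
    ("Hulk", 1),
    ("Iron man", 1),
    ("Hulk", 1),
    ("Thor", 1),
    ("Hulk", 1),
    ("Iron man", 1),
    ("Thor", 1),
    ("Iron man", 1),
    ("Spiderman", 1),
    ("Thor", 1),
]

# Schema for turning the data into a DataFrame, e.g. spark.createDataFrame(avengers, schema);
# the RDD actions below do not use it. The second field holds integers, hence IntegerType.
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Index", IntegerType(), True),
])

# Distribute the list as an RDD, then collect it back to the driver to inspect it
rdd = sc.parallelize(avengers)
rdd.collect()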

[('Hulk', 1), ('Iron man', 1), ('Hulk', 1), ('Thor', 1), ('Hulk', 1), ('Iron man', 1), ('Thor', 1), ('Iron man', 1), ('Spiderman', 1), ('Thor', 1)]
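parallelize() splits a local Python collection into partitions that Spark can process in parallel. The number of partitions can be set explicitly, e.g. sc.parallelize(avengers, 3), and checked with rdd.getNumPartitions(). As a rough model only (Spark's own slicing may differ in details), a list can be cut into contiguous, near-equal slices like this; `slice_into_partitions` is hypothetical.

```python
def slice_into_partitions(data, num_slices):
    """Hypothetical helper: split a list into contiguous, near-equal slices."""
    if num_slices < 1:
        raise ValueError("num_slices must be at least 1")
    n = len(data)
    # Slice i covers the index range [i*n//k, (i+1)*n//k).
    return [data[i * n // num_slices:(i + 1) * n // num_slices] for i in range(num_slices)]


avengers = [("Hulk", 1), ("Iron man", 1), ("Hulk", 1), ("Thor", 1), ("Hulk", 1),
            ("Iron man", 1), ("Thor", 1), ("Iron man", 1), ("Spiderman", 1), ("Thor", 1)]

for i, part in enumerate(slice_into_partitions(avengers, 3)):
    print(i, part)
```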


In [None]:
# Apply the countByKey() action: it returns a dict of key -> number of records to the driver

counts = rdd.countByKey()
print(counts.items())

dict_items([('Hulk', 3), ('Iron man', 3), ('Thor', 3), ('Spiderman', 1)])
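countByKey() counts how many records share each key and ignores the values; because the result is returned to the driver as a dictionary, it suits cases with a modest number of distinct keys. reduceByKey(), which the application name refers to, is different: it is a transformation that merges each key's values with a function you supply and yields a new RDD, e.g. rdd.reduceByKey(lambda a, b: a + b).collect(). Since every value here is 1, both give the same numbers. A pure-Python sketch of the two semantics; the helper names are hypothetical.

```python
from collections import Counter
from operator import add


def count_by_key(pairs):
    """Count records per key; values are ignored (like RDD.countByKey())."""
    return Counter(key for key, _ in pairs)


def reduce_by_key(pairs, func):
    """Merge the values of each key with `func` (like RDD.reduceByKey())."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged


avengers = [("Hulk", 1), ("Iron man", 1), ("Hulk", 1), ("Thor", 1), ("Hulk", 1),
            ("Iron man", 1), ("Thor", 1), ("Iron man", 1), ("Spiderman", 1), ("Thor", 1)]

print(dict(count_by_key(avengers)))   # {'Hulk': 3, 'Iron man': 3, 'Thor': 3, 'Spiderman': 1}
print(reduce_by_key(avengers, add))   # same here, because every value is 1

# With non-unit values the two differ: 2 records, but their values sum to 7.
weighted = [("Hulk", 2), ("Hulk", 5)]
print(count_by_key(weighted)["Hulk"], reduce_by_key(weighted, add)["Hulk"])  # 2 7
```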


In [None]:
# Apply the reduce() action: sum the numbers 1..999 (range() excludes its end point)

num_rdd = sc.parallelize(range(1, 1000))
num_rdd.reduce(lambda x, y: x + y)

499500
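reduce() combines elements within each partition first and then merges the partial results, so the function should be associative and commutative: addition qualifies, subtraction does not. Since range(1, 1000) stops at 999, the result is 1 + 2 + ... + 999 = 999 * 1000 / 2 = 499500. The sketch below imitates the two-stage evaluation with functools.reduce; `partitioned_reduce` and its partition boundaries are assumptions for illustration.

```python
from functools import reduce
from operator import add, sub


def partitioned_reduce(data, func, num_partitions):
    """Hypothetical two-stage reduce: per partition first, then across partials."""
    n = len(data)
    partitions = [data[i * n // num_partitions:(i + 1) * n // num_partitions]
                  for i in range(num_partitions)]
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)


numbers = list(range(1, 1000))
print(partitioned_reduce(numbers, add, 4))  # 499500, same as a single sequential reduce
print(reduce(sub, numbers))                 # -499498
print(partitioned_reduce(numbers, sub, 4))  # a different value: subtraction is not associative
```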

In [None]:
# Apply the max(), min(), sum(), variance() and stdev() actions

num_rdd.max(), num_rdd.min(), num_rdd.sum(), num_rdd.variance(), num_rdd.stdev()

(999, 1, 499500, 83166.66666666667, 288.38631497813253)
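The variance() and stdev() actions on an RDD compute population statistics, dividing by n rather than n - 1; sampleVariance() and sampleStdev() give the sample versions. For the integers 1..n the population variance has the closed form (n^2 - 1) / 12, which for n = 999 is 998000 / 12, about 83166.67, matching the output above. Each call in the cell is a separate action over the data, whereas num_rdd.stats() returns count, mean, stdev, max and min together. A pure-Python check of both divisors:

```python
import math


def population_variance(xs):
    """Divide by n, as RDD.variance() does."""
    mean = sum(xs) / len(xs)
    return sum((x - mean) ** 2 for x in xs) / len(xs)


def sample_variance(xs):
    """Divide by n - 1, as RDD.sampleVariance() does."""
    mean = sum(xs) / len(xs)
    return sum((x - mean) ** 2 for x in xs) / (len(xs) - 1)


numbers = list(range(1, 1000))
n = len(numbers)

pop_var = population_variance(numbers)
print(pop_var, math.sqrt(pop_var))  # approximately 83166.67 and 288.39
print((n ** 2 - 1) / 12)            # closed form for 1..n
print(sample_variance(numbers))     # 83250.0, i.e. n * (n + 1) / 12
```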