
# **Running PySpark in Colab**

To run Spark in Colab, we first need to install its dependencies in the Colab environment: Apache Spark 2.4.5 with Hadoop 2.7, Java 8, and findspark, which locates Spark on the system. Everything can be installed from inside the Colab notebook itself. One important note: if you are new to Spark, it is better to avoid version 2.4.0, since some users have reported compatibility issues between that release and Python.
Follow these steps to install the dependencies:

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
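# If this URL returns 404, older releases are usually kept under https://archive.apache.org/dist/spark/
!wget -q https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz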
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

Now that Spark and Java are installed in Colab, it is time to set the environment variables that let PySpark find them. Set the locations of Java and Spark by running the following code:

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
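
Before starting Spark, it can help to confirm that both variables point at directories that actually exist, since a wrong path usually only shows up later as a confusing startup error. Here is a minimal sketch; the helper name missing_dirs is hypothetical and is not part of Spark or findspark:

```python
import os

def missing_dirs(var_names):
    """Return the names of environment variables that are unset or do not point at a directory."""
    missing = []
    for name in var_names:
        path = os.environ.get(name)
        if not path or not os.path.isdir(path):
            missing.append(name)
    return missing

# An empty list means both locations were found.
print(missing_dirs(["JAVA_HOME", "SPARK_HOME"]))
```

If a name is printed, check that the install cell finished and that the version in the path matches the archive you downloaded.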

Run a local Spark session to test your installation.
In the master setting on the last line, local tells Spark to run on this machine, and [*] asks for as many worker threads as the machine has logical cores. Other values can be used as well. More info in https://stackoverflow.com/questions/32356143/what-does-setmaster-local-mean-in-spark



In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
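
The master string accepts a few local forms: local (one worker thread), local[N] (N threads) and local[*] (one thread per logical core). The sketch below only builds that string and prints the core count that Python sees, which is usually the number that [*] expands to. The helper local_master is hypothetical, not a Spark API:

```python
import os

def local_master(threads=None):
    """Build a Spark master string for local mode; None means 'use every logical core'."""
    if threads is None:
        return "local[*]"
    if threads < 1:
        raise ValueError("threads must be at least 1")
    return f"local[{threads}]"

print(local_master())     # local[*]
print(local_master(2))    # local[2]
print(os.cpu_count())     # logical cores visible to Python on this machine
```

The returned string can be passed to SparkSession.builder.master() in place of the literal used above.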

In [44]:
!ls

characters.csv	spark-2.4.5-bin-hadoop2.7      spark-warehouse
sample_data	spark-2.4.5-bin-hadoop2.7.tgz


The commented-out code further below is the older way of getting an entry point, through SparkContext and SQLContext instead of SparkSession. Both ways are still valid.

If you run PySpark as a Python script rather than through the interactive shell, you will need some additional code at the top of your script.

The first thing you must do is create a SparkContext() object. This tells Spark how to access a cluster. SparkConf() is where you can set the configuration for your Spark application.

Note: You can only have **ONE** SparkContext running at once.

In [0]:
#from pyspark import SparkContext, SparkConf
#from pyspark.sql import SQLContext
#conf = SparkConf().setAppName('Spark DL Tabular Pipeline').setMaster('local[*]')
#sc = SparkContext(conf=conf)
#sql_context = SQLContext(sc)

Upload a sample dataset. We will use a small dataset, characters.csv, uploaded from your local machine.


In [6]:
from google.colab import files
files.upload()

Saving characters.csv to characters.csv


{'characters.csv': b'name,height,mass,hair_color,skin_color,eye_color,birth_year,gender,homeworld,species\nLuke Skywalker,172,77,blond,fair,blue,19BBY,male,Tatooine,Human\nC-3PO,167,75,NA,gold,yellow,112BBY,NA,Tatooine,Droid\nR2-D2,96,32,NA,"white, blue",red,33BBY,NA,Naboo,Droid\nDarth Vader,202,136,none,white,yellow,41.9BBY,male,Tatooine,Human\nLeia Organa,150,49,brown,light,brown,19BBY,female,Alderaan,Human\nOwen Lars,178,120,"brown, grey",light,blue,52BBY,male,Tatooine,Human\nBeru Whitesun lars,165,75,brown,light,blue,47BBY,female,Tatooine,Human\nR5-D4,97,32,NA,"white, red",red,NA,NA,Tatooine,Droid\nBiggs Darklighter,183,84,black,light,brown,24BBY,male,Tatooine,Human\nObi-Wan Kenobi,182,77,"auburn, white",fair,blue-gray,57BBY,male,Stewjon,Human\nAnakin Skywalker,188,84,blond,fair,blue,41.9BBY,male,Tatooine,Human\nWilhuff Tarkin,180,NA,"auburn, grey",fair,blue,64BBY,male,Eriadu,Human\nChewbacca,228,112,brown,NA,blue,200BBY,male,Kashyyyk,Wookiee\nHan Solo,180,80,brown,fair,brown,29BBY

Check that the file is in place:

In [7]:
!ls
!pwd

characters.csv	spark-2.4.5-bin-hadoop2.7
sample_data	spark-2.4.5-bin-hadoop2.7.tgz
/content


Congrats! Your Colab is ready to run PySpark.

# Introduction to PySpark

Now that we have uploaded the dataset, we can start analyzing. 


In [0]:
df = spark.read.csv('characters.csv', inferSchema=True, header=True)


Notice that we passed inferSchema to read.csv. With inferSchema enabled, Spark scans the data and tries to infer a data type for each column automatically, instead of reading every column as a string.

Let us look at the schema to see the data type of each column:

In [9]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- height: string (nullable = true)
 |-- mass: string (nullable = true)
 |-- hair_color: string (nullable = true)
 |-- skin_color: string (nullable = true)
 |-- eye_color: string (nullable = true)
 |-- birth_year: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- homeworld: string (nullable = true)
 |-- species: string (nullable = true)
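
In the schema above, every column came back as string, including height and mass. A likely reason is that missing values are stored as the literal text NA, so each of those columns contains at least one value that is not a number. The plain-Python sketch below imitates that rule on three rows taken from the upload output. It is an illustration only, not Spark's actual inference code, and infer_type is a hypothetical helper:

```python
import csv
import io

SAMPLE = """name,height,mass
Luke Skywalker,172,77
Darth Vader,202,136
Wilhuff Tarkin,180,NA
"""

def infer_type(values, null_token=None):
    """Return 'int' if every non-null value parses as an integer, else 'string'."""
    for value in values:
        if value == null_token:
            continue  # treated as missing, so it does not affect the inferred type
        try:
            int(value)
        except ValueError:
            return "string"
    return "int"

rows = list(csv.DictReader(io.StringIO(SAMPLE)))
mass = [row["mass"] for row in rows]

print(infer_type(mass))                   # "NA" is ordinary text here
print(infer_type(mass, null_token="NA"))  # "NA" is treated as a missing value
```

Spark's CSV reader has a nullValue option that plays the role of null_token here, for example spark.read.csv(..., nullValue='NA'). That should let purely numeric columns be inferred as numbers, although a column such as birth_year (19BBY) would still be a string.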



Use show() to display the first rows of df:

In [10]:
df.show(5)

+--------------+------+----+----------+-----------+---------+----------+------+---------+-------+
|          name|height|mass|hair_color| skin_color|eye_color|birth_year|gender|homeworld|species|
+--------------+------+----+----------+-----------+---------+----------+------+---------+-------+
|Luke Skywalker|   172|  77|     blond|       fair|     blue|     19BBY|  male| Tatooine|  Human|
|         C-3PO|   167|  75|        NA|       gold|   yellow|    112BBY|    NA| Tatooine|  Droid|
|         R2-D2|    96|  32|        NA|white, blue|      red|     33BBY|    NA|    Naboo|  Droid|
|   Darth Vader|   202| 136|      none|      white|   yellow|   41.9BBY|  male| Tatooine|  Human|
|   Leia Organa|   150|  49|     brown|      light|    brown|     19BBY|female| Alderaan|  Human|
+--------------+------+----+----------+-----------+---------+----------+------+---------+-------+
only showing top 5 rows



# Data Exploration

Find the total number of rows:

In [11]:
total_rows = df.count()
print(total_rows)


87


For larger data, we can work on a smaller sample so that we can iterate faster.

Here we have an example of lazy evaluation. Look at df_sample, which takes roughly 10% of the DataFrame. It will not be computed until the code block after it, which calls df_sample.show().



In [0]:
df_sample = df.sample(fraction=.1)
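
Note that sample() does not return an exact number of rows. Sampling without replacement keeps each row independently with probability fraction, so the size of the result only averages fraction times the row count. The simulation below uses plain Python rather than Spark to show the effect; bernoulli_sample is a hypothetical helper:

```python
import random

def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]

rows = list(range(87))  # stand-in for the 87 rows of df
sizes = [len(bernoulli_sample(rows, 0.1, seed)) for seed in range(5)]
print(sizes)  # sizes vary from seed to seed around 87 * 0.1

# The same seed always gives the same sample.
repeatable = bernoulli_sample(rows, 0.1, seed=42) == bernoulli_sample(rows, 0.1, seed=42)
print(repeatable)
```

In PySpark, df.sample also takes a seed argument, which is the usual way to make a sample repeatable between runs.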


Here is where df_sample is actually computed:

In [13]:
df_sample.show()


+---------------+------+----+----------+-------------------+---------+----------+------+------------+----------+
|           name|height|mass|hair_color|         skin_color|eye_color|birth_year|gender|   homeworld|   species|
+---------------+------+----+----------+-------------------+---------+----------+------+------------+----------+
|       Han Solo|   180|  80|     brown|               fair|    brown|     29BBY|  male|    Corellia|     Human|
|   Roos Tarpals|   224|  82|      none|               grey|   orange|        NA|  male|       Naboo|    Gungan|
|     Rugor Nass|   206|  NA|      none|              green|   orange|        NA|  male|       Naboo|    Gungan|
|        Sebulba|   112|  40|      none|          grey, red|   orange|        NA|  male|   Malastare|       Dug|
|       Dud Bolt|    94|  45|      none|         blue, grey|   yellow|        NA|  male|     Vulpter|Vulptereen|
| Ben Quadinaros|   163|  65|      none|grey, green, yellow|   orange|        NA|  male|        
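
The lazy evaluation seen with df_sample can be pictured with Python generators: building the pipeline does no work, and consuming it does. This is only an analogy in plain Python, not Spark code, and the heights are the first five from the dataset:

```python
log = []

def read_rows():
    """Stand-in for a data source; records when each row is actually read."""
    for height in [172, 167, 96, 202, 150]:
        log.append(f"read {height}")
        yield height

# Building the pipeline is like calling a transformation: nothing runs yet.
short = (h for h in read_rows() if h < 170)
reads_before = len(log)
print(reads_before)

# Consuming it is like calling an action such as show() or count().
result = list(short)
print(result)
print(len(log))  # the source rows were read only now
```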

Now let's create a temporary view and have some fun with it.
Note:
The lifetime of this temporary view is tied to the SparkSession that was used to create this DataFrame.

createOrReplaceTempView replaces an existing view of the same name. The related createTempView instead throws TempTableAlreadyExistsException if the view name already exists in the catalog.

In [0]:
df.createOrReplaceTempView("df1")

Describe function 

- Computes statistics for numeric and string columns.

- These include count, mean, stddev, min, and max.

- If no columns are given, this function computes statistics for all numerical or string columns.

Let's look at the statistics for the height of the Star Wars characters:

In [15]:
df.describe("height").show()


+-------+------------------+
|summary|            height|
+-------+------------------+
|  count|                87|
|   mean|174.35802469135803|
| stddev| 34.77042875849222|
|    min|               112|
|    max|                NA|
+-------+------------------+
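
The min and max in this output look odd (112 and NA) because height is a string column here, so both are chosen by string ordering rather than numeric ordering. The count of 87 also includes the NA rows, while mean and stddev appear to be computed only over the values that can be read as numbers. The ordering effect can be reproduced in plain Python with a few heights from the dataset:

```python
heights = ["172", "167", "96", "202", "150", "112", "NA"]

# Strings compare character by character, so "112" sorts before "96"
# and "NA" sorts after every value that starts with a digit.
print(min(heights), max(heights))

# Casting first gives the numeric answer; values that are not numbers are skipped.
numeric = [int(h) for h in heights if h.isdigit()]
print(min(numeric), max(numeric))
```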



Select the characters whose height is less than 150:

In [21]:

df_height = spark.sql("select * from df1 where height < 150")
df_height.show()


+--------------------+------+----+----------+-----------+---------+----------+------+-----------+--------------+
|                name|height|mass|hair_color| skin_color|eye_color|birth_year|gender|  homeworld|       species|
+--------------------+------+----+----------+-----------+---------+----------+------+-----------+--------------+
|               R2-D2|    96|  32|        NA|white, blue|      red|     33BBY|    NA|      Naboo|         Droid|
|               R5-D4|    97|  32|        NA| white, red|      red|        NA|    NA|   Tatooine|         Droid|
|                Yoda|    66|  17|     white|      green|    brown|    896BBY|  male|         NA|Yoda's species|
|Wicket Systri War...|    88|  20|     brown|      brown|    brown|      8BBY|  male|      Endor|          Ewok|
|               Watto|   137|  NA|     black| blue, grey|   yellow|        NA|  male|   Toydaria|     Toydarian|
|             Sebulba|   112|  40|      none|  grey, red|   orange|        NA|  male|  Malastare

Let's try a group by to see how species and eye color relate:

In [22]:
#df_result=spark.sql("select * from Character_Table")
df_species_eye_color = spark.sql("select species, count(eye_color) as number, eye_color from df1 group by eye_color, species")
df_species_eye_color.show()


+--------------+------+-------------+
|       species|number|    eye_color|
+--------------+------+-------------+
|        Gungan|     3|       orange|
|            NA|     1|         blue|
|         Human|     2|        hazel|
|            NA|     1|        brown|
|      Iktotchi|     1|       orange|
|     Geonosian|     1|       yellow|
|       Twi'lek|     1|        hazel|
|      Kaminoan|     2|        black|
|        Cerean|     1|       yellow|
|       Kaleesh|     1|green, yellow|
|      Mirialan|     2|         blue|
|         Human|     1|         dark|
|      Nautolan|     1|        black|
|         Human|    12|         blue|
|Yoda's species|     1|        brown|
|          Muun|     1|         gold|
|        Zabrak|     1|        brown|
|          Ewok|     1|        brown|
|          Hutt|     1|       orange|
|        Zabrak|     1|       yellow|
+--------------+------+-------------+
only showing top 20 rows



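Remember the RDD persistence concept I talked about in the Spark Core API? Here I will show you how to make your DataFrame persist with cache(). Note that cache() is itself lazy: the data is stored the first time an action runs on the DataFrame.

Only do this if you plan to reuse the result.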

In [23]:

df_species_eye_color.cache()


DataFrame[species: string, number: bigint, eye_color: string]

The distinct function: again, without an action such as count(), Spark stays lazy and nothing is executed yet.

In [0]:
df_hair_color = df.select('hair_color').distinct()


Let's find out how many distinct hair colors we have:

In [27]:
df_hair_color.count()


12

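alias() gives a DataFrame an alias name, which is useful when it has to be referred to by name, for example in a self-join.

Notice that df_species_eye_color was cached above, so reusing it here does not recompute the query. df_height was not cached, so it would be recomputed from the source on every action.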

In [0]:
from pyspark.sql import functions as F  # a wildcard import would shadow built-ins such as max and sum
df_new1 = df_species_eye_color.alias("df_species_eye_color")

#df_new2 =df_height.alias("df_height")



Let's find the species and eye color combinations that occur more than 12 times:

In [34]:
df_filter = df_new1.filter('number > 12')
df_filter.show()

+-------+------+---------+
|species|number|eye_color|
+-------+------+---------+
|  Human|    17|    brown|
+-------+------+---------+



agg
Aggregates over the entire DataFrame without groups (shorthand for df.groupBy().agg()).


In [38]:
df_species_eye_color.agg({"number": "max"}).collect()


[Row(max(number)=17)]

In [39]:
from pyspark.sql import functions as F
df_species_eye_color.agg(F.max(df_species_eye_color.number)).collect()


[Row(max(number)=17)]