# Setting Up a Spark Session in a Google Colab Notebook

Spark is written in the Scala programming language and runs on the Java Virtual Machine (JVM), so our first task is to install Java.

In [3]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Next, we will download Apache Spark 3.1.1 prebuilt for Hadoop 2.7.

In [4]:
# Older Spark releases are kept on archive.apache.org
!wget -q https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz

Then we extract the downloaded archive.

In [5]:
!tar xf spark-3.1.1-bin-hadoop2.7.tgz

The last dependency is the findspark library, which locates the Spark installation and adds it to sys.path so that pyspark can be imported like any regular library.

In [6]:
!pip install -q findspark

With all dependencies installed in Colab, we set the JAVA_HOME and SPARK_HOME environment variables so that PySpark can find Java and the extracted Spark distribution.

In [7]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"
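Before calling findspark, it can help to confirm that both variables point at directories that actually exist, since a wrong path otherwise surfaces later as a less obvious Java or import error. A minimal sketch; missing_homes is a hypothetical helper, not part of any library:

```python
import os


def missing_homes(names=("JAVA_HOME", "SPARK_HOME")):
    """Return the environment variables that are unset or point to a non-existent directory."""
    missing = []
    for name in names:
        path = os.environ.get(name)
        # Report both unset variables and paths that are not existing directories
        if not path or not os.path.isdir(path):
            missing.append(name)
    return missing


# An empty list means both paths exist on this machine
print(missing_homes())
```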

Next, we import findspark and call findspark.init() so that the pyspark package inside SPARK_HOME becomes importable.

In [8]:
import findspark
findspark.init()

findspark.find() returns the Spark installation directory that findspark located, which confirms that SPARK_HOME points at the extracted archive.

In [10]:
findspark.find()

'/content/spark-3.1.1-bin-hadoop2.7'
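Roughly, findspark.init() makes pyspark importable by putting Spark's bundled Python sources on sys.path. The sketch below is a simplified illustration of that idea, not findspark's actual source; add_spark_to_path is a hypothetical name, and the py4j-*.zip location reflects how Spark binary distributions are usually laid out.

```python
import glob
import os
import sys


def add_spark_to_path(spark_home=None):
    """Simplified sketch of what findspark.init() does (not its actual source).

    Prepends $SPARK_HOME/python and the bundled py4j zip to sys.path so that
    `import pyspark` resolves to the unpacked Spark distribution.
    """
    spark_home = spark_home or os.environ.get("SPARK_HOME")
    if not spark_home:
        raise ValueError("SPARK_HOME is not set and no path was given")
    spark_python = os.path.join(spark_home, "python")
    # Spark ships py4j as a zip, e.g. python/lib/py4j-<version>-src.zip
    py4j_zips = glob.glob(os.path.join(spark_python, "lib", "py4j-*.zip"))
    if not py4j_zips:
        raise FileNotFoundError(f"no py4j zip under {spark_python}/lib")
    new_entries = [spark_python, py4j_zips[0]]
    # Put the Spark entries first so they take precedence over other installs
    sys.path[:0] = [p for p in new_entries if p not in sys.path]
    return new_entries
```

With the setup above, the first entry would be /content/spark-3.1.1-bin-hadoop2.7/python.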

Now we can import SparkSession from pyspark.sql and create a SparkSession, the entry point for working with DataFrames and Spark SQL.

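You can name the session with appName() and add configuration settings with config(). Here, master("local") runs Spark locally in a single thread, and spark.ui.port moves the Spark UI from its default port 4040 to 4050, because ngrok (used below to expose the UI) serves its own local API on port 4040.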

In [13]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("BDP ICP10")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

Print the SparkSession variable to confirm that the session is running.

In [50]:
spark

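The Colab VM is not directly reachable from your browser, so viewing the Spark UI requires a tunnel. The cell below downloads ngrok, starts an HTTP tunnel to port 4050, and queries ngrok's local API on port 4040 for the public URL of the UI page.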

In [51]:
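# Download and unpack the ngrok binary (-o overwrites without prompting on re-runs)
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip -o ngrok-stable-linux-amd64.zip
# Start an HTTP tunnel to the Spark UI port (4050) in the background
get_ipython().system_raw('./ngrok http 4050 &')
# Give ngrok a moment to start, then ask its local API (port 4040) for the public URL
!sleep 2
!curl -s http://localhost:4040/api/tunnels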

--2021-04-06 01:12:59--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 34.235.3.193, 3.209.27.98, 34.203.109.182, ...
Connecting to bin.equinox.io (bin.equinox.io)|34.235.3.193|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 14746350 (14M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip’


2021-04-06 01:13:00 (36.0 MB/s) - ‘ngrok-stable-linux-amd64.zip’ saved [14746350/14746350]

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   
{"tunnels":[{"name":"command_line","uri":"/api/tunnels/command_line","public_url":"https://f7bdf26c95a4.ngrok.io","proto":"https","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0}}}],"uri":"/api/tunnels"}
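The public URL sits in the public_url field of each entry under tunnels in the JSON above. A small sketch that extracts it instead of reading the raw response; public_urls and fetch_public_urls are hypothetical helpers, and only the field names visible in the output above are relied on:

```python
import json
import urllib.request


def public_urls(api_json):
    """Extract the public URLs from the JSON returned by ngrok's /api/tunnels endpoint."""
    data = json.loads(api_json)
    return [tunnel["public_url"] for tunnel in data.get("tunnels", [])]


def fetch_public_urls(api="http://localhost:4040/api/tunnels"):
    # Same local API endpoint as the curl call above
    with urllib.request.urlopen(api) as resp:
        return public_urls(resp.read().decode("utf-8"))
```

In the notebook, print(fetch_public_urls()) would list the tunnel URL(s) for the Spark UI.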


# Part-1

**1. Import the dataset and create data frames directly on import.**

In [14]:
# DataFrame column functions (col, sum, ...), used below in the duplicate check
import pyspark.sql.functions as f

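Load the survey CSV into a DataFrame, using the first line as column names, and register it as a temporary view named Survey so that it can be queried with SQL.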

In [16]:
df = spark.read.csv(r"/content/sample_data/survey.csv",header=True)
df.createOrReplaceTempView("Survey")

**2. Save data to file.**

In [49]:
# mode("overwrite") replaces earlier output so the cell can be re-run
df.write.option("header", "true").mode("overwrite").csv("df_write_survey.csv")
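Spark treats "df_write_survey.csv" as a directory and writes one part-*.csv file per partition into it (plus marker files such as _SUCCESS), each with its own header line when the header option is set. If a single file is needed, one option is to merge the parts afterwards; calling df.coalesce(1) before writing is another. A minimal sketch of the merge, assuming all part files share the same header; merge_csv_parts is a hypothetical helper:

```python
import glob
import os


def merge_csv_parts(output_dir, target_path):
    """Concatenate Spark's part-*.csv files into one CSV, keeping a single header line.

    Assumes every non-empty part file starts with the same header line.
    """
    parts = sorted(glob.glob(os.path.join(output_dir, "part-*.csv")))
    header_written = False
    with open(target_path, "w", encoding="utf-8", newline="") as out:
        for part in parts:
            with open(part, encoding="utf-8", newline="") as src:
                header = src.readline()
                if not header:
                    continue  # skip empty part files
                if not header_written:
                    out.write(header)
                    header_written = True
                # Copy the remaining data lines unchanged
                out.writelines(src)
    return len(parts)
```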

In [18]:
df

DataFrame[Timestamp: string, Age: string, Gender: string, Country: string, state: string, self_employed: string, family_history: string, treatment: string, work_interfere: string, no_employees: string, remote_work: string, tech_company: string, benefits: string, care_options: string, wellness_program: string, seek_help: string, anonymity: string, leave: string, mental_health_consequence: string, phys_health_consequence: string, coworkers: string, supervisor: string, mental_health_interview: string, phys_health_interview: string, mental_vs_physical: string, obs_consequence: string, comments: string]

In [19]:
df.count()

1259

**3. Check for duplicate records in the dataset.**

In [20]:
# Drop rows that are exact duplicates across all columns
df = df.dropDuplicates()

In [21]:
df.count()

1259

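The count is still 1259, so dropDuplicates() removed nothing and the file has no exact duplicate rows. As a cross-check, we group by every column and sum the counts of groups that occur more than once; a null sum means no such group exists. Because df is already deduplicated here, this query confirms the result rather than re-examining the raw file.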

In [22]:
df.groupBy(df.columns).count().where(f.col('count') > 1).select(f.sum('count')).show()

+----------+
|sum(count)|
+----------+
|      null|
+----------+
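The result is null rather than 0 because SQL's SUM over zero rows returns NULL. A plain-Python sketch of what the check and dropDuplicates() compute; both helpers are hypothetical and only illustrate the logic, not Spark's implementation:

```python
from collections import Counter


def rows_in_duplicate_groups(rows):
    """Analogue of df.groupBy(df.columns).count().where(count > 1).select(sum(count)).

    Returns None when no row occurs more than once, mirroring SQL,
    where SUM over zero rows is NULL rather than 0.
    """
    counts = Counter(tuple(row) for row in rows)
    dup_counts = [c for c in counts.values() if c > 1]
    return sum(dup_counts) if dup_counts else None


def drop_duplicates(rows):
    """Keep the first occurrence of each distinct row (Spark itself guarantees no order)."""
    return list(dict.fromkeys(tuple(row) for row in rows))
```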



**4. Apply Union operation on the dataset and order the output by Country Name alphabetically.**

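We create two temporary views, one for male and one for female respondents, and merge them with a UNION query ordered by Country. Gender is a free-text column, so each filter lists several spellings.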

In [23]:
spark.sql("select * from Survey where Gender in ('Male', 'M', 'male')").createOrReplaceTempView("Table_Male")

In [24]:
spark.sql("select * from Survey where Gender in ('Female', 'female')").createOrReplaceTempView("Table_Female")

In [25]:
spark.sql("select * from Table_Male union select * from Table_Female order by Country").show(100)

+-------------------+---+------+--------------------+-----+-------------+--------------+---------+--------------+--------------+-----------+------------+----------+------------+----------------+----------+----------+------------------+-------------------------+-----------------------+------------+------------+-----------------------+---------------------+------------------+---------------+--------------------+
|          Timestamp|Age|Gender|             Country|state|self_employed|family_history|treatment|work_interfere|  no_employees|remote_work|tech_company|  benefits|care_options|wellness_program| seek_help| anonymity|             leave|mental_health_consequence|phys_health_consequence|   coworkers|  supervisor|mental_health_interview|phys_health_interview|mental_vs_physical|obs_consequence|            comments|
+-------------------+---+------+--------------------+-----+-------------+--------------+---------+--------------+--------------+-----------+------------+----------+--------
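In SQL, UNION removes duplicate rows from the combined result, while UNION ALL keeps them; the DataFrame method df.union() behaves like UNION ALL. Here both should give the same rows, because the two views cannot share a row and the duplicate check found no repeated rows. A plain-Python sketch of the two semantics, with hypothetical helper names:

```python
from itertools import chain


def union_distinct(left, right, key):
    """SQL UNION: concatenate both inputs, drop exact duplicate rows, then ORDER BY key."""
    distinct = dict.fromkeys(tuple(row) for row in chain(left, right))
    return sorted(distinct, key=key)


def union_all(left, right, key):
    """SQL UNION ALL (and DataFrame.union): keep every row, duplicates included, then ORDER BY key."""
    return sorted((tuple(row) for row in chain(left, right)), key=key)
```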

**5. Use a GROUP BY query based on treatment.**

In [26]:
spark.sql("select treatment,count(*) as count from Survey group by treatment").show()

+---------+-----+
|treatment|count|
+---------+-----+
|       No|  622|
|      Yes|  637|
+---------+-----+
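The two groups add up to 622 + 637 = 1259, the full row count, so every respondent answered Yes or No (a NULL treatment would show up as its own group). The DataFrame API equivalent is df.groupBy("treatment").count(). A plain-Python sketch of what the query computes; count_by is a hypothetical helper:

```python
from collections import Counter


def count_by(rows, column):
    """Analogue of SELECT column, COUNT(*) FROM rows GROUP BY column."""
    return dict(Counter(row[column] for row in rows))
```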



# Part-2

**1. Apply basic queries involving joins and aggregate functions (at least 2).**

Join queries

1.   Pairing male and female respondents from the same country (inner join on Country)
2.   Pairing each male respondent with female respondents from the same state (left join on State, so males without a match are kept)

In [27]:
spark.sql("select m.age,m.Country,m.Gender, m.treatment,f.Gender,f.treatment from Table_Male m join Table_Female f on m.Country = f.Country").show()

+---+-------------+------+---------+------+---------+
|age|      Country|Gender|treatment|Gender|treatment|
+---+-------------+------+---------+------+---------+
| 44|United States|     M|       No|Female|      Yes|
| 44|United States|     M|       No|Female|      Yes|
| 44|United States|     M|       No|female|       No|
| 44|United States|     M|       No|female|      Yes|
| 44|United States|     M|       No|Female|      Yes|
| 44|United States|     M|       No|Female|      Yes|
| 44|United States|     M|       No|Female|      Yes|
| 44|United States|     M|       No|Female|      Yes|
| 44|United States|     M|       No|female|      Yes|
| 44|United States|     M|       No|female|      Yes|
| 44|United States|     M|       No|female|      Yes|
| 44|United States|     M|       No|Female|       No|
| 44|United States|     M|       No|Female|      Yes|
| 44|United States|     M|       No|Female|      Yes|
| 44|United States|     M|       No|female|      Yes|
| 44|United States|     M|  
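The same 44-year-old respondent repeats because an inner join pairs every matching row on both sides: for each country, the output has (number of male rows) × (number of female rows) rows. A plain-Python hash-join sketch that shows this; inner_join is a hypothetical helper, not Spark's join implementation:

```python
from collections import defaultdict


def inner_join(left, right, key):
    """Hash inner join: pair each left row with every right row sharing the same key value."""
    index = defaultdict(list)
    for rrow in right:
        index[rrow[key]].append(rrow)
    # A left row with k matches yields k output rows; None keys never match,
    # mirroring SQL, where NULL = NULL is not true
    return [
        (lrow, rrow)
        for lrow in left
        if lrow[key] is not None
        for rrow in index.get(lrow[key], [])
    ]
```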

In [29]:
spark.sql("select m.age,m.Country,m.Gender, m.treatment,f.Gender,f.treatment from Table_Male m left join Table_Female f on m.State = f.State").show()

+---+-------------+------+---------+------+---------+
|age|      Country|Gender|treatment|Gender|treatment|
+---+-------------+------+---------+------+---------+
| 44|United States|     M|       No|female|      Yes|
| 44|United States|     M|       No|Female|      Yes|
| 32|       Canada|  Male|       No|female|       No|
| 32|       Canada|  Male|       No|Female|      Yes|
| 32|       Canada|  Male|       No|Female|       No|
| 32|       Canada|  Male|       No|female|      Yes|
| 32|       Canada|  Male|       No|female|      Yes|
| 32|       Canada|  Male|       No|Female|      Yes|
| 32|       Canada|  Male|       No|Female|       No|
| 32|       Canada|  Male|       No|female|      Yes|
| 32|       Canada|  Male|       No|Female|       No|
| 32|       Canada|  Male|       No|Female|       No|
| 32|       Canada|  Male|       No|female|       No|
| 32|       Canada|  Male|       No|Female|      Yes|
| 32|       Canada|  Male|       No|Female|      Yes|
| 32|       Canada|  Male|  
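A left join keeps every male row: rows with matching states are paired as in an inner join, and rows without a match appear once with NULLs for the female columns. The Canadian rows here do match female rows, which suggests the state field holds a placeholder string rather than a SQL NULL for non-US respondents. A plain-Python sketch of left-join semantics; left_join is a hypothetical helper:

```python
def left_join(left, right, key):
    """LEFT JOIN: like an inner join, but an unmatched left row is kept once, paired with None."""
    index = {}
    for rrow in right:
        index.setdefault(rrow.get(key), []).append(rrow)
    result = []
    for lrow in left:
        k = lrow.get(key)
        # SQL equality never matches NULL, so a missing key cannot join
        matches = index.get(k, []) if k is not None else []
        if matches:
            result.extend((lrow, rrow) for rrow in matches)
        else:
            result.append((lrow, None))
    return result
```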

Aggregate functions

---
Calculate the sum of Age over all respondents and the number of non-null Gender values.

In [28]:
spark.sql("select sum(Age),count(Gender) from Survey").show()

+------------------------+-------------+
|sum(CAST(Age AS DOUBLE))|count(Gender)|
+------------------------+-------------+
|        1.00000038724E11|         1259|
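+------------------------+-------------+

Because Age was read as a string, Spark casts it to DOUBLE before summing, as the column header shows. A total of about 1.0E11 over 1259 respondents averages to roughly 79 million years, so at least one Age value is invalid and the column needs cleaning before this sum is meaningful. count(Gender) equals the row count, so no Gender value is null.
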
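One way to clean Age before aggregating is to drop values that do not parse as numbers or fall outside a plausible range. The sketch below does this in plain Python; age_stats and its bounds of 15 to 100 are hypothetical choices, not taken from the survey. In Spark SQL, a similar filter could be written with an explicit cast, for example where cast(Age as double) between 15 and 100.

```python
def age_stats(ages, low=15, high=100):
    """Sum and count ages given as strings, skipping non-numeric values and values
    outside [low, high]. The default bounds are hypothetical, not from the survey.
    """
    valid = []
    for raw in ages:
        try:
            age = float(raw)
        except (TypeError, ValueError):
            continue  # a failed CAST gives NULL in Spark, which SUM ignores
        if low <= age <= high:
            valid.append(age)
    return sum(valid), len(valid)
```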
**2. Write a query to fetch the 13th row in the dataset.**

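To get the 13th row, we order by Timestamp and keep the first 13 rows in a temporary view, then sort that view in descending order and take one row. Rows in a DataFrame have no inherent order, so "the 13th row" is only meaningful relative to an ORDER BY.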

In [30]:
spark.sql("select * from Survey ORDER BY Timestamp limit 13").createOrReplaceTempView("Test")

In [31]:
spark.sql("select * from Test order by Timestamp desc limit 1").show()

+-------------------+---+------+-------------+-----+-------------+--------------+---------+--------------+------------+-----------+------------+--------+------------+----------------+---------+----------+------------------+-------------------------+-----------------------+---------+----------+-----------------------+---------------------+------------------+---------------+--------+
|          Timestamp|Age|Gender|      Country|state|self_employed|family_history|treatment|work_interfere|no_employees|remote_work|tech_company|benefits|care_options|wellness_program|seek_help| anonymity|             leave|mental_health_consequence|phys_health_consequence|coworkers|supervisor|mental_health_interview|phys_health_interview|mental_vs_physical|obs_consequence|comments|
+-------------------+---+------+-------------+-----+-------------+--------------+---------+--------------+------------+-----------+------------+--------+------------+----------------+---------+----------+------------------+-------
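The two-step query returns the same row as "sort, then pick position 13" as long as Timestamp values are unique; with ties, which tied row comes out is not well defined. Since Timestamp was read as a string, the ordering is lexicographic, which matches chronological order for fixed-width formats such as yyyy-MM-dd HH:mm:ss. In Spark SQL, a window function is a common alternative, for example select * from (select *, row_number() over (order by Timestamp) as rn from Survey) where rn = 13. A plain-Python sketch of both approaches, with hypothetical helper names:

```python
def nth_row(rows, n, key):
    """Return the n-th row (1-based) in ascending key order."""
    ordered = sorted(rows, key=key)
    if not 1 <= n <= len(ordered):
        raise IndexError(f"row {n} requested, but there are only {len(ordered)} rows")
    return ordered[n - 1]


def nth_row_via_limit(rows, n, key):
    """Mirror of the two queries above: ORDER BY key LIMIT n, then ORDER BY key DESC LIMIT 1.

    Matches nth_row when key values are unique; with ties, the result depends
    on how the tied rows happen to be ordered.
    """
    first_n = sorted(rows, key=key)[:n]
    return max(first_n, key=key)
```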

# Mounting Drive to push generated files

Mounting Google Drive at /content/drive makes the files and folders on your Drive accessible inside the Colab notebook, so generated output can be saved there.

In [48]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive
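Once Drive is mounted, generated files can be copied onto it. A minimal sketch using the standard library; copy_to_drive and the destination folder are hypothetical, and a mounted Drive's files typically appear under /content/drive/MyDrive:

```python
import shutil
from pathlib import Path


def copy_to_drive(src, drive_dir):
    """Copy a file or a Spark output directory into a folder on the mounted Drive."""
    src, drive_dir = Path(src), Path(drive_dir)
    drive_dir.mkdir(parents=True, exist_ok=True)
    target = drive_dir / src.name
    if src.is_dir():
        # Spark's csv() output is a directory of part files
        shutil.copytree(src, target, dirs_exist_ok=True)  # dirs_exist_ok needs Python 3.8+
    else:
        shutil.copy2(src, target)
    return target


# Hypothetical destination folder on the mounted Drive:
# copy_to_drive("df_write_survey.csv", "/content/drive/MyDrive/bdp_icp10")
```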
