
## Introduction

More than 500 million tweets, 90 billion emails and 65 million WhatsApp messages are sent every single day, and Facebook alone generates 4 petabytes of data in 24 hours. That's incredible!
This scale comes with challenges of its own. How does a data science team capture this much data? How do you process it and build machine learning models on it?

- This is where Spark comes into the picture. Spark is written in Scala and provides APIs for Scala, Java, Python and R. PySpark is Spark's Python API.

## Why PySpark?

- One way of handling Big Data is a distributed framework like Hadoop MapReduce. However, MapReduce writes intermediate results to disk between processing steps, and all those disk reads and writes make it slow.
- Spark avoids most of this cost by keeping intermediate data in memory wherever possible. PySpark exposes that engine through an easy-to-understand Python API.

## Key Concepts

## Partitions in Spark
- Partitioning means that the complete dataset is not stored in a single place. It is divided into multiple chunks (partitions), and these chunks are placed on different nodes of the cluster.

- If you have only one partition, Spark has a parallelism of one, even if you have thousands of executors.
- Likewise, if you have many partitions but only one executor, Spark still has a parallelism of one, because there is only one computation resource.


## Transformations in Spark
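- Spark's data structures (RDDs and DataFrames) are immutable, so they cannot be changed in place. Instead, we tell Spark how we would like to transform the data, and Spark produces a new dataset.
- Narrow transformation: all the elements required to compute the records of a single output partition live in a single partition of the parent dataset, so no data has to move between partitions (for example `map` and `filter`).
- Wide transformation: the elements required to compute the records of a single output partition may live in many partitions of the parent dataset, so Spark has to shuffle data across partitions (for example `groupBy` and `reduceByKey`).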



In [None]:
'''
Mount Google Drive so the notebook can read files stored there
'''
from google.colab import drive
drive.mount("/content/drive")

Mounted at /content/drive


## Setting up PySpark
Spark is written in the Scala programming language and requires the Java Virtual Machine (JVM) to run, so we install Java first.

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null


In [None]:
# Download Apache Spark 3.1.1 prebuilt for Hadoop 2.7 (other versions: http://spark.apache.org/downloads.html)

!wget -q https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz


In [None]:
!ls

drive	     spark-3.1.1-bin-hadoop2.7.tgz
sample_data  spark-3.1.1-bin-hadoop2.7.tgz.1


In [None]:
# Extract the downloaded .tgz archive
!tar xf spark-3.1.1-bin-hadoop2.7.tgz


In [None]:
"""
findspark locates the Spark installation (via SPARK_HOME) and adds PySpark to sys.path,
so that it can be imported like any other library.
https://pypi.org/project/findspark/
"""
!pip install -q findspark


## Set up environment variables so that PySpark can run on Colab

In [None]:
import os
# Tell PySpark where Java 8 and the extracted Spark distribution are installed
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"

In [None]:
# findspark.init() reads SPARK_HOME and makes the pyspark package importable
import findspark
findspark.init()

In [None]:
findspark.find()

'/content/spark-3.1.1-bin-hadoop2.7'

Now, we can import SparkSession from pyspark.sql and create a SparkSession, which is the entry point to Spark.

You can give a name to the session using appName() and add some configurations with config() if you wish.



In [None]:
from pyspark.sql import SparkSession

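spark = (
    SparkSession.builder
    .master("local")                  # run locally with a single worker thread
    .appName("Colab")                 # name shown in the Spark UI
    .config("spark.ui.port", "4050")  # serve the Spark UI on 4050; ngrok's local API uses 4040
    .getOrCreate()                    # reuse an existing session if there is one
)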

In [None]:
spark

## To view the Spark UI, we need to create a public URL



In [None]:
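# Download and unpack the ngrok client
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
# Start an ngrok tunnel to the Spark UI port (4050) in the background
get_ipython().system_raw('./ngrok http 4050 &')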

--2021-09-29 10:21:24--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 54.237.133.81, 18.205.222.128, 54.161.241.46, ...
Connecting to bin.equinox.io (bin.equinox.io)|54.237.133.81|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip’


2021-09-29 10:21:26 (8.22 MB/s) - ‘ngrok-stable-linux-amd64.zip’ saved [13832437/13832437]

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   


In [None]:
# ngrok's local API (port 4040) lists the active tunnels; print the first tunnel's public URL
!curl -s http://localhost:4040/api/tunnels | python3 -c "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"


http://afc6-35-190-156-169.ngrok.io
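The curl one-liner above fails with a traceback if ngrok has not opened a tunnel yet. The sketch below does the same in plain Python and reports that case clearly. `first_public_url` and `fetch_public_url` are hypothetical helpers. The payload shape (`tunnels[0]["public_url"]`) is taken from the one-liner, and the sample payload is made up.

```python
import json
from urllib.request import urlopen


def first_public_url(payload: str) -> str:
    """Return the public_url of the first tunnel in an ngrok /api/tunnels JSON payload."""
    tunnels = json.loads(payload)["tunnels"]
    if not tunnels:
        raise RuntimeError("ngrok reports no active tunnels yet; wait a moment and retry")
    return tunnels[0]["public_url"]


def fetch_public_url(api_url: str = "http://localhost:4040/api/tunnels") -> str:
    """Query ngrok's local API (the same endpoint the curl command uses)."""
    with urlopen(api_url, timeout=5) as response:
        return first_public_url(response.read().decode("utf-8"))


# Offline usage with a made-up payload; in the notebook you would call fetch_public_url().
sample = '{"tunnels": [{"public_url": "https://example.ngrok.io"}]}'
print(first_public_url(sample))  # https://example.ngrok.io
```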


In [None]:
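'''
Load the CSV file into a PySpark DataFrame: header=True uses the first row as column names,
and inferSchema=True makes Spark scan the data to guess each column's type
'''
df = spark.read.csv("/content/drive/MyDrive/blood_pressure.csv", header=True, inferSchema=True)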

## Exploratory Data Analysis (EDA)

In [None]:
'''
Show column names and types (similar to df.info() in pandas)
'''
df.printSchema()

root
 |-- patient: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- agegrp: string (nullable = true)
 |-- bp_before: integer (nullable = true)
 |-- bp_after: integer (nullable = true)



In [None]:
'''
Display the first n rows (similar to df.head() in pandas)
'''
df.show(5)

+-------+----+------+---------+--------+
|patient| sex|agegrp|bp_before|bp_after|
+-------+----+------+---------+--------+
|      1|Male| 30-45|      143|     153|
|      2|Male| 30-45|      163|     170|
|      3|Male| 30-45|      153|     168|
|      4|Male| 30-45|      153|     142|
|      5|Male| 30-45|      146|     141|
+-------+----+------+---------+--------+
only showing top 5 rows



In [None]:
'''
Count the rows in df (similar to df.shape[0] in pandas)
'''
df.count()

120

In [None]:
'''
Display specific columns
'''
df.select('bp_before','bp_after').show(5)

+---------+--------+
|bp_before|bp_after|
+---------+--------+
|      143|     153|
|      163|     170|
|      153|     168|
|      153|     142|
|      146|     141|
+---------+--------+
only showing top 5 rows



In [None]:
# select() returns a new DataFrame; df itself is unchanged
x_df = df.select("bp_before", "bp_after", "agegrp")

In [None]:
x_df.show(5)

+---------+--------+------+
|bp_before|bp_after|agegrp|
+---------+--------+------+
|      143|     153| 30-45|
|      163|     170| 30-45|
|      153|     168| 30-45|
|      153|     142| 30-45|
|      146|     141| 30-45|
+---------+--------+------+
only showing top 5 rows



In [None]:
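# Summary statistics for every column (similar to df.describe() in pandas);
# for string columns only count, min and max are filled in, and min/max are alphabetical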
df.describe().show()

+-------+------------------+------+------+-----------------+------------------+
|summary|           patient|   sex|agegrp|        bp_before|          bp_after|
+-------+------------------+------+------+-----------------+------------------+
|  count|               120|   120|   120|              120|               120|
|   mean|              60.5|  null|  null|           156.45|151.35833333333332|
| stddev|34.785054261852174|  null|  null|11.38984510116671|14.177622226198425|
|    min|                 1|Female| 30-45|              138|               125|
|    max|               120|  Male|   60+|              185|               185|
+-------+------------------+------+------+-----------------+------------------+



In [None]:
'''
Get the distinct values in a column (similar to unique() in pandas)
'''
df.select("sex").distinct().show()

+------+
|   sex|
+------+
|Female|
|  Male|
+------+



In [None]:
df.select("agegrp").distinct().show()

+------+
|agegrp|
+------+
|   60+|
| 30-45|
| 46-59|
+------+



In [None]:
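'''
Fill missing values: map each column name to the value that replaces its nulls.
(describe() reports 120 non-null values in every column, so nothing is actually filled here.)
'''
df = df.fillna({"bp_before": 123, "bp_after": 1})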

In [None]:
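'''
Save the DataFrame as CSV. Spark writes a directory named results.csv containing one
part-*.csv file per partition; header=True writes the column names, and mode="overwrite"
lets the cell be re-run without failing because the path already exists.
'''
df.write.csv("/content/results.csv", header=True, mode="overwrite")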

In [None]:
'''
Check how many partitions the DataFrame has
'''
df.rdd.getNumPartitions()

1