## **APACHE SPARK SHOWCASE:**

In simple terms, Pandas runs operations on a single machine, whereas PySpark can run them across multiple machines. If you are working on a machine learning application that deals with large datasets, PySpark is often a better fit and can process some workloads many times faster than Pandas (figures of up to 100x are often quoted, though the actual speedup depends on the data size, the cluster, and the workload).

Pandas uses a single machine to perform iterative processes on a dataset, whilst Apache Spark uses multiple machines.

Pandas can load data from CSV, JSON, SQL, and many other formats into a DataFrame, which is a structured object containing rows and columns (similar to a SQL table).

Pandas doesn't support distributed processing, so you always need to increase the resources of a single machine when you need additional horsepower to support your growing data.
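
To make the Pandas side concrete, here is a minimal sketch of loading CSV data into a DataFrame and filtering it. The rows are made up for illustration, and the CSV text is read from an in-memory string so that the snippet does not depend on any file; only the column names mirror the fare data used later on.

```python
import io

import pandas as pd

# Made-up sample rows, for illustration only.
csv_text = """fare_amount,passenger_count
7.5,1
12.9,2
39.5,4
"""

# Pandas parses the whole input into memory on the local machine.
pdf = pd.read_csv(io.StringIO(csv_text))

# Boolean-mask filtering, evaluated eagerly on a single machine.
expensive = pdf[pdf["fare_amount"] > 30]
print(expensive)
```

Everything here happens in the memory of one process, which is why the only way to handle more data is a bigger machine.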

PySpark is the Python API for Apache Spark; it lets you run Python applications using Apache Spark capabilities. Using PySpark, we can run applications in parallel on a distributed cluster (multiple nodes) or even on a single node.

How to Decide Between Pandas and PySpark:

Below are a few considerations that favor choosing PySpark over Pandas:

- Your data is huge, grows significantly over the years, and you want to improve your processing time.
- You want fault tolerance.
- You need ANSI SQL compatibility.
- You want a choice of language (Spark supports Python, Scala, Java, and R).
- You want machine learning capability.
- You would like to read Parquet, Avro, Hive, Cassandra, Snowflake, etc.
- You want to stream the data and process it in real time.

#### Installing Apache Spark in Python

In [1]:
%pip install PyArrow
%pip install pyspark

Note: you may need to restart the kernel to use updated packages.


PySpark runs on Java under the hood, so you need to have Java installed on your Mac. Since Java is third-party software, you can install it with the Homebrew command `brew`.

Since Oracle Java is no longer open source, I am using OpenJDK version 11.

Run the following command in the terminal to install it.

`brew install java`

Then enter the following so that the system Java wrappers can find the JDK:

`sudo ln -sfn /opt/homebrew/opt/openjdk/libexec/openjdk.jdk \
     /Library/Java/JavaVirtualMachines/openjdk.jdk`
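
Before starting Spark, it can help to confirm from Python that a `java` executable is actually reachable. The following is a minimal sketch using only the standard library; the helper names `find_java` and `java_version_banner` are my own and not part of PySpark.

```python
import shutil
import subprocess
from typing import Optional


def find_java() -> Optional[str]:
    """Return the path of the `java` executable on PATH, or None if absent."""
    return shutil.which("java")


def java_version_banner(java_path: str) -> str:
    """Run `java -version` and return whatever banner it prints."""
    result = subprocess.run(
        [java_path, "-version"], capture_output=True, text=True, check=False
    )
    # `java -version` conventionally writes to stderr rather than stdout.
    return (result.stderr or result.stdout).strip()


java_path = find_java()
if java_path is None:
    print("No `java` executable found on PATH")
else:
    print(java_path)
    print(java_version_banner(java_path))
```

If the banner reports that no Java runtime could be located, the symlink step above is the first thing to re-check.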

#### Initialize a Spark session:

A SparkSession can be used to create DataFrames, register DataFrames as tables, execute SQL over tables, cache tables, and read Parquet files. To create a SparkSession, use the following builder pattern:

In [4]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SparkConf

In [5]:
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName('PySpark DataFrame From External Files').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/02 19:00:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Creating a DataFrame using PySpark:

In [14]:
df = spark.read.csv('uber.csv', sep = ',', inferSchema = True, header = True)

#### DataFrame operations:

In [15]:
df

DataFrame[index: int, key: timestamp, fare_amount: double, pickup_datetime: timestamp, pickup_longitude: double, pickup_latitude: double, dropoff_longitude: double, dropoff_latitude: double, passenger_count: int]

In [16]:
df.printSchema()

root
 |-- index: integer (nullable = true)
 |-- key: timestamp (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: integer (nullable = true)



In [25]:
df.show(5)

+--------+-------------------+-----------+-------------------+------------------+-----------------+------------------+-----------------+---------------+
|   index|                key|fare_amount|    pickup_datetime|  pickup_longitude|  pickup_latitude| dropoff_longitude| dropoff_latitude|passenger_count|
+--------+-------------------+-----------+-------------------+------------------+-----------------+------------------+-----------------+---------------+
|24238194|2015-05-07 19:52:06|        7.5|2015-05-07 20:52:06|-73.99981689453125|40.73835372924805|   -73.99951171875|40.72321701049805|              1|
|27835199|2009-07-17 20:04:56|        7.7|2009-07-17 21:04:56|        -73.994355|        40.728225|         -73.99471|        40.750325|              1|
|44984355|2009-08-24 21:45:00|       12.9|2009-08-24 22:45:00|        -74.005043|         40.74077|        -73.962565|        40.772647|              1|
|25894730|2009-06-26 08:22:21|        5.3|2009-06-26 09:22:21|        -73.976124| 

In [24]:
df.select('index').show(5)

+--------+
|   index|
+--------+
|24238194|
|27835199|
|44984355|
|25894730|
|17610152|
+--------+
only showing top 5 rows



In [23]:
df.filter(df['fare_amount'] > 30).show(5)

+--------+-------------------+-----------+-------------------+------------------+------------------+------------------+-----------------+---------------+
|   index|                key|fare_amount|    pickup_datetime|  pickup_longitude|   pickup_latitude| dropoff_longitude| dropoff_latitude|passenger_count|
+--------+-------------------+-----------+-------------------+------------------+------------------+------------------+-----------------+---------------+
|19277743|2014-06-04 06:49:00|       39.5|2014-06-04 07:49:00|-73.78808000000001|         40.642187|        -73.865042|        40.725997|              4|
|22405517|2013-01-03 22:24:41|       56.8|2013-01-03 22:24:41|        -73.993498|         40.764686|        -73.993498|        40.764686|              1|
|25485719|2009-08-07 10:43:07|      49.57|2009-08-07 11:43:07|-73.97505799999999|          40.78882|-73.97505799999999|         40.78882|              1|
|37942404|2011-11-18 09:51:00|       30.9|2011-11-18 09:51:00|        -73.99

In [27]:
df.count()

200000

We can also register the DataFrame as a temporary view, which can then be queried programmatically using SQL:

In [40]:
df.createOrReplaceTempView("mytable")

sqlDF = spark.sql("SELECT * FROM mytable")
sqlDF.show()

+--------+-------------------+-----------+-------------------+------------------+------------------+------------------+------------------+---------------+
|   index|                key|fare_amount|    pickup_datetime|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|passenger_count|
+--------+-------------------+-----------+-------------------+------------------+------------------+------------------+------------------+---------------+
|24238194|2015-05-07 19:52:06|        7.5|2015-05-07 20:52:06|-73.99981689453125| 40.73835372924805|   -73.99951171875| 40.72321701049805|              1|
|27835199|2009-07-17 20:04:56|        7.7|2009-07-17 21:04:56|        -73.994355|         40.728225|         -73.99471|         40.750325|              1|
|44984355|2009-08-24 21:45:00|       12.9|2009-08-24 22:45:00|        -74.005043|          40.74077|        -73.962565|         40.772647|              1|
|25894730|2009-06-26 08:22:21|        5.3|2009-06-26 09:22:21|        

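Note that the view has to be registered before it can be queried. An earlier run of the same query (cell 34, executed before the view was created in cell 40) failed with an `AnalysisException`:

In [34]: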
sqldf = spark.sql("SELECT * FROM mytable")

AnalysisException: Table or view not found: mytable; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation [mytable], [], false

