# Spark

Spark is an open-source distributed computing engine designed to process large-scale data quickly and in parallel. Much of its speed comes from keeping intermediate data in memory rather than writing it to disk after every operation. Spark can be used from several languages and APIs; here we focus on PySpark and Spark SQL.

### **Architecture:**

Application -> Framework (Driver + Cluster Manager) -> Workers/Executors

**Driver:**

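- Main node (the "brain" of the application)
- Starts the Spark application and creates the SparkSession
- Turns the transformations and actions defined on the data into jobs, stages and tasks
- Coordinates with the cluster manager
- Collects final results from the executors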

**Cluster Manager:**

- Allocates resources (CPU, memory) to the driver and executors
- Monitors the health of worker/executor nodes
- Cleans up resources after the session ends
- Examples: local, YARN, Standalone, Kubernetes
- The cluster manager is selected with the `.master()` method of the SparkSession builder (or the `--master` option of `spark-submit`)
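The string passed to `.master()` is a master URL, and its form tells Spark which cluster manager to talk to. The helper below is a hypothetical illustration (not part of PySpark) that maps the common URL forms to the cluster manager they select; it is not exhaustive (it ignores, for example, the `local[N,F]` retry form), and the host names are made up.

```python
import re


def describe_master(master: str) -> str:
    """Hypothetical helper: name the cluster manager selected by a master URL."""
    if master == "local":
        return "local mode, 1 worker thread"
    match = re.fullmatch(r"local\[(\*|\d+)\]", master)
    if match:
        threads = match.group(1)
        if threads == "*":
            return "local mode, one worker thread per CPU core"
        return f"local mode, {threads} worker threads"
    if master.startswith("spark://"):
        return "Spark Standalone cluster"
    if master == "yarn":
        return "Hadoop YARN"
    if master.startswith("k8s://"):
        return "Kubernetes"
    raise ValueError(f"unrecognised master URL: {master!r}")


# Hypothetical host names, for illustration only
for url in ["local[*]", "local[4]", "spark://master-host:7077", "yarn", "k8s://https://k8s-api:6443"]:
    print(url, "->", describe_master(url))
```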

**Worker/Executor:**

- Node where the actual transformations are executed
- Each worker/executor has task slots, by default one per CPU core it is given
- Data is split into partitions, and each partition is processed by a single task running in one slot. A task is the smallest unit of work.
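Because each slot runs one task at a time, the number of partitions and the number of slots together determine how many "waves" of tasks a stage needs. A back-of-the-envelope sketch, assuming every task takes about the same time and ignoring scheduling overhead, data locality and dynamic allocation (`cpus_per_task` mirrors Spark's `spark.task.cpus` setting, default 1):

```python
import math


def task_waves(num_partitions: int, num_executors: int, cores_per_executor: int,
               cpus_per_task: int = 1) -> int:
    """Rough number of rounds needed to run one task per partition on the available slots."""
    slots_per_executor = cores_per_executor // cpus_per_task
    total_slots = num_executors * slots_per_executor
    if total_slots == 0:
        raise ValueError("no task slots available")
    return math.ceil(num_partitions / total_slots)


# 200 partitions on 5 executors x 4 cores = 20 slots -> 10 waves
print(task_waves(200, num_executors=5, cores_per_executor=4))
```

With 45 partitions and 8 slots the sketch gives 6 waves, and the last wave keeps only 5 of the 8 slots busy; this is one reason partition counts are sometimes chosen as a multiple of the total core count.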


Transformations are lazy: nothing is computed until an action such as `show()`, `count()` or `collect()` asks for a result.
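The toy class below is not Spark's implementation, just a plain-Python model of the idea: transformations (`filter`, `map`) only record a step in a plan, and the plan runs when an action (`collect`, `count`) is called. Real Spark additionally optimises the whole plan before running it and, like this toy, recomputes it on every action unless the data is cached.

```python
class LazyFrame:
    """Toy model of lazy evaluation: transformations are recorded, actions execute them."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = plan  # tuple of ("filter" | "map", function) steps, not yet executed

    # Transformations: return a new LazyFrame with one more step; nothing is computed.
    def filter(self, predicate):
        return LazyFrame(self._rows, self._plan + (("filter", predicate),))

    def map(self, fn):
        return LazyFrame(self._rows, self._plan + (("map", fn),))

    # Actions: walk the recorded plan and compute the result.
    def collect(self):
        result = list(self._rows)
        for kind, fn in self._plan:
            if kind == "filter":
                result = [row for row in result if fn(row)]
            else:
                result = [fn(row) for row in result]
        return result

    def count(self):
        return len(self.collect())


calls = []


def double(x):
    calls.append(x)  # record every time the function actually runs
    return x * 2


pipeline = LazyFrame(range(5)).filter(lambda x: x % 2 == 0).map(double)
print(calls)               # [] : building the pipeline ran nothing
print(pipeline.collect())  # [0, 4, 8] : the action triggers the work
print(calls)               # [0, 2, 4]
```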

Each action triggers a job, each job is divided into stages, and each stage into tasks. A stage is a group of tasks that can run without shuffling data; a shuffle marks the boundary between two stages.
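As a rough mental model, the chain of operations behind a job can be cut into stages wherever a wide transformation forces a shuffle. The function below is a simplified, hypothetical sketch of that rule that works on operation names only; Spark's real scheduler works on the physical plan (for instance, a join that Spark executes as a broadcast join does not need a shuffle).

```python
# Operations that usually need a shuffle (simplified list, for illustration only)
WIDE_OPERATIONS = {"groupBy", "join", "distinct", "orderBy", "repartition"}


def split_into_stages(operations):
    """Start a new stage at every wide operation; narrow operations join the current stage."""
    stages, current = [], []
    for op in operations:
        if op in WIDE_OPERATIONS and current:
            stages.append(current)
            current = []
        current.append(op)
    if current:
        stages.append(current)
    return stages


plan = ["read", "filter", "withColumn", "groupBy", "count"]
print(split_into_stages(plan))
# [['read', 'filter', 'withColumn'], ['groupBy', 'count']]
```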

Data Shuffling:

- Redistributing/exchanging data across partitions
- Wide transformations (e.g. `groupBy`, `join`) require a shuffle
- Expensive: data is written to local disk, sent over the network and read back from disk by the next stage
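The shuffle behind a `groupBy(...).count()` can be modelled in plain Python: each input partition first counts its own rows (Spark similarly performs a partial aggregation before shuffling), then every key is routed to an output partition by hashing it, so all rows for the same key meet in one place. This sketch uses `zlib.crc32` as a stable hash; Spark itself uses a Murmur3-based hash, and for DataFrame shuffles the number of output partitions comes from `spark.sql.shuffle.partitions` (200 by default).

```python
import zlib
from collections import Counter


def shuffle_count(input_partitions, num_output_partitions):
    """Count rows per key, moving every key's partial counts to a single output partition."""
    output = [Counter() for _ in range(num_output_partitions)]
    for partition in input_partitions:
        partial = Counter(partition)  # map side: pre-aggregate inside the partition
        for key, n in partial.items():
            target = zlib.crc32(key.encode("utf-8")) % num_output_partitions
            output[target][key] += n  # stands in for "send over the network"
    return output


# Three input partitions of store_and_fwd_flag values, shuffled into two output partitions
partitions = [["N", "N", "Y"], ["N", "Y"], ["N"]]
result = shuffle_count(partitions, num_output_partitions=2)
print(sum(result, Counter()))  # Counter({'N': 4, 'Y': 2})
```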

Spark jobs can be run on cloud platforms such as:

- Amazon EMR
- Databricks
- Azure Synapse Analytics




In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
spark = (
    SparkSession.builder
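    .master("local[*]")     # local mode: one worker thread per CPU core
    .appName("Basics")
    .config("spark.driver.memory", "4g")    # driver memory; ignored if a Spark JVM is already running
    .config("spark.executor.memory", "4g")  # memory per executor; only used on a cluster (in local mode the executor runs inside the driver)
    .config("spark.executor.cores", "4")    # cores per executor; only used on a cluster (local mode uses the local[...] thread count)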
    .getOrCreate()
)

In [3]:
spark

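Displaying the `spark` object in a notebook shows session details and a link to the Spark UI (served by the driver on port 4040 by default, e.g. http://localhost:4040; the URL is also available as `spark.sparkContext.uiWebUrl`). The UI lets us monitor jobs and their stages, query plans, and the session's configuration.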

A Spark job can be submitted to a remote cluster from the command line with `spark-submit`, for example by connecting to a cluster node over SSH and passing the `.py` file. Alternatively, the script can be uploaded to storage such as S3 and executed from there. The cloud platforms mentioned above also provide Jupyter notebooks in which code can be run directly. For an on-premise cluster, we provide the connection details through `.master()`/`.config()` or submit the job over SSH.

**Much of the DataFrame API resembles pandas, and most common pandas operations have a PySpark equivalent, which can be looked up in the [documentation]().**

In [4]:
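# Reading CSV files: passing a list of paths loads them all into a single DataFrame.
# header=True takes the column names from the header row; without inferSchema=True,
# every column is read as a string (as printSchema shows below).
df = spark.read.csv([r"C:\Users\dydmu\Downloads\dd\yellow_tripdata_2016-03.csv",
                     r"C:\Users\dydmu\Downloads\dd\yellow_tripdata_2016-02.csv",
                     r"C:\Users\dydmu\Downloads\dd\yellow_tripdata_2016-01.csv",
                     r"C:\Users\dydmu\Downloads\dd\yellow_tripdata_2015-01.csv"],
                    header=True)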
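As the `printSchema()` output further down shows, every column is read as a string: CSV files carry no type information, and Spark only tries to guess types when `inferSchema=True` is set, which requires an extra pass over the data. The functions below are a much simplified, hypothetical imitation of that inference (they only distinguish integer, double and string) meant to show the idea, not Spark's exact rules. The sample rows are made up in the shape of the trip data.

```python
def infer_type(value: str) -> str:
    """Guess the narrowest type for one CSV value (simplified: int, double or string)."""
    try:
        int(value)
        return "int"
    except ValueError:
        pass
    try:
        float(value)
        return "double"
    except ValueError:
        return "string"


def infer_column_types(rows):
    """Widen per-value guesses column by column: int < double < string."""
    order = ["int", "double", "string"]
    types = ["int"] * len(rows[0])
    for row in rows:
        for i, value in enumerate(row):
            guess = infer_type(value)
            if order.index(guess) > order.index(types[i]):
                types[i] = guess
    return types


# Made-up rows: VendorID, trip_distance, store_and_fwd_flag
sample = [["1", "2.50", "N"], ["2", "0.8", "Y"]]
print(infer_column_types(sample))  # ['int', 'double', 'string']
```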

In [9]:
df.count() #Number of records

47248845

In [10]:
type(df)

pyspark.sql.dataframe.DataFrame

In [22]:
df.printSchema()
# Roughly equivalent to pandas df.info()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)



In [18]:
df.show(1)

+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+-------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|   pickup_latitude|RatecodeID|store_and_fwd_flag|  dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+-------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       1| 2016-03-01 00:00:00|  2016-03-01 00:07:55|              1|         2.50|-73.97674560546875|40.765151977539062|         1| 

In [41]:
df.groupBy('store_and_fwd_flag').count().show()

+------------------+--------+
|store_and_fwd_flag|   count|
+------------------+--------+
|                 Y|  310683|
|                 N|46938162|
+------------------+--------+



In [19]:
#List out columns
df.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'pickup_longitude',
 'pickup_latitude',
 'RatecodeID',
 'store_and_fwd_flag',
 'dropoff_longitude',
 'dropoff_latitude',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount']

In [21]:
#Filtering data
df.filter(df.total_amount <= 10).count()

21759996

In [31]:
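# Create (or overwrite) a column with withColumn.
# F.when(...).otherwise(...) plays the role of np.where.
# Caution: in the NYC TLC data dictionary, payment_type 1 means credit card and 2 means cash,
# so the 'cash'/'credit' labels used here (and in the outputs below) are swapped.
# otherwise() also catches every other code (no charge, dispute, ...) as well as nulls.
df = df.withColumn('payment_type', F.when(df.payment_type == 1, 'cash').otherwise('credit'))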
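`F.when(cond, a).otherwise(b)` behaves like SQL `CASE WHEN cond THEN a ELSE b END`: a row gets `a` only when the condition is true, and a condition that evaluates to null (for example because `payment_type` is null) falls through to `otherwise`, just like false. A plain-Python sketch of that per-row logic, with `None` standing in for SQL NULL and hypothetical labels `"type_1"`/`"other"`:

```python
def when_otherwise(value, condition, then_value, otherwise_value):
    """Mimic F.when(condition, then_value).otherwise(otherwise_value) for a single row.

    condition(value) may return True, False or None (SQL NULL); only True selects then_value.
    """
    return then_value if condition(value) is True else otherwise_value


def is_code_1(value):
    # A comparison with NULL is NULL in SQL, modelled here as None
    return None if value is None else int(value) == 1


codes = ["1", "2", "3", None]  # payment_type is read as a string column
labels = [when_otherwise(c, is_code_1, "type_1", "other") for c in codes]
print(labels)  # ['type_1', 'other', 'other', 'other']
```

Every code other than 1, and every missing value, ends up with the `otherwise` label; chaining further `.when(...)` calls before `.otherwise(...)` gives each code its own label.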

In [33]:
df.filter((df.total_amount <= 10) & (df.payment_type == 'credit')).count()

10052335
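The parentheses around each condition are required, not just style: in Python, `&` and `|` bind more tightly than comparison operators such as `<=` and `==`. The standard-library `ast` module shows how Python parses the two versions (the expressions are only parsed here, never evaluated, so `df` does not need to exist):

```python
import ast


def top_level_node(expression: str) -> str:
    """Return the type of the outermost node Python builds for an expression."""
    return type(ast.parse(expression, mode="eval").body).__name__


with_parens = "(df.total_amount <= 10) & (df.payment_type == 'credit')"
without_parens = "df.total_amount <= 10 & df.payment_type == 'credit'"

print(top_level_node(with_parens))     # BinOp: an & of two comparisons, as intended
print(top_level_node(without_parens))  # Compare: a chained comparison around (10 & df.payment_type)
```

Without parentheses, Python reads the filter as `df.total_amount <= (10 & df.payment_type) == 'credit'`, a chained comparison that needs the truth value of a Column; PySpark refuses that with a `ValueError` suggesting `&`, `|` and `~` instead. For the same reason, plain `and`/`or` cannot be used between conditions.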

In [68]:
#Grouping the data
# Chain several .filter() calls, or combine conditions in one filter with & (and) / | (or), wrapping each condition in parentheses
(
    df.filter(df.passenger_count <= 5)
    .filter(df.store_and_fwd_flag == 'N') 
    .groupBy('payment_type', 'passenger_count')
    .count()
    .show()
)

+------------+---------------+--------+
|payment_type|passenger_count|   count|
+------------+---------------+--------+
|        cash|              4|  518657|
|        cash|              1|22026778|
|      credit|              5|  873114|
|        cash|              5| 1678344|
|      credit|              1|11259639|
|      credit|              2| 2440027|
|        cash|              3| 1172957|
|      credit|              3|  729242|
|        cash|              2| 4236662|
|      credit|              0|    2995|
|      credit|              4|  387071|
|        cash|              0|    4768|
+------------+---------------+--------+



In [74]:
df.filter(df.passenger_count == 0).count()

8214

In [75]:
#Multiple aggregations using .agg()
(
    df.filter(df.store_and_fwd_flag == 'N')
    .groupBy('payment_type', 'passenger_count')
    .agg(F.count('VendorID').alias('count'), F.avg('total_amount').alias('avg'))
    .show()   
)

+------------+---------------+--------+------------------+
|payment_type|passenger_count|   count|               avg|
+------------+---------------+--------+------------------+
|        cash|              7|      58|45.523448275862066|
|        cash|              4|  518657|  17.2535993922764|
|        cash|              1|22026778| 16.99704312583126|
|      credit|              5|  873114| 12.72630430848364|
|        cash|              5| 1678344| 17.08351491708387|
|      credit|              1|11259639|12.431929109800231|
|        cash|              6| 1046978| 16.75319064965837|
|      credit|              2| 2440027|13.538295752463975|
|        cash|              3| 1172957| 17.09728693378995|
|      credit|              3|  729242|13.579074669312753|
|        cash|              2| 4236662|17.619134630054383|
|      credit|              0|    2995|10.826020033388959|
|      credit|              4|  387071|13.745014403043628|
|      credit|              6|  560708|12.58255694586208
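Aggregate functions such as `F.count(column)` and `F.avg(column)` skip null values, while SQL's `count(*)` counts every row. `F.count('VendorID')` here and `count(*)` in the SQL version further down give identical numbers, which indicates that `VendorID` is never null in these groups. A plain-Python sketch of `groupBy(...).agg(count, avg)` with that null handling, using made-up toy rows (`None` stands for null, and numbers stored as strings are converted to float, much as Spark implicitly casts the string `total_amount` column for `avg`):

```python
from collections import defaultdict


def group_count_avg(rows, keys, count_col, avg_col):
    """Per group: number of non-null count_col values and mean of non-null avg_col values."""
    groups = defaultdict(lambda: {"count": 0, "total": 0.0, "n": 0})
    for row in rows:
        group = groups[tuple(row[k] for k in keys)]
        if row[count_col] is not None:  # count(column) ignores nulls
            group["count"] += 1
        if row[avg_col] is not None:    # avg(column) ignores nulls
            group["total"] += float(row[avg_col])
            group["n"] += 1
    # avg of a group with no non-null values is null (None)
    return {key: (g["count"], g["total"] / g["n"] if g["n"] else None)
            for key, g in groups.items()}


rows = [
    {"payment_type": "A", "passenger_count": "1", "VendorID": "1", "total_amount": "10.0"},
    {"payment_type": "A", "passenger_count": "1", "VendorID": None, "total_amount": "20.0"},
    {"payment_type": "B", "passenger_count": "2", "VendorID": "2", "total_amount": None},
]
print(group_count_avg(rows, ["payment_type", "passenger_count"], "VendorID", "total_amount"))
# {('A', '1'): (1, 15.0), ('B', '2'): (1, None)}
```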

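To query a DataFrame with SQL, we register it as a view. A temporary view created with `createOrReplaceTempView` is visible only within the current SparkSession and disappears when the session ends; a global temporary view (`createOrReplaceGlobalTempView`) is shared by all sessions of the same Spark application and is queried through the `global_temp` database (e.g. `global_temp.data`). Views store no data of their own: Spark SQL runs the underlying DataFrame's plan whenever the view is queried.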

In [76]:
#Create view
df.createOrReplaceTempView('data')

In [79]:
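# Single quotes for string literals are standard SQL and work regardless of Spark's ANSI settings
spark.sql("""
    select payment_type, passenger_count, count(*) as count, avg(total_amount) as avg
    from data
    where store_and_fwd_flag = 'N'
    group by payment_type, passenger_count
""").show()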

+------------+---------------+--------+------------------+
|payment_type|passenger_count|   count|               avg|
+------------+---------------+--------+------------------+
|        cash|              7|      58|45.523448275862066|
|        cash|              4|  518657|  17.2535993922764|
|        cash|              1|22026778| 16.99704312583126|
|      credit|              5|  873114| 12.72630430848364|
|        cash|              5| 1678344| 17.08351491708387|
|      credit|              1|11259639|12.431929109800231|
|        cash|              6| 1046978| 16.75319064965837|
|      credit|              2| 2440027|13.538295752463975|
|        cash|              3| 1172957| 17.09728693378995|
|      credit|              3|  729242|13.579074669312753|
|        cash|              2| 4236662|17.619134630054383|
|      credit|              0|    2995|10.826020033388959|
|      credit|              4|  387071|13.745014403043628|
|      credit|              6|  560708|12.58255694586208

### Connecting to and querying a database

In [None]:
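# Reading data from a database over JDBC.
# Assumes the MySQL JDBC driver jar is available to Spark (e.g. via the spark.jars or
# spark.jars.packages config when the session is created).
# my_database, my_table, my_user, my_password, `column` and N are placeholders.

jdbc_url = "jdbc:mysql://localhost:3306/my_database"
table_name = "my_table"
properties = {
    "user": "my_user",
    "password": "my_password",
    "driver": "com.mysql.cj.jdbc.Driver"   # MySQL 8+ driver
}

# A subquery can be passed in place of a table name; it must be wrapped in
# parentheses and given an alias so the database can treat it as a derived table.
query = f"""
(select *
from {table_name}
where column <= N) as my_query
"""

# column/lowerBound/upperBound/numPartitions split the read into parallel queries.
# The bounds only decide the partition stride; they do NOT filter rows, so rows
# outside [lowerBound, upperBound] still end up in the first or last partition.
df = spark.read.jdbc(url=jdbc_url, table=query, properties=properties,
                     column="id",       # numeric column for partitioning (must be in the query output)
                     lowerBound=1,      # min value of column
                     upperBound=10000,  # max value of column
                     numPartitions=10)

df.show()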
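To see why the bounds do not filter rows, here is a simplified, hypothetical imitation of how Spark turns `column`, `lowerBound`, `upperBound` and `numPartitions` into one `WHERE` clause per partition, each of which becomes its own query run in parallel. The overall pattern follows Spark's behaviour (the first partition has no lower limit and also picks up nulls, the last has no upper limit), but the exact stride arithmetic differs between Spark versions.

```python
def jdbc_partition_clauses(column, lower_bound, upper_bound, num_partitions):
    """Simplified sketch of per-partition WHERE clauses for a partitioned JDBC read.

    A None entry would mean "no WHERE clause" (the single-partition case).
    """
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    stride = max((upper_bound - lower_bound) // num_partitions, 1)
    clauses = []
    current = lower_bound
    for i in range(num_partitions):
        lower = f"{column} >= {current}" if i != 0 else None
        current += stride
        upper = f"{column} < {current}" if i != num_partitions - 1 else None
        if upper is None:
            clauses.append(lower)                             # last partition: open-ended above
        elif lower is None:
            clauses.append(f"{upper} or {column} is null")    # first partition: open below, plus nulls
        else:
            clauses.append(f"{lower} AND {upper}")
    return clauses


for clause in jdbc_partition_clauses("id", lower_bound=1, upper_bound=10000, num_partitions=10):
    print(clause)
```

With the notebook's settings the sketch yields `id < 1000 or id is null` for the first partition and `id >= 8992` for the last, so a row with `id = 50000` would still be read, by the last partition.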

In [None]:
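# Writing data
# mode controls what happens if the table already exists:
# "append", "overwrite", "ignore" or "error"/"errorifexists" (the default).
df.write.jdbc(url=jdbc_url,
              table="my_new_table",
              mode="append",
              properties=properties)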