Spark Architecture:

- Spark follows a master-worker architecture (historically called master-slave).
- It runs on a cluster.
- A cluster is a group of machines that work together to process and analyze data.
- Spark distributes the data and the computation across multiple nodes in the cluster, so the work runs in parallel and finishes faster.
- One of the nodes acts as the master, and the rest act as worker nodes.
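
As a rough illustration of what "distributing data and computation" means, here is a single-machine analogy in plain Python. It is not Spark code: threads stand in for worker nodes, the helper names `split_into_partitions`, `process_partition`, and `parallel_count` are hypothetical, and the price values are only sample data.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Split a list into roughly equal, contiguous chunks (hypothetical helper)."""
    size, remainder = divmod(len(data), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        # The first `remainder` partitions take one extra element.
        end = start + size + (1 if i < remainder else 0)
        partitions.append(data[start:end])
        start = end
    return partitions


def process_partition(partition):
    """Work done independently on one partition: count values above 100."""
    return sum(1 for value in partition if value > 100)


def parallel_count(data, num_workers=4):
    """Process every partition in parallel, then combine the partial results."""
    partitions = split_into_partitions(data, num_workers)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        partial_counts = list(pool.map(process_partition, partitions))
    return sum(partial_counts)


# Sample values only; threads here play the role of worker nodes.
prices = [489.07, 293.65, 28.31, 712.87, 183.27, 360.09, 514.56, 30.86, 72.72, 732.07]
total_above_100 = parallel_count(prices, num_workers=3)
```

Each partition is processed without knowing about the others, and only the small partial counts are combined at the end. Spark applies the same split, process, and combine pattern across machines rather than threads.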


Spark Architecture Components:

- Resource Manager (Master): Acts as the manager that allocates resources for your work.
- Cluster: Comprises the Driver and the Workers (nodes).

            1. Driver Node (Team Lead): Orchestrates the work. The Resource Manager allocates the Driver. The Driver interprets the Spark code and distributes the work among the Workers.
            2. Worker Node (Executors): The developers on the team, who actually perform the work.


The Submission Flow:

- A developer writes a Spark application (code) and submits it to the Resource Manager using the 'spark-submit' command.
- In the 'spark-submit' command, the developer specifies the resource requirements, including the size of the Executors (Worker Nodes).
- The Resource Manager first creates the Driver Node and connects to it. (This describes cluster deploy mode; in client mode the Driver runs on the machine that issued 'spark-submit'.)
- The Driver Node reads the application code and requests the required Executors from the Resource Manager.
- The Resource Manager creates the Executors (Workers).
- The Driver Node connects to the Workers and distributes the work. The Workers perform the actual execution, while the Driver orchestrates and monitors.
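
The submission step above can be sketched by assembling a 'spark-submit' command in Python. This is a minimal sketch under assumptions: the application file `my_app.py` and the helper `build_spark_submit` are hypothetical, the resource values are illustrative rather than recommended, and the cluster is assumed to be managed by YARN (the `--num-executors` flag does not apply to every cluster manager).

```python
import shlex


def build_spark_submit(app_path, master, deploy_mode, num_executors,
                       executor_memory, executor_cores, driver_memory):
    """Assemble a spark-submit command as a list of arguments (hypothetical helper)."""
    return [
        "spark-submit",
        "--master", master,
        "--deploy-mode", deploy_mode,
        "--driver-memory", driver_memory,
        "--num-executors", str(num_executors),
        "--executor-memory", executor_memory,
        "--executor-cores", str(executor_cores),
        app_path,
    ]


command = build_spark_submit(
    app_path="my_app.py",    # hypothetical application file
    master="yarn",           # assumes a YARN-managed cluster
    deploy_mode="cluster",   # the Driver is created inside the cluster
    num_executors=4,         # illustrative values, not recommendations
    executor_memory="4g",
    executor_cores=2,
    driver_memory="2g",
)
command_line = shlex.join(command)
print(command_line)
```

Keeping the command as a list makes it easy to pass to `subprocess.run` without shell-quoting problems; `shlex.join` is used here only to show the equivalent command line.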

Which component plays the master depends on the stage. During submission, the Resource Manager is the master because it allocates the resources. Once the Driver and the Workers are connected, the Driver Node becomes the master because it issues the instructions.
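
The hand-over of the master role can be illustrated with a toy model in plain Python. This is only a sketch of the sequence described in these notes, not how Spark is implemented: the classes `ResourceManager`, `Driver`, and `Executor` and all their methods are hypothetical, and the round-robin task assignment is a deliberate simplification.

```python
class Executor:
    """Stands in for a worker process that runs tasks."""

    def __init__(self, executor_id):
        self.executor_id = executor_id

    def run(self, task):
        return task()


class Driver:
    """Stands in for the Driver: requests executors, then hands out tasks."""

    def __init__(self, resource_manager, log):
        self.resource_manager = resource_manager
        self.log = log
        self.executors = []

    def start(self, num_executors):
        self.log.append("driver requests executors")
        self.executors = self.resource_manager.allocate_executors(num_executors)

    def run_job(self, tasks):
        self.log.append("driver distributes tasks")
        # Round-robin assignment; a real scheduler is far more involved.
        return [
            self.executors[i % len(self.executors)].run(task)
            for i, task in enumerate(tasks)
        ]


class ResourceManager:
    """Stands in for the cluster manager that allocates resources."""

    def __init__(self):
        self.log = []

    def submit(self, num_executors):
        self.log.append("resource manager creates driver")
        driver = Driver(self, self.log)
        driver.start(num_executors)
        return driver

    def allocate_executors(self, count):
        self.log.append(f"resource manager creates {count} executors")
        return [Executor(i) for i in range(count)]


rm = ResourceManager()
driver = rm.submit(num_executors=2)
results = driver.run_job([lambda: 1 + 1, lambda: 2 * 3, lambda: 10 - 4])
```

In the log, the first entries are actions of the resource manager (allocation), while the last entry belongs to the driver (instruction), which mirrors the change of roles.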


In [0]:
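# Read the CSV with a header row. No schema is given or inferred, so every column loads as a string.
df = spark.read.option("header", True).csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv")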

In [0]:
df.show(2)

+--------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code| brand| price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+
|2019-11-01 00:00:...|      view|   1003461|2053013555631882655|electronics.smart...|xiaomi|489.07|520088904|4d3b30da-a5e4-49d...|
|2019-11-01 00:00:...|      view|   5000088|2053013566100866035|appliances.sewing...|janome|293.65|530496790|8e5f4f83-366c-4f7...|
+--------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+
only showing top 2 rows


In [0]:
df.printSchema()

root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_session: string (nullable = true)



In [0]:
df.select("event_type", "category_code", "price").show(10)

+----------+--------------------+------+
|event_type|       category_code| price|
+----------+--------------------+------+
|      view|electronics.smart...|489.07|
|      view|appliances.sewing...|293.65|
|      view|                NULL| 28.31|
|      view|appliances.kitche...|712.87|
|      view|electronics.smart...|183.27|
|      view|  computers.notebook|360.09|
|      view|  computers.notebook|514.56|
|      view|                NULL| 30.86|
|      view|                NULL| 72.72|
|      view|electronics.smart...|732.07|
+----------+--------------------+------+
only showing top 10 rows


In [0]:
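from pyspark.sql.functions import col

# Cast price from string to double so it is compared as a number.
df = df.withColumn("price", col("price").cast("double"))
# Count the rows whose price is above 100.
df.filter(col("price") > 100).count()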

44549993

In [0]:
df.printSchema()

root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_session: string (nullable = true)



In [0]:
# Count events per category; rows without a category_code fall into the NULL group.
df.groupBy("category_code").count().show()

+--------------------+--------+
|       category_code|   count|
+--------------------+--------+
| stationery.cartrige|   12778|
|electronics.video.tv| 2208046|
|  accessories.wallet|   70394|
|appliances.kitche...|   75128|
|                NULL|21898171|
|construction.tool...|  148510|
|appliances.enviro...|  285987|
|country_yard.furn...|    1589|
|       apparel.shoes| 1886890|
|electronics.audio...|   46749|
|appliances.kitche...|  160932|
|electronics.audio...|  319138|
|country_yard.lawn...|    6391|
|furniture.kitchen...|  198092|
|electronics.audio...|  142022|
|apparel.shoes.bal...|    4379|
|auto.accessories....|   89041|
|appliances.enviro...|  209371|
|   computers.desktop|  690502|
|   apparel.underwear|   47712|
+--------------------+--------+
only showing top 20 rows
