In [103]:
import findspark
# Locate the Spark installation and make pyspark importable (findspark adds it to sys.path);
# this has to run before the `import pyspark` cell below.
findspark.init()

In [110]:
import pyspark
from pyspark.sql.session import SparkSession

In [138]:
# Create (or reuse) the SparkSession for this notebook: application "ch03" on YARN,
# driver in client deploy mode, 2g of memory requested for both the executors and the driver.
spark = (
    SparkSession.builder
    .appName("ch03")
    .master("yarn")
    .config("spark.submit.deployMode", "client")
    .config("spark.executor.memory", "2g")
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

25/05/11 10:49:51 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [112]:
# Display the SparkContext that backs the session created above.
spark.sparkContext

In [102]:
# List the per-day retail CSV files stored in HDFS (shell escape via `!`).
!hdfs dfs -ls /data/retail-data/by-day

Found 305 items
-rw-r--r--   2 hadoop supergroup     275001 2025-05-10 14:57 /data/retail-data/by-day/2010-12-01.csv
-rw-r--r--   2 hadoop supergroup     191826 2025-05-10 14:57 /data/retail-data/by-day/2010-12-02.csv
-rw-r--r--   2 hadoop supergroup     190700 2025-05-10 14:57 /data/retail-data/by-day/2010-12-03.csv
-rw-r--r--   2 hadoop supergroup     246056 2025-05-10 14:57 /data/retail-data/by-day/2010-12-05.csv
-rw-r--r--   2 hadoop supergroup     339039 2025-05-10 14:57 /data/retail-data/by-day/2010-12-06.csv
-rw-r--r--   2 hadoop supergroup     255832 2025-05-10 14:57 /data/retail-data/by-day/2010-12-07.csv
-rw-r--r--   2 hadoop supergroup     235974 2025-05-10 14:57 /data/retail-data/by-day/2010-12-08.csv
-rw-r--r--   2 hadoop supergroup     252904 2025-05-10 14:58 /data/retail-data/by-day/2010-12-09.csv
-rw-r--r--   2 hadoop supergroup     241468 2025-05-10 14:57 /data/retail-data/by-day/2010-12-10.csv
-rw-r--r--   2 hadoop supergroup     132120 2025-05-10 14:57 /data/retail-d

In [113]:
# Batch-read every per-day CSV file: the first line of each file is the header,
# and column types are inferred from the data.
static_data_frame = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("hdfs:///data/retail-data/by-day/*.csv")
)

                                                                                

In [114]:
# Show the column names and the types produced by schema inference.
static_data_frame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [115]:
# Keep the inferred schema so the streaming reader can reuse it later.
static_schema = static_data_frame.schema

# Expose the DataFrame to Spark SQL under the name "retail_data".
static_data_frame.createOrReplaceTempView("retail_data")

Which customers make the largest purchases, day by day?

In [116]:
# SQL way: total spend per customer per calendar day, top 5 by total.
top_daily_spend_sql = """
    select DATE(InvoiceDate), CustomerID, round(sum(UnitPrice * Quantity), 2) as total_cost
    from retail_data
    where CustomerID is not NULL
    group by CustomerID, DATE(InvoiceDate)
    order by total_cost desc
    limit 5
"""
spark.sql(top_daily_spend_sql).show()

[Stage 6:>                                                          (0 + 1) / 1]

+-----------+----------+----------+
|InvoiceDate|CustomerID|total_cost|
+-----------+----------+----------+
| 2011-09-20|   17450.0|  71601.44|
| 2011-09-15|   18102.0|  31661.54|
| 2011-10-21|   18102.0|  29693.82|
| 2010-12-07|   18102.0|  25920.37|
| 2011-10-20|   14646.0|  25833.56|
+-----------+----------+----------+



                                                                                

In [117]:
from pyspark.sql.functions import col, desc, window, to_date, sum as _sum, round


#### window

<pre>
window() is used to group data into time-based buckets for aggregations on streaming or static datasets with timestamp/date columns.
For a "1 day" window, a record with timestamp 2025-05-07 12:04:56 falls into:
Window: 
start = 2025-05-07 00:00:00
end =   2025-05-08 00:00:00
</pre>


In [118]:
# DataFrame way: the same top-5 daily spend per customer as the SQL cell,
# with the day bucket produced by a 1-day window() over InvoiceDate.
(
    static_data_frame
    .selectExpr("InvoiceDate", "CustomerID", "(UnitPrice * Quantity) as cost")
    .where(col("CustomerID").isNotNull())
    .groupBy(col("CustomerID"), window(col("InvoiceDate"), "1 day"))
    .agg(round(_sum(col("cost")), 2).alias("cost"))
    # Surface the window's start timestamp as a plain InvoiceDate column.
    .withColumn("InvoiceDate", col("window").getItem("start"))
    .orderBy(desc("cost"))
    .limit(5)
    .show()
)



+----------+--------------------+--------+-------------------+
|CustomerID|              window|    cost|        InvoiceDate|
+----------+--------------------+--------+-------------------+
|   17450.0|{2011-09-20 00:00...|71601.44|2011-09-20 00:00:00|
|   18102.0|{2011-09-15 00:00...|31661.54|2011-09-15 00:00:00|
|   18102.0|{2011-10-21 00:00...|29693.82|2011-10-21 00:00:00|
|   18102.0|{2010-12-07 00:00...|25920.37|2010-12-07 00:00:00|
|   14646.0|{2011-10-20 00:00...|25833.56|2011-10-20 00:00:00|
+----------+--------------------+--------+-------------------+



                                                                                

#### readStream

In [125]:
# Streaming reader over the same CSV files. A file stream needs an explicit schema,
# so the one inferred by the batch read is reused; maxFilesPerTrigger limits each
# micro-batch to at most 100 files.
streaming_df = (
    spark.readStream
    .format("csv")
    .schema(static_schema)
    .option("header", True)
    .option("maxFilesPerTrigger", 100)
    .load("hdfs:///data/retail-data/by-day/*.csv")
)

                                                                                

In [134]:
# Streaming version of the daily-spend aggregation: total cost per customer per
# 1-day window, sorted with the largest totals first.
purchase_day = (
    streaming_df
    .selectExpr("InvoiceDate", "CustomerID", "(UnitPrice * Quantity) as cost")
    .where(col("CustomerID").isNotNull())
    .groupBy("CustomerID", window(col("InvoiceDate"), "1 day"))
    .agg(round(_sum(col("cost")), 2).alias("total_cost"))
    .orderBy(desc("total_cost"))
)
            

In [None]:
# Start the console sink and bind the running StreamingQuery (what start() returns)
# to `query`, so the query can be inspected or stopped later through this handle
# (e.g. query.status, query.stop()) rather than only through spark.streams.active.
query = (
    purchase_day.writeStream
    .format("console")
    .queryName("customer_purchase")
    # "complete" re-emits the whole aggregated, sorted result table on every micro-batch.
    .outputMode("complete")
    .start()
)

# Last expression of the cell: display the StreamingQuery handle.
query

25/05/11 09:43:47 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-66b528eb-6587-4bb7-9189-11abf24a966e. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/05/11 09:43:47 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x7f1556bc1de0>

25/05/11 09:43:50 WARN FileStreamSource: Listed 305 file(s) in 2765 ms          
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+----------+--------------------+----------+
|CustomerID|              window|total_cost|
+----------+--------------------+----------+
|   12931.0|{2011-08-04 00:00...|  19045.48|
|   17949.0|{2011-06-30 00:00...|  18854.78|
|   14646.0|{2011-03-29 00:00...|   18247.5|
|   16684.0|{2011-10-05 00:00...|  18047.42|
|   14156.0|{2011-01-14 00:00...|  16774.72|
|   12415.0|{2011-03-03 00:00...|  16558.14|
|   12415.0|{2011-10-05 00:00...|  16471.77|
|   18102.0|{2011-06-14 00:00...|  14471.92|
|   12415.0|{2011-02-15 00:00...|  14022.92|
|   12415.0|{2011-05-17 00:00...|   11924.8|
|   18102.0|{2011-02-07 00:00...|  10535.48|
|   14646.0|{2011-01-14 00:00...|  10389.06|
|   15769.0|{2011-03-17 00:00...|   10065.0|
|   14646.0|{2011-11-10 00:00...|   9823.12|
|   18102.0|{2011-05-17 00:00...|   9270.08|
|   18102.0|{2011-05-16 00:00...|   8895.66|
|   14156.0|{2011-09-05 00:00...|   8343.86|
|  

                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+----------+--------------------+----------+
|CustomerID|              window|total_cost|
+----------+--------------------+----------+
|   17450.0|{2011-09-20 00:00...|  71601.44|
|   18102.0|{2011-09-15 00:00...|  31661.54|
|   18102.0|{2010-12-07 00:00...|  25920.37|
|   14646.0|{2011-10-20 00:00...|  25833.56|
|   12415.0|{2011-06-15 00:00...|  23426.81|
|   12415.0|{2011-08-18 00:00...|  21880.44|
|   14646.0|{2011-08-11 00:00...|  19150.66|
|   12931.0|{2011-08-04 00:00...|  19045.48|
|   17949.0|{2011-06-30 00:00...|  18854.78|
|   14646.0|{2011-03-29 00:00...|   18247.5|
|   16684.0|{2011-10-05 00:00...|  18047.42|
|   14156.0|{2011-01-14 00:00...|  16774.72|
|   12415.0|{2011-03-03 00:00...|  16558.14|
|   14646.0|{2011-05-12 00:00...|  16478.46|
|   12415.0|{2011-10-05 00:00...|  16471.77|
|   17450.0|{2011-08-17 00:00...|   16084.9|
|   14646.0|{2011-09-19 00:00...|  15618.28|
|  

                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+----------+--------------------+----------+
|CustomerID|              window|total_cost|
+----------+--------------------+----------+
|   17450.0|{2011-09-20 00:00...|  71601.44|
|   18102.0|{2011-09-15 00:00...|  31661.54|
|   18102.0|{2011-10-21 00:00...|  29693.82|
|   18102.0|{2010-12-07 00:00...|  25920.37|
|   14646.0|{2011-10-20 00:00...|  25833.56|
|   12415.0|{2011-06-15 00:00...|  23426.81|
|   15749.0|{2011-01-11 00:00...|   22998.4|
|   18102.0|{2011-10-03 00:00...|  22429.69|
|   12415.0|{2011-08-18 00:00...|  21880.44|
|   14646.0|{2011-08-11 00:00...|  19150.66|
|   12931.0|{2011-08-04 00:00...|  19045.48|
|   17949.0|{2011-06-30 00:00...|  18854.78|
|   17450.0|{2011-01-11 00:00...|   18620.2|
|   14646.0|{2011-03-29 00:00...|   18247.5|
|   16684.0|{2011-10-05 00:00...|  18047.42|
|   14156.0|{2011-01-14 00:00...|  16774.72|
|   12415.0|{2011-03-03 00:00...|  16558.14|
|  

25/05/11 09:45:04 WARN FileStreamSource: Listed 305 file(s) in 2569 ms          

-------------------------------------------
Batch: 3
-------------------------------------------


                                                                                

+----------+--------------------+----------+
|CustomerID|              window|total_cost|
+----------+--------------------+----------+
|   17450.0|{2011-09-20 00:00...|  71601.44|
|   18102.0|{2011-09-15 00:00...|  31661.54|
|   18102.0|{2011-10-21 00:00...|  29693.82|
|   18102.0|{2010-12-07 00:00...|  25920.37|
|   14646.0|{2011-10-20 00:00...|  25833.56|
|   12415.0|{2011-06-15 00:00...|  23426.81|
|   15749.0|{2011-01-11 00:00...|   22998.4|
|   18102.0|{2011-10-03 00:00...|  22429.69|
|   12415.0|{2011-08-18 00:00...|  21880.44|
|   14646.0|{2011-08-11 00:00...|  19150.66|
|   12931.0|{2011-08-04 00:00...|  19045.48|
|   17949.0|{2011-06-30 00:00...|  18854.78|
|   17450.0|{2011-01-11 00:00...|   18620.2|
|   14646.0|{2011-02-21 00:00...|  18279.48|
|   14646.0|{2011-03-29 00:00...|   18247.5|
|   16684.0|{2011-10-05 00:00...|  18047.42|
|   14156.0|{2011-01-14 00:00...|  16774.72|
|   12415.0|{2011-03-03 00:00...|  16558.14|
|   18102.0|{2011-06-09 00:00...|   16488.0|
|   14646.

25/05/11 09:45:33 WARN FileStreamSource: Listed 305 file(s) in 2731 ms          
25/05/11 09:45:36 WARN FileStreamSource: Listed 305 file(s) in 2522 ms          
25/05/11 09:45:38 WARN FileStreamSource: Listed 305 file(s) in 2417 ms          
25/05/11 09:45:41 WARN FileStreamSource: Listed 305 file(s) in 2755 ms          
25/05/11 09:45:43 WARN FileStreamSource: Listed 305 file(s) in 2576 ms          
25/05/11 09:45:46 WARN FileStreamSource: Listed 305 file(s) in 2683 ms          
25/05/11 09:45:49 WARN FileStreamSource: Listed 305 file(s) in 2684 ms          
25/05/11 09:45:52 WARN FileStreamSource: Listed 305 file(s) in 2656 ms          
25/05/11 09:45:54 WARN FileStreamSource: Listed 305 file(s) in 2490 ms          

In [136]:
# Stop every streaming query that is still active on this session.
for active_query in spark.streams.active:
    active_query.stop()

25/05/11 09:45:55 WARN TaskSetManager: Lost task 172.0 in stage 139.0 (TID 28377) (kumar-rke2-2 executor 1): TaskKilled (Stage cancelled: Job 111 cancelled part of cancelled job group 4d09f35c-48e4-4129-a5f0-e1d5aa4b15ff)


### RDD

<pre>
An RDD can hold any type: ints, strings, dicts, objects — Spark doesn't care at this level. It's just distributed data.
</pre>

In [None]:

# Distribute the integers 0..999 across the cluster as an RDD.
rdd = spark.sparkContext.parallelize(range(1000))

In [144]:
# Fetch the first three elements to the driver.
rdd.take(3)

[0, 1, 2]

In [145]:
# Intentional failure demo: an RDD of bare ints has no row structure, so schema
# inference fails. The error is caught and printed (PySparkTypeError is a TypeError)
# so the message is still shown while "Restart & Run All" can continue past this cell.
try:
    rdd.toDF()
except TypeError as err:
    print(f"{type(err).__name__}: {err}")

                                                                                

PySparkTypeError: [CANNOT_INFER_SCHEMA_FOR_TYPE] Can not infer schema for type: `int`.

<pre>
To convert to a DataFrame, Spark needs structured rows, not just raw elements like 0, 1, 2.

That's why this fails:
rdd.toDF()  # ❌ PySparkTypeError
</pre>
#### Fix:
<pre>
rdd.map(lambda x: (x,)).toDF(["value"])  # ✅

map(lambda x: (x,)) makes each item a 1-column tuple, like (0,), (1,).
.toDF(["value"]) assigns a column name.
</pre>

In [148]:
# Wrap each int in a 1-tuple so every element becomes a one-column row,
# then name that column "value".
df = (
    rdd
    .map(lambda value: (value,))
    .toDF(["value"])
)
df.show(5)

[Stage 4:>                                                          (0 + 1) / 1]

+-----+
|value|
+-----+
|    0|
|    1|
|    2|
|    3|
|    4|
+-----+
only showing top 5 rows



                                                                                

In [149]:
# Another Way

In [153]:
from pyspark.sql.types import StructType, StructField, IntegerType

# Alternative: supply an explicit one-column schema instead of relying on inference.
schema = StructType([
    StructField("value", IntegerType(), nullable=True),
])
# Each element is wrapped in a one-item list so it forms a single-column row.
df1 = spark.createDataFrame(rdd.map(lambda value: [value]), schema=schema)
df1.show(5)

+-----+
|value|
+-----+
|    0|
|    1|
|    2|
|    3|
|    4|
+-----+
only showing top 5 rows



                                                                                

In [155]:
# Number of partitions Spark chose for `rdd` (no numSlices was given to parallelize).
rdd.getNumPartitions()

2

### View data by partition

In [None]:
# numSlices=10 splits the 50 elements into 10 partitions, so Spark can process
# them as 10 parallel tasks.
rdd1 = spark.sparkContext.parallelize(range(50), numSlices=10)

def show_partition(index, iterator):
    """Yield a single (partition index, list of that partition's elements) pair.

    Intended for use with mapPartitionsWithIndex, which calls it once per partition
    with the partition's index and an iterator over its elements.
    """
    elements = [item for item in iterator]
    yield index, elements

# Tag each partition's contents with its index, then print one line per partition.
partioned_data = rdd1.mapPartitionsWithIndex(show_partition)

for partition_index, partition_items in partioned_data.collect():
    print(partition_index, partition_items)



0 [0, 1, 2, 3, 4]
1 [5, 6, 7, 8, 9]
2 [10, 11, 12, 13, 14]
3 [15, 16, 17, 18, 19]
4 [20, 21, 22, 23, 24]
5 [25, 26, 27, 28, 29]
6 [30, 31, 32, 33, 34]
7 [35, 36, 37, 38, 39]
8 [40, 41, 42, 43, 44]
9 [45, 46, 47, 48, 49]


                                                                                

🌀 What's a shuffle?

A shuffle happens when data needs to move between partitions on different nodes. Common operations that cause this:

    groupBy(...)

    join(...)

    distinct(...)

    orderBy(...)

    repartition(...)

When shuffle happens:

    Spark redistributes the data.
    For DataFrame/SQL operations, the default number of output partitions is controlled by spark.sql.shuffle.partitions (an explicit repartition(n) uses n instead).

In [163]:
# Read the current value of the shuffle-partition setting for this session.
spark.conf.get("spark.sql.shuffle.partitions")

'200'

In [164]:
# Shut down the SparkSession now that the notebook is finished.
spark.stop()