In [12]:
staticDataFrame = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("spark-the-definitive-guide-study/assets/exercises/week03/by-day/2010-12-01.csv")

In [13]:
staticDataFrame.explain(mode='extended')

== Parsed Logical Plan ==
Relation [InvoiceNo#198,StockCode#199,Description#200,Quantity#201,InvoiceDate#202,UnitPrice#203,CustomerID#204,Country#205] csv

== Analyzed Logical Plan ==
InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string
Relation [InvoiceNo#198,StockCode#199,Description#200,Quantity#201,InvoiceDate#202,UnitPrice#203,CustomerID#204,Country#205] csv

== Optimized Logical Plan ==
Relation [InvoiceNo#198,StockCode#199,Description#200,Quantity#201,InvoiceDate#202,UnitPrice#203,CustomerID#204,Country#205] csv

== Physical Plan ==
FileScan csv [InvoiceNo#198,StockCode#199,Description#200,Quantity#201,InvoiceDate#202,UnitPrice#203,CustomerID#204,Country#205] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/hjb/Developer/spark-the-definitive-guide-study/assets/exer..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Invoice

In [14]:
staticSchema = staticDataFrame.schema

In [16]:
staticDataFrame.explain(mode='formatted')

== Physical Plan ==
Scan csv  (1)


(1) Scan csv 
Output [8]: [InvoiceNo#198, StockCode#199, Description#200, Quantity#201, InvoiceDate#202, UnitPrice#203, CustomerID#204, Country#205]
Batched: false
Location: InMemoryFileIndex [file:/Users/hjb/Developer/spark-the-definitive-guide-study/assets/exercises/week03/by-day/2010-12-01.csv]
ReadSchema: struct<InvoiceNo:string,StockCode:string,Description:string,Quantity:int,InvoiceDate:timestamp,UnitPrice:double,CustomerID:double,Country:string>




In [18]:
streamingDataFrame = spark.readStream.format("csv")\
.schema(staticSchema)\
.option("maxFilesPerTrigger", 1)\
.format("csv")\
.option("header", "true")\
.load("spark-the-definitive-guide-study/assets/exercises/week03/by-day/*.csv")

In [19]:
import pyspark.sql.functions as F

In [29]:
purchaseByCustomerPerHour = streamingDataFrame\
.selectExpr(
    "CustomerID",
    "(UnitPrice * Quantity) as total_cost",
    "to_date(to_timestamp(InvoiceDate, 'M/d/yyyy H:mm')) as InvoiceDate")\
.groupBy(
    F.col("CustomerID"), F.window(F.col("InvoiceDate"), "1 day"))\
.sum("total_cost")

In [30]:
purchaseByCustomerPerHour.explain()

== Physical Plan ==
*(4) HashAggregate(keys=[CustomerID#226, window#311], functions=[sum(total_cost#296)])
+- StateStoreSave [CustomerID#226, window#311], state info [ checkpoint = <unknown>, runId = bb1dec25-dd9b-4667-b0a4-1f01fbc51c4c, opId = 0, ver = 0, numPartitions = 200], Append, 0, 2
   +- *(3) HashAggregate(keys=[CustomerID#226, window#311], functions=[merge_sum(total_cost#296)])
      +- StateStoreRestore [CustomerID#226, window#311], state info [ checkpoint = <unknown>, runId = bb1dec25-dd9b-4667-b0a4-1f01fbc51c4c, opId = 0, ver = 0, numPartitions = 200], 2
         +- *(2) HashAggregate(keys=[CustomerID#226, window#311], functions=[merge_sum(total_cost#296)])
            +- Exchange hashpartitioning(CustomerID#226, window#311, 200), ENSURE_REQUIREMENTS, [plan_id=510]
               +- *(1) HashAggregate(keys=[knownfloatingpointnormalized(normalizenanandzero(CustomerID#226)) AS CustomerID#226, window#311], functions=[partial_sum(total_cost#296)])
                  +- *(1) Pro

In [24]:
purchaseByCustomerPerHour.explain(mode='extended')

== Parsed Logical Plan ==
'Aggregate ['CustomerID, window('InvoiceDate, 86400000000, 86400000000, 0) AS window#242], ['CustomerID, window('InvoiceDate, 86400000000, 86400000000, 0) AS window#242, sum(total_cost#236) AS sum(total_cost)#247]
+- Project [CustomerID#226, (UnitPrice#225 * cast(Quantity#223 as double)) AS total_cost#236, to_date(to_timestamp(InvoiceDate#224, Some(M/d/yyyy H:mm), TimestampType, Some(Asia/Tokyo)), None, Some(Asia/Tokyo)) AS InvoiceDate#237]
   +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@49f4de59,csv,List(),Some(StructType(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,TimestampType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true))),List(),None,Map(maxFilesPerTrigger -> 1, header -> true, path -> spark-the-definitive-guide-study/assets

In [28]:
purchaseByCustomerPerHourError = streamingDataFrame\
.selectExpr(
    "CustomerID",
    "(UnitPrice * Quantty) as total_cost", # Quantity 오타 테스트
    "to_date(to_timestamp(InvoiceDate, 'M/d/yyyy H:mm')) as InvoiceDate")\
.groupBy(
    F.col("CustomerID"), F.window(F.col("InvoiceDate"), "1 day"))\
.sum("total_cost")

AnalysisException: Column 'Quantty' does not exist. Did you mean one of the following? [Quantity, Country, CustomerID, InvoiceNo, StockCode, UnitPrice, Description, InvoiceDate]; line 1 pos 13;
'Project [CustomerID#226, (UnitPrice#225 * 'Quantty) AS total_cost#294, to_date(to_timestamp(InvoiceDate#224, Some(M/d/yyyy H:mm), TimestampType, Some(Asia/Tokyo)), None, Some(Asia/Tokyo)) AS InvoiceDate#295]
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@49f4de59,csv,List(),Some(StructType(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,TimestampType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true))),List(),None,Map(maxFilesPerTrigger -> 1, header -> true, path -> spark-the-definitive-guide-study/assets/exercises/week03/by-day/*.csv),None), FileSource[spark-the-definitive-guide-study/assets/exercises/week03/by-day/*.csv], [InvoiceNo#220, StockCode#221, Description#222, Quantity#223, InvoiceDate#224, UnitPrice#225, CustomerID#226, Country#227]


In [31]:
purchaseByCustomerPerHour.explain(mode='formatted')

== Physical Plan ==
* HashAggregate (11)
+- StateStoreSave (10)
   +- * HashAggregate (9)
      +- StateStoreRestore (8)
         +- * HashAggregate (7)
            +- Exchange (6)
               +- * HashAggregate (5)
                  +- * Project (4)
                     +- * Project (3)
                        +- * Filter (2)
                           +- StreamingRelation (1)


(1) StreamingRelation
Output [8]: [InvoiceNo#220, StockCode#221, Description#222, Quantity#223, InvoiceDate#224, UnitPrice#225, CustomerID#226, Country#227]
Arguments: FileSource[spark-the-definitive-guide-study/assets/exercises/week03/by-day/*.csv], [InvoiceNo#220, StockCode#221, Description#222, Quantity#223, InvoiceDate#224, UnitPrice#225, CustomerID#226, Country#227]

(2) Filter [codegen id : 1]
Input [8]: [InvoiceNo#220, StockCode#221, Description#222, Quantity#223, InvoiceDate#224, UnitPrice#225, CustomerID#226, Country#227]
Condition : isnotnull(cast(cast(gettimestamp(InvoiceDate#224, M/d/yyyy H:mm, 

In [33]:
purchaseByCustomerPerHour.explain(mode='codegen')

Found 4 WholeStageCodegen subtrees.
== Subtree 1 / 4 (maxMethodCodeSize:556; maxConstantPoolSize:313(0.48% used); numInnerClasses:0) ==
*(1) HashAggregate(keys=[knownfloatingpointnormalized(normalizenanandzero(CustomerID#226)) AS CustomerID#226, window#319], functions=[partial_sum(total_cost#296)], output=[CustomerID#226, window#319, sum#313])
+- *(1) Project [named_struct(start, precisetimestampconversion(((precisetimestampconversion(cast(InvoiceDate#297 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(InvoiceDate#297 as timestamp), TimestampType, LongType) - 0) + 86400000000) % 86400000000)) - 0), LongType, TimestampType), end, precisetimestampconversion((((precisetimestampconversion(cast(InvoiceDate#297 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(InvoiceDate#297 as timestamp), TimestampType, LongType) - 0) + 86400000000) % 86400000000)) - 0) + 86400000000), LongType, TimestampType)) AS window#319, CustomerID#226, total_cos

In [34]:
purchaseByCustomerPerHour.explain(mode='cost')

== Optimized Logical Plan ==
Aggregate [CustomerID#226, window#321], [CustomerID#226, window#321, sum(total_cost#296) AS sum(total_cost)#306], Statistics(sizeInBytes=2.8 EiB)
+- Project [named_struct(start, precisetimestampconversion(((precisetimestampconversion(cast(InvoiceDate#297 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(InvoiceDate#297 as timestamp), TimestampType, LongType) - 0) + 86400000000) % 86400000000)) - 0), LongType, TimestampType), end, precisetimestampconversion((((precisetimestampconversion(cast(InvoiceDate#297 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(InvoiceDate#297 as timestamp), TimestampType, LongType) - 0) + 86400000000) % 86400000000)) - 0) + 86400000000), LongType, TimestampType)) AS window#321, CustomerID#226, total_cost#296], Statistics(sizeInBytes=2.8 EiB)
   +- Project [CustomerID#226, (UnitPrice#225 * cast(Quantity#223 as double)) AS total_cost#296, cast(gettimestamp(InvoiceDate#224, M/d/