# Operations on Streaming DataFrames

---

[pyspark sql functions](https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions.html) - implements many of the Spark SQL functions; these are the functions you apply to columns.

[DataFrame API in scala](https://spark.apache.org/docs/3.0.3/api/scala/org/apache/spark/sql/Dataset.html) - the methods that manipulate DataFrame objects.

A streaming DataFrame is manipulated the same way as a static DataFrame.

---

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
from pyspark.sql.functions import window, current_timestamp
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructType, TimestampType

In [2]:
spark = SparkSession.builder.appName("StreamingBuyCounter").getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x7feff03e5e20>


---

### Assume the source data arrives in the defined schema

In [3]:
schema = StructType().add("time", "string").add("oId", "integer").add("cId", "integer")\
              .add("qty", "integer").add("price", "float").add("buy", "string")

filestream = spark.readStream.option("sep",",").csv("/data", schema=schema)

| time              | oId   | cId   | qty | price | buy  |
|-------------------|-------|-------|-----|-------|------|
| 3/18/2018 2:15:18 | 34626 | 39835 |  5  | 4.72  | buy  |
| 3/18/2018 2:24:31 | 84260 | 5443  |  9  | 15.26 | buy  |
| 3/18/2018 2:33:44 | 56050 | 77178 |  8  | 4.07  | buy  |
| 3/18/2018 2:42:57 | 32973 | 34441 |  5  | 15.49 | sell |
| 3/18/2018 2:52:10 | 57264 | 98905 |  8  | 1.31  | sell |
| 3/18/2018 3:01:23 | 21039 | 5821  |  9  | 18.85 | buy  |
| 3/18/2018 3:10:36 | 31880 | 86234 |  6  | 19.22 | buy  |
| 3/18/2018 3:19:49 | 82931 | 29797 |  9  | 18.10 | buy  |

### Inspecting the DataFrame

In [4]:
print("columns:", filestream.columns, '\n')
print("schema:", filestream.schema, '\n')
print("dtypes:", filestream.dtypes, '\n')
filestream.explain()  # explain() prints the plan itself and returns None, so it is not wrapped in print()

columns: ['time', 'oId', 'cId', 'qty', 'price', 'buy']

schema: StructType(List(StructField(time,StringType,true),StructField(oId,IntegerType,true),StructField(cId,IntegerType,true),StructField(qty,IntegerType,true),StructField(price,FloatType,true),StructField(buy,StringType,true)))

dtypes: [('time', 'string'), ('oId', 'int'), ('cId', 'int'), ('qty', 'int'), ('price', 'float'), ('buy', 'string')]

== Physical Plan ==
StreamingRelation FileSource[/data], [time#0, oId#1, cId#2, qty#3, price#4, buy#5]

---

### Selecting specific columns (select)

In [5]:
column_selection = filestream.select("qty", "price", "buy")
column_selection.columns

['qty', 'price', 'buy']

| qty | price | buy  |
|-----|-------|------|
|  5  | 4.72  | buy  |
|  9  | 15.26 | buy  |
|  8  | 4.07  | buy  |
|  5  | 15.49 | sell |
|  8  | 1.31  | sell |
|  9  | 18.85 | buy  |
|  6  | 19.22 | buy  |
|  9  | 18.10 | buy  |

---

### Filtering by condition (where)

In [6]:
price_over_thres = column_selection.where("price >= 10")
price_over_thres.dtypes

[('qty', 'int'), ('price', 'float'), ('buy', 'string')]

| qty | price | buy  |
|-----|-------|------|
|  9  | 15.26 | buy  |
|  5  | 15.49 | sell |
|  9  | 18.85 | buy  |
|  6  | 19.22 | buy  |
|  9  | 18.10 | buy  |

---

### Aggregation (groupBy)

In [7]:
buySellMean = price_over_thres.groupBy("buy").mean("price").withColumnRenamed("avg(price)", "tmp")
buySellMean.dtypes

[('buy', 'string'), ('tmp', 'double')]

| buy | tmp |
|-----|-------|
|  buy  | 17.8575 |
|  sell  | 15.49 |
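As a sanity check on the table above, the same select / where / groupBy steps can be worked through by hand on the eight sample rows. This is a plain-Python sketch that needs no Spark session; the `rows` list simply copies the sample table, and the variable names are illustrative rather than part of the notebook.

```python
from collections import defaultdict

# (qty, price, buy) triples copied from the sample rows shown earlier.
rows = [
    (5, 4.72, "buy"), (9, 15.26, "buy"), (8, 4.07, "buy"), (5, 15.49, "sell"),
    (8, 1.31, "sell"), (9, 18.85, "buy"), (6, 19.22, "buy"), (9, 18.10, "buy"),
]

# Equivalent of where("price >= 10"): keep only rows at or above the threshold.
over_thres = [r for r in rows if r[1] >= 10]

# Equivalent of groupBy("buy").mean("price"): collect prices per group, then average.
prices_by_side = defaultdict(list)
for _qty, price, side in over_thres:
    prices_by_side[side].append(price)

group_mean = {side: sum(p) / len(p) for side, p in prices_by_side.items()}
print(group_mean)
```

The result is approximately 17.8575 for `buy` and 15.49 for `sell`. Because Spark stores `price` as a 32-bit float and casts it to double for `avg`, the values Spark prints may differ from these in the last few digits.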

### Applying a function (withColumn, udf)

In [8]:
from pyspark.sql.types import FloatType

func = udf(lambda x: round(x,1), FloatType())
result = buySellMean.withColumn("mean_price", func(buySellMean.tmp)).select("buy", "mean_price")
result.dtypes

[('buy', 'string'), ('mean_price', 'float')]

| buy | mean_price |
|-----|-------|
|  buy  | 17.9 |
|  sell  | 15.5 |
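The lambda passed to `udf` calls Python's built-in `round`, which raises a `TypeError` if it ever receives a null (`None`) value. Below is a minimal None-safe sketch of the same rounding logic in plain Python. The name `round_price` is hypothetical and does not appear in the notebook.

```python
def round_price(x, ndigits=1):
    """Round a price to `ndigits` decimals, passing nulls (None) through unchanged."""
    if x is None:
        return None
    return round(float(x), ndigits)

print(round_price(17.8575))  # 17.9
print(round_price(15.49))    # 15.5
print(round_price(None))     # None
```

In the notebook this function could be wrapped as `udf(round_price, FloatType())` in place of the lambda. If the Python UDF is not needed at all, the built-in `pyspark.sql.functions.round` is likely a simpler choice, since it runs inside Spark and avoids the Python evaluation step.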

### Lazy evaluation and SQL optimization (explain)

In [9]:
result.explain()

== Physical Plan ==
*(5) Project [buy#5, pythonUDF0#33 AS mean_price#26]
+- BatchEvalPython [<lambda>(agg#32)], [pythonUDF0#33]
   +- *(4) HashAggregate(keys=[buy#5], functions=[avg(cast(price#4 as double))])
      +- StateStoreSave [buy#5], state info [ checkpoint = <unknown>, runId = db50f5aa-f82b-4b21-824f-15d7023727f3, opId = 0, ver = 0, numPartitions = 200], Append, 0, 2
         +- *(3) HashAggregate(keys=[buy#5], functions=[merge_avg(cast(price#4 as double))])
            +- StateStoreRestore [buy#5], state info [ checkpoint = <unknown>, runId = db50f5aa-f82b-4b21-824f-15d7023727f3, opId = 0, ver = 0, numPartitions = 200], 2
               +- *(2) HashAggregate(keys=[buy#5], functions=[merge_avg(cast(price#4 as double))])
                  +- Exchange hashpartitioning(buy#5, 200), true, [id=#60]
                     +- *(1) HashAggregate(keys=[buy#5], functions=[partial_avg(cast(price#4 as double))])
                        +- *(1) Project [price#4, buy#5]
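The `partial_avg`, `merge_avg`, `StateStoreRestore`, and `StateStoreSave` steps in the plan hint at how a streaming average stays up to date: each micro-batch is reduced to a partial result, merged with the state carried over from earlier batches, and saved back. The sketch below models that idea in plain Python, assuming the state is a `(sum, count)` pair per key. The function names and the batch data are made up for illustration, and this is not Spark's actual implementation.

```python
def partial_avg(batch):
    """Reduce one micro-batch of (key, price) pairs to {key: (sum, count)}."""
    partial = {}
    for key, price in batch:
        s, n = partial.get(key, (0.0, 0))
        partial[key] = (s + price, n + 1)
    return partial

def merge_avg(state, partial):
    """Merge a batch's partial sums into the running state and return the new state."""
    merged = dict(state)
    for key, (s, n) in partial.items():
        s0, n0 = merged.get(key, (0.0, 0))
        merged[key] = (s0 + s, n0 + n)
    return merged

def finalize(state):
    """Turn {key: (sum, count)} into {key: mean}."""
    return {key: s / n for key, (s, n) in state.items()}

# Two made-up micro-batches of (buy, price) pairs.
batches = [
    [("buy", 10.0), ("buy", 20.0), ("sell", 12.0)],
    [("sell", 18.0)],
]

state = {}
for batch_id, batch in enumerate(batches):
    state = merge_avg(state, partial_avg(batch))
    # In "complete" output mode every group is emitted after each batch.
    print(batch_id, finalize(state))
```

In this model the second batch contains no `buy` rows, yet `buy` is still reported with its earlier mean, because the result is computed from the accumulated state rather than from the latest batch alone.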
                      

### Everything else: look it up in the official documentation as needed

In [10]:
from pyspark.sql.functions import reverse, substring, sqrt

# substring() positions are 1-based: take the first two characters of the reversed string.
test = result.withColumn("test", substring(reverse(result.buy), 1, 2))
test = test.withColumn("test2", sqrt(test.mean_price))
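To make clear what the two new columns would contain, here is a plain-Python model of the same expressions applied to the two result rows from the earlier table. The helper `sql_substring` is hypothetical; it only mimics SQL-style 1-based positions for non-negative `pos` and does not cover negative positions.

```python
import math

def sql_substring(s, pos, length):
    """Model SQL-style substring for pos >= 0: positions are 1-based, and 0 acts like 1."""
    start = max(pos - 1, 0)
    return s[start:start + length]

for buy, mean_price in [("buy", 17.9), ("sell", 15.5)]:
    test = sql_substring(buy[::-1], 1, 2)  # reverse, then take the first two characters
    test2 = math.sqrt(mean_price)          # square root of the rounded mean price
    print(buy, test, test2)
```

Under this model `buy` becomes `yu` and `sell` becomes `ll`, while `test2` holds the square root of `mean_price`.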

---

### Receiving CSV files

In [None]:
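# "complete" mode writes the whole aggregated result table to the console on every trigger.
query = result.writeStream.format("console").outputMode("complete").start()
query.awaitTermination()  # blocks until the query is stopped or fails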

21/09/28 03:45:57 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-1e297734-cf7e-490b-bf92-3076940e6e4a. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+----+----------+
| buy|mean_price|
+----+----------+
| buy|      15.4|
|sell|      15.5|
+----+----------+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+----+----------+
| buy|mean_price|
+----+----------+
| buy|      15.4|
|sell|      15.5|
+----+----------+



                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+----+----------+
| buy|mean_price|
+----+----------+
| buy|      15.4|
|sell|      15.3|
+----+----------+



                                                                                

-------------------------------------------
Batch: 3
-------------------------------------------
+----+----------+
| buy|mean_price|
+----+----------+
| buy|      15.1|
|sell|      15.2|
+----+----------+



                                                                                

-------------------------------------------
Batch: 4
-------------------------------------------
+----+----------+
| buy|mean_price|
+----+----------+
| buy|      15.1|
|sell|      15.3|
+----+----------+



                                                                                

-------------------------------------------
Batch: 5
-------------------------------------------
+----+----------+
| buy|mean_price|
+----+----------+
| buy|      15.1|
|sell|      15.3|
+----+----------+



                                                                                

-------------------------------------------
Batch: 6
-------------------------------------------
+----+----------+
| buy|mean_price|
+----+----------+
| buy|      15.2|
|sell|      15.2|
+----+----------+



                                                                                

-------------------------------------------
Batch: 7
-------------------------------------------
+----+----------+
| buy|mean_price|
+----+----------+
| buy|      15.1|
|sell|      15.2|
+----+----------+



                                                                                

-------------------------------------------
Batch: 8
-------------------------------------------
+----+----------+
| buy|mean_price|
+----+----------+
| buy|      15.2|
|sell|      15.3|
+----+----------+

