# PySpark之Structured Streaming基本操作
思想：将实时数据流视为一张正在不断添加的数据的表，可以把流计算等同于在一个静态表上的批处理查询，Spark会在不断添加数据的无界输入表上运行计算，并进行增量查询。  

编写Structured Streaming程序的基本步骤包括：  
- 导入pyspark模块
- 创建SparkSession对象
- 创建输入数据源
- 定义流计算过程
- 启动流计算并输出结果

两种处理模型：  
(1) 微批处理  
(2) 持续处理

## 词频统计
目标：一个包含很多英文语句的数据流远远不断到达，Structured Streaming程序对每行英文语句进行拆分，并统计每个单词出现的频率。  
![](./imgs/out_struct.png)

In [4]:
%%writefile structurednetworkwordcount.py
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark import Row
from pyspark.sql.functions import split
from pyspark.sql.functions import explode

# 创建SparkSession对象
spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
sc = spark.sparkContext

# 创建输入数据源
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# 定义流计算过程
words = lines.select(explode(split(lines.value, " ")).alias("word"))
word_count = words.groupBy("word").count()

# 启动流计算并输出结果
query = word_count \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime="8 seconds") \
    .start()

query.awaitTermination()

## 文件读写

In [21]:
import os
import shutil
import random
import time

TEST_DATA_TEMP_DIR = "./data/tmp/"
TEST_DATA_DIR = "./data/tmp/testdata/"
ACTION_DEF = ["login", "logout", "purchase"]
DISTRICT_DEF = ["fujian", "beijin", "shanghai", "guangzhou"]
JSIN_LINE_PATTERN = '{{"eventTime": {}, "action": {}, "district":{}}}\n'

def testSetUp():
    """创建临时文件目录"""
    
    if os.path.exists(TEST_DATA_DIR):
        shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)
        
    os.mkdir(TEST_DATA_DIR)
    
def testTearDown():
    """恢复测试环境"""
    
    if os.path.exists(TEST_DATA_DIR):
        shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)

def writeAndMove(filename, data):
    """生成测试文件"""
    
    with open(TEST_DATA_TEMP_DIR + filename, "wt", encoding="utf-8") as f:
        f.write(data)
    
    shutil.move(TEST_DATA_TEMP_DIR + filename, TEST_DATA_DIR + filename)

if __name__ == "__main__":
    
    testSetUp()
    
    for i in range(100):
        
        filename = 'e-mall-{}.json'.format(i)
        content = ""
        rndcount = list(range(100))
        random.shuffle(rndcount)
        
        for _ in rndcount:
            content += JSIN_LINE_PATTERN.format(str(int(time.time()))
                                                , random.choice(ACTION_DEF)
                                                , random.choice(DISTRICT_DEF)
                                               )
        writeAndMove(filename, content)
        time.sleep(1)
        
    testTearDown()

KeyboardInterrupt: 

In [32]:
# %%writefile spark_ss_filesource.py
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark import Row
from pyspark.sql.functions import split
from pyspark.sql.functions import explode
from pyspark.sql.types import StructField, StructType
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import window, asc

In [33]:
TEST_DATA_DIR_SPARK = "file://" + os.getcwd() + "data/tmp/testdata/"
schema = StructType([StructField("eventTime", TimestampType(), True)
                     , StructField("action", StringType(), True)
                     , StructField("district", StringType(), True)
                    ])

In [34]:
# 创建SparkSession对象
spark = SparkSession.builder.appName("StructuredEmallPurchaseCount.py").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
sc = spark.sparkContext

In [35]:
# 创建输入数据源
lines = spark.readStream \
    .format("json") \
    .schema(schema) \
    .option("maxFilesPerTrigger", 100) \
    .load(TEST_DATA_DIR_SPARK)

# 定义流计算过程
words = lines.filter("action='purchase'").groupBy("district", window("eventTime", windowDuration)).count()
word_count = words.sort(asc("window"))

# 启动流计算并输出结果
query = word_count \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime="8 seconds") \
    .start()

query.awaitTermination()

AnalysisException: Path does not exist: file:///home/gavin/Machine/PySpark/Spark基础data/tmp/testdata/

## 知识点
### explode的用法

In [6]:
eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
eDF.show()

+---+---------+--------+
|  a|  intlist|mapfield|
+---+---------+--------+
|  1|[1, 2, 3]|{a -> b}|
+---+---------+--------+



In [7]:
eDF.select(explode(eDF.intlist).alias("anInt")).show()

+-----+
|anInt|
+-----+
|    1|
|    2|
|    3|
+-----+



## 参考
[Hadoop上传文件报错could only be written to 0 of the 1 minReplication nodes.](https://blog.csdn.net/sinat_38737592/article/details/101628357)