In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
# Build (or reuse) the notebook-wide SparkSession with Hive support enabled.
# The application name and the 'local[4]' master are supplied through a SparkConf.
spark_conf = SparkConf()
spark_conf.setAppName('Health Data Generator')
spark_conf.setMaster('local[4]')

sparkSession = (
    SparkSession.builder
    .config(conf=spark_conf)
    .enableHiveSupport()
    .getOrCreate()
)

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

https://blog.knoldus.com/spark-structured-streaming-part-4-handling-late-data/

https://medium.com/towardsdataanalytics/spark-streaming-vs-structured-streaming-ef6863d5b60

In [None]:
# Streaming DataFrame backed by the "socket" source, connected to localhost:9999.
socketDF = (
    sparkSession.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
)

In [None]:
# Explicit schema for the transaction CSV files; every column is declared nullable.
schema = StructType([
    StructField('Transaction_ID', StringType(), True),
    StructField('Transaction_Date', StringType(), True),
    StructField('Transaction_Value', IntegerType(), True),
    StructField('Transaction_Segment', StringType(), True),
    StructField('Credit_Card_ID', StringType(), True),
])

# Stream the CSV files in the input directory using the csv() shortcut.
df = sparkSession.readStream.schema(schema).csv('/Spark_streaming/TransactionBase/')

# Keep only rows whose Transaction_Value is below 10000.
df = df.select('*').filter(df['Transaction_Value'] < 10000)

# Send newly arrived rows to the console sink in append mode.
query = (
    df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Block this cell until the streaming query terminates.
query.awaitTermination()

In [None]:
# Explicit schema for the transaction CSV files; every column is declared nullable.
schema = StructType([
    StructField('Transaction_ID', StringType(), True),
    StructField('Transaction_Date', StringType(), True),
    StructField('Transaction_Value', IntegerType(), True),
    StructField('Transaction_Segment', StringType(), True),
    StructField('Credit_Card_ID', StringType(), True),
])

# Same CSV stream as the csv() shortcut, spelled out as format / option / load.
df = (
    sparkSession.readStream
    .format('csv')
    .schema(schema)
    .option('path', '/Spark_streaming/TransactionBase/')
    .load()
)

# Keep only rows whose Transaction_Value is below 10000.
df = df.select('*').filter(df['Transaction_Value'] < 10000)

# Send newly arrived rows to the console sink in append mode.
query = (
    df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Block this cell until the streaming query terminates.
query.awaitTermination()

#### latestFirst
Whether to process the latest new files first; useful when there is a large backlog of files (default: false).

In [None]:
# Explicit schema for the transaction CSV files; every column is declared nullable.
schema = StructType([
    StructField('Transaction_ID', StringType(), True),
    StructField('Transaction_Date', StringType(), True),
    StructField('Transaction_Value', IntegerType(), True),
    StructField('Transaction_Segment', StringType(), True),
    StructField('Credit_Card_ID', StringType(), True),
])

# CSV stream with the 'latestFirst' file-source option switched on.
df = (
    sparkSession.readStream
    .format('csv')
    .schema(schema)
    .option('latestFirst', True)
    .option('path', '/Spark_streaming/TransactionBase/')
    .load()
)

# Keep only rows whose Transaction_Value is below 10000.
df = df.select('*').filter(df['Transaction_Value'] < 10000)

# Send newly arrived rows to the console sink in append mode.
query = (
    df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Block this cell until the streaming query terminates.
query.awaitTermination()

#### ResolveWriteToStream
Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-d9e44aaa-c124-4532-adad-1f36ff03d8c3. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.

ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.

FileStreamSource: 'latestFirst' is true. New files will be processed first, which may affect the watermark
value. In addition, 'maxFileAge' will be ignored.

In [None]:
# Explicit schema for the transaction CSV files; every column is declared nullable.
schema = StructType([
    StructField('Transaction_ID', StringType(), True),
    StructField('Transaction_Date', StringType(), True),
    StructField('Transaction_Value', IntegerType(), True),
    StructField('Transaction_Segment', StringType(), True),
    StructField('Credit_Card_ID', StringType(), True),
])

# 'spark.sql.streaming.forceDeleteTempCheckpointLocation' is a Spark SQL session
# configuration, not a file-source option: passed to readStream.option() it has
# no effect. Set it on the session instead so the temporary checkpoint directory
# is removed even when the query fails.
# This setting stays active for every later query started from this session.
sparkSession.conf.set('spark.sql.streaming.forceDeleteTempCheckpointLocation', 'true')

# CSV stream that picks up the newest files first.
df = (
    sparkSession.readStream
    .format('csv')
    .schema(schema)
    .option('latestFirst', True)
    .option('path', '/Spark_streaming/TransactionBase/')
    .load()
)

# Keep only rows whose Transaction_Value is below 10000.
df = df.select('*').filter(df['Transaction_Value'] < 10000)

# No checkpointLocation is given on the writer, so this query runs on a
# temporary checkpoint directory (the one the configuration above cleans up).
query = (
    df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Block this cell until the streaming query terminates.
query.awaitTermination()

#### cleanSource
cleanSource: option to clean up completed files after processing.

Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".
When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must not match with source pattern in depth (the number of directories from the root directory), where the depth is minimum of depth on both paths. This will ensure archived files are never included as new source files.

For example, suppose you provide '/hello?/spark/*' as source pattern, '/hello1/spark/archive/dir' cannot be used as the value of "sourceArchiveDir", as '/hello?/spark/*' and '/hello1/spark/archive' will be matched. '/hello1/spark' cannot be also used as the value of "sourceArchiveDir", as '/hello?/spark' and '/hello1/spark' will be matched. '/archived/here' would be OK as it doesn't match.

Spark will move source files respecting their own path. For example, if the path of the source file is /a/b/dataset.txt and the path of the archive directory is /archived/here, the file will be moved to /archived/here/a/b/dataset.txt.

NOTE: Both archiving (via moving) and deleting completed files introduce overhead (a slowdown, even if it happens in a separate thread) in each micro-batch, so you need to understand the cost of each operation in your file system before enabling this option. On the other hand, enabling this option reduces the cost of listing source files, which can be an expensive operation.

Number of threads used in completed file cleaner can be configured with spark.sql.streaming.fileSource.cleaner.numThreads (default: 1).

NOTE 2: The source path should not be used from multiple sources or queries when enabling this option. Similarly, you must ensure the source path doesn't match to any files in output directory of file stream sink.

NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query. Spark may not clean up some source files in some circumstances - e.g. the application doesn't shut down gracefully, too many files are queued to clean up. 

In [None]:
# Explicit schema for the transaction CSV files; every column is declared nullable.
schema = StructType([
    StructField('Transaction_ID', StringType(), True),
    StructField('Transaction_Date', StringType(), True),
    StructField('Transaction_Value', IntegerType(), True),
    StructField('Transaction_Segment', StringType(), True),
    StructField('Credit_Card_ID', StringType(), True),
])

# 'spark.sql.streaming.forceDeleteTempCheckpointLocation' is a Spark SQL session
# configuration, not a file-source option: passed to readStream.option() it has
# no effect. Set it on the session instead so the temporary checkpoint directory
# is removed even when the query fails.
# This setting stays active for every later query started from this session.
sparkSession.conf.set('spark.sql.streaming.forceDeleteTempCheckpointLocation', 'true')

# CSV stream that archives each completed source file under /Spark_archive/.
# The original cell also passed "checkpointLocation" here; that option belongs
# to writeStream and is ignored by the reader, so it has been dropped. The
# console query below therefore still runs on a temporary checkpoint directory,
# exactly as it did before.
df = (
    sparkSession.readStream
    .format('csv')
    .schema(schema)
    .option('cleanSource', 'archive')
    .option('sourceArchiveDir', '/Spark_archive/')
    .option('latestFirst', True)
    .option('path', '/Spark_streaming/TransactionBase/')
    .load()
)

# Keep only rows whose Transaction_Value is below 10000.
df = df.select('*').filter(df['Transaction_Value'] < 10000)

# Send newly arrived rows to the console sink in append mode.
query = (
    df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Block this cell until the streaming query terminates.
query.awaitTermination()

- path: path to the input directory; common to all file formats.
- maxFilesPerTrigger: maximum number of new files to be considered in every trigger (default: no max).
- latestFirst: whether to process the latest new files first; useful when there is a large backlog of files (default: false).

fileNameOnly: whether to check new files based only on the filename instead of the full path (default: false). With this set to `true`, the following files would be considered the same file, because their filenames, "dataset.txt", are the same:

- "file:///dataset.txt"
- "s3://a/dataset.txt"
- "s3n://a/b/dataset.txt"
- "s3a://a/b/c/dataset.txt"

maxFileAge: maximum age of a file that can be found in this directory before it is ignored. For the first batch, all files are considered valid. If latestFirst is set to `true` and maxFilesPerTrigger is set, this parameter is ignored, because old files that are valid and should be processed might otherwise be skipped. The maximum age is measured relative to the timestamp of the latest file, not the current system time (default: 1 week).

In [None]:
# Schema for the parquet input: five nullable string columns named _1 through _5.
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ('_1', '_2', '_3', '_4', '_5')
])

# Stream the parquet files with the parquet() shortcut.
parquet_stream = sparkSession.readStream.schema(schema) \
    .parquet('/Spark_streaming/TransactionBase_parquet/')

# Give the five columns their transaction names.
df = parquet_stream.toDF(
    'Transaction_ID',
    'Transaction_Date',
    'Transaction_Value',
    'Transaction_Segment',
    'Credit_Card_ID',
)

# Keep only rows whose Transaction_Value is greater than 10.
df = df.select('*').filter(df['Transaction_Value'] > 10)

# Send newly arrived rows to the console sink in append mode.
query = (
    df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Block this cell until the streaming query terminates.
query.awaitTermination()

In [None]:
# Schema for the parquet input: five nullable string columns named _1 through _5.
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ('_1', '_2', '_3', '_4', '_5')
])

# Same parquet stream as the parquet() shortcut, spelled out as
# format / option / load, then renamed to the transaction column names.
df = (
    sparkSession.readStream
    .schema(schema)
    .format('parquet')
    .option('path', '/Spark_streaming/TransactionBase_parquet/')
    .load()
    .toDF(
        'Transaction_ID',
        'Transaction_Date',
        'Transaction_Value',
        'Transaction_Segment',
        'Credit_Card_ID',
    )
)

# Keep only rows whose Transaction_Value is greater than 10.
df = df.select('*').filter(df['Transaction_Value'] > 10)

# Send newly arrived rows to the console sink in append mode.
query = (
    df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Block this cell until the streaming query terminates.
query.awaitTermination()

In [None]:
# Schema for the parquet input: five nullable string columns named _1 through _5.
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ('_1', '_2', '_3', '_4', '_5')
])

# 'spark.sql.streaming.forceDeleteTempCheckpointLocation' is a Spark SQL session
# configuration, not a file-source option: passed to readStream.option() it has
# no effect. Set it on the session instead so the temporary checkpoint directory
# is removed even when the query fails.
# This setting stays active for every later query started from this session.
sparkSession.conf.set('spark.sql.streaming.forceDeleteTempCheckpointLocation', 'true')

# Parquet stream that archives each completed source file under /Spark_archive/.
# The original cell also passed "checkpointLocation" here; that option belongs
# to writeStream and is ignored by the reader, so it has been dropped. The
# console query below therefore still runs on a temporary checkpoint directory,
# exactly as it did before.
df = (
    sparkSession.readStream
    .schema(schema)
    .format('parquet')
    .option('cleanSource', 'archive')
    .option('sourceArchiveDir', '/Spark_archive/')
    .option('latestFirst', True)
    .option('path', '/Spark_streaming/TransactionBase_parquet/')
    .load()
    .toDF(
        'Transaction_ID',
        'Transaction_Date',
        'Transaction_Value',
        'Transaction_Segment',
        'Credit_Card_ID',
    )
)

# Keep only rows whose Transaction_Value is greater than 10.
df = df.select('*').filter(df['Transaction_Value'] > 10)

# Send newly arrived rows to the console sink in append mode.
query = (
    df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Block this cell until the streaming query terminates.
query.awaitTermination()

In [None]:
# Read the table "testing.health" as a streaming source.
df = sparkSession.readStream.table("testing.health")

# Keep only rows whose Transaction_Value is below 2000.
df = df.select('*').filter(df['Transaction_Value'] < 2000)

# Send newly arrived rows to the console sink in append mode.
query = (
    df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Block this cell until the streaming query terminates.
query.awaitTermination()

In [None]:

# NOTE(review): the original cell wrote `data`, a name that no cell in this
# notebook defines, so it raised NameError on a fresh kernel. `df` (the
# streaming DataFrame built by the preceding cell) is assumed to be the
# intended input -- confirm.
#
# Write the stream as parquet files under /Spark_streaming/Trans_parquet/,
# tracking progress in the given checkpoint directory.
query = (
    df.writeStream
    .outputMode("append")
    .format("parquet")
    .option("checkpointLocation", "/Spark_checkpointing_dir/")
    .option("path", "/Spark_streaming/Trans_parquet/")
    .start()
)

# Block this cell until the streaming query terminates.
query.awaitTermination()

In [None]:
# Explicit schema for the transaction CSV files; every column is declared nullable.
schema = StructType([
    StructField('Transaction_ID', StringType(), True),
    StructField('Transaction_Date', StringType(), True),
    StructField('Transaction_Value', IntegerType(), True),
    StructField('Transaction_Segment', StringType(), True),
    StructField('Credit_Card_ID', StringType(), True),
])

# CSV stream that archives each completed source file under /Spark_archive/.
# Two options the original cell set on the reader were removed because the
# reader ignores them:
#   * "checkpointLocation" is a writeStream option (the writer below sets its own);
#   * 'spark.sql.streaming.forceDeleteTempCheckpointLocation' is a session
#     configuration, and is irrelevant here since the writer uses an explicit
#     checkpoint directory rather than a temporary one.
df = (
    sparkSession.readStream
    .format('csv')
    .schema(schema)
    .option('cleanSource', 'archive')
    .option('sourceArchiveDir', '/Spark_archive/')
    .option('latestFirst', True)
    .option('path', '/Spark_streaming/TransactionBase/')
    .load()
)

# Keep only rows whose Transaction_Value is below 10000.
df = df.select('*').filter(df['Transaction_Value'] < 10000)

# Write the filtered rows as CSV files, tracking progress in a checkpoint
# directory dedicated to this query.
query = (
    df.writeStream
    .outputMode("append")
    .option("checkpointLocation", "/Spark_checkpointing_dir1/")
    .format("csv")
    .option('path', '/Spark_streaming/Trans_csv/')
    .start()
)

# Block this cell until the streaming query terminates.
query.awaitTermination()

In [None]:
# Explicit schema for the transaction CSV files; every column is declared nullable.
schema = StructType([
    StructField('Transaction_ID', StringType(), True),
    StructField('Transaction_Date', StringType(), True),
    StructField('Transaction_Value', IntegerType(), True),
    StructField('Transaction_Segment', StringType(), True),
    StructField('Credit_Card_ID', StringType(), True),
])

# CSV stream that archives each completed source file under /Spark_archive/.
# Two options the original cell set on the reader were removed because the
# reader ignores them:
#   * "checkpointLocation" is a writeStream option (the writer below sets its own);
#   * 'spark.sql.streaming.forceDeleteTempCheckpointLocation' is a session
#     configuration, and is irrelevant here since the writer uses an explicit
#     checkpoint directory rather than a temporary one.
df = (
    sparkSession.readStream
    .format('csv')
    .schema(schema)
    .option('cleanSource', 'archive')
    .option('sourceArchiveDir', '/Spark_archive/')
    .option('latestFirst', True)
    .option('path', '/Spark_streaming/TransactionBase/')
    .load()
)

# Keep only rows whose Transaction_Value is below 10000.
df = df.select('*').filter(df['Transaction_Value'] < 10000)

# Write the filtered rows as parquet files, tracking progress in a checkpoint
# directory dedicated to this query.
query = (
    df.writeStream
    .outputMode("append")
    .format("parquet")
    .option("checkpointLocation", "/Spark_checkpointing_dir2/")
    .option("path", "/Spark_streaming/Trans_parquet/")
    .start()
)

# Block this cell until the streaming query terminates.
query.awaitTermination()

In [None]:
# Explicit schema for the transaction CSV files; every column is declared nullable.
schema = StructType([
    StructField('Transaction_ID', StringType(), True),
    StructField('Transaction_Date', StringType(), True),
    StructField('Transaction_Value', IntegerType(), True),
    StructField('Transaction_Segment', StringType(), True),
    StructField('Credit_Card_ID', StringType(), True),
])

# CSV stream that archives each completed source file under /Spark_archive/.
# Two options the original cell set on the reader were removed because the
# reader ignores them:
#   * "checkpointLocation" is a writeStream option (the writer below sets its own);
#   * 'spark.sql.streaming.forceDeleteTempCheckpointLocation' is a session
#     configuration, and is irrelevant here since the writer uses an explicit
#     checkpoint directory rather than a temporary one.
df = (
    sparkSession.readStream
    .format('csv')
    .schema(schema)
    .option('cleanSource', 'archive')
    .option('sourceArchiveDir', '/Spark_archive/')
    .option('latestFirst', True)
    .option('path', '/Spark_streaming/TransactionBase/')
    .load()
)

# Keep only rows whose Transaction_Value is below 10000.
df = df.select('*').filter(df['Transaction_Value'] < 10000)

# Stream the filtered rows into the table 'testing.transaction'.
# toTable() starts the query itself and returns the running StreamingQuery, so
# the original trailing .start() (which StreamingQuery does not have) raised
# AttributeError and has been removed.
query = (
    df.writeStream
    .outputMode("append")
    .option("checkpointLocation", "/Spark_checkpointing_dir3/")
    .toTable('testing.transaction')
)

# Block this cell until the streaming query terminates.
query.awaitTermination()

In [None]:
# NOTE(review): the original cell wrote `data`, a name that no cell in this
# notebook defines, so it raised NameError on a fresh kernel. `df` (the
# streaming DataFrame built by the preceding cell) is assumed to be the
# intended input -- confirm.
#
# Send newly arrived rows to the console sink in append mode.
query = (
    df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Block this cell until the streaming query terminates.
query.awaitTermination()