<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"></h2>
Please write a program in any language of your choice that supports Spark processing. The program
must stream credit card transactions from a CSV and calculate the count of
transactions over the last 3 minutes, using a sliding window that slides every 2 minutes.

Please find additional instructions below:
0. Must use Spark Streaming
0. The CSV will need to have at least 10,000 transactions
0. Please use MOCKNEAT (https://github.com/nomemory/mockneat) to generate the credit card transactions

## Solution Approach:
* Used the Structured Streaming API with a sliding window
* Defined a `3 minute window` with a `sliding interval of 2 minutes`
* Created a streaming DataFrame that reads one file per trigger from the data files stored in AWS S3
* Grouped the data by window and counted the transactions in each group
* Created the write stream and, for lab (debugging) purposes, used `console` as the `sink` so the output can be seen on the console
* Stopped the stream gracefully after waiting long enough for it to process all the files
* Created 100 data files of credit card transactions with 100 records each (10,000 in total). `Reading one file per 2-minute trigger helps simulate streaming`.
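The 100 files × 100 records layout can be reproduced locally for testing. The sketch below is a hypothetical, standard-library stand-in for generatedata.com (it is not MOCKNEAT, which is a Java library). The name pools, the `transactions-NNN.csv` file names, the `write_files` helper, the start date and the one-hour timestamp span are all assumptions made for illustration, and the card numbers only mimic the 51-55 MasterCard prefix without a valid Luhn check digit.

```python
import csv
import os
import random
from datetime import datetime, timedelta

HEADER = ["Transaction-date", "Name", "Credit-Card", "Amount"]
# Hypothetical name pools (borrowed from the sample output) for illustration only.
FIRST_NAMES = ["Angela", "Jolene", "Chester", "Hedwig", "Alma", "Philip"]
LAST_NAMES = ["Houston", "Cooke", "Villarreal", "Mcdowell", "Chapman", "Calhoun"]


def fake_row(rng, start, span_seconds):
    """Build one transaction row in the documented column order."""
    ts = start + timedelta(seconds=rng.randrange(span_seconds))
    name = f"{rng.choice(FIRST_NAMES)} {rng.choice(LAST_NAMES)}"
    # 16 digits with a 51-55 prefix; no valid Luhn check digit (illustration only).
    card = str(rng.randint(51, 55)) + "".join(str(rng.randrange(10)) for _ in range(14))
    amount = f"${rng.uniform(1, 100):.2f}"
    return [ts.strftime("%Y-%m-%d %H:%M:%S"), name, card, amount]


def write_files(out_dir, num_files=100, rows_per_file=100, span_seconds=3600, seed=42):
    """Write num_files pipe-delimited CSVs, each with a header and rows_per_file rows."""
    rng = random.Random(seed)
    os.makedirs(out_dir, exist_ok=True)
    start = datetime(2020, 8, 27)  # arbitrary start time (assumption)
    paths = []
    for i in range(num_files):
        path = os.path.join(out_dir, f"transactions-{i:03d}.csv")
        with open(path, "w", newline="") as f:
            writer = csv.writer(f, delimiter="|", lineterminator="\n")
            writer.writerow(HEADER)
            for _ in range(rows_per_file):
                writer.writerow(fake_row(rng, start, span_seconds))
        paths.append(path)
    return paths
```

Keeping all timestamps within one hour is a deliberate choice in this sketch: the sample files spread transactions over 2019 to 2021, so most 3-minute windows over that data contain only a single transaction.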

### Read Me - With Assumptions
0. Used the generatedata.com site to generate the credit card transactions
0. The transactions are split into files of 100 each, to emulate slow streaming by reading 1 file during each trigger.
0. Development environment: `Databricks Community Edition Spark cluster, S3 for storage of the data files, Jupyter Notebook with PySpark`.
0. The data files were uploaded to AWS S3 storage and read from there when running the code. PLEASE MAKE APPROPRIATE changes to `dataPath` depending on where you clone and store the data files.
0. The data files contain the following columns:

| Field              | Description                                    |
|--------------------|------------------------------------------------|
| `Transaction-date` | Transaction timestamp (`yyyy-MM-dd HH:mm:ss`)  |
| `Name`             | Customer name                                  |
| `Credit-Card`      | MasterCard number (digit grouping varies)      |
| `Amount`           | Transaction amount, prefixed with `$`          |

#### Note - The field delimiter is pipe (`|`)
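Because `Amount` carries a leading `$` and card numbers use mixed digit groupings, the schema defined in Step 2 keeps both as strings. A minimal pure-Python sketch of how these fields could be typed, using two rows from the Step 1 sample output re-joined with the pipe delimiter; `parse_row` is a hypothetical helper, and in Spark the same cleanup could be done with `regexp_replace` followed by `cast`.

```python
import csv
import io
from datetime import datetime
from decimal import Decimal

# Two rows from the Step 1 sample output, re-joined with the `|` delimiter the files use.
SAMPLE = (
    "Transaction-date|Name|Credit-Card|Amount\n"
    "2019-12-31 01:04:09|Cherokee A. Nichols|5230 7401 5071 6506|$36.00\n"
    "2021-05-23 16:24:44|Jackson F. Sandoval|548320 5253548366|$83.78\n"
)


def parse_row(row):
    """Convert one raw row (a dict of strings) into typed values."""
    return {
        "Transaction-date": datetime.strptime(row["Transaction-date"], "%Y-%m-%d %H:%M:%S"),
        "Name": row["Name"],
        # Card numbers use several digit groupings; drop the spaces to normalise them.
        "Credit-Card": row["Credit-Card"].replace(" ", ""),
        # Amounts carry a leading "$" (and possibly thousands separators), so strip them first.
        "Amount": Decimal(row["Amount"].lstrip("$").replace(",", "")),
    }


rows = [parse_row(r) for r in csv.DictReader(io.StringIO(SAMPLE), delimiter="|")]
```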

### Step 1: Sample one file by reading data stored in S3 into a DataFrame

Take a look at the first few lines of the dataset in one file.
##### Note that when we use `inferSchema`, `Transaction-date` gets loaded as a `String` data type

In [4]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
dataPath = "dbfs:/mnt/s3data/credit-card-data/dataAug-27-2020.csv"
df = spark.read.format("csv").option("header", True).option("inferSchema", True).option("delimiter","|").load(dataPath)
display(df)

Transaction-date,Name,Credit-Card,Amount
2019-12-31 01:04:09,Cherokee A. Nichols,5230 7401 5071 6506,$36.00
2021-05-23 16:24:44,Jackson F. Sandoval,548320 5253548366,$83.78
2021-06-28 10:20:43,Carson Hodge,5522352692047544,$62.59
2021-01-14 09:38:37,Kelly F. Guthrie,544697 007979 3969,$72.37
2020-02-26 08:43:30,Conan Yates,544853 8434407696,$74.54
2020-01-20 04:39:22,Cameron L. Ball,5590 2413 2616 9195,$30.08
2021-02-27 21:11:43,Melissa A. Francis,534625 030000 2428,$29.25
2020-10-13 08:04:13,Avram C. Blevins,522 90047 44032 752,$51.20
2019-11-29 04:37:30,Connor B. Buckley,542201 279372 4067,$3.45
2021-02-04 03:33:46,Daryl H. Robertson,5147222158831662,$85.16


In [5]:
df.printSchema()

### Step 2: Define the schema and load the file
#### Confirm that `Transaction-date` is loaded as a `timestamp` type

In [7]:
dataSchema = StructType([StructField("Transaction-date",TimestampType(),False),
                        StructField("Name",StringType(),True),
                        StructField("Credit-Card",StringType(),True),
                        StructField("Amount",StringType(),True)])

df = spark.read.format("csv").option("header", True).schema(dataSchema).option("delimiter","|").load(dataPath)
display(df)


Transaction-date,Name,Credit-Card,Amount
2020-11-04T21:01:36.000+0000,Angela Y. Houston,5472 6195 7945 0383,$78.55
2019-12-16T19:20:00.000+0000,Jolene K. Cooke,5114 1527 6983 6132,$87.38
2020-02-25T07:35:53.000+0000,Chester K. Villarreal,558708 7368980287,$38.11
2019-11-08T15:41:07.000+0000,Hedwig K. Mcdowell,512995 0154246905,$75.94
2020-10-03T12:21:14.000+0000,Quintessa C. Dixon,541 76136 96931 696,$51.71
2020-05-15T20:03:38.000+0000,Mannix Johns,551 61011 21030 367,$56.90
2020-07-22T03:14:22.000+0000,Philip F. Calhoun,537262 3617968365,$97.70
2021-01-20T04:55:25.000+0000,Valentine J. Williamson,557895 817691 0917,$40.43
2020-07-19T10:09:24.000+0000,Alma Chapman,5330 2012 6126 2640,$5.84
2021-02-17T11:16:04.000+0000,Chava Malone,547331 5152098694,$55.10


### Note - Steps 1 and 2 were the data discovery and analysis phase, which led to the correct schema and data type definitions for loading the data

### Step 3 - Now we need to do the following tasks:
0. Define the data stream that reads the `csv` files dumped into the directory `dataPath`
0. Control the size of each micro-batch by forcing Spark to process only 1 file per trigger.

Other notes:
0. The source `dataPath` now points to all the files in the directory
0. The schema has already been defined as `dataSchema`

In [10]:
dataPath = "dbfs:/mnt/s3data/credit-card-data/*"

# Configure the shuffle partitions to match the number of cores  
spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

streamDF = (spark                   # Start with the SparkSession
  .readStream                       # Get the DataStreamReader
  .format("csv")                    # Configure the stream's source for the appropriate file type
  .option("delimiter","|")          # Field delimiter is `|` 
  .option("header", True)           # Every file has header
  .schema(dataSchema)               # Specify the csv files' schema
  .option("maxFilesPerTrigger", 1)  # Restrict Spark to processing only 1 file per trigger
  .load(dataPath)                   # Load the DataFrame specifying its location as dataPath
)


### Step 4:
0. Define the sliding window of `3 minutes` with a slide interval of `2 minutes`
0. Group the data by window and count the transactions in each group
0. Use a 6-minute watermark

#### Note - Watermarking handles late-arriving data and bounds the state Spark keeps for the aggregation, avoiding OOM (out-of-memory) errors that can crash the cluster
#### Note - The watermark does not clean up state in this code because the output mode is `complete`, which must preserve all aggregate data; it is included to show best practice

In [12]:
countsDF = (streamDF                                             # Start with the DataFrame
  .withWatermark("Transaction-date", "6 minutes")               # Specify the watermark
  .groupBy(window("Transaction-date", "3 minute", "2 minute"))  # Aggregate the data
  .count()                                                       # Produce a count for each aggregate
  .withColumn("start", col("window.start"))                      # Add the column "start", extracting it from "window.start"
  .orderBy("start")                                              # Sort by "start" (allowed only after an aggregation in complete mode)
)
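To see which windows a transaction lands in, the sketch below mirrors how Spark assigns events to sliding windows: windows are aligned to the Unix epoch, start on multiples of the slide interval, and include their start but exclude their end. It is a pure-Python illustration of the semantics of `window("Transaction-date", "3 minute", "2 minute")`, not Spark code; `sliding_windows` and `window_counts` are hypothetical helpers, and naive datetimes are treated as UTC.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
WINDOW = timedelta(minutes=3)  # window duration, "3 minute"
SLIDE = timedelta(minutes=2)   # slide interval, "2 minute"


def sliding_windows(ts, window=WINDOW, slide=SLIDE):
    """Return every [start, end) window containing ts, with starts aligned to the epoch."""
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)  # treat naive timestamps as UTC
    # Latest window start at or before ts: a whole multiple of `slide` since the epoch.
    start = EPOCH + ((ts - EPOCH) // slide) * slide
    windows = []
    while start + window > ts:  # window still covers ts (end is exclusive)
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)


def window_counts(timestamps, window=WINDOW, slide=SLIDE):
    """Count events per window, like groupBy(window(...)).count()."""
    counts = Counter()
    for ts in timestamps:
        for w in sliding_windows(ts, window, slide):
            counts[w] += 1
    return dict(sorted(counts.items()))
```

With a 3-minute window and a 2-minute slide, a transaction in the first minute of each 2-minute slot falls into two overlapping windows and the rest fall into one, so the window counts add up to more than the number of transactions (about 1.5× for evenly spread timestamps).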

### Step 5: Write the stream to an appropriate sink. Here I have used `console` for debugging and lab purposes, but a real production pipeline would write to a durable sink with a checkpoint location (where Spark keeps its write-ahead offset log and state) for fault tolerance

0. Write the stream to `console`
0. For this lab, use the `complete` output mode
0. Configure a `2 minute` processing-time trigger to match the `2 minute` slide interval (the trigger runs on processing time, while the windows use the `Transaction-date` event time)
0. Start the query

In [14]:
streamingQuery = (countsDF 
 .writeStream                           # From the DataFrame get the DataStreamWriter
 .format("console")                     # Specify the sink format as "console"
 .outputMode("complete")                # Configure the output mode as "complete"
 .queryName("infosys_lab")              # Optional query name
 .trigger(processingTime="2 minute")    # 2-minute processing-time trigger, matching the slide interval
 .start()                               # Start the query
)
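For the production setup mentioned in Step 5, a minimal sketch of a checkpointed file sink, assuming a Parquet output directory and a checkpoint directory of your choice (the `start_file_sink` name and both paths are hypothetical). The file sink supports only the `append` output mode, so `counts_df` must be built like `countsDF` above but without the `orderBy` (sorting is only allowed in `complete` mode); the watermark is what lets Spark emit each finished window and then discard its state.

```python
def start_file_sink(counts_df, output_path, checkpoint_path):
    """Start a checkpointed Parquet sink for windowed counts (sketch).

    Assumes counts_df = streamDF.withWatermark(...).groupBy(window(...)).count()
    with no orderBy, since the file sink only supports the append output mode.
    """
    return (counts_df
        .writeStream
        .format("parquet")                              # File sink: append mode only
        .outputMode("append")                           # Emit a window once the watermark passes its end
        .option("path", output_path)                    # Directory for the result files
        .option("checkpointLocation", checkpoint_path)  # Offset/commit logs and state, for recovery
        .trigger(processingTime="2 minutes")            # Same cadence as the console query
        .start())
```

Because `append` mode relies on the watermark, rows older than the watermark may be dropped. With the sample data, whose timestamps are spread over 2019 to 2021 in random order, a 6-minute watermark would probably discard most rows, so event times would need to arrive roughly in order for this sink to be useful.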

The code below introduces **`awaitTermination()`**

**`awaitTermination()`** will block the current thread
* Until the stream stops naturally or 
* Until the specified timeout, in seconds, elapses (if specified)

If the stream was "canceled" or otherwise terminated abnormally, any resulting exceptions will be thrown by **`awaitTermination()`** as well.

In [16]:
try:
  
  # Stream for up to 13,000 seconds while the current thread blocks: 100 files × 2 minutes per trigger × 60 = 12,000 seconds, plus a ~1,000 second buffer before shutting the stream down gracefully
  streamingQuery.awaitTermination(13000)  
  
except Exception as e:
  print(e)
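The timeout in the cell above is simple arithmetic over the number of triggers. A sketch of the same calculation, where `await_timeout_seconds` is a hypothetical helper: with `maxFilesPerTrigger` set to 1, 100 files need 100 triggers of 120 seconds each. Since the first trigger fires as soon as the query starts, this is a slight overestimate, which only adds to the safety buffer.

```python
import math


def await_timeout_seconds(num_files, files_per_trigger, trigger_seconds, buffer_seconds):
    """Upper bound on the time needed to consume every file, plus a safety buffer."""
    triggers = math.ceil(num_files / files_per_trigger)  # one micro-batch per trigger
    return triggers * trigger_seconds + buffer_seconds


timeout = await_timeout_seconds(num_files=100, files_per_trigger=1,
                                trigger_seconds=120, buffer_seconds=1000)
```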

In [17]:
try:
  
  # Issue the command to stop the stream soon after `awaitTermination()`
  streamingQuery.stop()

except Exception as e:
  print(e)
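A fixed timeout either wastes time or can cut the stream off early. A hypothetical alternative, `stop_when_idle`, polls the query's `status` dictionary (keys `isDataAvailable` and `isTriggerActive`) and stops it after a few consecutive idle polls, counting only once at least one progress report (`lastProgress`) exists. It is a sketch: the poll interval and the required idle count are assumptions to tune for your trigger interval.

```python
import time


def stop_when_idle(query, idle_polls_needed=3, poll_seconds=60, sleep=time.sleep):
    """Stop a StreamingQuery after `idle_polls_needed` consecutive idle polls.

    A poll counts as idle when the query has reported progress at least once
    and its status shows no new data and no running trigger.
    Returns the number of polls performed.
    """
    polls = 0
    idle_polls = 0
    while query.isActive:
        polls += 1
        status = query.status  # dict with "message", "isDataAvailable", "isTriggerActive"
        is_idle = (
            query.lastProgress is not None
            and not status["isDataAvailable"]
            and not status["isTriggerActive"]
        )
        idle_polls = idle_polls + 1 if is_idle else 0
        if idle_polls >= idle_polls_needed:
            query.stop()
            break
        sleep(poll_seconds)
    return polls
```

In the notebook this could replace the fixed `awaitTermination(13000)`, for example with `stop_when_idle(streamingQuery, idle_polls_needed=3, poll_seconds=60)`.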