##### Introduction (as on the Apache Spark website)

In [None]:
from pyspark import SparkContext

sc = SparkContext(appName="Test")

##### Reading Text File

In [None]:
textFile = sc.textFile(r"file:///C:\Users\Dell\Desktop\Hadoop\Spark\bootStrap.sh")

In [None]:
textFile.count()

In [None]:
textFile.collect()

##### Filtering (the equivalent of query in pandas)

In [None]:
# textFile is an RDD of strings: filter it with a Python predicate (RDDs have no columns)
textFile.filter(lambda line: "abc" in line).count()

###### Note that the above creates an RDD, not a DataFrame, because the file was read with the SparkContext rather than an SQLContext / SparkSession

Let us create a SparkSession:

In [None]:
from pyspark import SQLContext
spark = SQLContext(sc).sparkSession

In [None]:
text = spark.read.text(r"file:///C:\Users\Dell\Desktop\Hadoop\Spark\bootStrap.sh")

In [None]:
text[text.value.contains("get")].collect()

In [None]:
from pyspark.sql.functions import *

##### Creating new column in spark dataframe

In [None]:
text1 = text.select(size(split(text.value, r"\s+")).name("wordCount"))
text1.show()

In [None]:
text.select(size(split(text.value, r"\s+")).name("wordCount")).agg(max(col("wordCount"))).show()

##### Simulating MapReduce action

In [None]:
text.select(split(text.value, r"\s+").name("word")).collect()

In [None]:
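# groupby on the array column groups identical word arrays (i.e. identical lines), not individual words
text.select(split(text.value, r"\s+").name("word")).groupby("word").count().show()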

##### Explode, GroupBy and OrderBy

In [None]:
# explode turns each array element into its own row, so the grouping is now per word
df = text.select(explode(split(text.value, r"\s+")).name("word")).groupby("word").count().orderBy(desc("count"))
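
To see what the explode + groupBy + count + orderBy pipeline does conceptually, here is a plain-Python sketch of the same MapReduce-style phases (map, shuffle, reduce, sort). It does not use Spark at all; the function name `word_count` and the sample lines are hypothetical, and unlike Spark's `split` it drops empty tokens.

```python
import re
from collections import defaultdict


def word_count(lines):
    """Plain-Python analogue of explode(split(...)) -> groupBy("word").count() -> orderBy(desc("count"))."""
    # Map phase: emit a (word, 1) pair for every whitespace-separated token.
    pairs = []
    for line in lines:
        for word in re.split(r"\s+", line.strip()):
            if word:  # drop empty tokens (Spark's split would keep them as "" words)
                pairs.append((word, 1))

    # Shuffle phase: bring all pairs with the same key together.
    groups = defaultdict(list)
    for word, one in pairs:
        groups[word].append(one)

    # Reduce phase: sum the ones for each key.
    counts = {word: sum(ones) for word, ones in groups.items()}

    # orderBy(desc("count")); ties are broken alphabetically to get a stable result.
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))


print(word_count(["to be or not", "to be"]))  # [('be', 2), ('to', 2), ('not', 1), ('or', 1)]
```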

In [None]:
%%timeit
df.show()

In [None]:
df.show()

##### Importance of Cache

In [None]:
df.cache()

In [None]:
%%timeit
df.show()
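
`df.cache()` is lazy: it only marks the DataFrame for caching. The data is materialized by the first action (the first `df.show()` in the timed cell), and later actions reuse the cached partitions, which is why the second `%%timeit` is faster. The plain-Python class below is only an analogy for that behaviour, not how Spark implements caching; `LazyCachedResult` is a hypothetical name.

```python
class LazyCachedResult:
    """Analogy for DataFrame.cache(): nothing runs until the first action."""

    def __init__(self, compute):
        self._compute = compute        # stands in for the query plan
        self._cache_requested = False
        self._cached = None
        self.computations = 0          # how many times the "plan" actually executed

    def cache(self):
        # Like df.cache(): only marks the result for caching; nothing is computed yet.
        self._cache_requested = True
        return self

    def show(self):
        # Like an action: reuse the cached result if one exists, otherwise compute.
        if self._cached is not None:
            return self._cached
        self.computations += 1
        result = self._compute()
        if self._cache_requested:
            self._cached = result
        return result


uncached = LazyCachedResult(lambda: sum(range(1_000_000)))
for _ in range(3):
    uncached.show()
print(uncached.computations)  # 3: recomputed on every action

cached = LazyCachedResult(lambda: sum(range(1_000_000))).cache()
print(cached.computations)    # 0: cache() alone computes nothing
for _ in range(3):
    cached.show()
print(cached.computations)    # 1: computed once by the first action, then reused
```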

##### One more way of creating spark session

In [None]:
from pyspark.sql import SparkSession
spark2 = SparkSession.builder.appName("temp").getOrCreate()

In [None]:
spark2.read.text(r"file:///C:\Users\Dell\Desktop\Hadoop\Spark\bootStrap.sh").show()

##### Quick note on clusters and how Spark programs execute on them
* A single PySpark program has two logical components:
    * Driver
        * Assigns tasks and collects the output of those tasks.
        * Coordinates with resource managers such as YARN and Mesos to obtain resources. This is transparent to the programmer, who only writes read, write and DataFrame operations.
    * Executors (workers)
        * Register their availability with the resource allocator and are therefore assigned tasks.
        * Execute the instructions sent by the driver program.
        * Each executor operates only on the data in its own scope (its partitions), so the work runs in parallel across executors and finishes faster.
        
        
* Monitoring can be done through the Spark web UI at: http://<driver-node>:4040
* Applications such as Apache Livy also provide interactive access to jobs and their features.
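
The driver/executor split described above can be mimicked on a single machine with Python's `concurrent.futures`. This is only a rough analogy under stated assumptions: threads stand in for executors, and the helpers `partition`, `executor_task` and `driver_count` are hypothetical. The "driver" splits the data into partitions, each "executor" sees only its own partition, and the driver combines the partial results, much as an action like `count()` does.

```python
from concurrent.futures import ThreadPoolExecutor


def partition(data, num_partitions):
    """Driver side: split the input into roughly equal, contiguous partitions."""
    size, rem = divmod(len(data), num_partitions)
    parts, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < rem else 0)
        parts.append(data[start:end])
        start = end
    return parts


def executor_task(part, keyword):
    """Executor side: works only on its own partition and returns a partial result."""
    return sum(1 for line in part if keyword in line)


def driver_count(lines, keyword, num_executors=4):
    parts = partition(lines, num_executors)
    # The driver sends the same task to every worker, one partition each ...
    with ThreadPoolExecutor(max_workers=num_executors) as pool:
        partials = list(pool.map(executor_task, parts, [keyword] * len(parts)))
    # ... and combines the partial results into the final answer.
    return sum(partials)


lines = [f"line {i} {'get' if i % 3 == 0 else 'put'}" for i in range(10)]
print(driver_count(lines, "get"))  # 4
```

Real Spark adds stages, task scheduling, data locality and fault tolerance on top of this basic split-compute-combine shape.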

##### Job Scheduling
* Unlike scheduling in other contexts, here we are talking about resource scheduling: how cluster resources are shared between and within applications.


* Static Partitioning:
    * the resources given to an application can be fixed through spark-submit arguments
        * --num-executors : number of executors for the application
        * --executor-memory : memory per executor
        * --executor-cores : number of cores per executor (cores are logical units of computation in a CPU)



* Dynamic Resource Allocation:
    * Applications can give executors back when they no longer need them and request them again later.
    * set spark.dynamicAllocation.enabled to true
    * set spark.shuffle.service.enabled to true
        * This allows an executor to be removed without losing the shuffle files it wrote, so intermediate results are retained and can be fetched when another executor needs them.
        * The external shuffle service is a long-running process on each node, independent of the executors, that serves shuffle files across the application. It avoids the scenario of a new executor trying to read shuffle output from an old executor that has been removed.
            * The old executor can therefore terminate gracefully after writing its shuffle files, since those files stay on the node under the shuffle service's control.
            * The shuffle service then serves that content when a new executor requests the old shuffle output.

    * Executors are requested once tasks have been pending for spark.dynamicAllocation.schedulerBacklogTimeout seconds.
        * If the backlog persists, the request is repeated every spark.dynamicAllocation.sustainedSchedulerBacklogTimeout seconds.
        * The number of executors requested grows exponentially from round to round (1, 2, 4, 8, ...).
            * Starting small avoids over-requesting when only a few extra executors turn out to be enough (similar to TCP slow start).
            * Doubling lets the application ramp up quickly when many executors turn out to be needed.

    * Executors are removed after they have been idle for spark.dynamicAllocation.executorIdleTimeout seconds.
            
        
        
* Scheduling Within an Application:
    * Fair scheduling: by default, the jobs within an application (each triggered by an action such as save or collect) run in FIFO order. Fair scheduling instead shares resources between jobs, so short jobs can get resources even while long jobs are running.
        * conf.set("spark.scheduler.mode", "FAIR")
        * Jobs can be grouped into pools, and each pool can have
            * a weight, which decides how much preference it gets relative to other pools
            * a minShare, the minimum share of resources (in CPU cores) it receives regardless of its weight
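
The exponential ramp-up of dynamic resource allocation described above can be sketched as follows. This is a simplified model, not Spark's own allocation code: `ramp_up` is a hypothetical helper that only tracks executors requested per backlog round, caps the total with a `max_executors` parameter standing in for `spark.dynamicAllocation.maxExecutors`, and ignores the cap Spark also derives from the number of pending tasks.

```python
def ramp_up(rounds, initial=0, max_executors=None):
    """Executors added per round and the running total under exponential ramp-up.

    Round 1 adds 1 executor, then 2, 4, 8, ... while the task backlog persists.
    """
    added, totals = [], []
    total, to_add = initial, 1
    for _ in range(rounds):
        step = to_add
        if max_executors is not None:
            # Never go above the configured maximum.
            step = max(0, min(step, max_executors - total))
        total += step
        added.append(step)
        totals.append(total)
        to_add *= 2  # the next round asks for twice as many
    return added, totals


print(ramp_up(4))                    # ([1, 2, 4, 8], [1, 3, 7, 15])
print(ramp_up(5, max_executors=10))  # ([1, 2, 4, 3, 0], [1, 3, 7, 10, 10])
```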
        

##### Shared Variables
* Broadcast Variables:
    * a variable is broadcast once to all the nodes in the cluster, instead of being shipped with every task
    * each node keeps a read-only copy
    * useful when tasks repeatedly use a large dataset, such as a lookup table
        * broadcastVar = sc.broadcast([1, 2, 3])
        * broadcastVar.value

* Accumulator Variables:
    * variables that different tasks update through associative and commutative operations (e.g. sums, counts)
    * tasks on the cluster nodes can add to an accumulator with the += operator but cannot read it
    * only the driver program can read its value
    
* Spark Streaming:
    * uses the DStream API, which is built on Spark RDDs
    * the input stream is divided into micro-batches, which are processed one after another
    * each micro-batch is an RDD
    * after each micro-batch, the source is polled for the next micro-batch
    * foreachRDD gives you the data of each micro-batch
   
* Spark Structured Streaming:
    * built on Spark SQL; uses the DataFrame APIs
    * each new input record is appended as a row to an unbounded input table
    * can handle late-arriving event data
    * foreachBatch gives you the resulting DataFrame of each micro-batch
    * micro-batch processing achieves end-to-end latencies as low as about 100 ms
        * continuous processing (Spark 2.3 and later) achieves end-to-end latencies of about 1 ms
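
The micro-batch model above can be sketched in plain Python: the input stream is cut into micro-batches, each micro-batch goes through the same batch logic, and a running word count is carried across batches as state, which is roughly what the unbounded result table amounts to for a streaming word count. The helpers `micro_batches` and `running_word_count` are hypothetical, and cutting batches by record count is an assumption for illustration; Spark cuts them by trigger interval.

```python
from collections import Counter
from itertools import islice


def micro_batches(stream, batch_size):
    """Cut an (in principle unbounded) iterator of lines into micro-batches."""
    it = iter(stream)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


def running_word_count(stream, batch_size):
    state = Counter()   # the result table, kept across micro-batches
    snapshots = []
    for batch in micro_batches(stream, batch_size):
        # Same logic as a batch job, applied only to the new rows ...
        batch_counts = Counter(word for line in batch for word in line.split())
        # ... then merged into the state carried over from earlier batches.
        state.update(batch_counts)
        snapshots.append(dict(state))  # the full table after this batch
    return snapshots


for i, snap in enumerate(running_word_count(["a b", "b c", "c c", "a"], batch_size=2)):
    print(f"Batch {i}: {snap}")
# Batch 0: {'a': 1, 'b': 2, 'c': 1}
# Batch 1: {'a': 2, 'b': 2, 'c': 3}
```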

##### Example of Spark Structured Streaming:

### Output mode: Complete

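```
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession.builder.appName("WordCount").getOrCreate()

# Read lines from a TCP socket (for testing, start a server first, e.g. `nc -lk 9999`)
lines = spark.readStream.format("socket").option("host", "localhost").option("port", "9999").load()

# Split each line into words: one row per word
words = lines.select(explode(split(lines.value, " ")).alias("word"))

# Running count per word, kept as state across micro-batches
wordCount = words.groupBy("word").count()

# "complete" mode prints the whole result table on every 2-second trigger
query = wordCount.writeStream.format("console").outputMode("complete").trigger(processingTime='2 seconds').start()

query.awaitTermination()
```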


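```
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.appName("WordCount").getOrCreate()

# Streaming file sources need an explicit schema; all columns are read as strings
userSchema = StructType()\
    .add("mp_constituency", "string")\
    .add("assembly", "string")\
    .add("ward_no", "string")\
    .add("ward_name", "string")\
    .add("winning_candidate", "string")\
    .add("winners_party", "string")\
    .add("past_corporator_party", "string")

# The files in csvFolder are comma-separated (as in the Append and Update examples)
csvDF = spark \
    .readStream \
    .option("sep", ",") \
    .schema(userSchema) \
    .csv(r"C:\Users\Dell\Desktop\Hadoop\Real Time Analytics with Apache Storm\csvFolder")

# Number of wards won per party, recomputed as new files arrive in the folder
partySeats = csvDF.groupBy("winners_party").count()
query = partySeats.writeStream.format("console").outputMode("complete").trigger(processingTime='2 seconds').start()

query.awaitTermination()
```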

##### Note that the above two programs are run as scripts from the Anaconda command prompt

* python sparkInputFile.py
* spark-submit pysparkInputFile.py > sparkOutput
    * here Spark's own log messages (written to stderr) still appear on the terminal
    * while sparkOutput captures the application output on stdout, i.e. the writeStream console output
        * each time a new file is processed, the updated result table is printed

userSchema could also have been built as:

```
name_list = ["mp_constituency", "assembly", "ward_no", "ward_name", "winning_candidate", "winners_party", "past_corporator_party"]
userSchema = StructType([StructField(field_name, StringType(), True) for field_name in name_list])
```



### Output mode: Append

```
from pyspark.sql.types import *
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WordCount").getOrCreate()
'''
userSchema = StructType()\
				.add("mp_constituency", "string")\
				.add("assembly", "string")\
				.add("ward_no", "string")\
				.add("ward_name", "string")\
				.add("winning_candidate", "string")  \
				.add("winners_party", "string")  \
				.add("past_corporator_party", "string") 
'''
name_list = ["mp_constituency", "assembly", "ward_no", "ward_name", "winning_candidate", "winners_party", "past_corporator_party"]
userSchema = StructType([StructField(field_name, StringType(), True) for field_name in name_list])

csvDF = spark \
    .readStream \
    .option("sep", ",") \
    .schema(userSchema) \
    .csv(r"C:\Users\Dell\Desktop\Hadoop\Real Time Analytics with Apache Storm\csvFolder")


isManjula = csvDF[csvDF.winning_candidate.contains("Manjula")]
query = isManjula.writeStream.format("console").outputMode("append").trigger(processingTime='2 seconds').start()

query.awaitTermination()
```

##### Output of the above program
```
-------------------------------------------
Batch: 0
-------------------------------------------
+-----------------+-------------------+-------+-----------+--------------------+-------------+---------------------+
|  mp_constituency|           assembly|ward_no|  ward_name|   winning_candidate|winners_party|past_corporator_party|
+-----------------+-------------------+-------+-----------+--------------------+-------------+---------------------+
|  Bangalore South|      B.T.M. Layout|    147|    Adugodi|             Manjula|          INC|                  INC|
|Bangalore Central|       Rajaji Nagar|    107|Shiva Nagar|  Manjula Vijaykumar|          INC|                  BJP|
|  Bangalore North|Rajarajeswari Nagar|     69|    Laggere|Manjula Narayanas...|          JDS|                  BJP|
|  Bangalore South|      B.T.M. Layout|    147|    Adugodi|             Manjula|          INC|                  INC|
|Bangalore Central|       Rajaji Nagar|    107|Shiva Nagar|  Manjula Vijaykumar|          INC|                  BJP|
|  Bangalore North|Rajarajeswari Nagar|     69|    Laggere|Manjula Narayanas...|          JDS|                  BJP|
|  Bangalore South|      B.T.M. Layout|    147|    Adugodi|             Manjula|          INC|                  INC|
|Bangalore Central|       Rajaji Nagar|    107|Shiva Nagar|  Manjula Vijaykumar|          INC|                  BJP|
|  Bangalore North|Rajarajeswari Nagar|     69|    Laggere|Manjula Narayanas...|          JDS|                  BJP|
|  Bangalore South|      B.T.M. Layout|    147|    Adugodi|             Manjula|          INC|                  INC|
|Bangalore Central|       Rajaji Nagar|    107|Shiva Nagar|  Manjula Vijaykumar|          INC|                  BJP|
|  Bangalore North|Rajarajeswari Nagar|     69|    Laggere|Manjula Narayanas...|          JDS|                  BJP|
|  Bangalore South|      B.T.M. Layout|    147|    Adugodi|             Manjula|          INC|                  INC|
|Bangalore Central|       Rajaji Nagar|    107|Shiva Nagar|  Manjula Vijaykumar|          INC|                  BJP|
|  Bangalore North|Rajarajeswari Nagar|     69|    Laggere|Manjula Narayanas...|          JDS|                  BJP|
|  Bangalore South|      B.T.M. Layout|    147|    Adugodi|             Manjula|          INC|                  INC|
|Bangalore Central|       Rajaji Nagar|    107|Shiva Nagar|  Manjula Vijaykumar|          INC|                  BJP|
|  Bangalore North|Rajarajeswari Nagar|     69|    Laggere|Manjula Narayanas...|          JDS|                  BJP|
|  Bangalore South|      B.T.M. Layout|    147|    Adugodi|             Manjula|          INC|                  INC|
|Bangalore Central|       Rajaji Nagar|    107|Shiva Nagar|  Manjula Vijaykumar|          INC|                  BJP|
+-----------------+-------------------+-------+-----------+--------------------+-------------+---------------------+
only showing top 20 rows

19/05/10 21:26:08 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 5689 milliseconds
-------------------------------------------
Batch: 1
-------------------------------------------
+-----------------+-------------------+-------+-----------+--------------------+-------------+---------------------+
|  mp_constituency|           assembly|ward_no|  ward_name|   winning_candidate|winners_party|past_corporator_party|
+-----------------+-------------------+-------+-----------+--------------------+-------------+---------------------+
|  Bangalore South|      B.T.M. Layout|    147|    Adugodi|             Manjula|          INC|                  INC|
|Bangalore Central|       Rajaji Nagar|    107|Shiva Nagar|  Manjula Vijaykumar|          INC|                  BJP|
|  Bangalore North|Rajarajeswari Nagar|     69|    Laggere|Manjula Narayanas...|          JDS|                  BJP|
+-----------------+-------------------+-------+-----------+--------------------+-------------+---------------------+

19/05/10 21:26:33 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 3666 milliseconds
-------------------------------------------
Batch: 2
-------------------------------------------
+-----------------+-------------------+-------+-----------+--------------------+-------------+---------------------+
|  mp_constituency|           assembly|ward_no|  ward_name|   winning_candidate|winners_party|past_corporator_party|
+-----------------+-------------------+-------+-----------+--------------------+-------------+---------------------+
|  Bangalore South|      B.T.M. Layout|    147|    Adugodi|             Manjula|          INC|                  INC|
|Bangalore Central|       Rajaji Nagar|    107|Shiva Nagar|  Manjula Vijaykumar|          INC|                  BJP|
|  Bangalore North|Rajarajeswari Nagar|     69|    Laggere|Manjula Narayanas...|          JDS|                  BJP|
+-----------------+-------------------+-------+-----------+--------------------+-------------+---------------------+

19/05/10 21:27:07 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 7877 milliseconds
```

##### Here each batch corresponds to the new file(s) added to the directory and detected within the trigger interval

### Output mode: Update
* With a simple change to the earlier program (an aggregation instead of a filter, and outputMode("update")), we can see the "update" output.

```
from pyspark.sql.types import *
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WordCount").getOrCreate()
'''
userSchema = StructType()\
				.add("mp_constituency", "string")\
				.add("assembly", "string")\
				.add("ward_no", "string")\
				.add("ward_name", "string")\
				.add("winning_candidate", "string")  \
				.add("winners_party", "string")  \
				.add("past_corporator_party", "string") 
'''
name_list = ["mp_constituency", "assembly", "ward_no", "ward_name", "winning_candidate", "winners_party", "past_corporator_party"]
userSchema = StructType([StructField(field_name, StringType(), True) for field_name in name_list])

csvDF = spark \
    .readStream \
    .option("sep", ",") \
    .schema(userSchema) \
    .csv(r"C:\Users\Dell\Desktop\Hadoop\Real Time Analytics with Apache Storm\csvFolder")

partySeats = csvDF.groupBy("winners_party").count()
query = partySeats.writeStream.format("console").outputMode("update").trigger(processingTime='2 seconds').start()
#isManjula = csvDF[csvDF.winning_candidate.contains("Manjula")]
#query = isManjula.writeStream.format("console").outputMode("update").trigger(processingTime='2 seconds').start()


query.awaitTermination()
```

##### Output for the above program

```
(base) C:\Users\Dell\Desktop\Hadoop\Real Time Analytics with Apache Storm>python StucturedStreamingOutputAppend.py
19/05/10 21:47:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/05/10 21:47:44 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
19/05/10 21:47:46 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
-------------------------------------------
Batch: 0
-------------------------------------------
+-------------+-----+
|winners_party|count|
+-------------+-----+
|         SDPI|    9|
|          INC|  684|
|          BJP|  900|
|          JDS|  126|
|       Others|   27|
|  Independent|   36|
+-------------+-----+

19/05/10 21:50:43 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 172900 milliseconds
-------------------------------------------
Batch: 1
-------------------------------------------
+-------------+-----+
|winners_party|count|
+-------------+-----+
|          BJP|  979|
|          JDS|  137|
|       Others|   29|
|  Independent|   37|
+-------------+-----+

19/05/10 21:54:09 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 187771 milliseconds
-------------------------------------------
Batch: 2
-------------------------------------------
+-------------+-----+
|winners_party|count|
+-------------+-----+
|         SDPI|   10|
|          INC|  760|
|          BJP| 1000|
|          JDS|  140|
|       Others|   30|
|  Independent|   40|
+-------------+-----+

19/05/10 21:57:01 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 141160 milliseconds
-------------------------------------------
Batch: 3
-------------------------------------------
+-------------+-----+
|winners_party|count|
+-------------+-----+
|          INC|  836|
+-------------+-----+

19/05/10 22:01:20 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 140606 milliseconds

```

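* In batch 1, the new file had its INC rows removed, so the INC count did not change and INC is not printed: update mode prints only the groups whose aggregate changed in that batch.
* In batch 3, the new file contained only INC rows, so only INC is printed.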
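
The difference between the three output modes can be sketched in plain Python, given the new rows of each micro-batch: complete prints the whole result table, update prints only the groups whose aggregate changed in that batch, and append (used earlier on a filtered, non-aggregated stream) prints only the newly arrived rows. This is a simplified model for intuition, not Spark's implementation; `emit` and `party` are hypothetical helpers and the sample rows are made up.

```python
from collections import Counter


def emit(batches, mode, key):
    """Return what a console sink would print for each micro-batch in the given mode."""
    state = Counter()
    outputs = []
    for rows in batches:
        if mode == "append":
            # Stateless query (e.g. a filter): only the new rows are emitted.
            outputs.append(list(rows))
            continue
        before = dict(state)
        state.update(key(row) for row in rows)
        if mode == "complete":
            outputs.append(dict(state))                      # the entire result table
        elif mode == "update":
            outputs.append({k: v for k, v in state.items()
                            if before.get(k) != v})          # changed groups only
        else:
            raise ValueError(f"unknown output mode: {mode}")
    return outputs


def party(row):
    return row["winners_party"]


batches = [
    [{"winners_party": "INC"}, {"winners_party": "BJP"}],
    [{"winners_party": "BJP"}],   # a file without INC rows
    [{"winners_party": "INC"}],   # a file with only INC rows
]
print(emit(batches, "complete", party))  # [{'INC': 1, 'BJP': 1}, {'INC': 1, 'BJP': 2}, {'INC': 2, 'BJP': 2}]
print(emit(batches, "update", party))    # [{'INC': 1, 'BJP': 1}, {'BJP': 2}, {'INC': 2}]
```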

### map and flatMap are no longer available on DataFrames (use explode instead)

In [44]:
from pyspark.sql.types import *
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WordCount").getOrCreate()


In [11]:
lines = spark.read.text("sparkOutput")


In [12]:
lines.collect()

[Row(value='-------------------------------------------'),
 Row(value='Batch: 0'),
 Row(value='-------------------------------------------'),
 Row(value='+-------------+-----+'),
 Row(value='|winners_party|count|'),
 Row(value='+-------------+-----+'),
 Row(value='|         SDPI|    7|'),
 Row(value='|          INC|  532|'),
 Row(value='|          BJP|  700|'),
 Row(value='|          JDS|   98|'),
 Row(value='|       Others|   21|'),
 Row(value='|  Independent|   28|'),
 Row(value='+-------------+-----+'),
 Row(value=''),
 Row(value='-------------------------------------------'),
 Row(value='Batch: 1'),
 Row(value='-------------------------------------------'),
 Row(value='+-------------+-----+'),
 Row(value='|winners_party|count|'),
 Row(value='+-------------+-----+'),
 Row(value='|         SDPI|    8|'),
 Row(value='|          INC|  608|'),
 Row(value='|          BJP|  800|'),
 Row(value='|          JDS|  112|'),
 Row(value='|       Others|   24|'),
 Row(value='|  Independent|   32|'

In [13]:
from pyspark.sql.functions import *

In [18]:
type(lines.value)

pyspark.sql.column.Column

In [20]:
type(split(lines.value," "))

pyspark.sql.column.Column

In [22]:
type(explode(split(lines.value," ")))

pyspark.sql.column.Column

In [40]:
words = lines.select(explode(split(lines.value," ")).name("word")).groupBy("word").count()

In [45]:
words.show()

+--------------------+-----+
|                word|count|
+--------------------+-----+
|                 27||    1|
|                  7||    1|
|           Terminate|    3|
|                  8||    1|
|-----------------...|    6|
|               SDPI||    3|
|                  9||    1|
|                   0|    1|
|+-------------+--...|    9|
|                 36||    1|
|                JDS||    3|
|                 21||    1|
|                 job|    3|
|                112||    1|
|                800||    1|
|                684||    1|
|                   ||   18|
|               batch|    3|
|             Others||    3|
||winners_party|co...|    3|
+--------------------+-----+
only showing top 20 rows
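
In the RDD API, `map` produces one output record per input record while `flatMap` flattens each record's results into separate records. On DataFrames the same distinction is `split(...)` (one array per row) versus `explode(split(...))` (one row per element); the RDD methods are still reachable through `lines.rdd`. Because the split above is on a single space, table fragments such as `SDPI|` end up counted as words. A plain-Python sketch of the two shapes; `rdd_map` and `rdd_flat_map` are hypothetical helpers that only mimic the RDD semantics on a list.

```python
from itertools import chain


def rdd_map(f, records):
    # One output record per input record (like split(...) on a column: an array per row).
    return [f(r) for r in records]


def rdd_flat_map(f, records):
    # Each input record yields zero or more outputs, which are flattened into one
    # sequence (like explode(split(...)): one row per word).
    return list(chain.from_iterable(f(r) for r in records))


lines = ["Batch: 0", "|INC|532|"]
print(rdd_map(lambda s: s.split(" "), lines))       # [['Batch:', '0'], ['|INC|532|']]
print(rdd_flat_map(lambda s: s.split(" "), lines))  # ['Batch:', '0', '|INC|532|']
```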



##### Note that if a file is updated, Spark does not pick up the new rows. How do we manage file updates? How do we avoid dirty reads, i.e. how do we get committed reads?

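* The documentation states this explicitly:
"Once processed, changes to a file within the current window will not cause the file to be reread. That is: updates are ignored." at https://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources
* The same section requires files to be placed in the monitored directory atomically (by moving or renaming them into it), so each update should be published as a new, complete file.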
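
A minimal sketch of that atomic placement, assuming a staging directory on the same filesystem as the watched folder: the data is written to a temporary file first and then moved into the watched directory with `os.replace`, which on POSIX filesystems is an atomic rename, so the stream sees either no file or the complete file. `publish_atomically`, the directory names and the sample row are hypothetical.

```python
import os
import tempfile


def publish_atomically(data, watched_dir, file_name, staging_dir):
    """Write data to a staging file, then atomically move it into the watched directory.

    staging_dir must be on the same filesystem as watched_dir, so that os.replace
    is a rename rather than a copy.
    """
    fd, tmp_path = tempfile.mkstemp(dir=staging_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", newline="") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())  # make sure the bytes are on disk before the rename
        final_path = os.path.join(watched_dir, file_name)
        os.replace(tmp_path, final_path)  # readers never see a half-written file
        return final_path
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


# Usage: a throwaway layout standing in for the watched csvFolder.
root = tempfile.mkdtemp()
watched = os.path.join(root, "csvFolder")
staging = os.path.join(root, "staging")
os.makedirs(watched)
os.makedirs(staging)
path = publish_atomically("a,b,c\n", watched, "batch_001.csv", staging)
print(os.listdir(watched))  # ['batch_001.csv']
```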

##### Window operations (details are best described at https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes)

```
from pyspark.sql.functions import window

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()
```

Here the window is computed on the field named timestamp: each window is 10 minutes wide and a new window starts every 5 minutes (the slide), so each event falls into two overlapping windows.
* e.g. minutes 0-10, 5-15, 10-20, etc.
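
Which sliding windows an event belongs to can be worked out with simple arithmetic. A plain-Python sketch, assuming integer minutes and windows aligned to 0 (Spark's `window()` aligns to the Unix epoch unless a start offset is given); `sliding_windows` is a hypothetical helper.

```python
def sliding_windows(t, size, slide):
    """All [start, end) windows of width `size`, sliding every `slide`, that contain time t."""
    last_start = (t // slide) * slide  # the latest window start that is <= t
    windows = []
    start = last_start
    while start > t - size:            # this window still covers t
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


print(sliding_windows(12, size=10, slide=5))  # [(5, 15), (10, 20)]
print(sliding_windows(10, size=10, slide=5))  # [(5, 15), (10, 20)]: 10 is outside [0, 10)
```

With `slide == size` the same function gives tumbling windows, where each event falls into exactly one window.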

##### Handling late event data through watermarking
* Watermarking lets the engine automatically track the current event time in the data and attempt to clean up old state accordingly.

```
from pyspark.sql.functions import window

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count()
```
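* Here withWatermark defines how long Spark should wait for late data: the watermark is the maximum event time seen so far minus 10 minutes. Late data within that threshold is still aggregated; data older than the watermark may be dropped, and the state of windows older than the watermark is cleaned up. (Spark guarantees not to drop data within the threshold, but does not guarantee to drop all data beyond it.)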
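
The watermark rule can be sketched in plain Python with integer minutes. This is a deliberately simplified model: `apply_watermark` is a hypothetical helper that updates the watermark after every record and drops everything older than it, whereas Spark advances the watermark between micro-batches and may still aggregate some data older than it.

```python
def apply_watermark(events, delay):
    """Track the max event time seen; drop events older than (max event time - delay).

    events: iterable of (event_time, value) pairs in arrival order.
    Returns (accepted, dropped).
    """
    max_event_time = None
    accepted, dropped = [], []
    for event_time, value in events:
        watermark = None if max_event_time is None else max_event_time - delay
        if watermark is not None and event_time < watermark:
            dropped.append((event_time, value))   # too late: beyond the threshold
        else:
            accepted.append((event_time, value))  # on time, or late but within the threshold
        max_event_time = event_time if max_event_time is None else max(max_event_time, event_time)
    return accepted, dropped


events = [(12, "owl"), (20, "dog"), (14, "cat"), (8, "owl")]  # (minute, word) in arrival order
accepted, dropped = apply_watermark(events, delay=10)
print(accepted)  # [(12, 'owl'), (20, 'dog'), (14, 'cat')]
print(dropped)   # [(8, 'owl')]
```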

#### Example for Window operation
```
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("WordCount").getOrCreate()

userSchema = StructType()\
    .add("task", "string")\
    .add("time", "string")

# Tab-separated files with two string columns: task and time (in dd-MM-yyyy HH:mm format)
csvDF = spark \
    .readStream \
    .option("sep", "\t") \
    .schema(userSchema) \
    .csv(r"C:\Users\Dell\Desktop\Hadoop\Real Time Analytics with Apache Storm\time")

# Convert the string into a Spark SQL timestamp.
# In Spark datetime patterns MM is the month and mm the minute; HH is the 0-23 hour, hh the 1-12 clock hour.
csvDF = csvDF.withColumn("timestamp", to_timestamp("time", "dd-MM-yyyy HH:mm"))

timestampCounts = csvDF.groupBy("timestamp").count()

query = timestampCounts.writeStream.format("console").outputMode("complete").start()

query.awaitTermination()
```
* The above program runs as an ordinary structured streaming query (it groups by the raw timestamp, not by a window); the focus is on creating the timestamp column and converting it to the Spark SQL timestamp type.
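
To show what a window would add to this example, here is a plain-Python sketch that parses such time strings and counts events per 10-minute tumbling window, roughly what `groupBy(window("timestamp", "10 minutes")).count()` computes. Note that Python's `strptime` uses the opposite case convention from Spark for the same fields (`%m` is the month and `%M` the minute), so `"%d-%m-%Y %H:%M"` matches Spark's `"dd-MM-yyyy HH:mm"`. `tumbling_window_counts` and the sample times are hypothetical, and windows are aligned to midnight here rather than to the Unix epoch as in Spark.

```python
from collections import Counter
from datetime import datetime, timedelta

FMT = "%d-%m-%Y %H:%M"  # strptime equivalent of Spark's "dd-MM-yyyy HH:mm"


def tumbling_window_counts(times, minutes=10):
    """Count events per [start, end) tumbling window of the given width."""
    width = timedelta(minutes=minutes)
    counts = Counter()
    for s in times:
        ts = datetime.strptime(s, FMT)
        # Floor the timestamp to the start of its window (aligned to midnight in this sketch).
        midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
        start = midnight + ((ts - midnight) // width) * width
        counts[(start, start + width)] += 1
    return dict(counts)


counts = tumbling_window_counts(["10-05-2019 21:26", "10-05-2019 21:29", "10-05-2019 21:31"])
for (start, end), n in sorted(counts.items()):
    print(start.strftime("%H:%M"), "-", end.strftime("%H:%M"), n)
# 21:20 - 21:30 2
# 21:30 - 21:40 1
```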