In [None]:
# Environment and Spark setup shared by every cell below.
import os
# Python executable PySpark should use.
os.environ["PYSPARK_PYTHON"] = "python3"

import findspark
# Locate the local Spark installation so `pyspark` can be imported;
# this has to run before the pyspark imports below.
findspark.init()

from pyspark.sql import SparkSession, SQLContext
# Single Spark session reused by every cell (getOrCreate returns an existing one if present).
spark = SparkSession.builder.appName("spark_stream").getOrCreate()
# NOTE(review): SQLContext is deprecated since Spark 3.0; the SparkSession `spark`
# exposes the same read / sql API.
sqlCon = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

# NOTE(review): only StreamingContext is needed from pyspark.streaming, and the
# pyspark.sql.functions wildcard shadows Python builtins such as sum/max/min/round;
# explicit imports would be safer.
from pyspark.streaming import *
from pyspark.sql.functions import *

%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.ticker as plticker


## Set IP and Port

Enter the IP address and port to which `stream.py` sends the Wikipedia edit stream.
Run `stream.py`, for example:
```
python stream.py "en.wikipedia.org" 5050 "20200515"

```

**en.wikipedia.org**: the wiki whose edit stream is forwarded; English Wikipedia is used in this example. Other languages work the same way — e.g. for German, use `de.wikipedia.org`.

**5050**: the port on which the edit stream is written; it must match `ServerPort` below.

**20200515**: the start date, in `YYYYMMDD` format; edits from this date up to the latest are reported.

In [None]:
# Streaming configuration.
#   batchInterval          - seconds between micro-batches; a new RDD is produced each interval.
#   ServerHost, ServerPort - address stream.py serves the edit stream on
#                            (the port must match the one passed to stream.py).
batchInterval = 3
ServerHost, ServerPort = "127.0.0.1", 5050

In [None]:
# Create a StreamingContext on the existing SparkContext; a new micro-batch RDD
# is produced every `batchInterval` seconds.
ssc = StreamingContext(spark.sparkContext, batchDuration=batchInterval)

# Input DStream reading newline-delimited UTF-8 text from the socket served by stream.py.
englishStream = ssc.socketTextStream(ServerHost, ServerPort)

# Minimum time each DStream keeps its generated RDDs.
# PySpark's `remember` takes SECONDS, not milliseconds: 2400 s = 40 minutes.
# NOTE(review): confirm this long retention is intended.
ssc.remember(duration=2400)

## Wikipedia Stream Manipulation

In [None]:
# Called by foreachRDD for every windowed RDD: parse the JSON records into a
# DataFrame and expose it to SQL as the temp view "English_Edits".

def convert_data(time, rdd):
    """Register the current window of edits as the ``English_Edits`` temp view.

    Parameters
    ----------
    time
        Batch time passed by ``foreachRDD`` because this function takes two
        arguments; unused here.
    rdd
        Windowed RDD of JSON strings (presumably one edit per line from
        stream.py — TODO confirm).

    Empty windows are skipped so the previously registered view stays
    queryable: reading an empty RDD as JSON yields a DataFrame with no
    columns, which would make later queries on e.g. ``timestamp`` fail.
    Errors are printed instead of raised so one bad batch does not stop
    the streaming job (deliberate best-effort behavior).
    """
    try:
        if rdd.isEmpty():
            return
        # createOrReplaceTempView supersedes the deprecated registerTempTable.
        spark.read.json(rdd).createOrReplaceTempView("English_Edits")
    except Exception as e:
        print(e)

In [None]:
# Apply a sliding window to the raw stream, collapse each window into a single
# partition, and then hand it to convert_data, which registers the temp view.
#
# windowLength  - how many SECONDS of data each window covers
#                 (windowLength / batchInterval batches).
# slideDuration - how often, in seconds, a new window is emitted; tied to
#                 batchInterval so a fresh window is produced every batch.
# Spark requires both durations to be multiples of the batch interval.
windowLength = 60

(
    englishStream
    .window(windowDuration=windowLength, slideDuration=batchInterval)
    .repartition(1)
    .foreachRDD(convert_data)
)

In [None]:
# Start the streaming computation: from here on convert_data runs for each
# window produced by the pipeline registered above.
# NOTE(review): the English_Edits view only exists after the first window has
# been processed, so querying it immediately after start() may fail — wait at
# least one batch interval.
ssc.start()

In [None]:
# Peek at the rows currently registered under the English_Edits temp view,
# i.e. the most recent window of edits received from Wikipedia.
# NOTE(review): run only after the first window has been registered.
peekQuery = "SELECT * FROM English_Edits"
df = spark.sql(peekQuery)
df.show(n=20, truncate=True)

In [None]:
# Bar chart of the number of edits per timestamp in the current window.

df = spark.sql("SELECT timestamp, count(*) as count FROM English_Edits GROUP BY timestamp ORDER BY timestamp")
df_pd = df.toPandas()

# Distance between major x-axis ticks, in x-axis units (seconds if `timestamp`
# is numeric, categories if it is a string — TODO confirm stream.py's format).
tickSpacing = 25.0

fig, ax = plt.subplots()
ax.bar(x=df_pd["timestamp"], height=df_pd["count"], width=0.25, color="Orange")
loc = plticker.MultipleLocator(base=tickSpacing)  # ticks at regular intervals
ax.xaxis.set_major_locator(loc)
# Rotate tick labels through the axes object instead of the pyplot state machine.
ax.tick_params(axis="x", labelrotation=45)
ax.set(
    title="Edits per timestamp in the current window",
    xlabel="timestamp",
    ylabel="number of edits",
)
fig.tight_layout()

plt.show()

In [None]:
# Stop the streaming computation but keep the SparkContext alive
# (stopSparkContext=False) so the Spark session can still be used or stopped separately.
ssc.stop(stopSparkContext=False)

In [None]:
# Stop the Spark session and its underlying SparkContext; run this last.
spark.stop()