In [None]:
# Headless OpenJDK 8; -qq and the redirect to /dev/null keep the cell output quiet.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download the Spark 3.0.1 / Hadoop 3.2 binary distribution from the Apache archive
# and unpack it into the current working directory.
!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
!tar xf spark-3.0.1-bin-hadoop3.2.tgz
# findspark is imported and initialised later in this cell.
!pip install -q findspark
import os

# Tell the environment where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.1-bin-hadoop3.2",
})

import findspark

# Must run after SPARK_HOME is set and before pyspark is imported.
findspark.init()

import pyspark
from pyspark.sql import SparkSession

# Local-mode session named "Colab" with the Spark UI pinned to port 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)
sc = pyspark.SparkContext.getOrCreate()

In [None]:
from IPython.display import HTML, display

# Inject CSS so <pre> output (e.g. wide DataFrame.show() tables) is not wrapped.
display(
    HTML("<style>pre { white-space: pre !important; }</style>")
)

### Create the schema of the streamed files (check the column names and types from the CSV files)

In [None]:
from pyspark.sql.types import (StructType, StructField,
                               StringType, IntegerType , FloatType , DateType)

# Schema of the streamed stock CSV files: one nullable field per column,
# listed in the same order as the columns appear in the files.
recordSchema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("ID", IntegerType()),
        ("Date", DateType()),
        ("Open", FloatType()),
        ("High", FloatType()),
        ("Low", FloatType()),
        ("Close", FloatType()),
        ("Adj Close", FloatType()),
        ("Volume", IntegerType()),
    )
])

### Create the dataframe by reading the stream using format "csv" and the schema you created.

In [None]:
# Streaming source over the CSV files in InputStream/, parsed with recordSchema.
# No header option is set, so the header line of a file is parsed like any
# other row (it appears as an all-null row in the output further down).
df = (
    spark.readStream
    .schema(recordSchema)
    .format("csv")
    .load("InputStream/")
)

### Make sure the Dataframe is streaming the files from the folder

In [None]:
# Expected to be True: df was built with spark.readStream rather than spark.read.
df.isStreaming

True

### Create a stream writer into memory and specify the query name "stock":

In [None]:
# Append-mode writer whose results are kept in an in-memory table
# that can be queried with SQL under the name "table1".
writer1 = (
    df.writeStream
    .format("memory")
    .queryName("table1")
    .outputMode("append")
)

### Start the write stream and make sure it works (read all columns from the table)

In [None]:
# Start the in-memory query. start() returns immediately while the data is
# processed in the background, so wait until everything currently available
# in the source has been processed; otherwise the SELECT in the next cell can
# run before the first micro-batch lands and show an empty table.
query1 = writer1.start()
query1.processAllAvailable()

In [None]:
# Peek at up to 10 rows of the in-memory table fed by the "table1" query.
spark.sql("SELECT * FROM table1").show(10)

+----+----------+-------+-------+-------+-------+---------+------+
|  ID|      Date|   Open|   High|    Low|  Close|Adj Close|Volume|
+----+----------+-------+-------+-------+-------+---------+------+
|null|      null|   null|   null|   null|   null|     null|  null|
| 120|2000-06-20|22817.9|23102.2|21680.6|22320.3|21092.633| 34466|
| 121|2000-06-21|21893.8|22675.7|21680.6|22675.7|21428.484| 68651|
| 122|2000-06-22|23386.6|23386.6|22462.5|23031.1|21764.336| 97209|
| 123|2000-06-23|22107.1|24097.4|22107.1|22889.0|21630.053|199483|
| 124|2000-06-26|23102.2|24168.5|22569.1|24026.3|22704.797|121969|
| 125|2000-06-27|24026.3|25519.1|23742.0|24026.3|22704.797|113809|
| 126|2000-06-28|23884.2|24666.1|23884.2|24666.1|23309.408| 86236|
| 127|2000-06-29|25234.7|25234.7|23919.7|24239.6|22906.365| 45299|
| 128|2000-06-30|24523.9|25092.6|23742.0|24879.3| 23510.88| 76670|
+----+----------+-------+-------+-------+-------+---------+------+
only showing top 10 rows



### Remove the first row from the data (hint: drop the rows where ALL values are null), then add a new column "diff", which is the difference between high and low columns

In [None]:
import pyspark.sql.functions as psf

In [None]:
# Drop only the rows in which EVERY column is null (the header line of a CSV
# file is parsed as such a row). The default how="any" would also discard
# valid rows that merely have one missing value.
df2 = df.dropna(how="all")

In [None]:
# Add "diff": the spread between the High and Low columns of each row.
df3 = df2.withColumn("diff", df2["High"] - df2["Low"])

In [None]:
# Append-mode in-memory writer for the cleaned data with the "diff" column;
# results are exposed as the SQL table "table2".
writer2 = (
    df3.writeStream
    .format("memory")
    .queryName("table2")
    .outputMode("append")
)

In [None]:
# Shut down the first streaming query before starting the next one.
query1.stop()

In [None]:
# Spark refuses to start a query whose name is already used by an active
# query (this is the most likely cause of the IllegalArgumentException
# recorded for this cell). Stop any leftover "table2" query first so the cell
# can be re-run safely.
for _active_query in spark.streams.active:
    if _active_query.name == "table2":
        _active_query.stop()

query2 = writer2.start()
# start() is asynchronous: wait until the currently available input has been
# processed so the SELECT in the next cell sees data.
query2.processAllAvailable()

IllegalArgumentException: ignored

In [None]:
# Peek at up to 10 rows of the in-memory table fed by the "table2" query.
spark.sql("SELECT * FROM table2").show(10)

+---+----------+-------+-------+-------+-------+---------+------+---------+
| ID|      Date|   Open|   High|    Low|  Close|Adj Close|Volume|     diff|
+---+----------+-------+-------+-------+-------+---------+------+---------+
|120|2000-06-20|22817.9|23102.2|21680.6|22320.3|21092.633| 34466|1421.5996|
|121|2000-06-21|21893.8|22675.7|21680.6|22675.7|21428.484| 68651| 995.0996|
|122|2000-06-22|23386.6|23386.6|22462.5|23031.1|21764.336| 97209| 924.0996|
|123|2000-06-23|22107.1|24097.4|22107.1|22889.0|21630.053|199483|1990.3008|
|124|2000-06-26|23102.2|24168.5|22569.1|24026.3|22704.797|121969|1599.4004|
|125|2000-06-27|24026.3|25519.1|23742.0|24026.3|22704.797|113809|1777.0996|
|126|2000-06-28|23884.2|24666.1|23884.2|24666.1|23309.408| 86236| 781.9004|
|127|2000-06-29|25234.7|25234.7|23919.7|24239.6|22906.365| 45299|   1315.0|
|128|2000-06-30|24523.9|25092.6|23742.0|24879.3| 23510.88| 76670|1350.5996|
|129|2000-07-03|24239.6|25590.2|24239.6|25092.6| 23712.45| 63306|1350.5996|
+---+-------

In [None]:
# Shut down the "table2" streaming query.
query2.stop()

### Create a new write stream using the newly generated DataFrame and call the generated table "modified_data"

In [None]:
# File-sink writer: df3 is written as CSV files under /content/OutputStream/.
# NOTE(review): the checkpoint location looks like a documentation placeholder
# path — confirm it is the intended directory.
writer3 = (
    df3.writeStream
    .format("csv")
    .option("checkpointLocation", "/path/to/checkpoint/folder")
    .option("path", "/content/OutputStream/")
)

In [None]:
# Start the CSV file-sink query and block until the input that is currently
# available has been processed and written. start() returns immediately, and
# the next cell stops the query, so without this wait the query can be
# stopped before any output file is produced.
query3 = writer3.start()
query3.processAllAvailable()

In [None]:
# Shut down the CSV file-sink query.
query3.stop()

In [None]:
# Stream the CSV files back from the output folder. Those files were written
# from df3, so they carry the extra "diff" column; reuse df3's schema instead
# of recordSchema, which lacks that column and would lose it on read.
modified_data = (
    spark.readStream
    .format("csv")
    .schema(df3.schema)
    .load("OutputStream/")
)

In [None]:
# Append-mode in-memory writer over df3; results are exposed as "table4".
writer4 = (
    df3.writeStream
    .format("memory")
    .queryName("table4")
    .outputMode("append")
)

In [None]:
# Start the in-memory query and wait until the currently available input has
# been processed. start() is asynchronous, so without the wait the SELECT in
# the next cell can run against a still-empty table.
query4 = writer4.start()
query4.processAllAvailable()

In [None]:
# Peek at up to 10 rows of the in-memory table fed by the "table4" query.
spark.sql("SELECT * FROM table4").show(10)

+---+----------+-------+-------+-------+-------+---------+------+---------+
| ID|      Date|   Open|   High|    Low|  Close|Adj Close|Volume|     diff|
+---+----------+-------+-------+-------+-------+---------+------+---------+
|120|2000-06-20|22817.9|23102.2|21680.6|22320.3|21092.633| 34466|1421.5996|
|121|2000-06-21|21893.8|22675.7|21680.6|22675.7|21428.484| 68651| 995.0996|
|122|2000-06-22|23386.6|23386.6|22462.5|23031.1|21764.336| 97209| 924.0996|
|123|2000-06-23|22107.1|24097.4|22107.1|22889.0|21630.053|199483|1990.3008|
|124|2000-06-26|23102.2|24168.5|22569.1|24026.3|22704.797|121969|1599.4004|
|125|2000-06-27|24026.3|25519.1|23742.0|24026.3|22704.797|113809|1777.0996|
|126|2000-06-28|23884.2|24666.1|23884.2|24666.1|23309.408| 86236| 781.9004|
|127|2000-06-29|25234.7|25234.7|23919.7|24239.6|22906.365| 45299|   1315.0|
|128|2000-06-30|24523.9|25092.6|23742.0|24879.3| 23510.88| 76670|1350.5996|
|129|2000-07-03|24239.6|25590.2|24239.6|25092.6| 23712.45| 63306|1350.5996|
+---+-------

In [None]:
# Shut down the "table4" streaming query.
query4.stop()

### Write the generated data into files instead of the memory. 

In [None]:
# File-sink writer for modified_data.
# modified_data is read from the OutputStream folder, so writing back into
# that same folder would feed the query its own output, and reusing the
# checkpoint directory of the earlier file-sink query would mix the progress
# state of two different queries. Give this query its own output and
# checkpoint directories.
writer5 = (
    modified_data.writeStream
    .format("csv")
    .option("path", "/content/ModifiedOutputStream/")
    .option("checkpointLocation", "/content/checkpoints/modified_data")
)

In [None]:
# Launch the file-sink query configured by writer5 and keep its handle.
# NOTE(review): the next cell stops the query without waiting for a batch,
# so it may be stopped before any output is written — confirm this is intended.
query5 = writer5.start()

In [None]:
# Shut down the file-sink query started from writer5.
query5.stop()

### Stop the query. Now, try reading the generated files into a normal dataframe
- Create a schema and use it to read the data.
- Show the output.

In [None]:
# Read the streamed CSV output back as a normal (batch) DataFrame.
# - Read the whole output directory rather than one part file: the part-file
#   name contains a UUID that changes on every run, so a hard-coded name
#   breaks as soon as the notebook is re-executed.
# - header=False: the CSV sink was not configured to write a header line, so
#   header=True would silently swallow the first data row of each file.
finaldf = spark.read.csv(
    "/content/OutputStream/",
    schema=recordSchema,
    header=False,
)

In [None]:
# Print the leading rows of the batch DataFrame read from the output files.
finaldf.show()

+---+----------+-------+-------+-------+-------+---------+------+
| ID|      Date|   Open|   High|    Low|  Close|Adj Close|Volume|
+---+----------+-------+-------+-------+-------+---------+------+
|121|2000-06-21|21893.8|22675.7|21680.6|22675.7|21428.484| 68651|
|122|2000-06-22|23386.6|23386.6|22462.5|23031.1|21764.336| 97209|
|123|2000-06-23|22107.1|24097.4|22107.1|22889.0|21630.053|199483|
|124|2000-06-26|23102.2|24168.5|22569.1|24026.3|22704.797|121969|
|125|2000-06-27|24026.3|25519.1|23742.0|24026.3|22704.797|113809|
|126|2000-06-28|23884.2|24666.1|23884.2|24666.1|23309.408| 86236|
|127|2000-06-29|25234.7|25234.7|23919.7|24239.6|22906.365| 45299|
|128|2000-06-30|24523.9|25092.6|23742.0|24879.3| 23510.88| 76670|
|129|2000-07-03|24239.6|25590.2|24239.6|25092.6| 23712.45| 63306|
|130|2000-07-04|25767.9|26087.7|25234.7|25448.0|24048.303| 45299|
|131|2000-07-05|25448.0|25590.2|24523.9|25448.0|24048.303| 48816|
|132|2000-07-06|25519.1|27367.3|25128.1|26585.3|25123.049|178662|
|133|2000-

### Sort the dataframe based on the ID

In [None]:
# Order the rows by ID (ascending, spelled out explicitly) and print them.
finalDFSorted = finaldf.sort("ID", ascending=True)
finalDFSorted.show()

+---+----------+-------+-------+-------+-------+---------+------+
| ID|      Date|   Open|   High|    Low|  Close|Adj Close|Volume|
+---+----------+-------+-------+-------+-------+---------+------+
|  0|2000-01-04|22817.9|25696.8|22817.9|24879.3| 23510.88|108745|
|  1|2000-01-05|24523.9|26229.9|23670.9|24417.3|23074.295|175990|
|  2|2000-01-06|24381.7|24666.1|22746.8|22817.9|21562.865| 71746|
|  3|2000-01-07|22036.0|24879.3|22036.0|23884.2|22570.514|120984|
|  4|2000-01-10|24879.3|25519.1|23813.1|24061.9| 22738.44|151371|
|  5|2000-01-11|24168.5|25021.5|23955.2|24239.6|22906.365| 95943|
|  6|2000-01-12|24168.5|24452.8|23457.6|23670.9|22368.947| 61899|
|  7|2000-01-13|23670.9|24132.9|23102.2|23244.4|21965.906| 57538|
|  8|2000-01-14|23457.6|24168.5|22746.8|23244.4|21965.906| 84267|
|  9|2000-01-17|22533.6|23457.6|22533.6|23457.6|22167.377| 67807|
| 10|2000-01-18|23457.6|23742.0|22746.8|23422.1|22133.832| 27995|
| 11|2000-01-19|22817.9|23173.3|22036.0|22036.0| 20823.97| 44173|
| 12|2000-