In [1]:
from pyspark.sql.functions import *
import pyspark.sql.functions as F

In [2]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

### Create the schema of the streamed files (check the column names and types from the CSV files)

In [3]:
from pyspark.sql.types import *

In [4]:
schema = "_c0 Integer,Date date, Open Double, High Double, Low Double , Close Double ,Adj_Close Double , Volume Integer"

### Create the dataframe by reading the stream using format "csv" and the schema you created.

In [5]:
df = spark.readStream.format("csv")\
.schema(schema)\
.load("InputStream/")

### Make sure the dataframe is streaming the files from the folder

In [6]:
df.isStreaming

True

### Create a stream writer into memory and specify the query name "stock:

In [7]:
writer = df.writeStream.format("memory")\
.queryName("stock")\
.outputMode("append")

### Start the write stream and make sure it works (read all columns from the table)

In [8]:
query = writer.start()
spark.sql("select * from stock where 0=1").show()

23/10/06 10:42:21 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-f6b89a75-f6c1-48dd-9eee-0919fcd1e0de. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/10/06 10:42:21 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


+---+----+----+----+---+-----+---------+------+
|_c0|Date|Open|High|Low|Close|Adj_Close|Volume|
+---+----+----+----+---+-----+---------+------+
+---+----+----+----+---+-----+---------+------+



[Stage 0:>                                                          (0 + 4) / 4]                                                                                

In [9]:
new_df = spark.sql("select * from stock")
new_df.show(100)

+----+----------+------------+------------+------------+------------+------------+------+
| _c0|      Date|        Open|        High|         Low|       Close|   Adj_Close|Volume|
+----+----------+------------+------------+------------+------------+------------+------+
|NULL|      NULL|        NULL|        NULL|        NULL|        NULL|        NULL|  NULL|
| 120|2000-06-20|22817.900391|23102.199219|21680.599609|22320.300781|21092.632813| 34466|
| 121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375| 68651|
| 122|2000-06-22|23386.599609|23386.599609|     22462.5|23031.099609|21764.335938| 97209|
| 123|2000-06-23|22107.099609|24097.400391|22107.099609|     22889.0|21630.052734|199483|
| 124|2000-06-26|23102.199219|     24168.5|22569.099609|24026.300781|22704.796875|121969|
| 125|2000-06-27|24026.300781|25519.099609|     23742.0|24026.300781|22704.796875|113809|
| 126|2000-06-28|23884.199219|24666.099609|23884.199219|24666.099609|23309.408203| 86236|
| 127|2000

### Remove the first row from the data (hint: drop the rows where ALL values are null), then add a new column "diff", which is the difference between high and low columns

In [10]:
new_df2 = df.na.drop(how="all")

In [11]:
new_df3 = new_df2.withColumn("diff",col("High")-col("Low"))
new_df3.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj_Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- diff: double (nullable = true)



### Create a new write stream using the new generated dataframe and call the generate table "modified_data"

In [12]:
new_df3.isStreaming

True

In [15]:
writer2 = new_df3.writeStream.format("memory")\
.queryName("modified_data")\
.outputMode("append")

In [16]:
query.stop()

In [17]:
query = writer2.start()
new_df4 = spark.sql ("Select * from modified_data where 0=1 ")
new_df4.show()

+---+----+----+----+---+-----+---------+------+----+
|_c0|Date|Open|High|Low|Close|Adj_Close|Volume|diff|
+---+----+----+----+---+-----+---------+------+----+
+---+----+----+----+---+-----+---------+------+----+



23/10/06 01:09:09 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-d3183a93-8396-4e44-b8a2-dc07a222c463. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/10/06 01:09:09 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [18]:
new_df4.show(150) 

+---+----------+------------+------------+------------+------------+------------+------+------------------+
|_c0|      Date|        Open|        High|         Low|       Close|   Adj_Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|120|2000-06-20|22817.900391|23102.199219|21680.599609|22320.300781|21092.632813| 34466|1421.5996099999975|
|121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375| 68651| 995.0996099999975|
|122|2000-06-22|23386.599609|23386.599609|     22462.5|23031.099609|21764.335938| 97209| 924.0996090000008|
|123|2000-06-23|22107.099609|24097.400391|22107.099609|     22889.0|21630.052734|199483|1990.3007819999984|
|124|2000-06-26|23102.199219|     24168.5|22569.099609|24026.300781|22704.796875|121969|1599.4003909999992|
|125|2000-06-27|24026.300781|25519.099609|     23742.0|24026.300781|22704.796875|113809|1777.0996090000008|
|126|2000-06-28|23884.199219

### Write the generated data into files instead of the memory. 

In [19]:
new_df4.isStreaming

False

In [20]:
writer3 = new_df3.writeStream.format("csv")\
.outputMode("append")\
.option("path","outputstream/")\
.option("checkpointLocation","ch1")


In [21]:
query.stop()
query = writer3.start()

23/10/06 01:10:27 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


### Stop the query. Now, try reading the generated files into a normal dataframe
- Create a schema and use it to read the data.
- Show the output.

In [22]:
query.stop()
schema = "ID Integer,Date date, Open Double, High Double, Low Double , Close Double ,Adj_Close Double , Volume Integer , diff double"
new_df5 = spark.read.csv("outputstream/" , schema = schema)
new_df5.show(truncate=False)

+---+----------+------------+------------+------------+------------+------------+------+------------------+
|ID |Date      |Open        |High        |Low         |Close       |Adj_Close   |Volume|diff              |
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|120|2000-06-20|22817.900391|23102.199219|21680.599609|22320.300781|21092.632813|34466 |1421.5996099999975|
|121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375|68651 |995.0996099999975 |
|122|2000-06-22|23386.599609|23386.599609|22462.5     |23031.099609|21764.335938|97209 |924.0996090000008 |
|123|2000-06-23|22107.099609|24097.400391|22107.099609|22889.0     |21630.052734|199483|1990.3007819999984|
|124|2000-06-26|23102.199219|24168.5     |22569.099609|24026.300781|22704.796875|121969|1599.4003909999992|
|125|2000-06-27|24026.300781|25519.099609|23742.0     |24026.300781|22704.796875|113809|1777.0996090000008|
|126|2000-06-28|23884.199219

### Sort the dataframe based on the ID

In [23]:
finalDFSorted = new_df5.sort('ID')
finalDFSorted.show()

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj_Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745|2878.9003900000025|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990|            2559.0|
|  2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746| 1919.298827999999|
|  3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984|2843.3007810000017|
|  4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371|            1706.0|
|  5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943|1066.3007810000017|
|  6|2000-01-12|     24168.5