In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
from IPython.display import display, HTML

# Force <pre> blocks to keep their whitespace and not wrap
# (presumably so wide DataFrame.show() tables stay aligned in the notebook).
_NO_WRAP_CSS = "<style>pre { white-space: pre !important; }</style>"
display(HTML(_NO_WRAP_CSS))

### Create the schema of the streamed files (check the column names and types from the CSV files)

In [3]:
# Schema applied to the streamed CSV files: (column name, Spark type) pairs,
# every column declared nullable.
recordSchema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ('Id', IntegerType()),
        ('Date', DateType()),
        ('Open', DoubleType()),
        ('High', DoubleType()),
        ('Low', DoubleType()),
        ('Close', DoubleType()),
        ('Adj_Close', DoubleType()),
        ('Volume', IntegerType()),
    ]
])

### Create the dataframe by reading the stream using format "csv" and the schema you created.

In [4]:
# Streaming DataFrame over the CSV files in the 'InputStream' folder,
# parsed with the explicit schema defined above.
# NOTE(review): no header option is set; the all-null row seen downstream is
# presumably each file's header line failing to parse -- confirm.
df1 = (
    spark.readStream
    .format("csv")
    .schema(recordSchema)
    .load('InputStream')
)

### Make sure the dataframe is streaming the files from the folder

In [5]:
# Sanity check: evaluates to True when df1 is backed by a streaming source.
df1.isStreaming

True

### Create a stream writer into memory and specify the query name "stock"

In [6]:
# Writer that appends each new batch of df1 to an in-memory table
# registered under the query name 'stock'.
writer1 = (
    df1.writeStream
    .outputMode("append")
    .format("memory")
    .queryName('stock')
)

### Start the write stream and make sure it works (read all columns from the table)

In [7]:
# Launch the streaming query; the returned handle is kept in query1.
query1 = writer1.start()

23/10/31 18:12:53 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-e47de461-4279-4008-b623-0511de5c0571. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/10/31 18:12:53 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [16]:
# Block until the 'stock' query has processed all input that is currently
# available. Without this, the in-memory table can still be empty when the
# cell runs right after start() (e.g. on Restart & Run All).
# Caveat: this call keeps blocking while new files keep arriving.
query1.processAllAvailable()

# Snapshot the in-memory sink table and print up to 1000 rows.
df2 = spark.sql('SELECT * FROM stock')
df2.show(1000)

+----+----------+------------+------------+------------+------------+------------+------+
|  Id|      Date|        Open|        High|         Low|       Close|   Adj_Close|Volume|
+----+----------+------------+------------+------------+------------+------------+------+
|NULL|      NULL|        NULL|        NULL|        NULL|        NULL|        NULL|  NULL|
|   0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745|
|   1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990|
|   2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746|
|   3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984|
|   4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371|
|   5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943|
|   6|2000-01-12|     24168.5|24452.800781|23457.599609|23670.900391|22368.947266| 61899|
|   7|2000

### Remove the first row from the data (hint: drop the rows where ALL values are null), then add a new column "Diff", which is the difference between the "High" and "Low" columns

In [9]:
# Discard rows in which every column is null, then append the
# High - Low difference as a new 'Diff' column.
df3 = df1.na.drop(how='all')
df4 = df3.withColumn('Diff', df3.High - df3.Low)

### Create a new write stream using the new generated dataframe and call the generated table "modified_data"

In [10]:
# Writer that appends each new batch of df4 to an in-memory table
# registered under the query name 'modified_data'.
writer2 = (
    df4.writeStream
    .outputMode("append")
    .format("memory")
    .queryName('modified_data')
)

In [11]:
# Launch the second streaming query; the returned handle is kept in query2.
query2 = writer2.start()

23/10/31 18:12:55 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-4b9df4c0-352c-4678-81a4-041196549166. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/10/31 18:12:55 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [17]:
# Block until the 'modified_data' query has processed all input that is
# currently available. Without this, the in-memory table can still be empty
# when the cell runs right after start() (e.g. on Restart & Run All).
# Caveat: this call keeps blocking while new files keep arriving.
query2.processAllAvailable()

# Snapshot the in-memory sink table and print up to 150 rows.
df5 = spark.sql('SELECT * FROM modified_data')
df5.show(150)

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| Id|      Date|        Open|        High|         Low|       Close|   Adj_Close|Volume|              Diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745|2878.9003900000025|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990|            2559.0|
|  2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746| 1919.298827999999|
|  3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984|2843.3007810000017|
|  4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371|            1706.0|
|  5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943|1066.3007810000017|
|  6|2000-01-12|     24168.5

### Write the generated data into files instead of the memory. 

In [13]:
# File-based sink: append df4 as CSV files under 'OutStream', triggering a
# micro-batch on a 10-second processing-time interval and keeping streaming
# progress in the 'chkpnt' checkpoint directory.
writer3 = (
    df4.writeStream
    .outputMode("append")
    .format("csv")
    .option("path", 'OutStream')
    .trigger(processingTime='10 second')
    .option("checkpointLocation", "chkpnt")
)
query3 = writer3.start()

23/10/31 18:12:56 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


### Stop the query. Now, try reading the generated files into a normal dataframe
- Create a schema and use it to read the data.
- Show the output.

In [18]:
# Stop the file-sink query before reading its output so the batch read below
# does not race with files that are still being written. Drain the pending
# input first so everything currently available lands in 'OutStream'.
# The isActive guard keeps the cell safe to re-run after the query has stopped.
if query3.isActive:
    query3.processAllAvailable()
    query3.stop()

# Schema of the written files: the input columns plus the derived 'Diff'.
# Note: this rebinds recordSchema, replacing the 8-column input schema.
recordSchema = StructType([ StructField('Id',IntegerType(), True),
                            StructField('Date',DateType(), True),
                            StructField('Open', DoubleType(), True),
                            StructField('High', DoubleType(), True),
                            StructField('Low', DoubleType(), True),
                            StructField('Close', DoubleType(), True),
                            StructField('Adj_Close', DoubleType(), True),
                            StructField('Volume', IntegerType(), True),
                            StructField('Diff', DoubleType(), True)])

# Plain (non-streaming) read of the CSV files produced by the stream.
df6 = (
    spark.read
    .format("csv")
    .schema(recordSchema)
    .load('OutStream')
)
df6.show()

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| Id|      Date|        Open|        High|         Low|       Close|   Adj_Close|Volume|              Diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745|2878.9003900000025|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990|            2559.0|
|  2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746| 1919.298827999999|
|  3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984|2843.3007810000017|
|  4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371|            1706.0|
|  5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943|1066.3007810000017|
|  6|2000-01-12|     24168.5

### Sort the dataframe by the "Id" column

In [19]:
# Sort by the 'Id' column, spelled exactly as in the schema. The previous
# 'ID' only resolved because Spark's column resolution is case-insensitive
# by default (spark.sql.caseSensitive=false).
df6.sort('Id').show()

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| Id|      Date|        Open|        High|         Low|       Close|   Adj_Close|Volume|              Diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745|2878.9003900000025|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990|            2559.0|
|  2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746| 1919.298827999999|
|  3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984|2843.3007810000017|
|  4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371|            1706.0|
|  5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943|1066.3007810000017|
|  6|2000-01-12|     24168.5