In [1]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [20]:
from pyspark.sql.functions import *
import pyspark.sql.functions as F

### Create the schema of the streamed files (check the column names and types from the CSV files)

In [7]:
s = spark.read.csv('InputStreamFullData/', header=True, inferSchema=True).schema
s

StructType([StructField('_c0', IntegerType(), True), StructField('Date', DateType(), True), StructField('Open', DoubleType(), True), StructField('High', DoubleType(), True), StructField('Low', DoubleType(), True), StructField('Close', DoubleType(), True), StructField('Adj Close', DoubleType(), True), StructField('Volume', IntegerType(), True)])

### Create the dataframe by reading the stream using format "csv" and the schema you created.

In [8]:
df_stream = spark.readStream.format('csv')\
.schema(s)\
.load('InputStreamFullData/')

### Make sure the dataframe is streaming the files from the folder

In [9]:
df_stream.isStreaming

True

### Create a stream writer into memory and specify the query name "stock:

In [11]:
writer = df_stream.writeStream.format('memory')\
.outputMode('append')\
.queryName('stock')

### Start the write stream and make sure it works (read all columns from the table)

In [15]:
query.stop()

In [16]:
query = writer.start()
spark.sql('select * from stock').show()

23/10/06 04:20:58 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-1c472ead-2202-4a44-a41b-e02d4611ae0a. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/10/06 04:20:58 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


+---+----+----+----+---+-----+---------+------+
|_c0|Date|Open|High|Low|Close|Adj Close|Volume|
+---+----+----+----+---+-----+---------+------+
+---+----+----+----+---+-----+---------+------+



In [17]:
spark.sql('select * from stock where 1 = 0').show()

+---+----+----+----+---+-----+---------+------+
|_c0|Date|Open|High|Low|Close|Adj Close|Volume|
+---+----+----+----+---+-----+---------+------+
+---+----+----+----+---+-----+---------+------+



In [18]:
df = spark.sql('select * from stock')
df.show()

+----+----------+------------+------------+------------+------------+------------+------+
| _c0|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|
+----+----------+------------+------------+------------+------------+------------+------+
|NULL|      NULL|        NULL|        NULL|        NULL|        NULL|        NULL|  NULL|
| 240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019|
| 241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791|
| 242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656|
| 243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964|
| 244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671|
| 245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560|
| 246|2000-12-13|27651.599609|     29286.5|27651.599609|28469.099609|26903.234375|270385|
| 247|2000

### Remove the first row from the data (hint: drop the rows where ALL values are null), then add a new column "diff", which is the difference between high and low columns

In [26]:
df_1 = df_stream.dropna(how = 'all').withColumn('diff', col('High')-col('Low'))
df_1.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- diff: double (nullable = true)



### Create a new write stream using the new generated dataframe and call the generate table "modified_data"

In [35]:
writer_new = df_1.writeStream.format('memory')\
.outputMode('append')\
.queryName('modified_data')

In [38]:
query_new = writer_new.start()

23/10/06 04:41:32 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-20be20f1-e9d8-4b93-88cc-d0e7abd7a719. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/10/06 04:41:32 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [37]:
query_new.stop()

In [33]:
spark.sql('select * from modified_data where 1 =0').show()

+---+----+----+----+---+-----+---------+------+----+
|_c0|Date|Open|High|Low|Close|Adj Close|Volume|diff|
+---+----+----+----+---+-----+---------+------+----+
+---+----+----+----+---+-----+---------+------+----+



In [41]:
df_modified = spark.sql('select * from modified_data')

### Write the generated data into files instead of the memory. 

In [44]:
writer = df_1.writeStream.format('csv')\
.outputMode('append')\
.option('checkpointLocation','chck0')\
.option('path','MyOutputStream/')

In [46]:
query = writer.start()

23/10/06 04:56:30 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


### Stop the query. Now, try reading the generated files into a normal dataframe
- Create a schema and use it to read the data.
- Show the output.

In [49]:
s_new = spark.read.csv('MyOutputStream/', header=True, inferSchema=True).schema
s_new

StructType([StructField('240', IntegerType(), True), StructField('2000-12-05', DateType(), True), StructField('26585.300781', DoubleType(), True), StructField('27367.300781', DoubleType(), True), StructField('26372.099609', DoubleType(), True), StructField('27011.800781', DoubleType(), True), StructField('25526.091797', DoubleType(), True), StructField('91019', IntegerType(), True), StructField('995.201172000001', DoubleType(), True)])

In [47]:
query_new.stop()

In [57]:
#spark.read.csv('InputStreamFullData/', header=True, inferSchema=True)
df_stream_new = spark.read.csv('InputStreamFullData/',header=True, inferSchema=True)
#.schema(s_new)\
#.load('MyOutputStream/')

In [64]:
df_stream_new1 = df_stream_new.withColumnRenamed('_c0','ID')
df_stream_new1.show()

+---+----------+------------+------------+------------+------------+------------+------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|
+---+----------+------------+------------+------------+------------+------------+------+
|240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019|
|241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791|
|242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656|
|243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964|
|244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671|
|245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560|
|246|2000-12-13|27651.599609|     29286.5|27651.599609|28469.099609|26903.234375|270385|
|247|2000-12-14|28469.099609|29784.099609|28291.300781|28362.400391| 26802.40625|256317|
|248|2000-12-15|28362

23/10/06 05:09:44 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , Date, Open, High, Low, Close, Adj Close, Volume
 Schema: _c0, Date, Open, High, Low, Close, Adj Close, Volume
Expected: _c0 but found: 
CSV file: file:///home/ataa/Spark/InputStreamFullData/KOSPI_STOCK_6.csv


### Sort the dataframe based on the ID

In [65]:
finalDFSorted = df_stream_new1.sort('ID')
finalDFSorted.show()

23/10/06 05:09:49 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , Date, Open, High, Low, Close, Adj Close, Volume
 Schema: _c0, Date, Open, High, Low, Close, Adj Close, Volume
Expected: _c0 but found: 
CSV file: file:///home/ataa/Spark/InputStreamFullData/KOSPI_STOCK_7.csv
23/10/06 05:09:49 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , Date, Open, High, Low, Close, Adj Close, Volume
 Schema: _c0, Date, Open, High, Low, Close, Adj Close, Volume
Expected: _c0 but found: 
CSV file: file:///home/ataa/Spark/InputStreamFullData/KOSPI_STOCK_6.csv
23/10/06 05:09:49 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , Date, Open, High, Low, Close, Adj Close, Volume
 Schema: _c0, Date, Open, High, Low, Close, Adj Close, Volume
Expected: _c0 but found: 
CSV file: file:///home/ataa/Spark/InputStreamFullData/KOSPI_STOCK_8.csv
23/10/06 05:09:49 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header

+---+----------+------------+------------+------------+------------+------------+------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|
+---+----------+------------+------------+------------+------------+------------+------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990|
|  2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746|
|  3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984|
|  4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371|
|  5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943|
|  6|2000-01-12|     24168.5|24452.800781|23457.599609|23670.900391|22368.947266| 61899|
|  7|2000-01-13|23670.900391|24132.900391|23102.199219|23244.400391| 21965.90625| 57538|
|  8|2000-01-14|23457