In [1]:
spark

In [2]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

### Create the schema of the streamed files (check the column names and types from the CSV files)

In [3]:
from pyspark.sql.types import (StructType, StructField,
                               StringType, IntegerType)
read_schema = spark.read.csv('/home/omar/BigData/CourseMaterial/Day5/Lab5/InputStream/KOSPI_STOCK_0.csv',header=True,inferSchema=True)
recordSchema_1 = read_schema.schema

# Another Way to check schema

recordSchema_2 = StructType([StructField('Id', IntegerType(), True), StructField('Date', StringType(), True), StructField('Open', StringType(), True), StructField('High', StringType(), True), StructField('Low', StringType(), True), StructField('Close', StringType(), True), StructField('Adj Close', StringType(), True), StructField('Volume', StringType(), True)])

### Create the dataframe by reading the stream using format "csv" and the schema you created.

In [4]:
df = spark.readStream.format('csv')\
.schema(recordSchema_1)\
.option('header','true')\
.load('/home/omar/BigData/CourseMaterial/Day5/Lab5/Input/')

In [5]:
df_2 = df.withColumnRenamed('_c0','Id')

### Make sure the dataframe is streaming the files from the folder

In [6]:
df_2.isStreaming

True

### Create a stream writer into memory and specify the query name "stock:

In [7]:
writer = df_2.writeStream.format('memory')\
.outputMode('append')\
.queryName('stock')

### Start the write stream and make sure it works (read all columns from the table)

In [8]:
query = writer.start()

23/10/06 11:42:46 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-42752ba4-5ef5-405c-a1ae-ada19f20f96c. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/10/06 11:42:47 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [9]:
spark.sql("select * from stock").show()

+---+----+----+----+---+-----+---------+------+
| Id|Date|Open|High|Low|Close|Adj Close|Volume|
+---+----+----+----+---+-----+---------+------+
+---+----+----+----+---+-----+---------+------+



23/10/06 11:43:11 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , Date, Open, High, Low, Close, Adj Close, Volume
 Schema: _c0, Date, Open, High, Low, Close, Adj Close, Volume
Expected: _c0 but found: 
CSV file: file:///home/omar/BigData/CourseMaterial/Day5/Lab5/Input/KOSPI_STOCK_0.csv


In [10]:
spark.sql("select * from stock").show(100)

+---+----------+------------+------------+------------+------------+------------+------+
| Id|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|
+---+----------+------------+------------+------------+------------+------------+------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990|
|  2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746|
|  3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984|
|  4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371|
|  5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943|
|  6|2000-01-12|     24168.5|24452.800781|23457.599609|23670.900391|22368.947266| 61899|
|  7|2000-01-13|23670.900391|24132.900391|23102.199219|23244.400391| 21965.90625| 57538|
|  8|2000-01-14|23457

### Remove the first row from the data (hint: drop the rows where ALL values are null), then add a new column "diff", which is the difference between high and low columns

In [11]:
df_drop_1st = df.dropna(how='all')
df_add_diff = df_drop_1st.withColumn('Diff',(df_drop_1st['High']-df_drop_1st['Low']))

### Create a new write stream using the new generated dataframe and call the generate table "modified_data"

In [12]:
writer_2 = df_add_diff.writeStream.format('memory')\
.outputMode('append')\
.queryName('modified_data')

In [13]:
query_2 = writer_2.start()

23/10/06 11:44:18 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-519dec3b-9882-4c00-b796-ab0f451c129e. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/10/06 11:44:18 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/10/06 11:44:19 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , Date, Open, High, Low, Close, Adj Close, Volume
 Schema: _c0, Date, Open, High, Low, Close, Adj Close, Volume
Expected: _c0 but found: 
CSV file: file:///home/omar/BigData/CourseMaterial/Day5/Lab5/Input/KOSPI_STOCK_0.csv


In [14]:
spark.sql("select * from modified_data").show()

+---+----------+------------+------------+------------+------------+------------+------+------------------+
|_c0|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              Diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745|2878.9003900000025|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990|            2559.0|
|  2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746| 1919.298827999999|
|  3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984|2843.3007810000017|
|  4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371|            1706.0|
|  5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943|1066.3007810000017|
|  6|2000-01-12|     24168.5

In [15]:
spark.sql("select * from modified_data").show(150)

+---+----------+------------+------------+------------+------------+------------+------+------------------+
|_c0|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              Diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745|2878.9003900000025|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990|            2559.0|
|  2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746| 1919.298827999999|
|  3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984|2843.3007810000017|
|  4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371|            1706.0|
|  5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943|1066.3007810000017|
|  6|2000-01-12|     24168.5

### Write the generated data into files instead of the memory. 

In [16]:
writer_3 = df_add_diff.writeStream.format('csv')\
.outputMode('append')\
.option('path','/home/omar/BigData/CourseMaterial/Day5/Lab5/Output')\
.option('checkpointlocation','/home/omar/BigData/checkpoints/chk5')
query_3 = writer_3.start()

23/10/06 11:44:36 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/10/06 11:44:37 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , Date, Open, High, Low, Close, Adj Close, Volume
 Schema: _c0, Date, Open, High, Low, Close, Adj Close, Volume
Expected: _c0 but found: 
CSV file: file:///home/omar/BigData/CourseMaterial/Day5/Lab5/Input/KOSPI_STOCK_0.csv


### Stop the query. Now, try reading the generated files into a normal dataframe
- Create a schema and use it to read the data.
- Show the output.

In [17]:
recordSchema_3 = StructType([StructField('Id', IntegerType(), True), StructField('Date', StringType(), True), StructField('Open', StringType(), True), StructField('High', StringType(), True), StructField('Low', StringType(), True), StructField('Close', StringType(), True), StructField('Adj Close', StringType(), True), StructField('Volume', StringType(), True),StructField('Diff', StringType(), True)])
finalDF = spark.read.format('csv')\
.schema(recordSchema_3)\
.option('header','true')\
.load('/home/omar/BigData/CourseMaterial/Day5/Lab5/outputStream/')
finalDF.show()

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| Id|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              Diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375| 68651| 995.0996099999975|
|122|2000-06-22|23386.599609|23386.599609|     22462.5|23031.099609|21764.335938| 97209| 924.0996090000008|
|123|2000-06-23|22107.099609|24097.400391|22107.099609|     22889.0|21630.052734|199483|1990.3007819999984|
|124|2000-06-26|23102.199219|     24168.5|22569.099609|24026.300781|22704.796875|121969|1599.4003909999992|
|125|2000-06-27|24026.300781|25519.099609|     23742.0|24026.300781|22704.796875|113809|1777.0996090000008|
|126|2000-06-28|23884.199219|24666.099609|23884.199219|24666.099609|23309.408203| 86236| 781.9003900000025|
|127|2000-06-29|25234.699219

23/10/06 11:45:31 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: 120, 2000-06-20, 22817.900391, 23102.199219, 21680.599609, 22320.300781, 21092.632813, 34466, 1421.5996099999975
 Schema: Id, Date, Open, High, Low, Close, Adj Close, Volume, Diff
Expected: Id but found: 120
CSV file: file:///home/omar/BigData/CourseMaterial/Day5/Lab5/outputStream/part-00000-7e533ccb-229c-4fac-b762-a45034833946-c000.csv


### Sort the dataframe based on the ID

In [18]:
finalDFSorted = finalDF.sort('Id')
finalDFSorted.show()

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| Id|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              Diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990|            2559.0|
|  2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746| 1919.298827999999|
|  3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984|2843.3007810000017|
|  4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371|            1706.0|
|  5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943|1066.3007810000017|
|  6|2000-01-12|     24168.5|24452.800781|23457.599609|23670.900391|22368.947266| 61899|  995.201172000001|
|  7|2000-01-13|23670.900391

23/10/06 11:45:36 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: 0, 2000-01-04, 22817.900391, 25696.800781, 22817.900391, 24879.300781, 23510.880859, 108745, 2878.9003900000025
 Schema: Id, Date, Open, High, Low, Close, Adj Close, Volume, Diff
Expected: Id but found: 0
CSV file: file:///home/omar/BigData/CourseMaterial/Day5/Lab5/outputStream/part-00003-f8d31429-b396-4b40-9356-86a275aee898-c000.csv
23/10/06 11:45:36 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: 80, 2000-04-25, 21111.900391, 21325.099609, 20756.5, 20969.699219, 19816.320313, 22227, 568.5996090000008
 Schema: Id, Date, Open, High, Low, Close, Adj Close, Volume, Diff
Expected: Id but found: 80
CSV file: file:///home/omar/BigData/CourseMaterial/Day5/Lab5/outputStream/part-00002-10d5454d-06d7-4a83-9a02-9a0ee55eb804-c000.csv
23/10/06 11:45:36 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: 120, 2000-06-20, 22817.900391, 23102.199219, 21680.599