In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
from IPython.display import display, HTML

# Keep Jupyter from wrapping <pre> output, so wide Spark `show()` tables stay aligned.
no_wrap_css = "<style>pre { white-space: pre !important; }</style>"
display(HTML(no_wrap_css))

### Create the schema of the streamed files (check the column names and types from the CSV files)

In [3]:
# Schema of the CSV files arriving in the input folder. Every field is nullable.
# NOTE: the (misspelled) name SreamSchema is referenced by later cells, so it is kept.
SreamSchema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('Id', IntegerType()),
        ('Date', DateType()),
        ('Open', DoubleType()),
        ('High', DoubleType()),
        ('Low', DoubleType()),
        ('Close', DoubleType()),
        ('Adj_Close', DoubleType()),
        ('Volume', IntegerType()),
    ]
])

### Create the dataframe by reading the stream using format "csv" and the schema you created.

In [4]:
# Folder watched by the streaming CSV source.
# NOTE(review): absolute, machine-specific path; adjust before running elsewhere.
INPUT_DIR = "/home/mohamed/ITI/Pyspark/practical2/InputStream"

# No "header" option is set. A header line in a file therefore cannot match the
# schema and presumably shows up as an all-NULL row (it is dropped further down).
df = (
    spark.readStream
    .format("csv")
    .schema(SreamSchema)
    .load(INPUT_DIR)
)

### Make sure the dataframe is streaming the files from the folder

In [5]:
# Expected to display True, confirming df was built with spark.readStream.
df.isStreaming

True

### Create a stream writer into memory and specify the query name "stock":

In [6]:
# Memory sink: rows from each micro-batch are appended to an in-memory table
# that later cells query through spark.sql under the name "stock".
writermem = (
    df.writeStream
    .format("memory")
    .queryName("stock")
    .outputMode("append")
)

### Start the write stream and make sure it works (read all columns from the table)

In [7]:
# Start the memory-sink query. The returned StreamingQuery handle keeps running after
# this cell returns (the "stock" table fills up between the next two cells).
query1= writermem.start()

23/10/06 11:20:11 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-f695e9f4-2d83-4c1b-a8e2-d8abfdc654ba. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/10/06 11:20:11 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [8]:
# Read the in-memory "stock" table. Right after start() it may still be empty,
# because the first micro-batch may not have completed yet.
df2 = spark.sql('select * from stock')
df2.show(n=20)  # 20 is show()'s default row count

+---+----+----+----+---+-----+---------+------+
| Id|Date|Open|High|Low|Close|Adj_Close|Volume|
+---+----+----+----+---+-----+---------+------+
+---+----+----+----+---+-----+---------+------+



In [9]:
# Query the same table again once data has arrived, showing more rows this time.
df2 = spark.sql('select * from stock')
df2.show(n=50)

+----+----------+------------+------------+------------+------------+------------+------+
|  Id|      Date|        Open|        High|         Low|       Close|   Adj_Close|Volume|
+----+----------+------------+------------+------------+------------+------------+------+
|NULL|      NULL|        NULL|        NULL|        NULL|        NULL|        NULL|  NULL|
|  40|2000-02-29|25163.699219|26087.699219|24239.599609|25519.099609|24115.490234|233246|
|  41|2000-03-01|25519.099609|25519.099609|25519.099609|25519.099609|24115.490234|     0|
|  42|2000-03-02|25767.900391|29144.300781|25767.900391|28575.699219|27003.972656|408391|
|  43|2000-03-03|27793.800781|29499.800781|26798.599609|27864.800781|26332.175781|216505|
|  44|2000-03-06|28291.300781|29144.300781|27367.300781|28717.800781|27138.255859|170784|
|  45|2000-03-07|28575.699219|29357.599609|27935.900391|28504.599609| 26936.78125|109730|
|  46|2000-03-08|28078.099609|28291.300781|     27154.0|28113.599609|26567.289063| 89331|
|  47|2000

23/10/06 11:20:15 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


### Remove the first row from the data (hint: drop the rows where ALL values are null), then add a new column "diff" holding the difference between the High and Low columns

In [10]:
# Discard rows in which every column is NULL (presumably the unparsable header
# line), then derive "diff" as the High minus Low price of each row.
df_drop = df.na.drop(how='all')
df_3 = df_drop.withColumn('diff', df_drop.High - df_drop.Low)

### Create a new write stream using the newly generated dataframe and name the generated table "modified_data"

In [11]:
# Second memory sink: the cleaned rows plus "diff" go to the "modified_data" table.
Newwriter = (
    df_3.writeStream
    .queryName("modified_data")
    .format("memory")
    .outputMode("append")
)

In [12]:
# Start the query that populates the in-memory "modified_data" table.
query2= Newwriter.start()

23/10/06 11:20:18 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-3bceb10b-22b8-494c-a205-4d5e8bc98086. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/10/06 11:20:18 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [13]:
# Inspect the cleaned stream: the all-NULL row is gone and "diff" is present.
df_4 = spark.sql('select * from modified_data')
df_4.show(n=20)  # 20 is show()'s default row count

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| Id|      Date|        Open|        High|         Low|       Close|   Adj_Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
| 40|2000-02-29|25163.699219|26087.699219|24239.599609|25519.099609|24115.490234|233246|1848.0996099999975|
| 41|2000-03-01|25519.099609|25519.099609|25519.099609|25519.099609|24115.490234|     0|               0.0|
| 42|2000-03-02|25767.900391|29144.300781|25767.900391|28575.699219|27003.972656|408391|3376.4003900000025|
| 43|2000-03-03|27793.800781|29499.800781|26798.599609|27864.800781|26332.175781|216505| 2701.201172000001|
| 44|2000-03-06|28291.300781|29144.300781|27367.300781|28717.800781|27138.255859|170784|            1777.0|
| 45|2000-03-07|28575.699219|29357.599609|27935.900391|28504.599609| 26936.78125|109730|1421.6992180000016|
| 46|2000-03-08|28078.099609

### Write the generated data to files instead of to memory.

In [14]:
# File sink: on a 5-second trigger, new rows are appended as CSV part files under
# OutStreamk/, and streaming progress is recorded in the chkpnt checkpoint folder.
# No header option is set, so the files contain data lines only.
writer3 = (
    df_3.writeStream
    .format("csv")
    .outputMode("append")
    .options(path="OutStreamk/", checkpointLocation="chkpnt")
    .trigger(processingTime='5 second')
)
query3 = writer3.start()

23/10/06 11:20:22 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


### Stop the query. Now, try reading the generated files into a normal dataframe
- Create a schema and use it to read the data.
- Show the output.

In [15]:
# The output files hold exactly the input columns plus the derived "diff" column,
# so extend the input schema instead of re-typing every field. A new list is built,
# which leaves SreamSchema itself unmodified.
SreamSchema2 = StructType(SreamSchema.fields + [StructField('diff', DoubleType(), True)])

In [17]:
# Stop the file-sink query before reading its output as a batch DataFrame.
# First flush the input that has already arrived, then stop the query so OutStreamk/
# stops changing underneath the read. The isActive guard makes re-running this cell
# safe after the query has been stopped.
if query3.isActive:
    query3.processAllAvailable()
    query3.stop()

# The sink wrote CSV without a header line, so read it back with the explicit
# schema and no header option.
df_6 = (
    spark.read
    .format("csv")
    .schema(SreamSchema2)
    .load("OutStreamk/")
)
df_6.show()

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| Id|      Date|        Open|        High|         Low|       Close|   Adj_Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
| 40|2000-02-29|25163.699219|26087.699219|24239.599609|25519.099609|24115.490234|233246|1848.0996099999975|
| 41|2000-03-01|25519.099609|25519.099609|25519.099609|25519.099609|24115.490234|     0|               0.0|
| 42|2000-03-02|25767.900391|29144.300781|25767.900391|28575.699219|27003.972656|408391|3376.4003900000025|
| 43|2000-03-03|27793.800781|29499.800781|26798.599609|27864.800781|26332.175781|216505| 2701.201172000001|
| 44|2000-03-06|28291.300781|29144.300781|27367.300781|28717.800781|27138.255859|170784|            1777.0|
| 45|2000-03-07|28575.699219|29357.599609|27935.900391|28504.599609| 26936.78125|109730|1421.6992180000016|
| 46|2000-03-08|28078.099609

### Sort the dataframe by the Id column

In [18]:
# Sort by the Id column using its declared name. 'ID' only resolves while
# spark.sql.caseSensitive is false (the default), so match the schema exactly.
finalDFSorted = df_6.sort('Id')
finalDFSorted.show()

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| Id|      Date|        Open|        High|         Low|       Close|   Adj_Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745|2878.9003900000025|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990|            2559.0|
|  2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746| 1919.298827999999|
|  3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984|2843.3007810000017|
|  4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371|            1706.0|
|  5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943|1066.3007810000017|
|  6|2000-01-12|     24168.5