In [23]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [24]:
# Obtain (or reuse) the Spark session for this lab, registered under the app name "lab6".
spark = (
    SparkSession.builder
    .appName("lab6")
    .getOrCreate()
)

### Create the schema of the streamed files (check the column names and types from the CSV files)

In [26]:
# Batch-read one sample file with type inference to discover the column
# names and types needed for the streaming schema.
df_csv = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('MyInputStream/walmart_stock.csv')
)
df_csv.show(3)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 3 rows


In [28]:
# Print the inferred StructType so it can be copied into an explicit schema.
inferred_schema = df_csv.schema
print(inferred_schema)

StructType([StructField('Date', DateType(), True), StructField('Open', DoubleType(), True), StructField('High', DoubleType(), True), StructField('Low', DoubleType(), True), StructField('Close', DoubleType(), True), StructField('Volume', IntegerType(), True), StructField('Adj Close', DoubleType(), True)])


### Create the dataframe by reading the stream using format "csv" and the schema you created.

In [29]:
# Explicit schema for the streamed stock CSV files, given as
# (column name, Spark type) pairs; every column is declared nullable.
stock_fields = [
    ('Date', DateType()),
    ('Open', DoubleType()),
    ('High', DoubleType()),
    ('Low', DoubleType()),
    ('Close', DoubleType()),
    ('Volume', IntegerType()),
    ('Adj Close', DoubleType()),
]
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in stock_fields
])

# Streaming reader over the input folder: files are parsed as CSV with a
# header row, using the explicit schema above.
df = (
    spark.readStream
    .format('csv')
    .schema(schema)
    .option('header', True)
    .load('MyInputStream/')
)

### Make sure the dataframe is streaming the files from the folder

In [42]:
# Sanity check: evaluates to True when `df` is backed by a streaming source
# (it was created with spark.readStream) rather than a one-off batch read.
df.isStreaming

True

### Create a stream writer into memory and specify the query name "stock":

In [45]:
# Stream every incoming row into an in-memory table named "stock1" that can
# then be queried with spark.sql(...).
#
# The previous version passed a misspelled "checkointLocation" option. Spark
# silently ignored it and fell back to a temporary checkpoint directory (see
# the "Temporary checkpoint location created" warning it logged). The dead
# option is removed rather than corrected: the memory sink is a debugging
# sink, and in append mode it is not expected to resume from a saved
# checkpoint, so a persistent checkpoint would likely make re-running this
# cell fail -- verify against the Structured Streaming guide before adding one.
writer = (
    df.writeStream
    .format("memory")
    .queryName("stock1")
    .outputMode("append")
    .start()
)

25/07/12 08:46:11 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-0d41b5ea-fdcf-4473-9021-0a0d97396f42. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/07/12 08:46:11 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


### Start the write stream and make sure it works (read all columns from the table)

In [47]:
# Wait until the "stock1" streaming query has consumed all input that is
# currently available. Without this, running the cell right after start()
# can show an empty table because the first micro-batch has not finished.
# The query is looked up by name so the cell does not depend on which handle
# a Python variable currently points to; if no such query is active, the
# loop is a no-op and the table is shown as-is.
for active_query in spark.streams.active:
    if active_query.name == "stock1":
        active_query.processAllAvailable()

spark.sql("SELECT * FROM stock1").show()

+----------+------------------+------------------+------------------+------------------+--------+------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+--------+------------------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|         59.549999|         58.919998|             59.18| 6679300|51.616215000000004|
|2012-01-10|             59.43|59.709998999999996|             5

### Remove the first row from the data (hint: drop the rows where ALL values are null), then add a new column "diff", which is the difference between high and low columns

In [48]:
# Drop rows in which every column is null, then add "diff" = High - Low.
df_clean = (
    df
    .dropna(how='all')
    .withColumn('diff', col('High') - col('Low'))
)

### Create a new write stream using the newly generated dataframe and name the generated table "modified_data"

In [52]:
# Stream the cleaned rows (including the "diff" column) into an in-memory
# table named "modified_data1".
#
# The previous version passed a misspelled "checkointLocation" option. Spark
# silently ignored it and used a temporary checkpoint directory instead (see
# the "Temporary checkpoint location created" warning it logged). The dead
# option is removed rather than corrected: the memory sink is a debugging
# sink, and in append mode it is not expected to resume from a saved
# checkpoint, so a persistent checkpoint would likely make re-running this
# cell fail -- verify against the Structured Streaming guide before adding one.
writer = (
    df_clean.writeStream
    .format("memory")
    .queryName("modified_data1")
    .outputMode("append")
    .start()
)

25/07/12 08:56:32 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-ddd9bb36-fd46-4497-bf8d-db004fa27c1d. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/07/12 08:56:32 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [53]:
# Wait until the "modified_data1" streaming query has consumed all input
# that is currently available. Without this, running the cell right after
# start() can show an empty table because the first micro-batch has not
# finished. The query is looked up by name; if it is not active, the loop
# is a no-op and the table is shown as-is.
for active_query in spark.streams.active:
    if active_query.name == "modified_data1":
        active_query.processAllAvailable()

spark.sql("SELECT * FROM modified_data1").show()

+----------+------------------+------------------+------------------+------------------+--------+------------------+-------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|               diff|
+----------+------------------+------------------+------------------+------------------+--------+------------------+-------------------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996| 1.1900019999999998|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475| 0.8799969999999959|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|               1.25|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922| 0.5800020000000004|
|2012-01-09|         59.029999|         5

### Write the generated data into files instead of memory.

In [55]:
# Configure (but do not start yet) a CSV file sink for the cleaned stream:
# append-only output with a header row and a persistent checkpoint directory.
# NOTE(review): both the output and checkpoint folders live under the
# "MyInputStream/" folder that the input stream reads from -- confirm the
# source never picks these files up, or move them elsewhere.
writer_disk = (
    df_clean.writeStream
    .format("csv")
    .outputMode("append")
    .option("header", True)
    .option("path", "MyInputStream/newfiles")
    .option("checkpointLocation", "MyInputStream/chkp")
)

In [56]:
# Launch the file-sink query configured in `writer_disk`; the returned handle
# is kept so the query can be stopped later.
query = writer_disk.start()

25/07/12 09:03:54 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

### Stop the query. Now, try reading the generated files into a normal dataframe
- Create a schema and use it to read the data.
- Show the output.

In [57]:
# Let the file sink finish writing everything that is currently available
# before stopping. Stopping right after start() (e.g. under "Run All") could
# otherwise leave the output folder without any data files to read back.
query.processAllAvailable()
query.stop()

25/07/12 09:05:38 WARN DAGScheduler: Failed to cancel job group 9f1fd579-d25c-48c1-a7a6-d9374c9f674e. Cannot find active jobs for it.
25/07/12 09:05:38 WARN DAGScheduler: Failed to cancel job group 9f1fd579-d25c-48c1-a7a6-d9374c9f674e. Cannot find active jobs for it.


In [58]:
# Read the files produced by the streaming file sink back as a regular
# (batch) DataFrame. An explicit schema is used instead of inferSchema, as the
# exercise asks: it mirrors the streamed input columns plus the derived
# "diff" column, and avoids the extra pass over the data that inference needs.
generated_schema = StructType([
    StructField('Date', DateType(), True),
    StructField('Open', DoubleType(), True),
    StructField('High', DoubleType(), True),
    StructField('Low', DoubleType(), True),
    StructField('Close', DoubleType(), True),
    StructField('Volume', IntegerType(), True),
    StructField('Adj Close', DoubleType(), True),
    StructField('diff', DoubleType(), True),
])

df_gernerated = spark.read.csv(
    "MyInputStream/newfiles/",
    header=True,
    schema=generated_schema,
)
df_gernerated.show()

+----------+------------------+------------------+------------------+------------------+--------+------------------+-------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|               diff|
+----------+------------------+------------------+------------------+------------------+--------+------------------+-------------------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996| 1.1900019999999998|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475| 0.8799969999999959|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|               1.25|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922| 0.5800020000000004|
|2012-01-09|         59.029999|         5

### Sort the dataframe based on the Date

In [60]:
# Display the generated rows ordered by ascending Date.
sorted_by_date = df_gernerated.orderBy('Date')
sorted_by_date.show()

+----------+------------------+------------------+------------------+------------------+--------+------------------+-------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|               diff|
+----------+------------------+------------------+------------------+------------------+--------+------------------+-------------------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996| 1.1900019999999998|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475| 0.8799969999999959|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|               1.25|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922| 0.5800020000000004|
|2012-01-09|         59.029999|         5