In [2]:
! pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 46 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 31.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=4a52b9f251dc0304180fb7333fdbd5f5b9383d180515a196eace180efe221e79
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [2]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

### Create the schema of the streamed files (check the column names and types from the CSV files)

In [3]:
from pyspark.sql.types import (StructType, StructField,
                               StringType, IntegerType,FloatType,DateType)

In [4]:
recordSchema = StructType([StructField('ID', StringType(), True),
                           StructField('Date', StringType(), True),
                           StructField('Open', FloatType(), True),
                           StructField('High', FloatType(), True),
                           StructField('Low', FloatType(), True),
                           StructField('Close', FloatType(), True),
                           StructField('Adj Close', IntegerType(), True),
                           StructField('Volume', IntegerType(), True)])

### Create the dataframe by reading the stream using format "csv" and the schema you created.

In [5]:
df = spark.readStream.format("csv") \
    .schema(recordSchema) \
    .load("/content/stream_test")

### Make sure the sataframe is streaming the files from the folder

In [6]:
df.isStreaming

True

### Create a stream writer into memory and specify the query name "stock:

In [7]:
writer = df.writeStream.outputMode("append") \
    .format("memory")  \
    .queryName('stock')

In [8]:
query= writer.start()

### Start the write stream and make sure it works (read all columns from the table)

In [9]:
display(spark.sql('SELECT * FROM stock').show())

+---+----+----+----+---+-----+---------+------+
| ID|Date|Open|High|Low|Close|Adj Close|Volume|
+---+----+----+----+---+-----+---------+------+
+---+----+----+----+---+-----+---------+------+



None

In [14]:
display(spark.sql('SELECT * FROM stock').show())

+----+----------+-------+-------+-------+-------+---------+------+
|  ID|      Date|   Open|   High|    Low|  Close|Adj Close|Volume|
+----+----------+-------+-------+-------+-------+---------+------+
|null|      Date|   null|   null|   null|   null|     null|  null|
|   0|2000-01-04|22817.9|25696.8|22817.9|24879.3|     null|108745|
|   1|2000-01-05|24523.9|26229.9|23670.9|24417.3|     null|175990|
|   2|2000-01-06|24381.7|24666.1|22746.8|22817.9|     null| 71746|
|   3|2000-01-07|22036.0|24879.3|22036.0|23884.2|     null|120984|
|   4|2000-01-10|24879.3|25519.1|23813.1|24061.9|     null|151371|
|   5|2000-01-11|24168.5|25021.5|23955.2|24239.6|     null| 95943|
|   6|2000-01-12|24168.5|24452.8|23457.6|23670.9|     null| 61899|
|   7|2000-01-13|23670.9|24132.9|23102.2|23244.4|     null| 57538|
|   8|2000-01-14|23457.6|24168.5|22746.8|23244.4|     null| 84267|
|   9|2000-01-17|22533.6|23457.6|22533.6|23457.6|     null| 67807|
|  10|2000-01-18|23457.6|23742.0|22746.8|23422.1|     null| 27

None

In [15]:
spark.sql('SELECT Count(*) FROM stock').show()

+--------+
|count(1)|
+--------+
|      82|
+--------+



In [16]:
display(spark.sql('SELECT * FROM stock').show())

+----+----------+-------+-------+-------+-------+---------+------+
|  ID|      Date|   Open|   High|    Low|  Close|Adj Close|Volume|
+----+----------+-------+-------+-------+-------+---------+------+
|null|      Date|   null|   null|   null|   null|     null|  null|
|   0|2000-01-04|22817.9|25696.8|22817.9|24879.3|     null|108745|
|   1|2000-01-05|24523.9|26229.9|23670.9|24417.3|     null|175990|
|   2|2000-01-06|24381.7|24666.1|22746.8|22817.9|     null| 71746|
|   3|2000-01-07|22036.0|24879.3|22036.0|23884.2|     null|120984|
|   4|2000-01-10|24879.3|25519.1|23813.1|24061.9|     null|151371|
|   5|2000-01-11|24168.5|25021.5|23955.2|24239.6|     null| 95943|
|   6|2000-01-12|24168.5|24452.8|23457.6|23670.9|     null| 61899|
|   7|2000-01-13|23670.9|24132.9|23102.2|23244.4|     null| 57538|
|   8|2000-01-14|23457.6|24168.5|22746.8|23244.4|     null| 84267|
|   9|2000-01-17|22533.6|23457.6|22533.6|23457.6|     null| 67807|
|  10|2000-01-18|23457.6|23742.0|22746.8|23422.1|     null| 27

None

In [17]:
spark.sql('SELECT Count(*) FROM stock').show()

+--------+
|count(1)|
+--------+
|     412|
+--------+



### Remove the first row from the data (hint: drop the rows where ALL values are null), then add a new column "diff", which is the difference between high and low columns

In [18]:
df2 = df.dropna(thresh=3)

In [19]:
df2.isStreaming

True

In [20]:
df2 = df2.withColumn('diff',df2.High - df2.Low)

In [21]:
query.stop()

### Create a new write stream using the new generated dataframe and call the generate table "modified_data"

In [22]:
writer2 = df2.writeStream.outputMode("append") \
    .format("memory")  \
    .queryName('modified_data')

In [23]:
query2 = writer2.start()

In [24]:
display(spark.sql('SELECT * FROM modified_data').show())

+---+----------+-------+-------+-------+-------+---------+------+---------+
| ID|      Date|   Open|   High|    Low|  Close|Adj Close|Volume|     diff|
+---+----------+-------+-------+-------+-------+---------+------+---------+
|240|2000-12-05|26585.3|27367.3|26372.1|27011.8|     null| 91019| 995.2012|
|241|2000-12-06|27011.8|27509.4|26798.6|26869.7|     null|105791| 710.8008|
|242|2000-12-07|27011.8|27011.8|26478.7|26656.4|     null| 40656|533.10156|
|243|2000-12-08|26656.4|27722.7|26656.4|27651.6|     null|149964|1066.2988|
|244|2000-12-11|27687.1|28860.0|27651.6|28078.1|     null|159671|1208.4004|
|245|2000-12-12|28042.6|28078.1|27438.3|27935.9|     null| 74560| 639.7988|
|246|2000-12-13|27651.6|29286.5|27651.6|28469.1|     null|270385|1634.9004|
|247|2000-12-14|28469.1|29784.1|28291.3|28362.4|     null|256317|1492.7988|
|248|2000-12-15|28362.4|28895.6|27793.8|27935.9|     null|108886|1101.7988|
|249|2000-12-18|27580.5|28433.5|27367.3|28291.3|     null| 92848|1066.1992|
|250|2000-12

None

In [25]:
query2.stop()

### Write the generated data into files instead of the memory. 

In [26]:
writer3 = df2.writeStream.outputMode("append") \
    .format("parquet") \
    .option("path", "OutStream/") \
    .option("checkpointLocation", "chkpnt") 
query = writer3.start()


In [27]:
query.stop()

### Stop the query. Now, try reading the generated parquet files into a normal dataframe
- Create a schema and use it to read the data.
- Show the output.

In [28]:
df = spark.read.parquet('OutStream/')
df.show()

+---+----------+-------+-------+-------+-------+---------+------+---------+
| ID|      Date|   Open|   High|    Low|  Close|Adj Close|Volume|     diff|
+---+----------+-------+-------+-------+-------+---------+------+---------+
|240|2000-12-05|26585.3|27367.3|26372.1|27011.8|     null| 91019| 995.2012|
|241|2000-12-06|27011.8|27509.4|26798.6|26869.7|     null|105791| 710.8008|
|242|2000-12-07|27011.8|27011.8|26478.7|26656.4|     null| 40656|533.10156|
|243|2000-12-08|26656.4|27722.7|26656.4|27651.6|     null|149964|1066.2988|
|244|2000-12-11|27687.1|28860.0|27651.6|28078.1|     null|159671|1208.4004|
|245|2000-12-12|28042.6|28078.1|27438.3|27935.9|     null| 74560| 639.7988|
|246|2000-12-13|27651.6|29286.5|27651.6|28469.1|     null|270385|1634.9004|
|247|2000-12-14|28469.1|29784.1|28291.3|28362.4|     null|256317|1492.7988|
|248|2000-12-15|28362.4|28895.6|27793.8|27935.9|     null|108886|1101.7988|
|249|2000-12-18|27580.5|28433.5|27367.3|28291.3|     null| 92848|1066.1992|
|250|2000-12

### Sort the dataframe based on the ID

In [29]:
finalDFSorted = df.sort('ID')
finalDFSorted.show()

+---+----------+-------+-------+-------+-------+---------+------+---------+
| ID|      Date|   Open|   High|    Low|  Close|Adj Close|Volume|     diff|
+---+----------+-------+-------+-------+-------+---------+------+---------+
|  0|2000-01-04|22817.9|25696.8|22817.9|24879.3|     null|108745|2878.9004|
|  1|2000-01-05|24523.9|26229.9|23670.9|24417.3|     null|175990|   2559.0|
| 10|2000-01-18|23457.6|23742.0|22746.8|23422.1|     null| 27995| 995.1992|
|100|2000-05-23|17557.7|17913.1|16775.8|17557.7|     null| 39671|1137.2988|
|101|2000-05-24|17664.3|18481.8|16846.9|17415.5|     null| 57256|1634.9004|
|102|2000-05-25|18339.6|19157.1|17628.8|18695.0|     null|219319|1528.2988|
|103|2000-05-26|18695.0|20116.7|18126.4|18979.4|     null| 95521|1990.2988|
|104|2000-05-29|18268.5|19334.8|18268.5|19192.6|     null| 59929|1066.3008|
|105|2000-05-30|19192.6|19192.6|19192.6|19192.6|     null|     0|      0.0|
|106|2000-05-31|19761.3|21183.0|19761.3|21183.0|     null| 96787|1421.6992|
|107|2000-06

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745|2878.9003900000025|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990|            2559.0|
|  2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746| 1919.298827999999|
|  3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984|2843.3007810000017|
|  4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371|            1706.0|
|  5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943|1066.3007810000017|
|  6|2000-01-12|     24168.5