In [None]:
from IPython.display import display, HTML

# Stop <pre> blocks from wrapping so wide `show()` tables stay on one line per row.
no_wrap_css = "<style>pre { white-space: pre !important; }</style>"
display(HTML(no_wrap_css))

In [None]:
!pip install pyspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=f7351f0ff1545d852c7d7dd564ab19686a7e60746ba1f43900733d3f04446c8b
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
from google.colab import drive

# Expose Google Drive under /content/drive; the stream's input and output
# folders used later in this notebook live beneath that mount point.
drive.mount("/content/drive")

Mounted at /content/drive


In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [None]:
# Create (or reuse) a local-mode Spark session named "Colab" with the UI port
# set to 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)
# Display the session summary as the cell output.
spark

### Create the schema of the streamed files (check the column names and types from the CSV files)

In [None]:
# Column layout of the streamed CSV files, in file order. Every column is
# nullable: an integer ID, the date kept as a plain string, then six doubles.
numeric_columns = ["Open", "High", "Low", "Close", "Adj Close", "Volume"]
schema = StructType(
    [
        StructField("ID", IntegerType(), True),
        StructField("Date", StringType(), True),
    ]
    + [StructField(column, DoubleType(), True) for column in numeric_columns]
)

### Create the dataframe by reading the stream using format "csv" and the schema you created.

In [None]:
# Streaming CSV source over the Drive input folder, parsed with the explicit
# schema. No header option is set, so each file's header line is parsed as an
# ordinary data row.
df = (
    spark.readStream
    .schema(schema)
    .format("csv")
    .load("/content/drive/MyDrive/InputData")
)

### Make sure the dataframe is streaming the files from the folder

In [None]:
# Sanity check: True means `df` is backed by a streaming source (it was built
# with `readStream`) rather than a one-off batch read.
df.isStreaming

True

### Create a stream writer into memory and specify the query name "stock":

In [None]:
# Append-mode writer for the raw stream, targeting the in-memory sink. The query
# name "stock" is also the table name the SQL cells below select from.
steamWriter1 = (
    df.writeStream
    .format("memory")
    .queryName('stock')
    .outputMode("append")
)

### Start the write stream and make sure it works (read all columns from the table)

In [None]:
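# Helper for re-runs: uncomment and run this to stop a previously started
# `query1` before starting it again in the next cell.
#query1.stop()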

In [None]:
# Start the in-memory "stock" query.
query1 = steamWriter1.start()

# start() returns as soon as the query is launched, before the first
# micro-batch has run, so selecting from the table straight away shows an empty
# result. Block until everything currently in the input folder is processed.
query1.processAllAvailable()

# Read all columns back from the in-memory table to confirm the stream works.
df2 = spark.sql('SELECT * FROM stock')
df2.show()

+---+----+----+----+---+-----+---------+------+
|   |Date|Open|High|Low|Close|Adj Close|Volume|
+---+----+----+----+---+-----+---------+------+
+---+----+----+----+---+-----+---------+------+



In [None]:
# Re-run the same query: the in-memory table fills as the stream processes
# files. The row with "Date" in the Date column and NULL elsewhere is the CSV
# header line parsed against the typed schema.
df2 = spark.sql('SELECT * FROM stock')
df2.show()

+----+----------+------------+------------+------------+------------+------------+--------+
|    |      Date|        Open|        High|         Low|       Close|   Adj Close|  Volume|
+----+----------+------------+------------+------------+------------+------------+--------+
|NULL|      Date|        NULL|        NULL|        NULL|        NULL|        NULL|    NULL|
| 120|2000-06-20|22817.900391|23102.199219|21680.599609|22320.300781|21092.632813| 34466.0|
| 121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375| 68651.0|
| 122|2000-06-22|23386.599609|23386.599609|     22462.5|23031.099609|21764.335938| 97209.0|
| 123|2000-06-23|22107.099609|24097.400391|22107.099609|     22889.0|21630.052734|199483.0|
| 124|2000-06-26|23102.199219|     24168.5|22569.099609|24026.300781|22704.796875|121969.0|
| 125|2000-06-27|24026.300781|25519.099609|     23742.0|24026.300781|22704.796875|113809.0|
| 126|2000-06-28|23884.199219|24666.099609|23884.199219|24666.099609|23309.40820

### Remove the first row from the data (hint: drop the rows where ALL values are null), then add a new column "diff", which is the difference between high and low columns

In [None]:

# The CSV header line arrives as a row whose typed fields are NULL while its
# Date field holds the literal text "Date", so it is not an all-NULL row.
# Dropping rows that contain any NULL removes it.
df_2 = df.dropna(how='any')

# Per-row price range: High minus Low.
df3 = df_2.withColumn("Diff", F.col("High") - F.col("Low"))

### Create a new write stream using the new generated dataframe and call the generated table "modified_data"

In [None]:
# Append-mode writer for the cleaned stream (with the Diff column), targeting
# the in-memory sink under the table name "modified_data".
steamWriter2 = (
    df3.writeStream
    .format("memory")
    .queryName('modified_data')
    .outputMode("append")
)

In [None]:
# Start the in-memory "modified_data" query.
query2 = steamWriter2.start()

# start() does not wait for the first micro-batch, so an immediate SELECT
# returns an empty table. Block until the files currently available have been
# processed before reading the table.
query2.processAllAvailable()

# Read all columns back, including the derived Diff column.
df5 = spark.sql('SELECT * FROM modified_data')
df5.show()

+---+----+----+----+---+-----+---------+------+----+
|   |Date|Open|High|Low|Close|Adj Close|Volume|Diff|
+---+----+----+----+---+-----+---------+------+----+
+---+----+----+----+---+-----+---------+------+----+



In [None]:
# Re-run the same query on the in-memory table: the header row is gone and each
# remaining row carries the extra Diff column.
df5 = spark.sql('SELECT * FROM modified_data')
df5.show()

+---+----------+------------+------------+------------+------------+------------+--------+------------------+
|   |      Date|        Open|        High|         Low|       Close|   Adj Close|  Volume|              Diff|
+---+----------+------------+------------+------------+------------+------------+--------+------------------+
|120|2000-06-20|22817.900391|23102.199219|21680.599609|22320.300781|21092.632813| 34466.0|1421.5996099999975|
|121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375| 68651.0| 995.0996099999975|
|122|2000-06-22|23386.599609|23386.599609|     22462.5|23031.099609|21764.335938| 97209.0| 924.0996090000008|
|123|2000-06-23|22107.099609|24097.400391|22107.099609|     22889.0|21630.052734|199483.0|1990.3007819999984|
|124|2000-06-26|23102.199219|     24168.5|22569.099609|24026.300781|22704.796875|121969.0|1599.4003909999992|
|125|2000-06-27|24026.300781|25519.099609|     23742.0|24026.300781|22704.796875|113809.0|1777.0996090000008|
|126|2000-

### Write the generated data into files instead of the memory.

In [None]:
# Append-mode writer that emits the cleaned stream as CSV files into the Drive
# output folder. The checkpoint directory is the relative path "chkpnt".
csvWriter3 = (
    df3.writeStream
    .format("csv")
    .outputMode("append")
    .option("checkpointLocation", "chkpnt")
    .option("path", "/content/drive/MyDrive/OutputData")
)

In [None]:
# Launch the file-sink query; it writes CSV files to the output folder
# configured on `csvWriter3`.
query3 = csvWriter3.start()

### Stop the query. Now, try reading the generated files into a normal dataframe
- Create a schema and use it to read the data.
- Show the output.

In [None]:
# The files read back below are produced by query3 (the CSV sink), so that is
# the query that must finish and stop. Flush whatever input is still pending so
# the output folder is complete before it is read.
query3.processAllAvailable()

# Stop every streaming query started in this notebook so none keeps running in
# the background while the output is read as a normal DataFrame.
for running_query in (query1, query2, query3):
    running_query.stop()


In [None]:
# Schema for the CSV files written by the file sink: the same columns as the
# input schema plus the derived Diff column. The first column is named "ID",
# matching the input schema, so that later cells can refer to it by name (for
# example when sorting by 'ID').
schema2 = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Date", StringType(), True),
    StructField("Open", DoubleType(), True),
    StructField("High", DoubleType(), True),
    StructField("Low", DoubleType(), True),
    StructField("Close", DoubleType(), True),
    StructField("Adj Close", DoubleType(), True),
    StructField("Volume", DoubleType(), True),
    StructField('Diff', DoubleType(), True)])

In [None]:
# Batch (non-streaming) read of the CSV files the file sink wrote to the output
# folder, parsed with the explicit output schema.
df6 = (
    spark.read
    .schema(schema2)
    .format("csv")
    .load("/content/drive/MyDrive/OutputData")
)

In [None]:
# Preview the batch DataFrame read back from the output folder.
df6.show()

+---+----------+------------+------------+------------+------------+------------+--------+------------------+
|   |      Date|        Open|        High|         Low|       Close|   Adj Close|  Volume|              Diff|
+---+----------+------------+------------+------------+------------+------------+--------+------------------+
|120|2000-06-20|22817.900391|23102.199219|21680.599609|22320.300781|21092.632813| 34466.0|1421.5996099999975|
|121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375| 68651.0| 995.0996099999975|
|122|2000-06-22|23386.599609|23386.599609|     22462.5|23031.099609|21764.335938| 97209.0| 924.0996090000008|
|123|2000-06-23|22107.099609|24097.400391|22107.099609|     22889.0|21630.052734|199483.0|1990.3007819999984|
|124|2000-06-26|23102.199219|     24168.5|22569.099609|24026.300781|22704.796875|121969.0|1599.4003909999992|
|125|2000-06-27|24026.300781|25519.099609|     23742.0|24026.300781|22704.796875|113809.0|1777.0996090000008|
|126|2000-

### Sort the dataframe based on the ID

In [None]:
# Order the rows by the ID column and preview the result.
# NOTE(review): this requires df6 to have a column literally named 'ID' - confirm
# against the schema used to read the output files.
df6.orderBy('ID').show()

+---+----------+------------+------------+------------+------------+------------+--------+------------------+
|   |      Date|        Open|        High|         Low|       Close|   Adj Close|  Volume|              Diff|
+---+----------+------------+------------+------------+------------+------------+--------+------------------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745.0|2878.9003900000025|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990.0|            2559.0|
|  2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746.0| 1919.298827999999|
|  3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984.0|2843.3007810000017|
|  4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371.0|            1706.0|
|  5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943.0|1066.3007810000017|
|  6|2000-