In [1]:
!pip install pyspark py4j

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=720f84bfbe93a60aca03570b9b0742c4730593bc61ef5a1fbf094381c953cdc4
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('RDD').getOrCreate()

sc = spark.sparkContext

In [3]:
spark

In [4]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

### Create the schema of the streamed files (check the column names and types from the CSV files)

In [6]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType ,DateType,DoubleType

In [12]:
# Date,Open,High,Low,Close,Adj Close,Volume
mySchema = StructType([StructField('ID', IntegerType(), True),
                          StructField('Date', StringType(), True),
                           StructField('Open', DoubleType(), True),
                           StructField('High', DoubleType(), True),
                           StructField('Low', DoubleType(), True),
                           StructField('Close', DoubleType(), True),
                           StructField('Adj Close', DoubleType(), True),
                           StructField('Volume', DoubleType(), True),])

### Create the dataframe by reading the stream using format "csv" and the schema you created.

In [93]:
df = spark.readStream.format('csv')\
.schema(mySchema)\
.option('header',True)\
.load('InputStream/')

### Make sure the sataframe is streaming the files from the folder

In [14]:
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: double (nullable = true)



In [15]:
df.isStreaming

True

True

### Create a stream writer into memory and specify the query name "stock:

In [84]:
writer = df.writeStream.format('memory')\
.outputMode('append')\
.option('truncate',False)\
.queryName("stock")
# .option('numRows',100)

### Start the write stream and make sure it works (read all columns from the table)

In [85]:
query = writer.start()


In [48]:
df.columns

['ID', 'Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']

In [49]:
df_sql1 = spark.sql('select * from  stock')

df_sql1.show(100)

+---+----+----+----+---+-----+---------+------+
| ID|Date|Open|High|Low|Close|Adj Close|Volume|
+---+----+----+----+---+-----+---------+------+
+---+----+----+----+---+-----+---------+------+



+---+----+----+----+---+-----+---------+------+
| ID|Date|Open|High|Low|Close|Adj Close|Volume|
+---+----+----+----+---+-----+---------+------+
+---+----+----+----+---+-----+---------+------+



                                                                                

In [86]:
df_sql = spark.sql('select * from  stock')

df_sql.show(100)

+---+----------+------------+------------+------------+------------+------------+--------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|  Volume|
+---+----------+------------+------------+------------+------------+------------+--------+
|120|2000-06-20|22817.900391|23102.199219|21680.599609|22320.300781|21092.632813| 34466.0|
|121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375| 68651.0|
|122|2000-06-22|23386.599609|23386.599609|     22462.5|23031.099609|21764.335938| 97209.0|
|123|2000-06-23|22107.099609|24097.400391|22107.099609|     22889.0|21630.052734|199483.0|
|124|2000-06-26|23102.199219|     24168.5|22569.099609|24026.300781|22704.796875|121969.0|
|125|2000-06-27|24026.300781|25519.099609|     23742.0|24026.300781|22704.796875|113809.0|
|126|2000-06-28|23884.199219|24666.099609|23884.199219|24666.099609|23309.408203| 86236.0|
|127|2000-06-29|25234.699219|25234.699219|23919.699219|24239.599609|22906.365234| 45299.0|

+----+----------+------------+------------+------------+------------+------------+------+
|  ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|
+----+----------+------------+------------+------------+------------+------------+------+
|null|      null|        null|        null|        null|        null|        null|  null|
| 120|2000-06-20|22817.900391|23102.199219|21680.599609|22320.300781|21092.632813| 34466|
| 121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375| 68651|
| 122|2000-06-22|23386.599609|23386.599609|     22462.5|23031.099609|21764.335938| 97209|
| 123|2000-06-23|22107.099609|24097.400391|22107.099609|     22889.0|21630.052734|199483|
| 124|2000-06-26|23102.199219|     24168.5|22569.099609|24026.300781|22704.796875|121969|
| 125|2000-06-27|24026.300781|25519.099609|     23742.0|24026.300781|22704.796875|113809|
| 126|2000-06-28|23884.199219|24666.099609|23884.199219|24666.099609|23309.408203| 86236|
| 127|2000

In [76]:
query.stop()

### Remove the first row from the data (hint: drop the rows where ALL values are null), then add a new column "diff", which is the difference between high and low columns

In [87]:
df_drop = df_sql.dropna(how ='all')
df_drop.show()

+---+----------+------------+------------+------------+------------+------------+--------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|  Volume|
+---+----------+------------+------------+------------+------------+------------+--------+
|120|2000-06-20|22817.900391|23102.199219|21680.599609|22320.300781|21092.632813| 34466.0|
|121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375| 68651.0|
|122|2000-06-22|23386.599609|23386.599609|     22462.5|23031.099609|21764.335938| 97209.0|
|123|2000-06-23|22107.099609|24097.400391|22107.099609|     22889.0|21630.052734|199483.0|
|124|2000-06-26|23102.199219|     24168.5|22569.099609|24026.300781|22704.796875|121969.0|
|125|2000-06-27|24026.300781|25519.099609|     23742.0|24026.300781|22704.796875|113809.0|
|126|2000-06-28|23884.199219|24666.099609|23884.199219|24666.099609|23309.408203| 86236.0|
|127|2000-06-29|25234.699219|25234.699219|23919.699219|24239.599609|22906.365234| 45299.0|

In [88]:
from pyspark.sql.functions import col

In [89]:
df_diff = df_drop.withColumn("diff", col("high") - col("low"))
df_diff.show()

+---+----------+------------+------------+------------+------------+------------+--------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|  Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+--------+------------------+
|120|2000-06-20|22817.900391|23102.199219|21680.599609|22320.300781|21092.632813| 34466.0|1421.5996099999975|
|121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375| 68651.0| 995.0996099999975|
|122|2000-06-22|23386.599609|23386.599609|     22462.5|23031.099609|21764.335938| 97209.0| 924.0996090000008|
|123|2000-06-23|22107.099609|24097.400391|22107.099609|     22889.0|21630.052734|199483.0|1990.3007819999984|
|124|2000-06-26|23102.199219|     24168.5|22569.099609|24026.300781|22704.796875|121969.0|1599.4003909999992|
|125|2000-06-27|24026.300781|25519.099609|     23742.0|24026.300781|22704.796875|113809.0|1777.0996090000008|
|126|2000-

In [90]:
query.stop()

In [94]:
df_drop = df.na.drop(how ='all')
df_diff = df_drop.withColumn("diff", col("high") - col("low"))
df_diff

DataFrame[ID: int, Date: string, Open: double, High: double, Low: double, Close: double, Adj Close: double, Volume: double, diff: double]

### Create a new write stream using the new generated dataframe and call the generate table "modified_data"

In [139]:
writer = df_diff.writeStream.format('memory')\
.outputMode('append')\
.option('truncate',False)\
.queryName("modified_data")


In [140]:
query = writer.start()

In [141]:
res = spark.sql('select * from  modified_data')
res.show()

+---+----------+------------+------------+------------+------------+------------+--------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|  Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+--------+------------------+
|120|2000-06-20|22817.900391|23102.199219|21680.599609|22320.300781|21092.632813| 34466.0|1421.5996099999975|
|121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375| 68651.0| 995.0996099999975|
|122|2000-06-22|23386.599609|23386.599609|     22462.5|23031.099609|21764.335938| 97209.0| 924.0996090000008|
|123|2000-06-23|22107.099609|24097.400391|22107.099609|     22889.0|21630.052734|199483.0|1990.3007819999984|
|124|2000-06-26|23102.199219|     24168.5|22569.099609|24026.300781|22704.796875|121969.0|1599.4003909999992|
|125|2000-06-27|24026.300781|25519.099609|     23742.0|24026.300781|22704.796875|113809.0|1777.0996090000008|
|126|2000-

In [None]:
###

+---+----+----+----+---+-----+---------+------+----+
| ID|Date|Open|High|Low|Close|Adj Close|Volume|diff|
+---+----+----+----+---+-----+---------+------+----+
+---+----+----+----+---+-----+---------+------+----+



In [142]:
res = spark.sql('select * from  modified_data')
res.show(100)

+---+----------+------------+------------+------------+------------+------------+--------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|  Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+--------+------------------+
|120|2000-06-20|22817.900391|23102.199219|21680.599609|22320.300781|21092.632813| 34466.0|1421.5996099999975|
|121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375| 68651.0| 995.0996099999975|
|122|2000-06-22|23386.599609|23386.599609|     22462.5|23031.099609|21764.335938| 97209.0| 924.0996090000008|
|123|2000-06-23|22107.099609|24097.400391|22107.099609|     22889.0|21630.052734|199483.0|1990.3007819999984|
|124|2000-06-26|23102.199219|     24168.5|22569.099609|24026.300781|22704.796875|121969.0|1599.4003909999992|
|125|2000-06-27|24026.300781|25519.099609|     23742.0|24026.300781|22704.796875|113809.0|1777.0996090000008|
|126|2000-

In [None]:
###

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|120|2000-06-20|22817.900391|23102.199219|21680.599609|22320.300781|21092.632813| 34466|1421.5996099999975|
|121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375| 68651| 995.0996099999975|
|122|2000-06-22|23386.599609|23386.599609|     22462.5|23031.099609|21764.335938| 97209| 924.0996090000008|
|123|2000-06-23|22107.099609|24097.400391|22107.099609|     22889.0|21630.052734|199483|1990.3007819999984|
|124|2000-06-26|23102.199219|     24168.5|22569.099609|24026.300781|22704.796875|121969|1599.4003909999992|
|125|2000-06-27|24026.300781|25519.099609|     23742.0|24026.300781|22704.796875|113809|1777.0996090000008|
|126|2000-06-28|23884.199219

In [143]:
query.stop()

### Write the generated data into files instead of the memory. 

In [145]:
writer = df_diff.writeStream.format('csv')\
.outputMode('append')\
.option('checkpointlocation','chkpnt4')\
.option('header',True)\
.option('path','OutStream1/')
q = writer.start()

In [146]:
q.stop()

### Stop the query. Now, try reading the generated files into a normal dataframe
- Create a schema and use it to read the data.
- Show the output.

In [129]:
mySchema = StructType([StructField('ID', IntegerType(), True),
                          StructField('Date', StringType(), True),
                           StructField('Open', DoubleType(), True),
                           StructField('High', DoubleType(), True),
                           StructField('Low', DoubleType(), True),
                           StructField('Close', DoubleType(), True),
                           StructField('Adj Close', DoubleType(), True),
                           StructField('Volume', DoubleType(), True),
                          StructField('diff', DoubleType(), True),
                       ])

In [130]:
df = spark.readStream.format('csv')\
.schema(mySchema)\
.option('header',True)\
.load('/content/OutStream/')

In [131]:
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- diff: double (nullable = true)



In [132]:
writer = df.writeStream.format('memory')\
.outputMode('append')\
.option('truncate',False)\
.option('header',True)\
.queryName("out_data")

In [133]:
query = writer.start()

In [134]:
res = spark.sql('select * from  out_data')
res.show()

+---+----------+------------+------------+------------+------------+------------+--------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|  Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+--------+------------------+
|121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375| 68651.0| 995.0996099999975|
|122|2000-06-22|23386.599609|23386.599609|     22462.5|23031.099609|21764.335938| 97209.0| 924.0996090000008|
|123|2000-06-23|22107.099609|24097.400391|22107.099609|     22889.0|21630.052734|199483.0|1990.3007819999984|
|124|2000-06-26|23102.199219|     24168.5|22569.099609|24026.300781|22704.796875|121969.0|1599.4003909999992|
|125|2000-06-27|24026.300781|25519.099609|     23742.0|24026.300781|22704.796875|113809.0|1777.0996090000008|
|126|2000-06-28|23884.199219|24666.099609|23884.199219|24666.099609|23309.408203| 86236.0| 781.9003900000025|
|127|2000-

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|120|2000-06-20|22817.900391|23102.199219|21680.599609|22320.300781|21092.632813| 34466|1421.5996099999975|
|121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375| 68651| 995.0996099999975|
|122|2000-06-22|23386.599609|23386.599609|     22462.5|23031.099609|21764.335938| 97209| 924.0996090000008|
|123|2000-06-23|22107.099609|24097.400391|22107.099609|     22889.0|21630.052734|199483|1990.3007819999984|
|124|2000-06-26|23102.199219|     24168.5|22569.099609|24026.300781|22704.796875|121969|1599.4003909999992|
|125|2000-06-27|24026.300781|25519.099609|     23742.0|24026.300781|22704.796875|113809|1777.0996090000008|
|126|2000-06-28|23884.199219

In [128]:
query.stop()

### Sort the dataframe based on the ID

In [136]:
finalDFSorted = res.sort('ID')
finalDFSorted.show()

+---+----------+------------+------------+------------+------------+------------+--------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|  Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+--------+------------------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745.0|2878.9003900000025|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990.0|            2559.0|
|  2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746.0| 1919.298827999999|
|  3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984.0|2843.3007810000017|
|  4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371.0|            1706.0|
|  5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943.0|1066.3007810000017|
|  6|2000-

In [147]:
df = spark.read.csv('/content/OutStream1', header=True, )

In [148]:
df.show(5,truncate=False)

+---+----------+------------+------------+------------+------------+------------+--------+------------------+
|ID |Date      |Open        |High        |Low         |Close       |Adj Close   |Volume  |diff              |
+---+----------+------------+------------+------------+------------+------------+--------+------------------+
|120|2000-06-20|22817.900391|23102.199219|21680.599609|22320.300781|21092.632813|34466.0 |1421.5996099999975|
|121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375|68651.0 |995.0996099999975 |
|122|2000-06-22|23386.599609|23386.599609|22462.5     |23031.099609|21764.335938|97209.0 |924.0996090000008 |
|123|2000-06-23|22107.099609|24097.400391|22107.099609|22889.0     |21630.052734|199483.0|1990.3007819999984|
|124|2000-06-26|23102.199219|24168.5     |22569.099609|24026.300781|22704.796875|121969.0|1599.4003909999992|
+---+----------+------------+------------+------------+------------+------------+--------+------------------+
only showi

In [150]:
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- diff: double (nullable = true)



In [152]:
df.show()

+---+----------+------------+------------+------------+------------+------------+--------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|  Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+--------+------------------+
|120|2000-06-20|22817.900391|23102.199219|21680.599609|22320.300781|21092.632813| 34466.0|1421.5996099999975|
|121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375| 68651.0| 995.0996099999975|
|122|2000-06-22|23386.599609|23386.599609|     22462.5|23031.099609|21764.335938| 97209.0| 924.0996090000008|
|123|2000-06-23|22107.099609|24097.400391|22107.099609|     22889.0|21630.052734|199483.0|1990.3007819999984|
|124|2000-06-26|23102.199219|     24168.5|22569.099609|24026.300781|22704.796875|121969.0|1599.4003909999992|
|125|2000-06-27|24026.300781|25519.099609|     23742.0|24026.300781|22704.796875|113809.0|1777.0996090000008|
|126|2000-

In [151]:
finalDFSorted = df.sort('ID')
finalDFSorted.show()

+---+----------+------------+------------+------------+------------+------------+--------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|  Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+--------+------------------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745.0|2878.9003900000025|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990.0|            2559.0|
|  2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746.0| 1919.298827999999|
|  3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984.0|2843.3007810000017|
|  4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371.0|            1706.0|
|  5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943.0|1066.3007810000017|
|  6|2000-

In [None]:
df_diff.write.format('csv')\
.option('header','true')\
.mode('append')\
.save('outData')