In [21]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [49]:
import pyspark
from pyspark.sql import SparkSession
# Obtain the session for this notebook: getOrCreate() reuses an existing
# SparkSession if one is already active, otherwise it builds a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [50]:
from IPython.display import display, HTML
# Inject CSS so <pre> output keeps its whitespace and wide tables printed by
# show() are not wrapped in the notebook.
no_wrap_css = "<style>pre { white-space: pre !important; }</style>"
display(HTML(no_wrap_css))

### Create the schema of the streamed files (check the column names and types from the CSV files)

In [77]:
from pyspark.sql.types import (StructType, StructField,
                               StringType, IntegerType,FloatType)

# Schema applied to the streamed CSV files: `id` and `date` are kept as
# strings, all remaining columns are floats. Every field is nullable.
_string_columns = ('id', 'date')
_float_columns = ('open', 'High', 'Low', 'Close', 'AdjClose', 'Volume')

MySchema = StructType(
    [StructField(name, StringType(), True) for name in _string_columns]
    + [StructField(name, FloatType(), True) for name in _float_columns]
)

### Create the dataframe by reading the stream using format "csv" and the schema you created.

In [78]:
# Streaming source: CSV files (with a header row) from the input folder,
# parsed with the explicit schema defined above.
df = (
    spark.readStream
    .format("csv")
    .option('header', 'true')
    .schema(MySchema)
    .load("/content/IpStream/")
)

### Make sure the dataframe is streaming the files from the folder

In [79]:
# printSchema() only lists the column types; isStreaming is the property that
# actually confirms the DataFrame is backed by a streaming source.
assert df.isStreaming, "df is not a streaming DataFrame"
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- Close: float (nullable = true)
 |-- AdjClose: float (nullable = true)
 |-- Volume: float (nullable = true)



### Create a stream writer into memory and specify the query name "stock":

In [63]:
# Configure (but do not start yet) an append-mode writer into the in-memory
# sink; the query name becomes the name of the queryable table.
writer = (
    df.writeStream
    .outputMode('append')
    .format('memory')
    .queryName('stock')
)

### Start the write stream and make sure it works (read all columns from the table)

In [64]:
# Launch the streaming query; the returned handle is used later to stop it.
query = writer.start()

In [69]:
# The query runs asynchronously, so the in-memory table can still be empty or
# partial right after start(). Block until everything currently available in
# the source folder has been processed before reading the table.
query.processAllAvailable()
spark.sql('select * from stock').show(100)

+---+----------+-------+-------+-------+-------+---------+--------+
| id|      date|   open|   High|    Low|  Close| AdjClose|  Volume|
+---+----------+-------+-------+-------+-------+---------+--------+
|  0|2000-01-04|22817.9|25696.8|22817.9|24879.3| 23510.88|108745.0|
|  1|2000-01-05|24523.9|26229.9|23670.9|24417.3|23074.295|175990.0|
|  2|2000-01-06|24381.7|24666.1|22746.8|22817.9|21562.865| 71746.0|
|  3|2000-01-07|22036.0|24879.3|22036.0|23884.2|22570.514|120984.0|
|  4|2000-01-10|24879.3|25519.1|23813.1|24061.9| 22738.44|151371.0|
|  5|2000-01-11|24168.5|25021.5|23955.2|24239.6|22906.365| 95943.0|
|  6|2000-01-12|24168.5|24452.8|23457.6|23670.9|22368.947| 61899.0|
|  7|2000-01-13|23670.9|24132.9|23102.2|23244.4|21965.906| 57538.0|
|  8|2000-01-14|23457.6|24168.5|22746.8|23244.4|21965.906| 84267.0|
|  9|2000-01-17|22533.6|23457.6|22533.6|23457.6|22167.377| 67807.0|
| 10|2000-01-18|23457.6|23742.0|22746.8|23422.1|22133.832| 27995.0|
| 11|2000-01-19|22817.9|23173.3|22036.0|22036.0|

In [70]:
# Stop the streaming query that feeds the in-memory 'stock' table.
query.stop()

### Remove the first row from the data (hint: drop the rows where ALL values are null), then add a new column "diff", which is the difference between high and low columns

In [71]:
# Drop only the rows in which EVERY column is null. The default, how='any',
# would also discard rows that merely have a single missing value.
df2 = df.na.drop(how='all')

In [75]:
# Add `diff`: the High column minus the Low column for each row.
high_low_range = df.High - df.Low
df3 = df2.withColumn("diff", high_low_range)

In [76]:
# Print the schema of df3 to confirm the new `diff` column is present.
df3.printSchema()

root
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- Close: float (nullable = true)
 |-- AdjClose: float (nullable = true)
 |-- Volume: float (nullable = true)
 |-- diff: float (nullable = true)



### Create a new write stream using the newly generated dataframe and name the generated table "modified_data"

In [80]:
# Append-mode writer for the transformed stream into the in-memory sink; the
# resulting table is queryable under the name 'modified_data'.
writer2 = (
    df3.writeStream
    .outputMode('append')
    .format('memory')
    .queryName('modified_data')
)

In [81]:
# Launch the second streaming query; keep the handle so it can be stopped.
query2 = writer2.start()

In [83]:
# Wait for the running query to process everything currently available in the
# source before reading the in-memory table; otherwise the preview may be
# empty or contain only part of the data.
query2.processAllAvailable()
spark.sql('select * from modified_data').show(30)

+---+----------+-------+-------+-------+-------+---------+--------+---------+
| id|      date|   open|   High|    Low|  Close| AdjClose|  Volume|     diff|
+---+----------+-------+-------+-------+-------+---------+--------+---------+
|240|2000-12-05|26585.3|27367.3|26372.1|27011.8|25526.092| 91019.0| 995.2012|
|241|2000-12-06|27011.8|27509.4|26798.6|26869.7|25391.805|105791.0| 710.8008|
|242|2000-12-07|27011.8|27011.8|26478.7|26656.4|25190.236| 40656.0|533.10156|
|243|2000-12-08|26656.4|27722.7|26656.4|27651.6|  26130.7|149964.0|1066.2988|
|244|2000-12-11|27687.1|28860.0|27651.6|28078.1| 26533.74|159671.0|1208.4004|
|245|2000-12-12|28042.6|28078.1|27438.3|27935.9|26399.361| 74560.0| 639.7988|
|246|2000-12-13|27651.6|29286.5|27651.6|28469.1|26903.234|270385.0|1634.9004|
|247|2000-12-14|28469.1|29784.1|28291.3|28362.4|26802.406|256317.0|1492.7988|
|248|2000-12-15|28362.4|28895.6|27793.8|27935.9|26399.361|108886.0|1101.7988|
|249|2000-12-18|27580.5|28433.5|27367.3|28291.3|26735.217| 92848

In [84]:
# Stop the streaming query that feeds the in-memory 'modified_data' table.
query2.stop()

### Write the generated data into files instead of the memory. 

In [143]:
# Append-mode writer that persists the transformed stream as CSV files under
# the output path, using the given checkpoint location.
writer3 = (
    df3.writeStream
    .outputMode('append')
    .format('csv')
    .option('path', '/content/OpStream3')
    .option("checkpointLocation", 'ch7')
)


In [144]:
# Launch the file-sink streaming query; keep the handle so it can be stopped.
query3 = writer3.start()

In [145]:
# Preview the first rows of the in-memory 'modified_data' table.
modified_preview = spark.sql('select * from modified_data')
modified_preview.show(10)

+---+----------+-------+-------+-------+-------+---------+--------+---------+
| id|      date|   open|   High|    Low|  Close| AdjClose|  Volume|     diff|
+---+----------+-------+-------+-------+-------+---------+--------+---------+
|240|2000-12-05|26585.3|27367.3|26372.1|27011.8|25526.092| 91019.0| 995.2012|
|241|2000-12-06|27011.8|27509.4|26798.6|26869.7|25391.805|105791.0| 710.8008|
|242|2000-12-07|27011.8|27011.8|26478.7|26656.4|25190.236| 40656.0|533.10156|
|243|2000-12-08|26656.4|27722.7|26656.4|27651.6|  26130.7|149964.0|1066.2988|
|244|2000-12-11|27687.1|28860.0|27651.6|28078.1| 26533.74|159671.0|1208.4004|
|245|2000-12-12|28042.6|28078.1|27438.3|27935.9|26399.361| 74560.0| 639.7988|
|246|2000-12-13|27651.6|29286.5|27651.6|28469.1|26903.234|270385.0|1634.9004|
|247|2000-12-14|28469.1|29784.1|28291.3|28362.4|26802.406|256317.0|1492.7988|
|248|2000-12-15|28362.4|28895.6|27793.8|27935.9|26399.361|108886.0|1101.7988|
|249|2000-12-18|27580.5|28433.5|27367.3|28291.3|26735.217| 92848

In [146]:
# Drain the source first: stopping immediately after start() can end the query
# before all input files have been written to the CSV output directory.
query3.processAllAvailable()
query3.stop()

### Stop the query. Now, try reading the generated parquet files into a normal dataframe
- Create a schema and use it to read the data.
- Show the output.

In [154]:
# Schema for reading the written files back as a static DataFrame: `id` is
# parsed as an integer here and the derived `diff` column is included.
# Every field is nullable.
_saved_columns = [
    ('id', IntegerType()),
    ('date', StringType()),
    ('open', FloatType()),
    ('High', FloatType()),
    ('Low', FloatType()),
    ('Close', FloatType()),
    ('AdjClose', FloatType()),
    ('Volume', FloatType()),
    ('diff', FloatType()),
]

MySchema2 = StructType(
    [StructField(name, dtype, True) for name, dtype in _saved_columns]
)

In [155]:
# Batch (non-streaming) read of the CSV files produced by the file sink,
# parsed with the explicit schema.
saved_reader = spark.read.schema(MySchema2)
df_saved = saved_reader.csv('/content/OpStream3')

In [156]:
# Preview the rows read back from the CSV output directory.
df_saved.show()

+---+----------+-------+-------+-------+-------+---------+--------+---------+
| id|      date|   open|   High|    Low|  Close| AdjClose|  Volume|     diff|
+---+----------+-------+-------+-------+-------+---------+--------+---------+
|240|2000-12-05|26585.3|27367.3|26372.1|27011.8|25526.092| 91019.0| 995.2012|
|241|2000-12-06|27011.8|27509.4|26798.6|26869.7|25391.805|105791.0| 710.8008|
|242|2000-12-07|27011.8|27011.8|26478.7|26656.4|25190.236| 40656.0|533.10156|
|243|2000-12-08|26656.4|27722.7|26656.4|27651.6|  26130.7|149964.0|1066.2988|
|244|2000-12-11|27687.1|28860.0|27651.6|28078.1| 26533.74|159671.0|1208.4004|
|245|2000-12-12|28042.6|28078.1|27438.3|27935.9|26399.361| 74560.0| 639.7988|
|246|2000-12-13|27651.6|29286.5|27651.6|28469.1|26903.234|270385.0|1634.9004|
|247|2000-12-14|28469.1|29784.1|28291.3|28362.4|26802.406|256317.0|1492.7988|
|248|2000-12-15|28362.4|28895.6|27793.8|27935.9|26399.361|108886.0|1101.7988|
|249|2000-12-18|27580.5|28433.5|27367.3|28291.3|26735.217| 92848

In [157]:
from pyspark.sql.functions import col

### Sort the dataframe based on the ID

In [158]:
# Order the reloaded rows by the `id` column and preview the result.
id_column = col('id')
finalDFSorted = df_saved.sort(id_column)
finalDFSorted.show()

+---+----------+-------+-------+-------+-------+---------+--------+---------+
| id|      date|   open|   High|    Low|  Close| AdjClose|  Volume|     diff|
+---+----------+-------+-------+-------+-------+---------+--------+---------+
|  0|2000-01-04|22817.9|25696.8|22817.9|24879.3| 23510.88|108745.0|2878.9004|
|  1|2000-01-05|24523.9|26229.9|23670.9|24417.3|23074.295|175990.0|   2559.0|
|  2|2000-01-06|24381.7|24666.1|22746.8|22817.9|21562.865| 71746.0|1919.2988|
|  3|2000-01-07|22036.0|24879.3|22036.0|23884.2|22570.514|120984.0|2843.3008|
|  4|2000-01-10|24879.3|25519.1|23813.1|24061.9| 22738.44|151371.0|   1706.0|
|  5|2000-01-11|24168.5|25021.5|23955.2|24239.6|22906.365| 95943.0|1066.3008|
|  6|2000-01-12|24168.5|24452.8|23457.6|23670.9|22368.947| 61899.0| 995.2012|
|  7|2000-01-13|23670.9|24132.9|23102.2|23244.4|21965.906| 57538.0|1030.7012|
|  8|2000-01-14|23457.6|24168.5|22746.8|23244.4|21965.906| 84267.0|1421.6992|
|  9|2000-01-17|22533.6|23457.6|22533.6|23457.6|22167.377| 67807