In [1]:
! pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 45 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 66.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=c0ed3cf59281a9da78445c3357cdb17d705b84932e250edd490202ed99c624f0
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [2]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [3]:
from pyspark.sql.types import (StructType, StructField,DoubleType,DateType,
                               StringType, IntegerType)

### Create the schema of the streamed files (check the column names and types from the CSV files)

In [4]:
recordSchema = StructType([StructField('Id', IntegerType(), True),
                           StructField('Date', DateType(), True),
                           StructField('Open', DoubleType(), True),
                           StructField('High', DoubleType(), True),
                           StructField('Low', DoubleType(), True),
                           StructField('Close', DoubleType(), True),
                           StructField('Adj Close', DoubleType(), True),
                           StructField('Volume', DoubleType(), True)
                           ])

### Create the dataframe by reading the stream using format "csv" and the schema you created.

In [5]:
df = spark.readStream.format("csv") \
    .schema(recordSchema) \
    .load("kospi/")

### Make sure the sataframe is streaming the files from the folder

In [6]:
df.isStreaming

True

### Create a stream writer into memory and specify the query name "stock:

In [7]:
writer = df.writeStream.outputMode("append") \
    .format("memory")  \
    .queryName('stock')

### Start the write stream and make sure it works (read all columns from the table)

In [8]:
query= writer.start()

In [9]:
spark.sql('SELECT * FROM stock').show()

+---+----+----+----+---+-----+---------+------+
| Id|Date|Open|High|Low|Close|Adj Close|Volume|
+---+----+----+----+---+-----+---------+------+
+---+----+----+----+---+-----+---------+------+



In [10]:
spark.sql('SELECT * FROM stock').show()

+----+----------+------------+------------+------------+------------+------------+--------+
|  Id|      Date|        Open|        High|         Low|       Close|   Adj Close|  Volume|
+----+----------+------------+------------+------------+------------+------------+--------+
|null|      null|        null|        null|        null|        null|        null|    null|
| 240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019.0|
| 241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791.0|
| 242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656.0|
| 243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964.0|
| 244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671.0|
| 245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560.0|
| 246|2000-12-13|27651.599609|     29286.5|27651.599609|28469.099609|26903.23437

### Remove the first row from the data (hint: drop the rows where ALL values are null), then add a new column "diff", which is the difference between high and low columns

In [11]:
type(df)

pyspark.sql.dataframe.DataFrame

In [12]:
df_v1 = df.dropna(how="all")
writer = df_v1.writeStream.outputMode("append") \
    .format("memory")  \
    .queryName('stock_v1')
query= writer.start()


In [13]:
spark.sql('SELECT * FROM stock_v1').show()

+---+----------+------------+------------+------------+------------+------------+--------+
| Id|      Date|        Open|        High|         Low|       Close|   Adj Close|  Volume|
+---+----------+------------+------------+------------+------------+------------+--------+
|240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019.0|
|241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791.0|
|242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656.0|
|243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964.0|
|244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671.0|
|245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560.0|
|246|2000-12-13|27651.599609|     29286.5|27651.599609|28469.099609|26903.234375|270385.0|
|247|2000-12-14|28469.099609|29784.099609|28291.300781|28362.400391| 26802.40625|256317.0|

In [14]:
diff = df_v1['High'] - df_v1['Low']
df_v2 = df_v1.withColumn("diff",diff)

### Create a new write stream using the new generated dataframe and call the generate table "modified_data"

In [15]:
writer = df_v2.writeStream.outputMode("append") \
    .format("memory")  \
    .queryName('stock_v2')
query= writer.start()

In [16]:
spark.sql('SELECT * FROM stock_v2').show()

+---+----+----+----+---+-----+---------+------+----+
| Id|Date|Open|High|Low|Close|Adj Close|Volume|diff|
+---+----+----+----+---+-----+---------+------+----+
+---+----+----+----+---+-----+---------+------+----+



+---+----+----+----+---+-----+---------+------+----+
| ID|Date|Open|High|Low|Close|Adj Close|Volume|diff|
+---+----+----+----+---+-----+---------+------+----+
+---+----+----+----+---+-----+---------+------+----+



In [17]:
spark.sql('SELECT * FROM stock_v2').show()

+---+----------+------------+------------+------------+------------+------------+--------+------------------+
| Id|      Date|        Open|        High|         Low|       Close|   Adj Close|  Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+--------+------------------+
|240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019.0|  995.201172000001|
|241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791.0| 710.8007819999984|
|242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656.0| 533.1015620000035|
|243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964.0| 1066.298827999999|
|244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671.0|1208.4003909999992|
|245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560.0|  639.798827999999|
|246|2000-

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|120|2000-06-20|22817.900391|23102.199219|21680.599609|22320.300781|21092.632813| 34466|1421.5996099999975|
|121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375| 68651| 995.0996099999975|
|122|2000-06-22|23386.599609|23386.599609|     22462.5|23031.099609|21764.335938| 97209| 924.0996090000008|
|123|2000-06-23|22107.099609|24097.400391|22107.099609|     22889.0|21630.052734|199483|1990.3007819999984|
|124|2000-06-26|23102.199219|     24168.5|22569.099609|24026.300781|22704.796875|121969|1599.4003909999992|
|125|2000-06-27|24026.300781|25519.099609|     23742.0|24026.300781|22704.796875|113809|1777.0996090000008|
|126|2000-06-28|23884.199219

### Write the generated data into files instead of the memory. 

In [18]:
writer = df_v2.writeStream.outputMode("append")\
.format("parquet")\
.option("path", "Output/")\
.option("checkpointLocation", "chkpnt")

query = writer.start()

### Stop the query. Now, try reading the generated parquet files into a normal dataframe
- Create a schema and use it to read the data.
- Show the output.

In [25]:
# Stop Query
query.stop()

# Read From 
# out_df = spark.read.parquet("Output/")
out_df = spark.read.option("header","true").option("recursiveFileLookup","true").parquet("Output/")

Row(Id=240, Date=datetime.date(2000, 12, 5), Open=26585.300781, High=27367.300781, Low=26372.099609, Close=27011.800781, Adj Close=25526.091797, Volume=91019.0, diff=995.201172000001)

In [27]:
out_df.show()

+---+----------+------------+------------+------------+------------+------------+--------+------------------+
| Id|      Date|        Open|        High|         Low|       Close|   Adj Close|  Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+--------+------------------+
|240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019.0|  995.201172000001|
|241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791.0| 710.8007819999984|
|242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656.0| 533.1015620000035|
|243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964.0| 1066.298827999999|
|244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671.0|1208.4003909999992|
|245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560.0|  639.798827999999|
|246|2000-

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|120|2000-06-20|22817.900391|23102.199219|21680.599609|22320.300781|21092.632813| 34466|1421.5996099999975|
|121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375| 68651| 995.0996099999975|
|122|2000-06-22|23386.599609|23386.599609|     22462.5|23031.099609|21764.335938| 97209| 924.0996090000008|
|123|2000-06-23|22107.099609|24097.400391|22107.099609|     22889.0|21630.052734|199483|1990.3007819999984|
|124|2000-06-26|23102.199219|     24168.5|22569.099609|24026.300781|22704.796875|121969|1599.4003909999992|
|125|2000-06-27|24026.300781|25519.099609|     23742.0|24026.300781|22704.796875|113809|1777.0996090000008|
|126|2000-06-28|23884.199219

### Sort the dataframe based on the ID

In [28]:

finalDFSorted = out_df.sort('ID')
finalDFSorted.show()

+---+----------+------------+------------+------------+------------+------------+--------+------------------+
| Id|      Date|        Open|        High|         Low|       Close|   Adj Close|  Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+--------+------------------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745.0|2878.9003900000025|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990.0|            2559.0|
|  2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746.0| 1919.298827999999|
|  3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984.0|2843.3007810000017|
|  4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371.0|            1706.0|
|  5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943.0|1066.3007810000017|
|  6|2000-

In [None]:
finalDFSorted = finalDF.sort('ID')
finalDFSorted.show()

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745|2878.9003900000025|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990|            2559.0|
|  2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746| 1919.298827999999|
|  3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984|2843.3007810000017|
|  4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371|            1706.0|
|  5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943|1066.3007810000017|
|  6|2000-01-12|     24168.5