In [5]:
from google.colab import files
uploaded = files.upload()

Saving kospi.zip to kospi.zip


In [7]:
import zipfile
import io

zip_ref = zipfile.ZipFile(io.BytesIO(uploaded['kospi.zip']), 'r')
zip_ref.extractall("/content/kospi")
zip_ref.close()

In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=5a7d0d8d1a7a62ae50bc714e606f46d024cbd9ffa4191ac41704b49635298bc6
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DateType, DoubleType, IntegerType

In [3]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [4]:
spark = SparkSession.builder \
    .appName("Streaming") \
    .getOrCreate()

### Create the schema of the streamed files (check the column names and types from the CSV files)

In [8]:
example_df=spark.read.option("header", "true").option("inferSchema", "true").csv("/content/kospi/kospi/KOSPI_STOCK_1.csv")

In [9]:
example_df.show(5)

+---+----------+------------+------------+------------+------------+------------+------+
|_c0|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|
+---+----------+------------+------------+------------+------------+------------+------+
| 40|2000-02-29|25163.699219|26087.699219|24239.599609|25519.099609|24115.490234|233246|
| 41|2000-03-01|25519.099609|25519.099609|25519.099609|25519.099609|24115.490234|     0|
| 42|2000-03-02|25767.900391|29144.300781|25767.900391|28575.699219|27003.972656|408391|
| 43|2000-03-03|27793.800781|29499.800781|26798.599609|27864.800781|26332.175781|216505|
| 44|2000-03-06|28291.300781|29144.300781|27367.300781|28717.800781|27138.255859|170784|
+---+----------+------------+------------+------------+------------+------------+------+
only showing top 5 rows



In [10]:
example_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)



In [21]:
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Date", DateType(), True),
    StructField("Open", DoubleType(), True),
    StructField("High", DoubleType(), True),
    StructField("Low", DoubleType(), True),
    StructField("Close", DoubleType(), True),
    StructField("Adj Close", DoubleType(), True),
    StructField("Volume", IntegerType(), True)
])

### Create the dataframe by reading the stream using format "csv" and the schema you created.

In [31]:
streaming_df = spark.readStream \
    .schema(schema) \
    .format("csv") \
    .load("/content/kospi/kospi/*.csv")

### Make sure the dataframe is streaming the files from the folder

In [23]:
streaming_df.isStreaming

True

### Create a stream writer into memory and specify the query name "stock:

In [24]:
stream_writer = streaming_df.writeStream \
    .queryName("stock") \
    .format("memory")

### Start the write stream and make sure it works (read all columns from the table)

In [27]:
query = stream_writer.start()

In [28]:
query.isActive

True

In [32]:
# Show the output DataFrame with all columns
output_df = spark.sql("SELECT * FROM stock")
output_df.show(5)

+----+----------+------------+------------+------------+------------+------------+------+
|  ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|
+----+----------+------------+------------+------------+------------+------------+------+
|NULL|      NULL|        NULL|        NULL|        NULL|        NULL|        NULL|  NULL|
| 240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019|
| 241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791|
| 242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656|
| 243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964|
+----+----------+------------+------------+------------+------------+------------+------+
only showing top 5 rows



### Remove the first row from the data (hint: drop the rows where ALL values are null), then add a new column "diff", which is the difference between high and low columns

In [33]:
streaming_df = streaming_df.dropna(how="all")

In [34]:
streaming_df = streaming_df.withColumn("diff", streaming_df["High"] - streaming_df["Low"])

### Create a new write stream using the new generated dataframe and call the generate table "modified_data"

In [36]:
stream_writer2 = streaming_df.writeStream \
    .queryName("modified_data") \
    .format("memory")

In [37]:
query2 = stream_writer2.start()

In [38]:
output_df2 = spark.sql("SELECT * FROM modified_data")
output_df2.show(0)

+---+----+----+----+---+-----+---------+------+----+
| ID|Date|Open|High|Low|Close|Adj Close|Volume|diff|
+---+----+----+----+---+-----+---------+------+----+
+---+----+----+----+---+-----+---------+------+----+
only showing top 0 rows



In [39]:
output_df2.show(5)

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019|  995.201172000001|
|241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791| 710.8007819999984|
|242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656| 533.1015620000035|
|243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964| 1066.298827999999|
|244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671|1208.4003909999992|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
only showing top 5 rows



In [43]:
query2.stop()

### Write the generated data into files instead of the memory.

In [41]:
import os
# Create a directory to store the generated files
output_dir = "/content/generated_data"
os.makedirs(output_dir, exist_ok=True)

In [44]:
write_query =stream_writer2 \
    .format("csv") \
    .option("path", output_dir) \
    .option("checkpointLocation", os.path.join(output_dir, "checkpoint")) \
    .start()

In [46]:
#write_query.awaitTermination()

### Stop the query. Now, try reading the generated files into a normal dataframe
- Create a schema and use it to read the data.
- Show the output.

In [47]:
write_query.stop()

In [48]:
generated_data_df = spark.read \
    .schema(schema) \
    .format("csv") \
    .load("/content/generated_data")

In [49]:
generated_data_df.show(10)

+---+----------+------------+------------+------------+------------+------------+------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|
+---+----------+------------+------------+------------+------------+------------+------+
|240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019|
|241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791|
|242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656|
|243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964|
|244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671|
|245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560|
|246|2000-12-13|27651.599609|     29286.5|27651.599609|28469.099609|26903.234375|270385|
|247|2000-12-14|28469.099609|29784.099609|28291.300781|28362.400391| 26802.40625|256317|
|248|2000-12-15|28362

### Sort the dataframe based on the ID

In [50]:
finalDFSorted =generated_data_df.sort('ID')
finalDFSorted.show()

+---+----------+------------+------------+------------+------------+------------+------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|
+---+----------+------------+------------+------------+------------+------------+------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990|
|  2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746|
|  3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984|
|  4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371|
|  5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943|
|  6|2000-01-12|     24168.5|24452.800781|23457.599609|23670.900391|22368.947266| 61899|
|  7|2000-01-13|23670.900391|24132.900391|23102.199219|23244.400391| 21965.90625| 57538|
|  8|2000-01-14|23457