In [2]:
#run only if you will use vs code
import os
os.chdir('/home/jovyan')
!pwd
!ls


/home/jovyan
graphframes-0.8.2-spark3.2-s_2.12.jar  kospi  work


In [3]:
!pip install findspark
!pyspark --version

[0mCollecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.1
      /_/
                        
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 11.0.15
Branch HEAD
Compiled by user hgao on 2022-01-20T19:26:14Z
Revision 4f25b3f71238a00508a356591553f2dfa89f8290
Url https://github.com/apache/spark
Type --help for more information.


In [4]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [5]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [6]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from pyspark.sql.functions import split , explode
from pyspark.sql.types import *

### Create the schema of the streamed files (check the column names and types from the CSV files)

In [7]:
Schema = StructType([StructField('id', IntegerType(), True),
                           StructField('Date', DateType(), True),
                           StructField('Open', DoubleType(), True),
                           StructField('High', DoubleType(), True),
                           StructField('Low', DoubleType(), True),
                           StructField('Close', DoubleType(), True),
                           StructField('Adj Close', DoubleType(), True),
                           StructField('Volume', IntegerType(), True),
                       ])

### Create the dataframe by reading the stream using format "csv" and the schema you created.

In [8]:
df = spark.readStream.format('csv').option('header','True').schema(Schema).load("kospi")

### Make sure the sataframe is streaming the files from the folder

In [9]:
df.isStreaming

True

### Create a stream writer into memory and specify the query name "stock:

In [10]:
writer = df.writeStream.outputMode("append") \
    .format("memory")  \
    .queryName('stock')

### Start the write stream and make sure it works (read all columns from the table)

In [11]:
stream = writer.start()

In [14]:
spark.sql('select * from stock').show()

+---+----------+------------+------------+------------+------------+------------+------+
| id|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|
+---+----------+------------+------------+------------+------------+------------+------+
|240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019|
|241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791|
|242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656|
|243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964|
|244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671|
|245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560|
|246|2000-12-13|27651.599609|     29286.5|27651.599609|28469.099609|26903.234375|270385|
|247|2000-12-14|28469.099609|29784.099609|28291.300781|28362.400391| 26802.40625|256317|
|248|2000-12-15|28362

In [15]:
stream.stop()

### Remove the first row from the data (hint: drop the rows where ALL values are null), then add a new column "diff", which is the difference between high and low columns

In [17]:
df2 = df.na.drop("all")
# df2.show()

In [18]:
df3 =df2.withColumn("diff",col("High")- col("low")) 
# df3.show()

### Create a new write stream using the new generated dataframe and call the generate table "modified_data"

In [19]:
writer = df3.writeStream.outputMode("append") \
    .format("memory")  \
    .queryName('modified_data')

In [20]:
stream = writer.start()

In [21]:
spark.sql("SELECT * FROM modified_data").show()

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| id|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019|  995.201172000001|
|241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791| 710.8007819999984|
|242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656| 533.1015620000035|
|243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964| 1066.298827999999|
|244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671|1208.4003909999992|
|245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560|  639.798827999999|
|246|2000-12-13|27651.599609

In [22]:
stream.stop()

### Write the generated data into files instead of the memory. 

In [35]:
writer = df3.writeStream.format("csv")\
                .option("path", "csv")\
                .option("checkpointLocation", "checkpoint")\
                .outputMode("append")

In [36]:
stream = writer.start()

In [37]:
stream.stop()

### Stop the query. Now, try reading the generated parquet files into a normal dataframe
- Create a schema and use it to read the data.
- Show the output.

In [38]:
df5 = spark.read.csv("csv",schema = Schema)

In [39]:
sort_df = df5.sort('ID')
sort_df.show()

+---+----------+------------+------------+------------+------------+------------+------+
| id|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|
+---+----------+------------+------------+------------+------------+------------+------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990|
|  2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746|
|  3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984|
|  4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371|
|  5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943|
|  6|2000-01-12|     24168.5|24452.800781|23457.599609|23670.900391|22368.947266| 61899|
|  7|2000-01-13|23670.900391|24132.900391|23102.199219|23244.400391| 21965.90625| 57538|
|  8|2000-01-14|23457