<a href="https://colab.research.google.com/github/Asmaasa3d/Pyspark/blob/main/AsmaaSaeedStreaming_P_S__Student.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
# NOTE(review): the cells visible in this notebook read from /content/InStream/ and never
# reference /content/drive — confirm the mount is still needed.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from IPython.display import display, HTML

# Inject a CSS rule that disables soft-wrapping inside <pre> elements of the rendered output,
# presumably so that wide tables printed by DataFrame.show() stay on one line per row.
display(
    HTML("<style>pre { white-space: pre !important; }</style>")
)

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import pyspark
from pyspark.sql import SparkSession
# Reuse the active Spark session if there is one; otherwise create one with default settings.
spark = (
    SparkSession.builder
    .getOrCreate()
)

### Create the schema of the streamed files (check the column names and types from the CSV files)

In [None]:

from pyspark.sql.types import  StructType,StructField,DateType,FloatType,DoubleType,StringType



# Schema of the streamed stock CSV files; every column is nullable.
# ID and Date are read as plain strings, all price/volume columns as doubles.
recordSchema = StructType(
    [StructField(name, StringType(), True) for name in ('ID', 'Date')]
    + [StructField(name, DoubleType(), True)
       for name in ('Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume')]
)

### Create the dataframe by reading the stream using format "csv" and the schema you created.

In [None]:
# Streaming DataFrame over every CSV file that lands in /content/InStream/, parsed with the
# explicit recordSchema.
# - The previous dateFormat "m/d/YYYY" was not a valid date pattern ('m' is minute-of-hour and
#   'Y' is week-based year). The dates in the files look like 2000-12-05, i.e. "yyyy-MM-dd".
#   The option only takes effect for DateType columns, so it is inert while Date is a string.
# - No `header` option is set, so each file's header line is parsed as a data row (ID and the
#   numeric columns come out null, Date holds the literal text "Date"); it is filtered out later.
df = (
    spark.readStream
    .format('csv')
    .schema(recordSchema)
    .option("dateFormat", "yyyy-MM-dd")
    .load('/content/InStream/')
)

In [None]:
# Print the column names, types and nullability of the streaming DataFrame as a sanity check
# against recordSchema.
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: double (nullable = true)



### Make sure the dataframe is streaming the files from the folder

In [None]:
# Expected to be True here because `df` was created through spark.readStream.
df.isStreaming

True

### Create a stream writer into memory and specify the query name "stock":

In [None]:
# Configure (without starting) a streaming write of `df` into an in-memory table named "stock"
# that can be queried through spark.sql(...).
# - "truncate" / "numRows" were dropped: they are console-sink options and are ignored by the
#   memory sink. How many rows get printed is decided by DataFrame.show(n) at query time.
# - "checkpointLocation" was dropped: an append-mode memory sink cannot recover from an existing
#   checkpoint, so a fixed directory makes the second start() of this notebook fail. Without the
#   option Spark uses a temporary checkpoint directory for this sink.
writer = (
    df.writeStream
    .outputMode('append')
    .format('memory')
    .queryName('stock')
)

### Start the write stream and make sure it works (read all columns from the table)

In [None]:
# Launch the streaming query; start() returns immediately and the work continues in the background.
query = writer.start()
# Block until everything currently available in the source has been processed and committed to the
# sink, so the "stock" table is populated before the next cell queries it.
query.processAllAvailable()

In [None]:
# Display the rows currently held in the in-memory "stock" table.
# The table is empty until the streaming query has committed its first batch.
spark.sql('SELECT * FROM stock').show()

+----+----+----+---+-----+---------+------+
|Date|Open|High|Low|Close|Adj Close|Volume|
+----+----+----+---+-----+---------+------+
+----+----+----+---+-----+---------+------+



In [None]:
# Same query as the previous cell, run again to pick up rows the stream has written in the meantime.
spark.sql('SELECT * FROM stock').show()

+----+----------+------------+------------+------------+------------+------------+--------+
|  ID|      Date|        Open|        High|         Low|       Close|   Adj Close|  Volume|
+----+----------+------------+------------+------------+------------+------------+--------+
|null|      Date|        null|        null|        null|        null|        null|    null|
| 240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019.0|
| 241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791.0|
| 242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656.0|
| 243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964.0|
| 244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671.0|
| 245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560.0|
| 246|2000-12-13|27651.599609|     29286.5|27651.599609|28469.099609|26903.23437

In [None]:
# Count the rows currently stored in the in-memory "stock" table.
spark.sql('SELECT count(*)FROM stock').show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



In [None]:
# Stop the background streaming query that feeds the "stock" table.
query.stop()

### Remove the first row from the data (hint: drop the rows where ALL values are null), then add a new column "diff", which is the difference between high and low columns

In [None]:
from pyspark.sql.functions import col

# Drop the row produced by each file's header line (it is the one whose ID is null), then add
# "diff" = High - Low.
df3 = (
    df.filter(col("ID").isNotNull())
      .withColumn('diff', col('High') - col('Low'))
)

### Create a new write stream using the newly generated dataframe and call the generated table "modified_data"

In [None]:
# Configure (without starting) a streaming write of the cleaned DataFrame `df3` into an in-memory
# table named "modified_data".
# "truncate" / "numRows" were dropped: they are console-sink options and are ignored by the
# memory sink. Use DataFrame.show(n, truncate=...) when displaying the table instead.
writer2 = (
    df3.writeStream
    .outputMode("append")
    .format("memory")
    .queryName('modified_data')
)

In [None]:
# Launch the streaming query; start() returns immediately and the work continues in the background.
q = writer2.start()
# Block until everything currently available in the source has been processed, so the
# "modified_data" table is populated before the next cell queries it.
q.processAllAvailable()

In [None]:
# Display the rows currently held in the in-memory "modified_data" table, including the "diff" column.
spark.sql('SELECT * FROM modified_data').show()

+---+----------+------------+------------+------------+------------+------------+--------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|  Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+--------+------------------+
|240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019.0|  995.201172000001|
|241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791.0| 710.8007819999984|
|242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656.0| 533.1015620000035|
|243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964.0| 1066.298827999999|
|244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671.0|1208.4003909999992|
|245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560.0|  639.798827999999|
|246|2000-

In [None]:
# Stop the background streaming query that feeds the "modified_data" table.
q.stop()

### Write the generated data into files instead of the memory. 

In [None]:
# Configure (without starting) a streaming write of `df3` to Parquet files.
# Output goes to the relative directory 'OutStream5/' and progress is tracked in the relative
# checkpoint directory "chkpnt23".
w = (
    df3.writeStream
    .format('parquet')
    .outputMode('append')
    .option('path', 'OutStream5/')
    .option("checkpointLocation", "chkpnt23")
)

In [None]:
# Launch the Parquet-writing query; start() returns immediately and the work continues in the
# background.
q2 = w.start()
# Block until everything currently available in the source has been written out. Without this,
# the stop() in the next cell can end the query before any Parquet file has been produced.
q2.processAllAvailable()

In [None]:
# Stop the streaming query that writes the Parquet output.
q2.stop()

In [None]:
# Streaming DataFrame over the Parquet files produced by the file-sink writer in this notebook.
# - The sink writes Parquet, so the directory must be read with format('parquet'); reading it as
#   CSV (as before) cannot parse the files.
# - The written rows carry the extra "diff" column, so the schema is recordSchema plus that column.
#   A new StructType is built so recordSchema itself is left untouched.
# - The path matches the writer's 'OutStream5/' output directory.
#   NOTE(review): this cell previously pointed at '/content/OutStream2/' — confirm which
#   directory is intended.
streamedParquetSchema = StructType(
    recordSchema.fields + [StructField('diff', DoubleType(), True)]
)
df5 = (
    spark.readStream
    .format('parquet')
    .schema(streamedParquetSchema)
    .load('OutStream5/')
)

In [None]:
# Configure (without starting) a streaming write of `df` into an in-memory table named "pla".
# - "truncate" / "numRows" were dropped: they are console-sink options and are ignored by the
#   memory sink, which is why setting numRows to 100 never changed what show() printed.
# - "checkpointLocation" was dropped: an append-mode memory sink cannot recover from an existing
#   checkpoint, so a fixed directory makes the second start() of this notebook fail.
# NOTE(review): this streams the raw `df`, not the `df5` defined in the previous cell — confirm
# which DataFrame was meant.
pla = (
    df.writeStream
    .outputMode('append')
    .format('memory')
    .queryName('pla')
)

In [None]:
# Launch the streaming query; start() returns immediately and the work continues in the background.
qq = pla.start()
# Block until everything currently available in the source has been processed, so the "pla"
# table is populated before the next cell queries it.
qq.processAllAvailable()

In [None]:
# Answers to the two questions originally noted on this line:
# - The row containing the literal word "Date" is each CSV file's header line. The stream reader
#   is configured without the `header` option, so the header is parsed as data: the string Date
#   column keeps the text while the numeric columns cannot be parsed and become null.
# - show() prints 20 rows unless told otherwise. "numRows" is a console-sink option and has no
#   effect on a memory-sink table, so the row limit is passed to show() directly.
spark.sql('SELECT * FROM pla').show(100, truncate=False)

+----+----------+------------+------------+------------+------------+------------+--------+
|  ID|      Date|        Open|        High|         Low|       Close|   Adj Close|  Volume|
+----+----------+------------+------------+------------+------------+------------+--------+
|null|      Date|        null|        null|        null|        null|        null|    null|
| 240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019.0|
| 241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791.0|
| 242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656.0|
| 243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964.0|
| 244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671.0|
| 245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560.0|
| 246|2000-12-13|27651.599609|     29286.5|27651.599609|28469.099609|26903.23437

In [None]:
# Stop the background streaming query that feeds the "pla" table.
qq.stop()

### Stop the query. Now, try reading the generated parquet files into a normal dataframe
- Create a schema and use it to read the data.
- Show the output.

In [None]:
# Batch-read the Parquet output generated by the streaming file sink into a normal DataFrame.
# - The whole output directory is read instead of one hard-coded part file: part-file names
#   contain a run-specific UUID, so a fixed file name breaks on every fresh run and would only
#   ever load a fraction of the data.
# - An explicit schema is supplied, as the exercise asks: recordSchema plus the "diff" column
#   that was added before writing.
# - The path matches the writer's 'OutStream5/' output directory.
#   NOTE(review): this cell previously read from '/content/OutStream2/' — confirm which
#   directory is intended.
parquetSchema = StructType(
    recordSchema.fields + [StructField('diff', DoubleType(), True)]
)
finalDF = spark.read.schema(parquetSchema).parquet('OutStream5/')

In [None]:
# Print the first rows of the batch DataFrame read from Parquet (show() defaults to 20 rows).
finalDF.show()

+---+----------+------------+------------+------------+------------+------------+--------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|  Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+--------+------------------+
|280|2001-02-09|27758.199219|28362.400391|27722.699219|28362.400391| 26802.40625| 34044.0|  639.701172000001|
|281|2001-02-12|28362.400391|29499.800781|28291.300781|28504.599609| 26936.78125|196107.0|            1208.5|
|282|2001-02-13|28575.699219|28575.699219|27829.300781|     28007.0|26466.550781| 45017.0| 746.3984379999965|
|283|2001-02-14|27722.699219|28149.199219|27509.400391|28078.099609|26533.740234| 51489.0|  639.798827999999|
|284|2001-02-15|27864.800781|28078.099609|27509.400391|27864.800781|26332.175781| 57678.0| 568.6992180000016|
|285|2001-02-16|27367.300781|27722.699219|27367.300781|27651.599609|26130.699219| 46705.0|355.39843799999653|
|286|2001-

### Sort the dataframe based on the ID

In [None]:
# ID is stored as a string, so sort('ID') orders it lexicographically (0, 1, 10, 11, ...).
# Sort on an integer cast of ID instead to get numeric order; the cast is used only as the sort
# key, so the ID column in the result keeps its original string type.
finalDFSorted = finalDF.sort(finalDF['ID'].cast('int'))
finalDFSorted.show()

+---+----------+------------+------------+------------+------------+------------+--------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|  Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+--------+------------------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745.0|2878.9003900000025|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990.0|            2559.0|
| 10|2000-01-18|23457.599609|     23742.0|22746.800781|23422.099609|22133.832031| 27995.0| 995.1992189999983|
| 11|2000-01-19|22817.900391|23173.300781|     22036.0|     22036.0|20823.970703| 44173.0|1137.3007810000017|
| 12|2000-01-20|21325.099609|22000.400391|     20756.5|21680.599609|20488.117188| 47550.0|1243.9003909999992|
| 13|2000-01-21|21680.599609|22391.400391|20863.099609|21680.599609|20488.117188| 80750.0|1528.3007819999984|
| 14|2000-