<a href="https://colab.research.google.com/github/MennaFawzy/Pyspark/blob/main/Streaming_P_S__Student_Menna_Fawzy_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=3efcdd7a4ed0eb0ffd01a247569234f7e13615226692f74e90d1c6a2056d956b
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, DoubleType

In [3]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
spark = SparkSession.builder.getOrCreate()

### Create the schema of the streamed files (check the column names and types from the CSV files)

In [6]:
from pyspark.sql.types import StructType, StructField, IntegerType, DateType, DoubleType

schema = StructType([
    StructField("_c0", IntegerType(), nullable=True),
    StructField("Date", DateType(), nullable=True),
    StructField("Open", DoubleType(), nullable=True),
    StructField("High", DoubleType(), nullable=True),
    StructField("Low", DoubleType(), nullable=True),
    StructField("Close", DoubleType(), nullable=True),
    StructField("Adj Close", DoubleType(), nullable=True),
    StructField("Volume", IntegerType(), nullable=True)
])


### Create the dataframe by reading the stream using format "csv" and the schema you created.

In [7]:
df = spark.readStream.format("csv") \
    .schema(schema) \
    .load("/content/drive/MyDrive/InputStream")

### Make sure the dataframe is streaming the files from the folder

In [8]:
df.isStreaming

True

### Create a stream writer into memory and specify the query name "stock:

In [9]:
writer = df.writeStream.outputMode("append") \
    .format("memory")  \
    .queryName('stock')

### Start the write stream and make sure it works (read all columns from the table)

In [10]:
query= writer.start()

In [11]:
spark.sql('SELECT * FROM stock').show()

+----+----------+------------+------------+------------+------------+------------+------+
| _c0|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|
+----+----------+------------+------------+------------+------------+------------+------+
|NULL|      NULL|        NULL|        NULL|        NULL|        NULL|        NULL|  NULL|
| 240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019|
| 241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791|
| 242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656|
| 243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964|
| 244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671|
| 245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560|
| 246|2000-12-13|27651.599609|     29286.5|27651.599609|28469.099609|26903.234375|270385|
| 247|2000

### Remove the first row from the data (hint: drop the rows where ALL values are null), then add a new column "diff", which is the difference between high and low columns

In [12]:

df_3 = df.dropna(how="all")
df_4 = df_3.withColumn("diff", col("High") - col("Low"))


### Create a new write stream using the new generated dataframe and call the generate table "modified_data"

In [13]:
modified_data = df_4.writeStream.outputMode("append") \
    .format("memory")  \
    .queryName('modified_data')

In [14]:
query2 = modified_data.start()


In [15]:
spark.sql('SELECT * FROM modified_data').show()

+---+----------+------------+------------+------------+------------+------------+------+------------------+
|_c0|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019|  995.201172000001|
|241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791| 710.8007819999984|
|242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656| 533.1015620000035|
|243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964| 1066.298827999999|
|244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671|1208.4003909999992|
|245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560|  639.798827999999|
|246|2000-12-13|27651.599609

### Write the generated data into files instead of the memory.

In [16]:
writer = df_4.writeStream.outputMode("append") \
    .format("csv")  \
    .option("path", "/content/drive/MyDrive/output/") \
    .option("checkpointLocation", "chkpnt")

In [17]:
query = writer.start()


### Stop the query. Now, try reading the generated files into a normal dataframe
- Create a schema and use it to read the data.
- Show the output.

In [18]:
query.stop()

In [19]:
schema = StructType([
    StructField("_c0", IntegerType(), nullable=True),
    StructField("Date", DateType(), nullable=True),
    StructField("Open", DoubleType(), nullable=True),
    StructField("High", DoubleType(), nullable=True),
    StructField("Low", DoubleType(), nullable=True),
    StructField("Close", DoubleType(), nullable=True),
    StructField("Adj Close", DoubleType(), nullable=True),
    StructField("Volume", IntegerType(), nullable=True),
    StructField("diff", DoubleType(), nullable=True)  # Add the "diff" column to the schema
])

# Read the generated files with the specified schema
output_df = spark.read.schema(schema).csv("/content/drive/MyDrive/output")

# Show the output DataFrame
output_df.show()

+---+----------+------------+------------+------------+------------+------------+------+------------------+
|_c0|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019|  995.201172000001|
|241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791| 710.8007819999984|
|242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656| 533.1015620000035|
|243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964| 1066.298827999999|
|244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671|1208.4003909999992|
|245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560|  639.798827999999|
|246|2000-12-13|27651.599609

### Sort the dataframe based on the ID

In [None]:
final_df = output_df.sort('_c0')
final_df.show()

+---+----------+------------+------------+------------+------------+------------+------+------------------+
|_c0|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745|2878.9003900000025|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990|            2559.0|
|  2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746| 1919.298827999999|
|  3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984|2843.3007810000017|
|  4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371|            1706.0|
|  5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943|1066.3007810000017|
|  6|2000-01-12|     24168.5