<a href="https://colab.research.google.com/github/SrijaG29/spark_streaming/blob/main/Spark_Streaming.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=1a93b113541dfccd32a9be2cc40ead5723a921322cf89df5584e2045cf904cf0
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


**PySpark Streaming:**

Create a Spark session.

In [1]:
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Spark Streaming")
    .master("local[*]")
    .getOrCreate()
)

spark

I am creating a small text file to use as sample input.

In [2]:
with open('Example.txt','w') as f:
  f.write('simon has a dog and a cat the dog and cat used to love simon')

In [3]:
with open('Example.txt','r') as f:
  x = f.read()
  print(x)

simon has a dog and a cat the dog and cat used to love simon


Now we will read the text file into a DataFrame. Each line of the file becomes one row in a single string column named `value`.

In [4]:
df_raw = spark.read.format("text").load('/content/Example.txt')
df_raw.printSchema()

root
 |-- value: string (nullable = true)



In [5]:
df_raw.show()

+--------------------+
|               value|
+--------------------+
|simon has a dog a...|
+--------------------+



By default, `show()` truncates strings longer than 20 characters. Passing `truncate=False` prints the complete values instead.

In [6]:
df_raw.show(truncate=False)

+------------------------------------------------------------+
|value                                                       |
+------------------------------------------------------------+
|simon has a dog and a cat the dog and cat used to love simon|
+------------------------------------------------------------+
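The truncation rule can be sketched in plain Python. This is an illustrative model, assuming the behaviour of Spark's `showString` (when a value is longer than the limit, keep the first `limit - 3` characters and append `...`); `truncate_cell` is a hypothetical helper, not a PySpark function. With the default limit of 20 it reproduces the `simon has a dog a...` cell printed earlier. `show()` also accepts an integer, e.g. `show(truncate=30)`, to change the limit.

```python
def truncate_cell(value: str, truncate: int = 20) -> str:
    """Plain-Python model of how show() shortens a string cell."""
    if truncate > 0 and len(value) > truncate:
        if truncate < 4:
            # very small limits are cut without an ellipsis
            return value[:truncate]
        # keep the first (truncate - 3) characters and mark the cut with "..."
        return value[:truncate - 3] + "..."
    # values that fit are printed unchanged
    return value


line = "simon has a dog and a cat the dog and cat used to love simon"
print(truncate_cell(line))     # simon has a dog a...
print(truncate_cell("simon"))  # simon
```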



Now we need to count how many times each word is repeated. To do that, we first need to split the sentence into words.

So we will import the `split` function.

In [7]:
from pyspark.sql.functions import split

In [8]:
df_words = df_raw.withColumn('words',split('value',' '))

In [9]:
df_words.show(truncate = False)

+------------------------------------------------------------+----------------------------------------------------------------------------+
|value                                                       |words                                                                       |
+------------------------------------------------------------+----------------------------------------------------------------------------+
|simon has a dog and a cat the dog and cat used to love simon|[simon, has, a, dog, and, a, cat, the, dog, and, cat, used, to, love, simon]|
+------------------------------------------------------------+----------------------------------------------------------------------------+
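The second argument of `split` is treated as a regular expression, not a literal string. A plain-Python sketch of the same idea uses `re.split`; it is only an analogue (Spark uses Java regular expressions), but for a simple pattern like `" "` the results match. It also shows why a single-space pattern yields empty strings when words are separated by more than one space; in PySpark the same fix would be to pass a whitespace-run pattern such as `split('value', r'\s+')`.

```python
import re

line = "simon has a dog and a cat the dog and cat used to love simon"

# split() splits on a regex pattern; re.split is the closest plain-Python analogue
words = re.split(" ", line)
print(words)

# A single-space pattern produces empty strings for consecutive spaces ...
print(re.split(" ", "dog  cat"))     # ['dog', '', 'cat']
# ... whereas a whitespace-run pattern does not.
print(re.split(r"\s+", "dog  cat"))  # ['dog', 'cat']
```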



Now we need to explode the array so that each word gets its own row.

For that we need to import `explode`.

In [10]:
from pyspark.sql.functions import explode

In [11]:
df_explode = df_words.withColumn('word',explode('words'))
df_explode.show(truncate = False)

+------------------------------------------------------------+----------------------------------------------------------------------------+-----+
|value                                                       |words                                                                       |word |
+------------------------------------------------------------+----------------------------------------------------------------------------+-----+
|simon has a dog and a cat the dog and cat used to love simon|[simon, has, a, dog, and, a, cat, the, dog, and, cat, used, to, love, simon]|simon|
|simon has a dog and a cat the dog and cat used to love simon|[simon, has, a, dog, and, a, cat, the, dog, and, cat, used, to, love, simon]|has  |
|simon has a dog and a cat the dog and cat used to love simon|[simon, has, a, dog, and, a, cat, the, dog, and, cat, used, to, love, simon]|a    |
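|simon has a dog and a cat the dog and cat used to love simon|[simon, has, a, dog, and, a, cat, the, dog, and, cat, used, to, love, simon]|dog  |
|simon has a dog and a cat the dog and cat used to love simon|[simon, has, a, dog, and, a, cat, the, dog, and, cat, used, to, love, simon]|and  |
|simon has a dog and a cat the dog and cat used to love simon|[simon, has, a, dog, and, a, cat, the, dog, and, cat, used, to, love, simon]|a    |
|simon has a dog and a cat the dog and cat used to love simon|[simon, has, a, dog, and, a, cat, the, dog, and, cat, used, to, love, simon]|cat  |
|simon has a dog and a cat the dog and cat used to love simon|[simon, has, a, dog, and, a, cat, the, dog, and, cat, used, to, love, simon]|the  |
|simon has a dog and a cat the dog and cat used to love simon|[simon, has, a, dog, and, a, cat, the, dog, and, cat, used, to, love, simon]|dog  |
|simon has a dog and a cat the dog and cat used to love simon|[simon, has, a, dog, and, a, cat, the, dog, and, cat, used, to, love, simon]|and  |
|simon has a dog and a cat the dog and cat used to love simon|[simon, has, a, dog, and, a, cat, the, dog, and, cat, used, to, love, simon]|cat  |
|simon has a dog and a cat the dog and cat used to love simon|[simon, has, a, dog, and, a, cat, the, dog, and, cat, used, to, love, simon]|used |
|simon has a dog and a cat the dog and cat used to love simon|[simon, has, a, dog, and, a, cat, the, dog, and, cat, used, to, love, simon]|to   |
|simon has a dog and a cat the dog and cat used to love simon|[simon, has, a, dog, and, a, cat, the, dog, and, cat, used, to, love, simon]|love |
|simon has a dog and a cat the dog and cat used to love simon|[simon, has, a, dog, and, a, cat, the, dog, and, cat, used, to, love, simon]|simon|
+------------------------------------------------------------+----------------------------------------------------------------------------+-----+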
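What `explode` does to each row can be modelled in plain Python: one output row per array element, with the other columns copied along. The sketch also models `explode_outer`, a related PySpark function that keeps rows whose array is empty or null (emitting a null word) where `explode` drops them. The helper names and the `(row_id, items)` input rows are hypothetical, chosen only for illustration.

```python
# Hypothetical input rows: (row_id, array column)
rows = [(1, ["simon", "has", "a", "dog"]), (2, []), (3, None)]


def explode(rows):
    """One (row_id, item) pair per element; empty or null arrays yield nothing."""
    out = []
    for row_id, items in rows:
        for item in items or []:
            out.append((row_id, item))
    return out


def explode_outer(rows):
    """Like explode, but an empty or null array yields a single (row_id, None)."""
    out = []
    for row_id, items in rows:
        if items:
            out.extend((row_id, item) for item in items)
        else:
            out.append((row_id, None))
    return out


print(explode(rows))        # rows 2 and 3 disappear
print(explode_outer(rows))  # rows 2 and 3 survive with None
```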

Now all the words of the text file are in the `word` column, so we will drop the rest of the columns.

In [12]:
df_explode = df_explode.drop('value','words')
df_explode.show(truncate = False)

+-----+
|word |
+-----+
|simon|
|has  |
|a    |
|dog  |
|and  |
|a    |
|cat  |
|the  |
|dog  |
|and  |
|cat  |
|used |
|to   |
|love |
|simon|
+-----+



Now we will count the number of occurrences of each word.

For that we use `groupBy` together with `agg`.

In [13]:
from pyspark.sql.functions import count
df_agg = df_explode.groupBy('word').agg(count('*').alias('word_count'))
df_agg.show(truncate = False)

+-----+----------+
|word |word_count|
+-----+----------+
|used |1         |
|simon|2         |
|dog  |2         |
|love |1         |
|cat  |2         |
|the  |1         |
|and  |2         |
|a    |2         |
|has  |1         |
|to   |1         |
+-----+----------+
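The `groupBy('word').agg(count('*'))` step is the classic word count. A plain-Python sketch with `collections.Counter` computes the same counts, shown here only to make the grouping explicit. Note that Spark does not guarantee the order of grouped rows (the table above is not sorted), whereas `Counter` keeps first-seen order; in PySpark, adding `.orderBy('word_count', ascending=False)` would sort the result.

```python
from collections import Counter

line = "simon has a dog and a cat the dog and cat used to love simon"
words = line.split(" ")

# Counter groups identical words and counts each group,
# like groupBy('word').agg(count('*').alias('word_count'))
word_count = Counter(words)

for word, n in word_count.most_common():
    print(word, n)
```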



Finding unique values.

In [14]:
df_unique = df_explode.select('word').distinct()
df_unique.show()

+-----+
| word|
+-----+
| used|
|simon|
|  dog|
| love|
|  cat|
|  the|
|  and|
|    a|
|  has|
|   to|
+-----+
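`distinct()` lists each word once; it does not by itself return a number. In PySpark, `df_unique.count()` or `countDistinct('word')` from `pyspark.sql.functions` would give the count. The plain-Python sketch below models both steps with a set (for the count) and `dict.fromkeys` (for an order-preserving list of unique words).

```python
line = "simon has a dog and a cat the dog and cat used to love simon"
words = line.split(" ")

# like df_explode.select('word').distinct(), but keeping first-seen order
unique_words = list(dict.fromkeys(words))
print(unique_words)

# like df_unique.count()
print(len(set(words)))  # 10
```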



Filtering rows: keep only the words whose length is greater than 3.

In [15]:
from pyspark.sql.functions import col
from pyspark.sql.functions import length
x = df_explode.filter(length(col('word')) > 3)
x.show(truncate = False)

+-----+
|word |
+-----+
|simon|
|used |
|love |
|simon|
+-----+
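`filter` keeps every row that satisfies the condition and does not remove duplicates, which is why `simon` appears twice above. A plain-Python list comprehension expresses the same filter:

```python
line = "simon has a dog and a cat the dog and cat used to love simon"
words = line.split(" ")

# like df_explode.filter(length(col('word')) > 3)
long_words = [w for w in words if len(w) > 3]
print(long_words)  # ['simon', 'used', 'love', 'simon']
```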



Calculating the average word length.

`avg` returns a double, but I want the answer as an integer, so I round the result and cast it.

**`cast('required datatype')`** is used to convert a column from one data type to another.

This version returns a DataFrame as output.

In [16]:
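# Note: importing round from pyspark.sql.functions shadows Python's built-in round()
from pyspark.sql.functions import avg, col, length, round

# average word length -> round to 0 decimal places -> cast the double to an integer
avg_length = df_explode.select(round(avg(length(col('word'))), 0).cast('int').alias('avg length of words'))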
avg_length.show(truncate = False)

+-------------------+
|avg length of words|
+-------------------+
|3                  |
+-------------------+
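The arithmetic behind the `3` can be checked in plain Python: the 15 words have 46 characters in total, so the average is about 3.07. Spark's `round` uses HALF_UP rounding, which differs from Python's built-in `round` (round half to even) on exact `.5` values; the sketch mimics HALF_UP with `math.floor(x + 0.5)`, which is valid for positive numbers. This is an illustrative check, not Spark code.

```python
import math

line = "simon has a dog and a cat the dog and cat used to love simon"
lengths = [len(w) for w in line.split(" ")]

average = sum(lengths) / len(lengths)  # like avg(length(col('word')))
print(sum(lengths), len(lengths))      # 46 15

# HALF_UP rounding to 0 decimals (positive values), then cast to int
rounded = int(math.floor(average + 0.5))
print(rounded)  # 3

# Python's built-in round() rounds half to even, unlike Spark's round()
print(round(2.5), int(math.floor(2.5 + 0.5)))  # 2 3
```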



This version returns a plain Python value as output instead of a DataFrame.

**Collect:**
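The collect method retrieves the data from the DataFrame to the driver program as a list of rows. It’s a way to pull the computed data into the local Python environment. Because every returned row is loaded into the driver's memory, it should only be used on small results.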

In [17]:
from pyspark.sql.functions import avg, col, length, round

# collect() returns a list of Row objects; [0][0] takes the first column of the first row
avg_length = (
    df_explode
    .select(round(avg(length(col('word'))), 0).cast('int').alias('avg length of words'))
    .collect()[0][0]
)
print('average length is: ', avg_length)

average length is:  3


In [18]:
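from pyspark.sql import SparkSession

# getOrCreate() returns the session created at the top of the notebook if it is
# still running, so Spark and spark refer to the same session here
Spark = (
    SparkSession
    .builder
    .appName('temp')
    .config('spark.streaming.stopGracefullyOnShutdown', True)
    .master("local[*]")
    .getOrCreate()
)
# let the file-based streaming source infer the schema instead of requiring one
Spark.conf.set("spark.sql.streaming.schemaInference", True)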

In [19]:
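stream_df = (
    Spark
    .readStream
    .option('cleanSource', 'archive')           # move processed files out of the input folder
    .option('sourceArchiveDir', 'archive_dir')  # ... into this archive directory
    .option('maxFilesPerTrigger', 1)            # read at most one new file per micro-batch
    .format('json')
    .load('/content/Devices')                   # folder of JSON files (not created in this notebook)
)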
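How `maxFilesPerTrigger` and `cleanSource='archive'` shape the stream can be pictured with a toy model: new files are taken oldest first (assuming the default `latestFirst=false`), at most `maxFilesPerTrigger` per micro-batch, and files from completed batches are moved to the archive directory. The file names, timestamps and `plan_micro_batches` helper are hypothetical; this is a simplified sketch of the behaviour, not Spark's implementation.

```python
def plan_micro_batches(files_by_mtime, max_files_per_trigger=1):
    """Toy model: group new files into micro-batches, oldest file first.

    files_by_mtime: list of (file_name, modification_time) tuples.
    """
    ordered = [name for name, _ in sorted(files_by_mtime, key=lambda f: f[1])]
    return [ordered[i:i + max_files_per_trigger]
            for i in range(0, len(ordered), max_files_per_trigger)]


# hypothetical files waiting in the input folder, with made-up timestamps
files = [("device_02.json", 200), ("device_01.json", 100), ("device_03.json", 300)]
batches = plan_micro_batches(files, max_files_per_trigger=1)
print(batches)  # [['device_01.json'], ['device_02.json'], ['device_03.json']]

archived = []
for batch in batches:
    # ... the micro-batch would be processed here ...
    archived.extend(batch)  # models cleanSource='archive' moving completed files
```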

In [20]:
stream_df.printSchema()

root
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |-- measure: string (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    |    |    |-- temperature: long (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)



The `data.devices` field is an array of structs. To flatten it into one row per device, we use `explode`.

In [21]:
from pyspark.sql.functions import explode

In [22]:
exploded_df = stream_df.withColumn('data_devices',explode("data.devices"))

We no longer need the `data` column, so we will drop it.

In [23]:
from pyspark.sql.functions import col
exploded_df = exploded_df.drop("data")
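The explode-then-drop step can be modelled on a single event in plain Python: each element of `data.devices` becomes its own row, the top-level fields are copied onto every row, and the `data` field is left out. The event below is hypothetical (made-up values shaped like the printed schema), and `explode_devices` is an illustrative helper, not Spark code. As with `explode`, an event with an empty or missing `devices` array produces no rows.

```python
import json

# Hypothetical event shaped like the schema printed above (values are made up)
event = json.loads("""
{"customerId": "C1", "eventId": "E1", "eventOffset": 1,
 "eventPublisher": "device", "eventTime": "2024-01-01 00:00:00",
 "data": {"devices": [
     {"deviceId": "D1", "measure": "C", "status": "SUCCESS", "temperature": 15},
     {"deviceId": "D2", "measure": "C", "status": "ERROR", "temperature": 20}
 ]}}
""")


def explode_devices(event):
    """One row per element of data.devices; other top-level fields are copied."""
    base = {k: v for k, v in event.items() if k != "data"}  # like .drop("data")
    devices = (event.get("data") or {}).get("devices") or []
    return [{**base, "data_devices": device} for device in devices]


rows = explode_devices(event)
print(len(rows))  # 2
```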

In [24]:
exploded_df.printSchema()

root
 |-- customerId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- data_devices: struct (nullable = true)
 |    |-- deviceId: string (nullable = true)
 |    |-- measure: string (nullable = true)
 |    |-- status: string (nullable = true)
 |    |-- temperature: long (nullable = true)



In [25]:
flat_df = (
    exploded_df
    .withColumn("deviceID", col("data_devices.deviceId"))
    .withColumn("measure", col("data_devices.measure"))
    .withColumn("status", col("data_devices.status"))
    .withColumn("temperature", col("data_devices.temperature"))
)

We no longer need the `data_devices` column, so we will drop it.

In [26]:
flat_df = flat_df.drop("data_devices")
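Promoting the struct's fields to top-level columns is what makes the rows writable as CSV, since CSV has no way to represent a nested struct. The plain-Python sketch below models the four `withColumn` calls plus the `drop` on one hypothetical exploded row (made-up values); `flatten_device` is an illustrative helper. In PySpark, `exploded_df.select('*', 'data_devices.*').drop('data_devices')` is a common shorthand for the same flattening, except that it keeps the original field name `deviceId` rather than renaming it to `deviceID`.

```python
# Hypothetical exploded row (made-up values) with the nested data_devices struct
row = {
    "customerId": "C1", "eventId": "E1", "eventOffset": 1,
    "eventPublisher": "device", "eventTime": "2024-01-01 00:00:00",
    "data_devices": {"deviceId": "D1", "measure": "C",
                     "status": "SUCCESS", "temperature": 15},
}


def flatten_device(row):
    """Copy the struct's fields to top-level keys, then drop the struct."""
    flat = {k: v for k, v in row.items() if k != "data_devices"}
    device = row["data_devices"]
    flat["deviceID"] = device["deviceId"]  # renamed, as in the notebook
    flat["measure"] = device["measure"]
    flat["status"] = device["status"]
    flat["temperature"] = device["temperature"]
    return flat


flat = flatten_device(row)
print(flat)
```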

In [None]:
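(flat_df
 .writeStream
 .format('csv')
 .outputMode('append')                            # write only the new rows of each micro-batch
 .option('path', 'output_devices.csv')            # Spark creates a directory of part files at this path
 .option('checkpointLocation', 'checkpoint_dir')  # progress tracking so a restart resumes where it left off
 .start()
 .awaitTermination())                             # blocks the cell until the query is stopped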