Họ và tên: Phạm Đức Thể

MSSV: 19522253

Lớp: DS200.M21

Spark for Streaming Data - 25/05/2022

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/1T9XK3OFcDui7f-apDeWfxrwi66t6DoUi?usp=sharing)

# **Big Data - Spark for Streaming Data**

## Import Libraries

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 35 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 48.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=d095b790cd2a6bb1f02a9048d6947f97240601610a808e95d000b7bdb15d4e01
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [2]:
import pyspark 
import pandas as pd
import numpy as np
import time
import html

from IPython.display import display, clear_output
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as f
from pyspark.sql.streaming import DataStreamReader

## Spark Structure Streaming

### Streaming in Spark

### Structure streaming

In [3]:
spark = SparkSession.builder.appName('StructuredNetworkWordCount').getOrCreate()

#### Quick example

In [4]:
# Create DataaFrame representing the stream of input lines from connection to localhost:9999
lines = spark.readStream.format('socket').option('host', 'localhost').option('port', 9999).load()

In [5]:
# Split the lines into words
words = lines.select(f.explode(f.split(lines.value, " ")).alias('word'))

In [6]:
# Generate runing word count
wordCounts = words.groupBy('word').count()

#### Quick example – Console sink

In [7]:
# Start runing the query that prints the runing counts to the console
query = wordCounts.writeStream.outputMode('complete').format('console').start()
query.awaitTermination()

StreamingQueryException: ignored

#### Quick example – Memory sink

In [8]:
# Start running the query that write the running counts to memory
query = wordCounts.writeStream.queryName('wordCounts').outputMode('complete').format('memory').start()

In [9]:
display(spark.sql(f'SELECT * from {query.name}')).show()

DataFrame[word: string, count: bigint]

AttributeError: ignored

In [10]:
# show live results for 2 minutes, refreshed every 1 second
for x in range(0, 120):
  # spark.sql can be used to request how the query is performing
  display(spark.sql(f'SELECT * from {query.name}').toPandas())
  time.sleep(1)
  clear_output(wait=True)
else:
  print('Live view ended...')

Live view ended...


#### Streaming from files

In [11]:
# SETTINGS
IN_PATH = ''

timestampformat = 'EE MMM dd HH:mm:ss zzzz yyyy'
spark.sql('set spark.sql.legacy.timeParserPolicy=LEGACY')
spark = SparkSession.builder.appName('StructuredStreamingExample').getOrCreate()
spark.conf.set('spark.sql.legacy.timeParserPolicy', 'LEGACY')
schema = spark.read.csv(IN_PATH).litmit(10).schema

spark_reader = spark.readStream.schema(schema)

IllegalArgumentException: ignored

In [12]:
def clean_data():
  pass

In [13]:
streaming_data_raw = (
    spark_reader.json(IN_PATH).select('id',
                                      # extract proper timestamp from created_at column
                                      f.to_timestamp(f.col('created_at'), timestampformat).alias('timestamp'),
                                      # extract user information
                                      f.col('user.screen_name').alias('user'),
                                      'text',).coalesce(1)
)

streaming_data_clean = clean_data(streaming_data_raw)

stream_writer = streaming_data_clean.writeStream.queryName('data').trigger(once=True).outputMode('append').format('memory')

query = stream_writer.start()

NameError: ignored

In [14]:
display(spark.sql(f'SELECT * from {query.name}').show())

+----+-----+
|word|count|
+----+-----+
+----+-----+



None

In [15]:
distinct_user_count = streaming_data_clean.select(f.approx_count_distinct('user'), f.current_timestamp)

stream_writer = distinct_user_count.writeStream.queryName('data').trigger(once=True).outputMode('complete').format('memory')

query = stream_writer.start()

NameError: ignored

In [16]:
display(spark.sql(f'SELECT * from {query.name}').show())

+----+-----+
|word|count|
+----+-----+
+----+-----+



None

In [17]:
sentiment_model = PipelineModel.load('')
raw_sentiment = sentiment_model.transform(streaming_data_clean)

# Select downstream columns
sentiment = raw_sentiment.select('id', 'timestamp', 'user', 'text', f.col('prediction').alias('user_sentiment'))

NameError: ignored

In [18]:
stream_writer = sentiment.writeStream.queryName('data').trigger(once=True).outputMode('complete').format('memory')
query = stream_writer.start()

NameError: ignored

In [19]:
display(spark.sql(f'SELECT * from {query.name}').show())

+----+-----+
|word|count|
+----+-----+
+----+-----+



None

In [20]:
negative_sentiment_count = sentiment.filter('user_sentiment == 0.0') \
.select(f.col('user_sentiment').alias('negative_sentiment')) \
.agg(f.count('negative_sentiment'))

positive_sentiment_count = sentiment.filter('user_sentiment == 4.0') \
.select(f.col('user_sentiment').alias('negative_sentiment')) \
.agg(f.count('negative_sentiment'))

average_sentiment = sentiment = sentiment.agg(f.avg('user_sentiment'))

NameError: ignored

In [21]:
data_to_stream = average_sentiment.agg(f.avg('user_sentiment'))

NameError: ignored

In [22]:
if isinstance(spark_reader, DataStreamReader):
  stream_writer = data_to_stream.writeStream.queryName('treaming_table') \
  .trigger(processingTime='20 seconds') \
  .outputMode('complete').format('memory')

  # Calling .start on a DataStreamWriter return an instance of StreamingQuery
  query = stream_writer.start()

NameError: ignored

In [23]:
display(spark.sql(f'SELECT * from {query.name}').show())

+----+-----+
|word|count|
+----+-----+
+----+-----+



None

In [24]:
# Let's see what we are outputing
if streaming_data_clean.isStreaming:
  for x in range(0, 200):
    try:
      if not query.isActive:
        break
        print('Showing live view refreshed every 10 seconds')
        print(f'Seconds passed: {x*10}')
        result = spark.sql(f'SELECT * from {query.name}')
        # spark.sql can be used to request how the query is performing
        display(result.toPandas())
        time.sleep(10)
        clear_output(wait=True)
    except:
      break
  print('Live view ended...')
else:
  print('Not streaming, showing static output instead')
  result = data_to_stream
  display(result.litmit(10).toPandas())

NameError: ignored

## Tài Liệu Tham Khảo
- [Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
- [Spark NLP](https://nlp.johnsnowlabs.com/docs/en/install)
