In [None]:
from pyspark.sql import SparkSession as ss
from pyspark.sql import DataFrame
from pyspark.sql.streaming import DataStreamReader
from pyspark.sql import functions as f
from pyspark.ml import PipelineModel
from pyspark.sql.functions import udf
import pandas as pd
from IPython.display import display, clear_output

pd.options.display.max_columns = None
pd.options.display.max_rows = 30
pd.options.display.max_colwidth = 100

# Variables & settings
IN_PATH = "/home/jovyan/data-sets/twitter/"
MODEL_PATH = ""
OUT_PATH = ""
# Pattern used to parse the `created_at` column with to_timestamp(); it relies on the
# legacy (SimpleDateFormat-based) parser enabled below.
timestamp_format = "EEE MMM dd HH:mm:ss zzzz yyyy"

spark = ss.builder.appName("StructuredStreaming").getOrCreate()
# Spark 3's default datetime parser does not allow the day-of-week symbol 'E' when
# parsing, so to_timestamp(..., timestamp_format) needs the legacy parser policy.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Infer the schema from the input data. JSON schema inference scans the input when
# .json() is called; a trailing .limit() would not reduce that scan (the reader's
# samplingRatio option is the way to infer from a subset).
schema = spark.read.json(IN_PATH).schema

# Create static (batch) and streaming readers sharing the inferred schema;
# streaming file sources need an explicit schema.
static_spark_reader = spark.read.schema(schema)
stream_spark_reader = spark.readStream.schema(schema)

In [None]:
# Toggle switch: set USE_STREAMING = True to process IN_PATH as a stream
# (spark.readStream) instead of as a one-off batch read (spark.read).
USE_STREAMING = False
spark_reader = stream_spark_reader if USE_STREAMING else static_spark_reader

In [None]:
# Keep only the columns needed downstream and parse `created_at` into a real timestamp.
df = (
    spark_reader.json(IN_PATH)
    .select(
        "id",
        f.to_timestamp(f.col("created_at"), timestamp_format).alias("timestamp"),
        f.col("user.screen_name").alias("user"),
        "text",
    )
    .coalesce(1)
)

# Approximate number of distinct users, plus the time the result was computed.
distinct_user_count_df = df.select(f.approx_count_distinct("user"), f.current_timestamp())

if not df.isStreaming:
    print("Plain old Dataframe ...")
    distinct_user_count_df.show()
else:
    print("Lets Stream m m m m m m m m m")
    stream_writer = (
        distinct_user_count_df.writeStream
        # queryName() (not .query()) names the query; with the memory sink it is also
        # the name of the in-memory table that later cells read via `query.name`.
        .queryName("distinct_user_count")
        # Pass once=True instead to run a single micro-batch (handy for debugging).
        .trigger(processingTime="5 seconds")
        # Emit the full aggregate result on every trigger.
        .outputMode("complete")
        .format("memory")
    )

    query = stream_writer.start()

In [None]:
# True when spark_reader is the streaming reader (spark.readStream), False for the batch reader.
df.isStreaming

In [None]:
# Inspect the StreamingQuery handle. `query` only exists when the streaming reader
# was selected, so guard on df.isStreaming instead of failing with a NameError.
# Note: only one active stream with a given query name can be started at a time.
if df.isStreaming:
    display(query.isActive, query.lastProgress)
    # To stop the stream, call query.stop() -- with parentheses; a bare `query.stop`
    # only references the method. The cells below read the stream's in-memory
    # table, so stop it once you are done with them.
else:
    print("Static DataFrame: no streaming query to inspect.")


In [None]:
# Read the streaming result manually: the memory sink exposes it as a table named
# after the query. Guarded because `query` is only defined in streaming mode.
if df.isStreaming:
    display(spark.sql(f"SELECT * FROM {query.name}").toPandas())
else:
    print("Static DataFrame: no streaming query to read from.")

In [None]:
# To check it at a fixed interval: every POLL_INTERVAL_S seconds, POLL_ITERATIONS times
# (1 second x 120 = 2 minutes).
from time import sleep

POLL_INTERVAL_S = 1
POLL_ITERATIONS = 120

if df.isStreaming:
    for _ in range(POLL_ITERATIONS):
        # Clear *before* displaying (wait=True defers the clear until new output arrives),
        # so the last snapshot stays visible after the loop ends.
        clear_output(wait=True)
        display(spark.sql(f"SELECT * FROM {query.name}").toPandas())
        sleep(POLL_INTERVAL_S)
    print("Live streaming ended.. ")
else:
    print("Static DataFrame: no streaming query to poll.")

In [None]:
import html

# Regex patterns applied to the `text` column by clean_data().
user_regex = r"(@\w{1,15})"  # '@' followed by 1-15 word characters
# NOTE(review): matches '#' plus exactly one word character; not used by clean_data -- confirm intent.
hash_regex = r"(#\w{1})"
# NOTE(review): the '.' in "(www.)" is unescaped and matches any character -- confirm intent.
url_regex = r"((https?|ftp|file):\/{2,3})+([-\w+&@#/%=~|$?!:,.]*)|(www.)+([-\w+&@#/%=~|$?!:,.]*)"
email_regex = r"[\w.-]+@[\w.-]+\.[a-zA-Z]{1,}"
# Despite the name, matches every character that is not an ASCII letter or an apostrophe.
number_regex = "[^a-zA-Z']"
double_space_regex = " +"


@f.udf
def html_unescape(s):
    """Spark UDF (string result) that decodes HTML entities such as ``&amp;``.

    Null/empty values are returned unchanged.
    """
    if s:
        return html.unescape(s)
    return s


def clean_data(df):
    """Return a copy of ``df`` with its ``text`` column normalised to plain words.

    Steps: decode HTML entities, drop URLs, e-mail addresses and @mentions, turn
    '#' into a space, replace every non-letter (except apostrophes) with a space,
    collapse runs of spaces, trim, and drop rows whose text ends up empty or null.
    """
    return (
        df
        # Decode entities first: the '#' replacement below would otherwise break
        # numeric entities such as "&#39;" before they could be decoded.
        .withColumn("text", html_unescape(f.col("text")))
        .withColumn("text", f.regexp_replace(f.col("text"), url_regex, ""))
        .withColumn("text", f.regexp_replace(f.col("text"), email_regex, ""))
        .withColumn("text", f.regexp_replace(f.col("text"), user_regex, ""))
        .withColumn("text", f.regexp_replace(f.col("text"), "#", " "))
        .withColumn("text", f.regexp_replace(f.col("text"), number_regex, " "))
        .withColumn("text", f.regexp_replace(f.col("text"), double_space_regex, " "))
        .withColumn("text", f.trim(f.col("text")))
        # After trim() a whitespace-only text becomes "", never " ", so compare
        # against the empty string. Null texts are also dropped (null != "" is null).
        .filter(f.col("text") != "")
    )

ModuleNotFoundError: No module named 'cupy'

ModuleNotFoundError: No module named 'cupy'

In [2]:
# NOTE(review): cupy is not imported by any cell visible here, and the source build
# below failed because CUDA headers (cublas_v2.h, cuda_runtime_api.h) are missing.
# Consider removing this cell; if cupy is really needed, install it in its own
# top cell with a pinned version matching the local CUDA setup -- confirm.
pip install cupy

Collecting cupy
  Using cached https://files.pythonhosted.org/packages/07/f8/64b350de7bbf1aaa070ce97dcb26379a4226441a4916900f68dcfe235d62/cupy-9.0.0.tar.gz
[31m    ERROR: Complete output from command python setup.py egg_info:[0m
[31m    ERROR: Options: {'package_name': 'cupy', 'long_description': None, 'wheel_libs': [], 'wheel_includes': [], 'wheel_metadata': None, 'no_rpath': False, 'profile': False, 'linetrace': False, 'annotate': False, 'no_cuda': False, 'use_hip': False}
    
    -------- Configuring Module: cuda --------
    /tmp/tmpc1d2ye50/a.cpp:1:10: fatal error: cublas_v2.h: No such file or directory
     #include <cublas_v2.h>
              ^~~~~~~~~~~~~
    compilation terminated.
    command 'gcc' failed with exit status 1
    /tmp/tmpvg1jmwr6/a.cpp:1:10: fatal error: cuda_runtime_api.h: No such file or directory
     #include <cuda_runtime_api.h>
              ^~~~~~~~~~~~~~~~~~~~
    compilation terminated.
    **************************************************
    com