# Exploring the data that will be the stream source

First we will explore the set of files and show the contents of one of the files.

In [0]:
%fs ls /FileStore/shared_uploads/1297906913.RANLY.NEI/paysum_data/

path,name,size,modificationTime
dbfs:/FileStore/shared_uploads/1297906913.RANLY.NEI/paysum_data/data_1.csv,data_1.csv,201517,1690993592000
dbfs:/FileStore/shared_uploads/1297906913.RANLY.NEI/paysum_data/data_2.csv,data_2.csv,75385,1690993594000
dbfs:/FileStore/shared_uploads/1297906913.RANLY.NEI/paysum_data/data_3.csv,data_3.csv,41131,1690993594000
dbfs:/FileStore/shared_uploads/1297906913.RANLY.NEI/paysum_data/data_4.csv,data_4.csv,42411,1690993596000
dbfs:/FileStore/shared_uploads/1297906913.RANLY.NEI/paysum_data/data_5.csv,data_5.csv,49053,1690993597000
dbfs:/FileStore/shared_uploads/1297906913.RANLY.NEI/paysum_data/data_6.csv,data_6.csv,121067,1690993597000
dbfs:/FileStore/shared_uploads/1297906913.RANLY.NEI/paysum_data/data_7.csv,data_7.csv,499149,1690993588000
dbfs:/FileStore/shared_uploads/1297906913.RANLY.NEI/paysum_data/data_8.csv,data_8.csv,1580399,1690993591000
dbfs:/FileStore/shared_uploads/1297906913.RANLY.NEI/paysum_data/data_9.csv,data_9.csv,2845961,1690993596000


In [0]:
%cat /dbfs/FileStore/shared_uploads/1297906913.RANLY.NEI/paysum_data/data_1.csv

# Opening a file as a regular dataframe

We can open one of the files without streaming it as a dataframe.  The schema is infered from the first file and used to support the streaming schema specification.

In [0]:
df_example = spark.read.csv(
  "dbfs:/FileStore/shared_uploads/1297906913.RANLY.NEI/paysum_data/data_1.csv",
  header=True,
  inferSchema=True
)

In [0]:
df_example.schema

In [0]:
df_example.head()

# Creating a stream

Below the stream is created from the folder that holds the CSV files. The schema is pulled from the first file opened as a data frame.

In [0]:
from pyspark.sql.types import * 

data_schema = df_example.schema
folder_csv_path = "dbfs:/FileStore/shared_uploads/1297906913.RANLY.NEI/paysum_data"

streaming_input_df = (
  spark.readStream.schema(data_schema)
 .option("maxFilesPerTrigger", 1)
 .csv(folder_csv_path)
)

# Performing a transformation within a stream

Apache Spark Strucuted Streaming manifests the stream as a dataframe. So dataframe methods can be used to perform transformations.

In [0]:
import pyspark.sql.functions as F
df_dest_count = streaming_input_df.groupBy("nameDest").count().orderBy(F.desc("count"))

# Specifying a stream sink

The stream processing begins with calling the start method on the sink.  A query is created in memory which allows retrieval of the streamed results. Console based sinks have issues with Databricks.

In [0]:
import time
activity_query = (
  df_dest_count.writeStream.queryName("dest_counts")
  .format("memory")
  .outputMode("complete")
  .start()
)

# include this in production
# activate_query.awaitTermination()

In [0]:

for x in range(25):
  _df = spark.sql(
    "SELECT * FROM dest_counts WHERE nameDest != 'nameDest' AND count >= 2"
  )
  if _df.count() > 2:
    _df.show(10)
  time.sleep(0.5)

In [0]:
%sql

SELECT * FROM dest_counts WHERE nameDest != 'nameDest' AND count >= 2;

In [0]:
dir(spark.streams)

In [0]:
activity_query.stop()

In [0]:
data_schema = df_example.schema

new_schema = StructType([x if i > 0 else  StructField("step", TimestampType(), True) for i, x in enumerate(data_schema)])


In [0]:
new_schema

In [0]:
data_schema = df_example.schema
file_path = "dbfs:/FileStore/shared_uploads/1297906913.RANLY.NEI/paysum_data"

streaming_input_df_2 = (
  spark.readStream.schema(new_schema)
  .option("timestampFormat", "ss [yyyy-MM-dd]")
 .option("maxFilesPerTrigger", 1)
 .csv(file_path)
)

In [0]:
import pyspark.sql.functions as F
stream_counts_df = (
  streaming_input_df_2
  .groupBy(streaming_input_df_2.type) # , F.window(streaming_input_df_2.step, "1 second"))
  .count()
)

In [0]:
activity_query_v2 = (
  stream_counts_df.writeStream.queryName("stream_window_counts")
  .format("memory")
  .outputMode("complete")
  .start()
)

# include this in production
# activity_query_v2.awaitTermination()


In [0]:
# repeat and check the results as the stream is processed
for x in range(25):
  _df = spark.sql(
    "SELECT * FROM stream_window_counts"
  )
  if _df.count() > 2:
    _df.show(10)
  time.sleep(0.5)

In [0]:
%sql

SELECT * FROM stream_window_counts;

type,count
TRANSFER,6375
CASH_IN,14449
type,9
CASH_OUT,21306
PAYMENT,29735
DEBIT,789
