In [None]:
import time

from pyspark.sql import SparkSession
from pyspark.sql import types as t

In [None]:
# One SparkSession for the whole notebook (getOrCreate reuses a running one).
# The streaming groupBy further down shuffles on every micro-batch; with the
# default of 200 shuffle partitions each trigger schedules 200 tiny tasks,
# which makes a local demo needlessly slow. Tune this for your cluster.
SHUFFLE_PARTITIONS = 5

spark = SparkSession.builder.appName('streaming').getOrCreate()
spark.conf.set('spark.sql.shuffle.partitions', SHUFFLE_PARTITIONS)

In [None]:
# Load the source CSV: the first line holds the column names and the column
# types are inferred from the data.
df = (spark.read
      .options(header=True, inferSchema=True)
      .csv('paysim.csv'))

In [None]:
# Inspect the column names of the loaded PaySim CSV.
df.columns

In [None]:
# The fraud-label columns are not used in this streaming demo, so drop them.
fraud_label_cols = ['isFraud', 'isFlaggedFraud']
df = df.drop(*fraud_label_cols)

In [None]:
# Peek at two full (untruncated) rows.
df.show(n=2, truncate=False)

In [None]:
# Number of rows recorded in each step (a step is one unit of time).
step_counts = df.groupBy('step').count()
step_counts.show()

In [None]:
# A step represents a unit of time. Collect the distinct steps in ascending
# order: distinct() alone returns them in arbitrary order, and the per-step
# files written below must be created chronologically so the file stream
# (which processes the oldest files first) replays time in order.
steps = df.select('step').distinct().orderBy('step').collect()

In [None]:
# Print every distinct step value, one per line.
for row in steps:
    print(row['step'])

In [None]:
"""
we just want to simulate as if i am getting a new file for each step combines so we can later ingest it

here i am combing all the data related to each distinct step into one file so we can ingest it later
"""
for step in steps :
    df_current = df.filter(df.step == step[0])
    #save it in 1 file
    df_current.coalesce(1).write.mode('append').option("header" , "true").csv('data/paysim')

In [None]:
# Infer the schema of the per-step files written above. Read the whole output
# directory instead of one hard-coded part file: part-file names are generated
# anew on every write, and the old path did not match the 'data/paysim'
# directory the files are written to.
# NOTE: inferSchema makes an extra pass over the data in that directory.
part = spark.read.csv('data/paysim', inferSchema = True, header = True)

In [None]:
# Keep the inferred StructType; it is passed to spark.readStream.schema(...)
# below, since Spark's file-based streaming sources do not infer a schema by default.
data_schema = part.schema

In [None]:
# Streaming DataFrame over the per-step CSV files, limited to one file per
# trigger so the data is replayed file by file, as if it arrived as a stream.
#  - The option is 'maxFilesPerTrigger' (plural); Spark silently ignores
#    unknown option names, so a misspelling puts every file in one batch.
#  - The files were written with a header row, so header must be enabled here,
#    otherwise each header line is ingested as a data row.
#  - The path matches the directory the per-step files are written to.
streaming_df = (spark.readStream.schema(data_schema)
               .option('header', True)
               .option('maxFilesPerTrigger', 1)
               .csv('data/paysim'))

In [None]:
# Running count of rows per destination account, largest counts first.
dest_count = (
    streaming_df
    .groupBy('nameDest')
    .count()
    .sort('count', ascending=False)
)

In [None]:
# With the streaming DataFrame and its transformation defined, start the query.
# The sink is an in-memory table named 'dest_count'; 'complete' output mode
# rewrites the whole aggregate on every trigger.
active_query = (dest_count.writeStream.queryName('dest_count')
                .format('memory')
                .outputMode('complete')
                .start())

# Poll the in-memory table for a fixed window (POLL_ITERATIONS polls, every
# POLL_INTERVAL_S seconds) and show the current top destinations whenever the
# table has rows. This does not wait for "new data"; it samples a snapshot per poll.
POLL_ITERATIONS = 50
POLL_INTERVAL_S = 0.5

for _ in range(POLL_ITERATIONS):
    if not active_query.isActive:
        # The query terminated (e.g. it failed): stop polling a dead stream
        # and surface the cause instead of silently showing stale results.
        raise RuntimeError(f'streaming query dest_count stopped: {active_query.exception()}')
    snapshot = spark.sql("select * from dest_count")
    if snapshot.count() > 0:
        snapshot.show(10)
    time.sleep(POLL_INTERVAL_S)

In [None]:
# Streaming queries currently running in this Spark session.
running_queries = spark.streams.active
running_queries

In [None]:
# Stop the streaming query started earlier (`active_query`).
active_query.stop()