In [None]:
import os

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.streaming import *
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, ArrayType, LongType, DoubleType, FloatType, DataType, TimestampType

In [None]:
# Obtain the notebook's SparkSession: reuses an already-running session if one
# exists, otherwise creates a new one registered under the app name "Task_2".
spark = SparkSession.builder.appName("Task_2").getOrCreate()

In [None]:
# Root folder that holds the streaming input, the output and the checkpoint.
# It defaults to the original location but can be redirected without editing
# the notebook by exporting SPARK_STREAMING_DATA_DIR before starting the kernel.
DATA_DIR = os.environ.get(
    "SPARK_STREAMING_DATA_DIR",
    "/home/ubuntu/Documents/Spark_Streaming/python_file/data",
).rstrip("/")

# Plain string concatenation (not os.path.join) so the root may also be a URI.
csv_file_path = DATA_DIR + "/csv_files"                    # folder watched for incoming CSV files
parquet_file_path = DATA_DIR + "/parquet_files/"           # output folder of the aggregation query
parquet_checkpoint_path = DATA_DIR + "/checkpoint_parquet" # checkpoint folder for the aggregation query

In [None]:

# Explicit schema for the incoming CSV files (streaming file sources are given
# a schema up front here rather than inferring one). Every column is nullable.
# NOTE: despite the name, `sc` is the schema, not a SparkContext.
sc = (
    StructType()
    .add('datetime', TimestampType(), True)
    .add('clicks', IntegerType(), True)
    .add('impressions', IntegerType(), True)
    .add('quantity', IntegerType(), True)
    .add('total_price', DoubleType(), True)
)




In [None]:
"""
Read the data as a stream from the CSV folder, picking up at most 5 new files per trigger.

"""

# Streaming CSV source: files have a header row, are parsed with the explicit
# schema `sc`, and at most 5 new files are consumed per micro-batch.
csv_df = (
    spark.readStream
    .format('csv')
    .schema(sc)
    .option('header', 'true')
    .option("maxFilesPerTrigger", 5)
    .load(csv_file_path)
)


In [None]:

# Sanity check: print the column names and types of the streaming DataFrame.
csv_df.printSchema()

# Last expression of the cell, so the notebook displays its value:
# True when the DataFrame is backed by a streaming source.
csv_df.isStreaming


In [None]:
"""

The main transformation: aggregate the data over 30-second windows on 'datetime' with a 10-second watermark. The watermark lets the query wait for late-arriving data, if any.


"""

# Sum every metric per 30-second event-time window on `datetime`, tolerating
# rows that arrive up to 10 seconds late. `sum` here is the Spark SQL
# aggregate brought in by the star import, not Python's builtin.
windows_df = (
    csv_df
    .withWatermark('datetime', '10 seconds')
    .groupBy(window('datetime', '30 seconds'))
    .agg(
        sum('clicks').alias("Clicks"),
        sum('impressions').alias("Impressions"),
        sum('quantity').alias("Quantity"),
        sum('total_price').alias("Total_price"),
    )
)

# Flatten the `window` struct into plain Start / End columns and leave the
# struct itself out of the result.
df = windows_df.select(
    'Clicks',
    'Impressions',
    'Quantity',
    'Total_price',
    windows_df['window'].getField('start').alias('Start'),
    windows_df['window'].getField('end').alias('End'),
)

# Final column order handed to the sink.
f_df = df.select('Clicks','Impressions','Quantity','Total_price','Start','End')



In [None]:
"""

foreachBatch is used because a streaming micro-batch can arrive empty. Inside foreachBatch the regular Spark batch API is available,
so we can check whether the batch contains data: if it does, write it out in parquet format; otherwise skip the write.


"""

def writer(f_df,batch):
    """Write one micro-batch to the Parquet output folder, skipping empty batches.

    Used as the ``foreachBatch`` callback of the aggregation query.

    Args:
        f_df: the micro-batch as a regular (non-streaming) DataFrame. Inside
            this function the name refers to the batch, not to the notebook's
            streaming DataFrame of the same name.
        batch: the micro-batch id supplied by Spark (unused).
    """
    # count(), show() and the write are three separate actions; caching the
    # batch keeps Spark from recomputing it for each of them.
    f_df.persist()
    try:
        row_count = f_df.count()  # evaluated once and reused below
        if row_count <= 0:
            print("Empty Data")
            print(row_count)
            return

        print("Not Empty")
        f_df.show()
        # Output is Parquet, matching the target folder and the notebook's
        # description; coalesce(1) yields a single part file per batch.
        (f_df.coalesce(1)
             .write
             .format('parquet')
             .mode('append')
             .option("path", parquet_file_path)
             .save())
    finally:
        f_df.unpersist()

# Start the aggregation query. The checkpoint records which input files were
# already processed and holds the window state, so restarting the notebook
# resumes where it left off instead of re-reading every file and appending
# duplicate rows to the output folder.
parquet_query = (
    f_df.writeStream
    .outputMode('append')
    .option("checkpointLocation", parquet_checkpoint_path)
    .foreachBatch(writer)
    .start()
)

# Keep the handle (for .status / .stop()) and display it as the cell output.
parquet_query



In [None]:
"""

For debugging, or to check that we are getting the correct counts, we can write the data to the console and inspect it here.

"""

final_df = csv_df

# Checkpoint for the debug query: kept beside the configured checkpoint folder
# but under its own name, because two streaming queries must not share a
# checkpoint directory.
console_checkpoint_path = parquet_checkpoint_path.rstrip("/") + "_console"

# The console sink prints rows to stdout and has no output location, so no
# "path" option is passed.
console_query = (
    final_df.writeStream
    .outputMode("append")
    .format("console")
    .option("checkpointLocation", console_checkpoint_path)
    .start()
)

# Keep the handle (for .status / .stop()) and display it as the cell output.
console_query



In [None]:

# Stop every streaming query that is still running before tearing down the
# session, so the queries end cleanly instead of failing when the session
# disappears underneath them.
for active_query in spark.streams.active:
    active_query.stop()

spark.stop()