In [0]:
dbutils.fs.mkdirs("FileStore/datasets/car_source_stream")

In [0]:
car_stream_data = spark.readStream.format("cloudFiles") \
                            .option("cloudFiles.format","csv") \
                                .option("cloudFiles.schemaLocation","dbfs:/FileStore/datasets/car_source_stream") \
                                    .option("cloudFiles.schemaHints","price int,mileage int,engV float,year int") \
                                        .load("dbfs:/FileStore/datasets/car_source_stream")

In [0]:
car_stream_data.display()

car,price,body,mileage,engV,engType,registration,year,model,drive,_rescued_data
Ford,15500.0,crossover,68,2.5,Gas,yes,2010,Kuga,full,
Mercedes-Benz,20500.0,sedan,173,1.8,Gas,yes,2011,E-Class,rear,
Mercedes-Benz,35000.0,other,135,5.5,Petrol,yes,2008,CL 550,rear,
Mercedes-Benz,17800.0,van,162,1.8,Diesel,yes,2012,B 180,front,
Nissan,16600.0,crossover,83,2.0,Petrol,yes,2013,X-Trail,full,
Honda,6500.0,sedan,199,2.0,Petrol,yes,2003,Accord,front,
Renault,10500.0,vagon,185,1.5,Diesel,yes,2011,Megane,front,
Mercedes-Benz,21500.0,sedan,146,1.8,Gas,yes,2012,E-Class,rear,
Mercedes-Benz,22700.0,sedan,125,2.2,Diesel,yes,2010,E-Class,rear,
Nissan,,crossover,0,1.2,Petrol,yes,2016,Qashqai,front,"{""price"":""20447.154"",""_file_path"":""dbfs:/FileStore/datasets/car_source_stream/car_ad_01.csv""}"


In [0]:
car_filtered_data = car_stream_data.select(car_stream_data.car, car_stream_data.model, car_stream_data.price, car_stream_data.mileage, car_stream_data.year) \
    .where(car_stream_data.year > 2014)

car_filtered_data.display()

car,model,price,mileage,year
Nissan,Qashqai,,0,2016
BMW,750,129222.0,2,2016
Mercedes-Benz,GLE-Class,99999.0,0,2016
Mercedes-Benz,GLE-Class,104999.0,1,2016
Toyota,Land Cruiser 200,195000.0,0,2016
Porsche,Cayenne,99999.0,1,2016
Mercedes-Benz,GLE-Class,0.0,0,2016
Toyota,Land Cruiser 200,0.0,0,2016
Toyota,Land Cruiser 200,102999.0,0,2016
Toyota,Land Cruiser 200,103999.0,0,2016


In [0]:
query = car_filtered_data.writeStream \
                        .queryName("carDetailsAfter2014") \
                            .outputMode("append") \
                                .format("memory") \
                                    .start()

In [0]:
%sql

SELECT *
FROM carDetailsAfter2014

car,model,price,mileage,year
Nissan,Qashqai,,0,2016
BMW,750,129222.0,2,2016
Mercedes-Benz,GLE-Class,99999.0,0,2016
Mercedes-Benz,GLE-Class,104999.0,1,2016
Toyota,Land Cruiser 200,195000.0,0,2016
Porsche,Cayenne,99999.0,1,2016
Mercedes-Benz,GLE-Class,0.0,0,2016
Toyota,Land Cruiser 200,0.0,0,2016
Toyota,Land Cruiser 200,102999.0,0,2016
Toyota,Land Cruiser 200,103999.0,0,2016


In [0]:
car_grouped_data = car_stream_data.select(car_stream_data.body,car_stream_data.price,car_stream_data.mileage) \
                            .groupBy(car_stream_data.body) \
                                .agg({"price":"avg","mileage":"avg"})

car_grouped_data.display()

body,avg(price),avg(mileage)
van,7977.666666666667,208.0
crossover,51273.3,50.741379310344826
other,25750.0,121.0
sedan,20666.08333333333,145.21621621621622
hatch,14791.0,26.2
vagon,8355.0,217.66666666666663


In [0]:
query = car_grouped_data.writeStream \
                        .queryName("avgPriceMileageBody") \
                            .outputMode("append") \
                                .format("memory") \
                                    .start()

So append mode to write streams will not work when we are performing aggregations. So trying complete mode or update mode

In [0]:
query = car_grouped_data.writeStream \
                        .queryName("avgPriceMileageBody") \
                            .outputMode("complete") \
                                .format("memory") \
                                    .start()

In [0]:
%sql

SELECT *
FROM avgPriceMileageBody

body,avg(price),avg(mileage)
van,8974.875,213.11111111111111
crossover,51273.3,50.741379310344826
other,25750.0,121.0
sedan,20413.685714285715,148.22222222222223
hatch,14791.0,26.2
vagon,8355.0,216.0


In [0]:
car_grouped_data1 = car_stream_data.select(car_stream_data.car) \
                            .groupBy(car_stream_data.car) \
                                .agg({"car":"count"})

car_grouped_data1.display()

car,count(car)
Volkswagen,12
Jaguar,1
Mitsubishi,3
Kia,3
Hyundai,3
Honda,4
Audi,10
Land Rover,1
Mercedes-Benz,30
Renault,2


In [0]:
query = car_grouped_data1.writeStream \
                        .queryName("countByCars") \
                            .outputMode("update") \
                                .format("memory") \
                                    .start()

In [0]:
%sql

SELECt *
FROM countByCars

car,count(car)
Volkswagen,11
Jaguar,1
Mitsubishi,3
Kia,3
Hyundai,3
Honda,4
Audi,10
Land Rover,1
Mercedes-Benz,29
Renault,2
