In [0]:
# List what was uploaded to DBFS FileStore (same listing the `%fs ls` shorthand renders).
display(dbutils.fs.ls('dbfs:/FileStore/'))

path,name,size
dbfs:/FileStore/sales_log.zip,sales_log.zip,251635
dbfs:/FileStore/shared_uploads/,shared_uploads/,0
dbfs:/FileStore/tables/,tables/,0


In [0]:
# Stage the uploaded archive on the driver's local disk (file: scheme) so the
# `%sh unzip` cell can open it as a plain local file at /tmp/sales_log.zip.
dbutils.fs.cp(
    "/FileStore/sales_log.zip",  # no scheme: resolved against DBFS
    "file:/tmp/sales_log.zip",   # driver-local filesystem
)

In [0]:
# Keep a DBFS copy of the archive under dbfs:/tmp (inspected by the `%fs ls /tmp/` cell).
# Use cp, not mv: moving would delete dbfs:/FileStore/sales_log.zip, and the cell that
# copies that file to the driver would then fail with file-not-found on any re-run.
dbutils.fs.cp("dbfs:/FileStore/sales_log.zip", "/tmp/sales_log.zip")

In [0]:
# Confirm the archive is present in DBFS /tmp (an unscheme'd path resolves to dbfs:).
display(dbutils.fs.ls("/tmp/"))

path,name,size
dbfs:/tmp/sales_log.zip,sales_log.zip,251635


In [0]:
%sh
# Driver-local /tmp (not DBFS): the archive copied with the file: scheme should be listed here.
ls /tmp/

In [0]:
%sh
# Extract the driver-local archive.
#  -o  overwrite existing files without prompting: on a re-run unzip would otherwise ask
#      "replace ...?", which cannot be answered in a non-interactive %sh cell, and the
#      files would be left as they were.
#  -d  extract explicitly into /databricks/driver (giving /databricks/driver/sales_log/,
#      the path the read cells use) instead of depending on the shell's working directory.
unzip -o /tmp/sales_log.zip -d /databricks/driver/

In [0]:
# Show the CSV files extracted onto the driver's local disk.
display(dbutils.fs.ls('file:/databricks/driver/sales_log/'))

path,name,size
file:/databricks/driver/sales_log/sales-50.csv,sales-50.csv,9240
file:/databricks/driver/sales_log/sales-16.csv,sales-16.csv,9199
file:/databricks/driver/sales_log/sales-24.csv,sales-24.csv,9208
file:/databricks/driver/sales_log/sales-73.csv,sales-73.csv,9178
file:/databricks/driver/sales_log/sales-56.csv,sales-56.csv,9187
file:/databricks/driver/sales_log/sales-80.csv,sales-80.csv,9250
file:/databricks/driver/sales_log/sales-54.csv,sales-54.csv,9215
file:/databricks/driver/sales_log/sales-14.csv,sales-14.csv,9315
file:/databricks/driver/sales_log/sales-46.csv,sales-46.csv,9200
file:/databricks/driver/sales_log/sales-79.csv,sales-79.csv,9276


In [0]:
# Explicit imports rather than `import *`, so it is clear which names this cell brings
# into the notebook namespace.
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

# Directory of CSVs produced by the `%sh unzip` cell.
# NOTE(review): `file:` points at the driver's local disk. Spark runs the read on executors,
# so this presumably only works when the driver is also the executor (single-node cluster);
# on a multi-node cluster, copy the CSVs into DBFS and point `path` there instead.
path = "file:/databricks/driver/sales_log/"

# Explicit schema: avoids an inference pass, and the same schema is reused by the
# streaming reader (streaming file sources need a user-supplied schema by default).
# NOTE(review): no `header` option is set, so every line is parsed as data; if the files
# carry a header row it would appear as a row of nulls in the numeric columns - confirm.
salesSchema = StructType([
  StructField("OrderId", DoubleType(), True),
  StructField("OrderDate", StringType(), True),
  StructField("Quantity", DoubleType(), True),
  StructField("DiscountPct", DoubleType(), True),
  StructField("Rate", DoubleType(), True),
  StructField("SaleAmount", DoubleType(), True),
  StructField("CustomerName", StringType(), True),
  StructField("State", StringType(), True),
  StructField("Region", StringType(), True),
  StructField("ProductKey", StringType(), True),
  StructField("RowCount", DoubleType(), True),
  StructField("ProfitMargin", DoubleType(), True)
])

# Batch read of the whole directory, for a static look at the data.
df = spark.read.schema(salesSchema).csv(path)

display(df)

OrderId,OrderDate,Quantity,DiscountPct,Rate,SaleAmount,CustomerName,State,Region,ProductKey,RowCount,ProfitMargin
44935.0,11/17/10,37.0,0.06,200.0,7011.4,Arthur Nelson,New Mexico,West,Development - Scala,1.0,0.65
44935.0,11/17/10,37.0,0.01,140.0,5169.04,Arthur Nelson,Florida,South,Development - Database,1.0,0.64
2563.0,11/18/10,12.0,0.04,200.0,2322.41,Brenda Hildebrand,Mississippi,South,Development - Scala,1.0,0.74
2563.0,11/18/10,33.0,0.01,120.0,3951.72,Brenda Hildebrand,Mississippi,South,Development - Business Logic,1.0,0.44
2752.0,11/18/10,30.0,0.03,150.0,4399.87,Todd Cacioppo,Wisconsin,Central,Consulting - Business Model,1.0,0.53
2752.0,11/18/10,41.0,0.02,140.0,5670.14,Todd Cacioppo,Wisconsin,Central,Development - Database,1.0,0.56
2752.0,11/18/10,10.0,0.03,200.0,1955.5,Todd Cacioppo,Wisconsin,Central,Development - Big Data,1.0,0.69
2752.0,11/18/10,10.0,0.04,150.0,1451.5,Todd Cacioppo,Wisconsin,Central,Consulting - Market Research,1.0,0.7
13607.0,11/18/10,12.0,0.07,150.0,1687.37,Matthew Lucas,New Mexico,West,Consulting - Business Model,1.0,0.59
13607.0,11/18/10,37.0,0.09,200.0,6787.8,Matthew Lucas,New Mexico,West,Development - Scala,1.0,0.74


In [0]:
# Namespaced import: a wildcard `from pyspark.sql.functions import *` would shadow Python
# builtins such as sum, min, max, round and abs for every later cell.
from pyspark.sql import functions as F

# Read the CSV directory as a stream, picking up at most one new file per micro-batch.
readStreamInputDF = (
    spark.readStream
    .schema(salesSchema)
    .option('maxFilesPerTrigger', 1)
    .csv(path)
)

# Running total of SaleAmount per ProductKey. Despite its name this DataFrame holds sums,
# not counts; the name is kept because the writeStream cell refers to it.
# The result columns are `ProductKey` and `sum(SaleAmount)`.
streamingCountsDF = (
    readStreamInputDF
    .groupBy("ProductKey")
    .agg(F.sum("SaleAmount"))
)

streamingCountsDF.isStreaming

In [0]:
# Stop any still-active query registered as "sales_stream" first: Spark refuses to start
# a second active query with the same name, so re-running this cell would otherwise fail.
for activeQuery in spark.streams.active:
    if activeQuery.name == "sales_stream":
        activeQuery.stop()

# Publish the running totals to an in-memory table named `sales_stream` (queryable via %sql).
# The memory sink collects the whole result on the driver, so it is only suitable for small,
# exploratory workloads. "complete" mode rewrites the full aggregate on every trigger.
queryStream = (
    streamingCountsDF.writeStream
    .format("memory")
    .queryName("sales_stream")
    .outputMode("complete")
    .start()
)

In [0]:
%sql
-- Products ranked by running sales total, highest first. The aggregate column's
-- generated name contains parentheses, so it must be backquoted.
SELECT *
FROM sales_stream
ORDER BY `sum(SaleAmount)` DESC
LIMIT 100

ProductKey,sum(SaleAmount)
Consulting - Market Research,554767.11
Development - Python,479107.72
Development - Java,440399.43
Development - Big Data,427602.41
Consulting - Business Model,353415.53
Training - SQL,336511.51
Development - Business Logic,284540.72000000003
Development - .Net,278956.27
Development - Scala,243146.87
Consulting - Strategy,170785.89
