In [1]:
import findspark
spark_home = "~/opt/spark"
findspark.init(spark_home)
import pyspark

spark = pyspark.sql.SparkSession.builder \
    .master("local") \
    .appName("ch1") \
    .config("spark.driver.bindAddress","192.168.0.164") \
    .config("spark.ui.port","4050") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()

print("http://{}:{}".format(spark.conf.get("spark.driver.bindAddress"), spark.conf.get("spark.ui.port")))


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/19 12:26:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/05/19 12:26:02 WARN Utils: Service 'SparkUI' could not bind on port 4050. Attempting port 4051.


http://192.168.0.164:4050


## Spark-submit
Submit codes to production

## Datasets, Type-safe structured APIs
easier to cooperate for large projects
not available for python and r, only with Java and Scala


## Structured Streaming
high-level API for steaming process

In [2]:
sdf = spark.read.format("csv") \
    .option("header", "true") \
        .option("inferSchema", "true") \
            .load("../data/retail-data/by-day/*.csv")

                                                                                

In [3]:
sdf.createOrReplaceTempView("retail_data")
staticSchema = sdf.schema

In [4]:
from pyspark.sql.functions import window, column, desc, col

sdf.selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost","InvoiceDate") \
    .groupBy( col("CustomerId"), window(col("InvoiceDate"), "1 day") ) \
        .sum("total_cost").sort(desc("sum(total_cost)")).show()



+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   17450.0|{2011-09-20 08:00...|          71601.44|
|      null|{2011-11-14 08:00...|          55316.08|
|      null|{2011-11-07 08:00...|          42939.17|
|      null|{2011-03-29 08:00...| 33521.39999999998|
|      null|{2011-12-08 08:00...|31975.590000000007|
|   18102.0|{2011-09-15 08:00...|31661.540000000005|
|      null|{2010-12-21 08:00...|31347.479999999938|
|   18102.0|{2011-10-21 08:00...|          29693.82|
|   18102.0|{2010-12-07 08:00...|          25920.37|
|   14646.0|{2011-10-20 08:00...|25833.559999999994|
|      null|{2010-12-10 08:00...|25399.560000000012|
|      null|{2010-12-17 08:00...|25371.769999999768|
|      null|{2011-11-25 08:00...|24148.069999999992|
|      null|{2011-11-29 08:00...|23744.250000000055|
|   12415.0|{2011-06-15 08:00...| 23426.81000000001|
|      null|{2010-12-06 08:00...|23395.0999999

                                                                                

In [5]:
steamingdf = spark.readStream \
    .schema(staticSchema) \
    .option("maxFilesPerTrigger", 1) \
    .format("csv") \
    .option("header", "true") \
    .load("../data/retail-data/by-day/*.csv")

                                                                                

In [6]:
steamingdf.isStreaming

True

In [7]:
purchaseByCustomerPerhour = steamingdf \
    .selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate")\
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day")) \
    .sum("total_cost")

In [8]:
purchaseByCustomerPerhour.writeStream.format("memory") \
    .queryName("customer_purchase") \
    .outputMode("complete") \
    .start()

22/05/19 12:26:27 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/qz/3rpx4c8n1wlg5nc67fmtm2480000gn/T/temporary-be252878-9984-451a-b534-6ade7128c3dd. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/05/19 12:26:27 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.StreamingQuery at 0x1053818e0>

In [24]:
spark.sql("SELECT * FROM customer_purchase").show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   13148.0|{2010-12-10 08:00...|309.40000000000003|
|   17368.0|{2011-01-06 08:00...| 563.1500000000001|
|   16907.0|{2010-12-09 08:00...|182.02000000000004|
|   15039.0|{2010-12-14 08:00...| 706.2500000000002|
|   17519.0|{2010-12-14 08:00...|-4.949999999999999|
+----------+--------------------+------------------+
only showing top 5 rows



