In [1]:
# Display the `spark` object. Nothing above this cell assigns `spark`, so this
# presumably relies on a kernel that pre-defines it -- TODO confirm.
spark

In [1]:
from pyspark import SparkContext
# Reuse the SparkContext that is already running, or start one if none exists.
sc = SparkContext.getOrCreate()

In [2]:
import numpy as np

TOTAL = 10000  # number of random 2-D points to generate
SEED = 42      # fixed seed so the statistics printed below are reproducible

# Draw all points in one vectorized call, uniformly from [-1, 1) on each axis,
# then hand Spark one length-2 array per point.
rng = np.random.default_rng(SEED)
points = rng.uniform(-1.0, 1.0, size=(TOTAL, 2))
dots = sc.parallelize(list(points)).cache()  # cached: used by count() and stats()

print("Number of random points:", dots.count())
stats = dots.stats()
print('Mean:', stats.mean())
print('stdev:', stats.stdev())

[Stage 0:>                                                        (0 + 12) / 12]

Number of random points: 10000
Mean: [0.00368806 0.00078681]
stdev: [0.57609008 0.57518852]


                                                                                

In [3]:
# Display the SparkContext created above.
sc

In [4]:
# Shut down the SparkContext used for the RDD example above.
sc.stop()

## Spark

In [47]:
import os

def get_data_path(path, root="../../Spark-The-Definitive-Guide/"):
    """Return the location of a file inside the book's companion repository.

    Args:
        path: Path of the file relative to the repository root, for example
            ``"data/flight-data/csv/2015-summary.csv"``.
        root: Directory of the repository checkout. Defaults to the relative
            location this notebook uses, so one-argument calls are unchanged.

    Returns:
        ``root`` and ``path`` joined with ``os.path.join``.
    """
    return os.path.join(root, path)

In [9]:
from pyspark.sql import SparkSession

# Obtain the SparkSession for the application named "SimpleApp"; getOrCreate()
# hands back an existing session when one is available.
spark = (
    SparkSession.builder
    .appName("SimpleApp")
    .getOrCreate()
)

In [10]:
# Build a DataFrame from spark.range(1000) and name its single column "number".
myRange = spark.range(1000).toDF("number")

In [11]:
# Keep only the rows whose "number" is even.
divisBy2 = myRange.where("number % 2 = 0")

In [14]:
# Print the first five even numbers.
divisBy2.show(5)

+------+
|number|
+------+
|     0|
|     2|
|     4|
|     6|
|     8|
+------+
only showing top 5 rows



In [17]:
# Read the 2015 flight summary CSV with the "header" and "inferSchema" options
# enabled. The location is built with get_data_path so the repository
# directory is spelled in one place; the resulting path is the same as before.
flightData2015 = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv(get_data_path("data/flight-data/csv/2015-summary.csv"))
)

In [19]:
# Preview the first five rows of the flight data.
flightData2015.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



In [21]:
# Print the physical plan for sorting by "count" (nothing is executed).
flightData2015.sort('count').explain()

== Physical Plan ==
*(1) Sort [count#32 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#32 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#88]
   +- FileScan csv [DEST_COUNTRY_NAME#30,ORIGIN_COUNTRY_NAME#31,count#32] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/user/Study/spark-study/Spark-The-Definitive-Guide/data/flight-data/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [22]:
# Use 5 shuffle partitions for subsequent queries in this session.
spark.conf.set("spark.sql.shuffle.partitions", "5")

In [23]:
# Print the sort plan again, now that the shuffle-partition setting has changed.
flightData2015.sort('count').explain()

== Physical Plan ==
*(1) Sort [count#32 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#32 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, [id=#100]
   +- FileScan csv [DEST_COUNTRY_NAME#30,ORIGIN_COUNTRY_NAME#31,count#32] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/user/Study/spark-study/Spark-The-Definitive-Guide/data/flight-data/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [24]:
# Run the sort and return the first two rows (ascending by "count").
flightData2015.sort('count').take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

## Spark SQL

In [20]:
# Expose the DataFrame to SQL under the name "flight_data_2015".
flightData2015.createOrReplaceTempView("flight_data_2015")

In [25]:
# The same per-destination row count expressed twice: once as SQL against the
# "flight_data_2015" view and once through the DataFrame API.
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

dataFrameWay = (
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .count()
)

In [26]:
# Physical plan of the SQL version of the aggregation.
sqlWay.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#30], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#30, 5), ENSURE_REQUIREMENTS, [id=#126]
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#30], functions=[partial_count(1)])
      +- FileScan csv [DEST_COUNTRY_NAME#30] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/user/Study/spark-study/Spark-The-Definitive-Guide/data/flight-data/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [27]:
# Physical plan of the DataFrame version of the aggregation.
dataFrameWay.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#30], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#30, 5), ENSURE_REQUIREMENTS, [id=#145]
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#30], functions=[partial_count(1)])
      +- FileScan csv [DEST_COUNTRY_NAME#30] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/user/Study/spark-study/Spark-The-Definitive-Guide/data/flight-data/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [31]:
# Import the functions module under an alias rather than `from ... import max`,
# which would replace Python's built-in max() for the rest of the notebook.
from pyspark.sql import functions as F

# Largest value in the "count" column, returned as a one-element list of Row.
flightData2015.select(F.max("count")).take(1)

[Row(max(count)=370002)]

In [29]:
# Top five destination countries by summed "count", expressed in SQL against
# the "flight_data_2015" view.
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")

# Execute the query and print the result.
maxSql.show()


+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [32]:
from pyspark.sql.functions import desc

# DataFrame-API version of the top-five query: sum "count" per destination,
# rename the aggregate column, sort descending, keep five rows and print them.
(
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .sort(desc("destination_total"))
    .limit(5)
    .show()
)


+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [33]:
# Same top-five chain as the previous cell, but printing its physical plan
# instead of running it.
(
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .sort(desc("destination_total"))
    .limit(5)
    .explain()
)

== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[destination_total#146L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#30,destination_total#146L])
+- *(2) HashAggregate(keys=[DEST_COUNTRY_NAME#30], functions=[sum(cast(count#32 as bigint))])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#30, 5), ENSURE_REQUIREMENTS, [id=#291]
      +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#30], functions=[partial_sum(cast(count#32 as bigint))])
         +- FileScan csv [DEST_COUNTRY_NAME#30,count#32] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/user/Study/spark-study/Spark-The-Definitive-Guide/data/flight-data/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>




## Structured Streaming

In [5]:
from pyspark.sql import SparkSession

# Session for the streaming section.
spark = (
    SparkSession.builder
    .appName("SimpleApp")
    .getOrCreate()
)

# Load every by-day retail CSV as one static DataFrame, with the "header" and
# "inferSchema" options enabled.
staticDataFrame = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("../../Spark-The-Definitive-Guide/data/retail-data/by-day/*.csv")
)

# Register the data for SQL and keep its schema for the streaming reader below.
staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

                                                                                

In [9]:
# Preview the first five retail rows.
staticDataFrame.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



In [18]:
from pyspark.sql.functions import window, column, desc, col

# Total spend per customer per one-day window on the static data, printing the
# five largest totals.
(
    staticDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate",
    )
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
    .sort(desc("sum(total_cost)"))
    .show(5)
)



+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   17450.0|{2011-09-20 09:00...|          71601.44|
|      null|{2011-11-14 09:00...|          55316.08|
|      null|{2011-11-07 09:00...|          42939.17|
|      null|{2011-03-29 09:00...| 33521.39999999998|
|      null|{2011-12-08 09:00...|31975.590000000007|
+----------+--------------------+------------------+
only showing top 5 rows



                                                                                

In [12]:
# IPython help lookup (leading `?`) for spark.readStream.schema; exploratory
# only, it produces no value that later cells use.
?spark.readStream.schema

In [14]:
# Streaming read of the same by-day CSV files, reusing the schema captured from
# the static DataFrame and setting maxFilesPerTrigger to 1.
streamingDataFrame = (
    spark.readStream
    .schema(staticSchema)
    .option("maxFilesPerTrigger", 1)
    .format("csv")
    .option("header", "true")
    .load("../../Spark-The-Definitive-Guide/data/retail-data/by-day/*.csv")
)

In [15]:
# Confirm that the DataFrame is backed by a streaming source.
streamingDataFrame.isStreaming

True

In [16]:
# Display the active SparkSession.
spark

In [19]:
# Streaming version of the per-customer spend aggregation.
# NOTE(review): the variable name says "PerHour" but the window duration used
# here is "1 day".
purchaseByCustomerPerHour = (
    streamingDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate",
    )
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
)

In [21]:
# IPython help lookup (leading `?`) for writeStream.outputMode; exploratory
# only, it produces no value that later cells use.
?purchaseByCustomerPerHour.writeStream.outputMode

In [22]:
# Start the streaming aggregation with the "memory" format, the query name
# "customer_purchases" and the "update" output mode.
purchasesWriteStream = (
    purchaseByCustomerPerHour.writeStream
    .format("memory")
    .queryName("customer_purchases")
    .outputMode("update")
    .start()
)

21/12/28 11:16:53 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/2m/3swhq7jx1mzb06kqvlb70sm80000gn/T/temporary-80101bf7-c95e-4222-be0b-c2bf4c2f6961. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.

In [None]:
# Show the five rows of `customer_purchases` with the largest `sum(total_cost)`.
top_purchases = spark.sql("""
  SELECT *
  FROM customer_purchases
  ORDER BY `sum(total_cost)` DESC
  """)
top_purchases.show(5)



In [None]:
# Show the five rows of `customer_purchases2` with the largest `sum(total_cost)`.
# NOTE(review): no streaming query above this cell registers a table with this
# name; a second query is only started in a later cell -- confirm the intended
# execution order.
spark.sql("""
  SELECT *
  FROM customer_purchases2
  ORDER BY `sum(total_cost)` DESC
  """)\
  .show(5)

In [25]:
# Left disabled: awaitTermination() would block this cell until the streaming
# query terminates.
# purchasesWriteStream.awaitTermination()

[Stage 177:>                                                        (0 + 1) / 1]

In [None]:
# Display the handle returned by writeStream.start().
purchasesWriteStream

In [54]:
# Start a second streaming query, this time in "complete" output mode, with the
# "memory" format. The query name must be spelled "customer_purchases2" because
# that is the table the following cells select from (it was previously
# misspelled "custaomer_purchases2").
# NOTE(review): this rebinds purchasesWriteStream, so the handle to the query
# started earlier is no longer reachable through that name.
purchasesWriteStream = (
    purchaseByCustomerPerHour.writeStream
    .format("memory")
    .queryName("customer_purchases2")
    .outputMode("complete")
    .start()
)

21/12/28 02:44:15 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/2m/3swhq7jx1mzb06kqvlb70sm80000gn/T/temporary-5c2abf15-a013-41c7-a2ad-0816ce40fd73. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


<pyspark.sql.streaming.StreamingQuery at 0x119a0bbb0>

In [65]:
# Show the five rows of `customer_purchases2` with the largest `sum(total_cost)`.
top_purchases = spark.sql("""
  SELECT *
  FROM customer_purchases2
  ORDER BY `sum(total_cost)` DESC
  """)
top_purchases.show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|      null|{2010-12-21 09:00...|31347.479999999938|
|   18102.0|{2010-12-07 09:00...|          25920.37|
|      null|{2010-12-10 09:00...|25399.560000000012|
|      null|{2010-12-17 09:00...|25371.769999999768|
|      null|{2010-12-06 09:00...|23395.099999999904|
+----------+--------------------+------------------+
only showing top 5 rows



In [66]:
# Same query re-run; the recorded output differs from the previous cell's.
top_purchases = spark.sql("""
  SELECT *
  FROM customer_purchases2
  ORDER BY `sum(total_cost)` DESC
  """)
top_purchases.show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|      null|{2011-03-29 09:00...| 33521.39999999998|
|      null|{2010-12-21 09:00...|31347.479999999938|
|   18102.0|{2010-12-07 09:00...|          25920.37|
|      null|{2010-12-10 09:00...|25399.560000000012|
|      null|{2010-12-17 09:00...|25371.769999999768|
+----------+--------------------+------------------+
only showing top 5 rows



In [87]:
# Same query re-run once more to see the latest contents of the table.
top_purchases = spark.sql("""
  SELECT *
  FROM customer_purchases2
  ORDER BY `sum(total_cost)` DESC
  """)
top_purchases.show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   17450.0|{2011-09-20 09:00...|          71601.44|
|      null|{2011-03-29 09:00...| 33521.39999999998|
|   18102.0|{2011-09-15 09:00...|31661.540000000005|
|      null|{2010-12-21 09:00...|31347.479999999938|
|   18102.0|{2010-12-07 09:00...|          25920.37|
+----------+--------------------+------------------+
only showing top 5 rows



![image](https://user-images.githubusercontent.com/18041103/147495637-12b7c1c5-1b42-464b-a3bd-4e9b43a3ba63.png)


In [90]:
# Stop every streaming query that is still active before shutting the session
# down, so no query is left running against a stopped Spark environment.
for active_query in spark.streams.active:
    active_query.stop()
spark.stop()

21/12/28 02:53:16 WARN StateStore: Error running maintenance thread
java.lang.IllegalStateException: SparkEnv not active, cannot do maintenance on StateStores
	at org.apache.spark.sql.execution.streaming.state.StateStore$.doMaintenance(StateStore.scala:562)
	at org.apache.spark.sql.execution.streaming.state.StateStore$.$anonfun$startMaintenanceIfNeeded$1(StateStore.scala:548)
	at org.apache.spark.sql.execution.streaming.state.StateStore$MaintenanceTask$$anon$1.run(StateStore.scala:415)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.

In [92]:
# Obtain a session again after the previous one was stopped.
spark = (
    SparkSession.builder
    .appName("SimpleApp")
    .getOrCreate()
)

In [93]:
# Reload the by-day retail CSV files with the new session, with the "header"
# and "inferSchema" options enabled.
staticDataFrame = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("../../Spark-The-Definitive-Guide/data/retail-data/by-day/*.csv")
)

# Register the data for SQL and capture its schema.
staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema



In [94]:
from pyspark.sql.functions import date_format, col

# Prepare the data for modelling: fill missing values with 0 via na.fill, add
# the "day_of_week" column from InvoiceDate formatted with the "EEEE" pattern,
# and coalesce to 5 partitions.
preppedDataFrame = (
    staticDataFrame
    .na.fill(0)
    .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))
    .coalesce(5)
)

In [95]:
# Split by date: rows before 2011-07-01 are for training, the rest for testing.
trainDataFrame = preppedDataFrame.where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame.where("InvoiceDate >= '2011-07-01'")

In [96]:
from pyspark.ml.feature import StringIndexer
# Stage that reads "day_of_week" and writes "day_of_week_index".
indexer = StringIndexer(
    inputCol="day_of_week",
    outputCol="day_of_week_index",
)


In [97]:
from pyspark.ml.feature import OneHotEncoder
# Stage that one-hot encodes "day_of_week_index" into "day_of_week_encoded".
encoder = OneHotEncoder(
    inputCol="day_of_week_index",
    outputCol="day_of_week_encoded",
)

In [98]:
from pyspark.ml.feature import VectorAssembler

# Stage that combines the price, quantity and encoded weekday columns into the
# single vector column "features".
vectorAssembler = VectorAssembler(
    inputCols=["UnitPrice", "Quantity", "day_of_week_encoded"],
    outputCol="features",
)

In [99]:
from pyspark.ml import Pipeline

# Chain the three feature stages in the order they must run.
transformationPipeline = Pipeline(
    stages=[indexer, encoder, vectorAssembler],
)

In [100]:
# Fit the feature pipeline on the training split only.
fittedPipeline = transformationPipeline.fit(trainDataFrame)
# Apply the fitted stages to the training data to add the "features" column.
transformedTraining = fittedPipeline.transform(trainDataFrame)



In [102]:
from pyspark.ml.clustering import KMeans
# K-means estimator with 20 clusters and a fixed seed.
kmeans = KMeans(k=20, seed=1)


In [103]:
# Train the clustering model on the transformed training data.
kmModel = kmeans.fit(transformedTraining)

21/12/28 02:57:38 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
21/12/28 02:57:38 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


In [104]:
# Apply the pipeline fitted on the training split to the test split.
transformedTest = fittedPipeline.transform(testDataFrame)

In [112]:
# Preview three transformed training rows, including the added feature columns.
transformedTraining.show(3)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+-----------------+-------------------+--------------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|day_of_week|day_of_week_index|day_of_week_encoded|            features|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+-----------------+-------------------+--------------------+
|   537226|    22811|SET OF 6 T-LIGHTS...|       6|2010-12-06 08:34:00|     2.95|   15987.0|United Kingdom|     Monday|              2.0|      (5,[2],[1.0])|(7,[0,1,4],[2.95,...|
|   537226|    21713|CITRONELLA CANDLE...|       8|2010-12-06 08:34:00|      2.1|   15987.0|United Kingdom|     Monday|              2.0|      (5,[2],[1.0])|(7,[0,1,4],[2.1,8...|
|   537226|    22927|GREEN GIANT GARDE...|       2|2010-12-06 08:34:00|     5.95|   15987.0|United Kingdo

In [117]:
# Preview three transformed test rows.
transformedTest.show(3)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+-----------------+-------------------+--------------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|day_of_week|day_of_week_index|day_of_week_encoded|            features|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+-----------------+-------------------+--------------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|     Monday|              2.0|      (5,[2],[1.0])|(7,[0,1,4],[1.79,...|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|     Monday|              2.0|      (5,[2],[1.0])|(7,[0,1,4],[1.25,...|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdo

In [114]:
# Display the centers of the fitted clusters.
kmModel.clusterCenters()

[array([3.6340258 , 5.63459494, 0.19581065, 0.1938833 , 0.18023509,
        0.17188605, 0.149228  ]),
 array([1.0400e+00, 7.4215e+04, 0.0000e+00, 1.0000e+00, 0.0000e+00,
        0.0000e+00, 0.0000e+00]),
 array([ 1.0400e+00, -7.4215e+04,  0.0000e+00,  1.0000e+00,  0.0000e+00,
         0.0000e+00,  0.0000e+00]),
 array([ 3.897e+04, -1.000e+00,  0.000e+00,  0.000e+00,  0.000e+00,
         0.000e+00,  1.000e+00]),
 array([7.85972222e-01, 1.14452778e+03, 2.08333333e-01, 2.50000000e-01,
        9.72222222e-02, 2.22222222e-01, 1.66666667e-01]),
 array([ 5.43415e+03, -1.00000e+00,  0.00000e+00,  1.25000e-01,
         3.75000e-01,  0.00000e+00,  5.00000e-01]),
 array([ 1.6670865e+04, -1.0000000e+00,  0.0000000e+00,  0.0000000e+00,
         0.0000000e+00,  1.0000000e+00,  0.0000000e+00]),
 array([3.8500e-01, 4.4435e+03, 2.5000e-01, 2.5000e-01, 0.0000e+00,
        0.0000e+00, 5.0000e-01]),
 array([1.11846743e+00, 5.25536398e+02, 3.10344828e-01, 1.95402299e-01,
        1.76245211e-01, 2.06896552e

In [118]:
# Shut down the Spark session at the end of the notebook.
spark.stop()