In [1]:
import findspark 
findspark.init() 
import pyspark # only run after findspark.init() 
from pyspark.sql import SparkSession 
import os 

# Root directory of the "Spark: The Definitive Guide" sample data.
# Set the SPARK_GUIDE_DATA environment variable to point elsewhere instead of
# editing this cell; the default is the original relative location.
root = os.environ.get("SPARK_GUIDE_DATA", '../../Data/Spark-The-Definitive-Guide/data/')
spark = SparkSession.builder.getOrCreate()

# Fail fast with a clear message if the sample data is missing, instead of a
# later, less obvious error from spark.read.
flight_csv_path = os.path.join(root, "flight-data/csv/2015-summary.csv")
if not os.path.isfile(flight_csv_path):
    raise FileNotFoundError(
        f"Sample data not found at {flight_csv_path!r}; set SPARK_GUIDE_DATA to the data directory"
    )
flight_csv_path

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/19 15:58:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


'../../Data/Spark-The-Definitive-Guide/data/flight-data/csv/2015-summary.csv'

In [2]:

# Load the 2015 flight summary: the first line holds the column names and
# Spark infers each column's type from the data.
flight = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(os.path.join(root, "flight-data/csv/2015-summary.csv"))
)

In [3]:
# Show the physical plan for sorting by "count" (orderBy is an alias of sort).
flight.orderBy("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#19 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#19 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=27]
      +- FileScan csv [DEST_COUNTRY_NAME#17,ORIGIN_COUNTRY_NAME#18,count#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/shannon/Library/CloudStorage/OneDrive-國立臺灣科技大學/NTUST/Germa..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




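flight DataFrame 將被註冊為一個名為 "flight_data_2015" 的暫時性視圖（temporary view）。這意味著你可以在程式中用 SQL 語法（`spark.sql(...)`）以這個名稱直接查詢它，也可以透過 `spark.table("flight_data_2015")` 取回對應的 DataFrame，再繼續使用 DataFrame API。暫時性視圖只存在於建立它的 SparkSession 中，session 結束後就會消失。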

為什麼要這樣做呢？

使用暫時性視圖可以讓你在 Spark 中更方便地使用 SQL 語法來操作 DataFrame。對於熟悉 SQL 語法的人來說，這提供了一個更熟悉且易於理解的方式來查詢和分析資料，而不是依賴於 DataFrame API 的語法。這樣的操作也有助於組織和簡化程式碼，特別是在需要進行較為複雜的查詢或分析時。

In [4]:
# Register `flight` as a temporary view named "flight_data_2015" (replacing any
# existing view of that name) so it can be queried by name with spark.sql(...).
flight.createOrReplaceTempView("flight_data_2015")

In [5]:
# The DataFrame API and Spark SQL are interchangeable: both are compiled by the
# same optimizer, so you can pick whichever reads better for a given
# transformation without affecting performance.
sqlway = spark.sql("""
                   SELECT DEST_COUNTRY_NAME, count(1) 
                   FROM flight_data_2015 
                   GROUP BY DEST_COUNTRY_NAME
                   """)
dataframeway = (
    flight
    .groupBy("DEST_COUNTRY_NAME")
    .count()
)

# Print both physical plans; explain() shows that they do the same work.
for plan_source in (sqlway, dataframeway):
    plan_source.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 200), ENSURE_REQUIREMENTS, [plan_id=43]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/shannon/Library/CloudStorage/OneDrive-國立臺灣科技大學/NTUST/Germa..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 200), ENSURE_REQUIREMENTS, [plan_id=56]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: In

In [6]:
# in Python
# Maximum flight count, computed with the DataFrame API and with SQL.
# Spark's `max` is imported under an alias: `from pyspark.sql.functions import max`
# would shadow Python's builtin `max` for the rest of the notebook.
from pyspark.sql.functions import max as spark_max

# Display both results. The original take(1) result was discarded because it was
# not the last expression, so the DataFrame side was never shown.
flight.select(spark_max("count")).show()
spark.sql("SELECT max(count) from flight_data_2015").show()

+----------+
|max(count)|
+----------+
|    370002|
+----------+



In [7]:
from pyspark.sql import functions as F
from pyspark.sql.functions import desc

# Top 5 destination countries by total flight count, in SQL ...
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")

# ... and with the DataFrame API. agg(...).alias(...) names the aggregate
# directly. Renaming Spark's auto-generated "sum(count)" column afterwards is
# fragile: withColumnRenamed is a silent no-op when that name does not match.
max_df = (
    flight
    .groupBy("DEST_COUNTRY_NAME")
    .agg(F.sum("count").alias("destination_total"))
    .sort(desc("destination_total"))
    .limit(5)
)

max_df.show()
maxSql.show()

# Compare the two physical plans.
max_df.explain()
maxSql.explain()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#72L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#17,destination_total#72L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[sum(count#19)])
      +- Exchange hashpartitioning(DEST_COUNTR

In [8]:
# .option("header", "true") 這告訴 Spark 讀取 CSV 時將第一行視為標題行，而不是數據行，
# .option("inferSchema", "true") 這告訴 Spark 推斷每列的數據類型。
# Load every daily retail CSV file as one static DataFrame.
#   header=true      -> treat the first line of each file as column names, not data
#   inferSchema=true -> let Spark infer the type of each column
retail_glob = os.path.join(root, "retail-data/by-day/*.csv")
staticDataFrame = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(retail_glob)
)
staticDataFrame.createOrReplaceTempView("retail_data")

# Keep the inferred schema so it can be reused, e.g. by a reader that needs an
# explicit schema.
staticSchema = staticDataFrame.schema
staticSchema

                                                                                

StructType([StructField('InvoiceNo', StringType(), True), StructField('StockCode', StringType(), True), StructField('Description', StringType(), True), StructField('Quantity', IntegerType(), True), StructField('InvoiceDate', TimestampType(), True), StructField('UnitPrice', DoubleType(), True), StructField('CustomerID', DoubleType(), True), StructField('Country', StringType(), True)])

In [9]:
from pyspark.sql.functions import window, column, desc, col

# Total spend per customer per day:
#   selectExpr -> keep CustomerId and InvoiceDate and derive total_cost = UnitPrice * Quantity
#   groupBy    -> group by customer and by a 1-day time window over InvoiceDate
#   sum        -> add up total_cost within each group
# NOTE(review): window boundaries are presumably aligned to UTC, which would explain
# window starts displayed as 01:00 in a non-UTC session time zone — confirm.
costPerPurchase = staticDataFrame.selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate",
)
dailyWindow = window(col("InvoiceDate"), "1 day")
costPerPurchase.groupBy(col("CustomerId"), dailyWindow).sum("total_cost").show(5)

+----------+--------------------+-----------------+
|CustomerId|              window|  sum(total_cost)|
+----------+--------------------+-----------------+
|   16057.0|{2011-12-05 01:00...|            -37.6|
|   14126.0|{2011-11-29 01:00...|643.6300000000001|
|   13500.0|{2011-11-16 01:00...|497.9700000000001|
|   17160.0|{2011-11-08 01:00...|516.8499999999999|
|   15608.0|{2011-11-11 01:00...|            122.4|
+----------+--------------------+-----------------+
only showing top 5 rows



                                                                                

# Streaming DataFrames/Datasets

In [10]:
# Structured Streaming version of the per-customer daily spend aggregation.
# Disabled by default so "Restart & Run All" is not blocked by a long-running
# stream; set RUN_STREAMING_DEMO = True to run it.
RUN_STREAMING_DEMO = False

if RUN_STREAMING_DEMO:
    # Read the same CSV files as a stream, one file per trigger, reusing the
    # schema inferred from the static DataFrame.
    streamingDataFrame = spark.readStream\
        .schema(staticSchema)\
        .option("maxFilesPerTrigger", 1)\
        .format("csv")\
        .option("header", "true")\
        .load(os.path.join(root, "retail-data/by-day/*.csv"))

    # Same aggregation as the static version: total cost per customer per 1-day window.
    purchaseByCustomerPerHour = streamingDataFrame\
      .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate")\
      .groupBy(
        col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
      .sum("total_cost")

    # format("console"): on every trigger the complete aggregated result
    # (outputMode "complete") is printed to the driver console. It is NOT
    # written to an in-memory table; that would need format("memory").
    streamingwritingstream = purchaseByCustomerPerHour.writeStream\
        .format("console")\
        .queryName("customer_purchases_2")\
        .outputMode("complete")\
        .start()

    # Let the stream run briefly, then stop it so it does not keep consuming
    # resources in the background.
    streamingwritingstream.awaitTermination(30)
    streamingwritingstream.stop()



In [11]:
# Print the schema of the retail DataFrame as a tree (column name, type, nullable).
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



# Using Spark for Machine Learning

In [12]:
from pyspark.sql.functions import date_format, col

# Prepare the retail data for ML:
#   na.fill(0)  -> replace nulls in numeric columns with 0. CustomerID is a numeric
#                  (double) column, so a missing customer becomes 0.0, which is not a real id.
#   day_of_week -> weekday name of InvoiceDate ("EEEE" pattern, e.g. "Monday")
#   coalesce(5) -> reduce to at most 5 partitions without a full shuffle
filledDataFrame = staticDataFrame.na.fill(0)
weekdayName = date_format(col("InvoiceDate"), "EEEE")
preppedDataFrame = filledDataFrame.withColumn("day_of_week", weekdayName).coalesce(5)

In [13]:
# Peek at the first five prepared rows, including the new day_of_week column.
preppedDataFrame.show(n=5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|day_of_week|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|     Monday|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|     Monday|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|     Monday|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|     Monday|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|     Monday|
+---------+---------+-------------------

In [15]:
# Split by time rather than randomly: rows before SPLIT_DATE are used for
# training, rows on/after it for testing. Rows with a null InvoiceDate satisfy
# neither predicate, so they end up in neither set (na.fill(0) does not fill
# timestamp columns).
SPLIT_DATE = "2011-07-01"
trainDataFrame = preppedDataFrame.where(f"InvoiceDate < '{SPLIT_DATE}'")
testDataFrame = preppedDataFrame.where(f"InvoiceDate >= '{SPLIT_DATE}'")

# Each count() launches its own Spark job over the input.
print(f'training data count: {trainDataFrame.count()}')
print(f'test data count: {testDataFrame.count()}')

training data count: 245903
test data count: 296006


In [17]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# day_of_week (weekday name) -> day_of_week_index (numeric label index).
# NOTE(review): handleInvalid is left at its default, so transforming data that
# contains a weekday not seen during fit() may fail — confirm for new data.
indexer = StringIndexer(inputCol="day_of_week", outputCol="day_of_week_index")

# day_of_week_index -> day_of_week_encoded (one-hot vector).
encoder = OneHotEncoder(inputCol="day_of_week_index", outputCol="day_of_week_encoded")

# Concatenate the raw numeric columns and the one-hot vector into "features".
# NOTE(review): UnitPrice and Quantity are not scaled, so they will likely dominate
# KMeans distances compared with the 0/1 weekday columns.
vectorAssembler = VectorAssembler(
    inputCols=["UnitPrice", "Quantity", "day_of_week_encoded"],
    outputCol="features",
)

In [18]:
from pyspark.ml import Pipeline

# Chain the three feature stages so they are fitted and applied together, in order:
# index the weekday, one-hot encode it, then assemble the feature vector.
transformationPipeline = Pipeline(stages=[indexer, encoder, vectorAssembler])

In [19]:
# Fit the feature stages on the training data only, then apply them to it.
fittedPipeline = transformationPipeline.fit(trainDataFrame)

# cache() marks the DataFrame for caching and returns it, so the transformed
# training data is reused by the model fit below instead of being recomputed.
transformedTraining = fittedPipeline.transform(trainDataFrame).cache()
transformedTraining


DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string, day_of_week: string, day_of_week_index: double, day_of_week_encoded: vector, features: vector]

In [23]:
from pyspark.ml.clustering import KMeans

# Cluster the training features into k=20 groups; the fixed seed makes the
# result reproducible between runs.
kmeans = KMeans(k=20, seed=1)
kmModel = kmeans.fit(transformedTraining)
kmModel

KMeansModel: uid=KMeans_68850adb828a, k=20, distanceMeasure=euclidean, numFeatures=7

In [25]:
# Encode the test set with the pipeline fitted on the training data, so both
# sets share the same weekday indices and feature layout.
transformedTest = fittedPipeline.transform(dataset=testDataFrame)

In [29]:
from pyspark.ml.evaluation import ClusteringEvaluator

# Assign each test row to a learned cluster (adds a "prediction" column).
predictions = kmModel.transform(transformedTest)

# Silhouette score of the test-set clustering, with the evaluator's defaults
# written out explicitly.
# NOTE: the evaluator uses squaredEuclidean, while the KMeans model reports
# distanceMeasure=euclidean; ClusteringEvaluator does not offer plain euclidean.
evaluator = ClusteringEvaluator(
    featuresCol="features",
    predictionCol="prediction",
    metricName="silhouette",
    distanceMeasure="squaredEuclidean",
)
silhouette = evaluator.evaluate(predictions)

print("Silhouette with squared euclidean distance = " + str(silhouette))




Silhouette with squared euclidean distance = 0.4629312677226921


                                                                                

23/11/19 19:27:43 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 953140 ms exceeds timeout 120000 ms
23/11/19 19:27:43 WARN SparkContext: Killing executors is not supported by current scheduler.
23/11/19 19:27:46 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

# RDD 

In [None]:
from pyspark.sql import Row

# Build a tiny RDD of single-field Rows and convert it to a DataFrame.
# The Rows carry no field names, so Spark picks the column name itself
# (presumably "_1" — confirm).
single_value_rows = [Row(n) for n in (1, 2, 3)]
spark.sparkContext.parallelize(single_value_rows).toDF()