In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [3]:
spark.conf.set('spark.sql.shuffle.partitions', '5')

In [25]:
staticDataFrame = spark.read.format('csv')\
    .option("inferSchema", "true")\
    .option('header', 'true')\
    .load("../data/retail-data/by-day/2010-12-01.csv")

In [26]:
staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

In [32]:
from pyspark.sql.functions import window, column, desc, col

staticDataFrame\
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
  .groupBy(
    col("CustomerId"), window(col("InvoiceDate"), "1 hour"))\
    .sum("total_cost")\
    .show(10)
  

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   15291.0|{2010-12-01 09:00...|             328.8|
|   16098.0|{2010-12-01 09:00...|430.59999999999997|
|   18074.0|{2010-12-01 09:00...|             489.6|
|   17924.0|{2010-12-01 10:00...|             279.0|
|   15862.0|{2010-12-01 11:00...| 354.2299999999999|
|      null|{2010-12-01 11:00...|               0.0|
|   13758.0|{2010-12-01 12:00...|362.45000000000005|
|   13694.0|{2010-12-01 12:00...|            842.12|
|   17968.0|{2010-12-01 12:00...|277.34999999999997|
|   17377.0|{2010-12-01 12:00...|223.90000000000003|
+----------+--------------------+------------------+
only showing top 10 rows



In [39]:
streamingDataFrame = spark.readStream\
    .schema(staticSchema)\
    .option("maxFilesPerTrigger", 1)\
    .format("csv")\
    .option("header", "true")\
    .load("../data/retail-data/by-day/2010-12-01.csv")

In [35]:
purchaseByCustomerPerHour = streamingDataFrame\
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
  .groupBy(
    col("CustomerId"), window(col("InvoiceDate"), "1 hours"))\
  .sum("total_cost")

In [36]:
purchaseByCustomerPerHour.writeStream\
    .format("memory")\
    .queryName("customer_purchases")\
    .outputMode("complete")\
    .start()

<pyspark.sql.streaming.StreamingQuery at 0x265c2e2c490>

In [40]:
spark.sql("""
  SELECT *
  FROM customer_purchases
  ORDER BY `sum(total_cost)` DESC
  """)\
  .show(10)

+----------+------+---------------+
|CustomerId|window|sum(total_cost)|
+----------+------+---------------+
+----------+------+---------------+



In [83]:
from pyspark.sql.functions import date_format, col
# 원래는 'hh'가 아니라 'EEEE' 였으나 2010-12-01 데이터만 가져왔으므로 시간별로 나눠야 함
preppedDataFrame = staticDataFrame\
  .na.fill(0)\
  .withColumn("day_of_week", date_format(col("InvoiceDate"), "hh"))\
  .coalesce(5)

In [84]:
# 데이터 읽는 양이 많아 2010-12-01.csv만 읽었으므로 시간을 다시 조절해야함
trainDataFrame = preppedDataFrame\
  .where("InvoiceDate < '2010-12-01 12:00:00'")
testDataFrame = preppedDataFrame\
  .where("InvoiceDate >= '2010-12-01 12:00:00'")

In [85]:
testDataFrame.show(10)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|day_of_week|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|   536420|    21889|WOODEN BOX OF DOM...|      12|2010-12-01 12:03:00|     1.25|   16583.0|United Kingdom|         12|
|   536420|    21892|TRADITIONAL WOODE...|      12|2010-12-01 12:03:00|     1.25|   16583.0|United Kingdom|         12|
|   536420|    21891|TRADITIONAL WOODE...|      12|2010-12-01 12:03:00|     1.25|   16583.0|United Kingdom|         12|
|   536420|    21890|S/6 WOODEN SKITTL...|       6|2010-12-01 12:03:00|     2.95|   16583.0|United Kingdom|         12|
|   536420|    21718|RED METAL BEACH S...|      12|2010-12-01 12:03:00|     1.25|   16583.0|United Kingdom|         12|
|   536420|    21716|BOYS VINTAGE TIN ..

In [86]:
# 여기서 요일(day_of_week)는 범주형이므로 onehotencdoer로 실행해야함 -> 밑의 블록
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer()\
  .setInputCol("day_of_week")\
  .setOutputCol("day_of_week_index")

In [87]:
from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder()\
  .setInputCol("day_of_week_index")\
  .setOutputCol("day_of_week_encoded")

In [88]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler()\
  .setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"])\
  .setOutputCol("features")

In [89]:
from pyspark.ml import Pipeline

transformationPipeline = Pipeline()\
  .setStages([indexer, encoder, vectorAssembler])

In [90]:
fittedPipeline = transformationPipeline.fit(trainDataFrame)

In [94]:
transformedTraining = fittedPipeline.transform(trainDataFrame)

In [95]:
from pyspark.ml.clustering import KMeans
kmeans = KMeans()\
    .setK(20)\
    .setSeed(333)

In [96]:
kmModel = kmeans.fit(transformedTraining)

In [97]:
transformedTest = fittedPipeline.transform(testDataFrame)