In [1]:
from pyspark.sql import SparkSession

In [2]:
# session
spark = (SparkSession
        .builder
        .appName('flight_data_app')
        .getOrCreate()
)

# data fetch
# flight_data_2015 = (spark
#                     .read
#                     .option('inferSchema', 'true')
#                     .option('header', 'true')
#                     .csv(r'Spark-The-Definitive-Guide-master/data/flight-data/csv/2015-summary.csv')
# )

In [3]:
spark.conf.set('spark.sql.shuffle.partitions', 5)

flight_data_2015.sort('count').take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [6]:
flight_data_2015.createOrReplaceTempView('flight_data_2015_view')

In [11]:
sqlWay = spark.sql('''
SELECT DEST_COUNTRY_NAME, count(DEST_COUNTRY_NAME)
FROM flight_data_2015_view
GROUP BY DEST_COUNTRY_NAME
''')

In [12]:
dataFrameWay = (flight_data_2015
                .groupBy('DEST_COUNTRY_NAME')
                .count()
)

In [None]:
sqlWay.explain() == dataFrameWay.explain()

In [15]:
spark.sql('SELECT MAX(count) FROM flight_data_2015_view').take(1)

[Row(max(count)=370002)]

In [32]:
spark.sql('''
    SELECT 
        DEST_COUNTRY_NAME AS Destination, 
        SUM(count) AS Total_flights
    FROM flight_data_2015_view
    GROUP BY Destination
    ORDER BY Total_flights DESC
    LIMIT 5
''').show()

+--------------+-------------+
|   Destination|Total_flights|
+--------------+-------------+
| United States|       411352|
|        Canada|         8399|
|        Mexico|         7140|
|United Kingdom|         2025|
|         Japan|         1548|
+--------------+-------------+



#### Structured Streaming

In [3]:
staticDataFrame = (spark
                .read
                .format('csv')
                .option('inferSchema', 'true')
                .option('header', 'true')
                .load(r'Spark-The-Definitive-Guide-master\data\retail-data\all\online-retail-dataset.csv')
)

staticDataFrame.createOrReplaceTempView('retail_data')
staticSchema = staticDataFrame.schema

In [20]:
from pyspark.sql.functions import window, desc, col, to_date

spark.conf.set("spark.sql.shuffle.partitions", "5")

(staticDataFrame
        .withColumn("InvoiceDate", to_date(col("InvoiceDate"), "M/d/yyyy h:mm"))
        .selectExpr(
            "CustomerId",
            "(UnitPrice * Quantity) as total_cost",
            "InvoiceDate")
        .groupBy(
            col("CustomerId"), 
            window(col("InvoiceDate"), "1 day")
                )
        .sum("total_cost")
        .show(5)
)

+----------+--------------------+---------------+
|CustomerId|              window|sum(total_cost)|
+----------+--------------------+---------------+
|     13748|{2010-11-30 22:00...|          204.0|
|     15291|{2010-11-30 22:00...|          328.8|
|     17809|{2010-11-30 22:00...|           34.8|
|     14527|{2010-11-30 22:00...|          -27.5|
|     17420|{2010-11-30 22:00...|         130.85|
+----------+--------------------+---------------+
only showing top 5 rows



In [21]:
streamingDataFrame = (spark.readStream
                        .schema(staticSchema)
                        .option("maxFilesPerTrigger", 1)
                        .format("csv")
                        .option("header", "true")
                        .load(r"Spark-The-Definitive-Guide-master\data\retail-data\by-day\*.csv"))

In [None]:
streamingDataFrame.isStreaming

In [5]:
import pyspark.sql.functions as F


preppedDataFrame = (staticDataFrame
                        .na.fill(0)
                        .withColumn("InvoiceDate", F.to_date(F.col("InvoiceDate"), "M/d/yyyy h:mm"))
                        .withColumn("day_of_week", F.date_format(F.col("InvoiceDate"), "EEEE"))
                        .coalesce(5)
                    )

In [6]:
trainDataFrame = preppedDataFrame.where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame.where("InvoiceDate >= '2011-07-01'")

In [13]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler


indexer = (StringIndexer()
                .setInputCol("day_of_week")
                .setOutputCol("day_of_week_index"))

encoder = (OneHotEncoder()
                .setInputCol("day_of_week_index")
                .setOutputCol("day_of_week_encoded"))

vectorAssembler = (VectorAssembler()
                        .setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"])
                        .setOutputCol("features"))

In [25]:
from pyspark.ml import Pipeline


transformationPipeline = (Pipeline().setStages([indexer, encoder, vectorAssembler]))

fittedPipeline = transformationPipeline.fit(trainDataFrame)

transformedTraining = fittedPipeline.transform(trainDataFrame)
transformedTest = fittedPipeline.transform(testDataFrame)

In [None]:
transformedTraining.cache()

In [27]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator


kmeans = (KMeans()
            .setK(20)
            .setSeed(1)
        )

kmModel = kmeans.fit(transformedTraining)
evaluator = ClusteringEvaluator()

predictions_train = kmModel.transform(transformedTraining)
predictions_test = kmModel.transform(transformedTest)

silhouette_train = evaluator.evaluate(predictions_train)
silhouette_test = evaluator.evaluate(predictions_test)

print(f"Silhouette train with squared euclidean distance = {silhouette_train:.4f}")
print(f"Silhouette test with squared euclidean distance = {silhouette_test:.4f}")
print()

Silhouette train with squared euclidean distance = 0.9518
Silhouette test with squared euclidean distance = 0.9469
