In [1]:
# import neccessary libraries and packages

import datetime

import numpy as np
import pandas as pd
import random

import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
sns.set_style("darkgrid")

from wordcloud import (
    WordCloud,
    ImageColorGenerator,
    )
import stylecloud

import pyspark
from pyspark import SparkContext
from pyspark import SparkConf

from pyspark.sql import SparkSession
from pyspark.sql.window import Window

from pyspark.sql.types import (
    StringType,
    IntegerType, 
    DateType, 
    TimestampType,
    )

from pyspark.sql.functions import (
    min as Fmin, max as Fmax, 
    sum as Fsum, round as Fround, 
    col, desc, asc,
    count, countDistinct, 
    when, lit,  
    isnull, isnan,
    from_unixtime, dayofmonth, month, hour, date_format,
    lag, lead,
    first, last, split,
    )


In [3]:
# build a Spark session using the SparkSession APIs

spark = (SparkSession
        .builder 
        .appName("ExplorePySpark")
        .getOrCreate())

sc = spark.sparkContext
sc.setLogLevel("WARN")

In [5]:
## create Spark context

from pyspark import SQLContext,SparkConf,SparkContext,HiveContext
from pyspark.sql.window import Window
from pyspark.sql.types import DateType
import pyspark.sql.functions as F


In [6]:
shopping_data = \
[('Alex','2018-10-10','Paint',80),('Alex','2018-04-02','Ladder',20),('Alex','2018-06-22','Stool',20),\
('Alex','2018-12-09','Vacuum',40),('Alex','2018-07-12','Bucket',5),('Alex','2018-02-18','Gloves',5),\
('Alex','2018-03-03','Brushes',30),('Alex','2018-09-26','Sandpaper',10)]


In [7]:
df = spark.createDataFrame(shopping_data, ['name','date','product','price'])\
                .withColumn('date',F.col('date').cast(DateType()))

In [8]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- date: date (nullable = true)
 |-- product: string (nullable = true)
 |-- price: long (nullable = true)



In [9]:
df.show()

+----+----------+---------+-----+
|name|      date|  product|price|
+----+----------+---------+-----+
|Alex|2018-10-10|    Paint|   80|
|Alex|2018-04-02|   Ladder|   20|
|Alex|2018-06-22|    Stool|   20|
|Alex|2018-12-09|   Vacuum|   40|
|Alex|2018-07-12|   Bucket|    5|
|Alex|2018-02-18|   Gloves|    5|
|Alex|2018-03-03|  Brushes|   30|
|Alex|2018-09-26|Sandpaper|   10|
+----+----------+---------+-----+



In [12]:
w0 = Window.partitionBy('name')

In [25]:
w1 = Window().partitionBy("name").rowsBetween(Window.unboundedPreceding,
                                              Window.unboundedFollowing)

In [13]:
# Sort purchases by descending order of price and have continuous ranking for ties.
df.withColumn('price_rank',F.dense_rank().over(w0.orderBy(F.col('price').desc()))).show()

+----+----------+---------+-----+----------+
|name|      date|  product|price|price_rank|
+----+----------+---------+-----+----------+
|Alex|2018-10-10|    Paint|   80|         1|
|Alex|2018-12-09|   Vacuum|   40|         2|
|Alex|2018-03-03|  Brushes|   30|         3|
|Alex|2018-04-02|   Ladder|   20|         4|
|Alex|2018-06-22|    Stool|   20|         4|
|Alex|2018-09-26|Sandpaper|   10|         5|
|Alex|2018-07-12|   Bucket|    5|         6|
|Alex|2018-02-18|   Gloves|    5|         6|
+----+----------+---------+-----+----------+



In [16]:
# Sort purchases by ascending order of price and have skip rankings for ties.
df.withColumn('price_rank',F.rank().over(w0.orderBy(F.col('price').asc()))).show()

+----+----------+---------+-----+----------+
|name|      date|  product|price|price_rank|
+----+----------+---------+-----+----------+
|Alex|2018-07-12|   Bucket|    5|         1|
|Alex|2018-02-18|   Gloves|    5|         1|
|Alex|2018-09-26|Sandpaper|   10|         3|
|Alex|2018-04-02|   Ladder|   20|         4|
|Alex|2018-06-22|    Stool|   20|         4|
|Alex|2018-03-03|  Brushes|   30|         6|
|Alex|2018-12-09|   Vacuum|   40|         7|
|Alex|2018-10-10|    Paint|   80|         8|
+----+----------+---------+-----+----------+



In [17]:
# Bucket purchases into 4 tiles (e.g. least expensive, middle tiers and most expensive purchases).
df.withColumn('price_bucket',F.ntile(4).over(w0.orderBy(F.col('price').desc()))).show()

+----+----------+---------+-----+------------+
|name|      date|  product|price|price_bucket|
+----+----------+---------+-----+------------+
|Alex|2018-10-10|    Paint|   80|           1|
|Alex|2018-12-09|   Vacuum|   40|           1|
|Alex|2018-03-03|  Brushes|   30|           2|
|Alex|2018-04-02|   Ladder|   20|           2|
|Alex|2018-06-22|    Stool|   20|           3|
|Alex|2018-09-26|Sandpaper|   10|           3|
|Alex|2018-07-12|   Bucket|    5|           4|
|Alex|2018-02-18|   Gloves|    5|           4|
+----+----------+---------+-----+------------+



In [18]:
# Sort purchases and generating a relative/percent rank to distance from max price.
df.withColumn('price_rel_rank',F.percent_rank().over(w0.orderBy(F.col('price').desc()))).show()

+----+----------+---------+-----+-------------------+
|name|      date|  product|price|     price_rel_rank|
+----+----------+---------+-----+-------------------+
|Alex|2018-10-10|    Paint|   80|                0.0|
|Alex|2018-12-09|   Vacuum|   40|0.14285714285714285|
|Alex|2018-03-03|  Brushes|   30| 0.2857142857142857|
|Alex|2018-04-02|   Ladder|   20|0.42857142857142855|
|Alex|2018-06-22|    Stool|   20|0.42857142857142855|
|Alex|2018-09-26|Sandpaper|   10| 0.7142857142857143|
|Alex|2018-07-12|   Bucket|    5| 0.8571428571428571|
|Alex|2018-02-18|   Gloves|    5| 0.8571428571428571|
+----+----------+---------+-----+-------------------+



In [19]:
# common calculations
df.withColumn('avg_to_date',     F.round(F.avg('price').over(w0.orderBy(F.col('date'))),2))\
  .withColumn('accumulating_sum',F.sum('price').over(w0.orderBy(F.col('date'))))\
  .withColumn('max_to_date',     F.max('price').over(w0.orderBy(F.col('date'))))\
  .withColumn('max_of_last2',    F.max('price').over(w0.orderBy(F.col('date')).rowsBetween(-1,Window.currentRow)))\
  .withColumn('items_to_date',   F.count('*').over(w0.orderBy(F.col('date'))))\
  .show()


+----+----------+---------+-----+-----------+----------------+-----------+------------+-------------+
|name|      date|  product|price|avg_to_date|accumulating_sum|max_to_date|max_of_last2|items_to_date|
+----+----------+---------+-----+-----------+----------------+-----------+------------+-------------+
|Alex|2018-02-18|   Gloves|    5|        5.0|               5|          5|           5|            1|
|Alex|2018-03-03|  Brushes|   30|       17.5|              35|         30|          30|            2|
|Alex|2018-04-02|   Ladder|   20|      18.33|              55|         30|          30|            3|
|Alex|2018-06-22|    Stool|   20|      18.75|              75|         30|          20|            4|
|Alex|2018-07-12|   Bucket|    5|       16.0|              80|         30|          20|            5|
|Alex|2018-09-26|Sandpaper|   10|       15.0|              90|         30|          10|            6|
|Alex|2018-10-10|    Paint|   80|      24.29|             170|         80|        

## Row Item Difference

The two functions below, **lag** and **lead**, are probably the most abstract examples in this article and could be confusing at first. The core concept here is essentially a subtraction between some row (e.g. current) and prior or future row(s). For examples, from the table below we can say “ 13 = (2018–03–03) — (2018–02–18) “ — which is a difference of days between two dates.

In [20]:
df.withColumn('days_from_last_purchase', F.datediff('date',F.lag('date',1).over(w0.orderBy(F.col('date')))))\
  .withColumn('days_before_next_purchase', F.datediff(F.lead('date',1).over(w0.orderBy(F.col('date'))),'date'))\
  .show()

+----+----------+---------+-----+-----------------------+-------------------------+
|name|      date|  product|price|days_from_last_purchase|days_before_next_purchase|
+----+----------+---------+-----+-----------------------+-------------------------+
|Alex|2018-02-18|   Gloves|    5|                   null|                       13|
|Alex|2018-03-03|  Brushes|   30|                     13|                       30|
|Alex|2018-04-02|   Ladder|   20|                     30|                       81|
|Alex|2018-06-22|    Stool|   20|                     81|                       20|
|Alex|2018-07-12|   Bucket|    5|                     20|                       76|
|Alex|2018-09-26|Sandpaper|   10|                     76|                       14|
|Alex|2018-10-10|    Paint|   80|                     14|                       60|
|Alex|2018-12-09|   Vacuum|   40|                     60|                     null|
+----+----------+---------+-----+-----------------------+-------------------

## Aggregations : Lists and Sets

Collect a set of prices ever paid (no duplicates) and collect a list of items paid at a certain price (permit duplicates).

I’m adding another purchase of paint to my data set in line 1 for the sake of example to generate duplicated items in lines 14 & 15 below.

In [22]:
newRow = spark.createDataFrame([('Alex','2018-10-11','Paint',80)])
df2 = df.union(newRow)

df2.withColumn('items_by_price', F.collect_list('product').over(w0.partitionBy('price')))\
   .withColumn('all_prices',     F.collect_set('price').over(w0)).show()


+----+----------+---------+-----+----------------+--------------------+
|name|      date|  product|price|  items_by_price|          all_prices|
+----+----------+---------+-----+----------------+--------------------+
|Alex|2018-07-12|   Bucket|    5|[Bucket, Gloves]|[30, 5, 20, 10, 4...|
|Alex|2018-02-18|   Gloves|    5|[Bucket, Gloves]|[30, 5, 20, 10, 4...|
|Alex|2018-09-26|Sandpaper|   10|     [Sandpaper]|[30, 5, 20, 10, 4...|
|Alex|2018-10-10|    Paint|   80|  [Paint, Paint]|[30, 5, 20, 10, 4...|
|Alex|2018-10-11|    Paint|   80|  [Paint, Paint]|[30, 5, 20, 10, 4...|
|Alex|2018-03-03|  Brushes|   30|       [Brushes]|[30, 5, 20, 10, 4...|
|Alex|2018-04-02|   Ladder|   20| [Ladder, Stool]|[30, 5, 20, 10, 4...|
|Alex|2018-06-22|    Stool|   20| [Ladder, Stool]|[30, 5, 20, 10, 4...|
|Alex|2018-12-09|   Vacuum|   40|        [Vacuum]|[30, 5, 20, 10, 4...|
+----+----------+---------+-----+----------------+--------------------+



In [23]:
df2.withColumn('items', F.collect_set('product').over(w0.partitionBy('price')))\
   .select('name','Price','items')\
   .distinct()\
   .show()

+----+-----+----------------+
|name|Price|           items|
+----+-----+----------------+
|Alex|    5|[Bucket, Gloves]|
|Alex|   10|     [Sandpaper]|
|Alex|   80|         [Paint]|
|Alex|   30|       [Brushes]|
|Alex|   20| [Ladder, Stool]|
|Alex|   40|        [Vacuum]|
+----+-----+----------------+



## Time Series — Moving Average

This is another slightly abstract idea along the lines of lag and lead. In this case we use the current row against a user defined range (e.g. 30 day buffer) in order to find a numerical value (e.g. mean) with the specified range.
As an example, let’s say I want to calculate the average purchase price over the past 30 days for each single 
purchase. 

From the example below on line 2, 17.5 = ( 5 + 30) /2 since the two purchases were within 30 days. Also we see a 40 = 40 / 1 , because the vacuum was the only product purchased in its look back period of 30 days.

In [24]:
days = lambda i: i * 86400 # 86400 seconds in a day  

df.withColumn('unix_time',F.col('date').cast('timestamp').cast('long'))\
  .withColumn('30day_moving_avg', F.avg('price') \
              .over(w0.orderBy(F.col('unix_time')) \
                    .rangeBetween(-days(30),0)))\
  .withColumn('45day_moving_stdv',F.stddev('price') \
              .over(w0.orderBy(F.col('unix_time')) \
                    .rangeBetween(-days(30),days(15))))\
  .show()

+----+----------+---------+-----+----------+----------------+------------------+
|name|      date|  product|price| unix_time|30day_moving_avg| 45day_moving_stdv|
+----+----------+---------+-----+----------+----------------+------------------+
|Alex|2018-02-18|   Gloves|    5|1518930000|             5.0| 17.67766952966369|
|Alex|2018-03-03|  Brushes|   30|1520053200|            17.5| 17.67766952966369|
|Alex|2018-04-02|   Ladder|   20|1522641600|            25.0|7.0710678118654755|
|Alex|2018-06-22|    Stool|   20|1529640000|            20.0|              null|
|Alex|2018-07-12|   Bucket|    5|1531368000|            12.5|10.606601717798213|
|Alex|2018-09-26|Sandpaper|   10|1537934400|            10.0| 49.49747468305833|
|Alex|2018-10-10|    Paint|   80|1539144000|            45.0| 49.49747468305833|
|Alex|2018-12-09|   Vacuum|   40|1544331600|            40.0|              null|
+----+----------+---------+-----+----------+----------------+------------------+

