In [1]:
# Configuration properties of Apache Spark
#sc.stop()
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import unix_timestamp, from_unixtime

APP_NAME = 'pyspark_python'
MASTER = 'local[*]'

conf = SparkConf().setAppName(APP_NAME)
conf = conf.setMaster(MASTER)
spark = SparkSession.builder.config(conf = conf).getOrCreate()
sc = spark.sparkContext

In [2]:
%load_ext autoreload
%autoreload 2
# load my own functions
from utils.complete_missing_dates import *
from utils.partitions import *

In [3]:
import pyspark.sql.functions as psf
from pyspark.sql import Window
from datetime import datetime, timedelta
from itertools import product

### Create the data for the example

We use month partitions

In [4]:
data = (  # recreate the DataFrame
    (1, datetime(2019, 12, 2, 14, 54, 17), 49.94),
    (1, datetime(2019, 11, 3, 8, 58, 39), 50.49),
    (1, datetime(2019, 8, 6, 10, 44, 1), 50.24),
    (2, datetime(2019, 8, 2, 8, 58, 39), 62.32),
    (2, datetime(2019, 5, 4, 10, 44, 1), 65.64))
df = spark.createDataFrame(data, schema=("person", "timestamp", "weight"))

In [5]:
df.show()

+------+-------------------+------+
|person|          timestamp|weight|
+------+-------------------+------+
|     1|2019-12-02 14:54:17| 49.94|
|     1|2019-11-03 08:58:39| 50.49|
|     1|2019-08-06 10:44:01| 50.24|
|     2|2019-08-02 08:58:39| 62.32|
|     2|2019-05-04 10:44:01| 65.64|
+------+-------------------+------+



In [6]:
df.printSchema()

root
 |-- person: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- weight: double (nullable = true)



### Example of application

In [7]:
referece_col = 'person'
time_col = 'timestamp'

In [8]:
df = complete_missing_months(df, time_col, referece_col, spark)

In [9]:
df = df.sort('person', 'timestamp')

In [10]:
df = df.withColumn('partition_id', create_partitions_from_df('timestamp'))

In [11]:
df.sort('person', 'partition_id').show()

+-------------------+------+------+------------+
|          timestamp|person|weight|partition_id|
+-------------------+------+------+------------+
|2019-05-01 00:00:00|     1|  null|    20190531|
|2019-06-01 00:00:00|     1|  null|    20190630|
|2019-07-01 00:00:00|     1|  null|    20190731|
|2019-08-01 00:00:00|     1| 50.24|    20190831|
|2019-09-01 00:00:00|     1|  null|    20190930|
|2019-10-01 00:00:00|     1|  null|    20191031|
|2019-11-01 00:00:00|     1| 50.49|    20191130|
|2019-12-01 00:00:00|     1| 49.94|    20191231|
|2019-05-01 00:00:00|     2| 65.64|    20190531|
|2019-06-01 00:00:00|     2|  null|    20190630|
|2019-07-01 00:00:00|     2|  null|    20190731|
|2019-08-01 00:00:00|     2| 62.32|    20190831|
|2019-09-01 00:00:00|     2|  null|    20190930|
|2019-10-01 00:00:00|     2|  null|    20191031|
|2019-11-01 00:00:00|     2|  null|    20191130|
|2019-12-01 00:00:00|     2|  null|    20191231|
+-------------------+------+------+------------+



## With partitions

In [12]:
df_part = df.drop('timestamp')

In [13]:
df_part.show()

+------+------+------------+
|person|weight|partition_id|
+------+------+------------+
|     1|  null|    20190531|
|     1|  null|    20190630|
|     1|  null|    20190731|
|     1| 50.24|    20190831|
|     1|  null|    20190930|
|     1|  null|    20191031|
|     1| 50.49|    20191130|
|     1| 49.94|    20191231|
|     2| 65.64|    20190531|
|     2|  null|    20190630|
|     2|  null|    20190731|
|     2| 62.32|    20190831|
|     2|  null|    20190930|
|     2|  null|    20191031|
|     2|  null|    20191130|
|     2|  null|    20191231|
+------+------+------------+



In [14]:
referece_col = 'person'
time_col     = 'partition_id'

In [15]:
df_part = df_part.withColumn(time_col, montly_partition_YYmmdd(time_col))

In [16]:
df_part.printSchema()

root
 |-- person: long (nullable = true)
 |-- weight: double (nullable = true)
 |-- partition_id: timestamp (nullable = true)



In [17]:
list_intermediate_months(df_part, time_col) 

[datetime.date(2019, 5, 31),
 datetime.date(2019, 6, 30),
 datetime.date(2019, 7, 31),
 datetime.date(2019, 8, 31),
 datetime.date(2019, 9, 30),
 datetime.date(2019, 10, 31),
 datetime.date(2019, 11, 30),
 datetime.date(2019, 12, 31)]

In [18]:
df_part = complete_missing_months(df_part, time_col, referece_col, spark)

In [19]:
df_part.sort('person', 'partition_id').show()

+-------------------+------+------+
|       partition_id|person|weight|
+-------------------+------+------+
|2019-05-01 00:00:00|     1|  null|
|2019-06-01 00:00:00|     1|  null|
|2019-07-01 00:00:00|     1|  null|
|2019-08-01 00:00:00|     1| 50.24|
|2019-09-01 00:00:00|     1|  null|
|2019-10-01 00:00:00|     1|  null|
|2019-11-01 00:00:00|     1| 50.49|
|2019-12-01 00:00:00|     1| 49.94|
|2019-05-01 00:00:00|     2| 65.64|
|2019-06-01 00:00:00|     2|  null|
|2019-07-01 00:00:00|     2|  null|
|2019-08-01 00:00:00|     2| 62.32|
|2019-09-01 00:00:00|     2|  null|
|2019-10-01 00:00:00|     2|  null|
|2019-11-01 00:00:00|     2|  null|
|2019-12-01 00:00:00|     2|  null|
+-------------------+------+------+



## Other examples

Dates like integers

In [20]:
data = (  # recreate the DataFrame
    (1, 20191231, 49.94),
    (1, 20191130, 50.49),
    (1, 20191031, 50.24),
    (1, 20190531, 55.24),
    (2, 20190831, 62.32),
    (2, 20190131, 65.64))
df = spark.createDataFrame(data, schema=("person", "timestamp", "weight"))

In [21]:
df = df.withColumn("timestamp", 
                             sf.date_format(sf.to_date(sf.unix_timestamp(df['timestamp'].cast('string'), 
                              "yyyyMMdd").cast("timestamp")), 'yyyy-MM-dd'))


In [22]:
df.show()

+------+----------+------+
|person| timestamp|weight|
+------+----------+------+
|     1|2019-12-31| 49.94|
|     1|2019-11-30| 50.49|
|     1|2019-10-31| 50.24|
|     1|2019-05-31| 55.24|
|     2|2019-08-31| 62.32|
|     2|2019-01-31| 65.64|
+------+----------+------+



In [23]:
referece_col = 'person'
time_col     = 'timestamp'

In [24]:
df = complete_missing_months(df, time_col, referece_col, spark)

In [25]:
df.sort('person', 'timestamp').show(100, False)

+-------------------+------+------+
|timestamp          |person|weight|
+-------------------+------+------+
|2019-01-01 00:00:00|1     |null  |
|2019-02-01 00:00:00|1     |null  |
|2019-03-01 00:00:00|1     |null  |
|2019-04-01 00:00:00|1     |null  |
|2019-05-01 00:00:00|1     |55.24 |
|2019-06-01 00:00:00|1     |null  |
|2019-07-01 00:00:00|1     |null  |
|2019-08-01 00:00:00|1     |null  |
|2019-09-01 00:00:00|1     |null  |
|2019-10-01 00:00:00|1     |50.24 |
|2019-11-01 00:00:00|1     |50.49 |
|2019-12-01 00:00:00|1     |49.94 |
|2019-01-01 00:00:00|2     |65.64 |
|2019-02-01 00:00:00|2     |null  |
|2019-03-01 00:00:00|2     |null  |
|2019-04-01 00:00:00|2     |null  |
|2019-05-01 00:00:00|2     |null  |
|2019-06-01 00:00:00|2     |null  |
|2019-07-01 00:00:00|2     |null  |
|2019-08-01 00:00:00|2     |62.32 |
|2019-09-01 00:00:00|2     |null  |
|2019-10-01 00:00:00|2     |null  |
|2019-11-01 00:00:00|2     |null  |
|2019-12-01 00:00:00|2     |null  |
+-------------------+------+

In [26]:
df.na.fill(0).sort('person', 'timestamp').show(100, False)

+-------------------+------+------+
|timestamp          |person|weight|
+-------------------+------+------+
|2019-01-01 00:00:00|1     |0.0   |
|2019-02-01 00:00:00|1     |0.0   |
|2019-03-01 00:00:00|1     |0.0   |
|2019-04-01 00:00:00|1     |0.0   |
|2019-05-01 00:00:00|1     |55.24 |
|2019-06-01 00:00:00|1     |0.0   |
|2019-07-01 00:00:00|1     |0.0   |
|2019-08-01 00:00:00|1     |0.0   |
|2019-09-01 00:00:00|1     |0.0   |
|2019-10-01 00:00:00|1     |50.24 |
|2019-11-01 00:00:00|1     |50.49 |
|2019-12-01 00:00:00|1     |49.94 |
|2019-01-01 00:00:00|2     |65.64 |
|2019-02-01 00:00:00|2     |0.0   |
|2019-03-01 00:00:00|2     |0.0   |
|2019-04-01 00:00:00|2     |0.0   |
|2019-05-01 00:00:00|2     |0.0   |
|2019-06-01 00:00:00|2     |0.0   |
|2019-07-01 00:00:00|2     |0.0   |
|2019-08-01 00:00:00|2     |62.32 |
|2019-09-01 00:00:00|2     |0.0   |
|2019-10-01 00:00:00|2     |0.0   |
|2019-11-01 00:00:00|2     |0.0   |
|2019-12-01 00:00:00|2     |0.0   |
+-------------------+------+

### Better complete with the following number

It is clear that we do not have values for the fisrt days, so we have to complete them with other values.

In [27]:
complete_with_forward_value(df, 'person', 'weight', 'timestamp').show()

+------+-------------------+------+
|person|          timestamp|weight|
+------+-------------------+------+
|     1|2019-01-01 00:00:00|  null|
|     1|2019-02-01 00:00:00|  null|
|     1|2019-03-01 00:00:00|  null|
|     1|2019-04-01 00:00:00|  null|
|     1|2019-05-01 00:00:00| 55.24|
|     1|2019-06-01 00:00:00| 55.24|
|     1|2019-07-01 00:00:00| 55.24|
|     1|2019-08-01 00:00:00| 55.24|
|     1|2019-09-01 00:00:00| 55.24|
|     1|2019-10-01 00:00:00| 50.24|
|     1|2019-11-01 00:00:00| 50.49|
|     1|2019-12-01 00:00:00| 49.94|
|     2|2019-01-01 00:00:00| 65.64|
|     2|2019-02-01 00:00:00| 65.64|
|     2|2019-03-01 00:00:00| 65.64|
|     2|2019-04-01 00:00:00| 65.64|
|     2|2019-05-01 00:00:00| 65.64|
|     2|2019-06-01 00:00:00| 65.64|
|     2|2019-07-01 00:00:00| 65.64|
|     2|2019-08-01 00:00:00| 62.32|
+------+-------------------+------+
only showing top 20 rows



In [35]:
complete_with_backward_value(df, 'person', 'weight', 'timestamp').show(100,False)

+------+-------------------+------+
|person|timestamp          |weight|
+------+-------------------+------+
|1     |2019-01-01 00:00:00|55.24 |
|1     |2019-02-01 00:00:00|55.24 |
|1     |2019-03-01 00:00:00|55.24 |
|1     |2019-04-01 00:00:00|55.24 |
|1     |2019-05-01 00:00:00|55.24 |
|1     |2019-06-01 00:00:00|50.24 |
|1     |2019-07-01 00:00:00|50.24 |
|1     |2019-08-01 00:00:00|50.24 |
|1     |2019-09-01 00:00:00|50.24 |
|1     |2019-10-01 00:00:00|50.24 |
|1     |2019-11-01 00:00:00|50.49 |
|1     |2019-12-01 00:00:00|49.94 |
|2     |2019-01-01 00:00:00|65.64 |
|2     |2019-02-01 00:00:00|62.32 |
|2     |2019-03-01 00:00:00|62.32 |
|2     |2019-04-01 00:00:00|62.32 |
|2     |2019-05-01 00:00:00|62.32 |
|2     |2019-06-01 00:00:00|62.32 |
|2     |2019-07-01 00:00:00|62.32 |
|2     |2019-08-01 00:00:00|62.32 |
|2     |2019-09-01 00:00:00|null  |
|2     |2019-10-01 00:00:00|null  |
|2     |2019-11-01 00:00:00|null  |
|2     |2019-12-01 00:00:00|null  |
+------+-------------------+

In [38]:
from pyspark.sql.window import Window
import pyspark.sql.functions as func

df = complete_with_backward_value(df, 'person', 'weight', 'timestamp')
### Defining the window 
Windowspec=Window.partitionBy("person").orderBy("timestamp")

### Calculating lag of price at each day level
prev_day_price= df.withColumn('prev_day_price',
                        func.lag(df['weight'])
                                .over(Windowspec))

result = prev_day_price.withColumn('daily_return', 
          (prev_day_price['weight'] - prev_day_price['prev_day_price']) / 
prev_day_price['weight'] ).show()

+------+-------------------+------+--------------+--------------------+
|person|          timestamp|weight|prev_day_price|        daily_return|
+------+-------------------+------+--------------+--------------------+
|     1|2019-01-01 00:00:00| 55.24|          null|                null|
|     1|2019-02-01 00:00:00| 55.24|         55.24|                 0.0|
|     1|2019-03-01 00:00:00| 55.24|         55.24|                 0.0|
|     1|2019-04-01 00:00:00| 55.24|         55.24|                 0.0|
|     1|2019-05-01 00:00:00| 55.24|         55.24|                 0.0|
|     1|2019-06-01 00:00:00| 50.24|         55.24|-0.09952229299363057|
|     1|2019-07-01 00:00:00| 50.24|         50.24|                 0.0|
|     1|2019-08-01 00:00:00| 50.24|         50.24|                 0.0|
|     1|2019-09-01 00:00:00| 50.24|         50.24|                 0.0|
|     1|2019-10-01 00:00:00| 50.24|         50.24|                 0.0|
|     1|2019-11-01 00:00:00| 50.49|         50.24|0.004951475539

In [68]:
import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as f
df = df.withColumn('add', lit(-1)).withColumn('max', lit(f.sum(col('add'))))
cum_sum =cum_sum.withColumn('cumsum', f.sum(col('add')).over(Window.partitionBy(col('person'))
                                                                          .orderBy().rowsBetween(-sys.maxsize, 0)))
cum_sum.show()

AnalysisException: "grouping expressions sequence is empty, and '`person`' is not an aggregate function. Wrap '(sum(CAST(`add` AS BIGINT)) AS `max`)' in windowing function(s) or wrap '`person`' in first() (or first_value) if you don't care which value you get.;;\nAggregate [person#171L, timestamp#5986, weight#5987, add#25522, sum(cast(add#25522 as bigint)) AS max#25528L]\n+- Project [person#171L, timestamp#5986, weight#5987, -1 AS add#25522]\n   +- Project [person#171L, coalesce(timestamp#3535, to_timestamp('timestamp, None)) AS timestamp#5986, coalesce(weight#3536, completed_column#5981) AS weight#5987]\n      +- Project [person#171L, timestamp#3535, weight#3536, completed_column#5981]\n         +- Project [person#171L, timestamp#3535, weight#3536, completed_column#5981, completed_column#5981]\n            +- Window [first(weight#3536, true) windowspecdefinition(person#171L, timestamp#3535 ASC NULLS FIRST, specifiedwindowframe(RowFrame, currentrow$(), unboundedfollowing$())) AS completed_column#5981], [person#171L], [timestamp#3535 ASC NULLS FIRST]\n               +- Project [person#171L, timestamp#3535, weight#3536]\n                  +- Project [person#171L, coalesce(timestamp#1485, to_timestamp('timestamp, None)) AS timestamp#3535, coalesce(weight#1486, completed_column#3530) AS weight#3536]\n                     +- Project [person#171L, timestamp#1485, weight#1486, completed_column#3530]\n                        +- Project [person#171L, timestamp#1485, weight#1486, completed_column#3530, completed_column#3530]\n                           +- Window [first(weight#1486, true) windowspecdefinition(person#171L, timestamp#1485 ASC NULLS FIRST, specifiedwindowframe(RowFrame, currentrow$(), unboundedfollowing$())) AS completed_column#3530], [person#171L], [timestamp#1485 ASC NULLS FIRST]\n                              +- Project [person#171L, timestamp#1485, weight#1486]\n                                 +- Project [person#171L, coalesce(timestamp#1474, to_timestamp('timestamp, None)) AS timestamp#1485, coalesce(weight#1475, completed_column#1480) AS weight#1486]\n                                    +- Project [person#171L, timestamp#1474, weight#1475, completed_column#1480]\n                                       +- Project [person#171L, timestamp#1474, weight#1475, completed_column#1480, completed_column#1480]\n                                          +- Window [first(weight#1475, true) windowspecdefinition(person#171L, timestamp#1474 ASC NULLS FIRST, specifiedwindowframe(RowFrame, currentrow$(), unboundedfollowing$())) AS completed_column#1480], [person#171L], [timestamp#1474 ASC NULLS FIRST]\n                                             +- Project [person#171L, timestamp#1474, weight#1475]\n                                                +- Project [person#171L, coalesce(timestamp#170, to_timestamp('timestamp, None)) AS timestamp#1474, coalesce(weight#130, completed_column#1469) AS weight#1475]\n                                                   +- Project [timestamp#170, person#171L, weight#130, completed_column#1469]\n                                                      +- Project [timestamp#170, person#171L, weight#130, completed_column#1469, completed_column#1469]\n                                                         +- Window [first(weight#130, true) windowspecdefinition(person#171L, timestamp#170 ASC NULLS FIRST, specifiedwindowframe(RowFrame, currentrow$(), unboundedfollowing$())) AS completed_column#1469], [person#171L], [timestamp#170 ASC NULLS FIRST]\n                                                            +- Project [timestamp#170, person#171L, weight#130]\n                                                               +- Project [coalesce(timestamp#148, cast(timestamp#167 as timestamp)) AS timestamp#170, coalesce(person#128L, person#166L) AS person#171L, weight#130]\n                                                                  +- Join FullOuter, ((timestamp#148 = cast(timestamp#167 as timestamp)) && (person#128L = person#166L))\n                                                                     :- Project [person#128L, cast(unix_timestamp(trunc(cast(timestamp#134 as date), month), yyyy-MM-dd HH:mm:ss, Some(Europe/Madrid)) as timestamp) AS timestamp#148, weight#130]\n                                                                     :  +- Project [person#128L, date_format(cast(to_date(cast(unix_timestamp(cast(timestamp#129L as string), yyyyMMdd, None) as timestamp), None) as timestamp), yyyy-MM-dd, Some(Europe/Madrid)) AS timestamp#134, weight#130]\n                                                                     :     +- LogicalRDD [person#128L, timestamp#129L, weight#130], false\n                                                                     +- LogicalRDD [person#166L, timestamp#167], false\n"