In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
from pyspark import SparkContext

try:
    sc = SparkContext('local', 'my_context')
except ValueError:
    print('Spark_Context already exists!')

from pyspark.sql import SparkSession
try:
    spark = SparkSession.builder.appName('window_functions').getOrCreate()
except ValueError:
    print('Spark_Session already exists!')

In [2]:
from pyspark.sql.window import Window
from pyspark.sql.types import DateType
import pyspark.sql.functions as F
from pyspark.sql.functions import rank, col

In [8]:
%%time 
df_spark = spark.read.csv("db_power_consumption.csv", header=True, inferSchema=True)

Wall time: 532 ms


In [4]:
df_spark.printSchema()

root
 |-- code: string (nullable = true)
 |-- area: string (nullable = true)
 |-- date: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- use: string (nullable = true)
 |-- stratum: integer (nullable = true)
 |-- consumption: double (nullable = true)



In [5]:
df_spark.show(10)

+--------+----+----------+-------------+-----------+-------+-----------+
|    code|area|      date| municipality|        use|stratum|consumption|
+--------+----+----------+-------------+-----------+-------+-----------+
|C1UPOLR2|   U|2012-01-31|    POLICARPA|Residential|      2|      1.009|
| 8UBARE0|   U|2015-07-31|    BARBACOAS|    Special|      0|      1.412|
| 8UROBC0|   U|2016-05-31|ROBERTO PAYAN| Commercial|      0|      2.361|
|C0RPOLR1|   R|2014-01-31|    POLICARPA|Residential|      1|      2.502|
|C4RLEIC0|   R|2014-08-31|        LEIVA| Commercial|      0|      3.419|
|C0RCUMR1|   R|2014-12-31|    CUMBITARA|Residential|      1|      4.854|
| 8UBARE0|   U|2014-12-31|    BARBACOAS|    Special|      0|      5.233|
|C1UPOLR2|   U|2011-12-31|    POLICARPA|Residential|      2|      7.353|
| 8UBARE0|   U|2013-10-31|    BARBACOAS|    Special|      0|      9.074|
| 8UBARE0|   U|2014-06-30|    BARBACOAS|    Special|      0|      9.511|
+--------+----+----------+-------------+-----------

In [11]:
df_spark.registerTempTable("dataframe")
df_spark1 = spark.sql("""select *,to_date(date) as date_fmt from dataframe""")
df_spark1.show(5)
df_spark1.printSchema()

+--------+----+----------+-------------+-----------+-------+-----------+----------+
|    code|area|      date| municipality|        use|stratum|consumption|  date_fmt|
+--------+----+----------+-------------+-----------+-------+-----------+----------+
|C1UPOLR2|   U|2012-01-31|    POLICARPA|Residential|      2|      1.009|2012-01-31|
| 8UBARE0|   U|2015-07-31|    BARBACOAS|    Special|      0|      1.412|2015-07-31|
| 8UROBC0|   U|2016-05-31|ROBERTO PAYAN| Commercial|      0|      2.361|2016-05-31|
|C0RPOLR1|   R|2014-01-31|    POLICARPA|Residential|      1|      2.502|2014-01-31|
|C4RLEIC0|   R|2014-08-31|        LEIVA| Commercial|      0|      3.419|2014-08-31|
+--------+----+----------+-------------+-----------+-------+-----------+----------+
only showing top 5 rows

root
 |-- code: string (nullable = true)
 |-- area: string (nullable = true)
 |-- date: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- use: string (nullable = true)
 |-- stratum: integer (null

#### For some reason, the following code doesn't convert to date format. Use the above snippet for now

In [9]:
from pyspark.sql.functions import date_format
df_spark1=df_spark.select("*",F.to_date('date', 'dd-MM-yyyy').alias('date_fmt'))

In [10]:
df_spark1.show(5)
df_spark1.printSchema()

+--------+----+----------+-------------+-----------+-------+-----------+--------+
|    code|area|      date| municipality|        use|stratum|consumption|date_fmt|
+--------+----+----------+-------------+-----------+-------+-----------+--------+
|C1UPOLR2|   U|2012-01-31|    POLICARPA|Residential|      2|      1.009|    null|
| 8UBARE0|   U|2015-07-31|    BARBACOAS|    Special|      0|      1.412|    null|
| 8UROBC0|   U|2016-05-31|ROBERTO PAYAN| Commercial|      0|      2.361|    null|
|C0RPOLR1|   R|2014-01-31|    POLICARPA|Residential|      1|      2.502|    null|
|C4RLEIC0|   R|2014-08-31|        LEIVA| Commercial|      0|      3.419|    null|
+--------+----+----------+-------------+-----------+-------+-----------+--------+
only showing top 5 rows

root
 |-- code: string (nullable = true)
 |-- area: string (nullable = true)
 |-- date: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- use: string (nullable = true)
 |-- stratum: integer (nullable = true)
 |-- 

### Drop not required values

In [12]:
drop_column_list = ['code', 'date','area','use','stratum']
df_spark1 = df_spark1.select([column for column in df_spark1.columns if column not in drop_column_list])

In [13]:
df_spark1.show(5)

+-------------+-----------+----------+
| municipality|consumption|  date_fmt|
+-------------+-----------+----------+
|    POLICARPA|      1.009|2012-01-31|
|    BARBACOAS|      1.412|2015-07-31|
|ROBERTO PAYAN|      2.361|2016-05-31|
|    POLICARPA|      2.502|2014-01-31|
|        LEIVA|      3.419|2014-08-31|
+-------------+-----------+----------+
only showing top 5 rows



### Rearrange the columns

In [14]:
df_spark1 = df_spark1.select("date_fmt","municipality","consumption")

In [15]:
df_spark1.show(5)

+----------+-------------+-----------+
|  date_fmt| municipality|consumption|
+----------+-------------+-----------+
|2012-01-31|    POLICARPA|      1.009|
|2015-07-31|    BARBACOAS|      1.412|
|2016-05-31|ROBERTO PAYAN|      2.361|
|2014-01-31|    POLICARPA|      2.502|
|2014-08-31|        LEIVA|      3.419|
+----------+-------------+-----------+
only showing top 5 rows



### Group data by municipality, date and consumption

#### Group and aggregate the data

In [23]:
df_spark2=df_spark1.groupby('municipality','date_fmt').agg(F.round(F.sum('consumption'),2).alias('tot_consumption'))

In [24]:
df_spark2.show()

+-------------+----------+---------------+
| municipality|  date_fmt|tot_consumption|
+-------------+----------+---------------+
|   EL ROSARIO|2015-09-30|       231414.9|
|    CUMBITARA|2012-10-31|       62700.26|
|    CUMBITARA|2012-05-31|       42375.88|
|   EL ROSARIO|2015-02-28|      225416.82|
|   EL ROSARIO|2015-07-31|      233605.42|
|    POLICARPA|2013-08-31|       233389.5|
|        LEIVA|2012-12-31|      193576.67|
|        LEIVA|2015-04-30|       221222.2|
|   EL ROSARIO|2011-12-31|      214559.13|
|        MAGUI|2013-04-30|       92288.27|
|        LEIVA|2011-03-31|      197617.95|
|ROBERTO PAYAN|2010-12-31|       48030.05|
|        MAGUI|2012-11-30|       93518.12|
|        MAGUI|2012-02-29|       72681.88|
|   EL ROSARIO|2014-04-30|      228163.92|
|    POLICARPA|2012-06-30|      224221.87|
|    BARBACOAS|2015-03-31|      278410.21|
|        MAGUI|2011-12-31|       75075.12|
|    CUMBITARA|2012-03-31|       61464.16|
|    CUMBITARA|2015-09-30|       38792.51|
+----------

In [18]:
# create window partition
WP = Window.partitionBy('municipality')

### Get Top 5 rows by power consumption of each municipality

In [19]:
n = 5
df_spark2=df_spark2.select(F.col('*'), F.row_number().over(WP.orderBy(F.col('tot_consumption').desc())).alias('row_number')) \
  .where(col('row_number') <= n) \
  .limit(20)

In [20]:
df_spark2.show()

+-------------+----------+---------------+----------+
| municipality|  date_fmt|tot_consumption|row_number|
+-------------+----------+---------------+----------+
|        MAGUI|2016-04-30|       124555.9|         1|
|        MAGUI|2015-01-31|      118490.82|         2|
|        MAGUI|2016-05-31|      112585.51|         3|
|        MAGUI|2014-04-30|       111919.1|         4|
|        MAGUI|2016-01-31|      108690.42|         5|
|   EL ROSARIO|2014-11-30|      236893.37|         1|
|   EL ROSARIO|2016-02-29|      234581.62|         2|
|   EL ROSARIO|2015-07-31|      233605.42|         3|
|   EL ROSARIO|2015-04-30|      232126.16|         4|
|   EL ROSARIO|2014-05-31|      231863.53|         5|
|    BARBACOAS|2012-01-31|      659715.81|         1|
|    BARBACOAS|2012-02-29|       643820.0|         2|
|    BARBACOAS|2011-02-28|      636049.18|         3|
|    BARBACOAS|2011-01-31|      629347.44|         4|
|    BARBACOAS|2011-11-30|      627496.59|         5|
|ROBERTO PAYAN|2012-11-30|  

### Rank by total consumption (Highest to lowest)

In [21]:
df_spark2.withColumn('tot_consumption_rank',F.rank().over(WP.orderBy(F.col('tot_consumption').desc()))).show(20)

+-------------+----------+---------------+----------+--------------------+
| municipality|  date_fmt|tot_consumption|row_number|tot_consumption_rank|
+-------------+----------+---------------+----------+--------------------+
|    BARBACOAS|2012-01-31|      659715.81|         1|                   1|
|    BARBACOAS|2012-02-29|       643820.0|         2|                   2|
|    BARBACOAS|2011-02-28|      636049.18|         3|                   3|
|    BARBACOAS|2011-01-31|      629347.44|         4|                   4|
|    BARBACOAS|2011-11-30|      627496.59|         5|                   5|
|   EL ROSARIO|2014-11-30|      236893.37|         1|                   1|
|   EL ROSARIO|2016-02-29|      234581.62|         2|                   2|
|   EL ROSARIO|2015-07-31|      233605.42|         3|                   3|
|   EL ROSARIO|2015-04-30|      232126.16|         4|                   4|
|   EL ROSARIO|2014-05-31|      231863.53|         5|                   5|
|        MAGUI|2016-04-30

### Dense rank by total consumption

In [22]:
df_spark2.withColumn('tot_consumption_rank',F.dense_rank().over(WP.orderBy(F.col('tot_consumption').desc()))).show(20)

+-------------+----------+---------------+----------+--------------------+
| municipality|  date_fmt|tot_consumption|row_number|tot_consumption_rank|
+-------------+----------+---------------+----------+--------------------+
|    BARBACOAS|2012-01-31|      659715.81|         1|                   1|
|    BARBACOAS|2012-02-29|       643820.0|         2|                   2|
|    BARBACOAS|2011-02-28|      636049.18|         3|                   3|
|    BARBACOAS|2011-01-31|      629347.44|         4|                   4|
|    BARBACOAS|2011-11-30|      627496.59|         5|                   5|
|   EL ROSARIO|2014-11-30|      236893.37|         1|                   1|
|   EL ROSARIO|2016-02-29|      234581.62|         2|                   2|
|   EL ROSARIO|2015-07-31|      233605.42|         3|                   3|
|   EL ROSARIO|2015-04-30|      232126.16|         4|                   4|
|   EL ROSARIO|2014-05-31|      231863.53|         5|                   5|
|        MAGUI|2016-04-30

### NTile by total consumption

In [26]:
## creates 10 buckets using ntile on municipality by total consumption
df_spark2.withColumn('tot_consumption_bucket',F.ntile(10).over(WP.orderBy(F.col('tot_consumption').desc()))).show()

+------------+----------+---------------+----------------------+
|municipality|  date_fmt|tot_consumption|tot_consumption_bucket|
+------------+----------+---------------+----------------------+
|       MAGUI|2016-04-30|       124555.9|                     1|
|       MAGUI|2015-01-31|      118490.82|                     1|
|       MAGUI|2016-05-31|      112585.51|                     1|
|       MAGUI|2014-04-30|       111919.1|                     1|
|       MAGUI|2016-01-31|      108690.42|                     1|
|       MAGUI|2014-12-31|      105525.45|                     1|
|       MAGUI|2014-11-30|      105206.87|                     1|
|       MAGUI|2013-01-31|      102780.78|                     2|
|       MAGUI|2014-01-31|      101169.26|                     2|
|       MAGUI|2014-09-30|       98895.35|                     2|
|       MAGUI|2014-10-31|       98130.11|                     2|
|       MAGUI|2016-03-31|       96010.68|                     2|
|       MAGUI|2012-11-30|

### Percent Rank

In [27]:
df_spark2.withColumn('tot_consumption_bucket_percent_rant',F.percent_rank().over(WP.orderBy(F.col('tot_consumption').desc()))).show()

+------------+----------+---------------+-----------------------------------+
|municipality|  date_fmt|tot_consumption|tot_consumption_bucket_percent_rant|
+------------+----------+---------------+-----------------------------------+
|       MAGUI|2016-04-30|       124555.9|                                0.0|
|       MAGUI|2015-01-31|      118490.82|               0.015384615384615385|
|       MAGUI|2016-05-31|      112585.51|                0.03076923076923077|
|       MAGUI|2014-04-30|       111919.1|               0.046153846153846156|
|       MAGUI|2016-01-31|      108690.42|                0.06153846153846154|
|       MAGUI|2014-12-31|      105525.45|                0.07692307692307693|
|       MAGUI|2014-11-30|      105206.87|                0.09230769230769231|
|       MAGUI|2013-01-31|      102780.78|                 0.1076923076923077|
|       MAGUI|2014-01-31|      101169.26|                0.12307692307692308|
|       MAGUI|2014-09-30|       98895.35|                0.13846

### Moving average and running sum - total consumption

In [28]:
df_spark2.withColumn('mov_avg_tot_consumption',F.round(F.avg('tot_consumption').over(WP.orderBy(F.col('date_fmt'))),2)).show()

+------------+----------+---------------+-----------------------+
|municipality|  date_fmt|tot_consumption|mov_avg_tot_consumption|
+------------+----------+---------------+-----------------------+
|       MAGUI|2010-12-31|       44328.21|               44328.21|
|       MAGUI|2011-01-31|       49912.92|               47120.57|
|       MAGUI|2011-02-28|       57197.63|               50479.59|
|       MAGUI|2011-03-31|       47008.81|               49611.89|
|       MAGUI|2011-04-30|        48695.6|               49428.63|
|       MAGUI|2011-05-31|       46675.22|               48969.73|
|       MAGUI|2011-06-30|       57055.37|               50124.82|
|       MAGUI|2011-07-31|       58957.25|               51228.88|
|       MAGUI|2011-08-31|       73748.56|               53731.06|
|       MAGUI|2011-09-30|       64655.32|               54823.49|
|       MAGUI|2011-10-31|        63876.4|               55646.48|
|       MAGUI|2011-11-30|       80375.26|               57707.21|
|       MA

In [29]:
df_spark2.withColumn('run_sum',F.round(F.sum('tot_consumption').over(WP.orderBy(F.col('date_fmt'))),2)).show()

+------------+----------+---------------+----------+
|municipality|  date_fmt|tot_consumption|   run_sum|
+------------+----------+---------------+----------+
|       MAGUI|2010-12-31|       44328.21|  44328.21|
|       MAGUI|2011-01-31|       49912.92|  94241.13|
|       MAGUI|2011-02-28|       57197.63| 151438.76|
|       MAGUI|2011-03-31|       47008.81| 198447.57|
|       MAGUI|2011-04-30|        48695.6| 247143.17|
|       MAGUI|2011-05-31|       46675.22| 293818.39|
|       MAGUI|2011-06-30|       57055.37| 350873.76|
|       MAGUI|2011-07-31|       58957.25| 409831.01|
|       MAGUI|2011-08-31|       73748.56| 483579.57|
|       MAGUI|2011-09-30|       64655.32| 548234.89|
|       MAGUI|2011-10-31|        63876.4| 612111.29|
|       MAGUI|2011-11-30|       80375.26| 692486.55|
|       MAGUI|2011-12-31|       75075.12| 767561.67|
|       MAGUI|2012-01-31|       86589.64| 854151.31|
|       MAGUI|2012-02-29|       72681.88| 926833.19|
|       MAGUI|2012-03-31|       67038.81|  993

### Get days from previous reading by each municipality

In [30]:
df_spark2.withColumn('days_from_last_consumption', F.datediff('date_fmt',F.lag('date_fmt',1).over(WP.orderBy(F.col('date_fmt'))))).show()  

+------------+----------+---------------+--------------------------+
|municipality|  date_fmt|tot_consumption|days_from_last_consumption|
+------------+----------+---------------+--------------------------+
|       MAGUI|2010-12-31|       44328.21|                      null|
|       MAGUI|2011-01-31|       49912.92|                        31|
|       MAGUI|2011-02-28|       57197.63|                        28|
|       MAGUI|2011-03-31|       47008.81|                        31|
|       MAGUI|2011-04-30|        48695.6|                        30|
|       MAGUI|2011-05-31|       46675.22|                        31|
|       MAGUI|2011-06-30|       57055.37|                        30|
|       MAGUI|2011-07-31|       58957.25|                        31|
|       MAGUI|2011-08-31|       73748.56|                        31|
|       MAGUI|2011-09-30|       64655.32|                        30|
|       MAGUI|2011-10-31|        63876.4|                        31|
|       MAGUI|2011-11-30|       80

### Get days for next reading by each municipality

In [31]:
df_spark2.withColumn('days_before_next_consumption', F.datediff(F.lead('date_fmt',1).over(WP.orderBy(F.col('date_fmt'))),'date_fmt')).show()

+------------+----------+---------------+----------------------------+
|municipality|  date_fmt|tot_consumption|days_before_next_consumption|
+------------+----------+---------------+----------------------------+
|       MAGUI|2010-12-31|       44328.21|                          31|
|       MAGUI|2011-01-31|       49912.92|                          28|
|       MAGUI|2011-02-28|       57197.63|                          31|
|       MAGUI|2011-03-31|       47008.81|                          30|
|       MAGUI|2011-04-30|        48695.6|                          31|
|       MAGUI|2011-05-31|       46675.22|                          30|
|       MAGUI|2011-06-30|       57055.37|                          31|
|       MAGUI|2011-07-31|       58957.25|                          31|
|       MAGUI|2011-08-31|       73748.56|                          30|
|       MAGUI|2011-09-30|       64655.32|                          31|
|       MAGUI|2011-10-31|        63876.4|                          30|
|     