In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np

In [58]:
import sys
import os

print(sys.executable)

os.environ['PYSPARK_PYTHON'] = sys.executable

spark = (
    SparkSession
    .builder
#     .master('local[2]')\
#     .appName('Hierarchical data') \
    .config('spark.sql.execution.arrow.pyspark.enabled', 'true')
    .config('spark.sql.shuffle.partitions', 4)
    .getOrCreate()
)

spark

C:\Users\chaon\anaconda3\envs\spark\python.exe


# Aggregate on hierarchical data

Steps:
1. extract hierarchy into dict, where key is child and value is a list containing itself, and its parent and grand-parents
2. replace child id with its parent list using udf
3. explode
4. do aggregate

In [3]:
hier = {
    1: [1],
    2: [2, 1],
    3: [3, 1],
    4: [4, 2, 1],
    5: [5, 3, 1],
    6: [6, 4, 2, 1],
    7: [7, 5, 3, 1],
    8: [8, 6, 4, 2, 1],
    9: [9, 5, 3, 1],
}

In [4]:
df = pd.DataFrame(data={
    'entity_id': [7, 7, 8, 8, 9, 9],
    'attr': ['f1', 'f2', 'f1', 'f2', 'f1', 'f2'],
    'value': [1, 5, 10, 50, 100, 500]
})
df

Unnamed: 0,entity_id,attr,value
0,7,f1,1
1,7,f2,5
2,8,f1,10
3,8,f2,50
4,9,f1,100
5,9,f2,500


In [5]:
dd = spark.createDataFrame(df)
dd.printSchema()

root
 |-- entity_id: long (nullable = true)
 |-- attr: string (nullable = true)
 |-- value: long (nullable = true)



In [6]:
from pyspark.sql.functions import udf, explode
from pyspark.sql.types import *
from typing import List

@udf(returnType=ArrayType(StringType()))
def to_array(x: int) -> List[int]:
    return hier.get(x, [])

dd_new = (
    dd
    .withColumn('entity_id', to_array('entity_id'))
    .withColumn('entity_id', explode('entity_id'))
    .groupBy('entity_id', 'attr')
    .agg({'value': 'sum'})
)
dd_new.printSchema()

root
 |-- entity_id: string (nullable = true)
 |-- attr: string (nullable = true)
 |-- sum(value): long (nullable = true)



In [7]:
dd_new.show()

+---------+----+----------+
|entity_id|attr|sum(value)|
+---------+----+----------+
|        3|  f1|       101|
|        7|  f2|         5|
|        3|  f2|       505|
|        6|  f2|        50|
|        2|  f2|        50|
|        9|  f1|       100|
|        7|  f1|         1|
|        5|  f1|       101|
|        5|  f2|       505|
|        4|  f1|        10|
|        9|  f2|       500|
|        1|  f1|       111|
|        8|  f2|        50|
|        1|  f2|       555|
|        8|  f1|        10|
|        6|  f1|        10|
|        2|  f1|        10|
|        4|  f2|        50|
+---------+----+----------+



# 1D interpolation
## Fixed NA columns

In [59]:
# sample using numpy and scipy
x = np.array(['0', '0.3', '0.6', '1.0']).astype(np.float)
y = np.array([
    [0, np.nan, np.nan, 1],
    [1, np.nan, np.nan, 0],
])
cond = np.isnan(y[0])
yy = y[:, ~cond]
xx = x[~cond]

from scipy.interpolate import interp1d
f = interp1d(xx, yy)
y[:, cond] =  f(x[cond])
y

array([[0. , 0.3, 0.6, 1. ],
       [1. , 0.7, 0.4, 0. ]])

In [60]:
from pyspark.sql.functions import pandas_udf

# @pandas_udf()
# def interp_1d(pdf: pd.DataFrame) -> pd.DataFrame:
#     y = pdf.values
#     x = np.array(pdf.columns).astype(np.float)
#     cond = np.isnan(y[0])
#     yy = y[:, ~cond]
#     xx = x[~cond]
    
#     from scipy.interpolate import interp1d
#     f = interp1d(xx, yy)
#     y[:, cond] = f(x[cond])
#     return pd.DataFrame.from_records(y)

In [95]:
# test 
columns = ['0', '30', '60', '100']

df = pd.DataFrame(
    columns=['book_id'] + columns,
    data=[  
        ['a',  0.0, np.nan, np.nan, 100],
        ['b',  1.0, np.nan, np.nan, 0.0],
        ['c',  0.0, np.nan, np.nan, -100],
        ['d',  -1.0, np.nan, np.nan, 0.0],
     ])

df.dtypes == np.floating
# interp_1d.func(df[columns])
df.select_dtypes(include=np.floating).columns.isin(df.columns)

array([ True,  True,  True,  True])

In [96]:
from pyspark.sql.functions import struct, col

dd = spark.createDataFrame(df)
dd = dd.repartition(3)

def interp_1d(it):
    # assume each column is either all nan or all valid
    
    for pdf in it:
        # select floating columns
        df = pdf.select_dtypes(include=np.floating)
        columns = df.columns.tolist()
        
        # filter out nan columns
        y = df.values
        x = np.array(columns).astype(np.float)
        cond = np.isnan(y[0])
        yy = y[:, ~cond]
        xx = x[~cond]

        from scipy.interpolate import interp1d
        f = interp1d(xx, yy)
        y[:, cond] = f(x[cond])
        
        new_df = pd.DataFrame.from_records(y, columns=columns)
        
        df = pd.concat([new_df, pdf.select_dtypes(exclude=np.floating)], axis=1)
        
        yield df.reindex(columns=pdf.columns.tolist())
        
dd_new = dd.mapInPandas(interp_1d, dd.schema)

dd_new.printSchema()
dd_new.show()

root
 |-- book_id: string (nullable = true)
 |-- 0: double (nullable = true)
 |-- 30: double (nullable = true)
 |-- 60: double (nullable = true)
 |-- 100: double (nullable = true)

+-------+----+-----+-----+------+
|book_id|   0|   30|   60|   100|
+-------+----+-----+-----+------+
|      d|-1.0| -0.7| -0.4|   0.0|
|      a| 0.0| 30.0| 60.0| 100.0|
|      b| 1.0|  0.7|  0.4|   0.0|
|      c| 0.0|-30.0|-60.0|-100.0|
+-------+----+-----+-----+------+



# 2D interp

In [None]:
df = pd.DataFrame(
    columns=['book_id',  '0/0', '0/10', '10/0', '10/10'],
    data=[  ['a',  0.0, 10.0, 100.0, np.nan] ])

dd = spark.createDataFrame(df)
dd.printSchema()

In [None]:
from pyspark.sql.functions import pandas_udf
@pandas_udf('double')
def interp_2d(s: pd.Series) -> pd.Series:
    
    

# Window example

In [None]:
from pyspark import SparkContext
from pyspark.sql import Window
from pyspark.sql import functions as func
from pyspark.sql import SQLContext

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

tup = [(1, "a"), (3, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")]
df = sqlContext.createDataFrame(tup, ["id", "category"])
df.show()

In [None]:
window = Window.partitionBy("category").orderBy("id").rangeBetween(Window.currentRow, 1)
df.withColumn("sum", func.sum("id").over(window)).show()
# df.withColumn("sum", func.sum("id").over(window)).sort("id", "category").show()

In [None]:
# window = Window.partitionBy("category").orderBy("id").rowsBetween(-1, Window.currentRow)
window = Window.partitionBy("category").rowsBetween(-1, Window.currentRow)
df.withColumn("sum", func.sum("id").over(window)).show()
# df.withColumn("sum", func.sum("id").over(window)).sort("id", "category").show()

In [None]:
spark.stop()