## UD(A)Fs with PySpark
Reference: https://www.inovex.de/blog/efficient-udafs-with-pyspark/

In [1]:
# Connect to Spark by creating a Spark session
from pyspark.sql import SparkSession

# Build the session (or reuse an already running one) with a parenthesized
# method chain instead of backslash continuations.
spark = (
    SparkSession.builder
    .appName("UD(A)F")
    .getOrCreate()
)

In [2]:
import logging
import pandas as pd


_logger = logging.getLogger(__name__)


def rows_to_pandas(rows):
    """Converts the Spark Row iterator of one partition to a Pandas DataFrame

    Args:
        rows: iterator over PySpark Row objects

    Returns:
        Pandas DataFrame with one column per field of the first Row; an empty
        DataFrame without columns if the iterator yields no (truthy) first row
    """
    first_row, rows = peek(rows)
    if not first_row:
        _logger.warning("Spark DataFrame is empty! Returning empty Pandas DataFrame!")
        return pd.DataFrame()

    fields = first_row.__fields__
    # Describing every field of the first row is only useful for debugging,
    # so skip that work entirely unless debug output is actually emitted.
    if _logger.isEnabledFor(logging.DEBUG):
        first_row_info = ["{} ({}): {}".format(k, rtype(first_row[k]), first_row[k])
                          for k in fields]
        _logger.debug("First partition row: %s", first_row_info)
    df = pd.DataFrame.from_records(rows, columns=fields)
    # %-style arguments are only rendered when the record is emitted.
    _logger.debug("Converted partition to DataFrame of shape %s with types:\n%s", df.shape, df.dtypes)
    return df

In [3]:
from itertools import chain

def peek(iterable):
    """Look at the first element without losing it from the iteration

    Args:
        iterable: iterable object like list or iterator

    Returns:
        tuple ``(first, iterator)``: ``iterator`` yields every element of the
        input, including ``first``. For an empty input the result is
        ``(None, <exhausted iterator>)``.
    """
    # A private sentinel keeps a legitimate ``None`` first element
    # distinguishable from "nothing there".
    nothing = object()
    iterator = iter(iterable)
    head = next(iterator, nothing)
    if head is nothing:
        return None, iterator
    return head, chain([head], iterator)


def rtype(var):
    """Describe the type of a value, looking one level into containers

    Lists and dicts are described by their first element / first item only,
    tuples by all of their elements.

    Args:
        var: some (nested) variable

    Returns:
        str: string representation of nested datatype (NA=Not Available)
    """
    def name_of(obj):
        return type(obj).__name__

    if isinstance(var, list):
        if not var:
            return "List[NA]"
        return "List[{}]".format(name_of(var[0]))

    if isinstance(var, dict):
        try:
            first_key = next(iter(var.keys()))
        except StopIteration:
            return "Dict[NA, NA]"
        return "Dict[{}, {}]".format(name_of(first_key), name_of(var[first_key]))

    if isinstance(var, tuple):
        return "Tuple[{}]".format(', '.join(map(name_of, var)))

    return name_of(var)

In [4]:
import numpy as np
from pyspark.sql.types import Row


def convert_dtypes(rows):
    """Converts some Pandas data types to pure Python data types

    ``pd.Timestamp`` becomes ``datetime.datetime``, signed NumPy integers
    become ``int`` and NumPy floats become ``float``. Values of any other
    type are passed through unchanged.

    Args:
        rows (array): numpy recarray holding all rows (any iterable of
            row-like iterables works)

    Returns:
        Iterator over lists of row values
    """
    def unchanged(value):
        return value

    converters = {pd.Timestamp: lambda ts: ts.to_pydatetime()}
    converters.update(dict.fromkeys((np.int8, np.int16, np.int32, np.int64), int))
    # np.float128 only exists on platforms with a 128-bit long double, so
    # naming it directly raises AttributeError elsewhere. np.longdouble is
    # always defined and is the very same type wherever np.float128 exists.
    converters.update(dict.fromkeys((np.float16, np.float32, np.float64, np.longdouble), float))

    for row in rows:
        yield [converters.get(type(elem), unchanged)(elem) for elem in row]


def pandas_to_rows(df):
    """Converts Pandas DataFrame to iterator of Row objects

    Args:
        df: Pandas DataFrame, a Pandas Series (treated as one single row by
            transposing it into a one-row frame) or None

    Returns:
        Iterator over PySpark Row objects whose fields are the DataFrame's
        columns; an empty iterator if ``df`` is None or empty
    """
    if df is None:
        _logger.debug("Returning nothing")
        return iter([])
    # isinstance also accepts Series subclasses, unlike an exact type check.
    if isinstance(df, pd.Series):
        df = df.to_frame().T
    if df.empty:
        _logger.warning("Pandas DataFrame is empty! Returning nothing!")
        return iter([])
    _logger.debug("Convert DataFrame of shape %s to partition with types:\n%s", df.shape, df.dtypes)
    records = convert_dtypes(df.to_records(index=False))
    # Peeking at and describing the first record is debug-only work, so it is
    # skipped unless debug output is actually emitted.
    if _logger.isEnabledFor(logging.DEBUG):
        first_row, records = peek(records)
        first_row_info = ["{} ({}): {}".format(k, rtype(v), v) for k, v in zip(df.columns, first_row)]
        _logger.debug("First record row: %s", first_row_info)
    row = Row(*df.columns)
    return (row(*elems) for elems in records)

In [5]:
from functools import wraps


class pandas_udaf(object):
    """Decorator turning a Pandas function into a partition-wise PySpark UDAF

    The decorated function receives its last positional argument as a Pandas
    DataFrame (built from an iterator of Rows) and its return value is
    converted back into an iterator of Rows.

    Args:
        loglevel (int): minimum loglevel for emitting messages
    """
    def __init__(self, loglevel=logging.INFO):
        self.loglevel = loglevel

    def __call__(self, func):
        @wraps(func)
        def wrapper(*args):
            # Only the last positional argument is the row iterator; anything
            # before it (e.g. ``self`` of a decorated method) passes through.
            setup_logger(loglevel=self.loglevel)
            leading, partition = args[:-1], args[-1]
            frame = rows_to_pandas(partition)
            return pandas_to_rows(func(*leading, frame))
        return wrapper

In [6]:
import os
import sys


def setup_logger(loglevel=logging.INFO, logfile="pyspark.log"):
    """Setup basic logging for logging on the executor

    Messages are written to ``logfile`` inside the first directory listed in
    the comma-separated ``LOG_DIRS`` environment variable. If ``LOG_DIRS`` is
    not set, or its first entry is empty, logging goes to STDOUT instead and
    an error message about the fallback is logged.

    ``logging.basicConfig`` does nothing if the root logger already has
    handlers, so repeated calls keep the first configuration.

    Args:
        loglevel (int): minimum loglevel for emitting messages
        logfile (str): name of the logfile
    """
    logformat = "%(asctime)s %(levelname)s %(module)s.%(funcName)s: %(message)s"
    datefmt = "%y/%m/%d %H:%M:%S"

    # str.split always returns at least one element, so an empty LOG_DIRS
    # shows up as an empty first entry and never as an IndexError.
    log_dir = os.environ.get('LOG_DIRS', '').split(',')[0]
    if not log_dir:
        logging.basicConfig(level=loglevel,
                            stream=sys.stdout,
                            format=logformat,
                            datefmt=datefmt)
        logger = logging.getLogger(__name__)
        logger.error("LOG_DIRS is not in environment variables or empty, using STDOUT instead.")
        # Do not fall through to the file-based configuration below.
        return

    logging.basicConfig(level=loglevel,
                        filename=os.path.join(log_dir, logfile),
                        format=logformat,
                        datefmt=datefmt)

In [7]:
import pyspark_udaf
import logging


@pyspark_udaf.pandas_udaf(loglevel=logging.DEBUG)
def my_func(df):
    """Compute describe() statistics of the non-country columns per country

    Args:
        df: Pandas DataFrame of one partition, containing a 'country' column

    Returns:
        Pandas DataFrame with the group keys and statistic names moved from
        the index into regular columns, or None if ``df`` is empty
    """
    if df.empty:
        return None

    def describe_features(group):
        # The grouping column itself carries no statistics of interest.
        return group.drop('country', axis=1).describe()

    described = df.groupby('country').apply(describe_features)
    return described.reset_index()

In [8]:
# make pyspark_udaf.py available to the executors
#spark.sparkContext.addFile('./pyspark_udaf.py')

df = spark.createDataFrame(
    data=[
        ('DEU', 2, 1.0),
        ('DEU', 3, 8.0),
        ('FRA', 2, 6.0),
        ('FRA', 0, 8.0),
        ('DEU', 3, 8.0),
        ('FRA', 1, 3.0),
    ],
    schema=['country', 'feature1', 'feature2'],
)

# Repartition by country, run my_func once per partition on the underlying
# RDD and turn the resulting Rows back into a DataFrame.
stats_df = (
    df.repartition('country')
    .rdd
    .mapPartitions(my_func)
    .toDF()
)
print(stats_df.toPandas())

   country level_1  feature1  feature2
0      FRA   count  3.000000  3.000000
1      FRA    mean  1.000000  5.666667
2      FRA     std  1.000000  2.516611
3      FRA     min  0.000000  3.000000
4      FRA     25%  0.500000  4.500000
5      FRA     50%  1.000000  6.000000
6      FRA     75%  1.500000  7.000000
7      FRA     max  2.000000  8.000000
8      DEU   count  3.000000  3.000000
9      DEU    mean  2.666667  5.666667
10     DEU     std  0.577350  4.041452
11     DEU     min  2.000000  1.000000
12     DEU     25%  2.500000  4.500000
13     DEU     50%  3.000000  8.000000
14     DEU     75%  3.000000  8.000000
15     DEU     max  3.000000  8.000000


## Example-2

In [9]:
# One tuple per account: (facility, account, delq, bal)
data = [
    ('F1', 'A1', 30, 800),
    ('F1', 'A2', 60, 100),
    ('F1', 'A3', 90, 150),
    ('F2', 'B1', 30, 50),
    ('F2', 'B2', 60, 40),
    ('F2', 'B3', 90, 60),
    ('F2', 'B4', 0, 200),
    ('F3', 'C1', 30, 90),
    ('F3', 'C2', 60, 50),
    ('F3', 'C3', 90, 10),
    ('F4', 'D1', 30, 300),
    ('F4', 'D2', 0, 20),
    ('F4', 'D3', 90, 100),
    ('F4', 'D4', 0, 60),
]
df = spark.createDataFrame(
    data,
    schema=["facility", "account", "delq", "bal"],
)


In [10]:
# Print the example DataFrame as a text table to check the input data.
df.show()

+--------+-------+----+---+
|facility|account|delq|bal|
+--------+-------+----+---+
|      F1|     A1|  30|800|
|      F1|     A2|  60|100|
|      F1|     A3|  90|150|
|      F2|     B1|  30| 50|
|      F2|     B2|  60| 40|
|      F2|     B3|  90| 60|
|      F2|     B4|   0|200|
|      F3|     C1|  30| 90|
|      F3|     C2|  60| 50|
|      F3|     C3|  90| 10|
|      F4|     D1|  30|300|
|      F4|     D2|   0| 20|
|      F4|     D3|  90|100|
|      F4|     D4|   0| 60|
+--------+-------+----+---+



In [11]:
import pyspark_udaf
import logging
from pyspark.sql.functions import lit

@pyspark_udaf.pandas_udaf(loglevel=logging.DEBUG)
def my_udf(df):
    """Replace each row's balance by the maximum balance of its facility

    Args:
        df: Pandas DataFrame of one partition with at least the columns
            'facility' and 'bal'

    Returns:
        Pandas DataFrame (or None if ``df`` is empty) in which
        - 'max' holds the largest 'bal' of the whole partition,
        - 'bal' holds the largest 'bal' of the row's facility,
        - 'bal_10' is ten times that per-facility maximum,
        and the previous index has been moved into a regular column.
    """
    if df.empty:
        return None

    # Maximum over everything this partition contains.
    df['max'] = df.bal.max()

    # Per-facility maximum, indexed by facility.
    facility_max = df.groupby('facility').max()['bal']

    # Swap the per-row balance for the per-facility maximum.
    without_bal = df.drop(['bal'], axis=1)
    final = without_bal.join(facility_max, on='facility')
    final['bal_10'] = final['bal'] * 10
    return final.reset_index()

In [18]:
# Number of partitions of the input DataFrame before any repartitioning.
print(df.rdd.getNumPartitions())
# Tag the original rows as simulation 0; df_all starts out as that frame.
df = df.withColumn("sim_id", lit(0))
df_all = df
# Total number of simulations (sim_id 0 .. N-1).
N = 400

# Append one copy of the rows per further simulation, each with its own sim_id.
# NOTE(review): every iteration adds another union to the plan of df_all;
# presumably fine for N = 400, but worth revisiting if N grows - TODO confirm.
for i in range(1,N):
    new_df = df.withColumn("sim_id", lit(i))
    df_all = df_all.union(new_df)

#df_all.show()
# Row count of all simulations together (rows of df times N).
print(df_all.count())
# Apply my_udf per partition: once on the single simulation partitioned by
# facility, once on all simulations partitioned by (facility, sim_id).
result = df.repartition('facility').rdd.mapPartitions(my_udf).toDF()
result1 = df_all.repartition('facility','sim_id').rdd.mapPartitions(my_udf).toDF()

# Number of distinct (facility, sim_id) combinations.
df2 = df_all.select('facility','sim_id').groupBy('facility','sim_id').count()
df2.count()
#print(result.rdd.getNumPartitions())
#print(result.rdd.getNumPartitions())

4
5600


1600

In [None]:
# Display the my_udf output for the single simulation (result) and for all
# simulations (result1).
result.show()
result1.show()

##### SPARK-5063 Spark does not support nested RDDs or performing Spark actions inside of transformations
PicklingError: Could not serialize object: Exception: It appears that you are attempting to broadcast an RDD or reference 
an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside
of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values 
transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
    

In [14]:
def func():
    """Run my_udf on the global ``df`` partitioned by facility

    Returns:
        Spark DataFrame built from the Rows that my_udf yields per partition
    """
    by_facility = df.repartition('facility').rdd
    return by_facility.mapPartitions(my_udf).toDF()

from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import random
import time

# SparkContext of the running session.
sc = spark.sparkContext
# The following does NOT work: func uses a DataFrame/RDD, and Spark cannot
# run RDD operations inside another transformation such as map (SPARK-5063).
#df = sc.parallelize([time.time() + i for i in range(2)]) \
#            .map(func).toDF()