In [1]:
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [2]:
from pyspark.sql.types import StringType, IntegerType, StructType, StructField

schema = StructType([
        StructField("city", StringType(), True),
        StructField("country", StringType(), True),
        StructField("population", IntegerType(), True)])

countries = ['Colombia', 'US@A', 'Brazil', 'Spain']
cities = ['Bogotá', 'New York', '   São Paulo   ', '~Madrid']
population = [37800000,19795791,12341418,6489162]

In [8]:
df = spark.createDataFrame(list(zip(cities, countries, population)), schema=schema)
df.head(3)

[Row(city='Bogotá', country='Colombia', population=37800000),
 Row(city='New York', country='US@A', population=19795791),
 Row(city='   São Paulo   ', country='Brazil', population=12341418)]

In [4]:
# Referece 
# https://stackoverflow.com/questions/41598383/is-it-possible-to-subclass-dataframe-in-pyspark
# Adding fucntion to python on the fly http://olivierpisano.over-blog.com/article-adding-methods-to-a-type-without-subclassing-72909750.html

from pyspark.sql import DataFrame
from pyspark.sql.functions import *


class op_dataframe(DataFrame):
    def __init__(self, df):
        super(self.__class__, self).__init__(df._jdf, df.sql_ctx)

    def lower(self, columns):
        return op_dataframe(self.withColumn(columns, lower(col(columns))))
    
    def upper(self, columns):
        return op_dataframe(self.withColumn(columns, upper(col(columns))))
        
    def reverse(self, columns):
         return op_dataframe(self.withColumn(columns, reverse(col(columns))))
    
    def max(self,columns):
        return op_dataframe(self.agg("max", columns))
    
class op():
    def DataFrame(data, schema):
        df = spark.createDataFrame(data, schema=schema)
        return op_dataframe(df)

# This approach has a couple of problems
# 1. if you chain a dataframe method and then use a optimus dataframe the chaining will fail beacuse you get a vanilla dataframe
# df1 = op_df.withColumn("city", lower(col("city"))).reverse("city")
# 2. if you use a withColumn as last methos you will have to 'cast' to a optimus dataframe
# df1 = op_df.withColumn("city", lower(col("city")))
# df1 = op_dataframe(df1)
# df1.upper("city").show()

In [6]:

op_df = op.DataFrame(list(zip(cities, countries, population)), schema = schema)

print(type(op_df))
df1 = op_df.upper("city").reverse("city").withColumn("city", lower(col("city")))
print(type(df1))

#df1.max1()
df1.explain(True)
df1.show()
print(df1)

<class '__main__.op_dataframe'>
<class 'pyspark.sql.dataframe.DataFrame'>
== Parsed Logical Plan ==
'Project [lower('city) AS city#26, country#13, population#14]
+- AnalysisBarrier
      +- Project [reverse(city#18) AS city#22, country#13, population#14]
         +- Project [upper(city#12) AS city#18, country#13, population#14]
            +- LogicalRDD [city#12, country#13, population#14], false

== Analyzed Logical Plan ==
city: string, country: string, population: int
Project [lower(city#22) AS city#26, country#13, population#14]
+- Project [reverse(city#18) AS city#22, country#13, population#14]
   +- Project [upper(city#12) AS city#18, country#13, population#14]
      +- LogicalRDD [city#12, country#13, population#14], false

== Optimized Logical Plan ==
Project [lower(reverse(upper(city#12))) AS city#26, country#13, population#14]
+- LogicalRDD [city#12, country#13, population#14], false

== Physical Plan ==
*(1) Project [lower(reverse(upper(city#12))) AS city#26, country#13, pop

In [7]:
column = "city"
result = df.withColumn(column, upper(col(column))).withColumn(column, reverse(col(column))).withColumn(column, lower(col(column)))
result.explain(True)
result.show()

== Parsed Logical Plan ==
'Project [lower('city) AS city#49, country#1, population#2]
+- AnalysisBarrier
      +- Project [reverse(city#41) AS city#45, country#1, population#2]
         +- Project [upper(city#0) AS city#41, country#1, population#2]
            +- LogicalRDD [city#0, country#1, population#2], false

== Analyzed Logical Plan ==
city: string, country: string, population: int
Project [lower(city#45) AS city#49, country#1, population#2]
+- Project [reverse(city#41) AS city#45, country#1, population#2]
   +- Project [upper(city#0) AS city#41, country#1, population#2]
      +- LogicalRDD [city#0, country#1, population#2], false

== Optimized Logical Plan ==
Project [lower(reverse(upper(city#0))) AS city#49, country#1, population#2]
+- LogicalRDD [city#0, country#1, population#2], false

== Physical Plan ==
*(1) Project [lower(reverse(upper(city#0))) AS city#49, country#1, population#2]
+- Scan ExistingRDD[city#0,country#1,population#2]
+---------------+--------+----------+
| 

# using decorators

In [9]:
# Reference https://medium.com/@mgarod/dynamically-add-a-method-to-a-class-in-python-c49204b85bd6

from functools import wraps # This convenience func preserves name and docstring

from pyspark.sql import DataFrame
from pyspark.sql import functions as F

# decorator to attach a custom fuction to a class
def add_method(cls):
    def decorator(func):
        @wraps(func) 
        def wrapper(self, *args, **kwargs): 
            return func(self, *args, **kwargs)
        setattr(cls, func.__name__, wrapper)
        # Note we are not binding func, but wrapper which accepts self but does exactly the same as func
        return func # returning func means func can still be used normally
    return decorator

@add_method(DataFrame)
def lower(self, columns):
     return self.apply(columns, F.lower)

@add_method(DataFrame)
def upper(self, columns):
     return self.apply(columns, F.upper)

@add_method(DataFrame)
def reverse(self, columns):
    return self.apply(columns, F.reverse)

@add_method(DataFrame)
def astype(self, type):
    print
    return 1


@add_method(DataFrame)
def select_cols(self, columns):
    
     # Verify that columns are a string or list of string
    assert_type_str_or_list(columns)
    
    # if columns is * get all columns
    _columns = list(map(lambda t: t[0], df.dtypes))
    if (columns == "*"): columns = list(map(lambda t: t[0], df.dtypes))
    else:
        # Remove duplicated columns
        if isinstance(columns, list): columns = set(columns)
   
         # if string convert to list. Because we always return a list
        if isinstance(columns, str): columns = [columns]
        
        # Check if the columns you want to select exits in the dataframe
        r = []
        for column in columns: 
            if column not in self.columns: r.append(column)
        assert len(r) == 0, "Error:%s column(s) not exist(s)" % r
                    
    return columns

def assert_type_str_or_list(var):
    """This function asserts if variable is a string or a list dataType."""
    assert isinstance(var, (str, list)), \
        "Error: argument must be a string or a list."
            
@add_method(DataFrame)
def apply(self, columns, func):
    
    columns = select_cols(self, columns)
        
    for column in columns: 
        self= self.withColumn(column, func(col(column)))
    return self


In [10]:
schema = StructType([
        StructField("city", StringType(), True),
        StructField("country", StringType(), True),
        StructField("population", IntegerType(), True)])

countries = ['Colombia', 'US@A', 'Brazil', 'Spain']
cities = ['Bogotá', 'New York', '   São Paulo   ', '~Madrid']
population = [37800000,19795791,12341418,6489162]

# Create dataframe
df = spark.createDataFrame(list(zip(cities, countries, population)), schema=schema)

# Some operation in multiple columns
df.show()

# Testing bonded function and withcolumn
r = df.upper(["city","country"]).withColumn("city", F.reverse(col("city")))
r.show()

# Testing apply and withcolum
r = r.apply("country", F.upper).apply(["city","country"], F.lower).withColumn("city", F.reverse(col("city")))
r.show()

# Testing custom function
def remove_some_chars(col_name):
    removed_chars = ("@", "?")
    regexp = "|".join('\{0}'.format(i) for i in removed_chars)
    return regexp_replace(col_name, regexp, "")

# r = r.apply(["country"],remove_some_chars).upper(["city"]).withColumn("city", F.reverse(col("country")))
r = r.apply(["country"], remove_some_chars).upper(["city"]).withColumn("country", F.reverse(col("country"))).lower(["city"])
r.show()

r = r.upper(["country"])
r.show()


+---------------+--------+----------+
|           city| country|population|
+---------------+--------+----------+
|         Bogotá|Colombia|  37800000|
|       New York|    US@A|  19795791|
|   São Paulo   |  Brazil|  12341418|
|        ~Madrid|   Spain|   6489162|
+---------------+--------+----------+



NameError: name 'col' is not defined

In [265]:
# Filters all string columns in dataFrame
valid_cols = list(map(lambda t: t[0], df.dtypes))

# If None or [] is provided with column parameter:
if columns == "*": columns = valid_cols
print(valid_cols)
print(df.dtypes)

['city', 'country', 'population']
[('city', 'string'), ('country', 'string'), ('population', 'int')]


In [221]:
list(map(lambda t: t[0], df.dtypes))

['city', 'country', 'population']

In [254]:
items = [1,2,3,4]
Z = [3,4]
y = []
for i in items: 
    if i not in Z: y.append(i)
        
assert len(y) == 0, "the list is non empty"
print(y)

AssertionError: the list is non empty