In [1]:
# Display the SparkContext handle. `sc` is not defined anywhere in this notebook,
# so it is presumably pre-created by the notebook environment — confirm before running elsewhere.
sc

# Loading Data

In [3]:
# Read the tab-separated, header-less ratings file with inferred column types,
# then give the four columns meaningful names.
ratings = (
    spark.read.format("csv")
    .option("sep", "\t")
    .option("inferSchema", "true")
    .option("header", "false")
    .load("/FileStore/tables/u.data")
    .toDF('user_id', 'movie_id', 'rating', 'unix_timestamp')
)

In [4]:
# Preview the loaded ratings to check the renamed columns and inferred values.
ratings.show()

# 1. Using Spark Functions

The most pysparkish way to create a new column in a PySpark DataFrame is by using the built-in functions. This is the most performant programmatic way to create a new column, so this is the first place I go whenever I want to do some column manipulation. We can use `.withColumn` along with PySpark SQL functions to create a new column. The list of all the functions provided is given [here](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions). In essence, you can find string functions, date functions, and math functions already implemented using Spark functions.

In [7]:
import pyspark.sql.functions as F

The `F.col` function gives us access to the column.

In [9]:
# Double every rating and store the product in a new ScaledRating column.
ratings_with_scale10 = ratings.withColumn(
    "ScaledRating",
    F.col("rating") * 2,
)

In [10]:
# Preview the result, including the new ScaledRating column.
ratings_with_scale10.show()

We can also use math functions such as `F.exp`.

In [12]:
# Store twice the exponential of each rating in a new expRating column.
ratings_with_exp = ratings.withColumn(
    "expRating",
    F.exp(F.col("rating")) * 2,
)

In [13]:
# Preview the result, including the new expRating column.
ratings_with_exp.show()

# 2. Spark UDFs

In [15]:
from pyspark.sql.types import *

def somefunc(value):
  """Bucket a numeric rating into 'low' or 'high'.

  Args:
      value: The rating to classify, or None for a missing rating.

  Returns:
      'low' when value is below 3, 'high' when it is 3 or above, and
      None when value is None.
  """
  # A missing rating cannot be compared with a number: in Python 3
  # `None < 3` raises TypeError. Pass the missing value through instead.
  if value is None:
      return None
  return 'low' if value < 3 else 'high'

# Wrap the plain Python function as a Spark UDF, declaring that it returns strings
udfsomefunc = F.udf(f=somefunc, returnType=StringType())

# Apply the UDF to the rating column to derive the new high_low column
ratings_with_high_low = ratings.withColumn(
    "high_low",
    udfsomefunc(F.col("rating")),
)


In [16]:
# Preview the result, including the new high_low column produced by the UDF.
ratings_with_high_low.show()

# 3. Using RDD

In [18]:
import math
from pyspark.sql import Row

def rowwise_function(row):
  """Return a new Row holding every field of `row` plus a 'Newcol' field.

  'Newcol' is math.exp applied to the row's 'rating' value. The input row
  is read through its dict form and a fresh Row is built from the result.
  """
  fields = row.asDict()
  # Build the extended mapping in one step and turn it back into a Row
  return Row(**dict(fields, Newcol=math.exp(fields['rating'])))

# Convert the ratings DataFrame to an RDD of Row objects
ratings_rdd = ratings.rdd
# Apply the row-wise function to every row; passing the function directly
# avoids a redundant lambda wrapper
ratings_rdd_new = ratings_rdd.map(rowwise_function)
# Convert the RDD back to a DataFrame through the same SparkSession (`spark`)
# that loaded the data, rather than the legacy `sqlContext` entry point
ratings_new_df = spark.createDataFrame(ratings_rdd_new)

ratings_new_df.show()

# 4. Pandas UDF

In [20]:
# Output schema of the grouped pandas UDF: the four original integer columns
# followed by the new double-valued normalized_rating column (all nullable)
outSchema = StructType([
    StructField('user_id', IntegerType(), True),
    StructField('movie_id', IntegerType(), True),
    StructField('rating', IntegerType(), True),
    StructField('unix_timestamp', IntegerType(), True),
    StructField('normalized_rating', DoubleType(), True),
])

# decorate our function with pandas_udf decorator
@F.pandas_udf(outSchema, F.PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    """Add a mean-centred rating column to one group's rows.

    `pdf` is a pandas.DataFrame. A 'normalized_rating' column is written
    onto it, holding each 'rating' minus the mean of 'rating' within this
    frame, and the same frame is returned.
    """
    pdf['normalized_rating'] = pdf['rating'] - pdf['rating'].mean()
    return pdf

# Run subtract_mean once per movie_id group and combine the per-group results
rating_groupwise_normalization = (
    ratings
    .groupBy("movie_id")
    .apply(subtract_mean)
)

rating_groupwise_normalization.show()

# 5. Using SQL

In [22]:
# Register the DataFrame as a temporary view so it can be queried with SQL.
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable; an existing view of the same name is replaced, so the
# cell can be re-run.
ratings.createOrReplaceTempView('ratings_table')
# Run the query through the same SparkSession (`spark`) that loaded the data,
# rather than the legacy `sqlContext` entry point.
newDF = spark.sql('select *, 2*rating as newCol from ratings_table')
newDF.show()