## PySpark exploration

### Creating a session

In [1]:
from pyspark.sql import SparkSession

# Everytime we use spark we need to create a session
spark = SparkSession.builder.appName('multi_device').getOrCreate()
spark

### Reading, selecting and showing columns

In [2]:
# 'option' tells spark to use the first line as header, 'inferSchema' tries to infer dtypes from file
df = spark.read.option('header','true').csv('userInformation.csv', inferSchema=True)

# Alternatively we could write:
df = spark.read.csv('userInformation.csv', inferSchema=True, header=True)

# 'show' is equivalent to 'head' in pandas, necessary to print DF to screen
# default value for show is 20
df.show(5)

+------------------+------------------+--------------------+-------------------+
|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+------------------+------------------+--------------------+-------------------+
| 12.65565114916675| 39.57766801952616|  4.0826206329529615|  587.9510539684005|
|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264|
|11.330278057777512|37.110597442120856|   4.104543202376424| 487.54750486747207|
|13.717513665142507| 36.72128267790313|   3.120178782748092|  581.8523440352177|
|12.795188551078114| 37.53665330059473|   4.446308318351434|  599.4060920457634|
+------------------+------------------+--------------------+-------------------+
only showing top 5 rows



In [3]:
df.printSchema() # Similar to 'info' in pandas

root
 |-- Time on App: double (nullable = true)
 |-- Time on Website: double (nullable = true)
 |-- Length of Membership: double (nullable = true)
 |-- Yearly Amount Spent: double (nullable = true)



In [4]:
df.columns # Get column names, just like in pandas

['Time on App',
 'Time on Website',
 'Length of Membership',
 'Yearly Amount Spent']

In [5]:
# Selecting multiple columns, almost like 'loc' in pandas
# Unfortunately, slicing doesn't work
df.select(['Time on App', 'Yearly Amount Spent']).show(5)

+------------------+-------------------+
|       Time on App|Yearly Amount Spent|
+------------------+-------------------+
| 12.65565114916675|  587.9510539684005|
|11.109460728682564|  392.2049334443264|
|11.330278057777512| 487.54750486747207|
|13.717513665142507|  581.8523440352177|
|12.795188551078114|  599.4060920457634|
+------------------+-------------------+
only showing top 5 rows



In [6]:
# Directly selecting columns would return a column element that we can use to refer to the data
# but it doesn't have a 'show' method
df['Time on App']

Column<'Time on App'>

In [7]:
df.describe().show() # Statistical summary

+-------+------------------+------------------+--------------------+-------------------+
|summary|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+-------+------------------+------------------+--------------------+-------------------+
|  count|               500|               500|                 500|                500|
|   mean|12.052487937166134| 37.06044542094859|   3.533461555915055|  499.3140382585909|
| stddev|0.9942156084725424|1.0104889067564033|  0.9992775024112585|   79.3147815497068|
|    min| 8.508152176032603| 33.91384724758464|  0.2699010899842742| 256.67058229005585|
|    max|15.126994288792467|40.005181638101895|   6.922689335035808|  765.5184619388373|
+-------+------------------+------------------+--------------------+-------------------+



### Adding, dropping and renaming columns in a data frame

In [8]:
# Creates a dummy column that adds 5y to the current length of membership
# not an inplace operation
df = df.withColumn('Membership after 5y', df['Length of Membership']+5)
df.show(5)

+------------------+------------------+--------------------+-------------------+-------------------+
|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|Membership after 5y|
+------------------+------------------+--------------------+-------------------+-------------------+
| 12.65565114916675| 39.57766801952616|  4.0826206329529615|  587.9510539684005|  9.082620632952962|
|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264|   7.66403418213262|
|11.330278057777512|37.110597442120856|   4.104543202376424| 487.54750486747207|  9.104543202376423|
|13.717513665142507| 36.72128267790313|   3.120178782748092|  581.8523440352177|  8.120178782748091|
|12.795188551078114| 37.53665330059473|   4.446308318351434|  599.4060920457634|  9.446308318351434|
+------------------+------------------+--------------------+-------------------+-------------------+
only showing top 5 rows



In [9]:
# Renames a given column, not an inplace operation
df = df.withColumnRenamed('Membership after 5y', 'After 5y')
df.show(5)

+------------------+------------------+--------------------+-------------------+-----------------+
|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|         After 5y|
+------------------+------------------+--------------------+-------------------+-----------------+
| 12.65565114916675| 39.57766801952616|  4.0826206329529615|  587.9510539684005|9.082620632952962|
|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264| 7.66403418213262|
|11.330278057777512|37.110597442120856|   4.104543202376424| 487.54750486747207|9.104543202376423|
|13.717513665142507| 36.72128267790313|   3.120178782748092|  581.8523440352177|8.120178782748091|
|12.795188551078114| 37.53665330059473|   4.446308318351434|  599.4060920457634|9.446308318351434|
+------------------+------------------+--------------------+-------------------+-----------------+
only showing top 5 rows



In [10]:
# Drops column we just created, not an inplace operation
df = df.drop('After 5y')
df.show(5)

+------------------+------------------+--------------------+-------------------+
|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+------------------+------------------+--------------------+-------------------+
| 12.65565114916675| 39.57766801952616|  4.0826206329529615|  587.9510539684005|
|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264|
|11.330278057777512|37.110597442120856|   4.104543202376424| 487.54750486747207|
|13.717513665142507| 36.72128267790313|   3.120178782748092|  581.8523440352177|
|12.795188551078114| 37.53665330059473|   4.446308318351434|  599.4060920457634|
+------------------+------------------+--------------------+-------------------+
only showing top 5 rows



### Handling Missing values

In [11]:
# Dropping values, not inplace operation
subset = ['Time on App', 'Time on Website'] # Collection of columns

# 'na' attribute gets all rows with missing values
df.na.drop(
    how = 'any',        # any (default)| all: at least one null | necessarily all nulls
    thresh = 2,         # Tolerance threshold for null values
    subset = subset     # Limit function to subset of columns
).show(5)

+------------------+------------------+--------------------+-------------------+
|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+------------------+------------------+--------------------+-------------------+
| 12.65565114916675| 39.57766801952616|  4.0826206329529615|  587.9510539684005|
|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264|
|11.330278057777512|37.110597442120856|   4.104543202376424| 487.54750486747207|
|13.717513665142507| 36.72128267790313|   3.120178782748092|  581.8523440352177|
|12.795188551078114| 37.53665330059473|   4.446308318351434|  599.4060920457634|
+------------------+------------------+--------------------+-------------------+
only showing top 5 rows



In [12]:
# Filling values with a certain value, not inplace operation
df.na.fill(
    value = 'Missing',  # Replace value for null
    subset = subset,    # Subset of columns to apply
).show(5)

+------------------+------------------+--------------------+-------------------+
|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+------------------+------------------+--------------------+-------------------+
| 12.65565114916675| 39.57766801952616|  4.0826206329529615|  587.9510539684005|
|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264|
|11.330278057777512|37.110597442120856|   4.104543202376424| 487.54750486747207|
|13.717513665142507| 36.72128267790313|   3.120178782748092|  581.8523440352177|
|12.795188551078114| 37.53665330059473|   4.446308318351434|  599.4060920457634|
+------------------+------------------+--------------------+-------------------+
only showing top 5 rows



In [13]:
# Filling with statistics
from pyspark.ml.feature import Imputer

# Equivalent to column transformer in sklearn
imputer = Imputer(
    inputCols = subset,                             # Will apply to subset of columns
    outputCols = [f'{c} imputed' for c in subset]   # Creates a modified feature in format we defined
).setStrategy('mean')                               # Could be mean, mode or median

imputer.fit(df).transform(df).show(5)

+------------------+------------------+--------------------+-------------------+-------------------+-----------------------+
|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|Time on App imputed|Time on Website imputed|
+------------------+------------------+--------------------+-------------------+-------------------+-----------------------+
| 12.65565114916675| 39.57766801952616|  4.0826206329529615|  587.9510539684005|  12.65565114916675|      39.57766801952616|
|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264| 11.109460728682564|     37.268958868297744|
|11.330278057777512|37.110597442120856|   4.104543202376424| 487.54750486747207| 11.330278057777512|     37.110597442120856|
|13.717513665142507| 36.72128267790313|   3.120178782748092|  581.8523440352177| 13.717513665142507|      36.72128267790313|
|12.795188551078114| 37.53665330059473|   4.446308318351434|  599.4060920457634| 12.795188551078114|      37.53665330059473|


### Filtering operations

In [14]:
# Get mean from a column, say 'Time on App'
# Agg function returns a dataframe and collect returns a list of rows
mean_toa = df.agg({'Time on App': 'mean'}).collect()[0][0]

# Selecting all data from users that spent less than the mean amount of 'Time on App'
df.filter(df['Time on App'] < mean_toa).show(5)

+------------------+------------------+--------------------+-------------------+
|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+------------------+------------------+--------------------+-------------------+
|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264|
|11.330278057777512|37.110597442120856|   4.104543202376424| 487.54750486747207|
|12.026925339755056| 34.47687762925054|   5.493507201364199|   637.102447915074|
|11.366348309710526| 36.68377615286961|   4.685017246570912|  521.5721747578274|
|11.814128294972196| 37.14516822352819|   3.202806071553459|  427.1993848953282|
+------------------+------------------+--------------------+-------------------+
only showing top 5 rows



In [15]:
# Get mean time on website
mean_tow = df.agg({'Time on Website': 'mean'}).collect()[0][0]

# Selecting users that spent less time on App than the mean
# but spent more time than average on website
df.filter(
    (df['Time on App'] < mean_toa) &
    (df['Time on Website'] > mean_tow)
).show(5)

+------------------+------------------+--------------------+-------------------+
|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+------------------+------------------+--------------------+-------------------+
|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264|
|11.330278057777512|37.110597442120856|   4.104543202376424| 487.54750486747207|
|11.814128294972196| 37.14516822352819|   3.202806071553459|  427.1993848953282|
|11.584782999535266| 37.08792607098381|    3.71320920294043|  522.3374046069357|
|10.961298400154098| 37.42021557502538|   4.046423164299585|  408.6403510726275|
+------------------+------------------+--------------------+-------------------+
only showing top 5 rows



### Group by and aggregation

In [16]:
# First we are going to need a of category
from pyspark.ml.feature import Bucketizer

# Trying to simulate pandas cut
bucketizer = Bucketizer(
    splits = list(range(8,17,2)),   # We knew the min and max values beforehand
    inputCol = 'Time on App',           
    outputCol = 'ToA bucket',    
)

dfb = bucketizer.transform(df)  # Create new column by applying bucketizer
dfb = dfb.select('ToA bucket',*df.columns)  # Put new column first
dfb.show(5)

+----------+------------------+------------------+--------------------+-------------------+
|ToA bucket|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+----------+------------------+------------------+--------------------+-------------------+
|       2.0| 12.65565114916675| 39.57766801952616|  4.0826206329529615|  587.9510539684005|
|       1.0|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264|
|       1.0|11.330278057777512|37.110597442120856|   4.104543202376424| 487.54750486747207|
|       2.0|13.717513665142507| 36.72128267790313|   3.120178782748092|  581.8523440352177|
|       2.0|12.795188551078114| 37.53665330059473|   4.446308318351434|  599.4060920457634|
+----------+------------------+------------------+--------------------+-------------------+
only showing top 5 rows



In [17]:
# Seeing average time spent on both app and website by ToA bucket on selected columns
dfb.select(['ToA bucket', 'Time on App', 'Time on Website']).groupBy('ToA bucket').mean().show(5)

+----------+---------------+------------------+--------------------+
|ToA bucket|avg(ToA bucket)|  avg(Time on App)|avg(Time on Website)|
+----------+---------------+------------------+--------------------+
|       0.0|            0.0| 9.514189524389298|   36.59707105206839|
|       1.0|            1.0|11.333410770234082|   37.04831104588746|
|       3.0|            3.0|14.346744842142908|   37.42402072272212|
|       2.0|            2.0|12.787793427997197|   37.07685703208166|
+----------+---------------+------------------+--------------------+



### Linear Regression ML example

In [18]:
# In order to define features, we need to combine them into a single column
from pyspark.ml.feature import VectorAssembler

feature_lst = ['Time on App', 'Time on Website', 'Length of Membership']

featureassembler = VectorAssembler(
    inputCols = feature_lst,
    outputCol = 'Independent Features',
)

# Create a collaped feature column and our target and select only what's going
# to be used during linear regression
output = featureassembler.transform(df).select('Yearly Amount Spent', 'Independent Features')
output.show(5)

+-------------------+--------------------+
|Yearly Amount Spent|Independent Features|
+-------------------+--------------------+
|  587.9510539684005|[12.6556511491667...|
|  392.2049334443264|[11.1094607286825...|
| 487.54750486747207|[11.3302780577775...|
|  581.8523440352177|[13.7175136651425...|
|  599.4060920457634|[12.7951885510781...|
+-------------------+--------------------+
only showing top 5 rows



In [19]:
from pyspark.ml.regression import LinearRegression

# Train test split
train_data, test_data = output.randomSplit([0.9, 0.1])

# Baseline model
regressor = LinearRegression(
    featuresCol='Independent Features', 
    labelCol='Yearly Amount Spent',
).fit(train_data)

# Coefficients and intercept of a linear regression
regressor.coefficients, regressor.intercept

(DenseVector([37.5442, -0.5206, 62.4927]), -155.00108861158685)

In [20]:
# Prediction
predictions = regressor.evaluate(test_data)
predictions.predictions.show(5)

+-------------------+--------------------+------------------+
|Yearly Amount Spent|Independent Features|        prediction|
+-------------------+--------------------+------------------+
|  282.4712457199145|[11.5629362466526...| 352.9799980365739|
| 350.05820016384513|[11.9192423979391...|362.24898020364196|
|  357.7831107453153|[10.0794634519524...| 367.2198624429551|
|  375.3984554102432|[10.8048905576416...|399.90551285156516|
| 378.47356644790113|[10.5345534994610...|359.63764624811915|
+-------------------+--------------------+------------------+
only showing top 5 rows



In [21]:
# Some evaluation metrics
predictions.meanAbsoluteError, predictions.meanSquaredError

(21.654129499873985, 698.6670224736819)