<a href="https://colab.research.google.com/github/NEPatriots-Coder/Statistics/blob/main/TransforswithSpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Data Transformation with PySpark


In [27]:
!pip install pyspark
!pip install py4j




In [33]:
import pandas as pd

In [28]:
from pyspark.sql import SparkSession

In [31]:
# Start a Spark session for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName('lin_reg')
    .getOrCreate()
)

In [35]:
# Load the customer CSV: the first row supplies column names and column types are inferred.
df = spark.read.csv(
    '/content/Ecommerce_Customers.csv',
    header=True,
    inferSchema=True,
)

In [36]:
# Print the column names and the types Spark inferred while reading the CSV.
df.printSchema()

root
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Avatar: string (nullable = true)
 |-- Avg Session Length: double (nullable = true)
 |-- Time on App: double (nullable = true)
 |-- Time on Website: double (nullable = true)
 |-- Length of Membership: double (nullable = true)
 |-- Yearly Amount Spent: double (nullable = true)



In [37]:
# Peek at the first row to sanity-check the parsed values.
# NOTE(review): the displayed row contains an email and street address — confirm the
# dataset is synthetic before sharing this notebook with outputs.
df.head()

Row(Email='mstephenson@fernandez.com', Address='835 Frank TunnelWrightmouth, MI 82180-9605', Avatar='Violet', Avg Session Length=34.49726772511229, Time on App=12.65565114916675, Time on Website=39.57766801952616, Length of Membership=4.0826206329529615, Yearly Amount Spent=587.9510539684005)

# Set Up the DataFrame for ML


In [38]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [39]:
# List the available column names to pick the feature columns from.
df.columns

['Email',
 'Address',
 'Avatar',
 'Avg Session Length',
 'Time on App',
 'Time on Website',
 'Length of Membership',
 'Yearly Amount Spent']

In [59]:
# Feature columns fed to the model. The same list labels the fitted coefficients
# later, so it is defined once and reused to keep the two in sync.
coef_var = ['Avg Session Length','Time on App','Time on Website','Length of Membership']

# Pack the feature columns into a single vector column named 'features'.
assembler = VectorAssembler(inputCols=coef_var, outputCol='features')


In [45]:
# Run the assembler over df; the result carries the assembled 'features' column.
output = assembler.transform(df)

In [51]:
# Keep only what the model needs: the feature vector and the target column.
final_df = output.select('features', 'Yearly Amount Spent')

In [50]:
# 70/30 train/test split. A fixed seed makes the split (and every metric derived
# from it) repeatable across runs.
# NOTE: outputs saved in this notebook came from an earlier unseeded split, so the
# numbers will change the first time this cell is re-run.
train_data, test_data = final_df.randomSplit([0.7, 0.3], seed=42)

In [52]:
# Summary statistics for the training split.
train_data.describe().show()

+-------+-------------------+
|summary|Yearly Amount Spent|
+-------+-------------------+
|  count|                348|
|   mean|  499.9956242741462|
| stddev|  72.58330240760725|
|    min| 256.67058229005585|
|    max|  725.5848140556806|
+-------+-------------------+



In [54]:
# Summary statistics for the test split, for comparison with the training split.
test_data.describe().show()

+-------+-------------------+
|summary|Yearly Amount Spent|
+-------+-------------------+
|  count|                152|
|   mean| 497.75356501245074|
| stddev|  93.15948493450814|
|    min|   266.086340948469|
|    max|  765.5184619388373|
+-------+-------------------+



In [55]:
from pyspark.ml.regression import LinearRegression

In [56]:
# Linear regression estimator: predicts 'Yearly Amount Spent' from the 'features' vector.
lm = LinearRegression(
    featuresCol='features',
    labelCol='Yearly Amount Spent',
)

In [57]:
# Fit the linear regression on the training split.
model = lm.fit(train_data)

In [60]:
# Tabulate the fitted coefficients, one row per feature name in coef_var.
pd.DataFrame(
    data={"Coefficients": model.coefficients},
    index=coef_var,
)

Unnamed: 0,Coefficients
Avg Session Length,25.39762
Time on App,38.750702
Time on Website,0.406056
Length of Membership,61.736347


In [58]:
import pandas as pd

In [None]:
import numpy as np
from google.colab import autoviz

def categorical_histogram(df, colname, figscale=1, mpl_palette_name='Dark2'):
  """Draw a horizontal bar chart of row counts per distinct value of `colname`.

  Args:
    df: frame to group; must support `groupby(colname).size().plot(...)`.
    colname: column whose values define the bars.
    figscale: multiplier applied to the base 8 x 4.8 figure size.
    mpl_palette_name: matplotlib palette name passed to seaborn for bar colors.

  Returns:
    The chart object built by `autoviz.MplChart.from_current_mpl_state()`.
  """
  from matplotlib import pyplot as plt
  import seaborn as sns
  group_sizes = df.groupby(colname).size()
  bar_colors = sns.palettes.mpl_palette(mpl_palette_name)
  fig_size = (8*figscale, 4.8*figscale)
  group_sizes.plot(kind='barh', color=bar_colors, figsize=fig_size)
  # Hide the top and right borders of the current axes.
  axes = plt.gca()
  axes.spines[['top', 'right']].set_visible(False)
  return autoviz.MplChart.from_current_mpl_state()

# NOTE(review): `_df_2` is not assigned in any visible cell of this notebook, so this
# call presumably relies on a variable injected by the Colab session — confirm, or
# replace it with an explicitly defined DataFrame before relying on this cell.
chart = categorical_histogram(_df_2, 'index')
chart

In [61]:
# Evaluate the fitted model on the held-out test split; `res` holds the evaluation summary.
res = model.evaluate(test_data)

In [62]:
# Show the first rows of the per-row residuals from the test-split evaluation.
res.residuals.show()

+-------------------+
|          residuals|
+-------------------+
| -5.573566914926516|
|-17.230761823031685|
|-13.543763017575827|
| -4.321303495411939|
| 21.369253583159434|
|  2.530148491173236|
|-4.3438010463294745|
| -9.260594672854438|
| 3.4668401542167544|
|-26.854009615868904|
| 1.8884568519554819|
| -3.805001752329872|
|-2.2688019404041597|
|  5.208856048856774|
|  17.46220631699134|
| 16.325382427593183|
|0.48467238060197815|
| -1.997453299274639|
|  5.377265589546653|
|   8.00171089202371|
+-------------------+
only showing top 20 rows



In [65]:
# Drop the label column to mimic unlabeled data the model would score.
unlabeled_data = test_data.select('features')

In [66]:
# Score the feature vectors with the fitted model.
predictions = model.transform(unlabeled_data)

In [67]:
# Display the first rows of the scored data.
predictions.show()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[30.4925366965402...|288.04481263484104|
|[30.8162006488763...| 283.3171027715007|
|[31.0662181616375...| 462.4770562252502|
|[31.2681042107507...|427.79183666923586|
|[31.2834474760581...|  570.411835842508|
|[31.3091926408918...| 430.1905693487604|
|[31.5171218025062...| 280.2622216967152|
|[31.5261978982398...|418.35512086519225|
|[31.5316044825729...| 433.0487655751458|
|[31.6739155032749...| 502.5790775257501|
|[31.7366356860502...|495.04498940357644|
|[31.8124825597242...| 396.6153467361271|
|[31.8627411090001...| 558.5669431144509|
|[31.9480174211613...|456.71202084404104|
|[32.0180740106320...|340.32090442832396|
|[32.0478146331398...| 481.0641753312502|
|[32.0705462209254...| 532.2671152012344|
|[32.0775900443291...| 403.0305885183807|
|[32.0789475795693...| 352.4864530488451|
|[32.1151190660142...| 342.0564892718214|
+--------------------+------------------+

In [68]:
# Report the test-split error metrics and goodness of fit from the evaluation summary.
metric_rows = (
    ("MAE:", res.meanAbsoluteError),
    ("MSE:", res.meanSquaredError),
    ("RMSE:", res.rootMeanSquaredError),
    ("R2:", res.r2),
    ("Adj R2:", res.r2adj),
)
for metric_label, metric_value in metric_rows:
    print(metric_label, metric_value)

MAE: 8.216674325442813
MSE: 115.26282687993898
RMSE: 10.736052667528181
R2: 0.9866309129877646
Adj R2: 0.9862671283071596
