# PySpark and Machine Learning 

In [None]:
debug_level = "WARN"

In [2]:
# ===============================================
# Details of Data
# ===============================================
file = open('data/cal_housing.domain','r')
lines = file.readlines()
attributes = []
for line in lines:
	attribute = line.split(":")[0]
	attributes.append(attribute)
print("Data Attributes", attributes)

Data Attributes ['longitude', 'latitude', 'housingMedianAge', 'totalRooms', 'totalBedrooms', 'population', 'households', 'medianIncome', 'medianHouseValue']


In [3]:
# ===============================================
# Load Data
# ===============================================
from pyspark import SparkContext
sc = SparkContext("local", "Spark Regression")
sc.setLogLevel(logLevel=debug_level)
file_file= "data/cal_housing.data"
# Reading the file and creating RDD
data_rdd = sc.textFile(file_file).cache()
# Processing data from txt to list of entries
data_rdd = data_rdd.map(lambda line: line.split(","))

In [4]:
data_rdd.take(2)

[['-122.230000',
  '37.880000',
  '41.000000',
  '880.000000',
  '129.000000',
  '322.000000',
  '126.000000',
  '8.325200',
  '452600.000000'],
 ['-122.220000',
  '37.860000',
  '21.000000',
  '7099.000000',
  '1106.000000',
  '2401.000000',
  '1138.000000',
  '8.301400',
  '358500.000000']]

# Restructure the data to make RDD to a DataFrame

In [5]:
# ===============================================
# restructure the data to make RDD to a DataFrame
# ===============================================
# following import will help to convert the spark rows to dataframe
from pyspark.sql import SparkSession
spark = SparkSession(sc)
# this import helps us to convert a list object to a row object in spark
from pyspark.sql import Row

In [6]:
df = data_rdd.map(lambda line: Row(longitude=line[0],
                              latitude=line[1],
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5],
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()


## Data details

In [7]:
df.show()

+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| households|housingMedianAge| latitude|  longitude|medianHouseValue|medianIncome| population|totalBedRooms| totalRooms|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| 126.000000|       41.000000|37.880000|-122.230000|   452600.000000|    8.325200| 322.000000|   129.000000| 880.000000|
|1138.000000|       21.000000|37.860000|-122.220000|   358500.000000|    8.301400|2401.000000|  1106.000000|7099.000000|
| 177.000000|       52.000000|37.850000|-122.240000|   352100.000000|    7.257400| 496.000000|   190.000000|1467.000000|
| 219.000000|       52.000000|37.850000|-122.250000|   341300.000000|    5.643100| 558.000000|   235.000000|1274.000000|
| 259.000000|       52.000000|37.850000|-122.250000|   342200.000000|    3.846200| 565.000000|   280.000000|1627.000000|
| 193.000000|       52.000000|37

In [8]:
df.columns

['households',
 'housingMedianAge',
 'latitude',
 'longitude',
 'medianHouseValue',
 'medianIncome',
 'population',
 'totalBedRooms',
 'totalRooms']

In [9]:
df.dtypes

[('households', 'string'),
 ('housingMedianAge', 'string'),
 ('latitude', 'string'),
 ('longitude', 'string'),
 ('medianHouseValue', 'string'),
 ('medianIncome', 'string'),
 ('population', 'string'),
 ('totalBedRooms', 'string'),
 ('totalRooms', 'string')]

In [10]:
df.printSchema()

root
 |-- households: string (nullable = true)
 |-- housingMedianAge: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- medianHouseValue: string (nullable = true)
 |-- medianIncome: string (nullable = true)
 |-- population: string (nullable = true)
 |-- totalBedRooms: string (nullable = true)
 |-- totalRooms: string (nullable = true)



## Convert data from string type to float type

In [11]:
# Our datas are in string format, so for regression model we need to convert that data to float or interger type
from pyspark.sql.types import *

In [12]:
df = df.withColumn("longitude", df["longitude"].cast(FloatType()))\
	.withColumn("latitude", df["latitude"].cast(FloatType()))\
	.withColumn("housingMedianAge",df["housingMedianAge"].cast(FloatType()))\
	.withColumn("totalRooms", df["totalRooms"].cast(FloatType()))\
	.withColumn("totalBedRooms", df["totalBedRooms"].cast(FloatType()))\
	.withColumn("population", df["population"].cast(FloatType()))\
	.withColumn("households", df["households"].cast(FloatType()))\
	.withColumn("medianIncome", df["medianIncome"].cast(FloatType()))\
	.withColumn("medianHouseValue", df["medianHouseValue"].cast(FloatType()))

In [13]:
df.printSchema()
# Now all the column are converted to data frame

root
 |-- households: float (nullable = true)
 |-- housingMedianAge: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- medianHouseValue: float (nullable = true)
 |-- medianIncome: float (nullable = true)
 |-- population: float (nullable = true)
 |-- totalBedRooms: float (nullable = true)
 |-- totalRooms: float (nullable = true)



In [14]:
# to write the above conversion in more clearner way
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df

df = convertColumn(df, attributes, FloatType())

In [15]:
df.show()

+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|households|housingMedianAge|latitude|longitude|medianHouseValue|medianIncome|population|totalBedrooms|totalRooms|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|     126.0|            41.0|   37.88|  -122.23|        452600.0|      8.3252|     322.0|        129.0|     880.0|
|    1138.0|            21.0|   37.86|  -122.22|        358500.0|      8.3014|    2401.0|       1106.0|    7099.0|
|     177.0|            52.0|   37.85|  -122.24|        352100.0|      7.2574|     496.0|        190.0|    1467.0|
|     219.0|            52.0|   37.85|  -122.25|        341300.0|      5.6431|     558.0|        235.0|    1274.0|
|     259.0|            52.0|   37.85|  -122.25|        342200.0|      3.8462|     565.0|        280.0|    1627.0|
|     193.0|            52.0|   37.85|  -122.25|        269700.0|      4.0368|  

In [16]:
df.printSchema()

root
 |-- households: float (nullable = true)
 |-- housingMedianAge: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- medianHouseValue: float (nullable = true)
 |-- medianIncome: float (nullable = true)
 |-- population: float (nullable = true)
 |-- totalBedrooms: float (nullable = true)
 |-- totalRooms: float (nullable = true)



## Query the dataframe

In [17]:
df.select('population','medianIncome').show(10)

+----------+------------+
|population|medianIncome|
+----------+------------+
|     322.0|      8.3252|
|    2401.0|      8.3014|
|     496.0|      7.2574|
|     558.0|      5.6431|
|     565.0|      3.8462|
|     413.0|      4.0368|
|    1094.0|      3.6591|
|    1157.0|        3.12|
|    1206.0|      2.0804|
|    1551.0|      3.6912|
+----------+------------+
only showing top 10 rows



In [18]:
df.groupBy("latitude").count().sort("latitude",ascending=False).show()

+--------+-----+
|latitude|count|
+--------+-----+
|   41.95|    2|
|   41.92|    1|
|   41.88|    1|
|   41.86|    3|
|   41.84|    1|
|   41.82|    1|
|   41.81|    2|
|    41.8|    3|
|   41.79|    1|
|   41.78|    3|
|   41.77|    1|
|   41.76|    2|
|   41.75|    2|
|   41.74|    3|
|   41.73|    3|
|   41.72|    1|
|    41.7|    1|
|   41.69|    1|
|   41.68|    1|
|   41.66|    1|
+--------+-----+
only showing top 20 rows



In [19]:
# this will give the five point summary of each column
df.describe().show()

+-------+-----------------+------------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|       households|  housingMedianAge|         latitude|          longitude|  medianHouseValue|      medianIncome|        population|    totalBedrooms|        totalRooms|
+-------+-----------------+------------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|  count|            20640|             20640|            20640|              20640|             20640|             20640|             20640|            20640|             20640|
|   mean|499.5396802325581|28.639486434108527|35.63186143109965|-119.56970444871473|206855.81690891474|3.8706710030346416|1425.4767441860465|537.8980135658915|2635.7630813953488|
| stddev|382.3297528316098| 12.58555761211163|2.135952380602968|  2.003531742932898|115395.61587441359|1.

# Following will show the mean and std deviation normalization in pandas dataframe. This is just an overview of how we can do our normalization

In [20]:

import pandas
pandas_df = df.toPandas()

### Pandas  Cheat Sheet 
https://github.com/pandas-dev/pandas/blob/master/doc/cheatsheet/Pandas_Cheat_Sheet.pdf

### Spark Cheat Sheet
https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_Cheat_Sheet_Python.pdf

In [21]:
pandas_df.var()

households          1.461761e+05
housingMedianAge    1.583981e+02
latitude            4.562310e+00
longitude           4.014168e+00
medianHouseValue    1.331605e+10
medianIncome        3.609316e+00
population          1.282468e+06
totalBedrooms       1.774493e+05
totalRooms          4.759400e+06
dtype: float32

In [22]:
pandas_df.std()

households             382.329773
housingMedianAge        12.585629
latitude                 2.135956
longitude                2.003539
medianHouseValue    115395.187500
medianIncome             1.899820
population            1132.460815
totalBedrooms          421.247284
totalRooms            2181.604736
dtype: float32

In [23]:
pandas_df.mean()

households             499.539673
housingMedianAge        28.639486
latitude                35.631866
longitude             -119.569115
medianHouseValue    206854.968750
medianIncome             3.870662
population            1425.477905
totalBedrooms          537.898010
totalRooms            2635.758789
dtype: float32

In [24]:
pandas_df.cov()

Unnamed: 0,households,housingMedianAge,latitude,longitude,medianHouseValue,medianIncome,population,totalBedrooms,totalRooms
households,146176.0,-1457.58129,-58.010243,42.368075,2904924.0,9.466666,392803.6,157806.9,766104.6
housingMedianAge,-1457.581,158.39626,0.300345,-2.728244,153398.8,-2.84614,-4222.271,-1699.094,-9919.12
latitude,-58.01024,0.300345,4.562293,-3.957054,-35532.56,-0.32386,-263.1378,-59.671,-168.2178
longitude,42.36807,-2.728244,-3.957054,4.014139,-10627.43,-0.057765,226.3778,57.70951,194.8038
medianHouseValue,2904924.0,153398.801329,-35532.557411,-10627.428451,13316150000.0,150847.482718,-3221249.0,2459372.0,33772890.0
medianIncome,9.466666,-2.84614,-0.32386,-0.057765,150847.5,3.609323,10.40098,-6.477002,820.8524
population,392803.6,-4222.270582,-263.137805,226.377847,-3221249.0,10.400978,1282470.0,418859.7,2117613.0
totalBedrooms,157806.9,-1699.094476,-59.671004,57.709513,2459372.0,-6.477002,418859.7,177449.8,854572.8
totalRooms,766104.6,-9919.12006,-168.217826,194.803774,33772890.0,820.852406,2117613.0,854572.8,4759445.0


In [25]:
# Normalizing panda dataframe
pandas_df_normalized = (pandas_df-pandas_df.mean()) / pandas_df.std()

In [26]:
pandas_df_normalized.mean()

households          8.118390e-09
housingMedianAge   -2.272080e-07
latitude           -2.804416e-06
longitude          -2.947520e-04
medianHouseValue    7.292773e-06
medianIncome        4.650837e-06
population         -9.788812e-07
totalBedrooms       6.224194e-08
totalRooms          1.985943e-06
dtype: float32

In [27]:
pandas_df_normalized.std()

households          0.999997
housingMedianAge    1.000017
latitude            0.999999
longitude           1.000005
medianHouseValue    1.000000
medianIncome        0.999999
population          0.999999
totalBedrooms       0.999998
totalRooms          1.000005
dtype: float32

In [28]:
pandas_df.cov()

Unnamed: 0,households,housingMedianAge,latitude,longitude,medianHouseValue,medianIncome,population,totalBedrooms,totalRooms
households,146176.0,-1457.58129,-58.010243,42.368075,2904924.0,9.466666,392803.6,157806.9,766104.6
housingMedianAge,-1457.581,158.39626,0.300345,-2.728244,153398.8,-2.84614,-4222.271,-1699.094,-9919.12
latitude,-58.01024,0.300345,4.562293,-3.957054,-35532.56,-0.32386,-263.1378,-59.671,-168.2178
longitude,42.36807,-2.728244,-3.957054,4.014139,-10627.43,-0.057765,226.3778,57.70951,194.8038
medianHouseValue,2904924.0,153398.801329,-35532.557411,-10627.428451,13316150000.0,150847.482718,-3221249.0,2459372.0,33772890.0
medianIncome,9.466666,-2.84614,-0.32386,-0.057765,150847.5,3.609323,10.40098,-6.477002,820.8524
population,392803.6,-4222.270582,-263.137805,226.377847,-3221249.0,10.400978,1282470.0,418859.7,2117613.0
totalBedrooms,157806.9,-1699.094476,-59.671004,57.709513,2459372.0,-6.477002,418859.7,177449.8,854572.8
totalRooms,766104.6,-9919.12006,-168.217826,194.803774,33772890.0,820.852406,2117613.0,854572.8,4759445.0


In [29]:
pandas_df.describe()

Unnamed: 0,households,housingMedianAge,latitude,longitude,medianHouseValue,medianIncome,population,totalBedrooms,totalRooms
count,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0
mean,499.539673,28.639486,35.631866,-119.569115,206854.96875,3.870662,1425.477905,537.89801,2635.758789
std,382.329773,12.585629,2.135956,2.003539,115395.1875,1.89982,1132.460815,421.247284,2181.604736
min,1.0,1.0,32.540001,-124.349998,14999.0,0.4999,3.0,1.0,2.0
25%,280.0,18.0,33.93,-121.800003,119600.0,2.5634,787.0,295.0,1447.75
50%,409.0,29.0,34.259998,-118.489998,179700.0,3.5348,1166.0,435.0,2127.0
75%,605.0,37.0,37.709999,-118.010002,264725.0,4.74325,1725.0,647.0,3148.0
max,6082.0,52.0,41.950001,-114.309998,500001.0,15.0001,35682.0,6445.0,39320.0


In [30]:
pandas_df_normalized.describe()

Unnamed: 0,households,housingMedianAge,latitude,longitude,medianHouseValue,medianIncome,population,totalBedrooms,totalRooms
count,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0
mean,8.11839e-09,-2.27208e-07,-3e-06,-0.000295,7e-06,5e-06,-9.788812e-07,6.224194e-08,2e-06
std,0.9999966,1.000017,0.999999,1.000005,1.0,0.999999,0.9999991,0.9999977,1.000005
min,-1.303952,-2.196115,-1.447532,-2.38622,-1.662599,-1.774254,-1.256095,-1.274544,-1.207258
25%,-0.5742155,-0.8453679,-0.79677,-1.113474,-0.75614,-0.688098,-0.5637969,-0.5766162,-0.544557
50%,-0.2368104,0.02864487,-0.642273,0.538605,-0.235322,-0.176786,-0.2291275,-0.2442698,-0.233204
75%,0.275836,0.6642905,0.972928,0.778179,0.501494,0.4593,0.2644878,0.2589975,0.2348
max,14.60117,1.856126,2.957989,2.624914,2.540366,5.858155,30.24963,14.02288,16.815256


# Data Preprocessing

## Normalization

#### As we can see her the variance and standard deviation is more so we have to normalize the data before applying any model to fit from above covariance matrix and five point summary table

In [31]:
from pyspark.sql.functions import *
df.show()

+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|households|housingMedianAge|latitude|longitude|medianHouseValue|medianIncome|population|totalBedrooms|totalRooms|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|     126.0|            41.0|   37.88|  -122.23|        452600.0|      8.3252|     322.0|        129.0|     880.0|
|    1138.0|            21.0|   37.86|  -122.22|        358500.0|      8.3014|    2401.0|       1106.0|    7099.0|
|     177.0|            52.0|   37.85|  -122.24|        352100.0|      7.2574|     496.0|        190.0|    1467.0|
|     219.0|            52.0|   37.85|  -122.25|        341300.0|      5.6431|     558.0|        235.0|    1274.0|
|     259.0|            52.0|   37.85|  -122.25|        342200.0|      3.8462|     565.0|        280.0|    1627.0|
|     193.0|            52.0|   37.85|  -122.25|        269700.0|      4.0368|  

In [32]:
df.agg(max("medianHouseValue")).show()

+---------------------+
|max(medianHouseValue)|
+---------------------+
|             500001.0|
+---------------------+



In [33]:
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)

In [34]:
df.show()

+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|households|housingMedianAge|latitude|longitude|medianHouseValue|medianIncome|population|totalBedrooms|totalRooms|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|     126.0|            41.0|   37.88|  -122.23|           4.526|      8.3252|     322.0|        129.0|     880.0|
|    1138.0|            21.0|   37.86|  -122.22|           3.585|      8.3014|    2401.0|       1106.0|    7099.0|
|     177.0|            52.0|   37.85|  -122.24|           3.521|      7.2574|     496.0|        190.0|    1467.0|
|     219.0|            52.0|   37.85|  -122.25|           3.413|      5.6431|     558.0|        235.0|    1274.0|
|     259.0|            52.0|   37.85|  -122.25|           3.422|      3.8462|     565.0|        280.0|    1627.0|
|     193.0|            52.0|   37.85|  -122.25|           2.697|      4.0368|  

## Following steps will create a dependency in the data by adding new data column as follows

In [35]:
roomsPerHousehold = df.select(col("totalRooms")/col("households"))

In [36]:
populationPerHousehold = df.select(col("population")/col("households"))

In [37]:
bedroomsPerRoom = df.select(col("totalBedRooms")/col("totalRooms"))

In [38]:
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
   .withColumn("populationPerHousehold", col("population")/col("households")) \
   .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))

In [39]:
df.show()

+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+------------------+----------------------+-------------------+
|households|housingMedianAge|latitude|longitude|medianHouseValue|medianIncome|population|totalBedrooms|totalRooms| roomsPerHousehold|populationPerHousehold|    bedroomsPerRoom|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+------------------+----------------------+-------------------+
|     126.0|            41.0|   37.88|  -122.23|           4.526|      8.3252|     322.0|        129.0|     880.0| 6.984126984126984|    2.5555555555555554|0.14659090909090908|
|    1138.0|            21.0|   37.86|  -122.22|           3.585|      8.3014|    2401.0|       1106.0|    7099.0| 6.238137082601054|     2.109841827768014|0.15579659106916466|
|     177.0|            52.0|   37.85|  -122.24|           3.521|      7.2574|     496.0|        190.0|    1467.0| 

In [40]:
truncated_df = df.select("medianHouseValue", 
                          "totalBedRooms", 
                          "population", 
                          "households", 
                          "medianIncome", 
                          "roomsPerHousehold", 
                          "populationPerHousehold", 
                          "bedroomsPerRoom")


In [41]:
truncated_df.show()

+----------------+-------------+----------+----------+------------+------------------+----------------------+-------------------+
|medianHouseValue|totalBedRooms|population|households|medianIncome| roomsPerHousehold|populationPerHousehold|    bedroomsPerRoom|
+----------------+-------------+----------+----------+------------+------------------+----------------------+-------------------+
|           4.526|        129.0|     322.0|     126.0|      8.3252| 6.984126984126984|    2.5555555555555554|0.14659090909090908|
|           3.585|       1106.0|    2401.0|    1138.0|      8.3014| 6.238137082601054|     2.109841827768014|0.15579659106916466|
|           3.521|        190.0|     496.0|     177.0|      7.2574| 8.288135593220339|    2.8022598870056497|0.12951601908657123|
|           3.413|        235.0|     558.0|     219.0|      5.6431|5.8173515981735155|     2.547945205479452|0.18445839874411302|
|           3.422|        280.0|     565.0|     259.0|      3.8462| 6.281853281853282|    

In [42]:
truncated_df.first()

Row(medianHouseValue=4.526, totalBedRooms=129.0, population=322.0, households=126.0, medianIncome=8.325200080871582, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)

# Spark dataframe Guide
https://spark.apache.org/docs/1.3.1/sql-programming-guide.html

https://dzone.com/articles/using-apache-spark-dataframes-for-processing-of-ta

# Standardization

In [43]:
# you have re-ordered the data, you’re ready to normalize the data
# next step is to separate the target value for the model training
# separating the features from the target variable
from pyspark.ml.linalg import DenseVector
input_data = truncated_df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

### DenseVector

1. This is just to create a feature vector for your feature to be trained
2. this can be done by numpy array as well

In the dataframe we have 8 columns. from 8 columns first column is you target data and the rest are the input training data. to achive this separate

In [44]:
input_data

PythonRDD[66] at RDD at PythonRDD.scala:48

In [45]:
input_data_df = spark.createDataFrame(input_data, ["label", "features"])

In [46]:
input_data_df.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|4.526|[129.0,322.0,126....|
|3.585|[1106.0,2401.0,11...|
|3.521|[190.0,496.0,177....|
|3.413|[235.0,558.0,219....|
|3.422|[280.0,565.0,259....|
|2.697|[213.0,413.0,193....|
|2.992|[489.0,1094.0,514...|
|2.414|[687.0,1157.0,647...|
|2.267|[665.0,1206.0,595...|
|2.611|[707.0,1551.0,714...|
|2.815|[434.0,910.0,402....|
|2.418|[752.0,1504.0,734...|
|2.135|[474.0,1098.0,468...|
|1.913|[191.0,345.0,174....|
|1.592|[626.0,1212.0,620...|
|  1.4|[283.0,697.0,264....|
|1.525|[347.0,793.0,331....|
|1.555|[293.0,648.0,303....|
|1.587|[455.0,990.0,419....|
|1.629|[298.0,690.0,275....|
+-----+--------------------+
only showing top 20 rows



### Data is read for training

```Our Input Data frame is input_data_df```

For ML Libraries in follow below link for more details. Some of them I Will explain in this tutorial 

MLlib standardizes APIs for machine learning algorithms

https://spark.apache.org/docs/latest/ml-pipeline.html

## ML Pipelines
1. This combine multiple algorithms into a single pipeline, or workflow.
2. Example, If you have stages in your approach like LoadData, PreProcess, Train, Test, FreezeModel
3. In the above stages create Pipeline like spark.Pipeline([LoadData, PreProcess, Train, Test, FreezeModel])
4. Pipeline will start with LoadData and ends with FreezeModel like we sequentially run our code or methods

I will show one example of pipeline but I will try to do it in this jyputer notebook i will rather follow the sequential approach

## ===============================
# Training
## ===============================

## Normalize >>>>>
1. Before we start training we have to normalize the data for the best fit for training.
2. As we have seen earlier in this tutorial how to standardize or normalize the dataset in the pandas dataframe
Ex:
In "pandas_df", we have normalized the data and stored in the "pandas_df_normalized". there we have normalized the data set by substracting mean and divided with standard deviation. 

Same thing will be done by using "StandardScaler" function in spark ML Features or your can use "Normalizer" as well
Follow below link for more details of API
https://spark.apache.org/docs/2.2.0/ml-features.html#standardscaler



In [54]:
# Using StandardScaler
from pyspark.ml.feature import StandardScaler

# StandardScaler rescale your dataset by normalizing each feature to have unit standard deviation and/or zero mean
# Parameters:
# [withStd: True by default. Scales the data to unit standard deviation.]
# [withMean: False by default. Centers the data with mean before scaling.]

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=True)
scalerModel = scaler.fit(input_data_df)
scaledData = scalerModel.transform(input_data_df)

In [55]:
scaledData.show()

+-----+--------------------+--------------------+
|label|            features|      scaledFeatures|
+-----+--------------------+--------------------+
|4.526|[129.0,322.0,126....|[-0.9706826023275...|
|3.585|[1106.0,2401.0,11...|[1.34861676086480...|
|3.521|[190.0,496.0,177....|[-0.8258747608180...|
|3.413|[235.0,558.0,219....|[-0.7190493039668...|
|3.422|[280.0,565.0,259....|[-0.6122238471155...|
|2.697|[213.0,413.0,193....|[-0.7712750828718...|
|2.992|[489.0,1094.0,514...|[-0.1160789475176...|
|2.414|[687.0,1157.0,647...|[0.35395306262777...|
|2.267|[665.0,1206.0,595...|[0.30172728372272...|
|2.611|[707.0,1551.0,714...|[0.40143104345054...|
|2.815|[434.0,910.0,402....|[-0.2466433947802...|
|2.418|[752.0,1504.0,734...|[0.50825650030177...|
|2.135|[474.0,1098.0,468...|[-0.1516874331347...|
|1.913|[191.0,345.0,174....|[-0.8235008617769...|
|1.592|[626.0,1212.0,620...|[0.20914522111832...|
|  1.4|[283.0,697.0,264....|[-0.6051021499921...|
|1.525|[347.0,793.0,331....|[-0.4531726113593...|


In [65]:
# Using Normalizer
from pyspark.ml.feature import Normalizer

# Normalizer rescale your dataset by normalizing each feature to have L1 norm
normalizer = Normalizer(inputCol="features", outputCol="scaledFeatures", p=1.0)
l1NormData = normalizer.transform(input_data_df)

In [66]:
l1NormData.show()

+-----+--------------------+--------------------+
|label|            features|      scaledFeatures|
+-----+--------------------+--------------------+
|4.526|[129.0,322.0,126....|[0.21680254203295...|
|3.585|[1106.0,2401.0,11...|[0.23724715176181...|
|3.521|[190.0,496.0,177....|[0.21554723815760...|
|3.413|[235.0,558.0,219....|[0.22900178933999...|
|3.422|[280.0,565.0,259....|[0.25078782839186...|
|2.697|[213.0,413.0,193....|[0.25657391517877...|
|2.992|[489.0,1094.0,514...|[0.23198308464096...|
|2.414|[687.0,1157.0,647...|[0.27469813009599...|
|2.267|[665.0,1206.0,595...|[0.26872360158912...|
|2.611|[707.0,1551.0,714...|[0.23700707851144...|
|2.815|[434.0,910.0,402....|[0.24699206250527...|
|2.418|[752.0,1504.0,734...|[0.25064104288364...|
|2.135|[474.0,1098.0,468...|[0.23111420433232...|
|1.913|[191.0,345.0,174....|[0.26567230710693...|
|1.592|[626.0,1212.0,620...|[0.25381417693358...|
|  1.4|[283.0,697.0,264....|[0.22581103886639...|
|1.525|[347.0,793.0,331....|[0.23409773921947...|


In [60]:
# Data split into Training and Testing
# Here I will use both the normalized dataset one by one to test our model
# scaledData : "StandardScaler" function normalized data  [train_data_s, test_data_s] notation
# l1NormData : "Normalizer" function normalized data     [train_data_n, test_data_n] notation
train_data_s, test_data_s = scaledData.randomSplit([.8,.2],seed=1234)
train_data_n, test_data_n = l1NormData.randomSplit([.8,.2],seed=1234)

## Machine Learning Model With Spark ML

For more Machine Learning Algorithm try following link
https://www.datacamp.com/community/tutorials/machine-https://www.datacamp.com/community/tutorials/machine-learning-pythonlearning-python

I will explain some of them

For API reference go to foloowing link
https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.regression.LinearRegression

For R Programming follow below link
http://spark.rstudio.com/reference/ml_linear_regression/

# Train Linear Regression

In [122]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import LinearRegressionModel
# weightCol: Weight column
# tol: Tolerance level
# standardization: standardization as we have done earlier
# solver
# regParam: Regularization 
# predictionCol: Prediction Column
# maxIter: Iteration
# featuresCol: Feature Column
# elasticNetParam: Elasticity Parameter
# loss: Loss Function [squaredError, huber]
# epsilon: Epsilon 

In [81]:
lr = LinearRegression(featuresCol = 'scaledFeatures', 
                      labelCol='label', 
                      maxIter=50, 
                      regParam=0.3, 
                      elasticNetParam=0.8, 
                      solver="normal",
                      loss="squaredError",
                      fitIntercept=True)

In [82]:
lr_model = lr.fit(train_data_s)

In [98]:
# The coefficients
lr_model.coefficients

DenseVector([0.0, 0.0, 0.0, 0.5316, 0.0, 0.0, 0.0])

In [97]:
# The intercept
lr_model.intercept

2.066458979451869

In [104]:
tr_summary_lr = lr_model.summary

In [113]:
#smaller an RMSE value, the closer predicted and observed values are
tr_summary_lr.rootMeanSquaredError

0.8764168648364763

In [106]:
tr_summary_lr.residuals.show()

+-------------------+
|          residuals|
+-------------------+
| -2.006720235739934|
|  -1.47061238023092|
| -1.506882001314473|
|-0.9798902298376403|
|-1.0638652835756917|
|-1.5177391573247523|
|-0.9731906945221331|
|-1.3613119489039995|
|-1.3287065695123998|
| -1.062498622952322|
|-1.4217803943232756|
|-1.0228358682735217|
|-1.7834686476957586|
|-1.1178242243818775|
|-1.1700952737642003|
| -1.176559204107277|
|-0.9753395279228012|
| -1.144235169084425|
|-0.9398417132373327|
|-1.2397993972102546|
+-------------------+
only showing top 20 rows



In [107]:
tr_summary_lr.objectiveHistory

[0.5000000000000002,
 0.47013465437019386,
 0.3939823955746674,
 0.3920419381794541,
 0.3895587976914163,
 0.3894238036526684,
 0.38938858697924855,
 0.38937939980255354,
 0.3893770030907852,
 0.3893763778467639,
 0.38937621473575046,
 0.3893761721840394,
 0.38937616115207646,
 0.3893761579056647,
 0.38937615723571983,
 0.3893761571720026,
 0.38937615716594276,
 0.38937615716536633]

In [108]:
tr_summary_lr.totalIterations

18

In [114]:
# coefficient of determination is a measure that shows how close the data are to the fitted regression line
tr_summary_lr.r2

0.4229759607534169

## Validate Model and Save Model - Linear Regression

In [110]:
predicted = lr_model.transform(test_data_s)

In [111]:
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])
predictionAndLabel = predictions.zip(labels).collect()

Exception ignored in: <object repr() failed>
Traceback (most recent call last):
  File "/Users/anilnayak/anaconda/lib/python3.6/site-packages/pyspark/ml/wrapper.py", line 105, in __del__
    SparkContext._active_spark_context._gateway.detach(self._java_obj)
AttributeError: 'LinearRegression' object has no attribute '_java_obj'


In [112]:
predictionAndLabel[:5] # Prediction and Label

[(1.1333393374396872, 0.14999),
 (1.4480569267207897, 0.14999),
 (1.5709834178225561, 0.14999),
 (1.7494269503374564, 0.283),
 (1.2432540402026728, 0.366)]

In [124]:
model_path = "/Users/anilnayak/BigData/spark_project_regression/lr_model"
lr_model.save(model_path)
model2 = LinearRegressionModel.load(model_path)
# Check the model that is saved is correct
model2.coefficients==lr_model.coefficients

True

# Next Tutorial will be on 
1. Decision Tree Regressor
2. Classification https://spark.apache.org/docs/latest/ml-classification-regression.html#classification
3. Clustering [K-means]  https://spark.apache.org/docs/latest/ml-clustering.html
4. Random Forest https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
5. Principal component analysis (PCA)
6. Singular value decomposition (SVD)
7. Frequent Pattern Mining

Try some of them before next tutorial

In [125]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.clustering import KMeans
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.fpm import FPGrowth

In [126]:
spark.stop()