In [2]:
import findspark

In [6]:
findspark.init(r'C:\Users\akshi\Anaconda3\Lib\site-packages\pyspark')

In [7]:
from pyspark.sql import SparkSession

In [8]:
# Build the SparkSession
spark = SparkSession.builder \
   .master("local") \
   .appName("Linear Regression Model") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()
   
sc = spark.sparkContext

In [121]:
# Load in the data
rdd = sc.textFile(r'C:\Work\pyspark\cadata.txt')


In [122]:
# Load in the header
header =  sc.textFile('C:\Work\pyspark\cadata.txt')

In [123]:
rdd.take(2)

['4.5260000000000000e+005 8.3252000000000006e+000 4.1000000000000000e+001 8.8000000000000000e+002 1.2900000000000000e+002 3.2200000000000000e+002 1.2600000000000000e+002 3.7880000000000003e+001 -1.2223000000000000e+002',
 '3.5850000000000000e+005 8.3013999999999992e+000 2.1000000000000000e+001 7.0990000000000000e+003 1.1060000000000000e+003 2.4010000000000000e+003 1.1380000000000000e+003 3.7859999999999999e+001 -1.2222000000000000e+002']

In [124]:
# Split lines on commas
rdd = rdd.map(lambda line: line.split(" "))

# Inspect the first 2 lines 
rdd.take(2)

[['4.5260000000000000e+005',
  '8.3252000000000006e+000',
  '4.1000000000000000e+001',
  '8.8000000000000000e+002',
  '1.2900000000000000e+002',
  '3.2200000000000000e+002',
  '1.2600000000000000e+002',
  '3.7880000000000003e+001',
  '-1.2223000000000000e+002'],
 ['3.5850000000000000e+005',
  '8.3013999999999992e+000',
  '2.1000000000000000e+001',
  '7.0990000000000000e+003',
  '1.1060000000000000e+003',
  '2.4010000000000000e+003',
  '1.1380000000000000e+003',
  '3.7859999999999999e+001',
  '-1.2222000000000000e+002']]

RDDs when you want to perform low-level transformations and actions on your unstructured data. This means that you don’t care about imposing a schema while processing or accessing the attributes by name or column. Tying in to what was said before about performance, by using RDDs, you don’t necessarily want the performance benefits that DataFrames can offer for (semi-) structured data. Use RDDs when you want to manipulate the data with functional programming constructs rather than domain specific expressions.
To recapitulate, you’ll switch to DataFrames now to use high-level expressions, to perform SQL queries to explore your data further and to gain columnar access.

In [125]:
#With this SchemaRDD in place, you can easily convert the RDD to a DataFrame with the toDF() method.

In [128]:
# Import the necessary modules 
from pyspark.sql import Row

# Map the RDD to a DF
df = rdd.map(lambda line: Row(longitude=line[0], 
                              latitude=line[1], 
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5], 
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()


In [129]:
df.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|          households|    housingMedianAge|            latitude|           longitude|    medianHouseValue|        medianIncome|          population|       totalBedRooms|          totalRooms|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|1.260000000000000...|4.100000000000000...|8.325200000000000...|4.526000000000000...|-1.22230000000000...|3.788000000000000...|3.220000000000000...|1.290000000000000...|8.800000000000000...|
|1.138000000000000...|2.100000000000000...|8.301399999999999...|3.585000000000000...|-1.22220000000000...|3.785999999999999...|2.401000000000000...|1.106000000000000...|7.099000000000000...|
|1.138000000000000...|2.100000000000000...|8.

In [130]:
df.describe()

DataFrame[summary: string, households: string, housingMedianAge: string, latitude: string, longitude: string, medianHouseValue: string, medianIncome: string, population: string, totalBedRooms: string, totalRooms: string]

In [131]:
df.columns

['households',
 'housingMedianAge',
 'latitude',
 'longitude',
 'medianHouseValue',
 'medianIncome',
 'population',
 'totalBedRooms',
 'totalRooms']

In [132]:
df.take(0)

[]

In [133]:
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
        for name in names: 
             df = df.withColumn(name, df[name].cast(newType))
        return df 

# Assign all column names to `columns`
columns = ['households', 'housingMedianAge', 'latitude', 'longitude', 'medianHouseValue', 'medianIncome', 'population', 'totalBedRooms', 'totalRooms']

# Conver the `df` columns to `FloatType()`
df = convertColumn(df, columns, FloatType())

In [134]:
df.describe()

DataFrame[summary: string, households: string, housingMedianAge: string, latitude: string, longitude: string, medianHouseValue: string, medianIncome: string, population: string, totalBedRooms: string, totalRooms: string]

In [135]:
df.select('population','totalBedRooms').show(10)

+----------+-------------+
|population|totalBedRooms|
+----------+-------------+
|     322.0|        129.0|
|    2401.0|       1106.0|
|    2401.0|       1106.0|
|     496.0|        190.0|
|     558.0|        235.0|
|     565.0|        280.0|
|     413.0|        213.0|
|    1094.0|        489.0|
|    1157.0|        687.0|
|    1206.0|        665.0|
+----------+-------------+
only showing top 10 rows



In [136]:
df.groupBy("housingMedianAge").count().sort("housingMedianAge",ascending=False).show()

+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|            52.0| 1273|
|            51.0|   48|
|            50.0|  136|
|            49.0|  134|
|            48.0|  177|
|            47.0|  198|
|            46.0|  245|
|            45.0|  294|
|            44.0|  356|
|            43.0|  353|
|            42.0|  368|
|            41.0|  296|
|            40.0|  304|
|            39.0|  369|
|            38.0|  394|
|            37.0|  537|
|            36.0|  862|
|            35.0|  824|
|            34.0|  689|
|            33.0|  615|
+----------------+-----+
only showing top 20 rows



In [137]:
df.describe().show()

+-------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+-----------------+-----------------+
|summary|        households|  housingMedianAge|          latitude|         longitude|   medianHouseValue|      medianIncome|        population|    totalBedRooms|       totalRooms|
+-------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+-----------------+-----------------+
|  count|             20641|             20641|             20641|             20641|              20641|             20641|             20641|            20641|            20641|
|   mean|499.57061188895887| 28.63911632188363|3.8708856597461185|206863.16365486168|-119.56983284834519| 35.63196937834926|1425.5240056198827|537.9255365534616| 2635.97931301778|
| stddev|382.34631731329733|12.585365057783626|1.9000259937810955|115397.64768878758| 2.003568131750

# Data Preprocessing


In [138]:
#express the house values in units of 100,000

In [139]:
from pyspark.sql.functions import *

# Adjust the values of `medianHouseValue`
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)

# Show the first 2 lines of `df`
df.take(2)

[Row(households=126.0, housingMedianAge=41.0, latitude=8.325200080871582, longitude=452600.0, medianHouseValue=-0.0012223000335693358, medianIncome=37.880001068115234, population=322.0, totalBedRooms=129.0, totalRooms=880.0),
 Row(households=1138.0, housingMedianAge=21.0, latitude=8.301400184631348, longitude=358500.0, medianHouseValue=-0.0012222000122070313, medianIncome=37.86000061035156, population=2401.0, totalBedRooms=1106.0, totalRooms=7099.0)]

Feature Engineering


Rooms per household which refers to the number of rooms in households per block group;
Population per household, which basically gives you an indication of how many people live in households per block group; And
Bedrooms per room which will give you an idea about how many rooms are bedrooms per block group;

In [140]:
# Divide `totalRooms` by `households`
roomsPerHousehold = df.select(col("totalRooms")/col("households"))

# Divide `population` by `households`
populationPerHousehold = df.select(col("population")/col("households"))

# Divide `totalBedRooms` by `totalRooms`
bedroomsPerRoom = df.select(col("totalBedRooms")/col("totalRooms"))

# Add the new columns to `df`
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
   .withColumn("populationPerHousehold", col("population")/col("households")) \
   .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))
   
# Inspect the result
df.first()

Row(households=126.0, housingMedianAge=41.0, latitude=8.325200080871582, longitude=452600.0, medianHouseValue=-0.0012223000335693358, medianIncome=37.880001068115234, population=322.0, totalBedRooms=129.0, totalRooms=880.0, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)

In [141]:
#let’s leave out variables such as longitude, latitude, housingMedianAge and totalRooms

In [142]:
# Re-order and select columns
df = df.select("medianHouseValue", 
              "totalBedRooms", 
              "population", 
              "households", 
              "medianIncome", 
              "roomsPerHousehold", 
              "populationPerHousehold", 
              "bedroomsPerRoom")

Standardization

In [143]:
# DenseVector() function. A dense vector is a local vector that is backed by a double array that represents its entry values.
#In other words, it's used to store arrays of values for use in PySpark.

In [144]:
#making a DataFrame out of the input_data and you re-label the columns by passing a list as a second argument. This list consists of the column names "label" and "features":

In [145]:
# Import `DenseVector`
from pyspark.ml.linalg import DenseVector

# Define the `input_data` 
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Replace `df` with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])

In [146]:
#inally scale the data. we  can use Spark ML to do this: this library will make machine learning on big data scalable and easy

In [147]:
#The input columns are the features, and the output column with the rescaled that will be included in the scaled_df will be named "features_scaled":

In [148]:
# Import `StandardScaler` 
from pyspark.ml.feature import StandardScaler

# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the DataFrame to the scaler
scaler = standardScaler.fit(df)

# Transform the data in `df` with the scaler
scaled_df = scaler.transform(df)

In [149]:
# Inspect the result
scaled_df.take(2)

[Row(label=-0.0012223000335693358, features=DenseVector([129.0, 322.0, 126.0, 37.88, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.3062, 0.2843, 0.3295, 17.7344, 2.8229, 0.2461, 2.5264])),
 Row(label=-0.0012222000122070313, features=DenseVector([1106.0, 2401.0, 1138.0, 37.86, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([2.6255, 2.1202, 2.9764, 17.7251, 2.5214, 0.2031, 2.6851]))]

#  Building A Machine Learning Model With Spark ML

In [150]:
# Split the data into train and test sets
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1234)

Note that the argument elasticNetParam corresponds to α or the vertical intercept and that the regParam or the regularization paramater corresponds to λ.

In [151]:
# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression

# Initialize `lr`
lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the data to the model
linearModel = lr.fit(train_data)

the transform() method to predict the labels for your test_data. Then, you can use RDD operations to extract the predictions as well as the true labels from the DataFrame and zip these two values together in a list called predictionAndLabel

In [152]:
# Generate predictions
predicted = linearModel.transform(test_data)

# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()

# Print out first 5 instances of `predictionAndLabel` 
predictionAndLabel[:5]

[(-0.0011957699738438035, -0.001243499984741211),
 (-0.0011957699738438035, -0.0012430000305175782),
 (-0.0011957699738438035, -0.0012430000305175782),
 (-0.0011957699738438035, -0.0012419000244140626),
 (-0.0011957699738438035, -0.0012416999816894532)]

In [155]:
# Coefficients for the model
linearModel.coefficients



DenseVector([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])

In [154]:
# Intercept for the model
linearModel.intercept

-0.0011957699738438035

In [157]:
# Get the RMSE
linearModel.summary.rootMeanSquaredError


2.0057493967173708e-05

In [158]:
# Get the R2
linearModel.summary.r2

9.237055564881302e-14