In [1]:
# Load the housing records as a DataFrame, letting Spark infer column types.
# NOTE(review): `header=True` treats the first line of each file as column
# names. Confirm both files really start with a header row; if they do not,
# the first record is consumed as the header instead of being loaded as data.
spark_df = sqlContext.read.format("csv").load(
    "/FileStore/tables/cal_housing.data", header=True, inferSchema=True
)
spark_df.show(2)

# Load the companion `.domain` file. The path is absolute (leading slash),
# matching the `.data` load above, so it does not depend on the working
# directory the relative form would be resolved against.
housing_df = sqlContext.read.format("csv").load(
    "/FileStore/tables/cal_housing.domain", header=True, inferSchema=True
)
housing_df.show(2)

In [2]:
# Bring every row of `housing_df` back to the driver; the resulting list of
# Rows is the cell's displayed output.
housing_df.collect()

In [3]:
# Inspect the first 2 rows. `take(2)` already contains the first row, and a
# notebook cell only displays the value of its last expression, so a separate
# `spark_df.first()` statement ahead of it would run a job whose result is
# thrown away.
spark_df.take(2)

In [4]:
# Import the necessary modules 
from pyspark.sql import Row

# Give each positional field of a record its column name
def _to_named_row(record):
    return Row(
        longitude=record[0],
        latitude=record[1],
        housingMedianAge=record[2],
        totalRooms=record[3],
        totalBedRooms=record[4],
        population=record[5],
        households=record[6],
        medianIncome=record[7],
        medianHouseValue=record[8],
    )


# Rebuild the DataFrame from the renamed rows
df = spark_df.rdd.map(_to_named_row).toDF()

In [5]:
# Print the first 20 rows of the renamed DataFrame
df.show(n=20)

In [6]:
from pyspark.sql.types import *

# Cast every column to FloatType, one `withColumn` per column.
# `population` is included here: leaving it out of the chain would keep that
# single column at its inferred type while all the others become floats.
df = (
    df.withColumn("longitude", df["longitude"].cast(FloatType()))
    .withColumn("latitude", df["latitude"].cast(FloatType()))
    .withColumn("housingMedianAge", df["housingMedianAge"].cast(FloatType()))
    .withColumn("totalRooms", df["totalRooms"].cast(FloatType()))
    .withColumn("totalBedRooms", df["totalBedRooms"].cast(FloatType()))
    .withColumn("population", df["population"].cast(FloatType()))
    .withColumn("households", df["households"].cast(FloatType()))
    .withColumn("medianIncome", df["medianIncome"].cast(FloatType()))
    .withColumn("medianHouseValue", df["medianHouseValue"].cast(FloatType()))
)
     

In [7]:
# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    """Cast the listed columns of a DataFrame to a new data type.

    Args:
        df: DataFrame whose columns are converted.
        names: Iterable of column names to cast.
        newType: Data type handed to each column's ``cast``.

    Returns:
        The DataFrame obtained after replacing each named column with its
        cast version; ``df`` itself when ``names`` is empty.
    """
    converted = df
    for column_name in names:
        cast_column = converted[column_name].cast(newType)
        converted = converted.withColumn(column_name, cast_column)
    return converted

# Names of the columns to convert
columns = [
    'households',
    'housingMedianAge',
    'latitude',
    'longitude',
    'medianHouseValue',
    'medianIncome',
    'population',
    'totalBedRooms',
    'totalRooms',
]

# Convert the listed `df` columns to `FloatType()`
float_type = FloatType()
df = convertColumn(df, columns, float_type)


In [8]:
# Print the first 10 rows of the `population` and `totalBedRooms` columns
df.select("population", "totalBedRooms").show(n=10)


In [9]:
# Count the rows for each `housingMedianAge` value, highest age first
(
    df.groupBy("housingMedianAge")
    .count()
    .orderBy("housingMedianAge", ascending=False)
    .show()
)


In [10]:
# Compute basic summary statistics for the columns of `df` and print them
# as a table
df.describe().show()


In [11]:
# Import all from `sql.functions` 
from pyspark.sql.functions import *

# Rescale `medianHouseValue` by dividing it by 100000.
# NOTE: the column is overwritten in place, so running this cell again
# divides the already-rescaled values a second time.
rescaled_value = col("medianHouseValue") / 100000
df = df.withColumn("medianHouseValue", rescaled_value)

# Peek at the first 2 rows of `df`
df.take(2)

In [12]:
# Import all from `sql.functions` if you haven't yet
from pyspark.sql.functions import *

# Build each ratio expression once so the standalone selections and the new
# columns below share exactly the same formula
rooms_per_household_expr = col("totalRooms") / col("households")
population_per_household_expr = col("population") / col("households")
bedrooms_per_room_expr = col("totalBedRooms") / col("totalRooms")

# Single-column DataFrames, one per ratio
roomsPerHousehold = df.select(rooms_per_household_expr)
populationPerHousehold = df.select(population_per_household_expr)
bedroomsPerRoom = df.select(bedrooms_per_room_expr)

# Append the three ratios to `df` as named columns
df = (
    df.withColumn("roomsPerHousehold", rooms_per_household_expr)
    .withColumn("populationPerHousehold", population_per_household_expr)
    .withColumn("bedroomsPerRoom", bedrooms_per_room_expr)
)

# Look at the first row of the result
df.first()

In [13]:
# Columns to keep, in the order they should appear in `df`
selected_columns = [
    "medianHouseValue",
    "totalBedRooms",
    "population",
    "households",
    "medianIncome",
    "roomsPerHousehold",
    "populationPerHousehold",
    "bedroomsPerRoom",
]

# Drop everything else and apply that ordering
df = df.select(*selected_columns)

In [14]:
# Import `DenseVector`
from pyspark.ml.linalg import DenseVector

# Turn a row into a (label, features) pair: the first column is the label and
# all remaining columns are packed into a DenseVector
def _to_label_and_features(row):
    return (row[0], DenseVector(row[1:]))


# Define the `input_data`
input_data = df.rdd.map(_to_label_and_features)

# Replace `df` with a two-column DataFrame named `label` / `features`
df = spark.createDataFrame(input_data, schema=["label", "features"])

In [15]:
# Import `StandardScaler` 
from pyspark.ml.feature import StandardScaler

# Configure a scaler that reads `features` and writes `features_scaled`
standardScaler = StandardScaler(
    inputCol="features",
    outputCol="features_scaled",
)

# Fit the scaler on `df`.
# NOTE(review): fitting happens on the full DataFrame, before the train/test
# split is made — confirm that is acceptable for this analysis.
scaler = standardScaler.fit(df)

# Apply the fitted scaler, adding the `features_scaled` column
scaled_df = scaler.transform(df)

# Look at the first 2 rows of the result
scaled_df.take(2)

In [16]:
# Randomly split the scaled rows with weights 0.8 (train) and 0.2 (test);
# the fixed seed makes the split repeatable
train_data, test_data = scaled_df.randomSplit(weights=[0.8, 0.2], seed=1234)

In [17]:
# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression

# Initialize `lr`. `featuresCol` is set explicitly because LinearRegression
# reads the column named "features" by default, which would bypass the
# "features_scaled" column produced by the StandardScaler step and leave the
# scaling without any effect on the model.
lr = LinearRegression(
    featuresCol="features_scaled",
    labelCol="label",
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)

# Fit the model on the training split
linearModel = lr.fit(train_data)

In [18]:
# Generate predictions for the held-out rows
predicted = linearModel.transform(test_data)

# Per-column RDDs, kept under their original names for any cell that uses them
predictions = predicted.select("prediction").rdd.map(lambda row: row[0])
labels = predicted.select("label").rdd.map(lambda row: row[0])

# Pair every prediction with its label by reading both columns from the same
# rows. Zipping two separately derived RDDs only lines up when both have
# identical partitioning and per-partition counts; selecting the columns
# together removes that dependency.
predictionAndLabel = [
    (row["prediction"], row["label"])
    for row in predicted.select("prediction", "label").collect()
]

# Show the first 5 (prediction, label) pairs
predictionAndLabel[:5]

In [19]:
# Show the model's coefficients and intercept together. A notebook cell only
# displays the value of its last expression, so writing them as two separate
# statements would display the intercept alone.
{
    "coefficients": linearModel.coefficients,
    "intercept": linearModel.intercept,
}


In [20]:
# Show the RMSE and R2 together. A notebook cell only displays the value of
# its last expression, so writing them as two separate statements would
# display R2 alone.
# NOTE(review): `linearModel.summary` describes the fit on the data passed to
# `lr.fit`, not on `test_data` — confirm these are the metrics intended here.
model_summary = linearModel.summary
{
    "rootMeanSquaredError": model_summary.rootMeanSquaredError,
    "r2": model_summary.r2,
}