In [None]:
pip install pyspark

tar xvf spark-2.2.0-bin-hadoop2.7.tgz

mv spark-2.1.0-bin-hadoop2.7 /usr/local/spark

cd /usr/local/spark

nano README.md

cat README.md

build/mvn -DskipTests clean package run

./bin/pyspark

export PYSPARK_DRIVER_PYTHON="/usr/local/ipython/bin/ipython"

>>> rdd1 = spark.sparkContext.parallelize([('a',7),('a',2),('b',2)])
>>> rdd2 = spark.sparkContext.parallelize([("a",["x","y","z"]), ("b",["p", "r"])])
>>> rdd3 = spark.sparkContext.parallelize(range(100))

>>> rdd1.reduce(lambda a,b: a+b)# Set a fixed value for the hash seed secret
export PYTHONHASHSEED=0

# Set an alternate Python executable
export PYSPARK_PYTHON=/usr/local/ipython/bin/ipython

# Augment the default search path for shared libraries
export LD_LIBRARY_PATH=/usr/local/ipython/bin/ipython

# Augment the default search path for private libraries 
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-*-src.zip:$PYTHONPATH:$SPARK_HOME/python/
>>> rdd2.flatMapValues(lambda x: x).collect()
[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]

import findspark

findspark.init("/usr/local/spark")

from pyspark.sql import SparkSession

spark = SparkSession.builder \
   .master("local") \
   .appName("Linear Regression Model") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()
   
sc = spark.sparkContext

export SPARK_HOME="/usr/local/spark/"

# Set a fixed value for the hash seed secret
export PYTHONHASHSEED=0

# Set an alternate Python executable
export PYSPARK_PYTHON=/usr/local/ipython/bin/ipython

# Augment the default search path for shared libraries
export LD_LIBRARY_PATH=/usr/local/ipython/bin/ipython

# Augment the default search path for private libraries 
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-*-src.zip:$PYTHONPATH:$SPARK_HOME/python/

# Load in the data
rdd = sc.textFile('/Users/yourName/Downloads/CaliforniaHousing/cal_housing.data')

# Load in the header
header = sc.textFile('/Users/yourName/Downloads/CaliforniaHousing/cal_housing.domain')

header.collect()
#[u'longitude: continuous.', u'latitude: continuous.', u'housingMedianAge: continuous. ', u'totalRooms: continuous. ', u'totalBedrooms: continuous. ', u'population: continuous. ', u'households: continuous. ', u'medianIncome: continuous. ', u'medianHouseValue: continuous. ']

rdd.take(2)
#[u'-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000', u'-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000']

# Split lines on commas
rdd = rdd.map(lambda line: line.split(","))

# Inspect the first 2 lines 
rdd.take(2)

#[[u'-122.230000', u'37.880000', u'41.000000', u'880.000000', u'129.000000', u'322.000000', u'126.000000', u'8.325200', u'452600.000000'], [u'-122.220000', u'37.860000', u'21.000000', u'7099.000000', u'1106.000000', u'2401.000000', u'1138.000000', u'8.301400', u'358500.000000']]

# Inspect the first line 
rdd.first()

# Take top elements
rdd.top(2)

#[u'-122.230000', u'37.880000', u'41.000000', u'880.000000', u'129.000000', u'322.000000', u'126.000000', u'8.325200', u'452600.000000']

# Import the necessary modules 
from pyspark.sql import Row

# Map the RDD to a DF
df = rdd.map(lambda line: Row(longitude=line[0], 
                              latitude=line[1], 
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5], 
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()

df.show()

# Print the data types of all `df` columns
# df.dtypes

# Print the schema of `df`
df.printSchema()

from pyspark.sql.types import *

df = df.withColumn("longitude", df["longitude"].cast(FloatType())) \
   .withColumn("latitude", df["latitude"].cast(FloatType())) \
   .withColumn("housingMedianAge",df["housingMedianAge"].cast(FloatType())) \
   .withColumn("totalRooms", df["totalRooms"].cast(FloatType())) \ 
   .withColumn("totalBedRooms", df["totalBedRooms"].cast(FloatType())) \ 
   .withColumn("population", df["population"].cast(FloatType())) \ 
   .withColumn("households", df["households"].cast(FloatType())) \ 
   .withColumn("medianIncome", df["medianIncome"].cast(FloatType())) \ 
   .withColumn("medianHouseValue", df["medianHouseValue"].cast(FloatType()))

# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
  for name in names: 
     df = df.withColumn(name, df[name].cast(newType))
  return df 

# Assign all column names to `columns`
columns = ['households', 'housingMedianAge', 'latitude', 'longitude', 'medianHouseValue', 'medianIncome', 'population', 'totalBedRooms', 'totalRooms']

# Conver the `df` columns to `FloatType()`
df = convertColumn(df, columns, FloatType())

df.select('population','totalBedRooms').show(10)

df.groupBy("housingMedianAge").count().sort("housingMedianAge",ascending=False).show()

df.describe().show()

# Import all from `sql.functions` 
from pyspark.sql.functions import *

# Adjust the values of `medianHouseValue`
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)

# Show the first 2 lines of `df`
df.take(2)
#[Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0), Row(households=1138.0, housingMedianAge=21.0, latitude=37.86000061035156, longitude=-122.22000122070312, medianHouseValue=3.585, medianIncome=8.301400184631348, population=2401.0, totalBedRooms=1106.0, totalRooms=7099.0)]

# Import all from `sql.functions` if you haven't yet
from pyspark.sql.functions import *

# Divide `totalRooms` by `households`
roomsPerHousehold = df.select(col("totalRooms")/col("households"))

# Divide `population` by `households`
populationPerHousehold = df.select(col("population")/col("households"))

# Divide `totalBedRooms` by `totalRooms`
bedroomsPerRoom = df.select(col("totalBedRooms")/col("totalRooms"))

# Add the new columns to `df`
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
   .withColumn("populationPerHousehold", col("population")/col("households")) \
   .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))
   
# Inspect the result
df.first()
#Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)

# Re-order and select columns
df = df.select("medianHouseValue", 
              "totalBedRooms", 
              "population", 
              "households", 
              "medianIncome", 
              "roomsPerHousehold", 
              "populationPerHousehold", 
              "bedroomsPerRoom")

# Import `DenseVector`
from pyspark.ml.linalg import DenseVector

# Define the `input_data` 
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Replace `df` with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])

# Import `StandardScaler` 
from pyspark.ml.feature import StandardScaler

# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the DataFrame to the scaler
scaler = standardScaler.fit(df)

# Transform the data in `df` with the scaler
scaled_df = scaler.transform(df)

# Inspect the result
scaled_df.take(2)

#[Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.3062, 0.2843, 0.3296, 4.3821, 2.8228, 0.2461, 2.5264])), Row(label=3.585, features=DenseVector([1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([2.6255, 2.1202, 2.9765, 4.3696, 2.5213, 0.2031, 2.6851]))]

# Split the data into train and test sets
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1234)

# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression

# Initialize `lr`
lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the data to the model
linearModel = lr.fit(train_data)

# Generate predictions
predicted = linearModel.transform(test_data)

# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()

# Print out first 5 instances of `predictionAndLabel` 
predictionAndLabel[:5]

#[(1.4491508524918457, 0.14999), (1.5705029404692372, 0.14999), (2.148727956912464, 0.14999), (1.5831547768979277, 0.344), (1.5182107797955968, 0.398)]

# Coefficients for the model
linearModel.coefficients

# Intercept for the model
linearModel.intercept

# The coefficients
#[0.0,0.0,0.0,0.276239709215,0.0,0.0,0.0]

# The intercept
#0.990399577462

# Get the RMSE
linearModel.summary.rootMeanSquaredError

# Get the R2
linearModel.summary.r2

# RMSE
#0.8692118678997669

# R2
#0.4240895287218379

spark.stop()