In [1]:
# install pyspark
!pip install pyspark



In [None]:
# Create a pyspark session
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [None]:
# Loading in data

df = spark.read.csv('/content/sample_data/california_housing_train.csv', header=True)
df.show()

+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population| households|median_income|median_house_value|
+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|-114.310000|34.190000|         15.000000|5612.000000|   1283.000000|1015.000000| 472.000000|     1.493600|      66900.000000|
|-114.470000|34.400000|         19.000000|7650.000000|   1901.000000|1129.000000| 463.000000|     1.820000|      80100.000000|
|-114.560000|33.690000|         17.000000| 720.000000|    174.000000| 333.000000| 117.000000|     1.650900|      85700.000000|
|-114.570000|33.640000|         14.000000|1501.000000|    337.000000| 515.000000| 226.000000|     3.191700|      73400.000000|
|-114.570000|33.570000|         20.000000|1454.000000|    326.000000| 624.000000| 262.000000|     1.925000|    

## SQL for Data Analysis

In [None]:
# create a temporary view that we can Query
df.createOrReplaceTempView("data")

In [None]:
# initial sql Query
spark.sql("SELECT distinct latitude from data LIMIT 10").show()

+---------+
| latitude|
+---------+
|37.070000|
|37.940000|
|37.690000|
|39.750000|
|40.250000|
|34.930000|
|38.770000|
|41.500000|
|38.270000|
|34.490000|
+---------+



In [None]:
query = """
SELECT
AVG(population) AS Average_population,
AVG(households) AS Average_households,
MAX(housing_median_age) AS Max_housing_median_age,
MIN(housing_median_age) AS Min_housing_median_age
FROM data
"""

spark.sql(query).show()

+------------------+------------------+----------------------+----------------------+
|Average_population|Average_households|Max_housing_median_age|Min_housing_median_age|
+------------------+------------------+----------------------+----------------------+
|1429.5739411764705| 501.2219411764706|              9.000000|              1.000000|
+------------------+------------------+----------------------+----------------------+



## Machine Learning

In [None]:
from pyspark.sql.types import IntegerType

df_int = df.select(
    df["total_rooms"].cast(IntegerType()),
    df["total_bedrooms"].cast(IntegerType()),
    df["population"].cast(IntegerType()),
    df["median_house_value"].cast(IntegerType()).alias("label")
)

df_int.show()

+-----------+--------------+----------+-----+
|total_rooms|total_bedrooms|population|label|
+-----------+--------------+----------+-----+
|       5612|          1283|      1015|66900|
|       7650|          1901|      1129|80100|
|        720|           174|       333|85700|
|       1501|           337|       515|73400|
|       1454|           326|       624|65500|
|       1387|           236|       671|74000|
|       2907|           680|      1841|82400|
|        812|           168|       375|48500|
|       4789|          1175|      3134|58400|
|       1497|           309|       787|48100|
|       3741|           801|      2434|86500|
|       1988|           483|      1182|62000|
|       1291|           248|       580|48600|
|       2478|           464|      1346|70400|
|       1448|           378|       949|45000|
|       2556|           587|      1005|69100|
|       1678|           322|       666|94900|
|         44|            33|        64|25000|
|       1388|           386|      

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler



# create a features vector from df
assembler = VectorAssembler(
    inputCols=["total_rooms","total_bedrooms", "population"],
    outputCol="features")

output = assembler.transform(df_int)
output.show()

+-----------+--------------+----------+-----+--------------------+
|total_rooms|total_bedrooms|population|label|            features|
+-----------+--------------+----------+-----+--------------------+
|       5612|          1283|      1015|66900|[5612.0,1283.0,10...|
|       7650|          1901|      1129|80100|[7650.0,1901.0,11...|
|        720|           174|       333|85700| [720.0,174.0,333.0]|
|       1501|           337|       515|73400|[1501.0,337.0,515.0]|
|       1454|           326|       624|65500|[1454.0,326.0,624.0]|
|       1387|           236|       671|74000|[1387.0,236.0,671.0]|
|       2907|           680|      1841|82400|[2907.0,680.0,184...|
|        812|           168|       375|48500| [812.0,168.0,375.0]|
|       4789|          1175|      3134|58400|[4789.0,1175.0,31...|
|       1497|           309|       787|48100|[1497.0,309.0,787.0]|
|       3741|           801|      2434|86500|[3741.0,801.0,243...|
|       1988|           483|      1182|62000|[1988.0,483.0,118

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator


# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = output.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeRegressor(featuresCol="features")

# Train model.  This also runs the indexer.
model = dt.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

+-----------------+------+---------------+
|       prediction| label|       features|
+-----------------+------+---------------+
|194023.1889081456| 67500|[12.0,4.0,18.0]|
|194023.1889081456|275000| [18.0,3.0,8.0]|
|194023.1889081456|350000| [18.0,6.0,3.0]|
|194023.1889081456| 80000|[20.0,4.0,74.0]|
|194023.1889081456| 67500|[22.0,7.0,55.0]|
+-----------------+------+---------------+
only showing top 5 rows



In [None]:
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="r2")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 0.173666
