# Example of Data Processing Using PySpark

[Basic Knowledge for Spark](https://github.com/YiranJing/Software-Engineering-for-Data-Science/wiki/Spark)


### Content:
1. Begin SparkSession
2. Data Cleaning
3. Data Exploration
4. Feature Engineering
5. Build Spark ML model
6. Model Evaluation
7. Stop SparkSession

[local file system run spark](https://www.quora.com/In-Apache-Spark-when-using-local-file-system-is-it-necessary-to-have-the-data-file-on-all-the-slaves-or-is-it-fine-if-we-can-have-it-in-master-node)

In [85]:
import findspark
from pyspark import SparkContext, SparkConf

In [86]:
# If you are unsure where SPARK_HOME is located,
# call findspark.find() to detect where Spark is installed.
findspark.init()

### Begin SparkSession

In [87]:
# Import SparkSession
from pyspark.sql import SparkSession

In [88]:
# Build the SparkSession
# getOrCreate(): return the current SparkSession, or create one if none is running
spark = SparkSession.builder \
   .master("local") \
   .appName("Linear Regression Model") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()
   
sc = spark.sparkContext

#### Load data
Use the `textFile()` method to read the data from the folder you downloaded it to into an RDD. The method takes a URI for the file (here, a local path on your machine) and reads it as a collection of lines.

In [89]:
# Load in the data (an RDD for now)
rdd = sc.textFile('cal_housing.data')
# Load in the header
header = sc.textFile('cal_housing.domain')

- Spark evaluates lazily: nothing has been executed yet, and your data hasn't actually been read in.
At this point the `rdd` and `header` variables only describe what to read; they hold no data.
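
Lazy evaluation can be illustrated with a plain-Python analogy. This is a sketch, not Spark code: Python's built-in `map()` stands in for an RDD transformation and `list()` stands in for an action. The `log` list and the `split_line` helper are hypothetical names introduced only for this illustration.

```python
# Plain-Python analogy for lazy evaluation (not Spark code).
log = []  # records which lines were actually processed


def split_line(line):
    log.append(line)  # side effect so we can see when work happens
    return line.split(",")


lines = ["1,2,3", "4,5,6", "7,8,9"]

# "Transformation": map() builds a lazy iterator and runs nothing yet.
pipeline = map(split_line, lines)
work_before_action = len(log)  # still 0: no line has been split

# "Action": consuming the iterator finally triggers the work.
result = list(pipeline)
work_after_action = len(log)  # 3: every line was processed

print(work_before_action, work_after_action)  # prints: 0 3
```

In the same way, `sc.textFile(...)` only records what should be read; the file is touched once an action such as `collect()` or `take()` runs.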

### Data Cleaning

In [90]:
# Force Spark to do the work (Spark evaluates lazily)
# use the collect() method to look at the header
header.collect()

['longitude: continuous.',
 'latitude: continuous.',
 'housingMedianAge: continuous.',
 'totalRooms: continuous.',
 'totalBedrooms: continuous. ',
 'population: continuous.',
 'households: continuous.',
 'medianIncome: continuous.',
 'medianHouseValue: continuous.']

Be careful when using `collect()`! It returns every element to the driver, so on a large dataset it can cause the driver to run out of memory.

That is why the `take()` method is the safer choice if you just want to print a few elements of the RDD. In general, it's a good principle to limit your result set whenever possible.
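
The difference in cost can be sketched in plain Python. This is an analogy under stated assumptions, not Spark's actual behavior (Spark's `take()` works on partitions rather than single lines): `read_lines` is a hypothetical stand-in for a large file, and `itertools.islice` plays the role of `take()`.

```python
from itertools import islice


def read_lines(n_lines, counter):
    """Hypothetical stand-in for a large file: yields lines one at a time."""
    for i in range(n_lines):
        counter["read"] += 1  # count how many lines were produced
        yield f"row-{i}"


# take(2)-style access: stop after two elements.
take_counter = {"read": 0}
first_two = list(islice(read_lines(1_000_000, take_counter), 2))

# collect()-style access: materialize everything in memory.
collect_counter = {"read": 0}
everything = list(read_lines(1_000, collect_counter))

print(first_two, take_counter["read"])            # ['row-0', 'row-1'] 2
print(len(everything), collect_counter["read"])   # 1000 1000
```

Limiting the result keeps both the work done and the memory held on the driver proportional to what you actually look at.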

In [91]:
# load data as RDD.
rdd.take(2) # take the first 2 elements

['-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 '-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000']

In [92]:
# Split lines on commas
rdd = rdd.map(lambda line: line.split(",")) # pass a lambda function to split the line at the comma
# Inspect the first line
rdd.take(1) 

[['-122.230000',
  '37.880000',
  '41.000000',
  '880.000000',
  '129.000000',
  '322.000000',
  '126.000000',
  '8.325200',
  '452600.000000']]

In [93]:
# Inspect the first line 
# rdd.first()

# Take the top elements (largest first); top() needs the number of elements
# rdd.top(2)

#### Convert to DataFrame with header
An RDD has no header.

To make your life easier, you will move on from the RDD and convert it to a DataFrame. DataFrames are preferred over RDDs.

[Difference between RDD and DataFrame API](https://github.com/YiranJing/Software-Engineering-for-Data-Science/wiki/Spark)

In a DataFrame:
- Each entry is linked to a row and a column.
- Columns have data types.
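
To make "named, typed columns" concrete, here is a minimal plain-Python sketch that pairs the header lines with the first data line, both copied from the outputs above. It is only an illustration of the idea, not how Spark builds a DataFrame; `column_names` and `record` are names introduced for this example. Note that the domain file spells the column `totalBedrooms`, while the notebook's `Row` uses `totalBedRooms`.

```python
# Values copied from the notebook output above.
header_lines = [
    "longitude: continuous.",
    "latitude: continuous.",
    "housingMedianAge: continuous.",
    "totalRooms: continuous.",
    "totalBedrooms: continuous. ",
    "population: continuous.",
    "households: continuous.",
    "medianIncome: continuous.",
    "medianHouseValue: continuous.",
]
first_line = (
    "-122.230000,37.880000,41.000000,880.000000,129.000000,"
    "322.000000,126.000000,8.325200,452600.000000"
)

# Keep only the text before the colon as the column name.
column_names = [line.split(":")[0].strip() for line in header_lines]

# Pair each name with its value and cast the strings to floats.
record = {
    name: float(value)
    for name, value in zip(column_names, first_line.split(","))
}

print(column_names[:3])            # ['longitude', 'latitude', 'housingMedianAge']
print(record["medianHouseValue"])  # 452600.0
```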

In [94]:
"""
The first step is to make a SchemaRDD 
or an RDD of Row objects with a schema.
"""

# Import the necessary modules 
from pyspark.sql import Row
# Map the RDD to a DF
df = rdd.map(lambda line: Row(longitude=line[0], 
                              latitude=line[1], 
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5], 
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()

In [95]:
df.head()

Row(households='126.000000', housingMedianAge='41.000000', latitude='37.880000', longitude='-122.230000', medianHouseValue='452600.000000', medianIncome='8.325200', population='322.000000', totalBedRooms='129.000000', totalRooms='880.000000')

In [96]:
# show() prints the top 20 rows by default; here we ask for 3
df.show(3)

+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| households|housingMedianAge| latitude|  longitude|medianHouseValue|medianIncome| population|totalBedRooms| totalRooms|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| 126.000000|       41.000000|37.880000|-122.230000|   452600.000000|    8.325200| 322.000000|   129.000000| 880.000000|
|1138.000000|       21.000000|37.860000|-122.220000|   358500.000000|    8.301400|2401.000000|  1106.000000|7099.000000|
| 177.000000|       52.000000|37.850000|-122.240000|   352100.000000|    7.257400| 496.000000|   190.000000|1467.000000|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
only showing top 3 rows



In [97]:
# Print the data types of all `df` columns
# df.dtypes

# return the columns of your DataFrame
# df.columns

In [98]:
# Print the schema of `df`
df.printSchema()

root
 |-- households: string (nullable = true)
 |-- housingMedianAge: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- medianHouseValue: string (nullable = true)
 |-- medianIncome: string (nullable = true)
 |-- population: string (nullable = true)
 |-- totalBedRooms: string (nullable = true)
 |-- totalRooms: string (nullable = true)



#### Modify DataTypes

In [99]:
"""
declare that each column of the DataFrame df 
should be cast to a FloatType()
"""
from pyspark.sql.types import FloatType

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 

# Assign all column names to `columns`
columns = ['households', 'housingMedianAge', 'latitude', 'longitude', 'medianHouseValue', 
           'medianIncome', 'population', 'totalBedRooms', 'totalRooms']

# Convert the `df` columns to `FloatType()`
df = convertColumn(df, columns, FloatType())

In [100]:
df.printSchema()

root
 |-- households: float (nullable = true)
 |-- housingMedianAge: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- medianHouseValue: float (nullable = true)
 |-- medianIncome: float (nullable = true)
 |-- population: float (nullable = true)
 |-- totalBedRooms: float (nullable = true)
 |-- totalRooms: float (nullable = true)



### Data exploration

In [101]:
df.select('population','totalBedRooms').show(5)

+----------+-------------+
|population|totalBedRooms|
+----------+-------------+
|     322.0|        129.0|
|    2401.0|       1106.0|
|     496.0|        190.0|
|     558.0|        235.0|
|     565.0|        280.0|
+----------+-------------+
only showing top 5 rows



In [102]:
df.groupBy("housingMedianAge").count().sort("housingMedianAge",
                                            ascending=False).show(5)

+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|            52.0| 1273|
|            51.0|   48|
|            50.0|  136|
|            49.0|  134|
|            48.0|  177|
+----------------+-----+
only showing top 5 rows



In [103]:
df.describe().show(3)

+-------+-----------------+------------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|       households|  housingMedianAge|         latitude|          longitude|  medianHouseValue|      medianIncome|        population|    totalBedRooms|        totalRooms|
+-------+-----------------+------------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|  count|            20640|             20640|            20640|              20640|             20640|             20640|             20640|            20640|             20640|
|   mean|499.5396802325581|28.639486434108527|35.63186143109965|-119.56970444871473|206855.81690891474|3.8706710030346416|1425.4767441860465|537.8980135658915|2635.7630813953488|
| stddev|382.3297528316098| 12.58555761211163|2.135952380602968|  2.003531742932898|115395.61587441359|1.

### Feature Engineering

In [104]:
# Import `col` from `sql.functions` (a wildcard import would shadow Python built-ins such as `sum`)
from pyspark.sql.functions import col

# Adjust the values of `medianHouseValue`
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)

# Divide `totalRooms` by `households`
roomsPerHousehold = df.select(col("totalRooms")/col("households"))

# Divide `population` by `households`
populationPerHousehold = df.select(col("population")/col("households"))

# Divide `totalBedRooms` by `totalRooms`
bedroomsPerRoom = df.select(col("totalBedRooms")/col("totalRooms"))

# Add the new columns to `df`
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
   .withColumn("populationPerHousehold", col("population")/col("households")) \
   .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))
   
# Inspect the result
df.first()

Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)
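
The long decimals in this row (for example `latitude=37.880001068115234`) appear because `FloatType()` is a 32-bit float, while Python prints the value as a 64-bit float. The sketch below reproduces the effect with the standard `struct` module; `to_float32` is a hypothetical helper written for this illustration. Casting to `DoubleType()` instead would be one way to avoid the rounding, if the extra precision matters.

```python
import struct


def to_float32(value):
    """Round a 64-bit Python float to the nearest 32-bit float and back."""
    return struct.unpack("f", struct.pack("f", value))[0]


latitude = to_float32(37.88)
median_income = to_float32(8.3252)

print(latitude)       # 37.880001068115234, as in the row above
print(median_income)  # 8.325200080871582, as in the row above
```

The derived columns such as `roomsPerHousehold` do not show this artifact in the same way because they are computed as ratios after the cast.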

In [105]:
# Re-order and select columns
df = df.select("medianHouseValue", 
              "totalBedRooms", 
              "population", 
              "households", 
              "medianIncome", 
              "roomsPerHousehold", 
              "populationPerHousehold", 
              "bedroomsPerRoom")


##### Standardization

`DenseVector` is used to store arrays of values for use in PySpark.

In [106]:
# Import `DenseVector`
from pyspark.ml.linalg import DenseVector
# Import `StandardScaler` 
from pyspark.ml.feature import StandardScaler

# Define the `input_data` 
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Replace `df` with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])
df.show(2)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|4.526|[129.0,322.0,126....|
|3.585|[1106.0,2401.0,11...|
+-----+--------------------+
only showing top 2 rows



In [107]:
# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the scaler to the DataFrame
scaler = standardScaler.fit(df)

# Transform the data in `df` with the scaler
scaled_df = scaler.transform(df)

# Inspect the result
scaled_df.show(2)

+-----+--------------------+--------------------+
|label|            features|     features_scaled|
+-----+--------------------+--------------------+
|4.526|[129.0,322.0,126....|[0.30623297630686...|
|3.585|[1106.0,2401.0,11...|[2.62553233949916...|
+-----+--------------------+--------------------+
only showing top 2 rows
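
What the scaler computes can be sketched in plain Python. This assumes the `StandardScaler` defaults (`withStd=True`, `withMean=False`), which is consistent with the output above: the scaled values stay positive, so the columns are divided by their standard deviation but not centered. `scale_columns` and the toy `rows` are hypothetical and not taken from the housing data.

```python
from statistics import stdev


def scale_columns(rows):
    """Divide each column by its sample standard deviation (no centering)."""
    columns = list(zip(*rows))              # transpose rows -> columns
    stds = [stdev(col) for col in columns]  # sample std (n - 1 denominator)
    return [[value / s for value, s in zip(row, stds)] for row in rows]


# Hypothetical toy feature rows, not taken from the housing data.
rows = [[1.0, 20.0], [2.0, 40.0], [3.0, 60.0]]
scaled = scale_columns(rows)

print(scaled)  # [[1.0, 1.0], [2.0, 2.0], [3.0, 3.0]]
```

After scaling, both toy columns have unit standard deviation even though their original ranges differ by a factor of 20.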



### Building Spark ML model

In [53]:
# Split the data into train and test sets
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1234)
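
A rough plain-Python sketch of a seeded random split follows. It assumes each row is assigned independently with probability 0.8 of landing in the training set, which is why the resulting sizes are only approximately 80/20. `bernoulli_split` is a hypothetical helper, not Spark's implementation, and Python's random generator will not reproduce Spark's split for the same seed.

```python
import random


def bernoulli_split(rows, train_fraction, seed):
    """Assign each row to train or test independently, using a fixed seed."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test


rows = list(range(1000))
train, test = bernoulli_split(rows, 0.8, seed=1234)
train_again, _ = bernoulli_split(rows, 0.8, seed=1234)

print(len(train), len(test))  # roughly 800 and 200, not exactly
print(train == train_again)   # True: the same seed gives the same split
```

Fixing the seed makes the split reproducible, which matters when comparing models across runs.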

In [54]:
# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression

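# Initialize `lr`
# Note: `featuresCol` defaults to "features", so this model trains on the unscaled column, not "features_scaled"
lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model to the training data
linearModel = lr.fit(train_data)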

# Generate predictions
predicted = linearModel.transform(test_data)

# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

# Zip `predictions` and `labels`, then collect the pairs into a list
predictionAndLabel = predictions.zip(labels).collect()

# Print out first 5 instances of `predictionAndLabel` 
predictionAndLabel[:5]

[(1.1340115638008952, 0.14999),
 (1.4485018834650096, 0.14999),
 (1.5713396046425587, 0.14999),
 (1.7496542762527307, 0.283),
 (1.2438468929500472, 0.366)]

### Model Evaluation

In [55]:
# Coefficients for the model
linearModel.coefficients

DenseVector([0.0, 0.0, 0.0, 0.2796, 0.0, 0.0, 0.0])
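
Most coefficients are exactly zero because of the L1 part of the elastic-net penalty. As a sketch, and assuming the penalty has the usual elastic-net form `regParam * (elasticNetParam * L1 + (1 - elasticNetParam) / 2 * L2^2)`, the term can be evaluated for the coefficients shown above. `elastic_net_penalty` is a hypothetical helper, and the coefficients are the rounded values as displayed.

```python
def elastic_net_penalty(coefficients, reg_param, elastic_net_param):
    """Elastic-net penalty: a weighted mix of the L1 norm and squared L2 norm."""
    l1 = sum(abs(w) for w in coefficients)
    l2_squared = sum(w * w for w in coefficients)
    return reg_param * (
        elastic_net_param * l1 + (1 - elastic_net_param) / 2 * l2_squared
    )


# Rounded coefficients as displayed above.
coefficients = [0.0, 0.0, 0.0, 0.2796, 0.0, 0.0, 0.0]
penalty = elastic_net_penalty(coefficients, reg_param=0.3, elastic_net_param=0.8)

# Indices of the features the model actually uses.
nonzero = [i for i, w in enumerate(coefficients) if w != 0.0]
print(nonzero)  # [3]
```

Given the column order selected earlier, index 3 corresponds to `medianIncome`, so with these settings the model keeps a single feature.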

In [56]:
# Intercept for the model
linearModel.intercept

0.9841344205626824
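
With the coefficients and intercept in hand, a prediction is the intercept plus the dot product of coefficients and features. The sketch below uses the rounded coefficients as displayed, so results will differ slightly from `linearModel.transform`; the `features` vector is hypothetical.

```python
# Rounded coefficients and the intercept as displayed above.
coefficients = [0.0, 0.0, 0.0, 0.2796, 0.0, 0.0, 0.0]
intercept = 0.9841344205626824


def predict(features):
    """Linear model: intercept plus the dot product of weights and features."""
    return intercept + sum(w * x for w, x in zip(coefficients, features))


# Hypothetical feature vector; only index 3 (medianIncome) affects the result.
features = [129.0, 322.0, 126.0, 2.0, 7.0, 2.5, 0.15]
prediction = predict(features)

print(prediction)
```

Because every other weight is zero, changing any feature except `medianIncome` leaves the prediction unchanged.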

In [57]:
# Get the RMSE (`summary` reports metrics on the training data)
linearModel.summary.rootMeanSquaredError

0.8765335684459216

In [58]:
# Get the R2 (also computed on the training data)
linearModel.summary.r2

0.42282227755911483
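
For reference, both metrics can be computed by hand from (prediction, label) pairs. This is a minimal sketch using the standard definitions; `rmse` and `r2` are helper names introduced here. The five pairs are the ones printed earlier from the test set, so the numbers will not match the training-set values above.

```python
from math import sqrt


def rmse(predictions, labels):
    """Root mean squared error between predictions and labels."""
    n = len(labels)
    return sqrt(sum((p - y) ** 2 for p, y in zip(predictions, labels)) / n)


def r2(predictions, labels):
    """Coefficient of determination: 1 - residual SS / total SS."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for p, y in zip(predictions, labels))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1 - ss_res / ss_tot


# The five (prediction, label) pairs printed earlier in the notebook.
pairs = [
    (1.1340115638008952, 0.14999),
    (1.4485018834650096, 0.14999),
    (1.5713396046425587, 0.14999),
    (1.7496542762527307, 0.283),
    (1.2438468929500472, 0.366),
]
predictions, labels = zip(*pairs)

print(rmse(predictions, labels))
print(r2(predictions, labels))
```

To evaluate on held-out data in Spark itself, the same quantities would be computed from the `predicted` DataFrame rather than from `linearModel.summary`.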

### Stop the SparkSession

In [59]:
spark.stop()