# Introduction to pyspark

Source: DataCamp

Spark is a platform for big data and cluster computing. Spark lets you distribute computations over clusters with multiple nodes (computing instances). Each node works on a different subset of the data and carries out part of the total calculation, so that both data processing and computation are performed in parallel across the nodes in the cluster.
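
The idea of splitting data across nodes and combining partial results can be sketched in plain Python. This is only an illustration of the principle, not how Spark is implemented: a thread pool stands in for the cluster nodes, and the helper names `split_into_partitions` and `partial_sum` are made up for this example.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Split a list into num_partitions roughly equal chunks."""
    chunk_size = -(-len(data) // num_partitions)  # ceiling division
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]


def partial_sum(partition):
    """Work done by a single "node": sum only its own subset of the data."""
    return sum(partition)


data = list(range(1, 101))
partitions = split_into_partitions(data, num_partitions=4)

# Each partition is processed independently, here by a pool of threads
# standing in for the nodes of a cluster.
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_results = list(pool.map(partial_sum, partitions))

# The partial results are combined into the final answer.
total = sum(partial_results)
print(partial_results, total)
```

A sum parallelizes easily because the partial sums can be computed independently and added at the end, which is the kind of calculation the second question below asks about.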

However, parallelization also introduces greater complexity. To decide whether `pyspark` is right for you, consider the following questions:

- Is my data too big to work with on a single machine?
- Can my calculations be easily parallelized?

In [1]:
spark # print SparkSession to list version

# pyspark: Using Spark from Python

## Step 1: Connect to a Cluster

The first step in using Spark is connecting to a cluster.

In practice, the cluster will be hosted on a remote machine that's connected to all other nodes. One computer, called the **master**, manages splitting up the data and the computations. The master is connected to the rest of the computers in the cluster, which are called **workers**. The master sends the workers data and calculations to run, and they send their results back to the master.

Here we run the cluster locally: instead of connecting to another computer, all of the computation happens on your own machine.

### SparkContext sc

Creating the connection is as simple as creating an instance of the SparkContext class. The class constructor takes a few optional arguments that allow you to specify the attributes of the cluster you're connecting to.

An object holding all these attributes can be created with the SparkConf() constructor. Take a look at the documentation for all the details!

When you start pyspark you will have a SparkContext called sc already available in your workspace.

In [None]:
sc

In [None]:
print(sc)

In [None]:
# print version of Spark running
print(sc.version)

# Using DataFrames
Spark's core data structure is the Resilient Distributed Dataset (RDD). This is a low-level object that lets Spark work its magic by splitting data across multiple nodes in the cluster. However, RDDs are hard to work with directly, so in this course you'll be using the Spark DataFrame abstraction built on top of RDDs.

The Spark DataFrame was designed to behave a lot like a SQL table (a table with variables in the columns and observations in the rows). Not only are they easier to understand, DataFrames are also more optimized for complicated operations than RDDs.

When you start modifying and combining columns and rows of data, there are many ways to arrive at the same result, but some often take much longer than others. When using RDDs, it's up to the data scientist to figure out the right way to optimize the query, but the DataFrame implementation has much of this optimization built in!

To start working with Spark DataFrames, you first have to create a SparkSession object from your SparkContext. You can think of the **SparkContext as your connection to the cluster** and the **SparkSession as your interface with that connection**.

Remember, for the rest of this course you'll have a SparkSession called spark available in your workspace!

# Creating a SparkSession
We've already created a SparkSession for you called spark, but what if you're not sure there already is one? Creating multiple SparkSessions and SparkContexts can cause issues, so it's best practice to use the SparkSession.builder.getOrCreate() method. This returns an existing SparkSession if there's already one in the environment, or creates a new one if necessary!

*Spark session automatically created as* `spark`

In [None]:
spark

In [None]:
# can also look it up by running
from pyspark.sql import SparkSession

spark2 = SparkSession.builder.getOrCreate()

In [None]:
# It's the same Session
spark == spark2

- Import SparkSession from pyspark.sql.
- Make a new SparkSession called my_spark using SparkSession.builder.getOrCreate().
- Print my_spark to the console to verify it's a SparkSession

In [None]:
from pyspark.sql import SparkSession

my_spark = SparkSession.builder.getOrCreate()

print(my_spark)

# Viewing tables
Once you've created a SparkSession, you can start poking around to see what data is in your cluster!

Your SparkSession has an **attribute called catalog** which lists all the data inside the cluster. This attribute has a few methods for extracting different pieces of information.

One of the most useful is the .listTables() method, which returns a list of all the tables in your cluster (each entry includes the table's name).

- See what tables are in your cluster by calling spark.catalog.listTables() and printing the result!

In [None]:
my_spark.catalog.listTables()

In [None]:
# Simple tests with RDD
res = sc.parallelize(range(1000000))
res = res.map(lambda x: x + 273.15)

In [None]:
res.take(2)
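
The RDD test above relies on lazy evaluation: a transformation such as `map` only records what should be done, and work happens when an action such as `take` asks for results. Python's built-in `map` behaves in a loosely similar way, so the pattern can be sketched without a cluster. This is an analogy only; the function name `to_kelvin` and the reading of `+ 273.15` as a Celsius-to-Kelvin conversion are assumptions.

```python
from itertools import islice

call_log = []  # records every value the function is actually applied to


def to_kelvin(celsius):
    """Convert Celsius to Kelvin and record that we were called."""
    call_log.append(celsius)
    return celsius + 273.15


# Like an RDD transformation, the built-in map() does no work yet.
lazy_result = map(to_kelvin, range(1000000))
calls_before = len(call_log)

# Like take(2), islice pulls only the first two elements through the pipeline.
first_two = list(islice(lazy_result, 2))
print(calls_before, len(call_log), first_two)
```

Only two of the one million inputs are ever converted here, which is why peeking at a few elements of a large lazy pipeline is cheap.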

# Regularized Linear Model in pyspark

Below you can find an end-to-end example of how you can find a regularized Linear Model with pyspark.

Source: https://www.datacamp.com/community/tutorials/apache-spark-tutorial-machine-learning

In [None]:
spark # to see Spark version

In [None]:
# list the files in the working directory (`dir` is the Windows command; use `!ls` on Linux/macOS)
!dir

In [None]:
# Read in California Housing data
rdd = sc.textFile('cadata.txt')
# Peek at the first 40 lines; take() avoids pulling the whole file to the driver
rdd.take(40)

In [None]:
# Drop the first 27 lines (assumed to be the file's non-data header) and re-distribute the rest
rdd = sc.parallelize(rdd.collect()[27:])

In [None]:
rdd2 = rdd.map(lambda line: line.split(','))

In [None]:
rdd3 = rdd2.map(lambda x: x[0].strip().split(' '))

In [None]:
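# Drop the empty strings left over from splitting on single spaces
# (compare with `!=`; `is not ''` tests object identity, not equality)
rdd4 = rdd3.map(lambda x: [i for i in x if i != ''])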

In [None]:
rdd5 = rdd4.map(lambda lines: [float(x) for x in lines])
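
The parsing steps above (split, strip, drop empty strings, convert to float) can be tried on a single line in plain Python. The sketch below is a hypothetical helper, `parse_line`, and the sample line is made up for illustration rather than taken from `cadata.txt`. It assumes the line contains no commas, so the whole line is one whitespace-separated record.

```python
def parse_line(line):
    """Turn one whitespace-separated text line into a list of floats."""
    # str.split() with no argument splits on any run of whitespace and
    # drops empty strings, so no separate filtering step is needed.
    return [float(field) for field in line.split()]


# Hypothetical sample line, made up for illustration (not taken from cadata.txt).
sample_line = "  4.526e+05   8.3252e+00  4.1e+01 "
print(parse_line(sample_line))
```

Under the same assumption, a function like this could be applied to the RDD of raw lines in one step with `rdd.map(parse_line)`.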

In [None]:
from pyspark.sql import Row

# Map the RDD to a DF
df = rdd5.map(lambda line: Row(longitude=line[0], 
                              latitude=line[1], 
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5], 
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()

In [None]:
df.show()

In [None]:
df.columns

In [None]:
df.printSchema()

In [None]:
df.select('population','totalBedRooms').show(10)

In [None]:
df.groupBy("housingMedianAge").count().sort("count",ascending=False).show()

In [None]:
# Convert to pandas (only do this for small DataFrames; the output of describe() is small)
df.describe().toPandas()

In [None]:
# Import `col` from `sql.functions` (a wildcard import would shadow Python built-ins such as `sum` and `max`)
from pyspark.sql.functions import col

# Adjust the values of `medianHouseValue`
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)

# Show the first 2 lines of `df`
df.take(2)

In [None]:
# Create new features / feature engineering

# Divide `totalRooms` by `households`
roomsPerHousehold = df.select(col("totalRooms")/col("households"))

# Divide `population` by `households`
populationPerHousehold = df.select(col("population")/col("households"))

# Divide `totalBedRooms` by `totalRooms`
bedroomsPerRoom = df.select(col("totalBedRooms")/col("totalRooms"))

# Add the new columns to `df`
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
   .withColumn("populationPerHousehold", col("population")/col("households")) \
   .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))
   
# Inspect the result
df.first()

In [None]:
# Re-order and select columns, put label column at index 0
df = df.select("medianHouseValue", 
              "totalBedRooms", 
              "population", 
              "households", 
              "medianIncome", 
              "roomsPerHousehold", 
              "populationPerHousehold", 
              "bedroomsPerRoom")

In [None]:
df.rdd.take(2)

In [None]:
# DenseVector so that we can do ML
from pyspark.ml.linalg import DenseVector

# Define the `input_data` 
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Replace `df` with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])

In [None]:
df.show()

In [None]:
# Scale the features / Normalize

# Import `StandardScaler` 
from pyspark.ml.feature import StandardScaler

# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the scaler to the DataFrame
scaler = standardScaler.fit(df)

# Transform the data in `df` with the scaler
scaled_df = scaler.transform(df)

# Inspect the result
scaled_df.show(2)
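
To make the scaling step concrete, here is a minimal plain-Python sketch of what `StandardScaler` does with its default settings: each feature column is divided by its sample standard deviation, without centering on the mean. The helper name `scale_to_unit_std` and the feature rows are made up for illustration, and mapping zero-spread columns to 0.0 is a choice made for this sketch.

```python
from statistics import stdev


def scale_to_unit_std(rows):
    """Divide every feature column by its sample standard deviation."""
    columns = list(zip(*rows))  # transpose rows -> columns
    stds = [stdev(column) for column in columns]
    # Columns with zero spread are mapped to 0.0 to avoid division by zero.
    return [
        [value / std if std > 0 else 0.0 for value, std in zip(row, stds)]
        for row in rows
    ]


# Hypothetical feature rows, made up for illustration.
rows = [[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]]
scaled = scale_to_unit_std(rows)
print(scaled)
```

After scaling, both columns have a standard deviation of 1, so features measured on very different scales contribute comparably to the model.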

In [None]:
# Split into 20% test data 80% train data
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1337)
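
Note that a random split of this kind is approximate: the 80/20 weights describe the probability of a row landing in each part, so the resulting sizes are close to, but not exactly, 80% and 20%. The plain-Python sketch below is a simplified illustration of that behaviour, not Spark's actual implementation; `random_split` is a hypothetical helper.

```python
import random


def random_split(rows, train_fraction, seed):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)  # seeded generator makes the split repeatable
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test


rows = list(range(1000))
train, test = random_split(rows, train_fraction=0.8, seed=1337)
# The sizes are close to 800 and 200 but are not guaranteed to match exactly.
print(len(train), len(test))
```

Fixing the seed, as the cell above does with `seed=1337`, makes the split reproducible from run to run.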

In [None]:
# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression

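# Initialize `lr`, training on the scaled features (`featuresCol` defaults to "features").
# Note: `regParam` defaults to 0.0, so set it (and `elasticNetParam`) to regularize the model
lr = LinearRegression(featuresCol="features_scaled", labelCol="label", maxIter=20)

# Fit the model to the training data
linearModel = lr.fit(train_data)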

In [None]:
# Generate predictions
predicted = linearModel.transform(test_data)

# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()

# Print out first 5 instances of `predictionAndLabel` 
predictionAndLabel[:5]

In [None]:
# Coefficients for the model
print(linearModel.coefficients)

# Intercept for the model
linearModel.intercept

In [None]:
# Get the RMSE (`summary` reports metrics on the training data)
print(linearModel.summary.rootMeanSquaredError)

# Get the R2 (also on the training data)
linearModel.summary.r2
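
For reference, both metrics can be computed by hand from a list of (prediction, label) pairs. The sketch below uses the standard definitions, with RMSE as the square root of the mean squared error and R2 as 1 - SS_res / SS_tot. The function names and the sample pairs are made up for illustration.

```python
from math import sqrt


def rmse(pairs):
    """Root mean squared error over (prediction, label) pairs."""
    return sqrt(sum((p - y) ** 2 for p, y in pairs) / len(pairs))


def r2(pairs):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    labels = [y for _, y in pairs]
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for p, y in pairs)
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1 - ss_res / ss_tot


# Hypothetical (prediction, label) pairs, made up for illustration.
pairs = [(2.5, 3.0), (0.0, -0.5), (2.0, 2.0), (8.0, 7.0)]
print(rmse(pairs), r2(pairs))
```

Because `predictionAndLabel` from the earlier cell is a list of (prediction, label) pairs for the test data, passing it to functions like these would give test-set metrics to compare against the values above.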

In [None]:
# stop spark session
spark.stop()