<a href="https://colab.research.google.com/github/JoshuaD1/PySpark/blob/main/PySpark_Tutorial.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# [PySpark Tutorial](https://sparkbyexamples.com/pyspark-tutorial/)

*   Spark is useful for applications that require a highly distributed persistent, and pipelined processing.
*   Start a project in Pandas with a limited sample (**less than 1 millions rows and 1000 columns**) to explore and migrate to Spark.
*   Spark is useful for **Natural Language Processing and Computer Vision** applications, which typically require alot of calculations.

## Libraries


In [None]:
!pip install pyspark

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.ml.feature import *
from pyspark.ml.regression import *

import pandas as pd
import numpy as np
from sklearn import datasets

## Functions

In [None]:
# adjust data types
def convertColumn(df, columns, newType):
  for col in columns: 
     df = df.withColumn(col, df[col].cast(newType))
  return df

## Part 1: Creating Dataframe
- Retrieving Data
- Visualizing Dataframe
- Manipulating Dataframe

### Retrieving Data 

In [None]:
# start a spark session
spark = SparkSession.builder.appName("Practice").getOrCreate()

# checking spark session
spark

In [None]:
# store housing dataset
housing_df = datasets.fetch_california_housing()

# show housing dataset info
[print(k, v, '\n') for k,v in housing_df.items()]

In [None]:
# store dataset in pandas dataframe
df = pd.DataFrame(data=housing_df.data, columns=housing_df.feature_names)
df['y'] = housing_df.target

# convert between spark_df & pandas_df
spark_df = spark.createDataFrame(df.copy()) 
pandas_df = spark_df.toPandas()

# remove 1st row of dummy headers for CSV files
#spark_df  = spark.read.option('header','true').csv('file_path')

Dataframe Info & [Data Types](https://sparkbyexamples.com/spark/spark-sql-dataframe-data-types/)

In [None]:
# stats between spark_df & pandas_df
spark_df.describe().show()
pandas_df.describe()

In [None]:
# make copy of spark_df 
df = spark_df.alias('df')

# check if copy & orignal are same object
id(df) == id(spark_df)

In [None]:
# identify data types
df.printSchema()
df.dtypes

In [None]:
# change dtype of certain columns
convertColumn(df, df.columns, FloatType()).printSchema()

### Visualizing Dataframe

In [None]:
# look at first few rows
df.head(2)

In [None]:
# look at first few rows another way
df.show(n=2,truncate=10, vertical=True)

In [None]:
df.distinct().show()

In [None]:
# show columns
print(df.columns[:], '\n')

# ref column by name
print(df['MedInc'], '\n')

In [None]:
# selecting data & target columns
df.select(df.columns[:-1]).show()
df.select(df.columns[-1:]).show()

### Manipulating Dataframe

In [None]:
# renaming columns
df.withColumnRenamed('HouseAge','HouseLife').show()

In [None]:
# adding new columns based on existing columns
df.withColumn('TripleSpace', df['AveBedrms']*3).show()

In [None]:
# add new column with constants
df.withColumn("Rewards", F.lit(None)).show()
df.withColumn("Constant", F.lit(1.0)).show()
df.withColumn("Constant", F.lit('fill')).show()

In [None]:
# removing columns
df.drop('HouseAge','Population').show()

## Part 2: Filtering & Sorting
* [Filtering Dataframe](https://sparkbyexamples.com/pyspark/pyspark-where-filter/)
* Filtering by conditions & arrays

### Filtering Dataframe

In [None]:
df.describe().show()

In [None]:
# filter column with condition
df.filter(df['AveRooms'] > 3).show()

# filter column with oposite condition
df.filter(~(df['AveRooms'] > 3)).show()

# filter column with multiple conditions
df.filter( (df['HouseAge'] > 51.0) & (df['Latitude'] == 37.88) ).show()

# filter based on values
values = [1.0, 52.0]
df.filter(df['HouseAge'].isin(values)).show()

# filter column using SQL col() function
df.filter(F.col('HouseAge') == 1.0).show()

### Odering & Sorting
More details can be found [here](https://sparkbyexamples.com/pyspark/pyspark-orderby-and-sort-explained/)

In [None]:
# sorting using the sort() function
df.sort(df['y'], df['Population']).show()

# sort by descending & ascending 
df.sort(df['y'].asc(), df['Population'].desc()).show()

In [None]:
# sorting using groupBy and aggregate
df.groupBy('HouseAge').sum().show()

df.groupBy('HouseAge').avg().show()

df.groupBy('HouseAge').mean().show()

df.groupBy('HouseAge').count().show()

In [None]:
df.columns
df.groupBy('HouseAge').max().show()

## Part 3: [Updating Dataframe](https://sparkbyexamples.com/pyspark/pyspark-update-a-column-with-value/)
- Dropping / Replacing Data
- Imputing Missing Values

### [Missing Points](https://www.youtube.com/watch?v=wXx58-mDOKI)

In [None]:
# add missing values
df_miss = df.replace(41.0, None, 'HouseAge')

# add index column
df_index = df_miss.withColumn("id", F.monotonically_increasing_id())

check duplicate rows

In [None]:
print(f"number of rows: {df.count()}")
print(f"number of distinct rows: {df.distinct().count()}")

In [None]:
# drop duplicate rows
df.dropDuplicates().show()

In [None]:
print(f"number of rows: {df.count()}")

# check distinct other than specific column
print(f"number of distinct rows: {df.select([col for col in df.columns if col != 'id']).distinct().count()}")

In [None]:
df.dropDuplicates(subset=[col for col in df.columns if col != 'id']).show()

# check distinct other than specific column
print(f"number of rows: {df.dropDuplicates(subset=[col for col in df.columns if col != 'id']).count()}")

check number of missing points in per row/column

In [None]:
# number of missing values in per row
df_index.rdd.map(lambda row: (row['id'], sum([r == None for r in row]))).collect()

In [163]:
# % of missing values in per column
df_miss.agg(*[(1-F.count(c)/F.count('*')).alias(c+'_miss') for c in df_miss.columns]).show()

+-----------+--------------------+-------------+--------------+---------------+-------------+-------------+--------------+------+
|MedInc_miss|       HouseAge_miss|AveRooms_miss|AveBedrms_miss|Population_miss|AveOccup_miss|Latitude_miss|Longitude_miss|y_miss|
+-----------+--------------------+-------------+--------------+---------------+-------------+-------------+--------------+------+
|        0.0|0.014341085271317833|          0.0|           0.0|            0.0|          0.0|          0.0|           0.0|   0.0|
+-----------+--------------------+-------------+--------------+---------------+-------------+-------------+--------------+------+



In [168]:
# drop uninformative columns
print(f"number of rows: {df.count()}")
df_miss.dropna(how='any', thresh=1, subset=['HouseAge']).count()

number of rows: 20640


20344

### Encoding

encoding values automatically with [imputer](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Imputer.html)

In [None]:
# create classifier
imputer = Imputer(strategy='median', inputCols=['HouseAge'], outputCols=['out_HouseAge'])

# apply classifier
imputer.fit(df_miss).transform(df_miss).show()

In [None]:
print(f"Strategy: {imputer.getStrategy()}")
print(f"Error: {imputer.getRelativeError()}")

manually encoding values

In [176]:
#Replace 0 for null for all integer columns
df_miss.na.fill(value=0).show()

#Replace 0 for null on only population column 
df_miss.na.fill(value=0, subset=["HouseAge"]).show()

+------+--------+------------------+------------------+----------+------------------+--------+---------+-----+
|MedInc|HouseAge|          AveRooms|         AveBedrms|Population|          AveOccup|Latitude|Longitude|    y|
+------+--------+------------------+------------------+----------+------------------+--------+---------+-----+
|8.3252|     0.0| 6.984126984126984|1.0238095238095237|     322.0|2.5555555555555554|   37.88|  -122.23|4.526|
|8.3014|    21.0| 6.238137082601054|0.9718804920913884|    2401.0| 2.109841827768014|   37.86|  -122.22|3.585|
|7.2574|    52.0| 8.288135593220339| 1.073446327683616|     496.0|2.8022598870056497|   37.85|  -122.24|3.521|
|5.6431|    52.0|5.8173515981735155|1.0730593607305936|     558.0| 2.547945205479452|   37.85|  -122.25|3.413|
|3.8462|    52.0| 6.281853281853282|1.0810810810810811|     565.0|2.1814671814671813|   37.85|  -122.25|3.422|
|4.0368|    52.0| 4.761658031088083|1.1036269430051813|     413.0| 2.139896373056995|   37.85|  -122.25|2.697|
|

In [174]:
# creat new column with missing values replaced
df_miss.withColumn("HouseMissing", F.when(F.col("HouseAge").isNull(), 0).otherwise(F.col("HouseAge"))).show()

+------+--------+------------------+------------------+----------+------------------+--------+---------+-----+------------+
|MedInc|HouseAge|          AveRooms|         AveBedrms|Population|          AveOccup|Latitude|Longitude|    y|HouseMissing|
+------+--------+------------------+------------------+----------+------------------+--------+---------+-----+------------+
|8.3252|    null| 6.984126984126984|1.0238095238095237|     322.0|2.5555555555555554|   37.88|  -122.23|4.526|         0.0|
|8.3014|    21.0| 6.238137082601054|0.9718804920913884|    2401.0| 2.109841827768014|   37.86|  -122.22|3.585|        21.0|
|7.2574|    52.0| 8.288135593220339| 1.073446327683616|     496.0|2.8022598870056497|   37.85|  -122.24|3.521|        52.0|
|5.6431|    52.0|5.8173515981735155|1.0730593607305936|     558.0| 2.547945205479452|   37.85|  -122.25|3.413|        52.0|
|3.8462|    52.0| 6.281853281853282|1.0810810810810811|     565.0|2.1814671814671813|   37.85|  -122.25|3.422|        52.0|
|4.0368|

In [None]:
# replace values
df.replace(41.0, None).show()

# replace values in certain column
df.replace(41.0, None, 'HouseAge').show()

In [None]:
# encoding points
df.replace(to_replace=[37.88, -122.23], value=[1.0, 1.0]).show()

# encoding points in certain column
df.replace(to_replace=[21.0, 50.0], value=[1.0, 1.0], subset='HouseAge').show()

### Outliers
must normalize data

In [193]:
df.describe().show()

+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+
|summary|            MedInc|          HouseAge|          AveRooms|         AveBedrms|        Population|          AveOccup|          Latitude|          Longitude|                 y|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+
|  count|             20640|             20640|             20640|             20640|             20640|             20640|             20640|              20640|             20640|
|   mean| 3.870671002906976|28.639486434108527| 5.428999742190385| 1.096675149606209|1425.4767441860465|3.0706551594363716|  35.6318614341086|-119.56970445736394|2.0685581690891293|
| stddev|1.8998217179452666|12.585557612111632|2.4741731394243245|0.4739108567954673|1132.

In [195]:
# create a dictionary for outlier bounds 
bounds = {}

# populate dictionary for each column
for c in df.columns:
  quantiles = df.approxQuantile(col=c, probabilities=[.25, .75], relativeError=0.05)
  print(f'interquartile range for {c}: {quantiles}')
  IQR = quantiles[1] - quantiles[0]
  bounds[c] = [quantiles[0]-1.5*IQR, quantiles[1]+1.5*IQR]

interquartile range for MedInc: [2.4922, 4.5556]
interquartile range for HouseAge: [18.0, 36.0]
interquartile range for AveRooms: [4.424242424242424, 5.9106529209622]
interquartile range for AveBedrms: [1.0077399380804954, 1.089613034623218]
interquartile range for Population: [796.0, 1596.0]
interquartile range for AveOccup: [2.42816091954023, 3.199124726477024]
interquartile range for Latitude: [33.92, 37.57]
interquartile range for Longitude: [-121.83, -118.09]
interquartile range for y: [1.233, 2.445]


In [191]:
bounds

{'MedInc': [-0.6029000000000004, 7.6507000000000005],
 'HouseAge': [-9.0, 63.0],
 'AveRooms': [2.19462667916276, 8.140268666041862],
 'AveBedrms': [0.8849302932664116, 1.2124226794373019],
 'Population': [-404.0, 2796.0],
 'AveOccup': [1.2717152091350383, 4.355570436882216],
 'Latitude': [28.445000000000004, 43.045],
 'Longitude': [-127.44, -112.48000000000002],
 'y': [-0.5849999999999995, 4.263]}

## Part 4: Machine Learning

### Linear Regression

In [None]:
# check dtype
df.printSchema()
df.columns[:-1]

In [None]:
# crunch predictors into 1D array
predictors = VectorAssembler(inputCols=df.columns[:-1], 
                             outputCol="Independent Features")
output = predictors.transform(df)

# show new df & new column
predictors.transform(df).show()
print(predictors.transform(df).columns[-1:])

# finialized dataframe for machine learning
predictors.transform(df).select("Independent Features","y").show()
finalized_data = predictors.transform(df).select("Independent Features","y")

In [None]:
# train test split
train_data, test_data = finalized_data.randomSplit([0.75,0.25])

# select model 
regressor = LinearRegression(featuresCol='Independent Features', 
                             labelCol='y', 
                             predictionCol='y_pred')
# apply model
regressor = regressor.fit(train_data)

In [None]:
# prediction
pred_results = regressor.evaluate(test_data)
pred_results.predictions.show()

In [None]:
# regression results
print(f"regressionCoeffs: {regressor.coefficients}")
print(f"meanAbsoluteError: {round(pred_results.meanAbsoluteError, 3)}")
print(f"meanSquaredError: {round(pred_results.meanSquaredError, 3)}")