<a href="https://www.kaggle.com/code/masatomurakawamm/data-wrangling-and-ml-with-pyspark?scriptVersionId=150610046" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

---
### This notebook is a memorandum of my studies on PySpark.
### Its aim is to show how to handle data and run machine learning with PySpark.

---
## What is PySpark?
**(from the [official documentation][1])**

PySpark is the Python API for Apache Spark. It enables you to perform real-time, large-scale data processing in a distributed environment using Python. It also provides a PySpark shell for interactively analyzing your data.

PySpark combines Python’s learnability and ease of use with the power of Apache Spark to enable processing and analysis of data at any size for everyone familiar with Python.

PySpark supports all of Spark’s features such as Spark SQL, DataFrames, Structured Streaming, Machine Learning (MLlib) and Spark Core.

---
The main purpose of PySpark is distributed processing of large-scale data. For that purpose, a scalable managed cloud service such as Amazon EMR on AWS or Dataproc on Google Cloud would usually be the first choice. In this notebook we run the PySpark code locally, but the same code can easily be applied to a distributed processing environment.

---
A note on PySpark's two main data structures, RDD and DataFrame. An RDD (Resilient Distributed Dataset) is an immutable distributed collection of objects that is partitioned across the nodes of a cluster and can be processed in parallel. A DataFrame is a higher-level abstraction built on top of RDDs. DataFrame operations are translated internally into RDD operations, and Spark's Catalyst optimizer optimizes their execution plan. For this reason, this notebook does not cover handling RDDs directly; it focuses on the DataFrame API, which offers a more user-friendly, SQL-like interface than RDDs.

---
**If you find this notebook useful, or when you copy&edit this notebook, please give me an upvote. It helps me keep up my motivation.**

---

[1]: https://spark.apache.org/docs/latest/api/python/index.html

<span id='toc'/>

<h1 style="background:#05445E; border:0; border-radius: 12px; color:#D3D3D3"><center>0. TABLE OF CONTENTS</center></h1>

<ul class="list-group" style="list-style-type:none;">
    <li><a href="#1" class="list-group-item list-group-item-action">1. Settings</a></li>
    <li><a href="#2" class="list-group-item list-group-item-action">2. Data Loading</a>
        <ul class="list-group" style="list-style-type:none;">
            <li><a href="#2.1" class="list-group-item list-group-item-action">2.1 Data Loading with PySpark</a></li>
            <li><a href="#2.2" class="list-group-item list-group-item-action">2.2 Parquet</a></li>
            <li><a href="#2.3" class="list-group-item list-group-item-action">2.3 Database and tables</a></li>
        </ul>
    </li>
    <li><a href="#3" class="list-group-item list-group-item-action">3. Data Wrangling with PySpark</a>
        <ul class="list-group" style="list-style-type:none;">
            <li><a href="#3.1" class="list-group-item list-group-item-action">3.1 Select specific columns</a></li>
            <li><a href="#3.2" class="list-group-item list-group-item-action">3.2 Show as alias</a></li>
            <li><a href="#3.3" class="list-group-item list-group-item-action">3.3 Count data</a></li>
            <li><a href="#3.4" class="list-group-item list-group-item-action">3.4 Count unique number</a></li>
            <li><a href="#3.5" class="list-group-item list-group-item-action">3.5 Create and Delete columns</a></li>
            <li><a href="#3.6" class="list-group-item list-group-item-action">3.6 Extract records that meet the criteria</a></li>
            <li><a href="#3.7" class="list-group-item list-group-item-action">3.7 Conditional branching</a></li>
            <li><a href="#3.8" class="list-group-item list-group-item-action">3.8 Data type conversion</a></li>
            <li><a href="#3.9" class="list-group-item list-group-item-action">3.9 Statistical Analysis</a></li>
            <li><a href="#3.10" class="list-group-item list-group-item-action">3.10 GroupBy</a></li>
            <li><a href="#3.11" class="list-group-item list-group-item-action">3.11 Window functions</a></li>
            <li><a href="#3.12" class="list-group-item list-group-item-action">3.12 Handling of null values</a></li>
            <li><a href="#3.13" class="list-group-item list-group-item-action">3.13 Join DataFrames</a></li>
        </ul>
    </li>
    <li><a href="#4" class="list-group-item list-group-item-action">4. Machine Learning with PySpark MLlib</a>
        <ul class="list-group" style="list-style-type:none;">
            <li><a href="#4.1" class="list-group-item list-group-item-action">4.1 Logistic Regression</a></li>
            <li><a href="#4.2" class="list-group-item list-group-item-action">4.2 Random Forest with PySpark Pipeline</a></li>
            <li><a href="#4.3" class="list-group-item list-group-item-action">4.3 Clustering with k-means</a></li>
        </ul>
    </li>
</ul>


<a id ="1"></a><h1 style="background:#05445E; border:0; border-radius: 12px; color:#D3D3D3"><center>1. Settings</center></h1>
[Back to the TOC](#toc)

In [1]:
## Parameters
data_config = {
    'train_csv_path': '../input/spaceship-titanic/train.csv',
    'test_csv_path': '../input/spaceship-titanic/test.csv',
    'sample_submission_path': '../input/spaceship-titanic/sample_submission.csv',
}

seed = 123

print('Parameters set!')

Parameters set!


In [2]:
# Installing dependencies 
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=4aa2ecbefa93cc9d44a7978c9441e4931f71e0808cd28c2db183352d487f32d8
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [3]:
## Import dependencies 
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt 
%matplotlib inline

import sklearn
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import OrdinalEncoder

import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructField, StructType, 
    StringType, IntegerType, 
    FloatType, BooleanType,
    DateType,
)
import pyspark.sql.functions as sf
from pyspark.sql.functions import (
    col, when, asc, desc, lit,
    mean, sum, avg, stddev,  # note: this `sum` shadows Python's built-in sum()
    count, countDistinct,
    format_number, isnan,
    rank, lag, lead,
)
from pyspark.sql.window import Window

from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder, 
    VectorAssembler, StandardScaler,  # replaces the sklearn StandardScaler imported above
)

# In case you have to connect Notebook with Spark
#import findspark
#spark_path = '~~'
#findspark.init(spark_path)

import warnings
warnings.filterwarnings('ignore')

print('import done!')

import done!


In [4]:
# Building a SparkSession
spark = SparkSession.builder.appName('SpaceshipTitanic').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/14 03:00:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
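# Alternatively, build a SparkSession with more options, as below.
# Note: a session already exists here, so getOrCreate() returns it and
# only runtime SQL options take effect (see the warning in the output).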
spark = SparkSession.builder \
    .appName('SpaceshipTitanic') \
    .config('hive.exec.dynamic.partition', 'true') \
    .config('hive.exec.dynamic.partition.mode', 'nonstrict') \
    .config('spark.sql.session.timeZone', 'Asia/Tokyo') \
    .config('spark.ui.enabled', 'true') \
    .config('spark.eventLog.enabled', 'true') \
    .enableHiveSupport() \
    .getOrCreate()

23/11/14 03:00:54 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


<a id ="2"></a><h1 style="background:#05445E; border:0; border-radius: 12px; color:#D3D3D3"><center>2. Data Loading</center></h1>
[Back to the TOC](#toc)

---
### [File and Data Field Descriptions](https://www.kaggle.com/competitions/spaceship-titanic/data)

- **train.csv** - Personal records for about two-thirds (~8700) of the passengers, to be used as training data.
 - `PassengerId` - A unique Id for each passenger. Each Id takes the form `gggg_pp` where `gggg` indicates a group the passenger is travelling with and `pp` is their number within the group. People in a group are often family members, but not always.
 - `HomePlanet` - The planet the passenger departed from, typically their planet of permanent residence.
 - `CryoSleep` - Indicates whether the passenger elected to be put into suspended animation for the duration of the voyage. Passengers in cryosleep are confined to their cabins.
 - `Cabin` - The cabin number where the passenger is staying. Takes the form `deck/num/side`, where `side` can be either `P` for *Port* or `S` for *Starboard*.
 - `Destination` - The planet the passenger will be debarking to.
 - `Age` - The age of the passenger.
 - `VIP` - Whether the passenger has paid for special VIP service during the voyage.
 - `RoomService`, `FoodCourt`, `ShoppingMall`, `Spa`, `VRDeck` - Amount the passenger has billed at each of the *Spaceship Titanic*'s many luxury amenities.
 - `Name` - The first and last names of the passenger.
 - `Transported` - Whether the passenger was transported to another dimension. This is the target, the column you are trying to predict.


- **test.csv** - Personal records for the remaining one-third (~4300) of the passengers, to be used as test data. Your task is to predict the value of `Transported` for the passengers in this set.


- **sample_submission.csv** - A submission file in the correct format.
 - `PassengerId` - Id for each passenger in the test set.
 - `Transported` - The target. For each passenger, predict either *True* or *False*.

---
### [Submission & Evaluation](https://www.kaggle.com/competitions/spaceship-titanic/overview/evaluation)

- Submissions are evaluated based on their classification accuracy, the percentage of predicted labels that are correct.
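As a small illustration of this metric, the sketch below computes classification accuracy in plain Python. The function name `accuracy` and the sample labels are made up for the example.

```python
def accuracy(y_true, y_pred):
    """Return the fraction of predicted labels that match the true labels."""
    if len(y_true) != len(y_pred):
        raise ValueError('y_true and y_pred must have the same length')
    if len(y_true) == 0:
        raise ValueError('at least one label is required')
    correct = 0
    for t, p in zip(y_true, y_pred):
        if t == p:
            correct += 1
    return correct / len(y_true)


y_true = [True, False, True, True]
y_pred = [True, False, False, True]
score = accuracy(y_true, y_pred)
print(score)  # 0.75 (3 of 4 labels are correct)
```

An explicit loop is used instead of Python's built-in `sum()` because this notebook imports `sum` from `pyspark.sql.functions`, which would shadow the built-in.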

---

In [6]:
# Data Loading with Pandas
train_pandas_df = pd.read_csv(data_config['train_csv_path'])
test_pandas_df = pd.read_csv(data_config['test_csv_path'])
submission_pandas_df = pd.read_csv(data_config['sample_submission_path'])

print(train_pandas_df.dtypes)
train_pandas_df.head(5)

PassengerId      object
HomePlanet       object
CryoSleep        object
Cabin            object
Destination      object
Age             float64
VIP              object
RoomService     float64
FoodCourt       float64
ShoppingMall    float64
Spa             float64
VRDeck          float64
Name             object
Transported        bool
dtype: object


| | PassengerId | HomePlanet | CryoSleep | Cabin | Destination | Age | VIP | RoomService | FoodCourt | ShoppingMall | Spa | VRDeck | Name | Transported |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0001_01 | Europa | False | B/0/P | TRAPPIST-1e | 39.0 | False | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | Maham Ofracculy | False |
| 1 | 0002_01 | Earth | False | F/0/S | TRAPPIST-1e | 24.0 | False | 109.0 | 9.0 | 25.0 | 549.0 | 44.0 | Juanna Vines | True |
| 2 | 0003_01 | Europa | False | A/0/S | TRAPPIST-1e | 58.0 | True | 43.0 | 3576.0 | 0.0 | 6715.0 | 49.0 | Altark Susent | False |
| 3 | 0003_02 | Europa | False | A/0/S | TRAPPIST-1e | 33.0 | False | 0.0 | 1283.0 | 371.0 | 3329.0 | 193.0 | Solam Susent | False |
| 4 | 0004_01 | Earth | False | F/1/S | TRAPPIST-1e | 16.0 | False | 303.0 | 70.0 | 151.0 | 565.0 | 2.0 | Willy Santantines | True |


<a id ="2.1"></a><h2 style="background:#75E6DA; border:0; border-radius: 12px; color:black"><center>2.1 Data Loading with PySpark</center></h2>
[Back to the TOC](#toc)

In [7]:
# Data Loading with PySpark-1
train_df = spark.read.csv(data_config['train_csv_path'], header=True, inferSchema=True)

train_df.printSchema()  # printSchema() prints the schema itself and returns None
train_df.show(n=5, truncate=False)

root
 |-- PassengerId: string (nullable = true)
 |-- HomePlanet: string (nullable = true)
 |-- CryoSleep: boolean (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- VIP: boolean (nullable = true)
 |-- RoomService: double (nullable = true)
 |-- FoodCourt: double (nullable = true)
 |-- ShoppingMall: double (nullable = true)
 |-- Spa: double (nullable = true)
 |-- VRDeck: double (nullable = true)
 |-- Name: string (nullable = true)
 |-- Transported: boolean (nullable = true)

+-----------+----------+---------+-----+-----------+----+-----+-----------+---------+------------+------+------+-----------------+-----------+
|PassengerId|HomePlanet|CryoSleep|Cabin|Destination|Age |VIP  |RoomService|FoodCourt|ShoppingMall|Spa   |VRDeck|Name             |Transported|
+-----------+----------+---------+-----+-----------+----+-----+-----------+---------+------------+------+------+-----------------+-----------+
|

In [8]:
# Data Loading with PySpark-2
# With inferSchema=True, Spark infers the data type of each column from the data.
# Alternatively, you can specify the schema yourself.
train_data_schema = StructType(fields=[
    StructField('PassengerId', StringType(), True),
    StructField('HomePlanet',  StringType(), True),
    StructField('CryoSleep', BooleanType(), True),
    StructField('Cabin', StringType(), True),
    StructField('Destination', StringType(), True),
    StructField('Age', FloatType(), True),
    StructField('VIP', BooleanType(), True),
    StructField('RoomService', FloatType(), True),
    StructField('FoodCourt', FloatType(), True),
    StructField('ShoppingMall', FloatType(), True),
    StructField('Spa', FloatType(), True),
    StructField('VRDeck', FloatType(), True),
    StructField('Name', StringType(), True),
    StructField('Transported', BooleanType(), True),
    ]
)

train_df = spark.read.csv(data_config['train_csv_path'], header=True, schema=train_data_schema)
train_df.printSchema()  # printSchema() prints the schema itself and returns None
train_df.show(n=5, truncate=False)

root
 |-- PassengerId: string (nullable = true)
 |-- HomePlanet: string (nullable = true)
 |-- CryoSleep: boolean (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Age: float (nullable = true)
 |-- VIP: boolean (nullable = true)
 |-- RoomService: float (nullable = true)
 |-- FoodCourt: float (nullable = true)
 |-- ShoppingMall: float (nullable = true)
 |-- Spa: float (nullable = true)
 |-- VRDeck: float (nullable = true)
 |-- Name: string (nullable = true)
 |-- Transported: boolean (nullable = true)

+-----------+----------+---------+-----+-----------+----+-----+-----------+---------+------------+------+------+-----------------+-----------+
|PassengerId|HomePlanet|CryoSleep|Cabin|Destination|Age |VIP  |RoomService|FoodCourt|ShoppingMall|Spa   |VRDeck|Name             |Transported|
+-----------+----------+---------+-----+-----------+----+-----+-----------+---------+------------+------+------+-----------------+-----------+
|0001_0

In [9]:
# Test Data Loading with PySpark
test_data_schema = StructType(fields=[
    StructField('PassengerId', StringType(), True),
    StructField('HomePlanet',  StringType(), True),
    StructField('CryoSleep', BooleanType(), True),
    StructField('Cabin', StringType(), True),
    StructField('Destination', StringType(), True),
    StructField('Age', FloatType(), True),
    StructField('VIP', BooleanType(), True),
    StructField('RoomService', FloatType(), True),
    StructField('FoodCourt', FloatType(), True),
    StructField('ShoppingMall', FloatType(), True),
    StructField('Spa', FloatType(), True),
    StructField('VRDeck', FloatType(), True),
    StructField('Name', StringType(), True),
    #StructField('Transported', BooleanType(), True),
    ]
)

# Another way of loading data
test_df = spark.read.format('csv') \
            .option("multiLine", "true") \
            .option('encoding', 'utf-8') \
            .option('header', 'True') \
            .option('sep', ',') \
            .load(data_config['test_csv_path'], schema=test_data_schema)
# multiLine option is for the case when some columns contain newlines.

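# sample_submission.csv has only two columns, so it needs its own schema.
submission_data_schema = StructType(fields=[
    StructField('PassengerId', StringType(), True),
    StructField('Transported', BooleanType(), True),
    ]
)
submission_df = spark.read.csv(data_config['sample_submission_path'], header=True, schema=submission_data_schema)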

test_df.show(n=5)

+-----------+----------+---------+-----+-----------+----+-----+-----------+---------+------------+------+------+----------------+
|PassengerId|HomePlanet|CryoSleep|Cabin|Destination| Age|  VIP|RoomService|FoodCourt|ShoppingMall|   Spa|VRDeck|            Name|
+-----------+----------+---------+-----+-----------+----+-----+-----------+---------+------------+------+------+----------------+
|    0013_01|     Earth|     true|G/3/S|TRAPPIST-1e|27.0|false|        0.0|      0.0|         0.0|   0.0|   0.0| Nelly Carsoning|
|    0018_01|     Earth|    false|F/4/S|TRAPPIST-1e|19.0|false|        0.0|      9.0|         0.0|2823.0|   0.0|  Lerome Peckers|
|    0019_01|    Europa|     true|C/0/S|55 Cancri e|31.0|false|        0.0|      0.0|         0.0|   0.0|   0.0| Sabih Unhearfus|
|    0021_01|    Europa|    false|C/1/S|TRAPPIST-1e|38.0|false|        0.0|   6652.0|         0.0| 181.0| 585.0|Meratz Caltilter|
|    0023_01|     Earth|    false|F/5/S|TRAPPIST-1e|20.0|false|       10.0|      0.0|     

In [10]:
# Convert to a pandas DataFrame
train_df_pd = train_df.toPandas() # PySpark => Pandas
train_df_ps = spark.createDataFrame(train_df_pd) # Pandas => PySpark

train_df_pd.head()

| | PassengerId | HomePlanet | CryoSleep | Cabin | Destination | Age | VIP | RoomService | FoodCourt | ShoppingMall | Spa | VRDeck | Name | Transported |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0001_01 | Europa | False | B/0/P | TRAPPIST-1e | 39.0 | False | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | Maham Ofracculy | False |
| 1 | 0002_01 | Earth | False | F/0/S | TRAPPIST-1e | 24.0 | False | 109.0 | 9.0 | 25.0 | 549.0 | 44.0 | Juanna Vines | True |
| 2 | 0003_01 | Europa | False | A/0/S | TRAPPIST-1e | 58.0 | True | 43.0 | 3576.0 | 0.0 | 6715.0 | 49.0 | Altark Susent | False |
| 3 | 0003_02 | Europa | False | A/0/S | TRAPPIST-1e | 33.0 | False | 0.0 | 1283.0 | 371.0 | 3329.0 | 193.0 | Solam Susent | False |
| 4 | 0004_01 | Earth | False | F/1/S | TRAPPIST-1e | 16.0 | False | 303.0 | 70.0 | 151.0 | 565.0 | 2.0 | Willy Santantines | True |


<a id ="2.2"></a><h2 style="background:#75E6DA; border:0; border-radius: 12px; color:black"><center>2.2 Parquet</center></h2>
[Back to the TOC](#toc)

In [11]:
# Save as parquet files separated by partitions
save_path = '/kaggle/working/spark-warehouse/spaceship.db/parque'

# The desirable size for each file is about 3GB.
train_df.repartition(2) \
        .write \
        .partitionBy('HomePlanet') \
        .mode('overwrite') \
        .format('parquet') \
        .save(save_path)

# Load parquet data
train_df = spark.read.parquet(save_path)

train_df.show(n=5, truncate=False)

                                                                                

+-----------+---------+--------+-------------+----+-----+-----------+---------+------------+---+------+----------------+-----------+----------+
|PassengerId|CryoSleep|Cabin   |Destination  |Age |VIP  |RoomService|FoodCourt|ShoppingMall|Spa|VRDeck|Name            |Transported|HomePlanet|
+-----------+---------+--------+-------------+----+-----+-----------+---------+------------+---+------+----------------+-----------+----------+
|5863_01    |false    |G/950/P |55 Cancri e  |0.0 |false|0.0        |0.0      |0.0         |0.0|0.0   |Lawren Loverthyo|true       |Earth     |
|7456_01    |true     |G/1210/S|TRAPPIST-1e  |21.0|false|0.0        |0.0      |0.0         |0.0|0.0   |Yvetta Whitez   |true       |Earth     |
|5915_01    |false    |E/382/S |TRAPPIST-1e  |32.0|false|0.0        |0.0      |39.0        |0.0|749.0 |Garion Mcneiley |false      |Earth     |
|6020_01    |true     |G/974/P |55 Cancri e  |39.0|false|0.0        |0.0      |0.0         |0.0|0.0   |Stany Hanner    |false      |Eart

<a id ="2.3"></a><h2 style="background:#75E6DA; border:0; border-radius: 12px; color:black"><center>2.3 Database and tables</center></h2>
[Back to the TOC](#toc)

In [12]:
# For SQL execution, you have to register the DataFrame as a temporary view.
train_df.createOrReplaceTempView('train_df')

In [13]:
# Display the list of tables
spark.sql('SHOW TABLES IN default').show(truncate=False)

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|         |train_df |true       |
+---------+---------+-----------+



In [14]:
# Display the list of databases
spark.sql('SHOW DATABASES').show(truncate=False)

+---------+
|namespace|
+---------+
|default  |
+---------+



In [15]:
# Create database
spark.sql('CREATE DATABASE IF NOT EXISTS spaceship')
spark.sql('SHOW DATABASES').show(truncate=False)

+---------+
|namespace|
+---------+
|default  |
|spaceship|
+---------+



In [16]:
# Create table
spark_hive = SparkSession.builder \
    .appName("Spaceship_table_create") \
    .config("spark.sql.catalogImplementation", "hive") \
    .config("spark.sql.legacy.createHiveTableByDefault", "false") \
    .enableHiveSupport() \
    .getOrCreate()
spark_hive.sql('CREATE DATABASE IF NOT EXISTS spaceship_hive')

query = '''
CREATE EXTERNAL TABLE IF NOT EXISTS spaceship_hive.spaceship_table(
    PassengerId String,
    CryoSleep Boolean,
    Cabin String,
    Destination String,
    Age Float,
    VIP Boolean,
    RoomService Float,
    FoodCourt Float,
    ShoppingMall Float,
    Spa Float,
    VRDeck Float,
    Name String
)
PARTITIONED BY (HomePlanet String)
TBLPROPERTIES ('parquet.compression'='SNAPPY')
LOCATION '/kaggle/working/spark-warehouse/spaceship.db/parque';
'''
# Note that the column list does not include HomePlanet, the column used for partitioning.

try:
    spark_hive.sql(query)
    spark_hive.sql('SHOW TABLES IN spaceship_hive').show(truncate=False)
except Exception as e:
    print(e)

23/11/14 03:01:09 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+--------------+---------------+-----------+
|namespace     |tableName      |isTemporary|
+--------------+---------------+-----------+
|spaceship_hive|spaceship_table|false      |
|              |train_df       |true       |
+--------------+---------------+-----------+



In [17]:
# add a partition
spark_hive.sql("ALTER TABLE spaceship_hive.spaceship_table ADD IF NOT EXISTS PARTITION (HomePlanet='Earth')")
spark_hive.sql('SHOW PARTITIONS spaceship_hive.spaceship_table').show()

+----------------+
|       partition|
+----------------+
|HomePlanet=Earth|
+----------------+



In [18]:
# add all partitions
spark_hive.sql('MSCK REPAIR TABLE spaceship_hive.spaceship_table')
spark_hive.sql('SHOW PARTITIONS spaceship_hive.spaceship_table').show(truncate=False)

+-------------------------------------+
|partition                            |
+-------------------------------------+
|HomePlanet=Earth                     |
|HomePlanet=Europa                    |
|HomePlanet=Mars                      |
|HomePlanet=__HIVE_DEFAULT_PARTITION__|
+-------------------------------------+



In [19]:
# Checking data
spark_hive.sql('select * from spaceship_hive.spaceship_table').show(n=5, truncate=False)

+-----------+---------+--------+-------------+----+-----+-----------+---------+------------+---+------+----------------+----------+
|PassengerId|CryoSleep|Cabin   |Destination  |Age |VIP  |RoomService|FoodCourt|ShoppingMall|Spa|VRDeck|Name            |HomePlanet|
+-----------+---------+--------+-------------+----+-----+-----------+---------+------------+---+------+----------------+----------+
|5863_01    |false    |G/950/P |55 Cancri e  |0.0 |false|0.0        |0.0      |0.0         |0.0|0.0   |Lawren Loverthyo|Earth     |
|7456_01    |true     |G/1210/S|TRAPPIST-1e  |21.0|false|0.0        |0.0      |0.0         |0.0|0.0   |Yvetta Whitez   |Earth     |
|5915_01    |false    |E/382/S |TRAPPIST-1e  |32.0|false|0.0        |0.0      |39.0        |0.0|749.0 |Garion Mcneiley |Earth     |
|6020_01    |true     |G/974/P |55 Cancri e  |39.0|false|0.0        |0.0      |0.0         |0.0|0.0   |Stany Hanner    |Earth     |
|1565_01    |false    |F/304/S |PSO J318.5-22|16.0|false|0.0        |1519.0 

In [20]:
# CTAS SQL
query = '''
CREATE EXTERNAL TABLE IF NOT EXISTS spaceship_hive.ctas_sql
    STORED AS PARQUET LOCATION '/kaggle/working/spark-warehouse/spaceship_hive.db/ctas_sql'
AS SELECT *
FROM spaceship_hive.spaceship_table
'''

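# The following statement fails in this notebook: the session was first created in cell [4]
# without Hive support, and getOrCreate() cannot switch an existing session to the Hive catalog
# (see the "Using an existing Spark session" warning above).
try: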
    spark_hive.sql(query)
except Exception as e:
    print('error: ', e)

error:  [NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT] CREATE Hive TABLE (AS SELECT) is not supported, if you want to enable it, please set "spark.sql.catalogImplementation" to "hive".;
'CreateTable `spark_catalog`.`spaceship_hive`.`ctas_sql`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Ignore
+- Project [PassengerId#691, CryoSleep#692, Cabin#693, Destination#694, Age#695, VIP#696, RoomService#697, FoodCourt#698, ShoppingMall#699, Spa#700, VRDeck#701, Name#702, HomePlanet#703]
   +- SubqueryAlias spark_catalog.spaceship_hive.spaceship_table
      +- Relation spark_catalog.spaceship_hive.spaceship_table[PassengerId#691,CryoSleep#692,Cabin#693,Destination#694,Age#695,VIP#696,RoomService#697,FoodCourt#698,ShoppingMall#699,Spa#700,VRDeck#701,Name#702,HomePlanet#703] parquet



In [21]:
# CTAS dataframe
train_df.write.format('parquet').mode('overwrite').saveAsTable('spaceship.ctas_df', path='/kaggle/working/spark-warehouse/spaceship.db/ctas_df')

spark.sql('SELECT * FROM spaceship.ctas_df').printSchema()
spark.sql('SELECT * FROM spaceship.ctas_df').show(n=5, truncate=False)

root
 |-- PassengerId: string (nullable = true)
 |-- CryoSleep: boolean (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Age: float (nullable = true)
 |-- VIP: boolean (nullable = true)
 |-- RoomService: float (nullable = true)
 |-- FoodCourt: float (nullable = true)
 |-- ShoppingMall: float (nullable = true)
 |-- Spa: float (nullable = true)
 |-- VRDeck: float (nullable = true)
 |-- Name: string (nullable = true)
 |-- Transported: boolean (nullable = true)
 |-- HomePlanet: string (nullable = true)

+-----------+---------+--------+-------------+----+-----+-----------+---------+------------+---+------+----------------+-----------+----------+
|PassengerId|CryoSleep|Cabin   |Destination  |Age |VIP  |RoomService|FoodCourt|ShoppingMall|Spa|VRDeck|Name            |Transported|HomePlanet|
+-----------+---------+--------+-------------+----+-----+-----------+---------+------------+---+------+----------------+-----------+----------+
|5863_01 

In [22]:
# Revert the settings for subsequent processing
#spark = SparkSession.builder \
#    .appName('SpaceshipTitanic') \
#    .config('hive.exec.dynamic.partition', 'true') \
#    .config('hive.exec.dynamic.partition.mode', 'nonstrict') \
#    .config('spark.sql.session.timeZone', 'JST') \
#    .config('spark.ui.enabled', 'true') \
#    .config('spark.eventLog.enabled', 'true') \
#    .enableHiveSupport() \
#    .getOrCreate()

#train_df.createOrReplaceTempView('train_df')
#spark.sql('SHOW TABLES IN default').show(truncate=False)

<a id ="3"></a><h1 style="background:#05445E; border:0; border-radius: 12px; color:#D3D3D3"><center>3. Data Wrangling with PySpark</center></h1>
[Back to the TOC](#toc)

<a id ="3.1"></a><h2 style="background:#75E6DA; border:0; border-radius: 12px; color:black"><center>3.1 Select specific columns</center></h2>
[Back to the TOC](#toc)

In [23]:
train_df.select('PassengerId', 'Transported').show(n=5, truncate=False)

# same result
columns = ['PassengerId', 'Transported']
train_df.select(columns).show(n=5, truncate=False)

+-----------+-----------+
|PassengerId|Transported|
+-----------+-----------+
|5863_01    |true       |
|7456_01    |true       |
|5915_01    |false      |
|6020_01    |false      |
|1565_01    |false      |
+-----------+-----------+
only showing top 5 rows

+-----------+-----------+
|PassengerId|Transported|
+-----------+-----------+
|5863_01    |true       |
|7456_01    |true       |
|5915_01    |false      |
|6020_01    |false      |
|1565_01    |false      |
+-----------+-----------+
only showing top 5 rows



In [24]:
# SQL
query = """
SELECT
    PassengerId
    , Transported
FROM
    train_df
"""

spark.sql(query).show(n=5, truncate=False)

+-----------+-----------+
|PassengerId|Transported|
+-----------+-----------+
|5863_01    |true       |
|7456_01    |true       |
|5915_01    |false      |
|6020_01    |false      |
|1565_01    |false      |
+-----------+-----------+
only showing top 5 rows



<a id ="3.2"></a><h2 style="background:#75E6DA; border:0; border-radius: 12px; color:black"><center>3.2 Show as alias</center></h2>
[Back to the TOC](#toc)

In [25]:
train_df.select(train_df['PassengerId'].alias('ID'), train_df.Transported.alias('Label')).show(n=5, truncate=False)

+-------+-----+
|ID     |Label|
+-------+-----+
|5863_01|true |
|7456_01|true |
|5915_01|false|
|6020_01|false|
|1565_01|false|
+-------+-----+
only showing top 5 rows



In [26]:
# SQL
query = """
SELECT
    PassengerId as ID
    , Transported as Label
FROM
    train_df
"""

spark.sql(query).show(n=5, truncate=False)

+-------+-----+
|ID     |Label|
+-------+-----+
|5863_01|true |
|7456_01|true |
|5915_01|false|
|6020_01|false|
|1565_01|false|
+-------+-----+
only showing top 5 rows



<a id ="3.3"></a><h2 style="background:#75E6DA; border:0; border-radius: 12px; color:black"><center>3.3 Count data</center></h2>
[Back to the TOC](#toc)

In [27]:
print('total #: ', train_df.count())
print('total # of PassengerId: ', train_df.select('PassengerId').count())

total #:  8693
total # of PassengerId:  8693


In [28]:
# SQL
query = """
SELECT
    count(*) AS total_number
    , count(PassengerId) AS total_PassengerId_number
FROM
    train_df
"""

spark.sql(query).show(n=5, truncate=False)

+------------+------------------------+
|total_number|total_PassengerId_number|
+------------+------------------------+
|8693        |8693                    |
+------------+------------------------+



<a id ="3.4"></a><h2 style="background:#75E6DA; border:0; border-radius: 12px; color:black"><center>3.4 Count unique number</center></h2>
[Back to the TOC](#toc)

In [29]:
print('unique # of PassengerId: ', train_df.select('PassengerId').distinct().count())
print('unique # of Transported: ', train_df.select('Transported').distinct().count())
train_df.select(countDistinct('PassengerId').alias('unique # ID'),
                countDistinct(train_df['Transported']).alias('unique # Label')) \
                .show()

unique # of PassengerId:  8693
unique # of Transported:  2
+-----------+--------------+
|unique # ID|unique # Label|
+-----------+--------------+
|       8693|             2|
+-----------+--------------+



In [30]:
# SQL
query = """
SELECT
    count(DISTINCT PassengerId) AS unique_ID_number
    , count(DISTINCT Transported) AS unique_Label_number
FROM
    train_df
"""

spark.sql(query).show(n=5, truncate=False)

+----------------+-------------------+
|unique_ID_number|unique_Label_number|
+----------------+-------------------+
|8693            |2                  |
+----------------+-------------------+



<a id ="3.5"></a><h2 style="background:#75E6DA; border:0; border-radius: 12px; color:black"><center>3.5 Create and Delete columns</center></h2>
[Back to the TOC](#toc)

In [31]:
# Checking columns
print(train_df.columns)
train_df.printSchema()  # printSchema() prints the schema itself and returns None, so print() is not needed

['PassengerId', 'CryoSleep', 'Cabin', 'Destination', 'Age', 'VIP', 'RoomService', 'FoodCourt', 'ShoppingMall', 'Spa', 'VRDeck', 'Name', 'Transported', 'HomePlanet']
root
 |-- PassengerId: string (nullable = true)
 |-- CryoSleep: boolean (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Age: float (nullable = true)
 |-- VIP: boolean (nullable = true)
 |-- RoomService: float (nullable = true)
 |-- FoodCourt: float (nullable = true)
 |-- ShoppingMall: float (nullable = true)
 |-- Spa: float (nullable = true)
 |-- VRDeck: float (nullable = true)
 |-- Name: string (nullable = true)
 |-- Transported: boolean (nullable = true)
 |-- HomePlanet: string (nullable = true)


In [32]:
# Create a 'TotalBill' column as a sum of all 5 bill values
train_df = train_df.withColumn(
    'TotalBill',
    train_df['RoomService'] \
    + train_df['FoodCourt'] \
    + train_df['ShoppingMall'] \
    + train_df['Spa'] \
    + train_df['VRDeck']
    )

target_cols = ['TotalBill','RoomService','FoodCourt', 'ShoppingMall', 'Spa', 'VRDeck']
train_df.select([col for col in train_df.columns if col in target_cols]).show(n=5, truncate=False)
train_df.printSchema()

+-----------+---------+------------+---+------+---------+
|RoomService|FoodCourt|ShoppingMall|Spa|VRDeck|TotalBill|
+-----------+---------+------------+---+------+---------+
|0.0        |0.0      |0.0         |0.0|0.0   |0.0      |
|0.0        |0.0      |0.0         |0.0|0.0   |0.0      |
|0.0        |0.0      |39.0        |0.0|749.0 |788.0    |
|0.0        |0.0      |0.0         |0.0|0.0   |0.0      |
|0.0        |1519.0   |0.0         |0.0|766.0 |2285.0   |
+-----------+---------+------------+---+------+---------+
only showing top 5 rows

root
 |-- PassengerId: string (nullable = true)
 |-- CryoSleep: boolean (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Age: float (nullable = true)
 |-- VIP: boolean (nullable = true)
 |-- RoomService: float (nullable = true)
 |-- FoodCourt: float (nullable = true)
 |-- ShoppingMall: float (nullable = true)
 |-- Spa: float (nullable = true)
 |-- VRDeck: float (nullable = true)
 |-- Name: string
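
Note that `+` on columns propagates nulls: `TotalBill` is null whenever any of the five bill columns is null, which is consistent with the lower non-null count of `TotalBill` reported by `describe()` later in this notebook. The plain-Python sketch below (not PySpark code; `null_add` and `coalesce` are hypothetical helpers, and the values are made up) shows the behavior and one possible workaround. Whether a missing bill should count as 0 is an assumption that depends on the data.

```python
# Plain-Python illustration of null propagation (not the PySpark API).
# None stands in for SQL NULL; helper names are hypothetical.

def null_add(*values):
    """SQL-style '+': the result is null if any operand is null."""
    if any(v is None for v in values):
        return None
    return sum(values)

def coalesce(value, default):
    """Return default when value is null, mimicking SQL COALESCE."""
    return default if value is None else value

bills = [0.0, None, 39.0, 0.0, 749.0]  # hypothetical row, FoodCourt missing

strict_total = null_add(*bills)
lenient_total = null_add(*(coalesce(b, 0.0) for b in bills))
print(strict_total, lenient_total)
```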

In [33]:
# Create & delete columns
train_df = train_df.withColumn('SpaceTax', lit(0.5)) # constant value
train_df = train_df.withColumn('TotalBillwithTax', (train_df['TotalBill'] * (1.0 + train_df['SpaceTax'])))

target_cols.extend(['SpaceTax', 'TotalBillwithTax'])

print('---------Create columns---------')
train_df.select([col for col in train_df.columns if col in target_cols]).show(n=5, truncate=False)
train_df.printSchema()

train_df = train_df.drop('SpaceTax', 'TotalBillwithTax')

print('---------Delete columns---------')
train_df.select([col for col in train_df.columns if col in target_cols]).show(n=5, truncate=False)
train_df.printSchema()

---------Create columns---------
+-----------+---------+------------+---+------+---------+--------+----------------+
|RoomService|FoodCourt|ShoppingMall|Spa|VRDeck|TotalBill|SpaceTax|TotalBillwithTax|
+-----------+---------+------------+---+------+---------+--------+----------------+
|0.0        |0.0      |0.0         |0.0|0.0   |0.0      |0.5     |0.0             |
|0.0        |0.0      |0.0         |0.0|0.0   |0.0      |0.5     |0.0             |
|0.0        |0.0      |39.0        |0.0|749.0 |788.0    |0.5     |1182.0          |
|0.0        |0.0      |0.0         |0.0|0.0   |0.0      |0.5     |0.0             |
|0.0        |1519.0   |0.0         |0.0|766.0 |2285.0   |0.5     |3427.5          |
+-----------+---------+------------+---+------+---------+--------+----------------+
only showing top 5 rows

root
 |-- PassengerId: string (nullable = true)
 |-- CryoSleep: boolean (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Age: float 

In [34]:
# Change column names
train_df.withColumnRenamed('PassengerId', 'ID').columns

['ID',
 'CryoSleep',
 'Cabin',
 'Destination',
 'Age',
 'VIP',
 'RoomService',
 'FoodCourt',
 'ShoppingMall',
 'Spa',
 'VRDeck',
 'Name',
 'Transported',
 'HomePlanet',
 'TotalBill']

In [35]:
# SQL
query_1 = """
SELECT 
    RoomService
    , FoodCourt
    , ShoppingMall
    , Spa
    , VRDeck
    , RoomService + FoodCourt + ShoppingMall + Spa + VRDeck as TotalBill
FROM
    train_df
"""
spark.sql(query_1).show(n=5, truncate=False)

# The temporary view `train_df` was registered before withColumn() added TotalBill,
# so the view still points to the old DataFrame and does not contain that column.
# Therefore, the following query raises an error.
query_2 = """
SELECT
    RoomService
    , FoodCourt
    , ShoppingMall
    , Spa
    , VRDeck
    , TotalBill
FROM
    train_df
"""
try:
    spark.sql(query_2).show(n=5, truncate=False)
except Exception as e:
    print(e)

# To avoid the error, re-register the temporary view before running the SQL query.
train_df.createOrReplaceTempView("train_df")
spark.sql(query_2).show(n=5, truncate=False)

+-----------+---------+------------+---+------+---------+
|RoomService|FoodCourt|ShoppingMall|Spa|VRDeck|TotalBill|
+-----------+---------+------------+---+------+---------+
|0.0        |0.0      |0.0         |0.0|0.0   |0.0      |
|0.0        |0.0      |0.0         |0.0|0.0   |0.0      |
|0.0        |0.0      |39.0        |0.0|749.0 |788.0    |
|0.0        |0.0      |0.0         |0.0|0.0   |0.0      |
|0.0        |1519.0   |0.0         |0.0|766.0 |2285.0   |
+-----------+---------+------------+---+------+---------+
only showing top 5 rows

[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `TotalBill` cannot be resolved. Did you mean one of the following? [`Cabin`, `Name`, `Spa`, `Age`, `VIP`].; line 8 pos 6;
'Project [RoomService#413, FoodCourt#414, ShoppingMall#415, Spa#416, VRDeck#417, 'TotalBill]
+- SubqueryAlias train_df
   +- View (`train_df`, [PassengerId#407,CryoSleep#408,Cabin#409,Destination#410,Age#411,VIP#412,RoomService#413,FoodCourt#414,Shopping

<a id ="3.6"></a><h2 style="background:#75E6DA; border:0; border-radius: 12px; color:black"><center>3.6 Extract records that meet the criteria</center></h2>
[Back to the TOC](#toc)

In [36]:
# Choose records where the age > 30
tmp_df_1 = train_df.filter(train_df['Age']>30).select(['PassengerId', 'Age', 'TotalBill'])
tmp_df_1.show(n=5, truncate=False)

# Verify that the alternative syntaxes give the same result.
# subtract() returns the rows of tmp_df_1 that are absent from the other DataFrame,
# so an empty table means no row of tmp_df_1 is missing.

# Attribute-style column access
tmp_df_2 = train_df.filter(train_df.Age>30).select(['PassengerId', 'Age', 'TotalBill'])
tmp_df_1.subtract(tmp_df_2).show()

# where() is an alias of filter()
tmp_df_3 = train_df.where(train_df['Age']>30).select(['PassengerId', 'Age', 'TotalBill'])
tmp_df_1.subtract(tmp_df_3).show()

# col() refers to a column by name
tmp_df_4 = train_df.where(col('Age')>30).select(['PassengerId', 'Age', 'TotalBill'])
tmp_df_1.subtract(tmp_df_4).show()

# SQL expression string
tmp_df_5 = train_df.filter('Age>30').select(['PassengerId', 'Age', 'TotalBill'])
tmp_df_1.subtract(tmp_df_5).show()

# Choose records with multiple conditions
tmp_df_6 = train_df.filter((train_df['Age']>30) & (train_df['TotalBill']>1000)).select(['PassengerId', 'Age', 'TotalBill'])
tmp_df_6.show(n=5, truncate=False)

+-----------+----+---------+
|PassengerId|Age |TotalBill|
+-----------+----+---------+
|5915_01    |32.0|788.0    |
|6020_01    |39.0|0.0      |
|2590_03    |35.0|0.0      |
|1552_01    |37.0|0.0      |
|2811_01    |44.0|871.0    |
+-----------+----+---------+
only showing top 5 rows

+-----------+---+---------+
|PassengerId|Age|TotalBill|
+-----------+---+---------+
+-----------+---+---------+

+-----------+---+---------+
|PassengerId|Age|TotalBill|
+-----------+---+---------+
+-----------+---+---------+

+-----------+---+---------+
|PassengerId|Age|TotalBill|
+-----------+---+---------+
+-----------+---+---------+

+-----------+---+---------+
|PassengerId|Age|TotalBill|
+-----------+---+---------+
+-----------+---+---------+

+-----------+----+---------+
|PassengerId|Age |TotalBill|
+-----------+----+---------+
|6850_03    |52.0|1621.0   |
|8988_07    |53.0|1554.0   |
|2476_01    |35.0|1265.0   |
|1144_02    |35.0|1396.0   |
|2396_01    |46.0|1625.0   |
+-----------+----+---------+
only showing top 5 rows
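
The `subtract()` checks above are one-directional: an empty result only shows that no row of the left DataFrame is missing from the right one. A stricter equality check subtracts in both directions. Here is a plain-Python sketch using sets of tuples (not PySpark code; the second data set is hypothetical and deliberately contains one extra row):

```python
# Plain-Python illustration of a one-directional vs two-directional
# difference check (not the PySpark API). Rows are modelled as tuples.
left = {("5915_01", 32.0), ("6020_01", 39.0)}
right = {("5915_01", 32.0), ("6020_01", 39.0), ("2590_03", 35.0)}  # one extra row

only_in_left = left - right    # analogous to left.subtract(right)
only_in_right = right - left   # analogous to right.subtract(left)

# The two collections hold the same distinct rows only if both differences are empty
same_rows = not only_in_left and not only_in_right
print(only_in_left, only_in_right, same_rows)
```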

In [37]:
# SQL 
query = """
SELECT 
    PassengerId
    , Age
    , TotalBill
FROM
    train_df
WHERE
    (Age > 30) 
    and (TotalBill > 1000)
"""

spark.sql(query).show(n=5, truncate=False)

+-----------+----+---------+
|PassengerId|Age |TotalBill|
+-----------+----+---------+
|6850_03    |52.0|1621.0   |
|8988_07    |53.0|1554.0   |
|2476_01    |35.0|1265.0   |
|1144_02    |35.0|1396.0   |
|2396_01    |46.0|1625.0   |
+-----------+----+---------+
only showing top 5 rows



<a id ="3.7"></a><h2 style="background:#75E6DA; border:0; border-radius: 12px; color:black"><center>3.7 Conditional branching</center></h2>
[Back to the TOC](#toc)

In [38]:
train_df = train_df.withColumn(
    'AgeGroup',
    when(train_df['Age']<20, 'junior')
    .when((train_df['Age']>=20) & (train_df['Age']<30), 'young')
    .when((train_df['Age']>=30) & (train_df['Age']<55), 'middle')
    .when((train_df['Age']>=55) & (train_df['Age']<70), 'senior')
    .otherwise('elderly')  # note: rows with a null Age also fall through to 'elderly'
)

train_df = train_df.withColumn(
    'VIPflg',
    when(train_df.VIP==True, lit(1))
    .when(train_df.VIP!=True, lit(0))
)

planets = [
    'Mercury',
    'Venus',
    'Earth',
    'Mars',
    'Jupiter',
    'Saturn',
    'Uranus',
    'Neptune',
]
train_df = train_df.withColumn(
    'Planetflg',
    when(train_df['HomePlanet'].isin(planets), train_df['HomePlanet'])
    .when(train_df['HomePlanet']=='Pluto', 'DwarfPlanet')
    .otherwise(lit('Satellite'))
)

train_df.select('Age', 'AgeGroup', 'VIP', 'VIPflg', 'HomePlanet', 'Planetflg').show(n=10)

+----+--------+-----+------+----------+---------+
| Age|AgeGroup|  VIP|VIPflg|HomePlanet|Planetflg|
+----+--------+-----+------+----------+---------+
| 0.0|  junior|false|     0|     Earth|    Earth|
|21.0|   young|false|     0|     Earth|    Earth|
|32.0|  middle|false|     0|     Earth|    Earth|
|39.0|  middle|false|     0|     Earth|    Earth|
|16.0|  junior|false|     0|     Earth|    Earth|
|35.0|  middle|false|     0|     Earth|    Earth|
| 1.0|  junior|false|     0|     Earth|    Earth|
|37.0|  middle|false|     0|     Earth|    Earth|
|NULL| elderly|false|     0|     Earth|    Earth|
| 7.0|  junior|false|     0|     Earth|    Earth|
+----+--------+-----+------+----------+---------+
only showing top 10 rows
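
The row with `Age = NULL` above is labelled `elderly` because every comparison with null evaluates to unknown, so no `when()` branch matches and `otherwise()` applies. The following plain-Python model of that three-valued logic is only a sketch (not PySpark code); all helper names are hypothetical, and `age_group_null_safe` is one possible alternative that keeps missing ages missing.

```python
# Plain-Python model of SQL three-valued logic (not the PySpark API).
# None stands in for SQL NULL; all helper names are hypothetical.

def sql_lt(a, b):
    """a < b, or None (unknown) when either side is null."""
    return None if a is None or b is None else a < b

def sql_ge(a, b):
    """a >= b, or None (unknown) when either side is null."""
    return None if a is None or b is None else a >= b

def sql_and(x, y):
    """SQL AND: False wins over unknown, unknown wins over True."""
    if x is False or y is False:
        return False
    if x is None or y is None:
        return None
    return True

def age_group(age):
    branches = [
        (sql_lt(age, 20), "junior"),
        (sql_and(sql_ge(age, 20), sql_lt(age, 30)), "young"),
        (sql_and(sql_ge(age, 30), sql_lt(age, 55)), "middle"),
        (sql_and(sql_ge(age, 55), sql_lt(age, 70)), "senior"),
    ]
    for condition, label in branches:
        if condition is True:  # unknown (None) does not select a branch
            return label
    return "elderly"  # plays the role of otherwise()

def age_group_null_safe(age):
    """Variant that returns None for a missing age instead of 'elderly'."""
    return None if age is None else age_group(age)

labels = [age_group(a) for a in (16.0, 21.0, 39.0, 60.0, 79.0, None)]
print(labels)
```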



In [39]:
# SQL 
query = """
SELECT
    Age
    , CASE
        WHEN Age < 20 THEN 'junior'
        WHEN (Age >=20) AND (Age<30) THEN 'young'
        WHEN Age BETWEEN 30 AND 55 THEN 'middle' -- Age==30 and Age==55 are classified as 'middle'
        WHEN Age>=55 AND Age<70 THEN 'senior'
        ELSE 'elderly'
    END AS AgeGroup
FROM
    train_df
"""

spark.sql(query).show(n=5, truncate=False)

+----+--------+
|Age |AgeGroup|
+----+--------+
|0.0 |junior  |
|21.0|young   |
|32.0|middle  |
|39.0|middle  |
|16.0|junior  |
+----+--------+
only showing top 5 rows
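
Because `BETWEEN` includes both bounds and `CASE` returns the first matching branch, this SQL version and the DataFrame version above disagree at exactly `Age = 55`. A plain-Python sketch of the two rule sets (not PySpark code; function names are hypothetical, and non-null ages are assumed):

```python
# Plain-Python comparison of the two boundary conventions (not the PySpark API).
# Both functions assume a non-null age.

def group_half_open(age):
    """Mirrors the DataFrame cell: 30 <= Age < 55 is 'middle'."""
    if age < 20:
        return "junior"
    if 20 <= age < 30:
        return "young"
    if 30 <= age < 55:
        return "middle"
    if 55 <= age < 70:
        return "senior"
    return "elderly"

def group_between(age):
    """Mirrors the SQL cell: BETWEEN 30 AND 55 includes both ends."""
    if age < 20:
        return "junior"
    if 20 <= age < 30:
        return "young"
    if 30 <= age <= 55:  # first matching branch wins, so 55 never reaches 'senior'
        return "middle"
    if 55 <= age < 70:
        return "senior"
    return "elderly"

at_boundary = (group_half_open(55.0), group_between(55.0))
print(at_boundary)
```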



<a id ="3.8"></a><h2 style="background:#75E6DA; border:0; border-radius: 12px; color:black"><center>3.8 Data type conversion</center></h2>
[Back to the TOC](#toc)

In [40]:
print(train_df.dtypes)
train_df.select(train_df['Transported']).show(n=5, truncate=False)

train_df_new = train_df.withColumn(
    'Transported',
    train_df['Transported'].cast('int')
)
print(train_df_new.dtypes)  # check the converted DataFrame
train_df_new.select(train_df_new['Transported']).show(n=5, truncate=False)

[('PassengerId', 'string'), ('CryoSleep', 'boolean'), ('Cabin', 'string'), ('Destination', 'string'), ('Age', 'float'), ('VIP', 'boolean'), ('RoomService', 'float'), ('FoodCourt', 'float'), ('ShoppingMall', 'float'), ('Spa', 'float'), ('VRDeck', 'float'), ('Name', 'string'), ('Transported', 'boolean'), ('HomePlanet', 'string'), ('TotalBill', 'float'), ('AgeGroup', 'string'), ('VIPflg', 'int'), ('Planetflg', 'string')]
+-----------+
|Transported|
+-----------+
|true       |
|true       |
|false      |
|false      |
|false      |
+-----------+
only showing top 5 rows

[('PassengerId', 'string'), ('CryoSleep', 'boolean'), ('Cabin', 'string'), ('Destination', 'string'), ('Age', 'float'), ('VIP', 'boolean'), ('RoomService', 'float'), ('FoodCourt', 'float'), ('ShoppingMall', 'float'), ('Spa', 'float'), ('VRDeck', 'float'), ('Name', 'string'), ('Transported', 'int'), ('HomePlanet', 'string'), ('TotalBill', 'float'), ('AgeGroup', 'string'), ('VIPflg', 'int'), ('Planetflg', 'string')]
+----

<a id ="3.9"></a><h2 style="background:#75E6DA; border:0; border-radius: 12px; color:black"><center>3.9 Statistical Analysis</center></h2>
[Back to the TOC](#toc)

In [41]:
# describe()
train_df.describe().show()

23/11/14 03:01:22 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+-----------+-----+-----------+------------------+-----------------+------------------+------------------+-----------------+------------------+----------------+----------+------------------+--------+--------------------+---------+
|summary|PassengerId|Cabin|Destination|               Age|      RoomService|         FoodCourt|      ShoppingMall|              Spa|            VRDeck|            Name|HomePlanet|         TotalBill|AgeGroup|              VIPflg|Planetflg|
+-------+-----------+-----+-----------+------------------+-----------------+------------------+------------------+-----------------+------------------+----------------+----------+------------------+--------+--------------------+---------+
|  count|       8693| 8494|       8511|              8514|             8512|              8510|              8485|             8510|              8505|            8493|      8492|              7785|    8693|                8490|     8693|
|   mean|       NULL| NULL|       NULL| 28.8

                                                                                

In [42]:
# summary()
train_df.summary().show()



+-------+-----------+-----+-----------+------------------+-----------------+------------------+------------------+-----------------+------------------+----------------+----------+------------------+--------+--------------------+---------+
|summary|PassengerId|Cabin|Destination|               Age|      RoomService|         FoodCourt|      ShoppingMall|              Spa|            VRDeck|            Name|HomePlanet|         TotalBill|AgeGroup|              VIPflg|Planetflg|
+-------+-----------+-----+-----------+------------------+-----------------+------------------+------------------+-----------------+------------------+----------------+----------+------------------+--------+--------------------+---------+
|  count|       8693| 8494|       8511|              8514|             8512|              8510|              8485|             8510|              8505|            8493|      8492|              7785|    8693|                8490|     8693|
|   mean|       NULL| NULL|       NULL| 28.8

                                                                                

In [43]:
# Choose columns
print('describe')
train_df.describe(['Age', 'TotalBill']).show()
print('summary')
train_df.select('Age', 'TotalBill').summary().show()

describe
+-------+------------------+------------------+
|summary|               Age|         TotalBill|
+-------+------------------+------------------+
|  count|              8514|              7785|
|   mean| 28.82793046746535|1484.6015414258188|
| stddev|14.489021423908795|2845.2882407440616|
|    min|               0.0|               0.0|
|    max|              79.0|           35987.0|
+-------+------------------+------------------+

summary
+-------+------------------+------------------+
|summary|               Age|         TotalBill|
+-------+------------------+------------------+
|  count|              8514|              7785|
|   mean| 28.82793046746535|1484.6015414258188|
| stddev|14.489021423908795|2845.2882407440616|
|    min|               0.0|               0.0|
|    25%|              19.0|               0.0|
|    50%|              27.0|             736.0|
|    75%|              38.0|            1486.0|
|    max|              79.0|           35987.0|
+-------+-------------
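
In these tables, `count` is the number of non-null values (8514 for `Age` out of 8693 rows), and the other statistics are computed over the non-null values only; `stddev` is the sample standard deviation. The plain-Python sketch below reproduces that logic on a few made-up values (not PySpark code). Percentiles are left out because Spark may compute them approximately.

```python
# Plain-Python illustration of describe()-style statistics (not the PySpark API).
# None stands in for NULL; the ages are hypothetical sample data.
import statistics

ages = [19.0, None, 27.0, 38.0]
present = [a for a in ages if a is not None]  # nulls are excluded first

summary = {
    "count": len(present),                   # non-null count, not row count
    "mean": statistics.mean(present),
    "stddev": statistics.stdev(present),     # sample standard deviation (n - 1)
    "min": min(present),
    "max": max(present),
}
print(summary)
```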

In [44]:
# Other functions
train_df.agg({'Age':'mean', 'TotalBill':'sum'}).show()
train_df.agg(avg('Age').alias('avg_age'), sum('Age').alias('sum_age')).show()

+--------------+-----------------+
|sum(TotalBill)|         avg(Age)|
+--------------+-----------------+
|   1.1557623E7|28.82793046746535|
+--------------+-----------------+

+-----------------+--------+
|          avg_age| sum_age|
+-----------------+--------+
|28.82793046746535|245441.0|
+-----------------+--------+



In [45]:
# Format a number to two decimal places
age_std = train_df.select(stddev('Age').alias('StdAge'))
age_std.show()
age_std.select(format_number('StdAge', 2).alias('StdAge')).show()

+------------------+
|            StdAge|
+------------------+
|14.489021423908795|
+------------------+

+------+
|StdAge|
+------+
| 14.49|
+------+



<a id ="3.10"></a><h2 style="background:#75E6DA; border:0; border-radius: 12px; color:black"><center>3.10 GroupBy</center></h2>
[Back to the TOC](#toc)

In [46]:
train_df.groupBy('HomePlanet').count().sort(desc('count')).show()

+----------+-----+
|HomePlanet|count|
+----------+-----+
|     Earth| 4602|
|    Europa| 2131|
|      Mars| 1759|
|      NULL|  201|
+----------+-----+



In [47]:
# Group by multiple conditions
train_df.groupBy('HomePlanet', 'VIP').count().sort(asc('count')).show()

+----------+-----+-----+
|HomePlanet|  VIP|count|
+----------+-----+-----+
|      NULL| NULL|    3|
|      NULL| true|    5|
|    Europa| NULL|   42|
|      Mars| NULL|   43|
|      Mars| true|   63|
|     Earth| NULL|  115|
|    Europa| true|  131|
|      NULL|false|  193|
|      Mars|false| 1653|
|    Europa|false| 1958|
|     Earth|false| 4487|
+----------+-----+-----+



In [48]:
# Get average values
train_df.groupby('HomePlanet').mean('TotalBill').show()

# Combination with filter() function
train_df.groupBy('HomePlanet').mean('TotalBill') \
        .filter(col('avg(TotalBill)') >= 1000).show()

# mean() is an alias of avg(). With agg(), you can rename the result column using alias().
train_df.groupBy('HomePlanet').agg(avg('TotalBill') \
        .alias('mean_total_bill')).filter(col('mean_total_bill') >= 1000) \
        .sort(desc('mean_total_bill')).show()

+----------+------------------+
|HomePlanet|    avg(TotalBill)|
+----------+------------------+
|     Earth| 688.4433065106072|
|    Europa|3552.7497398543182|
|      NULL|1146.9562841530055|
|      Mars|1074.1222292590246|
+----------+------------------+

+----------+------------------+
|HomePlanet|    avg(TotalBill)|
+----------+------------------+
|    Europa|3552.7497398543182|
|      NULL|1146.9562841530055|
|      Mars|1074.1222292590246|
+----------+------------------+

+----------+------------------+
|HomePlanet|   mean_total_bill|
+----------+------------------+
|    Europa|3552.7497398543182|
|      NULL|1146.9562841530055|
|      Mars|1074.1222292590246|
+----------+------------------+



In [49]:
# Apply multiple aggregate functions to each group
train_df.groupBy('HomePlanet').agg(avg('TotalBill').alias('avg_total_bill'),
                                   stddev('TotalBill').alias('stddev_total_bill')) \
                                   .show()

+----------+------------------+------------------+
|HomePlanet|    avg_total_bill| stddev_total_bill|
+----------+------------------+------------------+
|     Earth| 688.4433065106072| 774.1283734325697|
|    Europa|3552.7497398543182| 4885.469155426432|
|      NULL|1146.9562841530055|1980.9960360338232|
|      Mars|1074.1222292590246| 1352.855516696299|
+----------+------------------+------------------+



In [50]:
# Pivot table
# pivot() turns the distinct values of a column into column headers.
train_df.groupBy('HomePlanet').agg(count('VIP').alias('VIPCount')).show()
train_df.groupby('HomePlanet').pivot('VIP').count().show()
train_df.groupby('HomePlanet').pivot('VIP').max('TotalBill').show()

+----------+--------+
|HomePlanet|VIPCount|
+----------+--------+
|     Earth|    4487|
|    Europa|    2089|
|      NULL|     198|
|      Mars|    1716|
+----------+--------+

+----------+----+-----+----+
|HomePlanet|null|false|true|
+----------+----+-----+----+
|      NULL|   3|  193|   5|
|      Mars|  43| 1653|  63|
|     Earth| 115| 4487|NULL|
|    Europa|  42| 1958| 131|
+----------+----+-----+----+

+----------+-------+-------+-------+
|HomePlanet|   null|  false|   true|
+----------+-------+-------+-------+
|      NULL| 2416.0|13558.0| 3605.0|
|      Mars| 9597.0|10801.0| 5014.0|
|     Earth| 6279.0| 6335.0|   NULL|
|    Europa|19775.0|35987.0|31076.0|
+----------+-------+-------+-------+
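
Conceptually, `groupBy(...).pivot(...).count()` counts rows per (group, pivot value) pair and lays the pivot values out as columns; a pair that never occurs yields NULL, like the Earth/true cell above. A plain-Python sketch with `collections.Counter` (not PySpark code; the rows are hypothetical sample data):

```python
# Plain-Python illustration of a pivoted count (not the PySpark API).
# Each row is (HomePlanet, VIP); None stands in for NULL.
from collections import Counter

rows = [
    ("Earth", False), ("Earth", False),
    ("Mars", True), ("Mars", None),
    ("Europa", True),
]

pair_counts = Counter(rows)             # count per (planet, vip) pair
pivot_values = [None, False, True]      # these become the column headers
planets = sorted({planet for planet, _ in rows})

# A missing pair gives None, like the NULL cell in the pivot output
pivot_table = {
    planet: {vip: pair_counts.get((planet, vip)) for vip in pivot_values}
    for planet in planets
}
print(pivot_table)
```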



In [51]:
# SQL
query = """
SELECT
    HomePlanet
    , COUNT(*) AS Count
    , COUNT(DISTINCT PassengerId) as TotalIDs
    , AVG(TotalBill) AS mean_total_bill
    , MAX(TotalBill) AS max_total_bill
    , SUM(TotalBill) AS sum_of_total_bill
FROM
    train_df
GROUP BY
    HomePlanet
HAVING
    mean_total_bill > 1000
ORDER BY
    mean_total_bill DESC
"""

spark.sql(query).show(n=5, truncate=False)

+----------+-----+--------+------------------+--------------+-----------------+
|HomePlanet|Count|TotalIDs|mean_total_bill   |max_total_bill|sum_of_total_bill|
+----------+-----+--------+------------------+--------------+-----------------+
|Europa    |2131 |2131    |3552.7497398543182|35987.0       |6828385.0        |
|NULL      |201  |201     |1146.9562841530055|13558.0       |209893.0         |
|Mars      |1759 |1759    |1074.1222292590246|10801.0       |1696039.0        |
+----------+-----+--------+------------------+--------------+-----------------+



<a id ="3.11"></a><h2 style="background:#75E6DA; border:0; border-radius: 12px; color:black"><center>3.11 Window functions</center></h2>
[Back to the TOC](#toc)

In [52]:
# rank function
window_schema = Window.partitionBy(train_df['HomePlanet']) \
                .orderBy(train_df['TotalBill'].desc())

tmp_df = train_df.withColumn(
    'TotalBillRank', rank().over(window_schema)).select('HomePlanet', 'TotalBill', 'TotalBillRank')

tmp_df.show(n=10)

+----------+---------+-------------+
|HomePlanet|TotalBill|TotalBillRank|
+----------+---------+-------------+
|      NULL|  13558.0|            1|
|      NULL|  12473.0|            2|
|      NULL|   9612.0|            3|
|      NULL|   9307.0|            4|
|      NULL|   7177.0|            5|
|      NULL|   6628.0|            6|
|      NULL|   5240.0|            7|
|      NULL|   4931.0|            8|
|      NULL|   4766.0|            9|
|      NULL|   4537.0|           10|
+----------+---------+-------------+
only showing top 10 rows



In [53]:
# lag function
window_schema = Window.partitionBy(tmp_df['HomePlanet']) \
                .orderBy(tmp_df['TotalBillRank'].asc())

tmp_df.withColumn('TotalBillLag', lag('TotalBill', 1).over(window_schema)).select('HomePlanet', 'TotalBill', 'TotalBillRank', 'TotalBillLag').show(n=10)

+----------+---------+-------------+------------+
|HomePlanet|TotalBill|TotalBillRank|TotalBillLag|
+----------+---------+-------------+------------+
|      NULL|  13558.0|            1|        NULL|
|      NULL|  12473.0|            2|     13558.0|
|      NULL|   9612.0|            3|     12473.0|
|      NULL|   9307.0|            4|      9612.0|
|      NULL|   7177.0|            5|      9307.0|
|      NULL|   6628.0|            6|      7177.0|
|      NULL|   5240.0|            7|      6628.0|
|      NULL|   4931.0|            8|      5240.0|
|      NULL|   4766.0|            9|      4931.0|
|      NULL|   4537.0|           10|      4766.0|
+----------+---------+-------------+------------+
only showing top 10 rows



In [54]:
# lead function
window_schema = Window.partitionBy(tmp_df['HomePlanet']) \
                .orderBy(tmp_df['TotalBillRank'].asc())

tmp_df.withColumn('TotalBillLead', lead('TotalBill', 2).over(window_schema)).select('HomePlanet', 'TotalBill', 'TotalBillRank', 'TotalBillLead').show(n=10)

+----------+---------+-------------+-------------+
|HomePlanet|TotalBill|TotalBillRank|TotalBillLead|
+----------+---------+-------------+-------------+
|      NULL|  13558.0|            1|       9612.0|
|      NULL|  12473.0|            2|       9307.0|
|      NULL|   9612.0|            3|       7177.0|
|      NULL|   9307.0|            4|       6628.0|
|      NULL|   7177.0|            5|       5240.0|
|      NULL|   6628.0|            6|       4931.0|
|      NULL|   5240.0|            7|       4766.0|
|      NULL|   4931.0|            8|       4537.0|
|      NULL|   4766.0|            9|       4393.0|
|      NULL|   4537.0|           10|       4042.0|
+----------+---------+-------------+-------------+
only showing top 10 rows
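
To make the semantics of `rank`, `lag`, and `lead` concrete, here is a plain-Python sketch for a single partition (not PySpark code; function names are hypothetical). The first four bills come from the output above, and the tied values are made up to show that `rank` leaves a gap after a tie. When ordering by a column that contains ties, the order of the tied rows is not guaranteed, so `lag`/`lead` results among them may vary.

```python
# Plain-Python illustration of window functions within one partition
# (not the PySpark API). Function names are hypothetical.

def rank_desc(values):
    """rank(): ties share a rank and the following rank is skipped."""
    ordered = sorted(values, reverse=True)
    return [(v, ordered.index(v) + 1) for v in ordered]

def lag(values, offset=1):
    """Value from `offset` rows earlier, or None at the start."""
    return [values[i - offset] if i - offset >= 0 else None
            for i in range(len(values))]

def lead(values, offset=1):
    """Value from `offset` rows later, or None at the end."""
    return [values[i + offset] if i + offset < len(values) else None
            for i in range(len(values))]

bills = [13558.0, 12473.0, 9612.0, 9307.0]  # already sorted descending
ranked_with_tie = rank_desc([100.0, 90.0, 90.0, 80.0])
lagged = lag(bills, 1)
led = lead(bills, 2)
print(ranked_with_tie, lagged, led)
```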



In [55]:
# SQL
query = """
SELECT
    HomePlanet
    , TotalBill
    , RANK() OVER (
        PARTITION BY HomePlanet
        ORDER BY TotalBill DESC
        ) AS TotalBillRank
FROM
    train_df
"""

spark.sql(query).show(n=10, truncate=False)

+----------+---------+-------------+
|HomePlanet|TotalBill|TotalBillRank|
+----------+---------+-------------+
|NULL      |13558.0  |1            |
|NULL      |12473.0  |2            |
|NULL      |9612.0   |3            |
|NULL      |9307.0   |4            |
|NULL      |7177.0   |5            |
|NULL      |6628.0   |6            |
|NULL      |5240.0   |7            |
|NULL      |4931.0   |8            |
|NULL      |4766.0   |9            |
|NULL      |4537.0   |10           |
+----------+---------+-------------+
only showing top 10 rows



In [56]:
# SQL
query = """
WITH master AS (
    SELECT
        HomePlanet
        , TotalBill
        , RANK() OVER (
            PARTITION BY HomePlanet
            ORDER BY TotalBill DESC
        ) AS TotalBillRank
    FROM
        train_df
)

SELECT
    HomePlanet
    , TotalBill
    , TotalBillRank
    , LAG(TotalBill) OVER (
        PARTITION BY HomePlanet
        ORDER BY TotalBillRank ASC
    ) AS TotalBillLag
FROM
    master
"""

spark.sql(query).show(n=10, truncate=False)

+----------+---------+-------------+------------+
|HomePlanet|TotalBill|TotalBillRank|TotalBillLag|
+----------+---------+-------------+------------+
|NULL      |13558.0  |1            |NULL        |
|NULL      |12473.0  |2            |13558.0     |
|NULL      |9612.0   |3            |12473.0     |
|NULL      |9307.0   |4            |9612.0      |
|NULL      |7177.0   |5            |9307.0      |
|NULL      |6628.0   |6            |7177.0      |
|NULL      |5240.0   |7            |6628.0      |
|NULL      |4931.0   |8            |5240.0      |
|NULL      |4766.0   |9            |4931.0      |
|NULL      |4537.0   |10           |4766.0      |
+----------+---------+-------------+------------+
only showing top 10 rows



In [57]:
# SQL
query = """
WITH master AS (
    SELECT
        HomePlanet
        , TotalBill
        , RANK() OVER (
            PARTITION BY HomePlanet
            ORDER BY TotalBill DESC
        ) AS TotalBillRank
    FROM
        train_df
)

SELECT
    HomePlanet
    , TotalBill
    , TotalBillRank
    , LEAD(TotalBill, 2) OVER (
        PARTITION BY HomePlanet
        ORDER BY TotalBillRank ASC
    ) AS TotalBillLead
FROM
    master
"""

spark.sql(query).show(n=10, truncate=False)

+----------+---------+-------------+-------------+
|HomePlanet|TotalBill|TotalBillRank|TotalBillLead|
+----------+---------+-------------+-------------+
|NULL      |13558.0  |1            |9612.0       |
|NULL      |12473.0  |2            |9307.0       |
|NULL      |9612.0   |3            |7177.0       |
|NULL      |9307.0   |4            |6628.0       |
|NULL      |7177.0   |5            |5240.0       |
|NULL      |6628.0   |6            |4931.0       |
|NULL      |5240.0   |7            |4766.0       |
|NULL      |4931.0   |8            |4537.0       |
|NULL      |4766.0   |9            |4393.0       |
|NULL      |4537.0   |10           |4042.0       |
+----------+---------+-------------+-------------+
only showing top 10 rows



In [58]:
# SQL; moving average
query = """
WITH master AS (
    SELECT
        HomePlanet
        , VIP
        , TotalBill
        , RANK() OVER (
            PARTITION BY HomePlanet
            ORDER BY TotalBill DESC
        ) AS TotalBillRank
    FROM
        train_df
)

SELECT
    HomePlanet
    , VIP
    , TotalBill
    , TotalBillRank
    , AVG(TotalBill) OVER (
        PARTITION BY HomePlanet, VIP
        ORDER BY TotalBillRank ASC
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    ) AS TotalBillAvg
FROM
    master
"""

spark.sql(query).show(n=10, truncate=False)

+----------+-----+---------+-------------+-----------------+
|HomePlanet|VIP  |TotalBill|TotalBillRank|TotalBillAvg     |
+----------+-----+---------+-------------+-----------------+
|NULL      |NULL |2416.0   |22           |2416.0           |
|NULL      |NULL |922.0    |66           |1669.0           |
|NULL      |NULL |598.0    |105          |1312.0           |
|NULL      |false|13558.0  |1            |13558.0          |
|NULL      |false|12473.0  |2            |13015.5          |
|NULL      |false|9612.0   |3            |11881.0          |
|NULL      |false|9307.0   |4            |10464.0          |
|NULL      |false|7177.0   |5            |8698.666666666666|
|NULL      |false|6628.0   |6            |7704.0           |
|NULL      |false|5240.0   |7            |6348.333333333333|
+----------+-----+---------+-------------+-----------------+
only showing top 10 rows
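
The frame `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW` averages the current row with up to two rows before it inside the same partition, so the first rows of a partition use fewer values. A plain-Python sketch (not PySpark code; `moving_average` is a hypothetical helper, and non-null values are assumed) reproduces the first four `TotalBillAvg` values of the NULL/false partition shown above:

```python
# Plain-Python illustration of a trailing moving average over one partition
# (not the PySpark API). Assumes the values contain no nulls.

def moving_average(values, preceding=2):
    """Average of the current value and up to `preceding` earlier values."""
    result = []
    for i in range(len(values)):
        window = values[max(0, i - preceding): i + 1]
        result.append(sum(window) / len(window))
    return result

bills = [13558.0, 12473.0, 9612.0, 9307.0]  # ordered by rank, from the output above
averages = moving_average(bills)
print(averages)
```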



<a id ="3.12"></a><h2 style="background:#75E6DA; border:0; border-radius: 12px; color:black"><center>3.12 Handling of null values</center></h2>
[Back to the TOC](#toc)

In [59]:
# Checking Pandas DataFrame
train_pandas_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8693 entries, 0 to 8692
Data columns (total 14 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   PassengerId   8693 non-null   object 
 1   HomePlanet    8492 non-null   object 
 2   CryoSleep     8476 non-null   object 
 3   Cabin         8494 non-null   object 
 4   Destination   8511 non-null   object 
 5   Age           8514 non-null   float64
 6   VIP           8490 non-null   object 
 7   RoomService   8512 non-null   float64
 8   FoodCourt     8510 non-null   float64
 9   ShoppingMall  8485 non-null   float64
 10  Spa           8510 non-null   float64
 11  VRDeck        8505 non-null   float64
 12  Name          8493 non-null   object 
 13  Transported   8693 non-null   bool   
dtypes: bool(1), float64(6), object(7)
memory usage: 891.5+ KB


In [60]:
# Drop records that contain any null value
print(train_df.count())
print(train_df.dropna('any').count())

8693
6606


In [61]:
# Checking null and NaN values
print('---------Cryosleep---------')
print('total number: ', train_df.select('CryoSleep').count())
try:
    print('number of null values: ', train_df[train_df['CryoSleep'].isNull()].count())
except Exception as e:
    print('number of null values: error') 
try:
    print('number of NaN values: ', train_df[isnan(train_df['CryoSleep'])].count())
except Exception as e:
    print('number of NaN values: error') 
    
print('---------Age---------')
print('total number: ', train_df.select('Age').count())
try:
    print('number of null values: ', train_df[train_df['Age'].isNull()].count())
except Exception as e:
    print('number of null values: error') 
try:
    print('number of NaN values: ', train_df[isnan(train_df['Age'])].count())
except Exception as e:
    print('number of NaN values: error') 

---------Cryosleep---------
total number:  8693
number of null values:  217
number of NaN values: error
---------Age---------
total number:  8693
number of null values:  179
number of NaN values:  0
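
The output above separates two kinds of "missing": null (no value at all) and NaN (a floating-point value). The NaN check on `CryoSleep` reports an error, presumably because `isnan` is only meaningful for floating-point columns and `CryoSleep` is boolean. A minimal plain-Python sketch of the distinction, where `None` stands in for null and the sample list is hypothetical:

```python
import math

# Hypothetical sample values: None plays the role of Spark's null,
# float('nan') the role of NaN.
ages = [39.0, None, float('nan'), 24.0]

def is_null(value):
    """True only for a missing value (null)."""
    return value is None

def is_nan(value):
    """True only for a float NaN; None and booleans are never NaN."""
    return isinstance(value, float) and math.isnan(value)

null_count = sum(is_null(v) for v in ages)
nan_count = sum(is_nan(v) for v in ages)
print('number of null values: ', null_count)
print('number of NaN values: ', nan_count)
```

The two counts are independent, which is why the notebook checks them separately.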


In [62]:
# filling null values

# The result shows that the fill value must match the column's data type;
# otherwise the null values are left untouched.
train_df_new = train_df.fillna('unknown', subset=['CryoSleep'])
print('---------Data type not match---------')
print('total number: ', train_df_new.select('CryoSleep').count())
try:
    print('number of null values: ', train_df_new[train_df_new['CryoSleep'].isNull()].count())
except Exception as e:
    print('number of null values: error') 
try:
    print('number of NaN values: ', train_df_new[isnan(train_df_new['CryoSleep'])].count())
except Exception as e:
    print('number of NaN values: error') 
    
train_df_new = train_df.fillna(True, subset=['CryoSleep'])
print('---------Data type match---------')
print('total number: ', train_df_new.select('CryoSleep').count())
try:
    print('number of null values: ', train_df_new[train_df_new['CryoSleep'].isNull()].count())
except Exception as e:
    print('number of null values: error') 
try:
    print('number of NaN values: ', train_df_new[isnan(train_df_new['CryoSleep'])].count())
except Exception as e:
    print('number of NaN values: error') 

---------Data type not match---------
total number:  8693
number of null values:  217
number of NaN values: error
---------Data type match---------
total number:  8693
number of null values:  0
number of NaN values: error


In [63]:
# fillna with mode or mean
target_cols = {
    'CryoSleep': 'mode',
    'Age': 'mean'
}
replacement_values = {}
for key in target_cols:
    if target_cols[key] == 'mode':
        tmp_df = train_df.groupBy(key).count().orderBy(desc('count')).limit(1)
        mode_value = tmp_df.select(key).collect()[0][0] #collect() returns result as a list
        replacement_values[key] = mode_value
    elif target_cols[key] == 'mean':
        mean_value = train_df.select(mean(key)).collect()[0][0]
        replacement_values[key] = mean_value

print('---------before---------')
train_df.groupBy('CryoSleep').count().show()
print('number of null values of Age: ', train_df[train_df['Age'].isNull()].count())
print('number of mean values of Age: ', train_df.select(mean(train_df['Age'])).collect()[0][0])

train_df_new = train_df.fillna(replacement_values)

print('---------after---------')
train_df_new.groupBy('CryoSleep').count().show()
print('number of null values of Age: ', train_df_new[train_df_new['Age'].isNull()].count())
print('number of mean values of Age: ', train_df_new.select(mean(train_df_new['Age'])).collect()[0][0])

---------before---------
+---------+-----+
|CryoSleep|count|
+---------+-----+
|     NULL|  217|
|     true| 3037|
|    false| 5439|
+---------+-----+

number of null values of Age:  179
number of mean values of Age:  28.82793046746535
---------after---------
+---------+-----+
|CryoSleep|count|
+---------+-----+
|     true| 3037|
|    false| 5656|
+---------+-----+

number of null values of Age:  0
number of mean values of Age:  28.827930467114765
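
The before/after means agree up to floating-point rounding because filling nulls with the mean does not move the mean. A minimal plain-Python sketch of mode and mean imputation, using hypothetical toy columns where `None` stands for null:

```python
from collections import Counter
from statistics import mean

# Hypothetical toy columns; None stands for a null value.
cryo_sleep = [False, True, None, False, None]
age = [20.0, None, 40.0, 30.0, None]

def mode_ignoring_nulls(values):
    """Most frequent non-null value."""
    counts = Counter(v for v in values if v is not None)
    return counts.most_common(1)[0][0]

def mean_ignoring_nulls(values):
    """Arithmetic mean of the non-null values."""
    return mean(v for v in values if v is not None)

cryo_mode = mode_ignoring_nulls(cryo_sleep)
age_mean = mean_ignoring_nulls(age)

cryo_filled = [cryo_mode if v is None else v for v in cryo_sleep]
age_filled = [age_mean if v is None else v for v in age]

print(cryo_filled)
print(age_mean, mean(age_filled))
```

Unlike the cell above, this sketch excludes nulls before counting. With `groupBy(key).count()`, NULL forms its own group, so the selected "mode" would be NULL if nulls were the most frequent group.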


In [64]:
# SQL 
# replacement of null with fixed values
query = """
SELECT
    Age
    , IFNULL(Age, 0) AS FixedAge
FROM
    train_df
WHERE 
    Age IS NULL
"""

spark.sql(query).show(n=5, truncate=False)

+----+--------+
|Age |FixedAge|
+----+--------+
|NULL|0.0     |
|NULL|0.0     |
|NULL|0.0     |
|NULL|0.0     |
|NULL|0.0     |
+----+--------+
only showing top 5 rows



In [65]:
# SQL 
# replacement of null with the values of another column
query = """
SELECT
    Age
    , COALESCE(Age, TotalBill) AS FixedAge
FROM
    train_df
WHERE 
    Age IS NULL
"""

spark.sql(query).show(n=5, truncate=False)

+----+--------+
|Age |FixedAge|
+----+--------+
|NULL|756.0   |
|NULL|0.0     |
|NULL|0.0     |
|NULL|908.0   |
|NULL|783.0   |
+----+--------+
only showing top 5 rows
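
`COALESCE` returns its first non-null argument, and `IFNULL(a, b)` is the two-argument case of the same idea. A minimal plain-Python sketch of that rule, with hypothetical `(Age, TotalBill)` rows:

```python
def coalesce(*values):
    """Return the first argument that is not None, or None if all are None."""
    for value in values:
        if value is not None:
            return value
    return None

# Hypothetical rows: (Age, TotalBill); None stands for NULL.
rows = [(None, 756.0), (27.0, 120.0), (None, None)]
fixed_age = [coalesce(age, total_bill) for age, total_bill in rows]
print(fixed_age)
```

Note that the result stays null when every argument is null, so a final constant (for example `COALESCE(Age, TotalBill, 0)`) is needed to guarantee a value.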



<a id ="3.13"></a><h2 style="background:#75E6DA; border:0; border-radius: 12px; color:black"><center>3.13 Join DataFrames</center></h2>
[Back to the TOC](#toc)

In [66]:
df_1 = train_df.select('PassengerId', 'Cryosleep')
df_2 = train_df.select('PassengerId', 'Transported')

df_1.join(df_2, on=[df_1['PassengerId']==df_2['PassengerId']], how='inner').show(n=5, truncate=False)

+-----------+---------+-----------+-----------+
|PassengerId|Cryosleep|PassengerId|Transported|
+-----------+---------+-----------+-----------+
|5863_01    |false    |5863_01    |true       |
|7456_01    |true     |7456_01    |true       |
|5915_01    |false    |5915_01    |false      |
|6020_01    |true     |6020_01    |false      |
|1565_01    |false    |1565_01    |false      |
+-----------+---------+-----------+-----------+
only showing top 5 rows



In [67]:
# SQL 
query = """
WITH df_1 AS (
    SELECT PassengerId, CryoSleep
    FROM train_df
),
    df_2 AS (
    SELECT PassengerId, Transported
    FROM train_df
)


SELECT *
FROM df_1
INNER JOIN df_2
USING (PassengerId)
"""

spark.sql(query).show(n=5, truncate=False)

+-----------+---------+-----------+
|PassengerId|CryoSleep|Transported|
+-----------+---------+-----------+
|5863_01    |false    |true       |
|7456_01    |true     |true       |
|5915_01    |false    |false      |
|6020_01    |true     |false      |
|1565_01    |false    |false      |
+-----------+---------+-----------+
only showing top 5 rows
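
The two results differ in one detail: joining on an equality expression keeps both `PassengerId` columns, while `USING (PassengerId)` emits the key once. A minimal plain-Python sketch of an inner join with `USING` semantics, where the rows and the helper `inner_join_using` are hypothetical:

```python
# Hypothetical rows standing in for df_1 and df_2.
df_1 = [
    {'PassengerId': '0001_01', 'CryoSleep': False},
    {'PassengerId': '0002_01', 'CryoSleep': True},
]
df_2 = [
    {'PassengerId': '0002_01', 'Transported': True},
    {'PassengerId': '0003_01', 'Transported': False},
]

def inner_join_using(left, right, key):
    """Inner join two lists of dicts on `key`, emitting the key column once."""
    right_by_key = {}
    for row in right:
        right_by_key.setdefault(row[key], []).append(row)
    joined = []
    for left_row in left:
        # Rows without a partner on the other side are dropped (inner join).
        for right_row in right_by_key.get(left_row[key], []):
            joined.append({**left_row, **right_row})
    return joined

result = inner_join_using(df_1, df_2, 'PassengerId')
print(result)
```

In the DataFrame API, passing the column name as a string (`on='PassengerId'`) likewise yields a single key column.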



<a id ="4"></a><h1 style="background:#05445E; border:0; border-radius: 12px; color:#D3D3D3"><center>4. Machine Learning with PySpark MLlib</center></h1>

<a id ="4.1"></a><h2 style="background:#75E6DA; border:0; border-radius: 12px; color:black"><center>4.1 Logistic Regression</center></h2>
[Back to the TOC](#toc)

In [68]:
# Checking schema
train_df.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- CryoSleep: boolean (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Age: float (nullable = true)
 |-- VIP: boolean (nullable = true)
 |-- RoomService: float (nullable = true)
 |-- FoodCourt: float (nullable = true)
 |-- ShoppingMall: float (nullable = true)
 |-- Spa: float (nullable = true)
 |-- VRDeck: float (nullable = true)
 |-- Name: string (nullable = true)
 |-- Transported: boolean (nullable = true)
 |-- HomePlanet: string (nullable = true)
 |-- TotalBill: float (nullable = true)
 |-- AgeGroup: string (nullable = false)
 |-- VIPflg: integer (nullable = true)
 |-- Planetflg: string (nullable = true)



In [69]:
# Simply drop the null values and duplicates
print('before: ', train_df.count())

train_df = train_df.dropna('any')
train_df = train_df.dropDuplicates()

print('after: ', train_df.count())

before:  8693
after:  6606


In [70]:
# Response variable
# boolean -> numerical
train_data = train_df.withColumn('y',
                   when(col('Transported')==True, lit(1.0))
                   .when(col('Transported')==False, lit(0.0)))

In [71]:
# Checking explanatory variables
cat_features = [
    'HomePlanet',
    'CryoSleep',
    #'Cabin',
    'Destination',
    'VIP',
    'AgeGroup'
]

for feature in cat_features:
    print(f'---------{feature}---------')
    train_data.select(feature).groupby(feature).count().show()

---------HomePlanet---------
+----------+-----+
|HomePlanet|count|
+----------+-----+
|      Mars| 1367|
|     Earth| 3566|
|    Europa| 1673|
+----------+-----+

---------CryoSleep---------
+---------+-----+
|CryoSleep|count|
+---------+-----+
|     true| 2332|
|    false| 4274|
+---------+-----+

---------Destination---------
+-------------+-----+
|  Destination|count|
+-------------+-----+
|PSO J318.5-22|  623|
|  TRAPPIST-1e| 4576|
|  55 Cancri e| 1407|
+-------------+-----+

---------VIP---------
+-----+-----+
|  VIP|count|
+-----+-----+
| true|  162|
|false| 6444|
+-----+-----+

---------AgeGroup---------
+--------+-----+
|AgeGroup|count|
+--------+-----+
| elderly|   36|
|  junior| 1670|
|  senior|  363|
|   young| 2131|
|  middle| 2406|
+--------+-----+



In [72]:
# Explanatory variables

# boolean features
# boolean -> numerical
boolean_features = [
    'CryoSleep',
    'VIP',
]
for feature in boolean_features:
    train_data = train_data.withColumn(feature,
                   when(col(feature)==True, lit(1.0))
                   .when(col(feature)==False, lit(0.0)))
# numerical -> onehot
bool_onehot_encoder_list = []
bool_onehot_encoder_fitted_list = []
for feature in boolean_features:
    bool_onehot_encoder_list.append(OneHotEncoder(inputCol=feature, outputCol=f'{feature}Encoded'))
for onehot_encoder in bool_onehot_encoder_list:
    bool_onehot_encoder_fitted_list.append(onehot_encoder.fit(train_data))
for onehot_encoder in bool_onehot_encoder_fitted_list:
    train_data = onehot_encoder.transform(train_data)

# categorical features
# categorical -> numerical
cat_features = [
    'HomePlanet',
    #'Cabin',
    'Destination',
    'AgeGroup'
]
cat_string_indexer_list = []
cat_string_indexer_fitted_list = []
for feature in cat_features:
    cat_string_indexer_list.append(StringIndexer(inputCol=feature, outputCol=f'{feature}Index'))
for string_indexer in cat_string_indexer_list:
    cat_string_indexer_fitted_list.append(string_indexer.fit(train_data))
for string_indexer in cat_string_indexer_fitted_list:
    train_data = string_indexer.transform(train_data)
# numerical -> onehot
cat_onehot_encoder_list = []
cat_onehot_encoder_fitted_list = []
for feature in cat_features:
    cat_onehot_encoder_list.append(OneHotEncoder(inputCol=f'{feature}Index', outputCol=f'{feature}Encoded'))
for onehot_encoder in cat_onehot_encoder_list:
    cat_onehot_encoder_fitted_list.append(onehot_encoder.fit(train_data))
for onehot_encoder in cat_onehot_encoder_fitted_list:
    train_data = onehot_encoder.transform(train_data)

# all features (numerical features)
num_features = [
    'RoomService',
    'FoodCourt',
    'ShoppingMall',
    'Spa',
    'VRDeck',
]

all_features = [f'{feature}Encoded' for feature in boolean_features] \
                + [f'{feature}Encoded' for feature in cat_features] \
                + num_features
assembler = VectorAssembler(inputCols=all_features, outputCol='features')
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')

train_data = assembler.transform(train_data)
standard_scaler_fitted = scaler.fit(train_data)
train_data = standard_scaler_fitted.transform(train_data)
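# Please note that StandardScaler is applied to the whole assembled vector, so the one-hot encoded
# slots are scaled together with the numerical features (by default it divides by the standard
# deviation without centering).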

print('---------before scaling---------')
train_df.select(num_features).show(n=5)
print('---------after scaling---------')
train_data.select('scaled_features').show(n=5)

---------before scaling---------
+-----------+---------+------------+-----+------+
|RoomService|FoodCourt|ShoppingMall|  Spa|VRDeck|
+-----------+---------+------------+-----+------+
|      124.0|      0.0|         0.0|679.0|   0.0|
|       90.0|     29.0|       102.0|244.0|  56.0|
|        0.0|    594.0|         0.0|  0.0| 119.0|
|        0.0|      0.0|         0.0|  0.0|   0.0|
|        0.0|      0.0|      1872.0|  1.0|   0.0|
+-----------+---------+------------+-----+------+
only showing top 5 rows

---------after scaling---------
+--------------------+
|     scaled_features|
+--------------------+
|(15,[0,1,2,4,6,10...|
|[2.09230100517416...|
|(15,[0,1,2,4,6,11...|
|(15,[0,1,2,5,8],[...|
|(15,[0,1,2,4,7,12...|
+--------------------+
only showing top 5 rows
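
Most rows of `scaled_features` are still sparse vectors, which is consistent with the `StandardScaler` defaults (`withStd=True`, `withMean=False`): every slot is divided by its standard deviation and nothing is shifted, so zeros stay zeros. A minimal plain-Python sketch of that scaling, assuming the sample (n-1) standard deviation and a hypothetical two-column matrix:

```python
from statistics import stdev

# Hypothetical feature matrix: one one-hot slot and one numerical column.
rows = [
    [1.0, 124.0],
    [0.0, 90.0],
    [1.0, 0.0],
    [0.0, 0.0],
]

def scale_to_unit_std(matrix):
    """Divide every column by its sample standard deviation (no centering)."""
    columns = list(zip(*matrix))
    stds = [stdev(column) for column in columns]
    return [
        [value / std if std else 0.0 for value, std in zip(row, stds)]
        for row in matrix
    ]

scaled = scale_to_unit_std(rows)
for row in scaled:
    print(row)
```

Because the one-hot slot is scaled like any other column, its non-zero entries are no longer exactly 1.0 after this step.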



In [73]:
# Validation data
train_data, valid_data = train_data.randomSplit([0.8, 0.2], seed=seed)

In [74]:
# Test data preparation

# fillna of test data with mode or mean of train data
target_cols = {
    'HomePlanet': 'mode',
    'CryoSleep': 'mode',
    'Destination': 'mode',
    'Age': 'mean',
    'VIP': 'mode',
    'RoomService': 'mean',
    'FoodCourt': 'mean',
    'ShoppingMall': 'mean',
    'Spa': 'mean',
    'VRDeck': 'mean',
}
replacement_values = {}
for key in target_cols:
    if target_cols[key] == 'mode':
        tmp_df = train_df.groupBy(key).count().orderBy(desc('count')).limit(1)
        mode_value = tmp_df.select(key).collect()[0][0] #collect() returns result as a list
        replacement_values[key] = mode_value
    elif target_cols[key] == 'mean':
        mean_value = train_df.select(mean(key)).collect()[0][0]
        replacement_values[key] = mean_value

test_df = test_df.fillna(replacement_values)

# Handling Age feature (numerical -> categorical)
test_data = test_df.withColumn(
    'AgeGroup',
    when(test_df['Age']<20, 'junior')
    .when((test_df['Age']>=20) & (test_df['Age']<30), 'young')
    .when((test_df['Age']>=30) & (test_df['Age']<55), 'middle')
    .when((test_df['Age']>=55) & (test_df['Age']<70), 'senior')
    .otherwise('elderly')
)

# boolean features
for feature in boolean_features:
    test_data = test_data.withColumn(feature,
                   when(col(feature)==True, lit(1.0))
                   .when(col(feature)==False, lit(0.0)))
for onehot_encoder in bool_onehot_encoder_fitted_list:
    test_data = onehot_encoder.transform(test_data)

# categorical features
for string_indexer in cat_string_indexer_fitted_list:
    test_data = string_indexer.transform(test_data)
for onehot_encoder in cat_onehot_encoder_fitted_list:
    test_data = onehot_encoder.transform(test_data)

test_data = assembler.transform(test_data)
test_data = standard_scaler_fitted.transform(test_data)

In [75]:
# Model Building and Training
from pyspark.ml.classification import LogisticRegression
clf = LogisticRegression(featuresCol='scaled_features', labelCol='y')
clf_model = clf.fit(train_data)

print(f'coefficients: {clf_model.coefficients}')
print('------------')
print(f'intercept: {clf_model.intercept}')

23/11/14 03:02:05 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


coefficients: [-0.6270979386912746,0.06111128873041614,-0.3146588699934995,0.650194576239723,-0.050338558770314554,0.20505047782832558,-0.16180866459937715,-0.11625589151619516,0.02890358401791087,-0.022022451761017095,-0.8568879373118231,0.8433832201353652,0.35786368222916354,-2.131429629544571,-1.9982102138968958]
------------
intercept: 1.3781214666046069


In [76]:
# Prediction
pred_train = clf_model.transform(train_data)
pred_valid = clf_model.transform(valid_data)

pred_train.select('rawPrediction', 'probability', 'prediction').show(n=5, truncate=False)

+----------------------------------------+----------------------------------------+----------+
|rawPrediction                           |probability                             |prediction|
+----------------------------------------+----------------------------------------+----------+
|[-1.5108183429234487,1.5108183429234487]|[0.18081754643930706,0.819182453560693] |1.0       |
|[1.7535338175076076,-1.7535338175076076]|[0.8523979655028872,0.14760203449711284]|0.0       |
|[4.158640046686162,-4.158640046686162]  |[0.9846117026794735,0.01538829732052649]|0.0       |
|[1.5425607593361017,-1.5425607593361017]|[0.8238366759733831,0.17616332402661694]|0.0       |
|[0.805862663341119,-0.805862663341119]  |[0.6912271628933722,0.30877283710662784]|0.0       |
+----------------------------------------+----------------------------------------+----------+
only showing top 5 rows
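
For binary logistic regression, the three columns are tied together: `rawPrediction` holds the margin with both signs, `probability` is the logistic (sigmoid) function of that margin, and `prediction` is the class whose probability exceeds the default threshold of 0.5. A short check in plain Python, using the first row of the output above:

```python
import math

def sigmoid(z):
    """Logistic function mapping a margin to a probability."""
    return 1.0 / (1.0 + math.exp(-z))

# First row of the output above: rawPrediction = [-margin, margin]
margin = 1.5108183429234487

p_transported = sigmoid(margin)
probability = [1.0 - p_transported, p_transported]
prediction = 1.0 if p_transported > 0.5 else 0.0
print(probability, prediction)
```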



In [77]:
# Evaluation
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol='y')
train_AUC = evaluator.evaluate(pred_train)
valid_AUC = evaluator.evaluate(pred_valid)
print(f'train_AUC: {train_AUC}, valid_AUC: {valid_AUC}')

train_AUC: 0.8745891729320721, valid_AUC: 0.8831891140602602
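
`BinaryClassificationEvaluator` reports the area under the ROC curve by default. One way to read that number: it is the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. A minimal plain-Python sketch of that pairwise definition, with hypothetical labels and scores (Spark computes the metric differently internally, so this is an illustration rather than its implementation):

```python
def roc_auc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly.

    Ties between a positive and a negative score count as half a win.
    """
    positives = [s for y, s in zip(labels, scores) if y == 1.0]
    negatives = [s for y, s in zip(labels, scores) if y == 0.0]
    wins = 0.0
    for pos_score in positives:
        for neg_score in negatives:
            if pos_score > neg_score:
                wins += 1.0
            elif pos_score == neg_score:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Hypothetical labels and predicted scores.
labels = [1.0, 0.0, 1.0, 0.0, 1.0]
scores = [0.82, 0.15, 0.40, 0.45, 0.90]
auc = roc_auc(labels, scores)
print('AUC: ', auc)
```

The pairwise loop is quadratic in the number of rows, so it is only suitable for small samples.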


In [78]:
# Test Prediction
pred_test = clf_model.transform(test_data)
pred_test.select('rawPrediction', 'probability', 'prediction').show(n=5, truncate=False)

+------------------------------------------+-----------------------------------------+----------+
|rawPrediction                             |probability                              |prediction|
+------------------------------------------+-----------------------------------------+----------+
|[-0.784160387576586,0.784160387576586]    |[0.3134239207124052,0.6865760792875948]  |1.0       |
|[5.467784794650542,-5.467784794650542]    |[0.9957971671134399,0.004202832886560093]|0.0       |
|[-3.43278812222004,3.43278812222004]      |[0.03128632075245673,0.9687136792475433] |1.0       |
|[-3.478697116919088,3.478697116919088]    |[0.029924478046349755,0.9700755219536502]|1.0       |
|[0.14690754251727123,-0.14690754251727123]|[0.5366609751012064,0.46333902489879364] |0.0       |
+------------------------------------------+-----------------------------------------+----------+
only showing top 5 rows



In [79]:
# Format to submission file
submission_data = pred_test.select('PassengerId', 'prediction').toPandas()
submission_df = submission_pandas_df.merge(submission_data, on='PassengerId')
submission_df.loc[submission_df['prediction']==1.0, 'Transported'] = True
submission_df.loc[submission_df['prediction']==0.0, 'Transported'] = False

submission_df = submission_df.drop('prediction', axis=1)
# submission_df.to_csv('submission.csv', index=False)
submission_df.head()

  PassengerId  Transported
0     0013_01         True
1     0018_01        False
2     0019_01         True
3     0021_01         True
4     0023_01        False


<a id ="4.2"></a><h2 style="background:#75E6DA; border:0; border-radius: 12px; color:black"><center>4.2 Random Forest with PySpark Pipeline</center></h2>
[Back to the TOC](#toc)

In [80]:
# Response variable
train_data = train_df.withColumn('y',
                   when(col('Transported')==True, lit(1.0))
                   .when(col('Transported')==False, lit(0.0)))
# Boolean -> numerical
boolean_features = [
    'CryoSleep',
    'VIP',
]
for feature in boolean_features:
    train_data = train_data.withColumn(f'{feature}_flg',
                   when(col(feature)==True, lit(1.0))
                   .when(col(feature)==False, lit(0.0)))

# Train-valid split
train_data, valid_data = train_data.randomSplit([0.8, 0.2], seed=seed)

# Boolean features encoder
boolean_flg_features = [
    'CryoSleep_flg',
    'VIP_flg',
]
bool_onehot_encoder_list = []
for feature in boolean_flg_features:
    bool_onehot_encoder_list.append(OneHotEncoder(inputCol=feature, outputCol=f'{feature}Encoded'))
    
# Categorical features indexer and encoder
cat_features = [
    'HomePlanet',
    #'Cabin',
    'Destination',
    'AgeGroup'
]
cat_string_indexer_list = []
for feature in cat_features:
    cat_string_indexer_list.append(StringIndexer(inputCol=feature, outputCol=f'{feature}Index'))
cat_onehot_encoder_list = []
for feature in cat_features:
    cat_onehot_encoder_list.append(OneHotEncoder(inputCol=f'{feature}Index', outputCol=f'{feature}Encoded'))

# All features assembler
num_features = [
    'RoomService',
    'FoodCourt',
    'ShoppingMall',
    'Spa',
    'VRDeck',
]
all_features = [f'{feature}Encoded' for feature in boolean_flg_features] \
                + [f'{feature}Encoded' for feature in cat_features] \
                + num_features
assembler = VectorAssembler(inputCols=all_features, outputCol='features')

# Numerical features scaler
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')

# Model
from pyspark.ml.classification import RandomForestClassifier
clf = RandomForestClassifier(featuresCol='scaled_features', labelCol='y', impurity='gini')

# Pipeline
pipeline_stages = []
pipeline_stages.extend(bool_onehot_encoder_list)
pipeline_stages.extend(cat_string_indexer_list)
pipeline_stages.extend(cat_onehot_encoder_list)
pipeline_stages.extend([assembler, scaler, clf])

from pyspark.ml.pipeline import Pipeline
pipeline = Pipeline(stages=pipeline_stages)

In [81]:
# Model training
fitted_model = pipeline.fit(train_data)

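# Checking the trained model
# Caveat: featureImportances has one entry per slot of the assembled vector, while
# all_features holds one name per input column, so zip() lines up names and values
# only as long as every column occupies exactly one slot.
feature_importances = list(zip(all_features, fitted_model.stages[-1].featureImportances))
pd.DataFrame(feature_importances, columns=['feature_name', 'feature_importance'])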

           feature_name  feature_importance
0  CryoSleep_flgEncoded            0.376905
1        VIP_flgEncoded            0.000535
2     HomePlanetEncoded            0.009925
3    DestinationEncoded            0.015618
4       AgeGroupEncoded            0.010388
5           RoomService            0.007208
6             FoodCourt            0.002419
7          ShoppingMall            0.001607
8                   Spa            0.008574
9                VRDeck            0.000176
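
The table pairs 10 column names with importances, but the assembled vector has more slots than columns: the logistic regression in section 4.1 printed 15 coefficients for the same feature set, because `OneHotEncoder` (default `dropLast=True`) emits one slot per category except the last. A minimal plain-Python sketch that expands the column names into per-slot names, assuming the category counts shown in section 4.1. The `_0`, `_1`, ... suffixes are hypothetical labels, not names produced by Spark:

```python
# Number of distinct categories per encoded column (from the counts in section 4.1).
category_counts = {
    'CryoSleep_flgEncoded': 2,
    'VIP_flgEncoded': 2,
    'HomePlanetEncoded': 3,
    'DestinationEncoded': 3,
    'AgeGroupEncoded': 5,
}
num_features = ['RoomService', 'FoodCourt', 'ShoppingMall', 'Spa', 'VRDeck']

def expand_slot_names(category_counts, num_features, drop_last=True):
    """One name per vector slot, in assembler order."""
    names = []
    for column, n_categories in category_counts.items():
        n_slots = n_categories - 1 if drop_last else n_categories
        names.extend(f'{column}_{i}' for i in range(n_slots))
    names.extend(num_features)
    return names

slot_names = expand_slot_names(category_counts, num_features)
print(len(slot_names))
print(slot_names)
```

Zipping a list like `slot_names` with `featureImportances` would keep each importance next to the slot it belongs to. With the 10 column names, rows after `HomePlanetEncoded` are shifted and the last five importances are dropped.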


In [82]:
# Prediction of train data
pred_train = fitted_model.transform(train_data)
pred_train.select('rawPrediction', 'probability', 'prediction').show(n=5, truncate=False)

# Evaluation of prediction of train data
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol='y')
train_AUC = evaluator.evaluate(pred_train)
print('train AUC: ', train_AUC)

+--------------------------------------+----------------------------------------+----------+
|rawPrediction                         |probability                             |prediction|
+--------------------------------------+----------------------------------------+----------+
|[9.599557695251493,10.400442304748507]|[0.47997788476257464,0.5200221152374254]|1.0       |
|[14.931254117614953,5.068745882385046]|[0.7465627058807477,0.2534372941192523] |0.0       |
|[13.476251196953083,6.523748803046918]|[0.6738125598476541,0.3261874401523459] |0.0       |
|[15.234759234794982,4.765240765205017]|[0.7617379617397491,0.23826203826025086]|0.0       |
|[13.307780512617883,6.692219487382115]|[0.6653890256308942,0.33461097436910575]|0.0       |
+--------------------------------------+----------------------------------------+----------+
only showing top 5 rows

train AUC:  0.8621280070984696
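
Unlike logistic regression, the random forest's `rawPrediction` is not a margin. In each row above the two entries sum to 20, which matches the default number of trees (`numTrees=20`), and `probability` appears to be the raw vector divided by that sum. A short check in plain Python, using the first row of the output above:

```python
# First row of the output above.
raw_prediction = [9.599557695251493, 10.400442304748507]

# Normalize the per-class totals so that they sum to 1.
total = sum(raw_prediction)
probability = [value / total for value in raw_prediction]

# The predicted class is the index with the highest probability.
prediction = float(probability.index(max(probability)))
print(total, probability, prediction)
```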


In [83]:
# Prediction of validation data
pred_valid = fitted_model.transform(valid_data)
pred_valid.select('rawPrediction', 'probability', 'prediction').show(n=5, truncate=False)

# Evaluation of prediction of valid data
valid_AUC = evaluator.evaluate(pred_valid)
print('validation AUC: ', valid_AUC)

+---------------------------------------+----------------------------------------+----------+
|rawPrediction                          |probability                             |prediction|
+---------------------------------------+----------------------------------------+----------+
|[12.754011460306407,7.245988539693595] |[0.6377005730153203,0.36229942698467976]|0.0       |
|[10.905177488375125,9.094822511624873] |[0.5452588744187563,0.45474112558124363]|0.0       |
|[13.064341619482258,6.935658380517742] |[0.6532170809741129,0.3467829190258871] |0.0       |
|[14.607160745817167,5.3928392541828325]|[0.7303580372908584,0.26964196270914165]|0.0       |
|[3.8310896562128587,16.16891034378714] |[0.19155448281064294,0.808445517189357] |1.0       |
+---------------------------------------+----------------------------------------+----------+
only showing top 5 rows

validation AUC:  0.8506523493543774


In [84]:
# Test data preparation

# fillna of test data with mode or mean of train data
target_cols = {
    'HomePlanet': 'mode',
    'CryoSleep': 'mode',
    'Destination': 'mode',
    'Age': 'mean',
    'VIP': 'mode',
    'RoomService': 'mean',
    'FoodCourt': 'mean',
    'ShoppingMall': 'mean',
    'Spa': 'mean',
    'VRDeck': 'mean',
}
replacement_values = {}
for key in target_cols:
    if target_cols[key] == 'mode':
        tmp_df = train_df.groupBy(key).count().orderBy(desc('count')).limit(1)
        mode_value = tmp_df.select(key).collect()[0][0] #collect() returns result as a list
        replacement_values[key] = mode_value
    elif target_cols[key] == 'mean':
        mean_value = train_df.select(mean(key)).collect()[0][0]
        replacement_values[key] = mean_value

test_df = test_df.fillna(replacement_values)

# Handling Age feature (numerical -> categorical)
test_data = test_df.withColumn(
    'AgeGroup',
    when(test_df['Age']<20, 'junior')
    .when((test_df['Age']>=20) & (test_df['Age']<30), 'young')
    .when((test_df['Age']>=30) & (test_df['Age']<55), 'middle')
    .when((test_df['Age']>=55) & (test_df['Age']<70), 'senior')
    .otherwise('elderly')
)

# Boolean -> numerical
boolean_features = [
    'CryoSleep',
    'VIP',
]
for feature in boolean_features:
    test_data = test_data.withColumn(f'{feature}_flg',
                   when(col(feature)==True, lit(1.0))
                   .when(col(feature)==False, lit(0.0)))

# Test Prediction
pred_test = fitted_model.transform(test_data)
pred_test.select('rawPrediction', 'probability', 'prediction').show(n=5, truncate=False)

+---------------------------------------+----------------------------------------+----------+
|rawPrediction                          |probability                             |prediction|
+---------------------------------------+----------------------------------------+----------+
|[4.278432166736302,15.7215678332637]   |[0.2139216083368151,0.786078391663185]  |1.0       |
|[15.219475974506324,4.7805240254936745]|[0.7609737987253162,0.23902620127468371]|0.0       |
|[2.7502981019836663,17.249701898016333]|[0.13751490509918332,0.8624850949008167]|1.0       |
|[6.943152428402965,13.056847571597034] |[0.3471576214201483,0.6528423785798517] |1.0       |
|[12.655761825899049,7.344238174100953] |[0.6327880912949524,0.36721190870504766]|0.0       |
+---------------------------------------+----------------------------------------+----------+
only showing top 5 rows



In [85]:
# Format to submission file
submission_data = pred_test.select('PassengerId', 'prediction').toPandas()
submission_df = submission_pandas_df.merge(submission_data, on='PassengerId')
submission_df.loc[submission_df['prediction']==1.0, 'Transported'] = True
submission_df.loc[submission_df['prediction']==0.0, 'Transported'] = False

submission_df = submission_df.drop('prediction', axis=1)
submission_df.to_csv('submission.csv', index=False)
submission_df.head()

  PassengerId  Transported
0     0013_01         True
1     0018_01        False
2     0019_01         True
3     0021_01         True
4     0023_01        False


<a id ="4.3"></a><h2 style="background:#75E6DA; border:0; border-radius: 12px; color:black"><center>4.3 Clustering with k-means</center></h2>
[Back to the TOC](#toc)

In [86]:
num_features = [
    'RoomService',
    'FoodCourt',
    'ShoppingMall',
    'Spa',
    'VRDeck',
]
assembler = VectorAssembler(inputCols=num_features, outputCol='features')
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')

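# Model (k-means; n=5)
# Note: KMeans reads featuresCol='features' by default, so as written it clusters the
# unscaled vector; add .setFeaturesCol('scaled_features') to use the scaler output.
from pyspark.ml.clustering import KMeans
n = 5
kmeans = KMeans().setK(n).setSeed(seed)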

# Pipeline
pipeline_stages = [assembler, scaler, kmeans]
pipeline = Pipeline(stages=pipeline_stages)

# Model training
fitted_model = pipeline.fit(train_data)

# Clustering train data
train_clustering = fitted_model.transform(train_data)

# Evaluation
from pyspark.ml.evaluation import ClusteringEvaluator
evaluator = ClusteringEvaluator() 
sil = evaluator.evaluate(train_clustering) # silhouette coefficient
print('silhouette coefficient: ', sil)

train_clustering.groupBy('prediction').count().show()

silhouette coefficient:  0.8144955661851929
+----------+-----+
|prediction|count|
+----------+-----+
|         1|    4|
|         3|   88|
|         4|  388|
|         2|   78|
|         0| 4711|
+----------+-----+



In [87]:
# Clustering test data

# Fill null values in the numerical features of the test data with the train-data means
for feature in num_features:
    mean_value = train_df.agg(avg(feature)).collect()[0][0]
    test_df = test_df.fillna(mean_value, subset=[feature])
    
# Test Prediction (transform test_df so that the fill above takes effect)
test_clustering = fitted_model.transform(test_df)
print('silhouette coefficient: ', evaluator.evaluate(test_clustering))
test_clustering.groupBy('prediction').count().show()

silhouette coefficient:  0.8338002324470034
+----------+-----+
|prediction|count|
+----------+-----+
|         1|    4|
|         3|   67|
|         4|  276|
|         2|   61|
|         0| 3869|
+----------+-----+



In [88]:
# Searching for better 'k'
for k in range(2, 10):
    kmeans = KMeans().setK(k).setSeed(seed)
    pipeline_stages = [assembler, scaler, kmeans]
    pipeline = Pipeline(stages=pipeline_stages)
    fitted_model = pipeline.fit(train_data)
    train_clustering = fitted_model.transform(train_data)
    evaluator = ClusteringEvaluator()
    print('k: ', k, ', silhouette coefficient: ', evaluator.evaluate(train_clustering))

k:  2 , silhouette coefficient:  0.9054089418435607
k:  3 , silhouette coefficient:  0.8232085310618042
k:  4 , silhouette coefficient:  0.8269344531219524
k:  5 , silhouette coefficient:  0.8144955661851929
k:  6 , silhouette coefficient:  0.7645640018034273
k:  7 , silhouette coefficient:  0.7728433462584169
k:  8 , silhouette coefficient:  0.7753287435958771
k:  9 , silhouette coefficient:  0.7632038362580217
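
The silhouette coefficient compares, for each point, the mean distance to its own cluster (`a`) with the mean distance to the nearest other cluster (`b`), giving `(b - a) / max(a, b)`; values near 1 indicate well-separated clusters. A minimal plain-Python sketch, assuming squared Euclidean distance (the default of `ClusteringEvaluator`) and hypothetical points. It needs at least two clusters and is quadratic in the number of points:

```python
def squared_euclidean(p, q):
    return sum((x - y) ** 2 for x, y in zip(p, q))

def silhouette_score(points, labels):
    """Mean silhouette over all points, using squared Euclidean distance."""
    clusters = {}
    for point, label in zip(points, labels):
        clusters.setdefault(label, []).append(point)
    scores = []
    for point, label in zip(points, labels):
        own = clusters[label]
        if len(own) == 1:
            scores.append(0.0)  # common convention for singleton clusters
            continue
        # a: mean distance to the other members of the point's own cluster
        a = sum(squared_euclidean(point, q) for q in own) / (len(own) - 1)
        # b: smallest mean distance to the members of any other cluster
        b = min(
            sum(squared_euclidean(point, q) for q in members) / len(members)
            for other, members in clusters.items()
            if other != label
        )
        denominator = max(a, b)
        scores.append((b - a) / denominator if denominator else 0.0)
    return sum(scores) / len(scores)

# Hypothetical points forming two well-separated clusters.
points = [(0.0, 0.0), (0.0, 1.0), (10.0, 0.0), (10.0, 1.0)]
labels = [0, 0, 1, 1]
score = silhouette_score(points, labels)
print('silhouette coefficient: ', score)
```

A high score alone does not settle the choice of `k`. In the k=5 run above one cluster holds 4711 of the 5269 training rows, so it is worth inspecting the cluster sizes alongside the silhouette values.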
