## Spark Tutorial Tutorials
https://www.youtube.com/watch?v=WyZmM6K7ubc&list=PLZoTAELRMXVNjiiawhzZ0afHcPvC8jpcg&index=1&ab_channel=KrishNaik

https://github.com/kevinschaich/pyspark-cheatsheet

# Day 1

In [1]:
import pyspark
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


#### Data Loding 

In [2]:
# Starting Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Practice').getOrCreate()

In [3]:
spark

#### Read Data using Pandas

In [4]:
pdf = pd.read_csv("shuttle_service_frq.csv")
pdf.head()

Unnamed: 0,Line No.,SHUTTLE NO.,Jan,Feb,Mar
0,SDL3,3,1,0,0
1,SDL2,26,0,0,0
2,SDL2,27,0,0,1
3,SDL3,28,0,0,1
4,SDL1,29,1,0,0


#### Read Data using Spark

In [5]:
sdf = spark.read.csv("shuttle_service_frq.csv")

In [6]:
sdf

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string]

In [7]:
sdf.head()

Row(_c0='Line No.', _c1='SHUTTLE NO.', _c2='Jan', _c3='Feb', _c4='Mar')

In [8]:
sdf.show()

+--------+-----------+---+---+---+
|     _c0|        _c1|_c2|_c3|_c4|
+--------+-----------+---+---+---+
|Line No.|SHUTTLE NO.|Jan|Feb|Mar|
|    SDL3|          3|  1|  0|  0|
|    SDL2|         26|  0|  0|  0|
|    SDL2|         27|  0|  0|  1|
|    SDL3|         28|  0|  0|  1|
|    SDL1|         29|  1|  0|  0|
|    SDL3|         30|  1|  0|  0|
|    SDL2|         31|  0|  0|  0|
|    SDL2|         32|  3|  0|  1|
|    SDL3|         33|  0|  1|  0|
|    SDL3|         34|  0|  0|  0|
|    SDL4|         35|  1|  0|  1|
|    SDL3|         36|  0|  1|  0|
|    SDL1|         37|  0|  0|  0|
|    SDL1|         38|  0|  0|  0|
|    SDL3|         39|  0|  0|  0|
|    SDL1|         40|  0|  0|  0|
|    SDL3|         41|  0|  0|  0|
|    SDL3|         42|  0|  0|  1|
|    SDL4|         43|  0|  0|  1|
+--------+-----------+---+---+---+
only showing top 20 rows



In [9]:
# sdf = spark.read.csv("shuttle_service_frq.csv")
sdf = spark.read.option('header', 'true').csv("shuttle_service_frq.csv")
sdf

DataFrame[Line No.: string, SHUTTLE NO.: string, Jan: string, Feb: string, Mar: string]

In [10]:
sdf.show()

+--------+-----------+---+---+---+
|Line No.|SHUTTLE NO.|Jan|Feb|Mar|
+--------+-----------+---+---+---+
|    SDL3|          3|  1|  0|  0|
|    SDL2|         26|  0|  0|  0|
|    SDL2|         27|  0|  0|  1|
|    SDL3|         28|  0|  0|  1|
|    SDL1|         29|  1|  0|  0|
|    SDL3|         30|  1|  0|  0|
|    SDL2|         31|  0|  0|  0|
|    SDL2|         32|  3|  0|  1|
|    SDL3|         33|  0|  1|  0|
|    SDL3|         34|  0|  0|  0|
|    SDL4|         35|  1|  0|  1|
|    SDL3|         36|  0|  1|  0|
|    SDL1|         37|  0|  0|  0|
|    SDL1|         38|  0|  0|  0|
|    SDL3|         39|  0|  0|  0|
|    SDL1|         40|  0|  0|  0|
|    SDL3|         41|  0|  0|  0|
|    SDL3|         42|  0|  0|  1|
|    SDL4|         43|  0|  0|  1|
|    SDL3|         44|  0|  0|  0|
+--------+-----------+---+---+---+
only showing top 20 rows



In [11]:
sdf.head(3)

[Row(Line No.='SDL3', SHUTTLE NO.='3', Jan='1', Feb='0', Mar='0'),
 Row(Line No.='SDL2', SHUTTLE NO.='26', Jan='0', Feb='0', Mar='0'),
 Row(Line No.='SDL2', SHUTTLE NO.='27', Jan='0', Feb='0', Mar='1')]

In [12]:
# Type of sdf
type(sdf)

pyspark.sql.dataframe.DataFrame

In [13]:
# Print Schema 
sdf.printSchema()

root
 |-- Line No.: string (nullable = true)
 |-- SHUTTLE NO.: string (nullable = true)
 |-- Jan: string (nullable = true)
 |-- Feb: string (nullable = true)
 |-- Mar: string (nullable = true)



# Day 2

In [14]:
# Start spark Session
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Dataframe").getOrCreate()
spark

In [15]:
# Read Dataset
sdf = spark.read.option('header', 'true').csv('shuttle_service_frq.csv') # take all columns as string
sdf

DataFrame[Line No.: string, SHUTTLE NO.: string, Jan: string, Feb: string, Mar: string]

In [16]:
sdf.show(5)

+--------+-----------+---+---+---+
|Line No.|SHUTTLE NO.|Jan|Feb|Mar|
+--------+-----------+---+---+---+
|    SDL3|          3|  1|  0|  0|
|    SDL2|         26|  0|  0|  0|
|    SDL2|         27|  0|  0|  1|
|    SDL3|         28|  0|  0|  1|
|    SDL1|         29|  1|  0|  0|
+--------+-----------+---+---+---+
only showing top 5 rows



In [17]:
sdf.printSchema()

root
 |-- Line No.: string (nullable = true)
 |-- SHUTTLE NO.: string (nullable = true)
 |-- Jan: string (nullable = true)
 |-- Feb: string (nullable = true)
 |-- Mar: string (nullable = true)



In [18]:
# Read Dataset
sdf = spark.read.option('header', 'true').csv('shuttle_service_frq.csv', inferSchema=True) 
sdf

DataFrame[Line No.: string, SHUTTLE NO.: int, Jan: int, Feb: int, Mar: int]

In [19]:
sdf.printSchema()

root
 |-- Line No.: string (nullable = true)
 |-- SHUTTLE NO.: integer (nullable = true)
 |-- Jan: integer (nullable = true)
 |-- Feb: integer (nullable = true)
 |-- Mar: integer (nullable = true)



### OR

In [20]:
sdf = spark.read.csv('shuttle_service_frq.csv', header=True, inferSchema=True)
sdf

DataFrame[Line No.: string, SHUTTLE NO.: int, Jan: int, Feb: int, Mar: int]

In [21]:
sdf.show(3)

+--------+-----------+---+---+---+
|Line No.|SHUTTLE NO.|Jan|Feb|Mar|
+--------+-----------+---+---+---+
|    SDL3|          3|  1|  0|  0|
|    SDL2|         26|  0|  0|  0|
|    SDL2|         27|  0|  0|  1|
+--------+-----------+---+---+---+
only showing top 3 rows



In [22]:
sdf.printSchema()

root
 |-- Line No.: string (nullable = true)
 |-- SHUTTLE NO.: integer (nullable = true)
 |-- Jan: integer (nullable = true)
 |-- Feb: integer (nullable = true)
 |-- Mar: integer (nullable = true)



In [23]:
# Print Line No column
# column names containing spaces and special characters need to be properly escaped
# or accessed using backticks
print("sdf : ")
print(sdf.select('`Line No.`'))
print()
print("Type : ")
print(type(sdf.select('`Line No.`')))
print()
sdf.select('`Line No.`').show(3)

sdf : 
DataFrame[Line No.: string]

Type : 
<class 'pyspark.sql.dataframe.DataFrame'>

+--------+
|Line No.|
+--------+
|    SDL3|
|    SDL2|
|    SDL2|
+--------+
only showing top 3 rows



In [24]:
# Print "Jan" column
# or accessed using backticks
print("sdf : ")
print(sdf.select('Jan'))
print()
print("Type : ")
print(type(sdf.select('Jan')))
print()
sdf.select('Jan').show(3)

sdf : 
DataFrame[Jan: int]

Type : 
<class 'pyspark.sql.dataframe.DataFrame'>

+---+
|Jan|
+---+
|  1|
|  0|
|  0|
+---+
only showing top 3 rows



In [25]:
# Select multiple columns 
# or accessed using backticks
print("sdf : ")
print(sdf.select(['`Line No.`', 'Jan']))
print()
print("Type : ")
print(type(sdf.select(['`Line No.`', 'Jan'])))
print()
sdf.select(['`Line No.`', 'Jan']).show(3)

sdf : 
DataFrame[Line No.: string, Jan: int]

Type : 
<class 'pyspark.sql.dataframe.DataFrame'>

+--------+---+
|Line No.|Jan|
+--------+---+
|    SDL3|  1|
|    SDL2|  0|
|    SDL2|  0|
+--------+---+
only showing top 3 rows



##### Column Rename

In [26]:
sdf.columns 

['Line No.', 'SHUTTLE NO.', 'Jan', 'Feb', 'Mar']

In [27]:
# Rename columns to remove spaces and special characters
for column in sdf.columns:
    new_column_name = column.replace(' ', '_').replace('.', '')
    print(new_column_name)
    sdf = sdf.withColumnRenamed(column, new_column_name)
    
sdf

Line_No
SHUTTLE_NO
Jan
Feb
Mar


DataFrame[Line_No: string, SHUTTLE_NO: int, Jan: int, Feb: int, Mar: int]

In [28]:
# Describe
print(sdf.describe())
print()
sdf.describe().show(3)

DataFrame[summary: string, Line_No: string, SHUTTLE_NO: string, Jan: string, Feb: string, Mar: string]

+-------+-------+------------------+------------------+-------------------+-------------------+
|summary|Line_No|        SHUTTLE_NO|               Jan|                Feb|                Mar|
+-------+-------+------------------+------------------+-------------------+-------------------+
|  count|    132|               134|               134|                134|                134|
|   mean|   NULL|121.73134328358209|0.4253731343283582|0.14925373134328357|0.41044776119402987|
| stddev|   NULL|136.46003376345536|0.8076408201115163| 0.3781128980976144| 0.6740371753782891|
+-------+-------+------------------+------------------+-------------------+-------------------+
only showing top 3 rows



#### Adding Column

In [29]:
add = sdf.withColumn('April', sdf['Mar']+2)
add.show(5)

+-------+----------+---+---+---+-----+
|Line_No|SHUTTLE_NO|Jan|Feb|Mar|April|
+-------+----------+---+---+---+-----+
|   SDL3|         3|  1|  0|  0|    2|
|   SDL2|        26|  0|  0|  0|    2|
|   SDL2|        27|  0|  0|  1|    3|
|   SDL3|        28|  0|  0|  1|    3|
|   SDL1|        29|  1|  0|  0|    2|
+-------+----------+---+---+---+-----+
only showing top 5 rows



In [30]:
drop = add.drop('April')
drop.show(5)

+-------+----------+---+---+---+
|Line_No|SHUTTLE_NO|Jan|Feb|Mar|
+-------+----------+---+---+---+
|   SDL3|         3|  1|  0|  0|
|   SDL2|        26|  0|  0|  0|
|   SDL2|        27|  0|  0|  1|
|   SDL3|        28|  0|  0|  1|
|   SDL1|        29|  1|  0|  0|
+-------+----------+---+---+---+
only showing top 5 rows



# Day 3

In [31]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Practice').getOrCreate()
spark

In [32]:
sdf = spark.read.csv('shuttle_service_frq.csv', header=True, inferSchema=True)
sdf

DataFrame[Line No.: string, SHUTTLE NO.: int, Jan: int, Feb: int, Mar: int]

In [33]:
sdf.show(3)

+--------+-----------+---+---+---+
|Line No.|SHUTTLE NO.|Jan|Feb|Mar|
+--------+-----------+---+---+---+
|    SDL3|          3|  1|  0|  0|
|    SDL2|         26|  0|  0|  0|
|    SDL2|         27|  0|  0|  1|
+--------+-----------+---+---+---+
only showing top 3 rows



In [34]:
# Rename columns to remove spaces and special characters
for column in sdf.columns:
    new_column_name = column.replace(' ', '_').replace('.', '')
    print(new_column_name)
    sdf = sdf.withColumnRenamed(column, new_column_name)


Line_No
SHUTTLE_NO
Jan
Feb
Mar


In [35]:
for col in sdf.columns:
    renamed_col = col.replace(' ', '_').replace('.', '')
    print(renamed_col)
    sdf = sdf.withColumnRenamed(col, renamed_col)

print()
sdf

Line_No
SHUTTLE_NO
Jan
Feb
Mar



DataFrame[Line_No: string, SHUTTLE_NO: int, Jan: int, Feb: int, Mar: int]

#### Null Values Droping

In [36]:
# Drop Null rows
sdf.na.drop(how='any').show()   # Drop if any NUll is there
sdf.na.drop(how='all').show()   # Drop if all are NUll there
sdf.na.drop(how='any', thresh=2).show()   # Drop if alteaset 2 null.
sdf.na.drop(how='any',subset='Jan').show()   # Drop if null in 'Jan' Column 

+-------+----------+---+---+---+
|Line_No|SHUTTLE_NO|Jan|Feb|Mar|
+-------+----------+---+---+---+
|   SDL3|         3|  1|  0|  0|
|   SDL2|        26|  0|  0|  0|
|   SDL2|        27|  0|  0|  1|
|   SDL3|        28|  0|  0|  1|
|   SDL1|        29|  1|  0|  0|
|   SDL3|        30|  1|  0|  0|
|   SDL2|        31|  0|  0|  0|
|   SDL2|        32|  3|  0|  1|
|   SDL3|        33|  0|  1|  0|
|   SDL3|        34|  0|  0|  0|
|   SDL4|        35|  1|  0|  1|
|   SDL3|        36|  0|  1|  0|
|   SDL1|        37|  0|  0|  0|
|   SDL1|        38|  0|  0|  0|
|   SDL3|        39|  0|  0|  0|
|   SDL1|        40|  0|  0|  0|
|   SDL3|        41|  0|  0|  0|
|   SDL3|        42|  0|  0|  1|
|   SDL4|        43|  0|  0|  1|
|   SDL3|        44|  0|  0|  0|
+-------+----------+---+---+---+
only showing top 20 rows

+-------+----------+---+---+---+
|Line_No|SHUTTLE_NO|Jan|Feb|Mar|
+-------+----------+---+---+---+
|   SDL3|         3|  1|  0|  0|
|   SDL2|        26|  0|  0|  0|
|   SDL2|        

#### Null Values filling

In [37]:
sdf.na.fill("Missing Values").show(5)   # Fillna using "Missing Valuues in all df
sdf.na.fill("Missing Values", ['Jan', 'Feb']).show(5)   # Fillna using "Missing Valuues in ['Jan', 'Feb'] columns
sdf.na.fill("Missing Values", ['Jan', 'Feb']).show(5)   # Fillna using mean9

+-------+----------+---+---+---+
|Line_No|SHUTTLE_NO|Jan|Feb|Mar|
+-------+----------+---+---+---+
|   SDL3|         3|  1|  0|  0|
|   SDL2|        26|  0|  0|  0|
|   SDL2|        27|  0|  0|  1|
|   SDL3|        28|  0|  0|  1|
|   SDL1|        29|  1|  0|  0|
+-------+----------+---+---+---+
only showing top 5 rows

+-------+----------+---+---+---+
|Line_No|SHUTTLE_NO|Jan|Feb|Mar|
+-------+----------+---+---+---+
|   SDL3|         3|  1|  0|  0|
|   SDL2|        26|  0|  0|  0|
|   SDL2|        27|  0|  0|  1|
|   SDL3|        28|  0|  0|  1|
|   SDL1|        29|  1|  0|  0|
+-------+----------+---+---+---+
only showing top 5 rows

+-------+----------+---+---+---+
|Line_No|SHUTTLE_NO|Jan|Feb|Mar|
+-------+----------+---+---+---+
|   SDL3|         3|  1|  0|  0|
|   SDL2|        26|  0|  0|  0|
|   SDL2|        27|  0|  0|  1|
|   SDL3|        28|  0|  0|  1|
|   SDL1|        29|  1|  0|  0|
+-------+----------+---+---+---+
only showing top 5 rows



In [38]:
from pyspark.ml.feature import Imputer

# Initialize the Imputer
imputer = Imputer(
    inputCols  = ['Jan', 'Feb', 'Mar'],
    outputCols = ["{}_imputed".format(c) for c in ['Jan', 'Feb', 'Mar']]
).setStrategy('mean')

# Fit and transform the DataFrame
sdf_imputed = imputer.fit(sdf).transform(sdf)

In [39]:
# Show the result
sdf_imputed.show()

+-------+----------+---+---+---+-----------+-----------+-----------+
|Line_No|SHUTTLE_NO|Jan|Feb|Mar|Jan_imputed|Feb_imputed|Mar_imputed|
+-------+----------+---+---+---+-----------+-----------+-----------+
|   SDL3|         3|  1|  0|  0|          1|          0|          0|
|   SDL2|        26|  0|  0|  0|          0|          0|          0|
|   SDL2|        27|  0|  0|  1|          0|          0|          1|
|   SDL3|        28|  0|  0|  1|          0|          0|          1|
|   SDL1|        29|  1|  0|  0|          1|          0|          0|
|   SDL3|        30|  1|  0|  0|          1|          0|          0|
|   SDL2|        31|  0|  0|  0|          0|          0|          0|
|   SDL2|        32|  3|  0|  1|          3|          0|          1|
|   SDL3|        33|  0|  1|  0|          0|          1|          0|
|   SDL3|        34|  0|  0|  0|          0|          0|          0|
|   SDL4|        35|  1|  0|  1|          1|          0|          1|
|   SDL3|        36|  0|  1|  0|  

# Day 4

In [40]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Practice").getOrCreate()
spark

In [41]:
sdf = spark.read.csv('shuttle_service_frq.csv', header=True, inferSchema=True)
sdf

DataFrame[Line No.: string, SHUTTLE NO.: int, Jan: int, Feb: int, Mar: int]

In [42]:
sdf.show(3)

+--------+-----------+---+---+---+
|Line No.|SHUTTLE NO.|Jan|Feb|Mar|
+--------+-----------+---+---+---+
|    SDL3|          3|  1|  0|  0|
|    SDL2|         26|  0|  0|  0|
|    SDL2|         27|  0|  0|  1|
+--------+-----------+---+---+---+
only showing top 3 rows



#### Filter Operation

In [43]:
# Jan >= 2
# sdf.filter('Jan>=2').show()
sdf.filter('Jan>=2').show()
# sdf.filter('Jan>=2').select(['jan', 'Feb']).show(5)

+--------+-----------+---+---+---+
|Line No.|SHUTTLE NO.|Jan|Feb|Mar|
+--------+-----------+---+---+---+
|    SDL2|         32|  3|  0|  1|
|    SDL1|         62|  3|  0|  2|
|    SDL2|         84|  3|  0|  0|
|    SDL2|         95|  2|  0|  0|
|    SDL2|        108|  2|  0|  0|
|    SDL1|        110|  2|  0|  2|
|    SDL1|        115|  2|  1|  2|
|    SDL4|        123|  3|  0|  1|
|    SDL2|        124|  2|  0|  0|
|    SDL4|        125|  4|  0|  0|
|    SDL1|        144|  3|  0|  3|
|    SDL2|        151|  2|  0|  1|
+--------+-----------+---+---+---+



###### OR

In [44]:
sdf.filter(sdf['Jan']>=2).show()

+--------+-----------+---+---+---+
|Line No.|SHUTTLE NO.|Jan|Feb|Mar|
+--------+-----------+---+---+---+
|    SDL2|         32|  3|  0|  1|
|    SDL1|         62|  3|  0|  2|
|    SDL2|         84|  3|  0|  0|
|    SDL2|         95|  2|  0|  0|
|    SDL2|        108|  2|  0|  0|
|    SDL1|        110|  2|  0|  2|
|    SDL1|        115|  2|  1|  2|
|    SDL4|        123|  3|  0|  1|
|    SDL2|        124|  2|  0|  0|
|    SDL4|        125|  4|  0|  0|
|    SDL1|        144|  3|  0|  3|
|    SDL2|        151|  2|  0|  1|
+--------+-----------+---+---+---+



In [45]:
sdf.filter((sdf['Jan']>=2) & (sdf['Feb']>=1)).show()

+--------+-----------+---+---+---+
|Line No.|SHUTTLE NO.|Jan|Feb|Mar|
+--------+-----------+---+---+---+
|    SDL1|        115|  2|  1|  2|
+--------+-----------+---+---+---+



In [46]:
# Use of NOT
sdf.filter(~(sdf['Jan']>=1) & ~(sdf['Feb']>=1)).show(5)

+--------+-----------+---+---+---+
|Line No.|SHUTTLE NO.|Jan|Feb|Mar|
+--------+-----------+---+---+---+
|    SDL2|         26|  0|  0|  0|
|    SDL2|         27|  0|  0|  1|
|    SDL3|         28|  0|  0|  1|
|    SDL2|         31|  0|  0|  0|
|    SDL3|         34|  0|  0|  0|
+--------+-----------+---+---+---+
only showing top 5 rows



# Day 5

In [47]:
from pyspark.sql import SparkSession

spark  = SparkSession.builder.appName("Practice").getOrCreate()
spark

In [48]:
sdf = spark.read.csv('shuttle_service_frq.csv', header=True, inferSchema=True)
sdf

DataFrame[Line No.: string, SHUTTLE NO.: int, Jan: int, Feb: int, Mar: int]

In [49]:
sdf.show(5)

+--------+-----------+---+---+---+
|Line No.|SHUTTLE NO.|Jan|Feb|Mar|
+--------+-----------+---+---+---+
|    SDL3|          3|  1|  0|  0|
|    SDL2|         26|  0|  0|  0|
|    SDL2|         27|  0|  0|  1|
|    SDL3|         28|  0|  0|  1|
|    SDL1|         29|  1|  0|  0|
+--------+-----------+---+---+---+
only showing top 5 rows



#### Groupby/Aggregation Function

In [50]:
for col in sdf.columns:
    new_col = col.replace(".", "").replace(" ", "_")
    sdf = sdf.withColumnRenamed(col, new_col)

In [51]:
sdf.show(2)

+-------+----------+---+---+---+
|Line_No|SHUTTLE_NO|Jan|Feb|Mar|
+-------+----------+---+---+---+
|   SDL3|         3|  1|  0|  0|
|   SDL2|        26|  0|  0|  0|
+-------+----------+---+---+---+
only showing top 2 rows



In [52]:
sdf.groupby('Jan').sum().show()

+---+---------------+--------+--------+--------+
|Jan|sum(SHUTTLE_NO)|sum(Jan)|sum(Feb)|sum(Mar)|
+---+---------------+--------+--------+--------+
|  1|           2714|      26|       6|       9|
|  3|            445|      15|       0|       7|
|  4|            125|       4|       0|       0|
|  2|            703|      12|       1|       5|
|  0|          12325|       0|      13|      34|
+---+---------------+--------+--------+--------+



In [53]:
sdf.agg({'Jan':'sum'}).show()

+--------+
|sum(Jan)|
+--------+
|      57|
+--------+



In [54]:
sdf.groupby('Jan').min().show()
sdf.groupby('Jan').max().show()
sdf.groupby('Jan').avg().show() 

+---+---------------+--------+--------+--------+
|Jan|min(SHUTTLE_NO)|min(Jan)|min(Feb)|min(Mar)|
+---+---------------+--------+--------+--------+
|  1|              3|       1|       0|       0|
|  3|             32|       3|       0|       0|
|  4|            125|       4|       0|       0|
|  2|             95|       2|       0|       0|
|  0|             26|       0|       0|       0|
+---+---------------+--------+--------+--------+

+---+---------------+--------+--------+--------+
|Jan|max(SHUTTLE_NO)|max(Jan)|max(Feb)|max(Mar)|
+---+---------------+--------+--------+--------+
|  1|            451|       1|       1|       2|
|  3|            144|       3|       0|       3|
|  4|            125|       4|       0|       0|
|  2|            151|       2|       1|       2|
|  0|            787|       0|       2|       3|
+---+---------------+--------+--------+--------+

+---+------------------+--------+-------------------+-------------------+
|Jan|   avg(SHUTTLE_NO)|avg(Jan)|         

# Day 6

In [55]:
from pyspark.sql import SparkSession

sp = SparkSession.builder.appName("mLib").getOrCreate()
sp

In [56]:
sdf = sp.read.csv('shuttle_service_frq.csv', header=True, inferSchema=True)
sdf

DataFrame[Line No.: string, SHUTTLE NO.: int, Jan: int, Feb: int, Mar: int]

In [57]:
sdf.show(3)

+--------+-----------+---+---+---+
|Line No.|SHUTTLE NO.|Jan|Feb|Mar|
+--------+-----------+---+---+---+
|    SDL3|          3|  1|  0|  0|
|    SDL2|         26|  0|  0|  0|
|    SDL2|         27|  0|  0|  1|
+--------+-----------+---+---+---+
only showing top 3 rows



##### MLib

In [58]:
for col in sdf.columns:
    new_Col = col.replace(' ', '_').replace(".", '')
    sdf = sdf.withColumnRenamed(col, new_Col)

In [59]:
sdf.show(5)

+-------+----------+---+---+---+
|Line_No|SHUTTLE_NO|Jan|Feb|Mar|
+-------+----------+---+---+---+
|   SDL3|         3|  1|  0|  0|
|   SDL2|        26|  0|  0|  0|
|   SDL2|        27|  0|  0|  1|
|   SDL3|        28|  0|  0|  1|
|   SDL1|        29|  1|  0|  0|
+-------+----------+---+---+---+
only showing top 5 rows



In [60]:
sdf.printSchema()

root
 |-- Line_No: string (nullable = true)
 |-- SHUTTLE_NO: integer (nullable = true)
 |-- Jan: integer (nullable = true)
 |-- Feb: integer (nullable = true)
 |-- Mar: integer (nullable = true)



##### X & y Splitting

In [61]:
from pyspark.ml.feature import VectorAssembler

# Dendent and Independent Variable
featureAss = VectorAssembler(inputCols=['SHUTTLE_NO', 'Jan', 'Feb'], outputCol="Independent_Feature")

output = featureAss.transform(sdf)
output.show()

+-------+----------+---+---+---+-------------------+
|Line_No|SHUTTLE_NO|Jan|Feb|Mar|Independent_Feature|
+-------+----------+---+---+---+-------------------+
|   SDL3|         3|  1|  0|  0|      [3.0,1.0,0.0]|
|   SDL2|        26|  0|  0|  0|     [26.0,0.0,0.0]|
|   SDL2|        27|  0|  0|  1|     [27.0,0.0,0.0]|
|   SDL3|        28|  0|  0|  1|     [28.0,0.0,0.0]|
|   SDL1|        29|  1|  0|  0|     [29.0,1.0,0.0]|
|   SDL3|        30|  1|  0|  0|     [30.0,1.0,0.0]|
|   SDL2|        31|  0|  0|  0|     [31.0,0.0,0.0]|
|   SDL2|        32|  3|  0|  1|     [32.0,3.0,0.0]|
|   SDL3|        33|  0|  1|  0|     [33.0,0.0,1.0]|
|   SDL3|        34|  0|  0|  0|     [34.0,0.0,0.0]|
|   SDL4|        35|  1|  0|  1|     [35.0,1.0,0.0]|
|   SDL3|        36|  0|  1|  0|     [36.0,0.0,1.0]|
|   SDL1|        37|  0|  0|  0|     [37.0,0.0,0.0]|
|   SDL1|        38|  0|  0|  0|     [38.0,0.0,0.0]|
|   SDL3|        39|  0|  0|  0|     [39.0,0.0,0.0]|
|   SDL1|        40|  0|  0|  0|     [40.0,0.0

In [62]:
output.columns 

['Line_No', 'SHUTTLE_NO', 'Jan', 'Feb', 'Mar', 'Independent_Feature']

In [63]:
fdf = output.select(['Independent_Feature', 'Mar'])
fdf.show()

+-------------------+---+
|Independent_Feature|Mar|
+-------------------+---+
|      [3.0,1.0,0.0]|  0|
|     [26.0,0.0,0.0]|  0|
|     [27.0,0.0,0.0]|  1|
|     [28.0,0.0,0.0]|  1|
|     [29.0,1.0,0.0]|  0|
|     [30.0,1.0,0.0]|  0|
|     [31.0,0.0,0.0]|  0|
|     [32.0,3.0,0.0]|  1|
|     [33.0,0.0,1.0]|  0|
|     [34.0,0.0,0.0]|  0|
|     [35.0,1.0,0.0]|  1|
|     [36.0,0.0,1.0]|  0|
|     [37.0,0.0,0.0]|  0|
|     [38.0,0.0,0.0]|  0|
|     [39.0,0.0,0.0]|  0|
|     [40.0,0.0,0.0]|  0|
|     [41.0,0.0,0.0]|  0|
|     [42.0,0.0,0.0]|  1|
|     [43.0,0.0,0.0]|  1|
|     [44.0,0.0,0.0]|  0|
+-------------------+---+
only showing top 20 rows



##### Model

In [64]:
from pyspark.ml.regression import LinearRegression

# Train Test Split
train_df, test_df = fdf.randomSplit([0.75, 0.25], seed=42)

# Model Initiation
reg = LinearRegression(featuresCol='Independent_Feature', labelCol='Mar')
# Model Fitting 
reg = reg.fit(train_df)

# Regression Elements (X's, and c)
reg.coefficients, reg.intercept

(DenseVector([-0.0003, 0.0738, 0.1791]), 0.38253961435629263)

##### Prediction

In [65]:
pred = reg.evaluate(test_df)
pred.predictions.show()

+-------------------+---+-------------------+
|Independent_Feature|Mar|         prediction|
+-------------------+---+-------------------+
|     [27.0,0.0,0.0]|  1|0.37367050016305536|
|     [31.0,0.0,0.0]|  0|0.37235655731961276|
|     [33.0,0.0,1.0]|  0| 0.5508075476808526|
|     [34.0,0.0,0.0]|  0|0.37137110018703084|
|     [38.0,0.0,0.0]|  0| 0.3700571573435883|
|     [44.0,0.0,0.0]|  0|0.36808624307842447|
|     [48.0,0.0,0.0]|  0| 0.3667723002349819|
|     [54.0,0.0,0.0]|  0|0.36480138596981804|
|     [55.0,0.0,0.0]|  0| 0.3644729002589574|
|     [57.0,0.0,0.0]|  1| 0.3638159288372361|
|     [59.0,0.0,0.0]|  0|0.36315895741551485|
|     [60.0,0.0,0.0]|  0| 0.3628304717046542|
|     [67.0,0.0,0.0]|  0|0.36053107172862975|
|     [70.0,0.0,0.0]|  1| 0.3595456145960478|
|     [71.0,0.0,0.0]|  0|0.35921712888518714|
|     [72.0,0.0,0.0]|  0| 0.3588886431743265|
|     [74.0,0.0,0.0]|  0|0.35823167175260523|
|     [76.0,0.0,0.0]|  0|0.35757470033088395|
|     [80.0,0.0,1.0]|  1| 0.535368

In [66]:
pred.meanAbsoluteError, pred.meanSquaredError

(0.5788255503317775, 0.5688966674789168)

# 

# 

# 

# Basic

In [67]:
# Starting Spark Session
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Practice1").getOrCreate()
spark

In [68]:
df = spark.read.csv("shuttle_service_frq.csv", header=True, inferSchema=True)
df.show(3)

+--------+-----------+---+---+---+
|Line No.|SHUTTLE NO.|Jan|Feb|Mar|
+--------+-----------+---+---+---+
|    SDL3|          3|  1|  0|  0|
|    SDL2|         26|  0|  0|  0|
|    SDL2|         27|  0|  0|  1|
+--------+-----------+---+---+---+
only showing top 3 rows



In [69]:
df.show(3)

+--------+-----------+---+---+---+
|Line No.|SHUTTLE NO.|Jan|Feb|Mar|
+--------+-----------+---+---+---+
|    SDL3|          3|  1|  0|  0|
|    SDL2|         26|  0|  0|  0|
|    SDL2|         27|  0|  0|  1|
+--------+-----------+---+---+---+
only showing top 3 rows



In [70]:
import json

# Show preview as JSON 

# df.collect()
# [row.asDict(recursive=True) for row in df.collect()]
print(json.dumps([row.asDict(recursive=True) for row in df.collect()[:10]]))

[{"Line No.": "SDL3", "SHUTTLE NO.": 3, "Jan": 1, "Feb": 0, "Mar": 0}, {"Line No.": "SDL2", "SHUTTLE NO.": 26, "Jan": 0, "Feb": 0, "Mar": 0}, {"Line No.": "SDL2", "SHUTTLE NO.": 27, "Jan": 0, "Feb": 0, "Mar": 1}, {"Line No.": "SDL3", "SHUTTLE NO.": 28, "Jan": 0, "Feb": 0, "Mar": 1}, {"Line No.": "SDL1", "SHUTTLE NO.": 29, "Jan": 1, "Feb": 0, "Mar": 0}, {"Line No.": "SDL3", "SHUTTLE NO.": 30, "Jan": 1, "Feb": 0, "Mar": 0}, {"Line No.": "SDL2", "SHUTTLE NO.": 31, "Jan": 0, "Feb": 0, "Mar": 0}, {"Line No.": "SDL2", "SHUTTLE NO.": 32, "Jan": 3, "Feb": 0, "Mar": 1}, {"Line No.": "SDL3", "SHUTTLE NO.": 33, "Jan": 0, "Feb": 1, "Mar": 0}, {"Line No.": "SDL3", "SHUTTLE NO.": 34, "Jan": 0, "Feb": 0, "Mar": 0}]


In [71]:
# Total Records
print(f"Total Number of Records: {df.count()}")

Total Number of Records: 134


In [72]:
print(f"Total number of Columns: {len(df.columns)}")
df.columns

Total number of Columns: 5


['Line No.', 'SHUTTLE NO.', 'Jan', 'Feb', 'Mar']

In [73]:
# Shape of the df
print(f"Shape of the df: {df.count()}, {len(df.columns)}")

Shape of the df: 134, 5


In [74]:
# Schema of the df
df.schema

StructType([StructField('Line No.', StringType(), True), StructField('SHUTTLE NO.', IntegerType(), True), StructField('Jan', IntegerType(), True), StructField('Feb', IntegerType(), True), StructField('Mar', IntegerType(), True)])

In [75]:
# List of Records
df.collect()[:5]

[Row(Line No.='SDL3', SHUTTLE NO.=3, Jan=1, Feb=0, Mar=0),
 Row(Line No.='SDL2', SHUTTLE NO.=26, Jan=0, Feb=0, Mar=0),
 Row(Line No.='SDL2', SHUTTLE NO.=27, Jan=0, Feb=0, Mar=1),
 Row(Line No.='SDL3', SHUTTLE NO.=28, Jan=0, Feb=0, Mar=1),
 Row(Line No.='SDL1', SHUTTLE NO.=29, Jan=1, Feb=0, Mar=0)]

In [76]:
# Get as List of Dictionary of the Recoreds
[rows.asDict() for rows in df.collect()[:5]]

[{'Line No.': 'SDL3', 'SHUTTLE NO.': 3, 'Jan': 1, 'Feb': 0, 'Mar': 0},
 {'Line No.': 'SDL2', 'SHUTTLE NO.': 26, 'Jan': 0, 'Feb': 0, 'Mar': 0},
 {'Line No.': 'SDL2', 'SHUTTLE NO.': 27, 'Jan': 0, 'Feb': 0, 'Mar': 1},
 {'Line No.': 'SDL3', 'SHUTTLE NO.': 28, 'Jan': 0, 'Feb': 0, 'Mar': 1},
 {'Line No.': 'SDL1', 'SHUTTLE NO.': 29, 'Jan': 1, 'Feb': 0, 'Mar': 0}]

In [77]:
# Covert to Pandas df format
df.toPandas().head()

Unnamed: 0,Line No.,SHUTTLE NO.,Jan,Feb,Mar
0,SDL3,3,1,0,0
1,SDL2,26,0,0,0
2,SDL2,27,0,0,1
3,SDL3,28,0,0,1
4,SDL1,29,1,0,0


# Filter Data 

In [78]:
from pyspark.sql import functions as F, types as T

In [79]:
df.filter(df.Jan == 1).show(5)

+--------+-----------+---+---+---+
|Line No.|SHUTTLE NO.|Jan|Feb|Mar|
+--------+-----------+---+---+---+
|    SDL3|          3|  1|  0|  0|
|    SDL1|         29|  1|  0|  0|
|    SDL3|         30|  1|  0|  0|
|    SDL4|         35|  1|  0|  1|
|    SDL4|         50|  1|  0|  0|
+--------+-----------+---+---+---+
only showing top 5 rows



In [80]:
df.filter(df['Feb'] >1).show()

+--------+-----------+---+---+---+
|Line No.|SHUTTLE NO.|Jan|Feb|Mar|
+--------+-----------+---+---+---+
|    SDL3|        453|  0|  2|  0|
+--------+-----------+---+---+---+



In [81]:
df.filter(df['`SHUTTLE NO.`'] >500).show()

+--------+-----------+---+---+---+
|Line No.|SHUTTLE NO.|Jan|Feb|Mar|
+--------+-----------+---+---+---+
|    SDL3|        607|  0|  0|  0|
|    SDL4|        608|  0|  0|  0|
|    SDL3|        609|  0|  0|  0|
|    SDL1|        610|  0|  0|  0|
|    SDL1|        786|  0|  0|  1|
|    SDL4|        787|  0|  0|  0|
+--------+-----------+---+---+---+



In [82]:
# Renaming the column name 

for col in df.columns:
    new_cols = col.replace(" ", "_").replace(".", "")
    print(new_cols)
    df = df.withColumnRenamed(col, new_cols)

df.show(5)

Line_No
SHUTTLE_NO
Jan
Feb
Mar
+-------+----------+---+---+---+
|Line_No|SHUTTLE_NO|Jan|Feb|Mar|
+-------+----------+---+---+---+
|   SDL3|         3|  1|  0|  0|
|   SDL2|        26|  0|  0|  0|
|   SDL2|        27|  0|  0|  1|
|   SDL3|        28|  0|  0|  1|
|   SDL1|        29|  1|  0|  0|
+-------+----------+---+---+---+
only showing top 5 rows



In [83]:
df.filter(df.SHUTTLE_NO>600).show()

+-------+----------+---+---+---+
|Line_No|SHUTTLE_NO|Jan|Feb|Mar|
+-------+----------+---+---+---+
|   SDL3|       607|  0|  0|  0|
|   SDL4|       608|  0|  0|  0|
|   SDL3|       609|  0|  0|  0|
|   SDL1|       610|  0|  0|  0|
|   SDL1|       786|  0|  0|  1|
|   SDL4|       787|  0|  0|  0|
+-------+----------+---+---+---+



In [84]:
df.filter((df.Jan>1) & (df.Feb<1)).show(5)

+-------+----------+---+---+---+
|Line_No|SHUTTLE_NO|Jan|Feb|Mar|
+-------+----------+---+---+---+
|   SDL2|        32|  3|  0|  1|
|   SDL1|        62|  3|  0|  2|
|   SDL2|        84|  3|  0|  0|
|   SDL2|        95|  2|  0|  0|
|   SDL2|       108|  2|  0|  0|
+-------+----------+---+---+---+
only showing top 5 rows



# Order By

In [85]:
df.orderBy(df.Jan.desc()).show(5)

+-------+----------+---+---+---+
|Line_No|SHUTTLE_NO|Jan|Feb|Mar|
+-------+----------+---+---+---+
|   SDL4|       125|  4|  0|  0|
|   SDL2|        84|  3|  0|  0|
|   SDL4|       123|  3|  0|  1|
|   SDL2|        32|  3|  0|  1|
|   SDL1|        62|  3|  0|  2|
+-------+----------+---+---+---+
only showing top 5 rows



# Joins

# Column Operations

In [86]:
df.show(5)

+-------+----------+---+---+---+
|Line_No|SHUTTLE_NO|Jan|Feb|Mar|
+-------+----------+---+---+---+
|   SDL3|         3|  1|  0|  0|
|   SDL2|        26|  0|  0|  0|
|   SDL2|        27|  0|  0|  1|
|   SDL3|        28|  0|  0|  1|
|   SDL1|        29|  1|  0|  0|
+-------+----------+---+---+---+
only showing top 5 rows



In [87]:
# Add a new Static Columns
df.withColumn("April", F.lit("Pass")).show(5)

+-------+----------+---+---+---+-----+
|Line_No|SHUTTLE_NO|Jan|Feb|Mar|April|
+-------+----------+---+---+---+-----+
|   SDL3|         3|  1|  0|  0| Pass|
|   SDL2|        26|  0|  0|  0| Pass|
|   SDL2|        27|  0|  0|  1| Pass|
|   SDL3|        28|  0|  0|  1| Pass|
|   SDL1|        29|  1|  0|  0| Pass|
+-------+----------+---+---+---+-----+
only showing top 5 rows



In [88]:
# Add a new column 'Sum' which is the sum of 'Jan' and 'Feb'
df.withColumn("Apr", F.col("Jan") + F.col("Feb")).show(4)

+-------+----------+---+---+---+---+
|Line_No|SHUTTLE_NO|Jan|Feb|Mar|Apr|
+-------+----------+---+---+---+---+
|   SDL3|         3|  1|  0|  0|  1|
|   SDL2|        26|  0|  0|  0|  0|
|   SDL2|        27|  0|  0|  1|  0|
|   SDL3|        28|  0|  0|  1|  0|
+-------+----------+---+---+---+---+
only showing top 4 rows



In [89]:
# Create the new column 'Month' based on the condition
df.withColumn(
    "April", 
    F.when(
        (F.col("Jan").isNotNull()) & (F.col("Feb").isNotNull()),
        F.concat((F.col("Jan") + F.col("Feb")).cast(StringType()))
    ).otherwise(F.lit("Null"))
).show(5)

+-------+----------+---+---+---+-----+
|Line_No|SHUTTLE_NO|Jan|Feb|Mar|April|
+-------+----------+---+---+---+-----+
|   SDL3|         3|  1|  0|  0|    1|
|   SDL2|        26|  0|  0|  0|    0|
|   SDL2|        27|  0|  0|  1|    0|
|   SDL3|        28|  0|  0|  1|    0|
|   SDL1|        29|  1|  0|  0|    1|
+-------+----------+---+---+---+-----+
only showing top 5 rows



In [90]:
df.withColumn(
    "Apr",
    F.when(
        (F.col('Jan').isNotNull()) & (F.col('Feb').isNotNull()),
         F.col("jan")+2
    ).otherwise(F.lit("Null"))
).show(3)

+-------+----------+---+---+---+---+
|Line_No|SHUTTLE_NO|Jan|Feb|Mar|Apr|
+-------+----------+---+---+---+---+
|   SDL3|         3|  1|  0|  0|  3|
|   SDL2|        26|  0|  0|  0|  2|
|   SDL2|        27|  0|  0|  1|  2|
+-------+----------+---+---+---+---+
only showing top 3 rows



In [91]:
# adding Column simply
df.withColumn("Apr", F.col("Jan")+20).show(5)

+-------+----------+---+---+---+---+
|Line_No|SHUTTLE_NO|Jan|Feb|Mar|Apr|
+-------+----------+---+---+---+---+
|   SDL3|         3|  1|  0|  0| 21|
|   SDL2|        26|  0|  0|  0| 20|
|   SDL2|        27|  0|  0|  1| 20|
|   SDL3|        28|  0|  0|  1| 20|
|   SDL1|        29|  1|  0|  0| 21|
+-------+----------+---+---+---+---+
only showing top 5 rows



In [92]:
# Print Single Column 
df.withColumn("Apr", F.col("Jan")+10).select("Apr").show(5)

+---+
|Apr|
+---+
| 11|
| 10|
| 10|
| 10|
| 11|
+---+
only showing top 5 rows



In [93]:
# Drop Column 
df.drop("Feb", "Mar").show(5)

+-------+----------+---+
|Line_No|SHUTTLE_NO|Jan|
+-------+----------+---+
|   SDL3|         3|  1|
|   SDL2|        26|  0|
|   SDL2|        27|  0|
|   SDL3|        28|  0|
|   SDL1|        29|  1|
+-------+----------+---+
only showing top 5 rows



In [94]:
# Pick column with alias.
df.select("Jan", 
          "Feb", 
          F.col('SHUTTLE_NO').alias("No.")
).show(5)

+---+---+---+
|Jan|Feb|No.|
+---+---+---+
|  1|  0|  3|
|  0|  0| 26|
|  0|  0| 27|
|  0|  0| 28|
|  1|  0| 29|
+---+---+---+
only showing top 5 rows



In [95]:
# Renaming Column
df.withColumnRenamed('SHUTTLE_NO', 's_No').show(5)

+-------+----+---+---+---+
|Line_No|s_No|Jan|Feb|Mar|
+-------+----+---+---+---+
|   SDL3|   3|  1|  0|  0|
|   SDL2|  26|  0|  0|  0|
|   SDL2|  27|  0|  0|  1|
|   SDL3|  28|  0|  0|  1|
|   SDL1|  29|  1|  0|  0|
+-------+----+---+---+---+
only showing top 5 rows



In [96]:
# Batch Rename/ Cleaning column 
for col in df.columns:
    df = df.withColumnRenamed(col, col.lower().replace(" ", "_").replace("-", "_"))
df.show(5)

+-------+----------+---+---+---+
|line_no|shuttle_no|jan|feb|mar|
+-------+----------+---+---+---+
|   SDL3|         3|  1|  0|  0|
|   SDL2|        26|  0|  0|  0|
|   SDL2|        27|  0|  0|  1|
|   SDL3|        28|  0|  0|  1|
|   SDL1|        29|  1|  0|  0|
+-------+----------+---+---+---+
only showing top 5 rows



# Null Values and Duplicates

In [97]:
df.show()

+-------+----------+---+---+---+
|line_no|shuttle_no|jan|feb|mar|
+-------+----------+---+---+---+
|   SDL3|         3|  1|  0|  0|
|   SDL2|        26|  0|  0|  0|
|   SDL2|        27|  0|  0|  1|
|   SDL3|        28|  0|  0|  1|
|   SDL1|        29|  1|  0|  0|
|   SDL3|        30|  1|  0|  0|
|   SDL2|        31|  0|  0|  0|
|   SDL2|        32|  3|  0|  1|
|   SDL3|        33|  0|  1|  0|
|   SDL3|        34|  0|  0|  0|
|   SDL4|        35|  1|  0|  1|
|   SDL3|        36|  0|  1|  0|
|   SDL1|        37|  0|  0|  0|
|   SDL1|        38|  0|  0|  0|
|   SDL3|        39|  0|  0|  0|
|   SDL1|        40|  0|  0|  0|
|   SDL3|        41|  0|  0|  0|
|   SDL3|        42|  0|  0|  1|
|   SDL4|        43|  0|  0|  1|
|   SDL3|        44|  0|  0|  0|
+-------+----------+---+---+---+
only showing top 20 rows



In [98]:
# Cast a column to different dtype
df.withColumn("Jan", df.jan.cast(T.FloatType())).show(5)

+-------+----------+---+---+---+
|line_no|shuttle_no|Jan|feb|mar|
+-------+----------+---+---+---+
|   SDL3|         3|1.0|  0|  0|
|   SDL2|        26|0.0|  0|  0|
|   SDL2|        27|0.0|  0|  1|
|   SDL3|        28|0.0|  0|  1|
|   SDL1|        29|1.0|  0|  0|
+-------+----------+---+---+---+
only showing top 5 rows



In [99]:
# Replace all Nulls with a specific values
df.fillna({
    'jan': 11,
    'Feb': 12
}).show(5)

+-------+----------+---+---+---+
|line_no|shuttle_no|jan|feb|mar|
+-------+----------+---+---+---+
|   SDL3|         3|  1|  0|  0|
|   SDL2|        26|  0|  0|  0|
|   SDL2|        27|  0|  0|  1|
|   SDL3|        28|  0|  0|  1|
|   SDL1|        29|  1|  0|  0|
+-------+----------+---+---+---+
only showing top 5 rows



In [100]:
# Take the first value that is not null
df.withColumn("jan", F.coalesce(df.jan, df.feb, F.lit("N/A"))).show(5)

+-------+----------+---+---+---+
|line_no|shuttle_no|jan|feb|mar|
+-------+----------+---+---+---+
|   SDL3|         3|  1|  0|  0|
|   SDL2|        26|  0|  0|  0|
|   SDL2|        27|  0|  0|  1|
|   SDL3|        28|  0|  0|  1|
|   SDL1|        29|  1|  0|  0|
+-------+----------+---+---+---+
only showing top 5 rows



In [101]:
# Drop Duplicate rows, considering specific column
df.drop_duplicates(['jan', 'feb', 'mar']).show(100)

+-------+----------+---+---+---+
|line_no|shuttle_no|jan|feb|mar|
+-------+----------+---+---+---+
|   SDL2|        26|  0|  0|  0|
|   SDL2|        27|  0|  0|  1|
|   SDL4|        66|  0|  0|  2|
|   SDL4|        90|  0|  0|  3|
|   SDL3|        33|  0|  1|  0|
|   SDL3|        80|  0|  1|  1|
|   SDL3|       453|  0|  2|  0|
|   SDL3|         3|  1|  0|  0|
|   SDL4|        35|  1|  0|  1|
|   SDL4|       139|  1|  0|  2|
|   SDL3|       137|  1|  1|  0|
|   SDL1|        98|  1|  1|  1|
|   SDL3|        77|  1|  1|  2|
|   SDL2|        95|  2|  0|  0|
|   SDL2|       151|  2|  0|  1|
|   SDL1|       110|  2|  0|  2|
|   SDL1|       115|  2|  1|  2|
|   SDL2|        84|  3|  0|  0|
|   SDL2|        32|  3|  0|  1|
|   SDL1|        62|  3|  0|  2|
|   SDL1|       144|  3|  0|  3|
|   SDL4|       125|  4|  0|  0|
+-------+----------+---+---+---+



In [102]:
# Remove duplicate rows from the entire DataFrame, based on all columns
df.drop_duplicates().show(5)
df.distinct().show(5)

+-------+----------+---+---+---+
|line_no|shuttle_no|jan|feb|mar|
+-------+----------+---+---+---+
|   SDL2|        99|  0|  0|  0|
|   SDL3|        44|  0|  0|  0|
|   SDL3|       127|  0|  0|  0|
|   SDL3|        33|  0|  1|  0|
|   SDL3|        41|  0|  0|  0|
+-------+----------+---+---+---+
only showing top 5 rows

+-------+----------+---+---+---+
|line_no|shuttle_no|jan|feb|mar|
+-------+----------+---+---+---+
|   SDL2|        99|  0|  0|  0|
|   SDL3|        44|  0|  0|  0|
|   SDL3|       127|  0|  0|  0|
|   SDL3|        33|  0|  1|  0|
|   SDL3|        41|  0|  0|  0|
+-------+----------+---+---+---+
only showing top 5 rows



In [103]:
# Replace empty strint with Null/None
df.replace({"":None}, subset=['jan']).show(5)

+-------+----------+---+---+---+
|line_no|shuttle_no|jan|feb|mar|
+-------+----------+---+---+---+
|   SDL3|         3|  1|  0|  0|
|   SDL2|        26|  0|  0|  0|
|   SDL2|        27|  0|  0|  1|
|   SDL3|        28|  0|  0|  1|
|   SDL1|        29|  1|  0|  0|
+-------+----------+---+---+---+
only showing top 5 rows



In [104]:
df.replace(float('nan'), None).show(5)

+-------+----------+---+---+---+
|line_no|shuttle_no|jan|feb|mar|
+-------+----------+---+---+---+
|   SDL3|         3|  1|  0|  0|
|   SDL2|        26|  0|  0|  0|
|   SDL2|        27|  0|  0|  1|
|   SDL3|        28|  0|  0|  1|
|   SDL1|        29|  1|  0|  0|
+-------+----------+---+---+---+
only showing top 5 rows



# String Operations

In [105]:
# String columns
df.filter(df.line_no.contains('o')).show()

+-------+----------+---+---+---+
|line_no|shuttle_no|jan|feb|mar|
+-------+----------+---+---+---+
+-------+----------+---+---+---+



In [106]:
# Startwith 4
df.filter(df.shuttle_no.startswith('4')).show()

+-------+----------+---+---+---+
|line_no|shuttle_no|jan|feb|mar|
+-------+----------+---+---+---+
|   SDL1|        40|  0|  0|  0|
|   SDL3|        41|  0|  0|  0|
|   SDL3|        42|  0|  0|  1|
|   SDL4|        43|  0|  0|  1|
|   SDL3|        44|  0|  0|  0|
|   SDL2|        45|  0|  0|  0|
|   NULL|        46|  0|  0|  0|
|   SDL1|        47|  0|  0|  0|
|   SDL3|        48|  0|  0|  0|
|   SDL2|        49|  0|  0|  0|
|   SDL4|       450|  0|  1|  0|
|   SDL2|       451|  1|  1|  0|
|   SDL3|       453|  0|  2|  0|
+-------+----------+---+---+---+



In [107]:
df.filter(df.line_no.isNull()).show()

+-------+----------+---+---+---+
|line_no|shuttle_no|jan|feb|mar|
+-------+----------+---+---+---+
|   NULL|        46|  0|  0|  0|
|   NULL|        88|  0|  0|  0|
+-------+----------+---+---+---+



In [108]:
# Have 45 at in between
df.filter(df.shuttle_no.like('%45%')).show()

# have 45 at end
df.filter(df.shuttle_no.like('%45')).show()

# Contain 45 at starting
df.filter(df.shuttle_no.like('45%')).show()

+-------+----------+---+---+---+
|line_no|shuttle_no|jan|feb|mar|
+-------+----------+---+---+---+
|   SDL2|        45|  0|  0|  0|
|   SDL3|       145|  0|  0|  0|
|   SDL4|       450|  0|  1|  0|
|   SDL2|       451|  1|  1|  0|
|   SDL3|       453|  0|  2|  0|
+-------+----------+---+---+---+

+-------+----------+---+---+---+
|line_no|shuttle_no|jan|feb|mar|
+-------+----------+---+---+---+
|   SDL2|        45|  0|  0|  0|
|   SDL3|       145|  0|  0|  0|
+-------+----------+---+---+---+

+-------+----------+---+---+---+
|line_no|shuttle_no|jan|feb|mar|
+-------+----------+---+---+---+
|   SDL2|        45|  0|  0|  0|
|   SDL4|       450|  0|  1|  0|
|   SDL2|       451|  1|  1|  0|
|   SDL3|       453|  0|  2|  0|
+-------+----------+---+---+---+



In [117]:
# column ends with "ice" and may optionally have uppercase letters before "ice" 
# (e.g., "NICE", "AICE", "ice")
df.filter(df.line_no.rlike('[A-Z]*ice$')).show()

+-------+----------+---+---+---+
|line_no|shuttle_no|jan|feb|mar|
+-------+----------+---+---+---+
+-------+----------+---+---+---+



In [109]:
# onlu SDL4 records
df.filter(df.line_no.isin('SDL4')).show(5)

+-------+----------+---+---+---+
|line_no|shuttle_no|jan|feb|mar|
+-------+----------+---+---+---+
|   SDL4|        35|  1|  0|  1|
|   SDL4|        43|  0|  0|  1|
|   SDL4|        50|  1|  0|  0|
|   SDL4|        53|  1|  0|  0|
|   SDL4|        54|  0|  0|  0|
+-------+----------+---+---+---+
only showing top 5 rows



# String Fundtions

In [124]:
# extracts a substring starting from the first character (index 0) to 1 index.
df.withColumn('shuttle_no_substr', df.shuttle_no.substr(0, 1)).show(5)

+-------+----------+---+---+---+-----------------+
|line_no|shuttle_no|jan|feb|mar|shuttle_no_substr|
+-------+----------+---+---+---+-----------------+
|   SDL3|         3|  1|  0|  0|                3|
|   SDL2|        26|  0|  0|  0|                2|
|   SDL2|        27|  0|  0|  1|                2|
|   SDL3|        28|  0|  0|  1|                2|
|   SDL1|        29|  1|  0|  0|                2|
+-------+----------+---+---+---+-----------------+
only showing top 5 rows



In [130]:
# removes leading and trailing spaces
df.withColumn('line_no_trim', F.trim(df.line_no)).show(5)

+-------+----------+---+---+---+------------+
|line_no|shuttle_no|jan|feb|mar|line_no_trim|
+-------+----------+---+---+---+------------+
|   SDL3|         3|  1|  0|  0|        SDL3|
|   SDL2|        26|  0|  0|  0|        SDL2|
|   SDL2|        27|  0|  0|  1|        SDL2|
|   SDL3|        28|  0|  0|  1|        SDL3|
|   SDL1|        29|  1|  0|  0|        SDL1|
+-------+----------+---+---+---+------------+
only showing top 5 rows



In [134]:
# pads the left side of a string with a specified character (0) 
# until the string reaches a certain length (3).
df.withColumn('shuttle_no_left_pad_0', F.lpad('shuttle_no', 3, '0')).show(5)

+-------+----------+---+---+---+---------------------+
|line_no|shuttle_no|jan|feb|mar|shuttle_no_left_pad_0|
+-------+----------+---+---+---+---------------------+
|   SDL3|         3|  1|  0|  0|                  003|
|   SDL2|        26|  0|  0|  0|                  026|
|   SDL2|        27|  0|  0|  1|                  027|
|   SDL3|        28|  0|  0|  1|                  028|
|   SDL1|        29|  1|  0|  0|                  029|
+-------+----------+---+---+---+---------------------+
only showing top 5 rows



In [135]:
df.withColumn('shuttle_no_right_pad_0', F.rpad('shuttle_no', 3, '0')).show(5)

+-------+----------+---+---+---+----------------------+
|line_no|shuttle_no|jan|feb|mar|shuttle_no_right_pad_0|
+-------+----------+---+---+---+----------------------+
|   SDL3|         3|  1|  0|  0|                   300|
|   SDL2|        26|  0|  0|  0|                   260|
|   SDL2|        27|  0|  0|  1|                   270|
|   SDL3|        28|  0|  0|  1|                   280|
|   SDL1|        29|  1|  0|  0|                   290|
+-------+----------+---+---+---+----------------------+
only showing top 5 rows



In [141]:
# trims (removes) leading spaces
df.withColumn('line_no_lead_trim', F.ltrim('line_no')).show(5)

# trims (removes) trailing spaces
df.withColumn('line_no_trail_trim', F.rtrim('line_no')).show(5)

+-------+----------+---+---+---+-----------------+
|line_no|shuttle_no|jan|feb|mar|line_no_lead_trim|
+-------+----------+---+---+---+-----------------+
|   SDL3|         3|  1|  0|  0|             SDL3|
|   SDL2|        26|  0|  0|  0|             SDL2|
|   SDL2|        27|  0|  0|  1|             SDL2|
|   SDL3|        28|  0|  0|  1|             SDL3|
|   SDL1|        29|  1|  0|  0|             SDL1|
+-------+----------+---+---+---+-----------------+
only showing top 5 rows

+-------+----------+---+---+---+------------------+
|line_no|shuttle_no|jan|feb|mar|line_no_trail_trim|
+-------+----------+---+---+---+------------------+
|   SDL3|         3|  1|  0|  0|              SDL3|
|   SDL2|        26|  0|  0|  0|              SDL2|
|   SDL2|        27|  0|  0|  1|              SDL2|
|   SDL3|        28|  0|  0|  1|              SDL3|
|   SDL1|        29|  1|  0|  0|              SDL1|
+-------+----------+---+---+---+------------------+
only showing top 5 rows



In [160]:
# If line_no = "123" and shuttle_no = None, the result would be None (because concat doesn't handle null values by skipping them).
# it conseder "-", as string value.
df.withColumn('line-suttle_no', F.concat('line_no', F.lit("-"), 'shuttle_no')).show(5)

+-------+----------+---+---+---+--------------+
|line_no|shuttle_no|jan|feb|mar|line-suttle_no|
+-------+----------+---+---+---+--------------+
|   SDL3|         3|  1|  0|  0|        SDL3-3|
|   SDL2|        26|  0|  0|  0|       SDL2-26|
|   SDL2|        27|  0|  0|  1|       SDL2-27|
|   SDL3|        28|  0|  0|  1|       SDL3-28|
|   SDL1|        29|  1|  0|  0|       SDL1-29|
+-------+----------+---+---+---+--------------+
only showing top 5 rows



##### OR


In [161]:
# If line_no = "123" and shuttle_no = None, the result would be "123" (no trailing hyphen).
# and it consider "-", as delimeter.
df.withColumn('line-suttle_no', F.concat_ws("-", 'line_no', 'shuttle_no')).show(5)

+-------+----------+---+---+---+--------------+
|line_no|shuttle_no|jan|feb|mar|line-suttle_no|
+-------+----------+---+---+---+--------------+
|   SDL3|         3|  1|  0|  0|        SDL3-3|
|   SDL2|        26|  0|  0|  0|       SDL2-26|
|   SDL2|        27|  0|  0|  1|       SDL2-27|
|   SDL3|        28|  0|  0|  1|       SDL3-28|
|   SDL1|        29|  1|  0|  0|       SDL1-29|
+-------+----------+---+---+---+--------------+
only showing top 5 rows



In [175]:
# RegEx Replacement
# Suppose the value of line_no is "SDL12345".
# The regex SDL(.*) matches "SDL" and captures "12345" in the capture group $1.
# The replacement string Line_No-$1 would produce "Line_No-12345".
# If the value of line_no is "SDL" (with no characters following "SDL"):
# The regex SDL(.*) matches "SDL" and captures an empty string in the capture group $1.
# The replacement string would produce "Line_No-".
# If line_no does not match the pattern 'SDL(.*)', it will remain unchanged in the new column.

df.withColumn("line_no_new", F.regexp_replace("line_no", 'SDL(.*)', 'Line_No-$1')).show(5)

+-------+----------+---+---+---+-----------+
|line_no|shuttle_no|jan|feb|mar|line_no_new|
+-------+----------+---+---+---+-----------+
|   SDL3|         3|  1|  0|  0|  Line_No-3|
|   SDL2|        26|  0|  0|  0|  Line_No-2|
|   SDL2|        27|  0|  0|  1|  Line_No-2|
|   SDL3|        28|  0|  0|  1|  Line_No-3|
|   SDL1|        29|  1|  0|  0|  Line_No-1|
+-------+----------+---+---+---+-----------+
only showing top 5 rows



In [119]:
# Suppose the value of shuttle_no is "ABC123XYZ".

# The regex [0-9]+ matches the first sequence of digits it encounters, which is "123".
# The resulting value in 'shuttle_no_new' would be "123".
# If the value of shuttle_no is "ABCXYZ" (with no digits):

# The regex [0-9]+ matches zero digits.
# The resulting value in 'shuttle_no_new' would be an empty string "".

df.withColumn("line_no_ID", F.regexp_extract("line_no", '[0-9]+', 0)).show(5)

+-------+----------+---+---+---+----------+
|line_no|shuttle_no|jan|feb|mar|line_no_ID|
+-------+----------+---+---+---+----------+
|   SDL3|         3|  1|  0|  0|         3|
|   SDL2|        26|  0|  0|  0|         2|
|   SDL2|        27|  0|  0|  1|         2|
|   SDL3|        28|  0|  0|  1|         3|
|   SDL1|        29|  1|  0|  0|         1|
+-------+----------+---+---+---+----------+
only showing top 5 rows



# Number Operation 

In [120]:
# Round to 0
df.withColumn('mar', F.round('mar', 0)).show(5)

# Floor
df.withColumn('feb', F.floor('feb')).show(5)

# Ceil 
df.withColumn('jan', F.ceil('jan')).show(5)
## OR (can be used interchangeably)

# Ceiling 
df.withColumn('jan', F.ceiling('jan')).show(5)

# absolute
df.withColumn('jan', F.abs('jan')).show(5)

+-------+----------+---+---+---+
|line_no|shuttle_no|jan|feb|mar|
+-------+----------+---+---+---+
|   SDL3|         3|  1|  0|  0|
|   SDL2|        26|  0|  0|  0|
|   SDL2|        27|  0|  0|  1|
|   SDL3|        28|  0|  0|  1|
|   SDL1|        29|  1|  0|  0|
+-------+----------+---+---+---+
only showing top 5 rows

+-------+----------+---+---+---+
|line_no|shuttle_no|jan|feb|mar|
+-------+----------+---+---+---+
|   SDL3|         3|  1|  0|  0|
|   SDL2|        26|  0|  0|  0|
|   SDL2|        27|  0|  0|  1|
|   SDL3|        28|  0|  0|  1|
|   SDL1|        29|  1|  0|  0|
+-------+----------+---+---+---+
only showing top 5 rows

+-------+----------+---+---+---+
|line_no|shuttle_no|jan|feb|mar|
+-------+----------+---+---+---+
|   SDL3|         3|  1|  0|  0|
|   SDL2|        26|  0|  0|  0|
|   SDL2|        27|  0|  0|  1|
|   SDL3|        28|  0|  0|  1|
|   SDL1|        29|  1|  0|  0|
+-------+----------+---+---+---+
only showing top 5 rows

+-------+----------+---+---+---+
|