# Download Spark

In [None]:
! wget http://apache.mirror.anlx.net/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz

# Prepare example dataset 

In [None]:
! wget https://archive.ics.uci.edu/ml/machine-learning-databases/00222/bank.zip -O bank.zip
! unzip -o bank.zip -d bank

In [2]:
# Location of the CSV that the Spark, pandas and Koalas loading cells all read.
file_path = "dataset/bank/bank-full.csv"

# Loading the data as different DataFrame types

## Spark Dataframe

In [3]:
from pyspark.sql import SparkSession

# Reuse the active session when the environment already provides one;
# otherwise start one so this cell also works on a fresh kernel.
spark = SparkSession.builder.getOrCreate()

# Read the semicolon-delimited CSV, taking column names from the header row
# and letting Spark infer the column types.
df = (spark.read
           .option("inferSchema", "true")
           .option("header", "true")
           .option("delimiter", ";")
           .option("quote", '"')
           .csv(file_path))

df.show(5)

+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|age|         job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
| 58|  management|married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown| no|
| 44|  technician| single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown| no|
| 33|entrepreneur|married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown| no|
| 47| blue-collar|married|  unknown|     no|   1506|    yes|  no|unknown|  5|  may|      92|       1|   -1|       0| unknown| no|
| 33|     unknown| single|  unknown|     no|      1|     no|  no|unknown|  5|  may|     19

## Pandas Dataframe

In [51]:
import pandas as pd

# Parse the semicolon-separated file; row 0 supplies the column names.
pdf = pd.read_csv(
    file_path,
    sep=";",
    header=0,
    quotechar='"',
)
display(pdf.head())

Unnamed: 0,age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,y
0,58,management,married,tertiary,no,2143,yes,no,unknown,5,may,261,1,-1,0,unknown,no
1,44,technician,single,secondary,no,29,yes,no,unknown,5,may,151,1,-1,0,unknown,no
2,33,entrepreneur,married,secondary,no,2,yes,yes,unknown,5,may,76,1,-1,0,unknown,no
3,47,blue-collar,married,unknown,no,1506,yes,no,unknown,5,may,92,1,-1,0,unknown,no
4,33,unknown,single,unknown,no,1,no,no,unknown,5,may,198,1,-1,0,unknown,no


## Koalas Dataframe

In [52]:
import databricks.koalas as ks

# Same call shape as the pandas reader: semicolon separator, header in row 0.
kdf = ks.read_csv(
    file_path,
    sep=";",
    header=0,
    quotechar='"',
)
display(kdf.head())

Unnamed: 0,age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,y
0,58,management,married,tertiary,no,2143,yes,no,unknown,5,may,261,1,-1,0,unknown,no
1,44,technician,single,secondary,no,29,yes,no,unknown,5,may,151,1,-1,0,unknown,no
2,33,entrepreneur,married,secondary,no,2,yes,yes,unknown,5,may,76,1,-1,0,unknown,no
3,47,blue-collar,married,unknown,no,1506,yes,no,unknown,5,may,92,1,-1,0,unknown,no
4,33,unknown,single,unknown,no,1,no,no,unknown,5,may,198,1,-1,0,unknown,no


In [53]:
# Converting between Spark and Koalas DataFrames

# Create a Koalas DataFrame from a PySpark DataFrame
# kdf = ks.DataFrame(df)

# Alternative way of creating a Koalas DataFrame from a PySpark DataFrame
# kdf = df.to_koalas()

# Koalas DataFrame -> PySpark DataFrame
# kdf.to_spark()

# Column Manipulation

In [54]:
# Add a derived column with PySpark: duration shifted up by 100
from pyspark.sql.functions import col

duration_plus_100 = col("duration") + 100
df = df.withColumn("duration_new", duration_plus_100)
df.show(5)

+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+------------+
|age|         job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|duration_new|
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+------------+
| 58|  management|married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown| no|         361|
| 44|  technician| single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown| no|         251|
| 33|entrepreneur|married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown| no|         176|
| 47| blue-collar|married|  unknown|     no|   1506|    yes|  no|unknown|  5|  may|      92|       1|   -1|       0| unknown| no|         192|

In [None]:
# Add the same derived column with pandas
pdf["duration_new"] = pdf["duration"].add(100)
display(pdf.head())

In [None]:
# Add the same derived column with Koalas
kdf["duration_new"] = kdf.duration + 100
display(kdf.head())

# Filtering

In [None]:
# Keep only the rows whose duration_new is at least 300 (PySpark)
df_filtered = df.where(col("duration_new") >= 300)
print(df_filtered.count())

In [None]:
# Keep only the rows whose duration_new is at least 300 (pandas)
long_duration_mask = pdf["duration_new"] >= 300
pdf_filtered = pdf[long_duration_mask]
row_count, _ = pdf_filtered.shape
print(row_count)

In [None]:
# Keep only the rows whose duration_new is at least 300 (Koalas)
kdf_filtered = kdf[kdf["duration_new"] >= 300]
row_count, _ = kdf_filtered.shape
print(row_count)

# Value Counts

In [None]:
# Count rows per job type with PySpark, most frequent first
job_counts = df.groupby("job").count()
job_counts.orderBy("count", ascending=False).show()

In [None]:
# Value counts in Pandas
pdf["job"].value_counts()

In [None]:
# Count rows per job type with Koalas
kdf.job.value_counts()

# GroupBy

In [None]:
# Mean age for each education level (PySpark); the aggregate comes back
# as "avg(age)", so rename it to avg_age.
df_grouped_1 = (
    df.groupby("education")
      .agg({"age": "mean"})
      .withColumnRenamed("avg(age)", "avg_age")
)

df_grouped_1.show()

In [None]:
# Mean age for each education level (pandas), with the aggregate
# column renamed to avg_age in the same chain.
pdf_grouped_1 = (
    pdf.groupby("education", as_index=False)
       .agg({"age": "mean"})
       .rename(columns={"age": "avg_age"})
)
display(pdf_grouped_1)

In [None]:
# Mean age for each education level (Koalas)
education_groups = kdf.groupby("education", as_index=False)
kdf_grouped_1 = education_groups.agg({"age": "mean"})

# Give the aggregate column a descriptive name
kdf_grouped_1.columns = ["education", "avg_age"]
display(kdf_grouped_1)

# Writing Data

In [None]:
# Persist the PySpark aggregate as Parquet, replacing any earlier output.
df_grouped_1.write.parquet("dataset/bank_grouped_pyspark.parquet", mode="overwrite")

In [None]:
# Persist the pandas aggregate as a Parquet file.
pandas_parquet_path = "dataset/bank_grouped_pandas.parquet"
pdf_grouped_1.to_parquet(pandas_parquet_path)

In [None]:
# Persist the Koalas aggregate as Parquet, replacing any earlier output.
koalas_parquet_path = "dataset/bank_grouped_koalas.parquet"
kdf_grouped_1.to_parquet(koalas_parquet_path, mode="overwrite")