# PySpark DataFrame
In this notebook, we learn PySpark functions for transforming the tips data:
- Initiate the Spark Session
- Read csv file
- Check Datatypes
- Column names
- Conversion back to pandas DataFrame
- Data summary
- Add/Drop/Rename Columns
- Missing values - drop/fill/replace (mean, median, and mode)
- Filter data
- Aggregate data


1. Initiate the Spark Session

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
                    .master('local[*]')\
                    .appName('spark_application')\
                    .getOrCreate()

In [6]:
spark

In [7]:
# SparkSession is already imported above; confirm which session is active
SparkSession.getActiveSession()

2. Read csv File

In [8]:
df_pyspark = spark.read.csv('Data/tips.csv',header=True, inferSchema=True)
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

3. Find Schema and Data Types

In [128]:
# Check Schema
df_pyspark.printSchema()
# Check Data types
df_pyspark.dtypes

root
 |-- total_bill: string (nullable = true)
 |-- tip: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: string (nullable = true)



[('total_bill', 'string'),
 ('tip', 'string'),
 ('sex', 'string'),
 ('smoker', 'string'),
 ('day', 'string'),
 ('time', 'string'),
 ('size', 'string')]

4. Column Names

In [70]:
# Check columns names
df_pyspark.columns

['total_bill', 'tip', 'sex', 'smoker', 'day', 'time', 'size']

In [71]:
# Show data; selected columns
df_pyspark.show(5)
df_pyspark.select(['sex','day']).show(2)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows

+------+---+
|   sex|day|
+------+---+
|Female|Sun|
|  Male|Sun|
+------+---+
only showing top 2 rows



5. Conversion back to pandas DataFrame

In [None]:
# toPandas() collects every row to the driver, so use it only on data that fits in memory
df_pyspark.toPandas()

6. Summarize Data

In [72]:
# Data summary
df_pyspark.describe().show()

+-------+------------------+-----------------+------+------+----+------+------------------+
|summary|        total_bill|              tip|   sex|smoker| day|  time|              size|
+-------+------------------+-----------------+------+------+----+------+------------------+
|  count|               248|              248|   248|   248| 248|   248|               248|
|   mean|19.632459677419362|2.970645161290322|  null|  null|null|  null|  2.57085020242915|
| stddev| 8.911357126068228|1.389667039569976|  null|  null|null|  null|0.9510587199808357|
|    min|             10.07|                1|Female|  Male| Fri|Dinner|                 1|
|    max|              9.94|              9.0|    NA|   Yes|Thur|   Sun|            Dinner|
+-------+------------------+-----------------+------+------+----+------+------------------+



7. Add/Drop/Rename Columns

In [77]:
# Add a column (size is a string here, so the product comes back as a double)
df_pyspark = df_pyspark.withColumn('Double Size', df_pyspark['size'] * 2)
df_pyspark.show(3)

+----------+----+------+------+---+------+----+-----------+
|total_bill| tip|   sex|smoker|day|  time|size|Double Size|
+----------+----+------+------+---+------+----+-----------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        4.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        6.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        6.0|
+----------+----+------+------+---+------+----+-----------+
only showing top 3 rows



In [78]:
# Drop the column again
df_pyspark = df_pyspark.drop('Double Size')
df_pyspark.show(3)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
+----------+----+------+------+---+------+----+
only showing top 3 rows



In [129]:
# Rename column
df_pyspark = df_pyspark.withColumnRenamed('tip', 'tips')
df_pyspark.show(3)

+----------+----+------+------+---+------+----+
|total_bill|tips|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
+----------+----+------+------+---+------+----+
only showing top 3 rows



8. Missing Values - Drop/Fill/Replace

In [116]:
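# Missing values - drop rows that have a null in any column
df_pyspark.na.drop(how="any").show(3)
# Drop rows only when the listed columns contain a null
df_pyspark.na.drop(how="any", subset=['sex']).show(3)

# Missing values - fill nulls in the listed string columns with a placeholder
df_pyspark.na.fill('No Value', ['sex', 'day']).show(3)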

+----------+----+------+------+---+------+----+
|total_bill|tips|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
+----------+----+------+------+---+------+----+
only showing top 3 rows

+----------+----+------+------+---+------+----+
|total_bill|tips|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
+----------+----+------+------+---+------+----+
only showing top 3 rows

+----------+----+------+------+---+------+----+
|total_bill|tips|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Din

In [134]:
# Missing values - add an imputed column (median here; "mean" is also available, "mode" needs Spark 3.1+)
from pyspark.ml.feature import Imputer
from pyspark.sql.types import DoubleType
# Imputer needs a numeric column, so cast tips from string to double first
df_pyspark = df_pyspark.withColumn("tips", df_pyspark["tips"].cast(DoubleType()))
imputer = Imputer(
    inputCols=['tips'],
    outputCols=["{}_imputed".format(c) for c in ['tips']]
).setStrategy("median")

In [136]:
df_pyspark = imputer.fit(df_pyspark).transform(df_pyspark)
df_pyspark.show(3)

+----------+----+------+------+---+------+----+------------+
|total_bill|tips|   sex|smoker|day|  time|size|tips_imputed|
+----------+----+------+------+---+------+----+------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.01|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        1.66|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|         3.5|
+----------+----+------+------+---+------+----+------------+
only showing top 3 rows



In [138]:
df_pyspark.show()

+----------+----+------+------+----+------+----+------------+
|total_bill|tips|   sex|smoker| day|  time|size|tips_imputed|
+----------+----+------+------+----+------+----+------------+
|     16.99|1.01|Female|    No| Sun|Dinner|   2|        1.01|
|     10.34|1.66|  Male|    No| Sun|Dinner|   3|        1.66|
|     21.01| 3.5|  Male|    No| Sun|Dinner|   3|         3.5|
|     23.68|3.31|  Male|    No| Sun|Dinner|   2|        3.31|
|     24.59|3.61|Female|    No| Sun|Dinner|   4|        3.61|
|     25.29|4.71|  Male|    No| Sun|Dinner|   4|        4.71|
|      8.77| 2.0|  Male|    No| Sun|Dinner|   2|         2.0|
|     26.88|3.12|  Male|    No| Sun|Dinner|   4|        3.12|
|     15.04|1.96|  Male|    No| Sun|Dinner|   2|        1.96|
|     14.78|3.23|  Male|    No| Sun|Dinner|   2|        3.23|
|     10.27|1.71|  null|    No| Sun|Dinner|   2|        1.71|
|     10.27|1.42|  Male|    No|null|Dinner|   2|        1.42|
|     10.27|1.11|  Male|    No|null|Dinner|   4|        1.11|
|     10

9. Filter Data

In [150]:
df_pyspark.filter("smoker = 'No'").show(2)  # SQL expression string
df_pyspark.filter("size > 3").show(2)  # size greater than 3
df_pyspark.filter(df_pyspark['size'] > 3).show(2)  # same filter as a Column expression
df_pyspark.filter((df_pyspark['size'] > 2) & (df_pyspark['size'] < 4)).show(2)  # strictly between 2 and 4
df_pyspark.filter(~(df_pyspark['sex'] == 'Female')).show(2)  # not Female (rows with a null sex are dropped too)

+----------+----+------+------+---+------+----+------------+
|total_bill|tips|   sex|smoker|day|  time|size|tips_imputed|
+----------+----+------+------+---+------+----+------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.01|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        1.66|
+----------+----+------+------+---+------+----+------------+
only showing top 2 rows

+----------+----+------+------+---+------+----+------------+
|total_bill|tips|   sex|smoker|day|  time|size|tips_imputed|
+----------+----+------+------+---+------+----+------------+
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        3.61|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        4.71|
+----------+----+------+------+---+------+----+------------+
only showing top 2 rows

+----------+----+------+------+---+------+----+------------+
|total_bill|tips|   sex|smoker|day|  time|size|tips_imputed|
+----------+----+------+------+---+------+----+------------+
|     24.59|3.61|Female|    No|Sun|

10. Aggregate
- groupBy: sum(), avg(), mean(), count() per group
- agg: aggregate over the whole DataFrame

In [154]:
df_pyspark.select(['smoker','tips_imputed']).groupBy('smoker').sum().show()
df_pyspark.select(['smoker','tips_imputed']).groupBy('smoker').avg().show()
df_pyspark.groupBy(['smoker']).count().show()

+------+------------------+
|smoker| sum(tips_imputed)|
+------+------------------+
|    No|458.74000000000007|
|   Yes|            279.81|
+------+------------------+

+------+-----------------+
|smoker|avg(tips_imputed)|
+------+-----------------+
|    No|2.959612903225807|
|   Yes|3.008709677419355|
+------+-----------------+

+------+-----+
|smoker|count|
+------+-----+
|    No|  155|
|   Yes|   93|
+------+-----+



In [156]:
df_pyspark.agg({'tips_imputed':'avg'}).show()

+------------------+
| avg(tips_imputed)|
+------------------+
|2.9780241935483867|
+------------------+



In [None]:
# Stop the session created in step 1 (the variable is named spark)
spark.stop()