#### Practice with PySpark

In [1]:
import pandas as pd
import numpy as np
import pyspark
import pyspark.sql.functions as f
from pyspark.sql.functions import when
from pyspark.sql.functions import lit
from pydataset import data

In [3]:
#The SparkSession is an entry point to underlying PySpark functionality 
#to programmatically create PySpark RDD, DataFrame, and Dataset
#SparkSession can be created using SparkSession.builder builder patterns.
#getOrCreate() the method returns an existing SparkSession if it exists otherwise it creates a new SparkSession.
spark = pyspark.sql.SparkSession.builder.getOrCreate()
#spark.read.csv(csv_file), spark.read.json(json_file), and spark.read.parquet(parquet_file) 
#can be used to read in data
#.createDataFrame can also be used with practice datasets from pydataset
tips = spark.createDataFrame(data("tips"))
#.show() is like .head() in pandas
tips.show(5)

22/06/29 11:55:34 WARN Utils: Your hostname, Nathans-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.20 instead (on interface en0)
22/06/29 11:55:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/29 11:55:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Stage 0:>                                                          (0 + 1) / 1]

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



                                                                                

In [4]:
#the schema of the data can be seen using the PrintSchema method
tips.printSchema()
#spark infers the schema from data however some times the inferred datatype may not be correct 
#or we may need to define our own column names and data types, this can be done using StructType / StructField

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: long (nullable = true)



In [5]:
#There are various methods used to inspect the data. 
#They are schema, dtypes, show, head, first, take, describe, columns, count, distinct, printSchema
#shows the data types
tips.dtypes

[('total_bill', 'double'),
 ('tip', 'double'),
 ('sex', 'string'),
 ('smoker', 'string'),
 ('day', 'string'),
 ('time', 'string'),
 ('size', 'bigint')]

In [6]:
#shows the first 20 rows of the full df
tips.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [7]:
#shows the column names and values for the complete first row of the df
tips.head()

Row(total_bill=16.99, tip=1.01, sex='Female', smoker='No', day='Sun', time='Dinner', size=2)

In [14]:
tips.first()

Row(total_bill=16.99, tip=1.01, sex='Female', smoker='No', day='Sun', time='Dinner', size=2)

In [13]:
tips.take(2)

[Row(total_bill=16.99, tip=1.01, sex='Female', smoker='No', day='Sun', time='Dinner', size=2),
 Row(total_bill=10.34, tip=1.66, sex='Male', smoker='No', day='Sun', time='Dinner', size=3)]

In [11]:
#describe(): It computes the summary statistics of the columns with the numeric data type. .show()is needed to view
tips.describe().show()

[Stage 4:>                                                        (0 + 16) / 16]

+-------+-----------------+------------------+------+------+----+------+------------------+
|summary|       total_bill|               tip|   sex|smoker| day|  time|              size|
+-------+-----------------+------------------+------+------+----+------+------------------+
|  count|              244|               244|   244|   244| 244|   244|               244|
|   mean|19.78594262295082|2.9982786885245902|  null|  null|null|  null| 2.569672131147541|
| stddev|8.902411954856856| 1.383638189001182|  null|  null|null|  null|0.9510998047322345|
|    min|             3.07|               1.0|Female|    No| Fri|Dinner|                 1|
|    max|            50.81|              10.0|  Male|   Yes|Thur| Lunch|                 6|
+-------+-----------------+------------------+------+------+----+------+------------------+



                                                                                

In [16]:
#shows the columns as a list -  no '()'
tips.columns

['total_bill', 'tip', 'sex', 'smoker', 'day', 'time', 'size']

In [17]:
#count(): It returns the count of the number of rows in the data.
tips.count()

244

---

#### Column Manipulation

In [25]:
#Use withColumn the method takes two parameters column name and data to add a new column to the existing data
tips = tips.withColumn('tip_percentage', tips.tip / tips.total_bill)
tips.show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|     tip_percentage|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|0.18623962040332148|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|0.22805017103762829|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|0.11607142857142858|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|0.13031914893617022|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2| 0.2185385656292287|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2| 0.1665043816942551|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|0

In [27]:
#Use withColumnRenamed which takes to parameters existing column name and new column name to 
#rename the existing column
tips = tips.withColumnRenamed('tip_percentage', 'tips_percentage')
tips.show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|    tips_percentage|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|0.18623962040332148|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|0.22805017103762829|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|0.11607142857142858|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|0.13031914893617022|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2| 0.2185385656292287|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2| 0.1665043816942551|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|0

In [28]:
#Use drop the method which takes the column name and returns the data to drop a column
tips = tips.drop('tips_percentage')
tips.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

---

#### Dealing with missing values

In [29]:
#Remove Rows with Missing Values

#  data.na.drop()

#Replacing Missing Values with Mean

#  data.na.fill(data.select(f.mean(data['open'])).collect()[0][0])

#Replacing Missing Values with new values

#  data.na.replace(old_value, new_vallue)

---

#### Querying data

In [30]:
#common methods - Select, Filter, Between, When, Like, GroupBy, Aggregations
#select
tips.select('total_bill').show()

+----------+
|total_bill|
+----------+
|     16.99|
|     10.34|
|     21.01|
|     23.68|
|     24.59|
|     25.29|
|      8.77|
|     26.88|
|     15.04|
|     14.78|
|     10.27|
|     35.26|
|     15.42|
|     18.43|
|     14.83|
|     21.58|
|     10.33|
|     16.29|
|     16.97|
|     20.65|
+----------+
only showing top 20 rows



In [32]:
#filter
tips.filter(f.col('sex') == 'Male').show()

+----------+----+----+------+---+------+----+
|total_bill| tip| sex|smoker|day|  time|size|
+----------+----+----+------+---+------+----+
|     10.34|1.66|Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|Male|    No|Sun|Dinner|   3|
|     23.68|3.31|Male|    No|Sun|Dinner|   2|
|     25.29|4.71|Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|Male|    No|Sun|Dinner|   2|
|     26.88|3.12|Male|    No|Sun|Dinner|   4|
|     15.04|1.96|Male|    No|Sun|Dinner|   2|
|     14.78|3.23|Male|    No|Sun|Dinner|   2|
|     10.27|1.71|Male|    No|Sun|Dinner|   2|
|     15.42|1.57|Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|Male|    No|Sun|Dinner|   4|
|     21.58|3.92|Male|    No|Sun|Dinner|   2|
|     16.29|3.71|Male|    No|Sun|Dinner|   3|
|     20.65|3.35|Male|    No|Sat|Dinner|   3|
|     17.92|4.08|Male|    No|Sat|Dinner|   2|
|     39.42|7.58|Male|    No|Sat|Dinner|   4|
|     19.82|3.18|Male|    No|Sat|Dinner|   2|
|     17.81|2.34|Male|    No|Sat|Dinner|   4|
|     13.37| 2.0|Male|    No|Sat|D

In [34]:
#between
tips.filter(tips.total_bill.between(10, 20)).show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinner|   3|
|     16.97| 3.5|Female|    No|Sun|Dinner|   3|
|     17.92|4.08|  Male|    No|Sat|Dinner|   2|
|     15.77|2.23|Female|    No|Sat|Dinner|   2|
|     19.82|3.18|  Male|    No|Sat|Dinner|   2|
|     17.81|2.34|  Male|    No|Sat|Dinner|   4|
|     13.37| 2.0|  Male|    No|Sat|Dinner|   2|
|     12.69| 2.0|  Male|    No|Sat|Dinner|   2|
|     19.65| 3.0|Female|    No|Sat|Dinne

In [35]:
#when
tips.select('total_bill', 'tip', f.when(tips.smoker == 'Yes', 1).otherwise(0)).show()

+----------+----+------------------------------------------+
|total_bill| tip|CASE WHEN (smoker = Yes) THEN 1 ELSE 0 END|
+----------+----+------------------------------------------+
|     16.99|1.01|                                         0|
|     10.34|1.66|                                         0|
|     21.01| 3.5|                                         0|
|     23.68|3.31|                                         0|
|     24.59|3.61|                                         0|
|     25.29|4.71|                                         0|
|      8.77| 2.0|                                         0|
|     26.88|3.12|                                         0|
|     15.04|1.96|                                         0|
|     14.78|3.23|                                         0|
|     10.27|1.71|                                         0|
|     35.26| 5.0|                                         0|
|     15.42|1.57|                                         0|
|     18.43| 3.0|       

In [37]:
#like
tips.select('total_bill', 'tip', 'day', tips.day.rlike('^[S]')).show()

+----------+----+---+----------------+
|total_bill| tip|day|RLIKE(day, ^[S])|
+----------+----+---+----------------+
|     16.99|1.01|Sun|            true|
|     10.34|1.66|Sun|            true|
|     21.01| 3.5|Sun|            true|
|     23.68|3.31|Sun|            true|
|     24.59|3.61|Sun|            true|
|     25.29|4.71|Sun|            true|
|      8.77| 2.0|Sun|            true|
|     26.88|3.12|Sun|            true|
|     15.04|1.96|Sun|            true|
|     14.78|3.23|Sun|            true|
|     10.27|1.71|Sun|            true|
|     35.26| 5.0|Sun|            true|
|     15.42|1.57|Sun|            true|
|     18.43| 3.0|Sun|            true|
|     14.83|3.02|Sun|            true|
|     21.58|3.92|Sun|            true|
|     10.33|1.67|Sun|            true|
|     16.29|3.71|Sun|            true|
|     16.97| 3.5|Sun|            true|
|     20.65|3.35|Sat|            true|
+----------+----+---+----------------+
only showing top 20 rows



In [39]:
#groupby
tips.select('day', 'total_bill').groupby('day').mean().show()

+----+------------------+
| day|   avg(total_bill)|
+----+------------------+
| Sun|21.409999999999997|
| Sat|20.441379310344825|
|Thur|17.682741935483865|
| Fri| 17.15157894736842|
+----+------------------+



#### Aggregations methods

- Standard aggregation methods such as min( ), max( ), and avg( ) are avaialble in pyspark

- exe-> .agg(min("data").alias("From"), max("data").alias("To")

---

In [41]:
## Writing entire data to different file formats

# CSV
#  data.write.csv('dataset.csv')

# JSON
#  data.write.save('dataset.json', format='json')

# Parquet
#  data.write.save('dataset.parquet', format='parquet')

## Writing selected data to different file formats

# CSV
#  data.select(['data', 'open', 'close', 'adjusted'])\
            #.write.csv('dataset.csv')

# JSON
#  data.select(['data', 'open', 'close', 'adjusted'])\
    #.write.save('dataset.json', format='json')

# Parquet
#  data.select(['data', 'open', 'close', 'adjusted'])\
    #.write.save('dataset.parquet', format='parquet')