# PySpark Tutorial in Colab

In [None]:
# Install Java
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Download Spark 3.5.0 with Hadoop 3
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

# Extract the Spark tar file
!tar -xzf spark-3.5.0-bin-hadoop3.tgz

# Move to /opt for standard setup
!mv spark-3.5.0-bin-hadoop3 /opt/spark

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/opt/spark"

# Install findspark
!pip install -q findspark

In [None]:
# findspark locates the Spark installation (SPARK_HOME was set in the setup
# cell) and makes the bundled `pyspark` package importable.
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Build the session, or reuse one that is already running in this kernel.
spark = (
    SparkSession.builder
    .appName("Test Spark Session")
    .getOrCreate()
)

# Smoke test: a five-row DataFrame with a single `id` column.
spark.range(5).show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [None]:
import pyspark
import pandas as pd

# Read the gas-price CSV into a pandas DataFrame.
df=pd.read_csv("/content/gas_prices.csv")
# Bare last expression: the notebook renders the frame as the cell output.
df

Unnamed: 0,Year,Australia,Canada,France,Germany,Italy,Japan,Mexico,South Korea,UK,USA
0,1990,,1.87,3.63,2.65,4.59,3.16,1.0,2.05,2.82,1.16
1,1991,1.96,1.92,3.45,2.9,4.5,3.46,1.3,2.49,3.01,1.14
2,1992,1.89,1.73,3.56,3.27,4.53,3.58,1.5,2.65,3.06,1.13
3,1993,1.73,1.57,3.41,3.07,3.68,4.16,1.56,2.88,2.84,1.11
4,1994,1.84,1.45,3.59,3.52,3.7,4.36,1.48,2.87,2.99,1.11
5,1995,1.95,1.53,4.26,3.96,4.0,4.43,1.11,2.94,3.21,1.15
6,1996,2.12,1.61,4.41,3.94,4.39,3.64,1.25,3.18,3.34,1.23
7,1997,2.05,1.62,4.0,3.53,4.07,3.26,1.47,3.34,3.83,1.23
8,1998,1.63,1.38,3.87,3.34,3.84,2.82,1.49,3.04,4.06,1.06
9,1999,1.72,1.52,3.85,3.42,3.87,3.27,1.79,3.8,4.29,1.17


In [None]:
from pyspark.sql import SparkSession

# getOrCreate() returns the session that is already running in this kernel,
# if there is one, instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("Practice")
    .getOrCreate()
)

In [None]:
# Echo the session object so the notebook displays it.
spark

In [None]:
# Plain read with the reader's default options. No header option is passed, so
# (per Spark's CSV reader defaults) the first line of the file is read as data.
df_pyspark=spark.read.csv('gas_prices.csv')

In [None]:
# Read again, this time declaring that the first line holds the column names.
# This replaces the header-less DataFrame bound to df_pyspark above.
df_pyspark=spark.read.option('header', 'true').csv('gas_prices.csv')

In [None]:
# Print the leading rows as a text table (show() prints at most 20 rows by default).
df_pyspark.show()

+----+---------+------+------+-------+-----+-----+------+-----------+----+----+
|Year|Australia|Canada|France|Germany|Italy|Japan|Mexico|South Korea|  UK| USA|
+----+---------+------+------+-------+-----+-----+------+-----------+----+----+
|1990|     NULL|  1.87|  3.63|   2.65| 4.59| 3.16|     1|       2.05|2.82|1.16|
|1991|     1.96|  1.92|  3.45|    2.9|  4.5| 3.46|   1.3|       2.49|3.01|1.14|
|1992|     1.89|  1.73|  3.56|   3.27| 4.53| 3.58|   1.5|       2.65|3.06|1.13|
|1993|     1.73|  1.57|  3.41|   3.07| 3.68| 4.16|  1.56|       2.88|2.84|1.11|
|1994|     1.84|  1.45|  3.59|   3.52|  3.7| 4.36|  1.48|       2.87|2.99|1.11|
|1995|     1.95|  1.53|  4.26|   3.96|    4| 4.43|  1.11|       2.94|3.21|1.15|
|1996|     2.12|  1.61|  4.41|   3.94| 4.39| 3.64|  1.25|       3.18|3.34|1.23|
|1997|     2.05|  1.62|     4|   3.53| 4.07| 3.26|  1.47|       3.34|3.83|1.23|
|1998|     1.63|  1.38|  3.87|   3.34| 3.84| 2.82|  1.49|       3.04|4.06|1.06|
|1999|     1.72|  1.52|  3.85|   3.42| 3

In [None]:
# Inspect the Python class of the object that spark.read returned.
type(df_pyspark)

In [None]:
# head(3) returns the first three rows as a list of Row objects.
# Without schema inference every value is still a string at this point.
df_pyspark.head(3)

[Row(Year='1990', Australia=None, Canada='1.87', France='3.63', Germany='2.65', Italy='4.59', Japan='3.16', Mexico='1', South Korea='2.05', UK='2.82', USA='1.16'),
 Row(Year='1991', Australia='1.96', Canada='1.92', France='3.45', Germany='2.9', Italy='4.5', Japan='3.46', Mexico='1.3', South Korea='2.49', UK='3.01', USA='1.14'),
 Row(Year='1992', Australia='1.89', Canada='1.73', France='3.56', Germany='3.27', Italy='4.53', Japan='3.58', Mexico='1.5', South Korea='2.65', UK='3.06', USA='1.13')]

In [None]:
# Print column names and types; all columns are reported as string here
# because the file was read without inferSchema.
df_pyspark.printSchema()

root
 |-- Year: string (nullable = true)
 |-- Australia: string (nullable = true)
 |-- Canada: string (nullable = true)
 |-- France: string (nullable = true)
 |-- Germany: string (nullable = true)
 |-- Italy: string (nullable = true)
 |-- Japan: string (nullable = true)
 |-- Mexico: string (nullable = true)
 |-- South Korea: string (nullable = true)
 |-- UK: string (nullable = true)
 |-- USA: string (nullable = true)




* PySpark Dataframe
* Reading The Dataset
* Checking the Datatypes of the Column(Schema)
* Selecting Columns And Indexing
* Using describe(), similar to pandas
* Adding Columns
* Dropping columns
* Renaming Columns



In [None]:
from pyspark.sql import SparkSession

In [None]:
# Obtain the session for this section; getOrCreate() hands back the one that
# is already running in this kernel if it exists.
spark = (
    SparkSession.builder
    .appName('Dataframe')
    .getOrCreate()
)

In [None]:
# Echo the session object so the notebook displays it.
spark

In [None]:
## Read the dataset
# The header is set through .option() and inferSchema is passed as a keyword
# argument to csv(); the two ways of passing reader options can be mixed.
df_pyspark=spark.read.option('header','true').csv('gas_prices.csv', inferSchema=True)

In [None]:
# Same read as the previous cell, with both options given as keyword arguments.
df_pyspark = spark.read.csv(
    'gas_prices.csv',
    header=True,
    inferSchema=True,
)
df_pyspark.show()

+----+---------+------+------+-------+-----+-----+------+-----------+----+----+
|Year|Australia|Canada|France|Germany|Italy|Japan|Mexico|South Korea|  UK| USA|
+----+---------+------+------+-------+-----+-----+------+-----------+----+----+
|1990|     NULL|  1.87|  3.63|   2.65| 4.59| 3.16|   1.0|       2.05|2.82|1.16|
|1991|     1.96|  1.92|  3.45|    2.9|  4.5| 3.46|   1.3|       2.49|3.01|1.14|
|1992|     1.89|  1.73|  3.56|   3.27| 4.53| 3.58|   1.5|       2.65|3.06|1.13|
|1993|     1.73|  1.57|  3.41|   3.07| 3.68| 4.16|  1.56|       2.88|2.84|1.11|
|1994|     1.84|  1.45|  3.59|   3.52|  3.7| 4.36|  1.48|       2.87|2.99|1.11|
|1995|     1.95|  1.53|  4.26|   3.96|  4.0| 4.43|  1.11|       2.94|3.21|1.15|
|1996|     2.12|  1.61|  4.41|   3.94| 4.39| 3.64|  1.25|       3.18|3.34|1.23|
|1997|     2.05|  1.62|   4.0|   3.53| 4.07| 3.26|  1.47|       3.34|3.83|1.23|
|1998|     1.63|  1.38|  3.87|   3.34| 3.84| 2.82|  1.49|       3.04|4.06|1.06|
|1999|     1.72|  1.52|  3.85|   3.42| 3

In [None]:
## Check the schema
# With inferSchema=True, Year is read as integer and the price columns as double.
df_pyspark.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Australia: double (nullable = true)
 |-- Canada: double (nullable = true)
 |-- France: double (nullable = true)
 |-- Germany: double (nullable = true)
 |-- Italy: double (nullable = true)
 |-- Japan: double (nullable = true)
 |-- Mexico: double (nullable = true)
 |-- South Korea: double (nullable = true)
 |-- UK: double (nullable = true)
 |-- USA: double (nullable = true)



In [None]:
# Inspect the Python class of the DataFrame object.
type(df_pyspark)

In [None]:
# A notebook cell only echoes its final expression, so the column list has to
# be displayed explicitly; as a bare statement it is computed and thrown away.
display(df_pyspark.columns)
# First five rows as a list of Row objects (echoed as the cell output).
df_pyspark.head(5)

[Row(Year=1990, Australia=None, Canada=1.87, France=3.63, Germany=2.65, Italy=4.59, Japan=3.16, Mexico=1.0, South Korea=2.05, UK=2.82, USA=1.16),
 Row(Year=1991, Australia=1.96, Canada=1.92, France=3.45, Germany=2.9, Italy=4.5, Japan=3.46, Mexico=1.3, South Korea=2.49, UK=3.01, USA=1.14),
 Row(Year=1992, Australia=1.89, Canada=1.73, France=3.56, Germany=3.27, Italy=4.53, Japan=3.58, Mexico=1.5, South Korea=2.65, UK=3.06, USA=1.13),
 Row(Year=1993, Australia=1.73, Canada=1.57, France=3.41, Germany=3.07, Italy=3.68, Japan=4.16, Mexico=1.56, South Korea=2.88, UK=2.84, USA=1.11),
 Row(Year=1994, Australia=1.84, Canada=1.45, France=3.59, Germany=3.52, Italy=3.7, Japan=4.36, Mexico=1.48, South Korea=2.87, UK=2.99, USA=1.11)]

In [None]:
# Print the leading rows as a text table (show() prints at most 20 rows by default).
df_pyspark.show()

+----+---------+------+------+-------+-----+-----+------+-----------+----+----+
|Year|Australia|Canada|France|Germany|Italy|Japan|Mexico|South Korea|  UK| USA|
+----+---------+------+------+-------+-----+-----+------+-----------+----+----+
|1990|     NULL|  1.87|  3.63|   2.65| 4.59| 3.16|   1.0|       2.05|2.82|1.16|
|1991|     1.96|  1.92|  3.45|    2.9|  4.5| 3.46|   1.3|       2.49|3.01|1.14|
|1992|     1.89|  1.73|  3.56|   3.27| 4.53| 3.58|   1.5|       2.65|3.06|1.13|
|1993|     1.73|  1.57|  3.41|   3.07| 3.68| 4.16|  1.56|       2.88|2.84|1.11|
|1994|     1.84|  1.45|  3.59|   3.52|  3.7| 4.36|  1.48|       2.87|2.99|1.11|
|1995|     1.95|  1.53|  4.26|   3.96|  4.0| 4.43|  1.11|       2.94|3.21|1.15|
|1996|     2.12|  1.61|  4.41|   3.94| 4.39| 3.64|  1.25|       3.18|3.34|1.23|
|1997|     2.05|  1.62|   4.0|   3.53| 4.07| 3.26|  1.47|       3.34|3.83|1.23|
|1998|     1.63|  1.38|  3.87|   3.34| 3.84| 2.82|  1.49|       3.04|4.06|1.06|
|1999|     1.72|  1.52|  3.85|   3.42| 3

In [None]:
# Project three columns by name and print them.
df_pyspark.select('Year', 'USA', 'UK').show()

+----+----+----+
|Year| USA|  UK|
+----+----+----+
|1990|1.16|2.82|
|1991|1.14|3.01|
|1992|1.13|3.06|
|1993|1.11|2.84|
|1994|1.11|2.99|
|1995|1.15|3.21|
|1996|1.23|3.34|
|1997|1.23|3.83|
|1998|1.06|4.06|
|1999|1.17|4.29|
|2000|1.51|4.58|
|2001|1.46|4.13|
|2002|1.36|4.16|
|2003|1.59| 4.7|
|2004|1.88|5.56|
|2005| 2.3|5.97|
|2006|2.59|6.36|
|2007| 2.8|7.13|
|2008|3.27|7.42|
+----+----+----+



In [None]:
# List of (column name, type name) pairs for every column.
df_pyspark.dtypes

[('Year', 'int'),
 ('Australia', 'double'),
 ('Canada', 'double'),
 ('France', 'double'),
 ('Germany', 'double'),
 ('Italy', 'double'),
 ('Japan', 'double'),
 ('Mexico', 'double'),
 ('South Korea', 'double'),
 ('UK', 'double'),
 ('USA', 'double')]

In [None]:
# Summary statistics per column (count, mean and further summary rows),
# returned as a DataFrame and printed with show().
df_pyspark.describe().show()

+-------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+
|summary|             Year|         Australia|            Canada|            France|           Germany|             Italy|             Japan|             Mexico|       South Korea|                UK|               USA|
+-------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+
|  count|               19|                18|                19|                19|                19|                19|                19|                 19|                19|                19|                19|
|   mean|           1999.0|2.3488888888888892|2.0868421052631576| 4.407894736842105| 4.224736842105264| 4.645789473684211|3.

In [None]:
## Adding columns to a DataFrame
# withColumn returns a new DataFrame; df_pyspark itself is left unchanged.
df_pyspark_temp = df_pyspark.withColumn(
    'USA New Column',
    df_pyspark.USA + 5.2,
)

In [None]:
# Print the DataFrame that now carries the extra 'USA New Column'.
df_pyspark_temp.show()

+----+---------+------+------+-------+-----+-----+------+-----------+----+----+------------------+
|Year|Australia|Canada|France|Germany|Italy|Japan|Mexico|South Korea|  UK| USA|    USA New Column|
+----+---------+------+------+-------+-----+-----+------+-----------+----+----+------------------+
|1990|     NULL|  1.87|  3.63|   2.65| 4.59| 3.16|   1.0|       2.05|2.82|1.16|              6.36|
|1991|     1.96|  1.92|  3.45|    2.9|  4.5| 3.46|   1.3|       2.49|3.01|1.14|              6.34|
|1992|     1.89|  1.73|  3.56|   3.27| 4.53| 3.58|   1.5|       2.65|3.06|1.13|              6.33|
|1993|     1.73|  1.57|  3.41|   3.07| 3.68| 4.16|  1.56|       2.88|2.84|1.11|6.3100000000000005|
|1994|     1.84|  1.45|  3.59|   3.52|  3.7| 4.36|  1.48|       2.87|2.99|1.11|6.3100000000000005|
|1995|     1.95|  1.53|  4.26|   3.96|  4.0| 4.43|  1.11|       2.94|3.21|1.15|              6.35|
|1996|     2.12|  1.61|  4.41|   3.94| 4.39| 3.64|  1.25|       3.18|3.34|1.23|              6.43|
|1997|    

In [None]:
# Remove the demo column again. drop() returns a new DataFrame, so the result
# is bound back to df_pyspark_temp.
df_pyspark_temp=df_pyspark_temp.drop('USA New Column')

In [None]:
# Confirm that 'USA New Column' is gone.
df_pyspark_temp.show()

+----+---------+------+------+-------+-----+-----+------+-----------+----+----+
|Year|Australia|Canada|France|Germany|Italy|Japan|Mexico|South Korea|  UK| USA|
+----+---------+------+------+-------+-----+-----+------+-----------+----+----+
|1990|     NULL|  1.87|  3.63|   2.65| 4.59| 3.16|   1.0|       2.05|2.82|1.16|
|1991|     1.96|  1.92|  3.45|    2.9|  4.5| 3.46|   1.3|       2.49|3.01|1.14|
|1992|     1.89|  1.73|  3.56|   3.27| 4.53| 3.58|   1.5|       2.65|3.06|1.13|
|1993|     1.73|  1.57|  3.41|   3.07| 3.68| 4.16|  1.56|       2.88|2.84|1.11|
|1994|     1.84|  1.45|  3.59|   3.52|  3.7| 4.36|  1.48|       2.87|2.99|1.11|
|1995|     1.95|  1.53|  4.26|   3.96|  4.0| 4.43|  1.11|       2.94|3.21|1.15|
|1996|     2.12|  1.61|  4.41|   3.94| 4.39| 3.64|  1.25|       3.18|3.34|1.23|
|1997|     2.05|  1.62|   4.0|   3.53| 4.07| 3.26|  1.47|       3.34|3.83|1.23|
|1998|     1.63|  1.38|  3.87|   3.34| 3.84| 2.82|  1.49|       3.04|4.06|1.06|
|1999|     1.72|  1.52|  3.85|   3.42| 3

In [None]:
### Rename the columns
# withColumnRenamed returns a new DataFrame. The result is only shown, not
# assigned, so df_pyspark_temp still has the column under the name 'USA'.
df_pyspark_temp.withColumnRenamed('USA', 'USA_Renamed').show()

+----+---------+------+------+-------+-----+-----+------+-----------+----+-----------+
|Year|Australia|Canada|France|Germany|Italy|Japan|Mexico|South Korea|  UK|USA_Renamed|
+----+---------+------+------+-------+-----+-----+------+-----------+----+-----------+
|1990|     NULL|  1.87|  3.63|   2.65| 4.59| 3.16|   1.0|       2.05|2.82|       1.16|
|1991|     1.96|  1.92|  3.45|    2.9|  4.5| 3.46|   1.3|       2.49|3.01|       1.14|
|1992|     1.89|  1.73|  3.56|   3.27| 4.53| 3.58|   1.5|       2.65|3.06|       1.13|
|1993|     1.73|  1.57|  3.41|   3.07| 3.68| 4.16|  1.56|       2.88|2.84|       1.11|
|1994|     1.84|  1.45|  3.59|   3.52|  3.7| 4.36|  1.48|       2.87|2.99|       1.11|
|1995|     1.95|  1.53|  4.26|   3.96|  4.0| 4.43|  1.11|       2.94|3.21|       1.15|
|1996|     2.12|  1.61|  4.41|   3.94| 4.39| 3.64|  1.25|       3.18|3.34|       1.23|
|1997|     2.05|  1.62|   4.0|   3.53| 4.07| 3.26|  1.47|       3.34|3.83|       1.23|
|1998|     1.63|  1.38|  3.87|   3.34| 3.84


# Pyspark Handling Missing Values
* Dropping Columns
* Dropping Rows
* Parameters of the drop functions (how, thresh, subset)
* Handling Missing values by Mean, Median And Mode

In [None]:
# Load the variant of the gas-price data that contains extra missing values.
# Note that this rebinds df_pyspark to the new file.
df_pyspark = spark.read.csv(
    '/content/gas_prices_missing_values.csv',
    header=True,
    inferSchema=True,
)

In [None]:
# Print the leading rows; missing entries are rendered as NULL.
df_pyspark.show()

+----+---------+------+------+-------+-----+-----+------+-----------+----+----+
|Year|Australia|Canada|France|Germany|Italy|Japan|Mexico|South Korea|  UK| USA|
+----+---------+------+------+-------+-----+-----+------+-----------+----+----+
|1990|     NULL|  1.87|  3.63|   2.65| 4.59| 3.16|  NULL|       2.05|2.82|1.16|
|1991|     1.96|  1.92|  3.45|    2.9|  4.5| 3.46|   1.3|       2.49|3.01|1.14|
|1992|     1.89|  1.73|  3.56|   3.27| 4.53| 3.58|   1.5|       2.65|3.06|1.13|
|1993|     1.73|  1.57|  3.41|   3.07| 3.68| 4.16|  1.56|       2.88|2.84|NULL|
|1994|     1.84|  1.45|  3.59|   3.52|  3.7| 4.36|  1.48|       2.87|2.99|NULL|
|1995|     1.95|  1.53|  4.26|   3.96| NULL| 4.43|  1.11|       2.94|3.21|1.15|
|1996|     2.12|  1.61|  4.41|   3.94| 4.39| 3.64|  1.25|       3.18|3.34|1.23|
|1997|     2.05|  1.62|  NULL|   3.53| 4.07| 3.26|  1.47|       3.34|3.83|1.23|
|1998|     1.63|  1.38|  3.87|   3.34| 3.84| 2.82|  1.49|       3.04|4.06|1.06|
|1999|     1.72|  1.52|  3.85|   3.42| 3

In [None]:
## drop the columns
# drop() returns a new DataFrame without the USA column. The result is only
# shown, not assigned, so df_pyspark keeps all of its columns.
df_pyspark.drop('USA').show()

+----+---------+------+------+-------+-----+-----+------+-----------+----+
|Year|Australia|Canada|France|Germany|Italy|Japan|Mexico|South Korea|  UK|
+----+---------+------+------+-------+-----+-----+------+-----------+----+
|1990|     NULL|  1.87|  3.63|   2.65| 4.59| 3.16|  NULL|       2.05|2.82|
|1991|     1.96|  1.92|  3.45|    2.9|  4.5| 3.46|   1.3|       2.49|3.01|
|1992|     1.89|  1.73|  3.56|   3.27| 4.53| 3.58|   1.5|       2.65|3.06|
|1993|     1.73|  1.57|  3.41|   3.07| 3.68| 4.16|  1.56|       2.88|2.84|
|1994|     1.84|  1.45|  3.59|   3.52|  3.7| 4.36|  1.48|       2.87|2.99|
|1995|     1.95|  1.53|  4.26|   3.96| NULL| 4.43|  1.11|       2.94|3.21|
|1996|     2.12|  1.61|  4.41|   3.94| 4.39| 3.64|  1.25|       3.18|3.34|
|1997|     2.05|  1.62|  NULL|   3.53| 4.07| 3.26|  1.47|       3.34|3.83|
|1998|     1.63|  1.38|  3.87|   3.34| 3.84| 2.82|  1.49|       3.04|4.06|
|1999|     1.72|  1.52|  3.85|   3.42| 3.87| NULL|  1.79|        3.8|4.29|
|2000|     1.94|  1.86|  

In [None]:
## drop the NA rows: with no arguments, every row containing at least one null is removed
df_pyspark.na.drop().show()
## `how` selects the rule: "any" drops a row if any value is null,
## "all" drops it only if every value is null.
## `thresh` keeps the rows that have AT LEAST that many non-null values.
## When thresh is given it takes precedence over `how`, so `how` is left out
## here rather than passed as an argument that has no effect.
df_pyspark.na.drop(thresh=3).show()

+----+---------+------+------+-------+-----+-----+------+-----------+----+----+
|Year|Australia|Canada|France|Germany|Italy|Japan|Mexico|South Korea|  UK| USA|
+----+---------+------+------+-------+-----+-----+------+-----------+----+----+
|1991|     1.96|  1.92|  3.45|    2.9|  4.5| 3.46|   1.3|       2.49|3.01|1.14|
|1992|     1.89|  1.73|  3.56|   3.27| 4.53| 3.58|   1.5|       2.65|3.06|1.13|
|1996|     2.12|  1.61|  4.41|   3.94| 4.39| 3.64|  1.25|       3.18|3.34|1.23|
|1998|     1.63|  1.38|  3.87|   3.34| 3.84| 2.82|  1.49|       3.04|4.06|1.06|
|2000|     1.94|  1.86|   3.8|   3.45| 3.77| 3.65|  2.01|       4.18|4.58|1.51|
|2001|     1.71|  1.72|  3.51|    3.4| 3.57| 3.27|   2.2|       3.76|4.13|1.46|
|2003|     2.19|  1.99|  4.35|   4.59| 4.53| 3.47|  2.04|       4.11| 4.7|1.59|
|2004|     2.72|  2.37|  4.99|   5.24| 5.29| 3.93|  2.03|       4.51|5.56|1.88|
|2005|     3.23|  2.89|  5.46|   5.66| 5.74| 4.28|  2.22|       5.28|5.97| 2.3|
|2006|     3.54|  3.26|  5.88|   6.03|  

In [None]:
## Subset
# Only the listed columns are inspected: a row is dropped when USA is null,
# while nulls in other columns are tolerated.
df_pyspark.na.drop(
    subset=['USA'],
    how="any",
).show()

+----+---------+------+------+-------+-----+-----+------+-----------+----+----+
|Year|Australia|Canada|France|Germany|Italy|Japan|Mexico|South Korea|  UK| USA|
+----+---------+------+------+-------+-----+-----+------+-----------+----+----+
|1990|     NULL|  1.87|  3.63|   2.65| 4.59| 3.16|  NULL|       2.05|2.82|1.16|
|1991|     1.96|  1.92|  3.45|    2.9|  4.5| 3.46|   1.3|       2.49|3.01|1.14|
|1992|     1.89|  1.73|  3.56|   3.27| 4.53| 3.58|   1.5|       2.65|3.06|1.13|
|1995|     1.95|  1.53|  4.26|   3.96| NULL| 4.43|  1.11|       2.94|3.21|1.15|
|1996|     2.12|  1.61|  4.41|   3.94| 4.39| 3.64|  1.25|       3.18|3.34|1.23|
|1997|     2.05|  1.62|  NULL|   3.53| 4.07| 3.26|  1.47|       3.34|3.83|1.23|
|1998|     1.63|  1.38|  3.87|   3.34| 3.84| 2.82|  1.49|       3.04|4.06|1.06|
|1999|     1.72|  1.52|  3.85|   3.42| 3.87| NULL|  1.79|        3.8|4.29|1.17|
|2000|     1.94|  1.86|   3.8|   3.45| 3.77| 3.65|  2.01|       4.18|4.58|1.51|
|2001|     1.71|  1.72|  3.51|    3.4| 3

In [None]:
## Filling the missing value
# Replace nulls with 69.0. The result is only shown; df_pyspark is not changed.
df_pyspark.na.fill(69.0).show()
# Note: a numeric fill value is applied to numeric columns only; columns of
# other types (e.g. string) are left untouched.

+----+---------+------+------+-------+-----+-----+------+-----------+----+----+
|Year|Australia|Canada|France|Germany|Italy|Japan|Mexico|South Korea|  UK| USA|
+----+---------+------+------+-------+-----+-----+------+-----------+----+----+
|1990|     69.0|  1.87|  3.63|   2.65| 4.59| 3.16|  69.0|       2.05|2.82|1.16|
|1991|     1.96|  1.92|  3.45|    2.9|  4.5| 3.46|   1.3|       2.49|3.01|1.14|
|1992|     1.89|  1.73|  3.56|   3.27| 4.53| 3.58|   1.5|       2.65|3.06|1.13|
|1993|     1.73|  1.57|  3.41|   3.07| 3.68| 4.16|  1.56|       2.88|2.84|69.0|
|1994|     1.84|  1.45|  3.59|   3.52|  3.7| 4.36|  1.48|       2.87|2.99|69.0|
|1995|     1.95|  1.53|  4.26|   3.96| 69.0| 4.43|  1.11|       2.94|3.21|1.15|
|1996|     2.12|  1.61|  4.41|   3.94| 4.39| 3.64|  1.25|       3.18|3.34|1.23|
|1997|     2.05|  1.62|  69.0|   3.53| 4.07| 3.26|  1.47|       3.34|3.83|1.23|
|1998|     1.63|  1.38|  3.87|   3.34| 3.84| 2.82|  1.49|       3.04|4.06|1.06|
|1999|     1.72|  1.52|  3.85|   3.42| 3

In [None]:
from pyspark.ml.feature import Imputer

# Mean-impute USA and UK into new "<column>_imputed" columns.
imputer = Imputer(
    strategy="mean",
    inputCols=['USA', 'UK'],
    outputCols=[f"{name}_imputed" for name in ('USA', 'UK')],
)

In [None]:
# fit() derives the replacement values from df_pyspark; transform() appends the
# *_imputed columns. The original USA / UK columns keep their nulls.
imputer.fit(df_pyspark).transform(df_pyspark).show()

+----+---------+------+------+-------+-----+-----+------+-----------+----+----+-----------+----------+
|Year|Australia|Canada|France|Germany|Italy|Japan|Mexico|South Korea|  UK| USA|USA_imputed|UK_imputed|
+----+---------+------+------+-------+-----+-----+------+-----------+----+----+-----------+----------+
|1990|     NULL|  1.87|  3.63|   2.65| 4.59| 3.16|  NULL|       2.05|2.82|1.16|       1.16|      2.82|
|1991|     1.96|  1.92|  3.45|    2.9|  4.5| 3.46|   1.3|       2.49|3.01|1.14|       1.14|      3.01|
|1992|     1.89|  1.73|  3.56|   3.27| 4.53| 3.58|   1.5|       2.65|3.06|1.13|       1.13|      3.06|
|1993|     1.73|  1.57|  3.41|   3.07| 3.68| 4.16|  1.56|       2.88|2.84|NULL|   1.666875|      2.84|
|1994|     1.84|  1.45|  3.59|   3.52|  3.7| 4.36|  1.48|       2.87|2.99|NULL|   1.666875|      2.99|
|1995|     1.95|  1.53|  4.26|   3.96| NULL| 4.43|  1.11|       2.94|3.21|1.15|       1.15|      3.21|
|1996|     2.12|  1.61|  4.41|   3.94| 4.39| 3.64|  1.25|       3.18|3.34

# Pyspark Dataframes
* Filter Operation
* &,|,==
* ~

In [None]:
from pyspark.sql import SparkSession

# Obtain the session for this section; getOrCreate() hands back the one that
# is already running in this kernel if it exists.
spark = (
    SparkSession.builder
    .appName("Filter")
    .getOrCreate()
)

In [None]:
# Reload the complete gas-price data (rebinds df_pyspark) and print it.
df_pyspark = spark.read.csv(
    '/content/gas_prices.csv',
    header=True,
    inferSchema=True,
)
df_pyspark.show()

+----+---------+------+------+-------+-----+-----+------+-----------+----+----+
|Year|Australia|Canada|France|Germany|Italy|Japan|Mexico|South Korea|  UK| USA|
+----+---------+------+------+-------+-----+-----+------+-----------+----+----+
|1990|     NULL|  1.87|  3.63|   2.65| 4.59| 3.16|   1.0|       2.05|2.82|1.16|
|1991|     1.96|  1.92|  3.45|    2.9|  4.5| 3.46|   1.3|       2.49|3.01|1.14|
|1992|     1.89|  1.73|  3.56|   3.27| 4.53| 3.58|   1.5|       2.65|3.06|1.13|
|1993|     1.73|  1.57|  3.41|   3.07| 3.68| 4.16|  1.56|       2.88|2.84|1.11|
|1994|     1.84|  1.45|  3.59|   3.52|  3.7| 4.36|  1.48|       2.87|2.99|1.11|
|1995|     1.95|  1.53|  4.26|   3.96|  4.0| 4.43|  1.11|       2.94|3.21|1.15|
|1996|     2.12|  1.61|  4.41|   3.94| 4.39| 3.64|  1.25|       3.18|3.34|1.23|
|1997|     2.05|  1.62|   4.0|   3.53| 4.07| 3.26|  1.47|       3.34|3.83|1.23|
|1998|     1.63|  1.38|  3.87|   3.34| 3.84| 2.82|  1.49|       3.04|4.06|1.06|
|1999|     1.72|  1.52|  3.85|   3.42| 3

In [None]:
### Filter Operations

# Column-expression predicate: keep rows with Japan >= 3, then project two columns.
(
    df_pyspark
    .filter(df_pyspark.Japan >= 3)
    .select('Year', 'Japan')
    .show()
)
## Alternative: project first, then filter with a SQL expression string.
(
    df_pyspark
    .select('Year', 'Japan')
    .filter("Japan>=3")
    .show()
)

+----+-----+
|Year|Japan|
+----+-----+
|1990| 3.16|
|1991| 3.46|
|1992| 3.58|
|1993| 4.16|
|1994| 4.36|
|1995| 4.43|
|1996| 3.64|
|1997| 3.26|
|1999| 3.27|
|2000| 3.65|
|2001| 3.27|
|2002| 3.15|
|2003| 3.47|
|2004| 3.93|
|2005| 4.28|
|2006| 4.47|
|2007| 4.49|
|2008| 5.74|
+----+-----+

+----+-----+
|Year|Japan|
+----+-----+
|1990| 3.16|
|1991| 3.46|
|1992| 3.58|
|1993| 4.16|
|1994| 4.36|
|1995| 4.43|
|1996| 3.64|
|1997| 3.26|
|1999| 3.27|
|2000| 3.65|
|2001| 3.27|
|2002| 3.15|
|2003| 3.47|
|2004| 3.93|
|2005| 4.28|
|2006| 4.47|
|2007| 4.49|
|2008| 5.74|
+----+-----+



In [None]:
# Combine two conditions with & (each side must be parenthesised):
# keep rows where 3 <= Japan < 4.
(
    df_pyspark
    .filter((df_pyspark.Japan >= 3) & (df_pyspark.Japan < 4))
    .select('Year', 'Japan')
    .show()
)

+----+-----+
|Year|Japan|
+----+-----+
|1990| 3.16|
|1991| 3.46|
|1992| 3.58|
|1996| 3.64|
|1997| 3.26|
|1999| 3.27|
|2000| 3.65|
|2001| 3.27|
|2002| 3.15|
|2003| 3.47|
|2004| 3.93|
+----+-----+



## Pyspark GroupBy And Aggregate Functions

In [None]:
from pyspark.sql import SparkSession

# NOTE(review): the app name "Filter" looks carried over from the previous
# section — confirm whether this section should use its own name.
# getOrCreate() hands back the already-running session if one exists.
spark = (
    SparkSession.builder
    .appName("Filter")
    .getOrCreate()
)

In [None]:
# Load the computer-sales data (rebinds df_pyspark) and print the leading rows.
df_pyspark = spark.read.csv(
    '/content/ComputerSales.csv',
    header=True,
    inferSchema=True,
)
df_pyspark.show()

+-------+---------------+---+---+-----+----------+------------+----------+------+-------+--------+----+
|Sale ID|        Contact|Sex|Age|State|Product ID|Product Type|Sale Price|Profit|   Lead|   Month|Year|
+-------+---------------+---+---+-----+----------+------------+----------+------+-------+--------+----+
|      1|    Paul Thomas|  M| 43|   OH| M01-F0024|     Desktop|    479.99|143.39|Website| January|2018|
|      2|    Margo Simms|  F| 37|   WV| GT13-0024|     Desktop|   1249.99|230.89|Flyer 4| January|2018|
|      3|      Sam Stine|  M| 26|   PA|     I3670|     Desktop|    649.99|118.64|Website|February|2018|
|      4|     Moe Eggert|  M| 35|   PA|     I3593|      Laptop|    399.99| 72.09|Website|   March|2018|
|      5|    Jessica Elk|  F| 55|   PA|    15M-ED|      Laptop|    699.99| 98.09|Flyer 4|   March|2018|
|      6|Sally Struthers|  F| 45|   PA| GT13-0024|     Desktop|   1249.99|230.89|Flyer 2|   April|2018|
|      7| Michelle Samms|  F| 46|   OH|   GA401IV|      Laptop| 

In [None]:
## Groupby
# Restrict to the grouping key plus the two numeric columns, then sum per product type.
(
    df_pyspark
    .select('Product Type', 'Sale Price', 'Profit')
    .groupBy('Product Type')
    .sum()
    .show()
)

+------------+------------------+------------------+
|Product Type|   sum(Sale Price)|       sum(Profit)|
+------------+------------------+------------------+
|      Laptop|56259.449999999975| 8644.450000000004|
|      Tablet|11649.869999999999|1836.9700000000005|
|     Desktop| 28859.66000000002| 5537.760000000003|
+------------+------------------+------------------+



In [None]:
# Same projection as the sum example, averaged per product type.
(
    df_pyspark
    .select('Product Type', 'Sale Price', 'Profit')
    .groupBy('Product Type')
    .mean()
    .show()
)

+------------+------------------+------------------+
|Product Type|   avg(Sale Price)|       avg(Profit)|
+------------+------------------+------------------+
|      Laptop|1022.8990909090904|157.17181818181825|
|      Tablet| 896.1438461538461|141.30538461538464|
|     Desktop| 848.8135294117653|162.87529411764714|
+------------+------------------+------------------+



In [None]:
# Number of rows per product type.
df_pyspark.groupBy('Product Type').count().show()

+------------+-----+
|Product Type|count|
+------------+-----+
|      Laptop|   55|
|      Tablet|   13|
|     Desktop|   34|
+------------+-----+



In [None]:
# Aggregate over the whole frame (no grouping): total sale price and mean profit.
(
    df_pyspark
    .select('Product Type', 'Sale Price', 'Profit')
    .agg({'Sale Price': 'sum', 'Profit': 'mean'})
    .show()
)

+-----------------+------------------+
|  sum(Sale Price)|       avg(Profit)|
+-----------------+------------------+
|96768.98000000013|157.05078431372553|
+-----------------+------------------+

