<a href="https://colab.research.google.com/github/augustine-uba1/databricks_machineLearning/blob/main/PySPark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark Introduction

In [2]:
## Install pyspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=ed3d3a12b72723e70e1ecad6d7143e93fd77391e9c296f747dfa43883afbd9a8
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


**Import pyspark**

In [3]:
import pyspark

In [4]:
## import the SparkSession class from the pyspark.sql module. The SparkSession class is the entry point for working with Spark SQL in PySpark
##create a SparkSession object named spark, In the below example, the appName parameter is set to 'Sample'
##getOrCreate method checks if a SparkSession already exists. If it does, it returns the existing SparkSession; otherwise, it creates a new one

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Sample').getOrCreate()

In [10]:
spark

In [35]:
## read a csv file into a pyspark dataframe. You can specify the read format, csv, parquet, table.. see documentation for more read formats
pyspark_df = spark.read.option('header', 'true').csv('users1.csv')

In [8]:
pyspark_df.show()

+--------+----+----------+------+
|Username| Age|experience|income|
+--------+----+----------+------+
|     Sam|  54|        15| 50000|
|  Justin|  31|        12| 35000|
|    Phil|  20|         5| 25000|
|  Roland|  25|         7| 40000|
|   Lucas|null|      null| 50000|
|    null|  56|         5|  null|
|    null|  32|      null|  null|
+--------+----+----------+------+



In [9]:
type(pyspark_df)

pyspark.sql.dataframe.DataFrame

In [10]:
pyspark_df.dtypes

[('Username', 'string'),
 ('Age', 'string'),
 ('experience', 'string'),
 ('income', 'string')]

In [11]:
pyspark_df.printSchema()

root
 |-- Username: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- experience: string (nullable = true)
 |-- income: string (nullable = true)



In [12]:
pyspark_df.head(5)

[Row(Username='Sam', Age='54', experience='15', income='50000'),
 Row(Username='Justin', Age='31', experience='12', income='35000'),
 Row(Username='Phil', Age='20', experience='5', income='25000'),
 Row(Username='Roland', Age='25', experience='7', income='40000'),
 Row(Username='Lucas', Age=None, experience=None, income='50000')]

# Data Wrangling - Dataframe operations
*   viewing datatype schema
*   returning specific columns and column indexing
*   adding, dropping and renaming columns



In [22]:
## check datatype schema
pyspark_df.printSchema()

root
 |-- Username: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- experience: integer (nullable = true)
 |-- income: integer (nullable = true)



In [54]:
## here we specify the data types of the columns
pyspark_df = spark.read.csv('users1.csv', header=True, inferSchema = True)
pyspark_df.show()

+--------+----+----------+------+
|Username| Age|experience|income|
+--------+----+----------+------+
|     Sam|  54|        15| 50000|
|  Justin|  31|        12| 35000|
|    Phil|  20|         5| 25000|
|  Roland|  25|         7| 40000|
|   Lucas|null|      null| 50000|
|    null|  56|         5|  null|
|    null|  32|      null|  null|
+--------+----+----------+------+



In [55]:
## check datatype schema
pyspark_df.printSchema()

root
 |-- Username: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- experience: integer (nullable = true)
 |-- income: integer (nullable = true)



In [25]:
## selecting multiple columns
pyspark_df.select(['Username', 'experience']).show()

+--------+----------+
|Username|experience|
+--------+----------+
|     Sam|        15|
|  Justin|        12|
|    Phil|         5|
|  Roland|         7|
|   Lucas|      null|
|    null|         5|
|    null|      null|
+--------+----------+



In [26]:
##indexing a column
indexed_col = pyspark_df.select(pyspark_df[ 'experience',][0].alias("indexed_col"))
indexed_col.show()

+-----------+
|indexed_col|
+-----------+
|         15|
|         12|
|          5|
|          7|
|       null|
|          5|
|       null|
+-----------+



In [32]:
## describing (summarysing) values in a dataframe
pyspark_df.describe().show()

+-------+--------+------------------+-----------------+------------------+
|summary|Username|               Age|       experience|            income|
+-------+--------+------------------+-----------------+------------------+
|  count|       5|                 6|                5|                 5|
|   mean|    null|36.333333333333336|              8.8|           40000.0|
| stddev|    null|15.108496505829647|4.494441010848846|10606.601717798214|
|    min|  Justin|                20|                5|             25000|
|    max|     Sam|                56|               15|             50000|
+-------+--------+------------------+-----------------+------------------+



In [31]:
## adding columns in dataframes
pyspark_df_add = pyspark_df.withColumn('experience after 10 years', pyspark_df['experience'] + 10)
pyspark_df.show()

+--------+----+----------+------+
|Username| Age|experience|income|
+--------+----+----------+------+
|     Sam|  54|        15| 50000|
|  Justin|  31|        12| 35000|
|    Phil|  20|         5| 25000|
|  Roland|  25|         7| 40000|
|   Lucas|null|      null| 50000|
|    null|  56|         5|  null|
|    null|  32|      null|  null|
+--------+----+----------+------+



In [33]:
## droping a column
pyspark_df_drop = pyspark_df.drop('experience after 10 years')
pyspark_df.show()

+--------+----+----------+------+
|Username| Age|experience|income|
+--------+----+----------+------+
|     Sam|  54|        15| 50000|
|  Justin|  31|        12| 35000|
|    Phil|  20|         5| 25000|
|  Roland|  25|         7| 40000|
|   Lucas|null|      null| 50000|
|    null|  56|         5|  null|
|    null|  32|      null|  null|
+--------+----+----------+------+



In [34]:
## Rename a column
pyspark_df.withColumnRenamed('experience', 'working years').show()

+--------+----+-------------+------+
|Username| Age|working years|income|
+--------+----+-------------+------+
|     Sam|  54|           15| 50000|
|  Justin|  31|           12| 35000|
|    Phil|  20|            5| 25000|
|  Roland|  25|            7| 40000|
|   Lucas|null|         null| 50000|
|    null|  56|            5|  null|
|    null|  32|         null|  null|
+--------+----+-------------+------+



Data Wrangling2 - Dataframe Opertaions


*   Droping columns, Droping rows, parameters in dropping functionalities
*   Handling missing values using mean, median, mode



# Data Wrangling 2 - Dataframes operations


*   Drop columns, drop rows, parameters in dropping functionalities
*   Handling missing values by mean, median and mode




In [39]:
pyspark_df.describe().show()

+-------+--------+------------------+-----------------+------------------+
|summary|Username|               Age|       experience|            income|
+-------+--------+------------------+-----------------+------------------+
|  count|       5|                 6|                5|                 5|
|   mean|    null|36.333333333333336|              8.8|           40000.0|
| stddev|    null|15.108496505829647|4.494441010848846|10606.601717798214|
|    min|  Justin|                20|               12|             25000|
|    max|     Sam|                56|                7|             50000|
+-------+--------+------------------+-----------------+------------------+



In [41]:
## droping rows with null values (by default 'any' is set as all if no values are specified)

pyspark_df.na.drop(how="any").show()

+--------+---+----------+------+
|Username|Age|experience|income|
+--------+---+----------+------+
|     Sam| 54|        15| 50000|
|  Justin| 31|        12| 35000|
|    Phil| 20|         5| 25000|
|  Roland| 25|         7| 40000|
+--------+---+----------+------+



In [42]:
## droping rows with null values using all (drops only rows where all values are null)
pyspark_df.na.drop(how="all").show()

+--------+----+----------+------+
|Username| Age|experience|income|
+--------+----+----------+------+
|     Sam|  54|        15| 50000|
|  Justin|  31|        12| 35000|
|    Phil|  20|         5| 25000|
|  Roland|  25|         7| 40000|
|   Lucas|null|      null| 50000|
|    null|  56|         5|  null|
|    null|  32|      null|  null|
+--------+----+----------+------+



In [43]:
## droping rows with null values (thresh=2 ommits rows where there are at least two values in the entire column)
pyspark_df.na.drop(how="any", thresh=2).show()

+--------+----+----------+------+
|Username| Age|experience|income|
+--------+----+----------+------+
|     Sam|  54|        15| 50000|
|  Justin|  31|        12| 35000|
|    Phil|  20|         5| 25000|
|  Roland|  25|         7| 40000|
|   Lucas|null|      null| 50000|
|    null|  56|         5|  null|
+--------+----+----------+------+



In [44]:
## using subset - removes only null values in specified rows
pyspark_df.na.drop(subset=['experience']).show()

+--------+---+----------+------+
|Username|Age|experience|income|
+--------+---+----------+------+
|     Sam| 54|        15| 50000|
|  Justin| 31|        12| 35000|
|    Phil| 20|         5| 25000|
|  Roland| 25|         7| 40000|
|    null| 56|         5|  null|
+--------+---+----------+------+



In [46]:
## replacing all missing values, if columns are not specified, missing values in all columns are replaced
pyspark_df.na.fill('missing value', ['age', 'experience']).show()

+--------+-------------+-------------+------+
|Username|          Age|   experience|income|
+--------+-------------+-------------+------+
|     Sam|           54|           15| 50000|
|  Justin|           31|           12| 35000|
|    Phil|           20|            5| 25000|
|  Roland|           25|            7| 40000|
|   Lucas|missing value|missing value| 50000|
|    null|           56|            5|  null|
|    null|           32|missing value|  null|
+--------+-------------+-------------+------+



In [56]:
## replacing all missing values with row operations, here we make use of the inputer function imported from pyspark, and the format() Python method
from pyspark.ml.feature import Imputer

imputer = Imputer(
                  inputCols=['Age', 'experience', 'income'],
                  outputCols=["{}_input_value".format(c) for c in ['Age', 'experience', 'income']]
                ).setStrategy("mean")

In [60]:
## add the new columns to the dataframe and displays the new dataframe
new_imput =imputer.fit(pyspark_df).transform(pyspark_df)
new_imput.show()

+--------+----+----------+------+---------------+----------------------+------------------+
|Username| Age|experience|income|Age_input_value|experience_input_value|income_input_value|
+--------+----+----------+------+---------------+----------------------+------------------+
|     Sam|  54|        15| 50000|             54|                    15|             50000|
|  Justin|  31|        12| 35000|             31|                    12|             35000|
|    Phil|  20|         5| 25000|             20|                     5|             25000|
|  Roland|  25|         7| 40000|             25|                     7|             40000|
|   Lucas|null|      null| 50000|             36|                     8|             50000|
|    null|  56|         5|  null|             56|                     5|             40000|
|    null|  32|      null|  null|             32|                     8|             40000|
+--------+----+----------+------+---------------+----------------------+--------

In [63]:
new_pyspark_df = new_imput.drop('Age', 'experience', 'income')
new_pyspark_df.show()

+--------+---------------+----------------------+------------------+
|Username|Age_input_value|experience_input_value|income_input_value|
+--------+---------------+----------------------+------------------+
|     Sam|             54|                    15|             50000|
|  Justin|             31|                    12|             35000|
|    Phil|             20|                     5|             25000|
|  Roland|             25|                     7|             40000|
|   Lucas|             36|                     8|             50000|
|    null|             56|                     5|             40000|
|    null|             32|                     8|             40000|
+--------+---------------+----------------------+------------------+



In [69]:
## filter operations - without uing the select method all columns in the dataframe are returned
new_pyspark_df.filter("income_input_value >= 35000" ).select(['Username', 'income_input_Value']).show()

+--------+------------------+
|Username|income_input_Value|
+--------+------------------+
|     Sam|             50000|
|  Justin|             35000|
|  Roland|             40000|
|   Lucas|             50000|
|    null|             40000|
|    null|             40000|
+--------+------------------+



In [76]:
## using &, |
new_pyspark_df.filter((new_pyspark_df['income_input_Value'] > 35000)  |
                       (new_pyspark_df['Age_input_value'] > 30) ).select(['Username', 'income_input_Value']).show()

+--------+------------------+
|Username|income_input_Value|
+--------+------------------+
|     Sam|             50000|
|  Justin|             35000|
|  Roland|             40000|
|   Lucas|             50000|
|    null|             40000|
|    null|             40000|
+--------+------------------+



In [79]:
## using  ~
new_pyspark_df.filter(~(new_pyspark_df['income_input_Value'] > 35000)).select(['Username', 'income_input_Value']).show()

+--------+------------------+
|Username|income_input_Value|
+--------+------------------+
|  Justin|             35000|
|    Phil|             25000|
+--------+------------------+



# Grouping and aggregation of dataframes

In [93]:
department_df = spark.read.csv('departments.csv', header=True, inferSchema = True)
department_df.printSchema()

root
 |-- Username: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: integer (nullable = true)



In [89]:
## group by name and the sum aggregate
department_df.groupBy('Username').sum().show()

+--------+-----------+
|Username|sum(Salary)|
+--------+-----------+
|    Kate|      75000|
|    Jack|      41000|
|   Lewis|      96000|
|    John|      71000|
|     Sam|      36000|
|  Olivia|      60000|
+--------+-----------+



In [86]:
## group by column (department) and the mean aggregate
department_df.groupBy('department').mean().show()

+----------------+------------------+
|      department|       avg(Salary)|
+----------------+------------------+
|             IOT|32333.333333333332|
|        big data|           42000.0|
|machine learning|           37500.0|
|data engineering|           40500.0|
+----------------+------------------+



In [87]:
## group by column (department) and the count aggregate
department_df.groupBy('department').count().show()

+----------------+-----+
|      department|count|
+----------------+-----+
|             IOT|    3|
|        big data|    3|
|machine learning|    2|
|data engineering|    2|
+----------------+-----+



In [91]:
## agg function - Aggregate on the entire DataFrame without group
department_df.agg({"Salary": "max"}).show()

+-----------+
|max(Salary)|
+-----------+
|      56000|
+-----------+

