<a href="https://colab.research.google.com/github/chinhegde/Analysis-on-Enron-Email-Dataset/blob/main/PySpark_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark Exploration
As followed in the freeCodeCamp tutorial: https://youtu.be/_C8kWso4ne4?si=sK4kKuY1Pmg0GMmR

## Installation and import

In [17]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m1.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=06393bd1489011f33cdad7ef6b6c144f043109a4c3c8e5a533f01cb397777859
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [18]:
import pyspark
import csv
import pandas as pd
from pyspark.sql import SparkSession

In [19]:
# Creating a sample CSV file

data = [
    ["Name", "Age", "Experience", "Salary"],
    ["Chinmayi", 31, 10, 30000],
    ["Sarah", 30, 8, 25000],
    ["Sunny", 29, 4, 20000],
    ["Paul", 24, 3, 20000],
    ["John", 21, 1, 15000],
    ["Jennie", 23, 2, 18000]
]

csv_file_name = "test1.csv"

with open(csv_file_name, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(data)

In [20]:
type(pd.read_csv('test1.csv'))

pandas.core.frame.DataFrame

## Create Spark session

In [21]:
spark = SparkSession.builder.appName('Practice').getOrCreate()

In [22]:
spark

## CSV/Dataframe Manipulation

### Read CSV

In [23]:
df_pyspark = spark.read.csv('test1.csv')

In [24]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [25]:
df_pyspark

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string]

In [26]:
df_pyspark.show()

+--------+---+----------+------+
|     _c0|_c1|       _c2|   _c3|
+--------+---+----------+------+
|    Name|Age|Experience|Salary|
|Chinmayi| 31|        10| 30000|
|   Sarah| 30|         8| 25000|
|   Sunny| 29|         4| 20000|
|    Paul| 24|         3| 20000|
|    John| 21|         1| 15000|
|  Jennie| 23|         2| 18000|
+--------+---+----------+------+



In [27]:
df_pyspark2 = spark.read.option('header', 'true').csv('test1.csv')
df_pyspark2.show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|Chinmayi| 31|        10| 30000|
|   Sarah| 30|         8| 25000|
|   Sunny| 29|         4| 20000|
|    Paul| 24|         3| 20000|
|    John| 21|         1| 15000|
|  Jennie| 23|         2| 18000|
+--------+---+----------+------+



In [28]:
type(df_pyspark2)

pyspark.sql.dataframe.DataFrame

In [29]:
df_pyspark2.head()

Row(Name='Chinmayi', Age='31', Experience='10', Salary='30000')

In [30]:
df_pyspark2.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Salary: string (nullable = true)



In [31]:
df_pyspark3 = spark.read.option('header', 'true').csv('test1.csv', inferSchema = True)
df_pyspark3.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [32]:
df_pyspark3.head(3)

[Row(Name='Chinmayi', Age=31, Experience=10, Salary=30000),
 Row(Name='Sarah', Age=30, Experience=8, Salary=25000),
 Row(Name='Sunny', Age=29, Experience=4, Salary=20000)]

In [33]:
df_pyspark3.select('Name')

DataFrame[Name: string]

In [34]:
df_pyspark3.select('Name').show()

+--------+
|    Name|
+--------+
|Chinmayi|
|   Sarah|
|   Sunny|
|    Paul|
|    John|
|  Jennie|
+--------+



In [35]:
df_pyspark3.select(['Name', 'age']).show()

+--------+---+
|    Name|age|
+--------+---+
|Chinmayi| 31|
|   Sarah| 30|
|   Sunny| 29|
|    Paul| 24|
|    John| 21|
|  Jennie| 23|
+--------+---+



In [36]:
df_pyspark3['Name']

Column<'Name'>

In [37]:
df_pyspark3.dtypes

[('Name', 'string'), ('Age', 'int'), ('Experience', 'int'), ('Salary', 'int')]

In [38]:
df_pyspark3.describe().show()

+-------+--------+------------------+-----------------+------------------+
|summary|    Name|               Age|       Experience|            Salary|
+-------+--------+------------------+-----------------+------------------+
|  count|       6|                 6|                6|                 6|
|   mean|    NULL|26.333333333333332|4.666666666666667|21333.333333333332|
| stddev|    NULL| 4.179314138308661|3.559026084010437| 5354.126134736337|
|    min|Chinmayi|                21|                1|             15000|
|    max|   Sunny|                31|               10|             30000|
+-------+--------+------------------+-----------------+------------------+



### Add/drop columns

In [39]:
# Adding columns

df_pyspark3.withColumn('Experience after 2 years', df_pyspark3['Experience']+2).show()

+--------+---+----------+------+------------------------+
|    Name|Age|Experience|Salary|Experience after 2 years|
+--------+---+----------+------+------------------------+
|Chinmayi| 31|        10| 30000|                      12|
|   Sarah| 30|         8| 25000|                      10|
|   Sunny| 29|         4| 20000|                       6|
|    Paul| 24|         3| 20000|                       5|
|    John| 21|         1| 15000|                       3|
|  Jennie| 23|         2| 18000|                       4|
+--------+---+----------+------+------------------------+



In [40]:
df_pyspark3.show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|Chinmayi| 31|        10| 30000|
|   Sarah| 30|         8| 25000|
|   Sunny| 29|         4| 20000|
|    Paul| 24|         3| 20000|
|    John| 21|         1| 15000|
|  Jennie| 23|         2| 18000|
+--------+---+----------+------+



In [41]:
df_pyspark3 = df_pyspark3.withColumn('Experience after 2 years', df_pyspark3['Experience']+2)
df_pyspark3.show()

+--------+---+----------+------+------------------------+
|    Name|Age|Experience|Salary|Experience after 2 years|
+--------+---+----------+------+------------------------+
|Chinmayi| 31|        10| 30000|                      12|
|   Sarah| 30|         8| 25000|                      10|
|   Sunny| 29|         4| 20000|                       6|
|    Paul| 24|         3| 20000|                       5|
|    John| 21|         1| 15000|                       3|
|  Jennie| 23|         2| 18000|                       4|
+--------+---+----------+------+------------------------+



In [42]:
df_pyspark3 = df_pyspark3.drop('Experience after 2 years')
df_pyspark3.show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|Chinmayi| 31|        10| 30000|
|   Sarah| 30|         8| 25000|
|   Sunny| 29|         4| 20000|
|    Paul| 24|         3| 20000|
|    John| 21|         1| 15000|
|  Jennie| 23|         2| 18000|
+--------+---+----------+------+



In [43]:
# Rename column

df_pyspark3.withColumnRenamed('Name', 'New Name').show()

+--------+---+----------+------+
|New Name|Age|Experience|Salary|
+--------+---+----------+------+
|Chinmayi| 31|        10| 30000|
|   Sarah| 30|         8| 25000|
|   Sunny| 29|         4| 20000|
|    Paul| 24|         3| 20000|
|    John| 21|         1| 15000|
|  Jennie| 23|         2| 18000|
+--------+---+----------+------+



### Handling missing values

In [44]:
# Creating sample dataset no. 2

data = [
    ["Name", "age", "Experience", "Salary"],
    ["Krish", 31, 10, 30000],
    ["Sudhanshu", 30, 8, 25000],
    ["Sunny", 29, 4, 20000],
    ["Paul", 24, 3, 20000],
    ["Harsha", 21, 1, 15000],
    ["Shubham", 23, 2, 18000],
    ["Mahesh", None, None, 40000],
    [None, 34, 10, 38000],
    [None, 36, None, None]
]

csv_file_name = "test2.csv"

with open(csv_file_name, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(data)

In [45]:
df_pyspark = spark.read.option('header', 'true').csv('test2.csv', inferSchema = True)

In [46]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [47]:
df_pyspark.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
+---------+----+----------+------+



In [48]:
df_pyspark.na.drop().show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [49]:
df_pyspark.na.drop(how = 'all').show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
+---------+----+----------+------+



In [50]:
# dropping with threshold

df_pyspark.na.drop(how = 'any', thresh = 1).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
+---------+----+----------+------+



In [51]:
df_pyspark.na.drop(how = 'any', thresh = 2).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
+---------+----+----------+------+



In [52]:
df_pyspark.na.drop(how = 'any', thresh = 3).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     NULL| 34|        10| 38000|
+---------+---+----------+------+



In [53]:
# drop with subset
df_pyspark.na.drop(how = 'any', subset = ['Experience']).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     NULL| 34|        10| 38000|
+---------+---+----------+------+



### Fill missing values

In [54]:
df_pyspark.na.fill('MISSING VALUES').show()

+--------------+----+----------+------+
|          Name| age|Experience|Salary|
+--------------+----+----------+------+
|         Krish|  31|        10| 30000|
|     Sudhanshu|  30|         8| 25000|
|         Sunny|  29|         4| 20000|
|          Paul|  24|         3| 20000|
|        Harsha|  21|         1| 15000|
|       Shubham|  23|         2| 18000|
|        Mahesh|NULL|      NULL| 40000|
|MISSING VALUES|  34|        10| 38000|
|MISSING VALUES|  36|      NULL|  NULL|
+--------------+----+----------+------+



In [55]:
df_pyspark.na.fill('MISSING VALUES', ['Experience','age']).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
+---------+----+----------+------+



In [56]:
df_pyspark.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
+---------+----+----------+------+



In [57]:
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['age', 'Experience', 'Salary'],
    outputCols=["{}_imputed".format(c) for c in ['age', 'Experience', 'Salary']]
    ).setStrategy("median")

In [58]:
imputer.fit(df_pyspark).transform(df_pyspark).show()

+---------+----+----------+------+-----------+------------------+--------------+
|     Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
|    Krish|  31|        10| 30000|         31|                10|         30000|
|Sudhanshu|  30|         8| 25000|         30|                 8|         25000|
|    Sunny|  29|         4| 20000|         29|                 4|         20000|
|     Paul|  24|         3| 20000|         24|                 3|         20000|
|   Harsha|  21|         1| 15000|         21|                 1|         15000|
|  Shubham|  23|         2| 18000|         23|                 2|         18000|
|   Mahesh|NULL|      NULL| 40000|         29|                 4|         40000|
|     NULL|  34|        10| 38000|         34|                10|         38000|
|     NULL|  36|      NULL|  NULL|         36|                 4|         20000|
+---------+----+----------+-

### Filter Operation