# Getting Started with PySpark 

In [2]:
!pip install pyspark



In [3]:
import pyspark

In [4]:
from pyspark.sql import SparkSession

### Creating Spark Session

In [5]:
spark = SparkSession.builder.appName("Practice").getOrCreate()

### Multiple Ways To Create Data Frame

We can create the Data Frame using tuple & dictionary exploring the same

### 01

In [6]:
data = [(1, 'Akshay'), (2,'Amarnath')]
schema = ['id', 'name']

demo_df_01 = spark.createDataFrame(data, schema)

demo_df_01.show()

demo_df_01.printSchema()

+---+--------+
| id|    name|
+---+--------+
|  1|  Akshay|
|  2|Amarnath|
+---+--------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



### 02

In [7]:
data = [{'id' : 1, 'name':'Akshay'}, {'id' : 2, 'name' :'Amarnath'}]

demo_df_02 = spark.createDataFrame(data)

demo_df_02.show()

demo_df_02.printSchema()

+---+--------+
| id|    name|
+---+--------+
|  1|  Akshay|
|  2|Amarnath|
+---+--------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



### 03

In [8]:
from pyspark.sql.types import *

data = [{'id' : 1, 'name':'Akshay'}, {'id' : 2, 'name' :'Amarnath'}]


schema = StructType([
    StructField(name = 'id', dataType = IntegerType()),
     StructField(name = 'name', dataType = StringType())
])

demo_df_03 = spark.createDataFrame(data, schema)

demo_df_03.show()

demo_df_03.printSchema()

+---+--------+
| id|    name|
+---+--------+
|  1|  Akshay|
|  2|Amarnath|
+---+--------+

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)



### Reading CSV File

In [9]:
df = spark.read.csv(r"Sample Data\Sample_CSV_Data_01.csv", header = 'true')

In [10]:
df.show()

+------+---+------+----------+
|  Name|Age|Salary|      Dept|
+------+---+------+----------+
|Akshay| 23| 50000|        IT|
|  Amar| 18| 10000|        BE|
|Adarsh| 17|  5000|   Diploma|
|Ganesh| 16|  1000|Highschool|
|  Alok| 11|   500|    School|
+------+---+------+----------+



### Exploring CSV options 

In [11]:
# Without inferSchema option I can see all the column data type as a String

df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Salary: string (nullable = true)
 |-- Dept: string (nullable = true)



In [12]:
df = spark.read.csv(r"Sample Data\Sample_CSV_Data_01.csv", header = 'true', inferSchema = 'true')

In [13]:
# With inferSchema option I can see all the column data type as expected

df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Dept: string (nullable = true)



### Select statement

There are multiple ways to access the values from Data Frame exploring the same 

#### Method 01

In [14]:
df_single_col = df.select('Name')

In [15]:
df_single_col.show()

+------+
|  Name|
+------+
|Akshay|
|  Amar|
|Adarsh|
|Ganesh|
|  Alok|
+------+



#### Method 02

In [16]:
df_single_col = df.select(df['Name']).show()

+------+
|  Name|
+------+
|Akshay|
|  Amar|
|Adarsh|
|Ganesh|
|  Alok|
+------+



#### Method 03

In [17]:
df_single_col = df.select(df.Name).show()

+------+
|  Name|
+------+
|Akshay|
|  Amar|
|Adarsh|
|Ganesh|
|  Alok|
+------+



### Column Renaming 

In [18]:
df_rename_col = df.withColumnRenamed('Name', 'Brothers')

In [19]:
df_rename_col.show()

+--------+---+------+----------+
|Brothers|Age|Salary|      Dept|
+--------+---+------+----------+
|  Akshay| 23| 50000|        IT|
|    Amar| 18| 10000|        BE|
|  Adarsh| 17|  5000|   Diploma|
|  Ganesh| 16|  1000|Highschool|
|    Alok| 11|   500|    School|
+--------+---+------+----------+



### Adding New column

In [20]:
df_new_col = df.withColumn('SalaryIncrement',df['Salary']+1000)

In [21]:
df_new_col.show()

+------+---+------+----------+---------------+
|  Name|Age|Salary|      Dept|SalaryIncrement|
+------+---+------+----------+---------------+
|Akshay| 23| 50000|        IT|          51000|
|  Amar| 18| 10000|        BE|          11000|
|Adarsh| 17|  5000|   Diploma|           6000|
|Ganesh| 16|  1000|Highschool|           2000|
|  Alok| 11|   500|    School|           1500|
+------+---+------+----------+---------------+



In [22]:
df.show()

+------+---+------+----------+
|  Name|Age|Salary|      Dept|
+------+---+------+----------+
|Akshay| 23| 50000|        IT|
|  Amar| 18| 10000|        BE|
|Adarsh| 17|  5000|   Diploma|
|Ganesh| 16|  1000|Highschool|
|  Alok| 11|   500|    School|
+------+---+------+----------+



In [23]:
from pyspark.sql.functions import col

cop_df = df_new_col.withColumn('Copied_column', col('Salary'))

cop_df.show()

+------+---+------+----------+---------------+-------------+
|  Name|Age|Salary|      Dept|SalaryIncrement|Copied_column|
+------+---+------+----------+---------------+-------------+
|Akshay| 23| 50000|        IT|          51000|        50000|
|  Amar| 18| 10000|        BE|          11000|        10000|
|Adarsh| 17|  5000|   Diploma|           6000|         5000|
|Ganesh| 16|  1000|Highschool|           2000|         1000|
|  Alok| 11|   500|    School|           1500|          500|
+------+---+------+----------+---------------+-------------+



### Dropping the column

In [24]:
df_new_col.show()

+------+---+------+----------+---------------+
|  Name|Age|Salary|      Dept|SalaryIncrement|
+------+---+------+----------+---------------+
|Akshay| 23| 50000|        IT|          51000|
|  Amar| 18| 10000|        BE|          11000|
|Adarsh| 17|  5000|   Diploma|           6000|
|Ganesh| 16|  1000|Highschool|           2000|
|  Alok| 11|   500|    School|           1500|
+------+---+------+----------+---------------+



In [25]:
df_new_col = df_new_col.drop('SalaryIncrement')

In [26]:
df_new_col.show()

+------+---+------+----------+
|  Name|Age|Salary|      Dept|
+------+---+------+----------+
|Akshay| 23| 50000|        IT|
|  Amar| 18| 10000|        BE|
|Adarsh| 17|  5000|   Diploma|
|Ganesh| 16|  1000|Highschool|
|  Alok| 11|   500|    School|
+------+---+------+----------+



Note: 
1. Adding new column or dropping column or Renaming column is not a in-place operation that means these operation won't get effect to the org DF 
2. If you want to make changes in org DF you should assign the changes to the org DF variable for Example see above operation i.e. df_new_col

## Handling Null Data

In [27]:
df_null_prac = \
spark.read \
.csv(r"Sample Data\Sample_CSV_Data_02.csv", header = 'true', inferSchema = 'true')

In [28]:
df_null_prac.show()

+--------+----+-------------+------+
|    Name| Age|         City|Salary|
+--------+----+-------------+------+
|    John|  30|     New York| 70000|
|    Anna|  35|San Francisco| 80000|
|  Robert|  40|  Los Angeles| 90000|
|   Julia|  25|      Chicago| 60000|
| Michael|  45|      Houston|100000|
|   Emily|  28|       Boston| 65000|
|   David|  33|      Seattle| 75000|
|  Akshay|NULL|       Sidnal|  NULL|
|    NULL|NULL|         NULL|100000|
|Amarnath|  18|        Akkol|  NULL|
+--------+----+-------------+------+



### Remove All The Null Values From The DF 

In [29]:
df_null_prac.na.drop().show()

+-------+---+-------------+------+
|   Name|Age|         City|Salary|
+-------+---+-------------+------+
|   John| 30|     New York| 70000|
|   Anna| 35|San Francisco| 80000|
| Robert| 40|  Los Angeles| 90000|
|  Julia| 25|      Chicago| 60000|
|Michael| 45|      Houston|100000|
|  Emily| 28|       Boston| 65000|
|  David| 33|      Seattle| 75000|
+-------+---+-------------+------+



### Exploring Drop Function Parameters
- how
- thresh
- subset

### 1. How

It has two options 
- any : this will drop the perticular row if one of the column consist of Null value
- all : this will drop the row if all the raw data is null

In [30]:
df_null_prac.na.drop(how = "any").show()

+-------+---+-------------+------+
|   Name|Age|         City|Salary|
+-------+---+-------------+------+
|   John| 30|     New York| 70000|
|   Anna| 35|San Francisco| 80000|
| Robert| 40|  Los Angeles| 90000|
|  Julia| 25|      Chicago| 60000|
|Michael| 45|      Houston|100000|
|  Emily| 28|       Boston| 65000|
|  David| 33|      Seattle| 75000|
+-------+---+-------------+------+



In [31]:
df_null_prac.na.drop(how = "all").show()

+--------+----+-------------+------+
|    Name| Age|         City|Salary|
+--------+----+-------------+------+
|    John|  30|     New York| 70000|
|    Anna|  35|San Francisco| 80000|
|  Robert|  40|  Los Angeles| 90000|
|   Julia|  25|      Chicago| 60000|
| Michael|  45|      Houston|100000|
|   Emily|  28|       Boston| 65000|
|   David|  33|      Seattle| 75000|
|  Akshay|NULL|       Sidnal|  NULL|
|    NULL|NULL|         NULL|100000|
|Amarnath|  18|        Akkol|  NULL|
+--------+----+-------------+------+



### 2. Threshold

- This will delete the row based on minimum threshold value mentioned that is Non null value count 

In [32]:
df_null_prac.na.drop(how = "any", thresh = 2).show()

+--------+----+-------------+------+
|    Name| Age|         City|Salary|
+--------+----+-------------+------+
|    John|  30|     New York| 70000|
|    Anna|  35|San Francisco| 80000|
|  Robert|  40|  Los Angeles| 90000|
|   Julia|  25|      Chicago| 60000|
| Michael|  45|      Houston|100000|
|   Emily|  28|       Boston| 65000|
|   David|  33|      Seattle| 75000|
|  Akshay|NULL|       Sidnal|  NULL|
|Amarnath|  18|        Akkol|  NULL|
+--------+----+-------------+------+



Explanation : As you can see, rows where a non-null value appeared in only one column, with all other columns being null, were deleted based on a threshold value of 2.

In [33]:
df_null_prac.na.drop(how = "any", thresh = 3).show()

+--------+---+-------------+------+
|    Name|Age|         City|Salary|
+--------+---+-------------+------+
|    John| 30|     New York| 70000|
|    Anna| 35|San Francisco| 80000|
|  Robert| 40|  Los Angeles| 90000|
|   Julia| 25|      Chicago| 60000|
| Michael| 45|      Houston|100000|
|   Emily| 28|       Boston| 65000|
|   David| 33|      Seattle| 75000|
|Amarnath| 18|        Akkol|  NULL|
+--------+---+-------------+------+



Explanation : As you can see, rows where a non-null value appeared in one and two column, with all other columns being null, were deleted based on a threshold value of 3.

### 3. Subset

- This will remove rows from a specific column where a null value is present.

In [34]:
df_null_prac.na.drop(how = "any", subset=['City', 'Salary']).show()

+-------+---+-------------+------+
|   Name|Age|         City|Salary|
+-------+---+-------------+------+
|   John| 30|     New York| 70000|
|   Anna| 35|San Francisco| 80000|
| Robert| 40|  Los Angeles| 90000|
|  Julia| 25|      Chicago| 60000|
|Michael| 45|      Houston|100000|
|  Emily| 28|       Boston| 65000|
|  David| 33|      Seattle| 75000|
+-------+---+-------------+------+



## Filter 

- ==
- &
- |
- ~


In [35]:
df_filter_prac = df_null_prac.na.drop()

In [36]:
df_filter_prac.show()

+-------+---+-------------+------+
|   Name|Age|         City|Salary|
+-------+---+-------------+------+
|   John| 30|     New York| 70000|
|   Anna| 35|San Francisco| 80000|
| Robert| 40|  Los Angeles| 90000|
|  Julia| 25|      Chicago| 60000|
|Michael| 45|      Houston|100000|
|  Emily| 28|       Boston| 65000|
|  David| 33|      Seattle| 75000|
+-------+---+-------------+------+



In [37]:
df_filter_prac.filter(df_filter_prac['Age']<=30).show()

+-----+---+--------+------+
| Name|Age|    City|Salary|
+-----+---+--------+------+
| John| 30|New York| 70000|
|Julia| 25| Chicago| 60000|
|Emily| 28|  Boston| 65000|
+-----+---+--------+------+



In [38]:
df_filter_prac.filter((df_filter_prac['Age']<=30) & (df_filter_prac['Salary']<=60000)).show()

+-----+---+-------+------+
| Name|Age|   City|Salary|
+-----+---+-------+------+
|Julia| 25|Chicago| 60000|
+-----+---+-------+------+



In [39]:
df_filter_prac.filter((df_filter_prac['Age']<=30) | (df_filter_prac['Salary']<=60000)).show()

+-----+---+--------+------+
| Name|Age|    City|Salary|
+-----+---+--------+------+
| John| 30|New York| 70000|
|Julia| 25| Chicago| 60000|
|Emily| 28|  Boston| 65000|
+-----+---+--------+------+



In [40]:
df_filter_prac.filter(~(df_filter_prac['Age']<=30)).show()

+-------+---+-------------+------+
|   Name|Age|         City|Salary|
+-------+---+-------------+------+
|   Anna| 35|San Francisco| 80000|
| Robert| 40|  Los Angeles| 90000|
|Michael| 45|      Houston|100000|
|  David| 33|      Seattle| 75000|
+-------+---+-------------+------+



## Group By & Aggregating Function

1. Group by function will group the result based on column you pass and on top of it we can use the Aggregate function

In [41]:
df_agg_prac = spark.read.csv(r"Sample Data\Sample_CSV_Data_03.csv", header=True, inferSchema = True)

In [42]:
df_agg_prac.show()

+--------+----+-------------+------+
|    Name| Age|         City|Salary|
+--------+----+-------------+------+
|    John|  30|     New York| 70000|
|    Anna|  35|San Francisco| 80000|
|  Robert|  40|     New York| 90000|
|   Julia|  25|      Chicago| 60000|
| Michael|  45|      Houston|100000|
|   Emily|  28|       Boston| 65000|
|   David|  33|      Seattle| 75000|
|  Akshay|NULL|      Chicago|  NULL|
|    Anna|  30|      Chicago|100000|
|Amarnath|  18|      Houston|  2000|
|  Robert|  25|     New York| 80000|
+--------+----+-------------+------+



In [43]:
df_agg_prac.groupBy('City').sum('Salary').show()

+-------------+-----------+
|         City|sum(Salary)|
+-------------+-----------+
|San Francisco|      80000|
|      Chicago|     160000|
|      Seattle|      75000|
|      Houston|     102000|
|     New York|     240000|
|       Boston|      65000|
+-------------+-----------+



In [44]:
df_agg_prac.groupBy('City').max('Salary').show()

+-------------+-----------+
|         City|max(Salary)|
+-------------+-----------+
|San Francisco|      80000|
|      Chicago|     100000|
|      Seattle|      75000|
|      Houston|     100000|
|     New York|      90000|
|       Boston|      65000|
+-------------+-----------+



In [45]:
df_agg_prac.groupBy('City').min('Salary').show()

+-------------+-----------+
|         City|min(Salary)|
+-------------+-----------+
|San Francisco|      80000|
|      Chicago|      60000|
|      Seattle|      75000|
|      Houston|       2000|
|     New York|      70000|
|       Boston|      65000|
+-------------+-----------+



In [46]:
df_agg_prac.groupBy('City').mean('Salary').show()

+-------------+-----------+
|         City|avg(Salary)|
+-------------+-----------+
|San Francisco|    80000.0|
|      Chicago|    80000.0|
|      Seattle|    75000.0|
|      Houston|    51000.0|
|     New York|    80000.0|
|       Boston|    65000.0|
+-------------+-----------+



In [47]:
df_agg_prac.groupBy('City').avg('Salary').show()

+-------------+-----------+
|         City|avg(Salary)|
+-------------+-----------+
|San Francisco|    80000.0|
|      Chicago|    80000.0|
|      Seattle|    75000.0|
|      Houston|    51000.0|
|     New York|    80000.0|
|       Boston|    65000.0|
+-------------+-----------+



In [48]:
df_agg_prac.groupBy('City').count().show()

+-------------+-----+
|         City|count|
+-------------+-----+
|San Francisco|    1|
|      Chicago|    3|
|      Seattle|    1|
|      Houston|    2|
|     New York|    3|
|       Boston|    1|
+-------------+-----+



# Where condition

In [49]:
df_where_prac = df_agg_prac

In [50]:
df_where_prac.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- Salary: integer (nullable = true)



In [51]:
from pyspark.sql.functions import max
df_where_prac.select('Name', 'city').where("Age<30").show()

+--------+--------+
|    Name|    city|
+--------+--------+
|   Julia| Chicago|
|   Emily|  Boston|
|Amarnath| Houston|
|  Robert|New York|
+--------+--------+



In [52]:
df_where_prac.where(df_where_prac.City=='Chicago').show()

+------+----+-------+------+
|  Name| Age|   City|Salary|
+------+----+-------+------+
| Julia|  25|Chicago| 60000|
|Akshay|NULL|Chicago|  NULL|
|  Anna|  30|Chicago|100000|
+------+----+-------+------+



### Array Practice

Let's explore the Array data type and some functions of it

1. Split - This function convert the delimeterd string into Array type
2. Array contains - This functions check the value which is present or not

In [53]:
from pyspark.sql.types import ArrayType, IntegerType

data_array = [('A', [1,2]), ('B', [3,4])]

schema_array = StructType([
    StructField(name = 'id', dataType = StringType()),
     StructField(name = 'SubId', dataType = ArrayType(IntegerType()))
])

df_array = spark.createDataFrame(data_array, schema_array)

df_array.show()

+---+------+
| id| SubId|
+---+------+
|  A|[1, 2]|
|  B|[3, 4]|
+---+------+



In [54]:
from pyspark.sql.types import ArrayType, IntegerType

data_array = [{'id':'A', 'SubId':[1,2]}, {'id':'B','SubId': [3,4]}]

schema_array = StructType([
    StructField(name = 'id', dataType = StringType()),
     StructField(name = 'SubId', dataType = ArrayType(IntegerType()))
])

df_array = spark.createDataFrame(data_array, schema_array)

df_array.show()

+---+------+
| id| SubId|
+---+------+
|  A|[1, 2]|
|  B|[3, 4]|
+---+------+



#### Split function

In [55]:
from pyspark.sql.functions import split, col, array_contains

data_array_01 = [(1, 'Java, Python'), (2, 'Spark,SF')]
schema_array_01 = ['Id', 'Skills']

df_array_01 = spark.createDataFrame(data_array_01, schema_array_01)

df_array_01 = df_array_01.withColumn('ArraySkils', split(col('Skills'), ','))

df_array_01.show()

+---+------------+---------------+
| Id|      Skills|     ArraySkils|
+---+------------+---------------+
|  1|Java, Python|[Java,  Python]|
|  2|    Spark,SF|    [Spark, SF]|
+---+------------+---------------+



#### array_contains function

In [None]:
df_array_01.withColumn('ArrayContains', array_contains(col('ArraySkils'), 'Java')).show()

### Map Type

Let's explore the Map data type and some functions of it

In [None]:
from pyspark.sql.types import StructType, StructField, MapType

df_map_data = [('Akshay', {'Height': '6', 'Hair': 'Brown'}), ('Rohan', {'Height': '6.5', 'Hair': 'Black'})]

df_map_schema = StructType([
    StructField(name='Name', dataType=StringType()),
    StructField(name='Properties', dataType=MapType(StringType(),StringType()))
])

df_Map = spark.createDataFrame(df_map_data, df_map_schema)

df_Map.show(truncate=False)


1. Accesing the values from Map and adding the values into new column

In [None]:
df_Map = df_Map.withColumn('Hair', df_Map.Properties['Hair'])

df_Map.show(truncate=False)

df_Map = df_Map.withColumn('Height', df_Map.Properties['Height'])

df_Map.show(truncate=False)

2. Accesing the Key and values from Map 

In [None]:
from pyspark.sql.functions import map_keys, map_values

df_Map.drop(col('Keys'))

df_Map_Keys = df_Map.withColumn('Keys', map_keys(df_Map.Properties))

df_Map_Keys.show(truncate=False)

df_Map_Values = df_Map.withColumn('Values', map_values(df_Map.Properties))

df_Map_Values.show(truncate=False)

## Exploring Condition and Functions of PySpark

### 01 When Condition

In [None]:
df_when_prac_data = [ (2,'Akshay', 'M', 2000), (3, 'Akshata','F',3000), (4, 'Rohan', 'M', 6000)]

df_when_prac_schema = ['Id', 'Name', 'Gender', 'Salary']

df_when_prac = spark.createDataFrame(df_when_prac_data, df_when_prac_schema)

df_when_prac.show()

In [None]:
from pyspark.sql.functions import when

df_when_prac.select(
    df_when_prac.Name, 
    when(condition=df_when_prac.Gender=='M', value='Male')\
    .when(condition=df_when_prac.Gender=='F', value='Female')\
    .otherwise('Male').alias('Gender')
).show()

### 02 Alias

In [None]:
df_when_prac.select(df_when_prac.Id.alias('Emp_ID'), df_when_prac.Name.alias('Emp_Name')).show()

### 03 Asc & Desc

In [None]:
df_when_prac.sort(df_when_prac.Id.asc()).show()

In [None]:
df_when_prac.sort(df_when_prac.Id.desc()).show()

### 04 Like 

In [None]:
df_when_prac.filter(df_when_prac.Name.like('A%')).show()

### 05 Distinct

In [None]:
df_dist_prac_data = [(1,'Akshay'), (1,'Akshay'), (2,'Amarnath'), (3,'Adarsh'), (4,'Ganesh')]

df_dist_prac_schema = ['Id', 'Names']

df_dist_prac = spark.createDataFrame(df_dist_prac_data, df_dist_prac_schema)

df_dist_prac.show()

In [None]:
df_dist_prac.distinct().show()

## Group By and Aggregating Function

    Exploring more about Group by function and Aggregating Function

In [None]:
df_grp_prac_data = [
    ("Alice", "Engineering", 5000),
    ("Bob", "Engineering", 6000),
    ("Charlie", "Sales", 4000),
    ("David", "Marketing", 7000),
    ("Eva", "Sales", 4500),
    ("Frank", "Marketing", 5500),
]

df_grp_prac_schema = ["Name", "Department", "Salary"]

df_grp_prac = spark.createDataFrame(df_grp_prac_data, df_grp_prac_schema)

df_grp_prac.show()

    1. Counting the Number of Employess working in each department

In [None]:
df_grp_prac.groupBy(df_grp_prac.Department).count().show()

    2. Finding the Max salary of each Department

In [None]:
df_grp_prac.groupBy(df_grp_prac.Department).max('Salary').show()

    2. Finding the Min salary of each Department

In [None]:
df_grp_prac.groupBy(df_grp_prac.Department).min('Salary').show()

    3. Finding the Total of salary of each Department

In [None]:
df_grp_prac.groupBy(df_grp_prac.Department).sum('Salary').show()

If we want to utilize multiple aggregate functions within a single DataFrame, we can employ the `agg` function to compute these aggregate operations.

In [None]:
from pyspark.sql.functions import count, max, min, sum

df_grp_prac.groupBy(df_grp_prac.Department).agg(
            count('*').alias('Number_of_emp'),
            sum('Salary').alias('Total_salary'),   
            min('Salary').alias('Min_Salary'), 
            max('Salary').alias('Max_Salary')).show()