In [1]:
import os

In [2]:
import pandas as pd
# Preview the raw CSV with pandas; the resulting frame is rendered as the cell output.
pd.read_csv('./test.csv')

Unnamed: 0,Name,Age,Experience
0,Gagan,25,5
1,Ajay,28,7
2,Sunny,30,10


In [3]:
from pyspark.sql import SparkSession

In [4]:
# Build a SparkSession for the application named 'Testing', reusing an existing session if there is one.
spark = SparkSession.builder.appName('Testing').getOrCreate()

In [5]:
# Display the session object.
spark

In [6]:
# Read the CSV with default options: the header line is not recognised, so the
# columns get the generated names _c0, _c1, ... and are all typed as string.
df_pyspark = spark.read.csv('test.csv')

In [7]:
# The repr of a Spark DataFrame lists column names and types only.
df_pyspark

DataFrame[_c0: string, _c1: string, _c2: string]

In [8]:
# Print the rows; the file's header line shows up as the first data row.
df_pyspark.show()

+-----+---+----------+
|  _c0|_c1|       _c2|
+-----+---+----------+
| Name|Age|Experience|
|Gagan| 25|         5|
| Ajay| 28|         7|
|Sunny| 30|        10|
+-----+---+----------+



In [47]:
# Use the first line of the file as the column names instead of treating it as data.
spark.read.option('header', 'true').csv('test.csv').show()

+-----+----+----------+
| Name| Age|Experience|
+-----+----+----------+
|Gagan|  25|         5|
| Ajay|  28|         7|
|Sunny|  30|        10|
| null|  15|         1|
|Radha|null|        10|
|Rishi|null|      null|
+-----+----+----------+



In [48]:
# Keep the header-aware DataFrame. Without inferSchema every column is still read as string.
df_pyspark = spark.read.option('header', 'true').csv('test.csv')

In [11]:
# Confirm that this is a Spark DataFrame.
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [12]:
# head(n) returns the first n rows as a list of Row objects.
df_pyspark.head(2)

[Row(Name='Gagan', Age='25', Experience='5'),
 Row(Name='Ajay', Age='28', Experience='7')]

In [13]:
# Print each column's name, type and nullability.
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Experience: string (nullable = true)



In [14]:
# inferSchema=True lets Spark infer each column's type from the data
# instead of reading everything as string.
df_pyspark = spark.read.option('header', 'true').csv('test.csv', inferSchema=True)

In [15]:
# Verify the inferred column types.
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



In [63]:
# Alternative way to read the CSV: pass header and inferSchema as keyword
# arguments to csv() instead of chaining .option() calls.
df_pyspark = spark.read.csv('test.csv', header=True, inferSchema=True)
df_pyspark.show()

+-----+----+----------+
| Name| Age|Experience|
+-----+----+----------+
|Gagan|  25|         5|
| Ajay|  28|         7|
|Sunny|  30|        10|
| null|  15|         1|
|Radha|null|        10|
|Rishi|null|      null|
+-----+----+----------+



In [17]:
# The schema is the same as with the .option() based read.
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



In [18]:
# Column names as a plain Python list.
df_pyspark.columns

['Name', 'Age', 'Experience']

In [21]:
# Selecting a single column by name still yields a DataFrame.
name = df_pyspark.select('Name')

In [22]:
# Check the type of the single-column selection.
type(name)

pyspark.sql.dataframe.DataFrame

In [23]:
# head() without an argument returns one Row rather than a list.
name.head()

Row(Name='Gagan')

In [24]:
# Pass a list of names to select several columns at once.
name_exp = df_pyspark.select(['Name', 'Experience'])

In [25]:
# Display the two selected columns.
name_exp.show()

+-----+----------+
| Name|Experience|
+-----+----------+
|Gagan|         5|
| Ajay|         7|
|Sunny|        10|
+-----+----------+



In [26]:
# List of (column name, type name) pairs.
df_pyspark.dtypes

[('Name', 'string'), ('Age', 'int'), ('Experience', 'int')]

In [28]:
# Summary statistics (count, mean, stddev, min, max) for every column;
# mean and stddev are null for the string column.
df_pyspark.describe().show()

+-------+-----+------------------+------------------+
|summary| Name|               Age|        Experience|
+-------+-----+------------------+------------------+
|  count|    3|                 3|                 3|
|   mean| null|27.666666666666668| 7.333333333333333|
| stddev| null| 2.516611478423583|2.5166114784235836|
|    min| Ajay|                25|                 5|
|    max|Sunny|                30|                10|
+-------+-----+------------------+------------------+



In [32]:
# Add a column: withColumn(name, expression) yields a DataFrame with the extra
# column 'Exp+2', computed as Experience + 2.
df_pyspark_new = df_pyspark.withColumn('Exp+2', df_pyspark['Experience']+2)

In [33]:
# Display the DataFrame including the new column.
df_pyspark_new.show()

+-----+---+----------+-----+
| Name|Age|Experience|Exp+2|
+-----+---+----------+-----+
|Gagan| 25|         5|    7|
| Ajay| 28|         7|    9|
|Sunny| 30|        10|   12|
+-----+---+----------+-----+



In [34]:
# Drop a column by name; the result is assigned back to df_pyspark_new.
df_pyspark_new = df_pyspark_new.drop('Exp+2')

In [35]:
# Confirm that the 'Exp+2' column is gone.
df_pyspark_new.show()

+-----+---+----------+
| Name|Age|Experience|
+-----+---+----------+
|Gagan| 25|         5|
| Ajay| 28|         7|
|Sunny| 30|        10|
+-----+---+----------+



In [36]:
# Rename a column: withColumnRenamed(existing_name, new_name).
# The renamed DataFrame is only displayed here, not assigned to a variable.
df_pyspark.withColumnRenamed('Name', 'First Name').show()

+----------+---+----------+
|First Name|Age|Experience|
+----------+---+----------+
|     Gagan| 25|         5|
|      Ajay| 28|         7|
|     Sunny| 30|        10|
+----------+---+----------+



In [49]:
# Show the data, including the rows that contain nulls, before handling missing values.
df_pyspark.show()

+-----+----+----------+
| Name| Age|Experience|
+-----+----+----------+
|Gagan|  25|         5|
| Ajay|  28|         7|
|Sunny|  30|        10|
| null|  15|         1|
|Radha|null|        10|
|Rishi|null|      null|
+-----+----+----------+



In [50]:
# With default arguments, na.drop() removes every row that contains at least one null.
df_pyspark.na.drop().show()

+-----+---+----------+
| Name|Age|Experience|
+-----+---+----------+
|Gagan| 25|         5|
| Ajay| 28|         7|
|Sunny| 30|        10|
+-----+---+----------+



In [51]:
# The `how` parameter controls when a row is dropped:
#   how='any' -> drop the row if any of its columns is null
#   how='all' -> drop the row only if all of its columns are null
df_pyspark.na.drop(how='any').show()

+-----+---+----------+
| Name|Age|Experience|
+-----+---+----------+
|Gagan| 25|         5|
| Ajay| 28|         7|
|Sunny| 30|        10|
+-----+---+----------+



In [52]:
# how='all': no row here is null in every column, so nothing is dropped.
df_pyspark.na.drop(how='all').show()

+-----+----+----------+
| Name| Age|Experience|
+-----+----+----------+
|Gagan|  25|         5|
| Ajay|  28|         7|
|Sunny|  30|        10|
| null|  15|         1|
|Radha|null|        10|
|Rishi|null|      null|
+-----+----+----------+



In [54]:
# thresh: keep only the rows that have at least `thresh` non-null values.
# When thresh is given it overrides `how`, so `how` is left out to avoid
# suggesting that both settings take part in the decision.
df_pyspark.na.drop(thresh=2).show()

+-----+----+----------+
| Name| Age|Experience|
+-----+----+----------+
|Gagan|  25|         5|
| Ajay|  28|         7|
|Sunny|  30|        10|
| null|  15|         1|
|Radha|null|        10|
+-----+----+----------+



In [55]:
# thresh=3: a row must have at least three non-null values to be kept.
# `how` is omitted because thresh overrides it.
df_pyspark.na.drop(thresh=3).show()

+-----+---+----------+
| Name|Age|Experience|
+-----+---+----------+
|Gagan| 25|         5|
| Ajay| 28|         7|
|Sunny| 30|        10|
+-----+---+----------+



In [56]:
# subset: only the listed columns are checked for nulls when deciding which rows to drop.
df_pyspark.na.drop(how='any', subset=['Experience']).show()

+-----+----+----------+
| Name| Age|Experience|
+-----+----+----------+
|Gagan|  25|         5|
| Ajay|  28|         7|
|Sunny|  30|        10|
| null|  15|         1|
|Radha|null|        10|
+-----+----+----------+



In [57]:
# Filling missing values
# na.fill() only fills columns whose type matches the fill value, so a string
# placeholder is silently skipped for integer/double columns. Cast every column
# to string first so the placeholder is applied to all of them.
df_pyspark.select(
    [df_pyspark[c].cast('string').alias(c) for c in df_pyspark.columns]
).na.fill('Missing Value').show()

+-------------+-------------+-------------+
|         Name|          Age|   Experience|
+-------------+-------------+-------------+
|        Gagan|           25|            5|
|         Ajay|           28|            7|
|        Sunny|           30|           10|
|Missing Value|           15|            1|
|        Radha|Missing Value|           10|
|        Rishi|Missing Value|Missing Value|
+-------------+-------------+-------------+



In [58]:
# Fill nulls in a single column. A string fill value is ignored for a numeric
# column, so Experience is cast to string before filling.
(
    df_pyspark
    .withColumn('Experience', df_pyspark['Experience'].cast('string'))
    .na.fill('Missing Value', 'Experience')
    .show()
)

+-----+----+-------------+
| Name| Age|   Experience|
+-----+----+-------------+
|Gagan|  25|            5|
| Ajay|  28|            7|
|Sunny|  30|           10|
| null|  15|            1|
|Radha|null|           10|
|Rishi|null|Missing Value|
+-----+----+-------------+



In [59]:
# Fill nulls in several columns. A string fill value is ignored for numeric
# columns, so both target columns are cast to string before filling.
(
    df_pyspark
    .withColumn('Experience', df_pyspark['Experience'].cast('string'))
    .withColumn('Age', df_pyspark['Age'].cast('string'))
    .na.fill('Missing Value', ['Experience', 'Age'])
    .show()
)

+-----+-------------+-------------+
| Name|          Age|   Experience|
+-----+-------------+-------------+
|Gagan|           25|            5|
| Ajay|           28|            7|
|Sunny|           30|           10|
| null|           15|            1|
|Radha|Missing Value|           10|
|Rishi|Missing Value|Missing Value|
+-----+-------------+-------------+



In [80]:
# Check the current column types.
# NOTE(review): the recorded output shows Age/Experience as double, which only
# happens after the DoubleType cast in the next cell has run (this cell is
# In [80], the cast cell is In [75]). Run top to bottom, this prints the
# types from the most recent read instead.
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Experience: double (nullable = true)



In [75]:
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
# Convert both numeric columns to double in one chained expression
# (presumably so the Imputer used below accepts them; confirm for the Spark version in use).
df_pyspark = (
    df_pyspark
    .withColumn("Age", col("Age").cast(DoubleType()))
    .withColumn("Experience", col("Experience").cast(DoubleType()))
)

In [79]:
# Mean imputation: nulls in the numeric columns are replaced by the column mean.
# The filled values go into new "<column>_imputed" columns; the original
# columns are left as they are.

from pyspark.ml.feature import Imputer

imputer_func = Imputer(
    strategy='mean',
    inputCols=['Age', 'Experience'],
    outputCols=['Age_imputed', 'Experience_imputed'],
)

imputer_func.fit(df_pyspark).transform(df_pyspark).show()

+-----+----+----------+-----------+------------------+
| Name| Age|Experience|Age_imputed|Experience_imputed|
+-----+----+----------+-----------+------------------+
|Gagan|25.0|       5.0|       25.0|               5.0|
| Ajay|28.0|       7.0|       28.0|               7.0|
|Sunny|30.0|      10.0|       30.0|              10.0|
| null|15.0|       1.0|       15.0|               1.0|
|Radha|null|      10.0|       24.5|              10.0|
|Rishi|null|      null|       24.5|               6.6|
+-----+----+----------+-----------+------------------+



In [81]:
# Filter operation
# The condition can be given as a SQL-style string. Keeps rows with Age < 29;
# rows whose Age is null do not match and are left out as well.
df_pyspark.filter('Age<29').show()

+-----+----+----------+
| Name| Age|Experience|
+-----+----+----------+
|Gagan|25.0|       5.0|
| Ajay|28.0|       7.0|
| null|15.0|       1.0|
+-----+----+----------+



In [82]:
# Chain select() after filter() to keep only some columns of the matching rows.
df_pyspark.filter('Age<29').select(['Name', 'Experience']).show()

+-----+----------+
| Name|Experience|
+-----+----------+
|Gagan|       5.0|
| Ajay|       7.0|
| null|       1.0|
+-----+----------+



In [84]:
# Column-expression form: combine conditions with & (AND). Each comparison is
# wrapped in parentheses because & binds tighter than < and > in Python.
df_pyspark.filter((df_pyspark['Age']<29 ) & (df_pyspark['Age']>20)).show()

+-----+----+----------+
| Name| Age|Experience|
+-----+----+----------+
|Gagan|25.0|       5.0|
| Ajay|28.0|       7.0|
+-----+----+----------+



In [85]:
# | is OR. Every non-null Age is either < 29 or > 20, so this condition only
# removes the rows whose Age is null.
df_pyspark.filter((df_pyspark['Age']<29 ) | (df_pyspark['Age']>20)).show()

+-----+----+----------+
| Name| Age|Experience|
+-----+----+----------+
|Gagan|25.0|       5.0|
| Ajay|28.0|       7.0|
|Sunny|30.0|      10.0|
| null|15.0|       1.0|
+-----+----+----------+



In [88]:
# ~ is NOT and binds tighter than |, so this reads: (NOT Age < 29) OR (Age < 20).
df_pyspark.filter(~(df_pyspark['Age']<29 ) | (df_pyspark['Age']<20)).show()

+-----+----+----------+
| Name| Age|Experience|
+-----+----+----------+
|Sunny|30.0|      10.0|
| null|15.0|       1.0|
+-----+----+----------+

