In [1]:
import pyspark 


In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('Practice').getOrCreate()



In [22]:
df_pyspark = spark.read.csv('test1.csv')

In [23]:
df_pyspark

DataFrame[_c0: string, _c1: string, _c2: string]

In [24]:
df_pyspark.show()

+----+---+----------+
| _c0|_c1|       _c2|
+----+---+----------+
|name|age|experience|
|   A|  2|         5|
|   B|  4|        10|
|   C|  6|        20|
+----+---+----------+



In [25]:
df_pyspark = spark.read.option('header',True).csv('test1.csv') # To tell that 1st row is header row

In [26]:
df_pyspark.show()

+----+---+----------+
|name|age|experience|
+----+---+----------+
|   A|  2|         5|
|   B|  4|        10|
|   C|  6|        20|
+----+---+----------+



In [27]:
df_pyspark.head(2)

[Row(name='A', age='2', experience='5'),
 Row(name='B', age='4', experience='10')]

In [28]:
df_pyspark[0], df_pyspark[1]

(Column<'name'>, Column<'age'>)

In [29]:
df_pyspark.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- experience: string (nullable = true)



# Preprocessing

1. Check datatypes of columns
2. Select columns

In [94]:
df_pyspark = spark.read.option('header',True).csv('test1.csv')
# print shape
df_pyspark.count(), len(df_pyspark.columns)

(3, 3)

In [32]:
df_pyspark.printSchema()
# Age and experience are int but it read as string

root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- experience: string (nullable = true)



In [34]:
# Default behavior is read all cols as string
# We can inferschema using additional value

df_pyspark = spark.read.option('header',True).csv('test1.csv', inferSchema=True)
df_pyspark.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: integer (nullable = true)



In [56]:
# Another way which is preferable
df_pyspark = spark.read.csv('test1.csv', inferSchema=True, header=True)
df_pyspark.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: integer (nullable = true)



In [37]:
df_pyspark.columns

['name', 'age', 'experience']

In [40]:
# Selecting column
df_pyspark.select(['name','age']).show()

+----+---+
|name|age|
+----+---+
|   A|  2|
|   B|  4|
|   C|  6|
+----+---+



In [41]:
df_pyspark.dtypes

[('name', 'string'), ('age', 'int'), ('experience', 'int')]

In [46]:
df_pyspark.describe().show()

+-------+----+---+------------------+
|summary|name|age|        experience|
+-------+----+---+------------------+
|  count|   3|  3|                 3|
|   mean|null|4.0|11.666666666666666|
| stddev|null|2.0| 7.637626158259733|
|    min|   A|  2|                 5|
|    max|   C|  6|                20|
+-------+----+---+------------------+



In [51]:
# Adding Columns

df_pyspark.withColumn(colName='exp+2', col=df_pyspark['experience']+2).show()
# This is not inplace

+----+---+----------+-----+
|name|age|experience|exp+2|
+----+---+----------+-----+
|   A|  2|         5|    7|
|   B|  4|        10|   12|
|   C|  6|        20|   22|
+----+---+----------+-----+



In [52]:
df_pyspark.show()

+----+---+----------+
|name|age|experience|
+----+---+----------+
|   A|  2|         5|
|   B|  4|        10|
|   C|  6|        20|
+----+---+----------+



In [57]:
df_pyspark = df_pyspark.withColumn(colName='exp+2', col=df_pyspark['experience']+2)

In [59]:
df_pyspark.show()

+----+---+----------+-----+
|name|age|experience|exp+2|
+----+---+----------+-----+
|   A|  2|         5|    7|
|   B|  4|        10|   12|
|   C|  6|        20|   22|
+----+---+----------+-----+



In [62]:
df_pyspark.drop('exp+2').show()

+----+---+----------+
|name|age|experience|
+----+---+----------+
|   A|  2|         5|
|   B|  4|        10|
|   C|  6|        20|
+----+---+----------+



In [61]:
df_pyspark.show()

+----+---+----------+-----+
|name|age|experience|exp+2|
+----+---+----------+-----+
|   A|  2|         5|    7|
|   B|  4|        10|   12|
|   C|  6|        20|   22|
+----+---+----------+-----+



In [63]:
df_pyspark = df_pyspark.withColumn(colName='exp+2', col=df_pyspark['experience']+2)
df_pyspark.show()

+----+---+----------+-----+
|name|age|experience|exp+2|
+----+---+----------+-----+
|   A|  2|         5|    7|
|   B|  4|        10|   12|
|   C|  6|        20|   22|
+----+---+----------+-----+



In [113]:
from pyspark.sql.functions import  transform
df_pyspark.select(transform('age', lambda x: x*2)).show()

AnalysisException: [DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE] Cannot resolve "transform(age, lambdafunction((x_0 * 2), x_0))" due to data type mismatch: Parameter 1 requires the "ARRAY" type, however "age" has the type "INT".;
'Project [unresolvedalias(transform(age#1602, lambdafunction((lambda 'x_0 * 2), lambda 'x_0, false)), Some(org.apache.spark.sql.Column$$Lambda$3609/23224320@992e32))]
+- Relation [name#1601,age#1602,experience#1603,salary#1604] csv


In [None]:
df_pyspark = df_pyspark.withColumn(colName='has7Exp', col=if df_pyspark['experience']>7 then )
df_pyspark.show()

In [64]:
df_pyspark.withColumnRenamed('experience','exp').show() # To rename 1 column

+----+---+---+-----+
|name|age|exp|exp+2|
+----+---+---+-----+
|   A|  2|  5|    7|
|   B|  4| 10|   12|
|   C|  6| 20|   22|
+----+---+---+-----+



In [67]:
newNames = ['Name','Age','exper','erper+2'] # Length of this should match column numbers
df_pyspark.toDF(*newNames).show()

+----+---+-----+-------+
|Name|Age|exper|erper+2|
+----+---+-----+-------+
|   A|  2|    5|      7|
|   B|  4|   10|     12|
|   C|  6|   20|     22|
+----+---+-----+-------+



# Handling Missing Values

1. Dropping row
2. Impute

In [95]:
df_pyspark = spark.read.csv('test2.csv', header=True, inferSchema=True)
df_pyspark.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: integer (nullable = true)
 |-- salary: double (nullable = true)



In [96]:
df_pyspark.show()

+----+----+----------+------+
|name| age|experience|salary|
+----+----+----------+------+
|   A|   2|         5|1000.0|
|   B|   4|        10| 150.0|
|   C|   6|        20|2000.0|
|   D|null|        24| 123.0|
|   E|   8|      null| 300.2|
|null|   2|      null| 150.0|
+----+----+----------+------+



In [72]:
df_pyspark.na.drop().show()

+----+---+----------+------+
|name|age|experience|salary|
+----+---+----------+------+
|   A|  2|         5|1000.0|
|   B|  4|        10| 150.0|
|   C|  6|        20|2000.0|
+----+---+----------+------+



In [74]:
# how=all : If all values are null, drop row
df_pyspark.na.drop(how='all').show()

+----+----+----------+------+
|name| age|experience|salary|
+----+----+----------+------+
|   A|   2|         5|1000.0|
|   B|   4|        10| 150.0|
|   C|   6|        20|2000.0|
|   D|null|        24| 123.0|
|   E|   8|      null| 300.2|
+----+----+----------+------+



In [76]:
# how=any (default) : If any value in row is null, drop row
df_pyspark.na.drop(how='any').show()

+----+---+----------+------+
|name|age|experience|salary|
+----+---+----------+------+
|   A|  2|         5|1000.0|
|   B|  4|        10| 150.0|
|   C|  6|        20|2000.0|
+----+---+----------+------+



In [97]:
df_pyspark.show()

+----+----+----------+------+
|name| age|experience|salary|
+----+----+----------+------+
|   A|   2|         5|1000.0|
|   B|   4|        10| 150.0|
|   C|   6|        20|2000.0|
|   D|null|        24| 123.0|
|   E|   8|      null| 300.2|
|null|   2|      null| 150.0|
+----+----+----------+------+



In [90]:
# thresh=3 : Atleast 2 non-null values should be present to keep 
df_pyspark.na.drop(how='any', thresh=3).show()

+----+----+----------+------+
|name| age|experience|salary|
+----+----+----------+------+
|   A|   2|         5|1000.0|
|   B|   4|        10| 150.0|
|   C|   6|        20|2000.0|
|   D|null|        24| 123.0|
|   E|   8|      null| 300.2|
+----+----+----------+------+



In [98]:
# subset: Drop nan values only in a particular column
df_pyspark.na.drop(how='any', subset=['experience']).show()

+----+----+----------+------+
|name| age|experience|salary|
+----+----+----------+------+
|   A|   2|         5|1000.0|
|   B|   4|        10| 150.0|
|   C|   6|        20|2000.0|
|   D|null|        24| 123.0|
+----+----+----------+------+



In [101]:
# Put same value in all missing places
df_pyspark.na.fill(value='Missing Val').show() # Nothing done in age/experience col because we can't put string in int column

+-----------+----+----------+------+
|       name| age|experience|salary|
+-----------+----+----------+------+
|          A|   2|         5|1000.0|
|          B|   4|        10| 150.0|
|          C|   6|        20|2000.0|
|          D|null|        24| 123.0|
|          E|   8|      null| 300.2|
|Missing Val|   2|      null| 150.0|
+-----------+----+----------+------+



In [104]:
df_pyspark.na.fill(value=-1, subset=['age','experience']).show() # Nothing done because we can't put string in int column

+----+---+----------+------+
|name|age|experience|salary|
+----+---+----------+------+
|   A|  2|         5|1000.0|
|   B|  4|        10| 150.0|
|   C|  6|        20|2000.0|
|   D| -1|        24| 123.0|
|   E|  8|        -1| 300.2|
|null|  2|        -1| 150.0|
+----+---+----------+------+



In [105]:
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['experience','age'],
    outputCols=["{}_imputed".format(c) for c in ['experience','age']]
).setStrategy('mean')

In [108]:
imputer.fit(df_pyspark).transform(df_pyspark).show()

+----+----+----------+------+------------------+-----------+
|name| age|experience|salary|experience_imputed|age_imputed|
+----+----+----------+------+------------------+-----------+
|   A|   2|         5|1000.0|                 5|          2|
|   B|   4|        10| 150.0|                10|          4|
|   C|   6|        20|2000.0|                20|          6|
|   D|null|        24| 123.0|                24|          4|
|   E|   8|      null| 300.2|                14|          8|
|null|   2|      null| 150.0|                14|          2|
+----+----+----------+------+------------------+-----------+



In [109]:
df_pyspark.describe().show()

+-------+----+------------------+-----------------+-----------------+
|summary|name|               age|       experience|           salary|
+-------+----+------------------+-----------------+-----------------+
|  count|   5|                 5|                4|                6|
|   mean|null|               4.4|            14.75|620.5333333333333|
| stddev|null|2.6076809620810595|8.770214744615247|753.6417362823444|
|    min|   A|                 2|                5|            123.0|
|    max|   E|                 8|               24|           2000.0|
+-------+----+------------------+-----------------+-----------------+

