# The basics

In [2]:
import pandas as pd

In [3]:
! which python3

/opt/conda/bin/python3


## The video I watched in the README uses a later version of PySpark.

In [4]:
import pyspark

In [61]:
print(pyspark.__version__)

2.4.5


In [9]:
df_1 = pd.read_csv("data/test1.csv", index_col = None)

In [10]:
df_1

Unnamed: 0,Name,age,Experience,Salary
0,Krish,31,10,30000
1,Sudhanshu,30,8,25000
2,Sunny,29,4,20000
3,Paul,24,3,20000
4,Harsha,21,1,15000
5,Shubham,23,2,18000


In [12]:
from pyspark.sql import SparkSession

# write pyspark session
spark = SparkSession.builder.appName("Practice").getOrCreate()

In [13]:
# when running locally, we use 1 master node
spark

In [14]:
# see below for using options
df_1_pyspark = spark.read.csv("data/test1.csv")

In [15]:
df_1_pyspark

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string]

In [16]:
df_1_pyspark.show()

+---------+---+----------+------+
|      _c0|_c1|       _c2|   _c3|
+---------+---+----------+------+
|     Name|age|Experience|Salary|
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



# Reading data

In [33]:
df_1_pyspark = spark.read.option("header", "true").csv("data/test1.csv", inferSchema = True)

In [19]:
type(df_1_pyspark)

pyspark.sql.dataframe.DataFrame

# Filtering and showing

In [25]:
df_1_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [34]:
df_1_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [27]:
df_1_pyspark.head(3)

[Row(Name='Krish', age=31, Experience=10, Salary=30000),
 Row(Name='Sudhanshu', age=30, Experience=8, Salary=25000),
 Row(Name='Sunny', age=29, Experience=4, Salary=20000)]

In [29]:
df_1_pyspark.select(["Name", "Experience"]).show()

+---------+----------+
|     Name|Experience|
+---------+----------+
|    Krish|        10|
|Sudhanshu|         8|
|    Sunny|         4|
|     Paul|         3|
|   Harsha|         1|
|  Shubham|         2|
+---------+----------+



In [30]:
df_1_pyspark.dtypes

[('Name', 'string'), ('age', 'int'), ('Experience', 'int'), ('Salary', 'int')]

In [31]:
df_1_pyspark.describe().show()

+-------+------+------------------+-----------------+------------------+
|summary|  Name|               age|       Experience|            Salary|
+-------+------+------------------+-----------------+------------------+
|  count|     6|                 6|                6|                 6|
|   mean|  null|26.333333333333332|4.666666666666667|21333.333333333332|
| stddev|  null| 4.179314138308661|3.559026084010437| 5354.126134736337|
|    min|Harsha|                21|                1|             15000|
|    max| Sunny|                31|               10|             30000|
+-------+------+------------------+-----------------+------------------+



# Adding a column

In [35]:
df_1_pyspark = df_1_pyspark.withColumn("Exp_af_2_yrs", df_1_pyspark["Experience"]+2)

In [36]:
df_1_pyspark.show()

+---------+---+----------+------+------------+
|     Name|age|Experience|Salary|Exp_af_2_yrs|
+---------+---+----------+------+------------+
|    Krish| 31|        10| 30000|          12|
|Sudhanshu| 30|         8| 25000|          10|
|    Sunny| 29|         4| 20000|           6|
|     Paul| 24|         3| 20000|           5|
|   Harsha| 21|         1| 15000|           3|
|  Shubham| 23|         2| 18000|           4|
+---------+---+----------+------+------------+



# Drop col

In [37]:
df_1_pyspark = df_1_pyspark.drop("Exp_af_2_yrs")

In [38]:
df_1_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



# Rename col

In [39]:
df_1_pyspark.withColumnRenamed("Name", "New Name").show()

+---------+---+----------+------+
| New Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



# Read new df

In [41]:
df_2 = spark.read.csv("data/test2.csv", header = True, inferSchema = True)

In [49]:
df_2.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [58]:
df_2.dtypes

[('Name', 'string'), ('age', 'int'), ('Experience', 'int'), ('Salary', 'int')]

# Drop NANS

In [43]:
df_2.na.drop().show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [None]:
# how any vs all
# controls whether to drop rows is any col is null or all cols are null

In [46]:
# specifcy dropping rows with more than thresh missing cols
df_2.na.drop(how="any", thresh=2).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
+---------+----+----------+------+



In [47]:
df_2.na.drop(how="any", subset=["Experience"]).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     null| 34|        10| 38000|
+---------+---+----------+------+



# Fill missing values/ Imputation

## had issues here, maybe it's bc of version issues

In [50]:
df_2.na.fill("Missing Values", ["Experience", "age"]).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [55]:
df_2.na.fill("Missing Values", "Salary").show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [59]:
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=["age", "Experience", "Salary"],
    outputCols=["{}_imputed".format(c) for c in ["age", "Experience", "Salary"]]
).setStrategy("median")

In [60]:
imputer.fit(df_2).transform(df_2).show()

IllegalArgumentException: 'requirement failed: Column age must be of type equal to one of the following types: [double, float] but was actually of type int.'

# Filter operations

In [62]:
df_3 = spark.read.csv("data/test1.csv", header = True, inferSchema = True)

In [63]:
df_3.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [64]:
df_3.filter("Salary<=20000").show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [65]:
df_3.filter("Salary<=20000").select(["Name", "age"]).show()

+-------+---+
|   Name|age|
+-------+---+
|  Sunny| 29|
|   Paul| 24|
| Harsha| 21|
|Shubham| 23|
+-------+---+



In [69]:
df_3.filter((df_3['Salary']<=20000) & (df_3['Salary']>=1500)).show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+

