In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import pyspark.sql.functions as func

### SparkSession is the entry point to programming Spark with the Dataset and DataFrame API. It can be used to create DataFrames, register DataFrames as tables, execute SQL over tables, cache tables, and read Parquet files.

In [3]:
# Create the SparkSession used by every later cell (getOrCreate reuses one if it is already running)
spark = (
    SparkSession.builder
    .appName("FirstApp")
    .getOrCreate()
)

In [8]:
# Explicit schema for the CSV: one (column name, Spark type) pair per column, every column nullable
myschema = StructType([
    StructField(col_name, col_type, True)
    for col_name, col_type in [
        ("UserID", IntegerType()),
        ("Name", StringType()),
        ("Age", IntegerType()),
        ("Friends", IntegerType()),
    ]
])

### Method 1
If the first row of your CSV file contains the column names, you do not necessarily need to create a schema explicitly
in your code. With `header=True`, Spark uses the first row of the file as the column names, and with `inferSchema=True`
it makes an extra pass over the data to infer each column's type (without `inferSchema`, every column is read as a string).

In [None]:
# Illustrative only (left commented out): read a CSV whose first row holds the column names and let Spark infer the column types.
# df = spark.read.csv("name.csv", header=True, inferSchema=True)

### Method 2
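There are some situations where explicitly defining a schema is still beneficial:
1) Data Type Control — you decide each column's exact type instead of relying on inference. <br>
2) Data Validation — values that don't match the declared type become null (in the default PERMISSIVE mode) instead of silently changing the column's type. <br>
3) Performance — Spark skips the extra pass over the data that schema inference requires.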

In [11]:
# Load fakefriends.csv into a DataFrame using the explicit schema defined above.
# The "header" option is not set (it defaults to false), so every line of the file is treated as data.
people = (
    spark.read
    .format("csv")
    .schema(myschema)
    .option("path", "fakefriends.csv")
    .load()
)

In [23]:
# Print the DataFrame's schema as a tree: column name, type and nullability.
people.printSchema()

root
 |-- UserID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Friends: integer (nullable = true)



In [43]:
# Column names, then (name, type) pairs, each printed after an aligned label
for label, value in (("All Columns   :", people.columns),
                     ("Column Dtypes :", people.dtypes)):
    print(label, value)

All Columns   : ['UserID', 'Name', 'Age', 'Friends']
Column Dtypes : [('UserID', 'int'), ('Name', 'string'), ('Age', 'int'), ('Friends', 'int')]


In [44]:
# Summary statistics (count, mean, stddev, min, max) for the Age column only
people.select("Age").describe().show()

+-------+------------------+
|summary|               Age|
+-------+------------------+
|  count|               500|
|   mean|            43.708|
| stddev|14.864340996711995|
|    min|                18|
|    max|                69|
+-------+------------------+



In [27]:
# Preview the first two rows of the Name and Age columns
people.select("Name", "Age").show(n=2)

+--------+---+
|    Name|Age|
+--------+---+
|    Will| 33|
|Jean-Luc| 26|
+--------+---+
only showing top 2 rows



In [13]:
# Perform all the transformations. This only builds a lazy plan; nothing runs until an action
# such as count() or show(). current_timestamp() is evaluated once per query, so every action
# on `output` stamps insert_ts with that action's own start time.
output = (
    people
    .select(people.UserID, people.Name, people.Age, people.Friends)
    .where(people.Age < 30)                                   # keep only people under 30
    .withColumn('insert_ts', func.current_timestamp())        # add a load timestamp column
    .orderBy(people.UserID)
)

In [None]:
# Illustrative only (left commented out). Both calls return a NEW DataFrame and leave `output`
# unchanged, so assign the result if you want to keep it.
# Drop a column
# output.drop('column_name')
# Rename a column
# output.withColumnRenamed('Name','New Name')

In [14]:
#Count of output DataFrame
# count() is an action: it triggers execution of the lazy transformations defined on `output`.
output.count()

112

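### A temp view registers a DataFrame under a name so that you can query it with SQL through `spark.sql()`. It stores no data of its own: each query re-runs the DataFrame's plan (unless the DataFrame is cached). Temp views are session-scoped, i.e. valid only for the lifetime of the SparkSession that created them.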

In [19]:
# Creating a Temp View
# Registers `output` under the name "peoples" for use in spark.sql(); an existing view with that name is replaced.
output.createOrReplaceTempView("peoples")

In [21]:
# Query the "peoples" temp view with plain SQL and display the first 5 result rows.
# The columns come back named exactly as written in the SELECT list (lower-case).
(
    spark.sql("select name,age,friends,insert_ts from peoples")
    .show(n=5)
)

+--------+---+-------+--------------------+
|    name|age|friends|           insert_ts|
+--------+---+-------+--------------------+
|Jean-Luc| 26|      2|2023-09-24 13:27:...|
|    Hugh| 27|    181|2023-09-24 13:27:...|
|  Weyoun| 22|    323|2023-09-24 13:27:...|
|   Miles| 19|    268|2023-09-24 13:27:...|
|  Julian| 25|      1|2023-09-24 13:27:...|
+--------+---+-------+--------------------+
only showing top 5 rows



### Handle Missing Values

In [4]:
# header=True: the first row supplies the column names.
# inferSchema=True: Spark makes an extra pass over the data to guess each column's type.
df = spark.read.csv(
    "MissingValues.csv",
    header=True,
    inferSchema=True,
)

In [6]:
# Display the loaded rows (show() prints up to 20 rows by default); missing values appear as NULL below.
df.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Abdul|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|      Ali|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Sheraz|  22|         1| 15000|
|    Ahmed|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
+---------+----+----------+------+



In [14]:
# thresh=3 keeps only rows that have at least 3 non-null values.
# `how` is deliberately not passed: whenever `thresh` is given it overrides `how`.
# Use df.na.drop(how='any') or df.na.drop(how='all') on their own to drop rows
# containing any null, or rows where every value is null.
df.na.drop(thresh=3).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Abdul| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|      Ali| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Sheraz| 22|         1| 15000|
|    Ahmed| 23|         2| 18000|
|     NULL| 34|        10| 38000|
+---------+---+----------+------+



In [16]:
# subset: a column name or list/tuple of names; only nulls in those columns cause a row to be dropped.
# DataFrame.dropna is an alias of df.na.drop.
df.dropna(subset=['Experience']).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Abdul| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|      Ali| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Sheraz| 22|         1| 15000|
|    Ahmed| 23|         2| 18000|
|     NULL| 34|        10| 38000|
+---------+---+----------+------+



In [25]:
# Filling missing values: dict keys are column names, dict values are the replacements.
# Columns not listed (Experience, Salary) keep their nulls. DataFrame.fillna is an alias of df.na.fill.
df.fillna({"Name": "Missing", "age": 10}).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Abdul| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|      Ali| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Sheraz| 22|         1| 15000|
|    Ahmed| 23|         2| 18000|
|   Mahesh| 10|      NULL| 40000|
|  Missing| 34|        10| 38000|
|  Missing| 36|      NULL|  NULL|
+---------+---+----------+------+



In [26]:
from pyspark.ml.feature import Imputer

# Numeric columns to impute; each one gets a companion "<col>_imputed" output column.
# Defined once so the input and output column lists cannot drift apart.
impute_cols = ['age', 'Experience', 'Salary']

# strategy="mean" fills each null with the mean of that column's non-null values.
# "median" and "mode" are the other supported strategies ("mode" requires Spark 3.1+).
# The imputed columns keep the input's integer type, so the mean is truncated:
# e.g. the age mean 28.625 appears as 28 in the output below.
imputer = Imputer(
    inputCols=impute_cols,
    outputCols=[f"{c}_imputed" for c in impute_cols],
    strategy="mean",
)

In [28]:
# fit() learns one fill value per input column from its non-null values; transform() then returns
# a NEW DataFrame with the *_imputed columns appended. `df` itself is not modified.
(
    imputer
    .fit(df)
    .transform(df)
    .show()
)

+---------+----+----------+------+-----------+------------------+--------------+
|     Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
|    Abdul|  31|        10| 30000|         31|                10|         30000|
|Sudhanshu|  30|         8| 25000|         30|                 8|         25000|
|      Ali|  29|         4| 20000|         29|                 4|         20000|
|     Paul|  24|         3| 20000|         24|                 3|         20000|
|   Sheraz|  22|         1| 15000|         22|                 1|         15000|
|    Ahmed|  23|         2| 18000|         23|                 2|         18000|
|   Mahesh|NULL|      NULL| 40000|         28|                 5|         40000|
|     NULL|  34|        10| 38000|         34|                10|         38000|
|     NULL|  36|      NULL|  NULL|         36|                 5|         25750|
+---------+----+----------+-