Hi there,
One of our brothers suggested that an interactive lesson would be useful, so here is an attempt at one.

In this notebook, we will explain **Data frames** in detail.

Each exercise will have a few bullet points covering the topics, along with sample code.
There are some questions at the end of the notebook, which you are supposed to answer and submit as Lab Work.

In [None]:
!pip install pyspark

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local').appName('Om_Sairam').getOrCreate()



# DataFrame
- It is basically "A distributed collection of data grouped into named columns"
- Unlike datasets, dataframes are loosely typed.
- One can also create a PySpark DataFrame from different data sources such as TXT, CSV, JSON, ORC, Avro, Parquet, and XML files, read from HDFS or cloud platforms.
- There are multiple ways to create a DataFrame; the most generic one is `spark.read`.
- In our previous assignment, we used `iris_dataset = spark.read.option("inferSchema","true").option("header","true").csv("irisdata.csv")`
    - This tells Spark to read the file, infer its schema, and treat the first line of the CSV as the header.
- Now we will try to create the schema manually.

### Schema
- A schema defines the column names and types of a DataFrame
- A schema is a `StructType` made up of a number of fields (`StructField`s), each of which has a name, a type, and a Boolean flag specifying whether that column can contain missing or null values.
- One can also attach arbitrary metadata to a field in the schema.

In [None]:
from pyspark.sql.types import StructType,StructField,FloatType,StringType

#Iris DataFrame Headers: p_w;p_l;s_w;s_l;type

myManualSchema = StructType([
    StructField("p_w", FloatType(), True),
    StructField("p_l", FloatType(), True),
    StructField("s_w", FloatType(), True),
    StructField("s_l", FloatType(), True),
    StructField("type", StringType(), True, metadata={"hello":"world"})
])

# Now that the schema is defined, create the DataFrame using it.
# If the first line of the file holds the column names, pass header=True as well.
iris_df = spark.read.csv("/content/iris.csv", schema=myManualSchema, sep=",")

iris_df.show()
# One can also create a DataFrame from an RDD using the createDataFrame method.


## Columns and Expressions
- Columns in Spark are similar to columns in a spreadsheet.
- A column cannot be used outside the context of a DataFrame.
    - To have a real value in a column, we need a `Row`, and a row only exists inside a `DataFrame`.

In [None]:
from pyspark.sql.functions import col, column
iris_df = spark.read.csv("/content/iris.csv", schema=myManualSchema, sep=",")
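# Different ways of creating a column reference
col("p_w")
column("p_w")
# To refer to a specific column of a DataFrame, index the DataFrame by name.
# (PySpark DataFrames have no .col() method; that belongs to the Scala API.)
iris_df["p_w"]
iris_df.columns  # Lists all the column names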

### Expressions
- An expression is a set of transformations on one or more values in a record in a DataFrame
- The `expr()` function is used to express transformations or computations involving DataFrame columns.
- If this is a bit confusing, just remember the following:
- **Columns are just expressions.**
- **Columns and transformations of those columns compile to the same logical plan as parsed expressions.**

## Record and Rows
- In Spark, each row in a DataFrame is a single record. Spark represents this record as an object of type `Row`
- Spark manipulates Row objects using column expressions in order to produce usable values
- Row objects internally represent arrays of bytes.
- This internal representation is hidden behind an abstraction, so we only manipulate rows through column expressions.
- It’s important to note that only DataFrames have schemas. Rows themselves do not have schemas.
- When creating a Row manually, one must specify the values in the same order as the schema of the DataFrame to which they might be appended.

In [None]:
from pyspark.sql import Row
myRow = Row("Hello", None, 1, False)

In [None]:
# So let us stitch it all together
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
    StructField("Reg.No.", LongType(), False),
    StructField("Name", StringType(), False),
    StructField("DOB", StringType(), False)
])
myRow1 = Row(23352, 'Aryan Sai', '02-05-2000')
myRow2 = Row(23358, 'Sai Gopal', '09-06-2001')
myRow3 = Row(23354, 'Sai Ram', '03-05-2000')
myRow4 = Row(23355, 'Sai Syam', '09-04-2001')
myDf = spark.createDataFrame([myRow1,myRow2,myRow3,myRow4], myManualSchema)

myDf.show()

+-------+---------+----------+
|Reg.No.|     Name|       DOB|
+-------+---------+----------+
|  23352|Aryan Sai|02-05-2000|
|  23358|Sai Gopal|09-06-2001|
|  23354|  Sai Ram|03-05-2000|
|  23355| Sai Syam|09-04-2001|
+-------+---------+----------+



## Note Worthy Points
- By default Spark is case insensitive; one can make Spark case sensitive by setting the configuration:
    - `set spark.sql.caseSensitive true` in SQL, or `spark.conf.set("spark.sql.caseSensitive", "true")` from PySpark
- Sometimes we need to cast Spark columns to different data types. This can be done with `cast`:
    - `df.withColumn("count2", col("count").cast("long"))`
    - `withColumn` adds a new column (or replaces an existing column of the same name).
- To rename a column, we will use `df.withColumnRenamed("OLD_NAME", "new_name")`
- Take a guess at how we drop columns.
- *Remember to reduce the number of shuffle partitions (`spark.sql.shuffle.partitions`) from the default of 200 to 5*

In [None]:
myDf.select('Name').show()

+---------+
|     Name|
+---------+
|Aryan Sai|
|Sai Gopal|
|  Sai Ram|
| Sai Syam|
+---------+



## Filtering
- To filter rows, we create an expression that evaluates to true or false.
- Rows for which the expression evaluates to false are *filtered out*.
- There are two methods to perform this operation, `where` and `filter`; they behave identically.

In [None]:
myDf.filter(col("Name") == 'Aryan Sai' ).show()

+-------+---------+----------+
|Reg.No.|     Name|       DOB|
+-------+---------+----------+
|  23352|Aryan Sai|02-05-2000|
+-------+---------+----------+



In [None]:
myDf.where("Name = 'Aryan Sai'").show()

+-------+---------+----------+
|Reg.No.|     Name|       DOB|
+-------+---------+----------+
|  23352|Aryan Sai|02-05-2000|
+-------+---------+----------+



## Unique
- A very common use case is to extract the unique or distinct values in a DataFrame
- We use the `distinct` method for this.
- It is a transformation, so it returns a new DataFrame containing only the unique rows.

In [None]:
myDf.select("Name", "DOB").distinct().count()

4

## Random
- Sometimes, you just want to sample some random records from your DataFrame.
- This can be performed using the `sample` method on a DataFrame.
- It is done as follows:

In [None]:
seed = 5  # A fixed seed makes the sample reproducible.
withReplacement = False
fraction = 0.5
myDf.sample(withReplacement, fraction, seed).count()

2

In [None]:
## Sorting
# - To sort a DataFrame based on a column, one can use `sort` or `orderBy`
# - To specify the sort direction explicitly, use the `asc` and `desc` functions when operating on a column
myDf.sort(myDf['Name'].asc()).show()
myDf.sort(myDf['Name'].desc()).show()

+-------+---------+----------+
|Reg.No.|     Name|       DOB|
+-------+---------+----------+
|  23352|Aryan Sai|02-05-2000|
|  23358|Sai Gopal|09-06-2001|
|  23354|  Sai Ram|03-05-2000|
|  23355| Sai Syam|09-04-2001|
+-------+---------+----------+

+-------+---------+----------+
|Reg.No.|     Name|       DOB|
+-------+---------+----------+
|  23355| Sai Syam|09-04-2001|
|  23354|  Sai Ram|03-05-2000|
|  23358|Sai Gopal|09-06-2001|
|  23352|Aryan Sai|02-05-2000|
+-------+---------+----------+



In [None]:
# myDf.sort("Name").show(5)
# myDf.orderBy("Name", "DOB").show(5)
myDf.orderBy(col("Name"), col("DOB")).show(5)  # Also fine

+-------+---------+----------+
|Reg.No.|     Name|       DOB|
+-------+---------+----------+
|  23352|Aryan Sai|02-05-2000|
|  23358|Sai Gopal|09-06-2001|
|  23354|  Sai Ram|03-05-2000|
|  23355| Sai Syam|09-04-2001|
+-------+---------+----------+



In [None]:
#Lab Work
# - Use the MTCars data set to answer the following questions.
# 1. Create the dataframe by specifying the Manual Schema
# 2. Rename all the columns to something of your liking
# 3. Show the distinct cars based on the number of cylinders
# 4. Sort the dataframe based on the mileage of the car.
# 5. Your friend is planning to buy a new car in a pocket friendly manner. So allocate a score to all cars in your data frame
#     Eg: - Create a column called `score`.
#         - Come up with a formula that provides score, say :
#                 - mileage is important so 0.2 * value of mileage + 0.5 * # of cyl ... so on
# 6. Just for fun, add a new Row into the Data frame for Nano
#  Details: Nano;Manual;25kmpl;2Cyl;


In [92]:
from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType,FloatType
from pyspark.sql.functions import col, column

In [93]:
# 1. Create the dataframe by specifying the Manual Schema
carSchema = StructType([
    StructField("model", StringType(), False),
    StructField("mpg", FloatType(), False),
    StructField("cyl", IntegerType(), False),
    StructField("disp", FloatType(), False),
    StructField("hp", IntegerType(), False),
    StructField("drat", FloatType(), False),
    StructField("wt", FloatType(), False),
    StructField("qsec", FloatType(), False),
    StructField("vs", IntegerType(), False),
    StructField("am", IntegerType(), False),
    StructField("gear", IntegerType(), False),
    StructField("carb", IntegerType(), False)
])

df = spark.read.csv("/content/mtcars.csv",  header=True, schema = carSchema, sep=",")

df.show()

+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|              model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|          Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|         Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1|
|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|
|          Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|
|           Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2|
|           Merc 280|19.2|  6|167.6|123|3.92| 3.44| 18.3|  1|  0|   4|   4|
|          M

In [94]:
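# 2. Rename all the columns to something of your liking
# Note: withColumnRenamed takes exactly one (old name, new name) pair, so the single call below does not work:
# df = df.withColumnRenamed("Model","Mpg","Cyl","Disp", "Hp","Drat", "Wt", "Gsec", "VS", "Am","Gears","Carbon Emissions")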

df = df.withColumnRenamed("model", "Model") \
      .withColumnRenamed("mpg", "Mpg") \
      .withColumnRenamed("cyl", "Cyl") \
      .withColumnRenamed("disp", "Disp") \
      .withColumnRenamed("hp", "Hp") \
      .withColumnRenamed("drat", "Drat") \
      .withColumnRenamed("wt", "Wt") \
      .withColumnRenamed("qsec", "Gsec") \
      .withColumnRenamed("vs", "VS") \
      .withColumnRenamed("am", "Am") \
      .withColumnRenamed("gear", "Gears") \
      .withColumnRenamed("carb", "Carbon Emissions")

df.show()

+-------------------+----+---+-----+---+----+-----+-----+---+---+-----+----------------+
|              Model| Mpg|Cyl| Disp| Hp|Drat|   Wt| Gsec| VS| Am|Gears|Carbon Emissions|
+-------------------+----+---+-----+---+----+-----+-----+---+---+-----+----------------+
|          Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|    4|               4|
|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|    4|               4|
|         Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|    4|               1|
|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|    3|               1|
|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|    3|               2|
|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|    3|               1|
|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|    3|               4|
|          Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|    4|               2|
|           Merc 230|

In [95]:
# 3. Show the distinct cars based on the number of cylinders
df.select("Model", "Cyl").distinct().show()

+------------------+---+
|             Model|Cyl|
+------------------+---+
|         Merc 280C|  6|
|    Toyota Corolla|  4|
|Cadillac Fleetwood|  8|
|        Merc 450SL|  8|
| Chrysler Imperial|  8|
|          Fiat 128|  4|
|      Lotus Europa|  4|
|     Mazda RX4 Wag|  6|
|    Hornet 4 Drive|  6|
|         Fiat X1-9|  4|
|         Merc 240D|  4|
|  Pontiac Firebird|  8|
|        Camaro Z28|  8|
|    Ford Pantera L|  8|
|       AMC Javelin|  8|
|       Merc 450SLC|  8|
|     Porsche 914-2|  4|
|        Merc 450SE|  8|
|  Dodge Challenger|  8|
|          Merc 280|  6|
+------------------+---+
only showing top 20 rows



In [96]:
# 4. Sort the dataframe based on the mileage of the car.
df.sort(df['Mpg']).show()

+-------------------+----+---+-----+---+----+-----+-----+---+---+-----+----------------+
|              Model| Mpg|Cyl| Disp| Hp|Drat|   Wt| Gsec| VS| Am|Gears|Carbon Emissions|
+-------------------+----+---+-----+---+----+-----+-----+---+---+-----+----------------+
| Cadillac Fleetwood|10.4|  8|472.0|205|2.93| 5.25|17.98|  0|  0|    3|               4|
|Lincoln Continental|10.4|  8|460.0|215| 3.0|5.424|17.82|  0|  0|    3|               4|
|         Camaro Z28|13.3|  8|350.0|245|3.73| 3.84|15.41|  0|  0|    3|               4|
|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|    3|               4|
|  Chrysler Imperial|14.7|  8|440.0|230|3.23|5.345|17.42|  0|  0|    3|               4|
|      Maserati Bora|15.0|  8|301.0|335|3.54| 3.57| 14.6|  0|  1|    5|               8|
|        Merc 450SLC|15.2|  8|275.8|180|3.07| 3.78| 18.0|  0|  0|    3|               3|
|        AMC Javelin|15.2|  8|304.0|150|3.15|3.435| 17.3|  0|  0|    3|               2|
|   Dodge Challenger|

In [97]:
# 5. Your friend is planning to buy a new car in a pocket friendly manner. So allocate a score to all cars in your data frame
#     Eg: - Create a column called `score`.
#         - Come up with a formula that provides score, say :
#                 - mileage is important so 0.2 * value of mileage + 0.5 * # of cyl ... so on
score_df = df.withColumn("Score", (0.2 * col("Mpg")) +  (0.8 * col("Gears")))
score_df.show()

+-------------------+----+---+-----+---+----+-----+-----+---+---+-----+----------------+-----------------+
|              Model| Mpg|Cyl| Disp| Hp|Drat|   Wt| Gsec| VS| Am|Gears|Carbon Emissions|            Score|
+-------------------+----+---+-----+---+----+-----+-----+---+---+-----+----------------+-----------------+
|          Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|    4|               4|              7.4|
|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|    4|               4|              7.4|
|         Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|    4|               1| 7.75999984741211|
|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|    3|               1|6.679999923706055|
|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|    3|               2|6.140000152587891|
|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|    3|               1|6.020000076293946|
|         Duster 360|14.3|  8|360.0|2

In [100]:
# 6. Just for Fun add a new Row into the Data frame for Nano
#  Details: Nano;Manual;25kmpl;2Cyl;

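# Mpg and Cyl follow the details above (25 kmpl is roughly 58.8 miles per US gallon)
# and Am = 1 marks a manual gearbox. The other values are placeholders copied from Mazda RX4.
new_row = Row('Nano', 58.8, 2, 160.0, 110, 3.9, 2.62, 16.46, 0, 1, 4, 4)

# No schema is given here, so types are inferred (double/long) and the union
# below widens the float columns of df to double.
new_df = spark.createDataFrame([new_row], ["Model","Mpg","Cyl","Disp", "Hp","Drat", "Wt", "Gsec", "VS", "Am","Gears","Carbon Emissions"])

df = df.union(new_df)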

df.show()

df.where("Model = 'Nano'").show()

+-------------------+------------------+---+------------------+---+------------------+------------------+------------------+---+---+-----+----------------+
|              Model|               Mpg|Cyl|              Disp| Hp|              Drat|                Wt|              Gsec| VS| Am|Gears|Carbon Emissions|
+-------------------+------------------+---+------------------+---+------------------+------------------+------------------+---+---+-----+----------------+
|          Mazda RX4|              21.0|  6|             160.0|110|3.9000000953674316| 2.619999885559082|16.459999084472656|  0|  1|    4|               4|
|      Mazda RX4 Wag|              21.0|  6|             160.0|110|3.9000000953674316|             2.875|17.020000457763672|  0|  1|    4|               4|
|         Datsun 710|22.799999237060547|  4|             108.0| 93|3.8499999046325684| 2.319999933242798|18.610000610351562|  1|  1|    4|               1|
|     Hornet 4 Drive|21.399999618530273|  6|             258.0|1