# Spark DataFrames Basics

In [None]:
# we first need to create a session in it
import pyspark.sql import SparkSession
spark=SparkSession.builder.appName("Basics").getOrCreate()



In [None]:
df=spark.read.json("people.json")   # imported the file

In [None]:
df.show()  # it will show the output

output:
age   name
null  Michael
30    Andy
19   Justin

# it automatically replace null value with null keyword


In [None]:
# if you want to know the schema of dataframe
df.printSchema()

output:
root
---age: long(nullable=True)
---name: string(nullable=True)

In [None]:
df.columns  # it will return list of columns

output:
["age","name"]

In [None]:
df.describe()

output:
DataFrames[summary:string,age:string]



In [None]:
df.describe().show()  # it will show the statistic od data , pyspark is smart  tht it automatically 
#figure out numeric column is age   so it will use age column

output:
summary     age
count       2
mean
stddev
min 
max

In [None]:
# we can change the datatye as well
 from pyspark.sql.types import (StructField,StringType,IntegerType,StructType)
    
data=[StructField("age",IntegerType(),True),
     StructField("name",StringType(),True)]


final=StructType(fields=data)


df=spark.read.json("people.json",schema=final)

df.printSchema()

output
root
---age: integer(nullable=True)
---name: string(nullable=True)


In [None]:
# lets grab column 
df["age"]  

# but this will not grab it . this wil return object
output:
    Column<b"age">

    
df.select("age").show()  # this will work


In [None]:
df.head(2)[0]
# it will print recrd of firat row
output:
    Row(age=None,Name="Michael")

 

df.select(["age","Name"]).show()   # it will print the result in row and column

In [None]:
# lets create another column
df.withcolumn("newage",df["age"]).show()
output
age   name     newage
null  Michael  nul
30    Andy     30
19   Justin     19




df.withcolumn("ndouble_age",df["age"]*2).show()
output
age   name     ndouble_age
null  Michael  nul
30    Andy     60
19   Justin     38

In [None]:
# rename column name
df.withColumnRenamed("age","new_age").show()

output
anew_age   name     
null    Michael  
30      Andy     
19      Justin     

In [None]:
# we can use SQL quey as well in pyspark  we just need t use register by below line code
df.CreateOrReplaceTempView("People")   # taking the file
results=spark.sql("select * from  people")
results.show()  # output will be produced


# Basic operation

In [None]:
df.filter("close<500").show()   # lookm typical  , but i prefer sql 
# all column with leass than 500

In [None]:
df.filter("close<500").select(["Open","Close"]).show()

# output will have two column in it
df.filter(df["close"]<500).select(["Open","Close"]).show()
# same output will be printed

In [None]:
df.filter(df["close"]<500   &  df["open"]<500 ).show()   
# using And opearion in it

In [None]:
df.filter(df["close"]<500   &  `df["open"]<500 ).show() 

In [None]:
df.filter(df["close"]==500  ).show() 

In [None]:
df.filter(df["close"]==500  ).collect()    # collect will be use in real world problem

# Aggregate operation

In [None]:
#aggreate mean to collect data and perform mean ,sum,count etc

df.groupby("Company").mean().show()
df.groupby("Company").sum().show()
df.groupby("Company").max().show()
df.groupby("Company").min().show()
df.groupby("Company").count().show()

In [None]:
# we can aggrqage on single column
df.agg({"Sales":"Sum"}).show()   # it will show all sum

sum(sales)
1122


df.agg({"Sales":"Max"}).show()   # it will show all max

Max(sales)
780

In [None]:
# another way
group=df.groupby("Comapny")
group.agg({"Sales":"Sum"}).show()

company   max(sales)
apple     780
google     111
fb         222
mst          333

In [None]:
# now we will grab function 
from pyspark.sql.functons import countDistinct,avg,stddev

df.select(avg("Sales")).show()

output
avg(sales)
360.33333

In [None]:
df.select(avg("Sales").alias("AVGSALES")).show()

output
AVGSALES
360.33333

In [None]:
# i just want 2 last digit float value
from pyspark.sql.functions import forrmat_number

sales=df.select(avg("Sales").alias("AVGSALES")).show()

sales.select(format_number("std",2),).alias("new").show()  # we have provde two time of alias otherwirse it will print previous one

std
250.09

In [None]:
df.OrderBy("Sales").show()  # in ascending order will be printed

In [None]:
df.orderby(df["sales"].desc()).show()  # in descending order wil be printed

# Missing value in dataframes

In [None]:
df.show()

output
id name sales
emp1 john  null
emp2  null  null
emp3  null 365.00
emp4  cindy  456.00



In [None]:
df.na.drop().show()  # it will drop any row wiich conraon null value i it

In [None]:
# we can restrict our null value drop by using threhold value

df.na.drop(thresh=2).show()

In [None]:
#another way to dropping null value
df.na.drop(how="any").show()  # it wil drop any null value

In [None]:
df.na.drop(how="all").show()  # if all the value is null  but in our dataset we dont have 3 null value in one column

In [None]:
# we can specify our column name
    df.na.drop(subset=["sales"]).show()  # it will frop sales null value i it 

In [None]:
# we can put value i null valaue
df.na.fill("null value").show()   # it automatically know that it si string so it didint fill integer column 
    
output
id name sales
emp1 john  null
emp2  null value  null
emp3  null value 365.00
emp4  cindy  456.00

In [None]:
df.na.fill(0).show()   # it automatically know that it si integer value so it fill sales column
    
output
id name sales
emp1 john  0
emp2  null  0
emp3  null  365.00
emp4  cindy  456.00

# Dates and Timestamp

In [None]:
from pyspark.sql.functions import (dayofmonth,hour,dayofyear,month,year,weekofyear,format_number,date_format)

df.select(dayofmonth(df["Date"])).show()

output
dayofmonth(Date)
3
4
5

In [None]:
df.select(hour(df["Date"])).show()

output
dayofmonth(Date)
0
0
0