# Data Processing using PySpark

In [0]:
# Echo the SparkContext handle. `sc` is not created anywhere in this notebook,
# so it is presumably injected by the notebook runtime — confirm.
sc

In [0]:
# Echo the `spark` session object used by the cells below. It is not created in
# this notebook, so it is presumably injected by the notebook runtime — confirm.
spark

In [0]:
# Load the dataset from the registered table `sample_data_csv`.
# Alternative when reading the raw CSV file directly:
#   spark.read.csv('sample_data_csv.csv', inferSchema=True, header=True)
df = spark.table("sample_data_csv")

In [0]:
# Render the loaded DataFrame. `display` is not defined in this notebook;
# presumably it is the notebook environment's built-in helper — confirm.
display(df)

ratings,age,experience,family,mobile
3,32,9.0,3,Vivo
3,27,13.0,3,Apple
4,22,2.5,0,Samsung
4,37,16.5,4,Apple
5,27,9.0,1,MI
4,27,9.0,0,Oppo
5,37,23.0,5,Vivo
5,37,23.0,5,Samsung
3,22,2.5,0,Apple
3,27,6.0,0,MI


In [0]:
# List the column names of the DataFrame
df.columns

In [0]:
# Number of columns in the DataFrame
len(df.columns)

In [0]:
# Number of records (rows) in the DataFrame
df.count()

In [0]:
# Shape of the dataset, printed as a (row_count, column_count) tuple
n_rows = df.count()
n_cols = len(df.columns)
print((n_rows, n_cols))

In [0]:
# Print the DataFrame's schema
df.printSchema()

In [0]:
# First five rows of the DataFrame
df.show(5)

In [0]:
# Render the DataFrame again (same call as the earlier display cell; `df` has
# not been reassigned in between).
display(df)

ratings,age,experience,family,mobile
3,32,9.0,3,Vivo
3,27,13.0,3,Apple
4,22,2.5,0,Samsung
4,37,16.5,4,Apple
5,27,9.0,1,MI
4,27,9.0,0,Oppo
5,37,23.0,5,Vivo
5,37,23.0,5,Samsung
3,22,2.5,0,Apple
3,27,6.0,0,MI


In [0]:
# Project just the `age` and `mobile` columns and print the first 5 rows
(
    df.select('age', 'mobile')
    .show(n=5)
)

In [0]:
# Summary statistics for the DataFrame's columns, as produced by DataFrame.describe()
df.describe().show()

In [0]:
from pyspark.sql.types import StringType,DoubleType,IntegerType

In [0]:
# withColumn: derive `age_after_10_yrs` as age + 10 and rebind `df` to the result
age_plus_ten = df["age"] + 10
df = df.withColumn("age_after_10_yrs", age_plus_ten)

In [0]:
# Render the DataFrame, which now carries the `age_after_10_yrs` column
display(df)

ratings,age,experience,family,mobile,age_after_10_yrs
3,32,9.0,3,Vivo,42.0
3,27,13.0,3,Apple,37.0
4,22,2.5,0,Samsung,32.0
4,37,16.5,4,Apple,47.0
5,27,9.0,1,MI,37.0
4,27,9.0,0,Oppo,37.0
5,37,23.0,5,Vivo,47.0
5,37,23.0,5,Samsung,47.0
3,22,2.5,0,Apple,32.0
3,27,6.0,0,MI,37.0


In [0]:
# Add `age_double` (age cast to DoubleType) and print up to 10 rows untruncated.
# The result is only shown; `df` is not reassigned.
age_as_double = df['age'].cast(DoubleType())
df.withColumn('age_double', age_as_double).show(n=10, truncate=False)

In [0]:
# Keep only the records whose mobile brand is Vivo
is_vivo = df['mobile'] == 'Vivo'
df.filter(is_vivo).show()

In [0]:
# Vivo records only, restricted to the age, ratings and mobile columns
(
    df.filter(df['mobile'] == 'Vivo')
    .select('age', 'ratings', 'mobile')
    .show()
)

In [0]:
# Multiple conditions by chaining two filters: Vivo brand, then experience above 10
(
    df.filter(df['mobile'] == 'Vivo')
    .filter(df['experience'] > 10)
    .show()
)

In [0]:
# Multiple conditions combined into one predicate with `&`
vivo_and_experienced = (df['mobile'] == 'Vivo') & (df['experience'] > 10)
df.filter(vivo_and_experienced).show()

In [0]:
# Distinct values of the `mobile` column
(
    df.select('mobile')
    .distinct()
    .show()
)

In [0]:
# Number of distinct values in the `mobile` column
(
    df.select('mobile')
    .distinct()
    .count()
)

In [0]:
# Record count per mobile brand (up to 5 groups, untruncated)
(
    df.groupBy('mobile')
    .count()
    .show(n=5, truncate=False)
)

In [0]:
# Value counts: records per mobile brand, largest count first
(
    df.groupBy('mobile')
    .count()
    .orderBy('count', ascending=False)
    .show(n=5, truncate=False)
)

In [0]:
# Per-brand mean, computed by calling mean() on the grouped data
(
    df.groupBy('mobile')
    .mean()
    .show(n=5, truncate=False)
)

In [0]:
# Per-brand sum, computed by calling sum() on the grouped data
(
    df.groupBy('mobile')
    .sum()
    .show(n=5, truncate=False)
)

In [0]:
# Per-brand maximum, computed by calling max() on the grouped data
(
    df.groupBy('mobile')
    .max()
    .show(n=5, truncate=False)
)

In [0]:
# Per-brand minimum, computed by calling min() on the grouped data
(
    df.groupBy('mobile')
    .min()
    .show(n=5, truncate=False)
)

In [0]:
# Aggregation: total `experience` per mobile brand via a column -> function mapping
experience_total = {'experience': 'sum'}
(
    df.groupBy('mobile')
    .agg(experience_total)
    .show(n=5, truncate=False)
)

In [0]:
# UDF
from pyspark.sql.functions import udf

In [0]:
#normal function 
def price_range(brand):
    """Return the price tier label for a mobile brand.

    'Samsung' and 'Apple' map to 'High Price', 'MI' maps to 'Mid Price',
    and every other value maps to 'Low Price'.
    """
    # Premium brands are checked first, mirroring the tier order.
    if brand in ('Samsung', 'Apple'):
        return 'High Price'
    return 'Mid Price' if brand == 'MI' else 'Low Price'

In [0]:
# Wrap the plain Python function `price_range` as a Spark UDF that returns strings
brand_udf = udf(price_range, returnType=StringType())

In [0]:
# Render the DataFrame before applying the UDF.
# NOTE(review): the saved output of this cell has no `age_after_10_yrs` column
# even though an earlier cell rebinds `df` with that column added — the cells
# may have been executed out of order; re-run the notebook top to bottom to confirm.
display(df)

ratings,age,experience,family,mobile
3,32,9.0,3,Vivo
3,27,13.0,3,Apple
4,22,2.5,0,Samsung
4,37,16.5,4,Apple
5,27,9.0,1,MI
4,27,9.0,0,Oppo
5,37,23.0,5,Vivo
5,37,23.0,5,Samsung
3,22,2.5,0,Apple
3,27,6.0,0,MI


In [0]:
# Apply the UDF to `mobile` to derive a `price_range` column and print up to
# 10 rows untruncated. The result is only shown; `df` is not reassigned.
(
    df.withColumn('price_range', brand_udf(df['mobile']))
    .show(n=10, truncate=False)
)

In [0]:
# UDF built from a lambda: ages up to and including 30 are labelled "young",
# anything older "senior".
# A null in the column reaches a Python UDF as None, and `None <= 30` raises
# TypeError in Python 3, so null ages are returned as null instead of failing.
age_udf = udf(
    lambda age: None if age is None else ("young" if age <= 30 else "senior"),
    StringType(),
)

In [0]:
# Drop duplicate rows (no column subset is passed) and rebind `df` to the result
df=df.dropDuplicates()

In [0]:
# Row count after de-duplication, to compare against the earlier record count
df.count()

In [0]:
# Build `df_new` without the `mobile` column; `df` itself is not reassigned here
df_new=df.drop('mobile')