####Initial Connection

In [0]:
from pyspark.sql import *
from pyspark import SparkContext
from pyspark.sql.functions import *
# Obtain the SparkSession used by every cell below: getOrCreate() returns an
# existing session when there is one, otherwise it builds a new one whose
# application name is 'PracApp'.
spark = (
    SparkSession.builder
    .appName('PracApp')
    .getOrCreate()
)

In [0]:
# Three (Name, Age) sample rows for the basic DataFrame examples.
data = [
    ("Alice", 25),
    ("Bob", 30),
    ("Charlie", 35),
]
# Column names are given explicitly; the column types are inferred by Spark.
df = spark.createDataFrame(data, schema=["Name", "Age"])

# Print the DataFrame as a text table.
df.show()

####Data frames and Operations

In [0]:
# Selecting columns: project a single column at a time and print each
# projection, first Name and then Age.
for column_name in ("Name", "Age"):
    df.select(column_name).show()

In [0]:
# Filtering: keep only the rows whose Age is 30 or more (where() is an alias
# of filter()).
df.where(col("Age") >= 30).show()

In [0]:
# Aggregation: count how many rows share each distinct Age value.
(
    df.groupBy("Age")
    .count()
    .show()
)

In [0]:
# Ordering: rows sorted by Age, smallest first (sort() and orderBy() are
# aliases of each other).
df.sort("Age").show()

In [0]:
# Ordering: rows sorted by Age, largest first.
df.orderBy(col("Age").desc()).show()

####Spark SQL


In [0]:
# Expose `df` to Spark SQL as the temporary view people_v, replacing any
# view that already has that name.
df.createOrReplaceTempView('people_v')

# Query the view with plain SQL; the result is a new DataFrame holding the
# rows whose age is greater than 30.
result = spark.sql(
    "SELECT * FROM people_v WHERE age > 30"
)

In [0]:
# Render the rows held in `result`.
# NOTE(review): display() is not part of the core PySpark DataFrame API;
# presumably it is supplied by the notebook runtime (e.g. Databricks) -
# confirm, or use show() when running outside such an environment.
result.display()

In [0]:
# Derived columns in SQL: new_col concatenates the name, a ' - ' separator and
# the age; new_col2 is the age doubled.
result_2 = spark.sql(
    "SELECT name, (name || ' - ' || age) new_col, "
    "(age * 2) new_col2 FROM people_v"
)

# Render the computed columns.
result_2.display()

In [0]:
# Joins: people_v is joined to itself (aliases a and b) on equal names.
# Because the query selects *, the result carries the columns of both sides,
# so each column name appears twice.
result_3 = spark.sql(
    "SELECT * FROM people_v a, people_v b WHERE a.name = b.name"
)

In [0]:
# Render the self-join output stored in result_3.
result_3.display()

####Advanced DF Operation


In [0]:
# Larger sample for the clean-up and window examples; three of the rows have
# no Age (None).
data = [
    ("Alice", 25),
    ("Bob", 30),
    ("Charlie", 35),
    ("AB", None),
    ("AK", None),
    ("Bharath", 25),
    ("Munna", 37),
    ("Don", None),
]
df_2 = spark.createDataFrame(data, schema=["Name", "Age"])

In [0]:
# Render the raw sample, including the rows whose Age is None.
df_2.display()

In [0]:
# Drop rows with missing values: every row containing a null is removed.
df_cleaned = df_2.na.drop()

In [0]:
# Render the result of dropna(): only rows without missing values remain.
df_cleaned.display()

In [0]:
# Fill missing values with 0. A numeric fill value is only applied to numeric
# columns, so here it replaces the missing Age entries.
df_cleaned = df_2.na.fill(0)

In [0]:
# Render the result of fillna(0): missing Age values now show as 0.
df_cleaned.display()

In [0]:
# Fill missing values column by column: the mapping gives the replacement for
# each column (0 for Age, "UNK" for Name).
df_cleaned = df_2.na.fill({"Age": 0, "Name": "UNK"})

In [0]:
# Render the result of the per-column fill (Age -> 0, Name -> "UNK").
df_cleaned.display()

####Window Functions

In [0]:
# Add a "Rank" column numbering the rows by ascending Age. No partitionBy is
# given, so the whole DataFrame forms a single window. With rank(), rows that
# tie on Age share a rank and the following rank is skipped.
df_rank = df_2.withColumn(
    "Rank",
    rank().over(Window.orderBy("Age")),
)

In [0]:
# Render df_2 together with its rank() column (ascending Age).
df_rank.display()

In [0]:
# Add a "Rank" column numbering the rows by descending Age. dense_rank()
# gives tied rows the same rank without leaving gaps afterwards.
df_rank = df_2.withColumn(
    "Rank",
    dense_rank().over(Window.orderBy(desc("Age"))),
)

In [0]:
# Render df_2 together with its dense_rank() column (descending Age).
df_rank.display()

####Optimizing DF

In [0]:
# Caching (left disabled in this notebook):
#df_rank.cache()

# Partitioning: redistribute the rows of df_2 across 4 partitions.
df_rep = df_2.repartition(numPartitions=4)

In [0]:
# Render the repartitioned DataFrame; repartition() changes how the rows are
# distributed, not which rows exist.
df_rep.display()

In [0]:
# For every row of `df`, produce a single boolean column that is true when
# Name holds a value, and render it.
(
    df
    .select(col("Name").isNotNull())
    .display()
)