In [None]:
# Spark Context : 

  - SparkContext is the entry point to PySpark's core functionality: it communicates with the cluster
    and is used to create RDDs, accumulators, and broadcast variables.
  - Only one active SparkContext is allowed per JVM. To create a new one, first stop the existing
    context with its stop() method.

In [None]:
# Different Ways to Create Spark Context

  1. Create a SparkContext with its constructor, passing at least `master` and `appName`, since both must be set.
      
      # Create SparkContext
      from pyspark import SparkContext
      sc = SparkContext("local", "Spark_Example_App")
      print(sc.appName)

      # Create RDD
      rdd3 =  sc.parallelize([1,2,3])
      rdd3.collect()

  2. Create a SparkContext by passing a `SparkConf` object to `SparkContext.getOrCreate()`
     
     from pyspark import SparkConf, SparkContext
     conf = SparkConf()  # Spark Configuration
     conf.setMaster("local").setAppName("Spark Example App")
     sc = SparkContext.getOrCreate(conf)

     # Create RDD
     rdd3 =  sc.parallelize([1,2,3])
     rdd3.collect()

  3. Get the SparkContext from a PySpark `SparkSession` (via `spark.sparkContext`)
     
     from pyspark.sql import SparkSession
     spark = SparkSession.builder.master("local[1]").appName('SparkByExamples.com').getOrCreate()
     print(spark.appName)

     # Create RDD
     rdd3 =  spark.sparkContext.parallelize([1,2,3])
     rdd3.collect()

In [None]:
# Spark Session :
  - SparkSession vs SparkContext – In earlier versions of Spark and PySpark, SparkContext (JavaSparkContext for Java) was the entry point for
    programming with RDDs and for connecting to a Spark cluster. Since Spark 2.0, SparkSession has been the entry point for
    programming with DataFrames and Datasets.

  - SparkSession also provides the APIs that were previously spread across separate contexts:
      - Spark Context
      - SQL Context
      - Streaming Context
      - Hive Context


In [None]:
# Spark Configuration 
  - SparkConf holds the configuration for a Spark application.
  - To run a Spark application, whether locally or on a cluster, we set configuration parameters (such as the master URL and
    application name), and this is done with SparkConf.

In [2]:
#1. Create Spark Session

# Test the PySpark installation: findspark.init() points this kernel at the
# local Spark install so that `pyspark` can be imported.
import findspark
# Use a raw string so the backslash is not read as an escape: "\S" is an invalid
# escape sequence (DeprecationWarning, and a SyntaxWarning on Python 3.12+).
# The resulting path value is unchanged.
findspark.init(r'C:\Spark')
findspark.find()

import pyspark
from pyspark.sql import SparkSession

# getOrCreate() returns the already-running session if one exists, so re-running
# this cell does not try to start a second session.
spark = SparkSession.builder.master("local[1]").appName('SparkByExamples.com').getOrCreate()

spark

### SparkConf

In [None]:
# Create a Spark configuration
from pyspark import SparkConf
conf = SparkConf().setAppName("Spark Demo").setMaster("local")

# Pass the configuration to SparkContext. Only one SparkContext may be active
# per JVM. If a SparkSession is already running (e.g. from the cell above), it
# owns a context, and the plain constructor SparkContext(conf=conf) would raise
# ValueError. getOrCreate() reuses the active context; `conf` is applied only
# when no context exists yet.
from pyspark import SparkContext
sc = SparkContext.getOrCreate(conf=conf)

### SparkContext

In [None]:
# Create (or reuse) a SparkContext
from pyspark import SparkConf, SparkContext

# Only one SparkContext may be active per JVM. If a SparkSession is already
# running, calling the constructor SparkContext("local", "My First Spark
# Application") would raise ValueError. getOrCreate() returns the active
# context, or builds a new one from this configuration.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("My First Spark Application")
)

# Create a Spark RDD using SparkContext.parallelize
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])

# Create an RDD from an external data source: one record per line of the file
rdd2 = sc.textFile("/path/textFile.txt")

# wholeTextFiles() yields one (file path, file content) pair per file, so a
# single file becomes a single record.
rdd3 = sc.wholeTextFiles("/path/textFile.txt")

# Create an empty RDD with no partitions. emptyRDD is a method: without the
# call parentheses, `rdd` would be bound to the method object, not an RDD.
# (Scala's sc.emptyRDD[String] has no PySpark counterpart; PySpark RDDs are
# not parameterized by element type.)
rdd = sc.emptyRDD()

# Create an empty RDD with 10 partitions
rdd2 = sc.parallelize([], 10)

### SparkSession

In [None]:
# Create Spark Session
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

spark = SparkSession.builder.master("local[1]").appName('SparkByExamples.com').getOrCreate()

# Example inputs used below (replace with your own rows / column names)
data = [("James", "Smith", "USA"), ("Anna", "Rose", "UK")]
columns = ["firstname", "lastname", "country"]

# Get the active SparkSession (a classmethod; returns None if none is active)
SparkSession.getActiveSession()

# Get Spark version
spark.version

# Get app name
spark.sparkContext.appName

# Get master URL
spark.sparkContext.master

# Returns the underlying SparkContext
spark.sparkContext

# Parallelize a local collection into an RDD using the SparkContext
rdd = spark.sparkContext.parallelize(data)

# Create an empty RDD
emptyRDD = spark.sparkContext.emptyRDD()

# Broadcast variables: a read-only value shared with the executors
broadcastVar = spark.sparkContext.broadcast([0, 1, 2, 3])
broadcastVar.value

# Accumulator variables. PySpark has no longAccumulator (that is the
# Scala/Java API); SparkContext.accumulator(initial_value) creates one.
accum = spark.sparkContext.accumulator(0)
spark.sparkContext.parallelize([1, 2, 3]).foreach(lambda x: accum.add(x))
accum.value  # 1 + 2 + 3 = 6

# Create DataFrame
df = spark.createDataFrame(data, columns)

# Create an empty DataFrame with no columns. spark.emptyDataFrame is a
# Scala-only API; in PySpark, pass an empty list and an empty schema.
df = spark.createDataFrame([], StructType([]))

# Datasets: the typed Dataset API (spark.createDataset / spark.emptyDataset)
# exists only in Scala and Java. In PySpark, use DataFrames instead.

# Stop the SparkSession and its SparkContext. This line is left commented out
# because every later cell reuses `spark`; run it only when you are finished.
# spark.stop()

### RDD

In [None]:
# Convert RDD to DataFrame

# Create RDD (`data` must already be defined: a list of tuples or Rows)
rdd = spark.sparkContext.parallelize(data)

# Using toDF(): columns are named _1, _2, ... when no names are given
df = rdd.toDF()

# Create Empty RDD
emptyRDD = spark.sparkContext.emptyRDD()

# Create a DataFrame from an empty RDD. An explicit schema is required because
# there are no rows to infer column types from.
from pyspark.sql.types import StructType,StructField, StringType

# Define Schema
schema = StructType([StructField('firstname', StringType(), True),
                     StructField('middlename', StringType(), True),
                     StructField('lastname', StringType(), True)])

# Create empty DataFrame from empty RDD
df = spark.createDataFrame(emptyRDD,schema)

# Convert Empty RDD to DataFrame
df1 = emptyRDD.toDF(schema)

# Create Empty DataFrame with Schema.
df2 = spark.createDataFrame([], schema)

# Create Empty DataFrame without Schema (no columns)
df3 = spark.createDataFrame([], StructType([]))

# Partitions: inspect the count, then reduce to one partition with coalesce()
# (coalesce avoids a full shuffle, unlike repartition()).
rdd.getNumPartitions()

rdd_coalesced = rdd.coalesce(1)
rdd_coalesced.getNumPartitions()

# RDD Cache: cache() persists with the default storage level (MEMORY_ONLY)
cachedRdd = rdd.cache()

# DataFrame Persist (Python uses truncate=False; `false` is Scala syntax)
dfPersist = df.persist()
dfPersist.show(truncate=False)

# RDD Persist. `rdd` already has MEMORY_ONLY from cache() above. Spark refuses
# to change an RDD's storage level once assigned, so the same level is used here.
import pyspark
rddPersist = rdd.persist(pyspark.StorageLevel.MEMORY_ONLY)
rddPersist.take(5)   # RDDs have no show(); take()/collect() fetch elements

# RDD Unpersist
rddPersist2 = rddPersist.unpersist()

############################################## Transformations ################################################################

# Read a text file as an RDD of lines ("PATH/blogtexts" is a placeholder path).
# Use spark.sparkContext rather than `sc`: the earlier cells that build `sc`
# with the SparkContext constructor fail while a session is active.
rdd = spark.sparkContext.textFile("PATH/blogtexts")
rdd.take(5)

# flatMap: split each line into words (one output element per word)
rdd2 = rdd.flatMap(lambda x: x.split(" "))

# map: pair every word with a count of 1
rdd3 = rdd2.map(lambda x: (x,1))

# Sample: ~40% of the pairs, without replacement, seed 42 for repeatability
rdd3_sampled = rdd3.sample(False, 0.4, 42)

rdd3_sampled.collect()

# reduceByKey: sum the 1s per word -> (word, count)
rdd5 = rdd3.reduceByKey(lambda a,b: a+b)

# Swap to (count, word) and sort by count, ascending
rdd6 = rdd5.map(lambda x: (x[1],x[0])).sortByKey()

# Transformation: filter -- drop stop words
stopwords = ['is','am','are','the','for','a']
rdd3 = rdd2.filter(lambda x: x not in stopwords)

# take
rdd3.take(10)


# Set Theory / Relational Transformations
# union: concatenates two RDDs and keeps duplicates
rdd_union = rdd2.union(rdd3)

# join: inner join of two pair RDDs on their keys -> (key, (left, right))
rdd_join = rdd5.join(rdd5.mapValues(lambda n: n * 2))

# distinct: unique elements. count() avoids pulling every element to the driver.
rdd3_distinct = rdd3.distinct()
rdd3_distinct.count()

################################################# Actions ###################################################################
# NOTE(review): assumes rdd6 holds (count, word) pairs from the transformations section.

# collect: fetch all pairs to the driver. The result is stored as
# `word_counts`, not `data`, so it does not overwrite the sample `data` that
# other cells use to build DataFrames.
word_counts = rdd6.collect()
for f in word_counts:
    print(f"Key:{f[0]}, Value:{f[1]}")

# Reduce: sum of 1..999
num_rdd = spark.sparkContext.parallelize(range(1,1000))
num_rdd.reduce(lambda x,y: x+y)

# Mathematical / Statistical Actions
num_rdd.max(),num_rdd.min(), num_rdd.sum(),num_rdd.variance(),num_rdd.stdev() 

# first
firstRec = rdd6.first()
print(f"First Record : {firstRec[0]},{firstRec[1]}")

# max: the largest tuple (compared by count first, then by word)
datMax = rdd6.max()

# take
data3 = rdd6.take(3)
for f in data3:
    print(f"data3 Key:{f[0]}, Value:{f[1]}")

# saveAsTextFile: fails if the output directory already exists
rdd6.saveAsTextFile("/tmp/wordCount")


In [22]:
#1. Create Spark Session

# Test the PySpark installation: findspark.init() points this kernel at the
# local Spark install so that `pyspark` can be imported.
import findspark
# Use a raw string so the backslash is not read as an escape: "\S" is an invalid
# escape sequence (DeprecationWarning, and a SyntaxWarning on Python 3.12+).
# The resulting path value is unchanged.
findspark.init(r'C:\Spark')
findspark.find()

import pyspark
from pyspark.sql import SparkSession

# getOrCreate() returns the already-running session if one exists.
spark = SparkSession.builder.master("local[1]").appName('SparkByExamples.com').getOrCreate()

spark

### RDD Operations

In [4]:
# Build three small example RDDs from the `spark` session created above
rdd1, rdd2, rdd3 = (
    spark.sparkContext.parallelize(values)
    for values in ([1, 2, 3], [4, 5, 6], [7, 8, 9])
)

In [5]:
rdd3.map(lambda v: pow(v, 3)).collect()  # cube each element of rdd3

[343, 512, 729]

In [7]:
rdd2.map(lambda v: v ** 3).collect()  # cube each element of rdd2

[64, 125, 216]

In [8]:
rdd3.collect()  # return every element of rdd3 to the driver as a Python list

[7, 8, 9]

In [6]:
rdds = [rdd1, rdd2, rdd3]

# Cube every element of each RDD and print one list per RDD
for rdd in rdds:
    cubes = rdd.map(lambda v: v ** 3).collect()
    print(cubes)

[1, 8, 27]
[64, 125, 216]
[343, 512, 729]


In [9]:
rdd1.count()  # rdd1 holds [1, 2, 3], so there are 3 items

3

In [10]:
rdd2.count() # rdd2 holds [4, 5, 6], so there are 3 items

3

In [11]:
rdd1.reduce(lambda acc, item: acc + item)  # fold [1, 2, 3] into its sum

6

In [12]:
rdd1.map(lambda n: 10 + n).collect()  # shift every element up by 10

[11, 12, 13]

In [13]:
rdd1.filter(lambda n: not n % 2).collect()  # keep only the even numbers

[2]

In [14]:
names = ['Rahul', 'Swati', 'Rohan', 'Shreya', 'Priya']
rdd5 = spark.sparkContext.parallelize(names)
# Keep the names whose first character is 'R'
r_names = rdd5.filter(lambda name: name[:1] == 'R').collect()
print(r_names)

['Rahul', 'Rohan']


In [15]:
rdd6 = spark.sparkContext.parallelize([2, 4, 5, 6, 7, 8, 9])
union_rdd_1 = rdd6.filter(lambda n: not n % 2)  # multiples of 2
union_rdd_2 = rdd6.filter(lambda n: not n % 3)  # multiples of 3
# union() concatenates both RDDs and keeps duplicates (6 appears twice)
combined = union_rdd_1.union(union_rdd_2)
print(combined.collect())

[2, 4, 6, 8, 6, 9]


In [16]:
flatmap_rdd = spark.sparkContext.parallelize(["Hey there", "This is PySpark RDD Transformations"])
# flatMap flattens each sentence's word list into one list of words
flatmap_rdd.flatMap(lambda sentence: sentence.split(" ")).collect()

['Hey', 'there', 'This', 'is', 'PySpark', 'RDD', 'Transformations']

In [17]:
rdd4 = spark.sparkContext.parallelize(["hadoop is big data tool"])
words = rdd4.flatMap(lambda sentence: sentence.split(" "))
print(words.collect())

['hadoop', 'is', 'big', 'data', 'tool']


In [25]:
spark.sparkContext.parallelize([3, 4, 5]).flatMap(lambda n: list(range(1, n))).collect()  # 1..n-1 for each n, flattened

[1, 2, 1, 2, 3, 1, 2, 3, 4]

In [26]:
spark.sparkContext.parallelize([3, 4, 5]).map(lambda n: [n, n ** 2]).collect()  # map keeps one list per element

[[3, 9], [4, 16], [5, 25]]

In [27]:
spark.sparkContext.parallelize([3, 4, 5]).flatMap(lambda n: (n, n ** 2)).collect()  # flatMap flattens the pairs

[3, 9, 4, 16, 5, 25]

In [18]:
marks_rdd = spark.sparkContext.parallelize([
    ('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22),
    ('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22),
])

# Total marks per student: values that share a key are combined pairwise
totals = marks_rdd.reduceByKey(lambda total, mark: total + mark)
print(totals.collect())

[('Rahul', 48), ('Swati', 45), ('Shreya', 50), ('Abhay', 55), ('Rohan', 44)]


In [19]:
marks_rdd = spark.sparkContext.parallelize([('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22),
                                            ('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22)])

# sortByKey's first parameter is the boolean `ascending`. Passing a string such
# as 'ascending' only works by accident, because any non-empty string is truthy
# ('descending' would also sort ascending). Pass the flag explicitly.
print(marks_rdd.sortByKey(ascending=True).collect())

[('Abhay', 29), ('Abhay', 26), ('Rahul', 25), ('Rahul', 23), ('Rohan', 22), ('Rohan', 22), ('Shreya', 22), ('Shreya', 28), ('Swati', 26), ('Swati', 19)]


In [20]:
marks_rdd = spark.sparkContext.parallelize([
    ('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22),
    ('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22),
])
# groupByKey().collect() returns a list of (key, iterable-of-values) pairs
dict_rdd = marks_rdd.groupByKey().collect()

for student, marks in dict_rdd:
    print(student, [*marks])

Rahul [25, 23]
Swati [26, 19]
Shreya [22, 28]
Abhay [29, 26]
Rohan [22, 22]


In [21]:
marks_rdd = spark.sparkContext.parallelize([
    ('Rahul', 25), ('Swati', 26), ('Rohan', 22), ('Rahul', 23), ('Swati', 19),
    ('Shreya', 28), ('Abhay', 26), ('Rohan', 22),
])
# countByKey() is an action: it returns a local mapping of key -> occurrences
dict_rdd = marks_rdd.countByKey().items()
for student, occurrences in dict_rdd:
    print(student, occurrences)

Rahul 2
Swati 2
Rohan 2
Shreya 1
Abhay 1


In [23]:
# NOTE(review): hard-coded absolute local path; consider a configurable data directory
lines = spark.sparkContext.textFile(r"C:\Users\bmi_cims\Desktop\Spark\data\greetings.txt")
# map keeps one output per line: each line becomes the list of its words
lines.map(str.split).collect()

[['Good', 'Morning'],
 ['Good', 'Evening'],
 ['Good', 'Day'],
 ['Happy', 'Birthday'],
 ['Happy', 'New', 'Year']]

In [24]:
lines.flatMap(str.split).collect()  # flatten every line's words into one list

['Good',
 'Morning',
 'Good',
 'Evening',
 'Good',
 'Day',
 'Happy',
 'Birthday',
 'Happy',
 'New',
 'Year']

In [29]:
myFile = spark.sparkContext.textFile(r"C:\Users\bmi_cims\Desktop\Spark\data\DrSeuss.text")

# Pair each word with 1. split() with no argument splits on runs of whitespace,
# so repeated spaces or tabs do not produce empty-string "words" the way
# split(" ") does.
wordspair = myFile.flatMap(lambda row: row.split()).map(lambda x: (x, 1))

# A single reduceByKey sums the 1s per word; a second pass would be redundant
# because the keys are already unique afterwards.
oldwordcount = wordspair.reduceByKey(lambda x,y : x + y)

oldwordcount.take(5)

[('The', 23), ('Cat', 11), ('in', 117), ('the', 220), ('Hat', 5)]

In [30]:
myFile = spark.sparkContext.textFile(r"C:\Users\bmi_cims\Desktop\Spark\data\DrSeuss.text")

# Turn ',', '.' and '-' into spaces, then lower-case each line
punct_to_space = str.maketrans(',.-', '   ')
wordcounts1 = myFile.map(lambda line: line.translate(punct_to_space).lower())

# One element per whitespace-separated word
wordcounts2 = wordcounts1.flatMap(lambda line: line.split())

# (word, 1) pairs, then per-word totals
wordcounts3 = wordcounts2.map(lambda word: (word, 1))
wordcounts4 = wordcounts3.reduceByKey(lambda left, right: left + right)

# Flip to (count, word) so the count is the sort key; highest count first
wordcounts5 = wordcounts4.map(lambda pair: (pair[1], pair[0]))
wordcounts6 = wordcounts5.sortByKey(ascending=False)

# wordcounts6.collect() would return every pair; show only the top 10
wordcounts6.take(10)

[(244, 'the'),
 (213, 'a'),
 (203, 'and'),
 (198, 'i'),
 (137, 'not'),
 (126, 'in'),
 (105, 'to'),
 (100, 'he'),
 (99, 'you'),
 (88, 'like')]

In [32]:
# Six (key, letter) pairs spread over 3 partitions
tsk = spark.sparkContext.parallelize([(1, "a"), (2, "b"), (1, "c"), (2, "d"), (1, "e"), (3, "f")], 3)

# The built-in max/min compare whole tuples: by key first, then by letter on ties
axstream1 = tsk.reduce(lambda a, b: max(a, b))
print(axstream1)

axstream2 = tsk.reduce(lambda a, b: min(a, b))
print(axstream2)
print(axstream2)

(3, 'f')
(1, 'a')


### Create and Write DataFrames from External Files

In [None]:
# Creating DataFrames by reading external files (all paths are placeholders)


################################################## Read ####################################################################

# CSV: header=True takes the column names from the first line
df_csv = spark.read.option("header",True).csv("/src/resources/file.csv")

# Text (TXT): one row per line in a single string column named "value".
# "header" is a CSV option and has no effect here.
df_txt = spark.read.text("/src/resources/file.txt")

# JSON: by default one JSON object per line
df_json = spark.read.json("/src/resources/file.json")

# Parquet: the schema is stored in the file itself
df_parquet = spark.read.parquet("/temp/out/people.parquet")

################################################## Write ####################################################################

# Write a DataFrame `df` to external files. The default save mode is
# "errorifexists": writing to an existing path fails unless a mode is set.

# CSV
df.write.csv('dataset.csv')

# JSON
df.write.save('dataset.json', format='json')

# Parquet
df.write.save('dataset.parquet', format='parquet')

# Write data to Parquet
df.write.parquet("/tmp/output/people.parquet")

# Append to or overwrite an existing Parquet file
df.write.mode('append').parquet("/tmp/output/people.parquet")
df.write.mode('overwrite').parquet("/tmp/output/people.parquet")

# Create partitioned Parquet output: one sub-directory per gender/salary value
df.write.partitionBy("gender","salary").mode("overwrite").parquet("/tmp/output/people2.parquet")

## Writing selected columns to different file formats.
## These reuse the paths written above, so overwrite them instead of failing.
# NOTE(review): 'data' looks like a typo for 'date' -- confirm against the schema.
selected_cols = ['data', 'open', 'close', 'adjusted']

# CSV
df.select(selected_cols).write.mode('overwrite').csv('dataset.csv')

# JSON
df.select(selected_cols).write.mode('overwrite').save('dataset.json', format='json')

# Parquet
df.select(selected_cols).write.mode('overwrite').save('dataset.parquet', format='parquet')

### DataFrame

In [None]:
# Create a DataFrame from local rows (`data`) and column names (`columns`)
df=spark.createDataFrame(data,columns)

# Create a DataFrame with a single column `id` holding 0..9
df = spark.range(10) 

# Create a DataFrame from the result of a SQL statement
df = spark.sql("show tables")

# printSchema
df.printSchema()

# Check whether a column exists in a DataFrame. In Python, fieldNames() is a
# method that returns a list, and membership uses `in`; Scala's
# `.contains(...)` and `true` are not valid here.
from pyspark.sql.types import StructField, StringType
"firstname" in df.schema.fieldNames()
StructField("firstname", StringType(), True) in df.schema.fields

# Check the data types of the column
df.select("petal_width").dtypes

# Change the data type of a column in the DataFrame
df = df.withColumn("Quantity",df.Quantity.cast('int'))
df = df.withColumn("Calories",df['calories'].cast("Integer"))

# Convert the Spark DataFrame to a pandas DataFrame (collects all rows to the driver)
pandas_df = df.toPandas()

df.show()

# Display 2 rows and full column contents
df.show(2,truncate=False) 

# Display 2 rows & column values 25 characters
df.show(2,truncate=25) 

# Display DataFrame rows & columns vertically
df.show(n=3,truncate=25,vertical=True)

# Get the first rows of the DataFrame as a list of Row objects
df.take(2)

# Change the value of an existing column
from pyspark.sql.functions import col
df_value = df.withColumn("petal_length",col("petal_length") * 10)

# Rename a column of the DataFrame
re_df = df.withColumnRenamed("petal_width","petal_width_1")

# Add a new column with a constant value using 'lit'
from pyspark.sql.functions import lit
new_col = df.withColumn("COllege",lit("MITRC"))

# Drop the column from the DataFrame that actually has it. `df` itself never
# gained "COllege", and drop() silently ignores unknown column names.
drop_df = new_col.drop("COllege")

# greatest: the largest value across the given columns, row by row
from pyspark.sql.functions import greatest,col
df1=df.withColumn("large",greatest(col("level1"),col("level2"),col("level3"),col("level4")))

# least: the smallest value across the given columns, row by row
from pyspark.sql.functions import least,col
df2=df.withColumn("Small",least(col("level1"),col("level2"),col("level3"),col("level4")))

# Capitalize the first letter of the user name
from pyspark.sql.functions import initcap
df = df.withColumn('name_cap', initcap('user_name'))


############################################### Missing and Fill values #######################################################

# Drop rows with a duplicate user_id, then order by age
df = df.dropDuplicates(['user_id']).orderBy('age')

# Replace null values with an empty string. A string fill value only applies to
# string columns. (Python uses truncate=False; `false` is Scala syntax.)
df.na.fill("").show(truncate=False)

# Replace null with 0 in the numeric columns
df.na.fill(value=0).show()

# Replace null with 0 only in the population column
df.na.fill(value=0,subset=["population"]).show()

# Different fill values per column, chained ...
df.na.fill("unknown",["city"]).na.fill("",["type"]).show()

# ... or the same in one call with a {column: value} dict
df.na.fill({"city": "unknown", "type": ""}).show()

##################################################### Select ##################################################################
# Show specific column data
df.select("firstname").show()
df.select("firstname","lastname").show()

# Select all the columns named in a list
columns = ['firstname','lastname','country','state']
df.select(*columns).show()

# Select every column. The loop variable is `c`, not `col`, so it does not
# read as shadowing the imported pyspark.sql.functions.col.
df.select([c for c in df.columns]).show()
df.select("*").show()

# Select the first three columns by index
df.select(df.columns[:3]).show(3)

# Select the third and fourth columns (indices 2 and 3, because the slice end
# is exclusive) and show the top 3 rows
df.select(df.columns[2:4]).show(3)


##################################################### Aggregation ##############################################################
# sum/max/min are imported under aliases. Importing them bare from
# pyspark.sql.functions would shadow Python's built-in sum()/max()/min() for
# the rest of the notebook (e.g. `tsk.reduce(max)` would stop working).

# Average
from pyspark.sql.functions import avg
df.select(avg("Total Cost").alias("Average Cost")).show()

# sum
from pyspark.sql.functions import sum as spark_sum
df.select(spark_sum("Quantity").alias("Total Items")).show()

# groupBy
df.groupBy("Shop_Name").sum("Quantity").show()

# Average money earned per shop
df.groupBy("Shop_Name").avg("Total Cost").show()

# max
from pyspark.sql.functions import max as spark_max
df.select(spark_max("Quantity").alias("Maximum Quantity")).show()

# min
from pyspark.sql.functions import min as spark_min
df.select(spark_min("Quantity").alias("Minimum Quantity")).show()
df.groupBy("Shop_Name").min("Quantity").show()

# count
from pyspark.sql.functions import count
df.select(count("Quantity")).show()

# distinct
df.select(df["Shop_Name"]).distinct().show()

# collect_list: all values, duplicates kept
from pyspark.sql.functions import collect_list
df.select(collect_list("Shop_Name")).show(truncate=False)

# collect_set: unique values only
from pyspark.sql.functions import collect_set
df.select(collect_set("Shop_Name")).show(truncate=False)

# countDistinct
from pyspark.sql.functions import countDistinct
df.select(countDistinct("Shop_Name")).show()

# Kurtosis
from pyspark.sql.functions import kurtosis
df.select(kurtosis('Quantity')).show(truncate=False)

# mean
from pyspark.sql.functions import mean
df.select(mean('Quantity')).show(truncate=False)

# skewness
from pyspark.sql.functions import skewness
df.select(skewness('Quantity')).show(truncate=False)

# stddev (sample) / stddev_samp (sample) / stddev_pop (population)
from pyspark.sql.functions import stddev,stddev_samp,stddev_pop
df.select(stddev("Quantity"), stddev_samp("Quantity"),stddev_pop("Quantity")).show(truncate=False)

# sumDistinct (deprecated since Spark 3.2 in favour of sum_distinct)
from pyspark.sql.functions import sumDistinct
df.select(sumDistinct("Quantity")).show(truncate=False)

# Variance (sample) / var_samp (sample) / var_pop (population)
from pyspark.sql.functions import variance,var_samp,var_pop
df.select(variance("Quantity"),var_samp("Quantity"),var_pop("Quantity")).show(truncate=False)


##################################################### Filters ################################################################
# DataFrame.filter() is a method and needs no import. Importing `filter` from
# pyspark.sql.functions would shadow Python's built-in filter(). isNull and
# isNotNull are Column methods, not importable functions, so importing them
# raises ImportError.

# Like SQL Expression
from pyspark.sql.functions import col
df.filter(col("Dept No") ==1).show()


# Filtering with multiple conditions: combine Column expressions with
# & (and), | (or) and ~ (not), parenthesizing each comparison
df.filter((df["Gender"]=="M") & (df["Dept No"]==2)).show()

# Filtering on an array column
from pyspark.sql.functions import array_contains
df.filter(array_contains(df["Languages"],"Python")).show()


# Filter data by null values
df.filter(df.name.isNotNull()).show()

df.filter(df.name.isNull()).show()

##################################################### When ################################################################

from pyspark.sql.functions import when
# Compare against the number 25. The string "25" compares lexicographically if
# `vitamins` is a string column. Rows that do not match get null, because no
# .otherwise() is given.
# NOTE(review): assumes `vitamins` holds numbers -- confirm against the schema.
df.select("name", when(df.vitamins >= 25, "rich in vitamins")).show()

from pyspark.sql.functions import col,when
df_when  = df.withColumn('Gender',when(col("Gender") == "M","Male").when(col("Gender") == "F","Female").otherwise("Other"))
df_when.show()

df_when2 = df.select(
    col("*"),
    when(col("Gender") == "M", "Male")
    .when(col("Gender") == "F", "Female")
    .otherwise("Unknown")
    .alias("New_gender"),
)

################################################ Sort and OrderBy #############################################################
# sort() and orderBy() are aliases; both accept column names or Column objects.
from pyspark.sql.functions import col

# Sort the DataFrame (ascending by default)
df.sort("department","state").show(truncate=False)
df.sort(col("department"),col("state")).show(truncate=False)

df.orderBy("department","state").show(truncate=False)
df.orderBy(col("department"),col("state")).show(truncate=False)

# Sort by Ascending (ASC), stated explicitly
df.sort(df.department.asc(),df.state.asc()).show(truncate=False)
df.sort(col("department").asc(),col("state").asc()).show(truncate=False)

# Mixed directions: department ascending, then state descending (DESC)
df.sort(df.department.asc(),df.state.desc()).show(truncate=False)
df.sort(col("department").asc(),col("state").desc()).show(truncate=False)
df.orderBy(col("department").asc(),col("state").desc()).show(truncate=False)

################################################ Window function #############################################################
from pyspark.sql.window import Window
from pyspark.sql.functions import (row_number, rank, dense_rank, percent_rank,
                                   ntile, cume_dist, lag, lead)

# One window for every example: rows grouped per department, ordered by salary
windowSpec = Window.partitionBy("department").orderBy("salary")

# row_number: 1, 2, 3, ... within each department (full column contents)
df.withColumn("row_number", row_number().over(windowSpec)).show(truncate=False)

# The remaining functions are shown with the default truncation:
#   rank / dense_rank - ranking with / without gaps after ties
#   percent_rank      - relative rank within the partition
#   ntile(2)          - bucket number when each partition is split into 2
#   cume_dist         - cumulative distribution of salary
#   lag / lead        - salary from 2 rows before / after (null when missing)
window_examples = [
    ("rank", rank()),
    ("dense_rank", dense_rank()),
    ("percent_rank", percent_rank()),
    ("ntile", ntile(2)),
    ("cume_dist", cume_dist()),
    ("lag", lag("salary", 2)),
    ("lead", lead("salary", 2)),
]
for column_name, window_fn in window_examples:
    df.withColumn(column_name, window_fn.over(windowSpec)).show()


####################################################### Joins ################################################################
# `emp`/`empColumns` and `dept`/`deptColumns` are placeholders: lists of row
# tuples and lists of column names.

# Employee dataframe
empDF = spark.createDataFrame(data=emp, schema = empColumns)

# Department dataFrame
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)

# Join condition shared by every example below
join_cond = empDF.emp_dept_id == deptDF.dept_id

# Inner joins
empDF.join(deptDF, join_cond, "inner").show(truncate=False)

# Full Outer Join ("outer", "full" and "fullouter" are synonyms)
empDF.join(deptDF, join_cond, "outer").show(truncate=False)
empDF.join(deptDF, join_cond, "full").show(truncate=False)
empDF.join(deptDF, join_cond, "fullouter").show(truncate=False)

# Left Outer Join. Python refers to columns as empDF.col or empDF["col"];
# empDF("col") and show(false) are Scala syntax.
empDF.join(deptDF, join_cond, "left").show(truncate=False)
empDF.join(deptDF, join_cond, "leftouter").show(truncate=False)

# Right Outer Join
empDF.join(deptDF, join_cond, "right").show(truncate=False)
empDF.join(deptDF, join_cond, "rightouter").show(truncate=False)

# Left Semi Join: empDF rows that have a match, with only empDF's columns
empDF.join(deptDF, join_cond, "leftsemi").show(truncate=False)

# Left Anti Join: empDF rows that have no match
empDF.join(deptDF, join_cond, "leftanti").show(truncate=False)

####################################################### Union ################################################################
# simpleData1/columns1 and simpleData2/columns2 are placeholder rows and column names.

df1 = spark.createDataFrame(data = simpleData1, schema = columns1)
df2 = spark.createDataFrame(data = simpleData2, schema = columns2)

# union() matches columns by position, not by name, and keeps duplicates
# (like SQL UNION ALL). Use unionByName() if the column order differs.
unionDF = df1.union(df2)
unionDF.show(truncate=False)

# Merge without duplicates: the same union of df1 and df2, then distinct()
disDF = df1.union(df2).distinct()
disDF.show(truncate=False)

### SQL Expressions

In [None]:
# Writing SQL expressions: register a DataFrame as a temporary view, then query it
# (view names are case-insensitive, so parquetTable == ParquetTable)
df.createOrReplaceTempView("parquetTable")
parkSQL = spark.sql("select * from ParquetTable where salary >= 4000 ")

# Query a Parquet path through a temporary view. OR REPLACE keeps the cell
# re-runnable; a plain CREATE fails once the view exists.
spark.sql("CREATE OR REPLACE TEMPORARY VIEW PERSON USING parquet OPTIONS (path \"/tmp/output/people.parquet\")")
spark.sql("SELECT * FROM PERSON").show()

#  Change column types using SQL cast functions
df.createOrReplaceTempView("Table")
df_sql = spark.sql("SELECT STRING(species),Float(sepal_length) from Table")
df_sql.printSchema()

df_sql.show(5)

# Change column types using selectExpr. The casts are SQL strings, so no
# pyspark.sql.types import is needed.
df_new = df.selectExpr("cast(petal_width as Double ) petal_width", "cast(sepal_length as Double) sepal_length")
df_new.printSchema()
df_new.show(5)

# case when

# Question: Create a column "Performance" and derive it from the percentage.
# Adjacent string literals inside the parentheses are joined by Python, so no
# line-continuation backslashes are needed. A backslash followed by a space is
# a SyntaxError.
from pyspark.sql.functions import expr
df_case = df.withColumn("Performance", expr("case when Percentage>88.0 then 'Excellent' "
                                            "when Percentage<83.0 then 'Average' "
                                            "else 'Great' end"))
# Order by 
df.createOrReplaceTempView("EMP")
spark.sql("select employee_name,department,state,salary,age,bonus from EMP ORDER BY department asc").show(truncate=False)

# Joins
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")

# Keep the DataFrames, then show them: .show() returns None, so assigning its
# result would leave joinDF/joinDF2 empty.
joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id")
joinDF.show(truncate=False)
joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
joinDF2.show(truncate=False)