In [None]:
from pyspark.sql import SparkSession

In [None]:
# Start (or reuse) a Spark session whose executors run on the YARN cluster.
spark = (
    SparkSession.builder
    .appName('Diamondscsv')
    .master('yarn')
    .getOrCreate()
)

In [None]:
# Display the active SparkSession object in the notebook output.
spark

In [None]:
# Show the kernel's working directory; relative paths such as 'diamonds.csv' resolve against it.
!pwd

In [None]:
# List the local files in the working directory (e.g. to confirm diamonds.csv is present).
!ls

* The file is on the local file system of `gw03.itversity.com`, while Spark runs on YARN, so we first use pandas to read it into a DataFrame on this machine.
* This approach is not recommended for large files, because pandas loads the whole file into the memory of this single process.

In [None]:
# pandas reads the CSV from the local file system of this machine (not from HDFS).
import pandas as pd

LOCAL_DIAMONDS_CSV = '/home/indirameduri/diamonds.csv'
diamonds_raw = pd.read_csv(LOCAL_DIAMONDS_CSV)

In [None]:
# Display the pandas DataFrame loaded from the local CSV.
diamonds_raw

* Once we have a pandas DataFrame, we can build a Spark DataFrame using `spark.createDataFrame`, where `spark` is an instance of `pyspark.sql.SparkSession`.

In [None]:
# IPython help: show the docstring of spark.createDataFrame (works only interactively).
spark.createDataFrame?

* For a pandas DataFrame we need not specify a schema, because a pandas DataFrame already carries column types that Spark can use.
* Here is how we can create a Spark DataFrame from a pandas DataFrame.
* This approach is typically used to ship small local files at job-submission time and join them with larger files, leveraging Spark's distributed computing capabilities.

In [None]:
# Convert the pandas frame to a Spark DataFrame; no explicit schema is passed,
# so Spark derives the column types from the pandas frame.
diamonds_df = spark.createDataFrame(data=diamonds_raw)

In [None]:
# Print the schema Spark derived from the pandas DataFrame.
diamonds_df.printSchema()

In [None]:
# Show the first rows of the Spark DataFrame.
diamonds_df.show()

In [None]:
# Number of rows in the Spark DataFrame.
diamonds_df.count()

In [None]:
%%sh
# Print the fully-qualified hostname of the machine this kernel runs on.
hostname -f

In [None]:
import os
import subprocess

# Copy the local CSV (the same file pandas read above) into the HDFS home directory.
# - The absolute local path avoids depending on the kernel's working directory.
# - `-f` overwrites an existing copy, so re-running the notebook does not fail.
# - check=True raises CalledProcessError if the upload fails, instead of
#   silently returning a non-zero status code as os.system did.
put_result = subprocess.run(
    ['hdfs', 'dfs', '-put', '-f', '/home/indirameduri/diamonds.csv', '/user/indirameduri/'],
    check=True,
)

In [None]:
# Confirm the upload by listing the HDFS home directory.
!hdfs dfs -ls /user/indirameduri

In [None]:
# Read the CSV from HDFS: the first line is the header and column types are inferred.
data = spark.read.csv(
    '/user/indirameduri/diamonds.csv',
    header=True,
    inferSchema=True,
)

In [None]:
# First row of the HDFS-backed DataFrame.
data.first()

In [None]:
# Show the first rows of the HDFS-backed DataFrame.
data.show()

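**Adding an index column to the DataFrame**

`monotonically_increasing_id()` produces ids that are unique and increasing but not necessarily consecutive, which is enough to use `index` as a join key below.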

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

In [None]:
# Append an `index` column of unique ids after all existing columns.
newData = data.select('*', monotonically_increasing_id().alias('index'))

In [None]:
# Show newData with the new `index` column.
newData.show()

**Question 1 - Count how many diamonds are of each colour**

In [None]:
# Register newData as the temp view `diamonds` for the Spark SQL solutions.
# NOTE: the view captures newData as it is now; columns added to newData later
# (e.g. CutColor) are not visible through `diamonds`.
newData.createOrReplaceTempView('diamonds')

In [None]:
# Read the whole `diamonds` view back as a DataFrame and show it.
spark.table('diamonds').show()

In [None]:
# Solution with Spark SQL: number of diamonds per color.
# count(*) counts every row in the group; count(carat) would skip rows whose
# carat is NULL and could disagree with the DataFrame groupBy().count() below.
diamonds_cnt = spark.sql('select color, count(*) as count from diamonds group by color')
diamonds_cnt.show()

In [None]:
# Solution with the DataFrame API: number of rows per color.
colorGroups = newData.groupby('color')
diamonds_cnt = colorGroups.count()
diamonds_cnt.show()

**Question 2 - Make a new column which consists of the first letter of 'cut' followed by the 'color', with a space in between the two**

**So for example if Cut = Ideal and Color = E, there should be a new column called 'CutColor' with 'I E'.**

**Also, replace the Cut and Color columns by the CutColor column.**

In [None]:
# Show newData before the CutColor column is added.
newData.show()

In [None]:
# Solution with withColumn: column functions used to build CutColor.
from pyspark.sql.functions import concat,substring,lit

In [None]:
# Build "<first letter of cut> <color>", e.g. cut='Ideal', color='E' -> 'I E'.
# Spark's substring() is 1-based, so the first character is at position 1.
newData = newData.withColumn(
    'CutColor',
    concat(substring(newData.cut, 1, 1), lit(' '), newData.color),
)

# The question asks for CutColor to replace cut and color. newData keeps both
# (later cells group by `cut` and rebuild its full schema), so the replaced
# version is a separate frame.
cutColorData = newData.drop('cut', 'color')
cutColorData.show()

In [None]:
# Solution with Spark SQL: CutColor replaces cut and color, other columns are kept.
# substring() is 1-based; `table` is back-quoted because TABLE is an SQL keyword.
spark.sql(
    'select carat, concat(substring(cut, 1, 1), " ", color) CutColor, clarity, '
    'depth, `table`, price, x, y, z, index from diamonds'
).show()

**Question 3 - Create two tables and then re-join them
Define D1 to be a table with just the index and carat. Define D2 to be just the index and the price. Then rejoin the two of them to get a table with index, carat, and price.**

In [None]:
# D1: just the index and the carat.
d1 = newData.select(['index', 'carat'])
d1.show()

In [None]:
# D2: just the index and the price.
d2 = newData.select(['index', 'price'])
d2.show()

In [None]:
# Join d1 and d2 back together on their shared `index` column.
# Joining on the column name (instead of d1.index == d2.index) keeps a single
# `index` column in the result. It also avoids ambiguous self-join column
# references, which Spark can reject or mis-resolve because d1 and d2 are both
# projections of newData.
d3 = d1.join(d2, on='index', how='inner').select('index', 'carat', 'price')
d3.show()

In [None]:
# Solution with Spark SQL: build d1 and register it as a view for the SQL join below.
d1 = spark.sql('select index,carat from diamonds')
d1.createOrReplaceTempView('d1')
d1.show()

In [None]:
# Build d2 with Spark SQL and register it as a view for the SQL join below.
d2 = spark.sql('select index,price from diamonds')
d2.createOrReplaceTempView('d2')
d2.show()

In [None]:
# Join in Spark SQL, with the join key in an explicit ON clause
# (rather than an unconditioned JOIN filtered by WHERE).
d3 = spark.sql('select d1.index, d1.carat, d2.price from d1 join d2 on d1.index = d2.index')
d3.show()

**Question 4 - Multiply the dimensions to calculate volume, and then sort the entries by descending volume (with 2 decimal places).
Tips: Look into the 'round' function, which can be used inside DF.select.**

In [None]:
# Solution with the DataFrame API: volume = x * y * z, rounded to 2 decimals.
# Import the functions module under an alias so Spark's round() does not shadow
# Python's built-in round() in the rest of the notebook.
from pyspark.sql import functions as F

volume = newData.withColumn('volume', F.round(newData.x * newData.y * newData.z, 2))
# The question asks for the entries sorted by descending volume.
volume = volume.orderBy(F.desc('volume'))
volume.show()

In [None]:
# Solution with Spark SQL: rounded volume, sorted by descending volume as asked.
volume = spark.sql('select *, round(x * y * z, 2) volume from diamonds order by volume desc')
volume.show()

**Question 5 - Read in the Table as an RDD, Remove the Header, and Turn it into a DF
You must manually define a schema using StructType, which is read in when creating the DF**

In [None]:
from pyspark import SparkContext

In [None]:
# Reuse the SparkContext that backs the active SparkSession.
sc = spark.sparkContext

In [None]:
# Display the SparkContext object.
sc

In [None]:
# Show newData (now including CutColor) before converting it to an RDD.
newData.show()

In [None]:
# Turn each Row of newData into a plain tuple, then preview 20 of them.
rddData = newData.rdd.map(lambda row: tuple(row))
rddData.take(20)

In [None]:
# Star import kept from the original cell; it provides StructType, StructField
# and the *Type classes used below.
from pyspark.sql.types import *

# Manually defined schema. It follows newData's column order, which the tuples in
# rddData mirror. Assumes the CSV columns are carat..z in this order; verify
# against newData.printSchema().
schema = StructType([
    StructField("carat",DoubleType(),True),
    StructField("cut",StringType(),True),
    StructField("color",StringType(),True),
    StructField("clarity",StringType(),True),
    StructField("depth",DoubleType(),True),
    StructField("table",DoubleType(),True),
    StructField("price",LongType(),True),
    StructField("x",DoubleType(),True),
    StructField("y",DoubleType(),True),
    StructField("z",DoubleType(),True),
    # `index` comes from monotonically_increasing_id(), which returns 64-bit ids
    # with the partition id in the upper bits. Values exceed the 32-bit
    # IntegerType range once the data spans more than one partition, so LongType
    # is required.
    StructField("index",LongType(),True),
    StructField("CutColor",StringType(),True)
])

# createDataFrame verifies each tuple against the schema by default (verifySchema=True).
df = spark.createDataFrame(rddData, schema)

df.show()


**Question 6 - Reading in the Table as a DF, Output the Average Price Per Cut**

In [None]:
# Show newData again before computing the average price per cut.
newData.show()

In [None]:
# Solution with Spark SQL: mean price for each cut, rounded to 2 decimals.
avgPerCut = spark.sql(
    'select round(avg(price),2) avg,cut from diamonds group By cut'
)
avgPerCut.show()

In [None]:
# Solution with the DataFrame API: mean price for each cut.
# Round to 2 decimals and name the column `avg`, matching the Spark SQL solution
# above; agg({"price": "avg"}) would give an unrounded `avg(price)` column.
from pyspark.sql import functions as F

avgCut = newData.groupBy('cut').agg(F.round(F.avg('price'), 2).alias('avg'))
avgCut.show()

In [None]:
# List the files in the working directory.
!ls